Compare commits: main...hass-fork-

3 commits: b019185380, ea982ee5a9, 273199a9a6

.gitignore (vendored): 3 changes
@@ -141,6 +141,3 @@ cython_debug/
tags
unhacs.txt
poetry.lock
custom_components/
themes/
unhacs.yaml

Makefile: 23 changes

@@ -24,7 +24,7 @@ lint: devenv
# Runs tests
.PHONY: test
test: devenv
	poetry run python -m unittest discover tests --pattern "*_test.py"
	@echo TODO: poetry run pytest

# Builds wheel for package to upload
.PHONY: build
@@ -37,27 +37,6 @@ verify-tag-version:
	$(eval TAG_NAME = $(shell [ -n "$(DRONE_TAG)" ] && echo $(DRONE_TAG) || git describe --tags --exact-match))
	test "v$(shell poetry version | awk '{print $$2}')" = "$(TAG_NAME)"

.PHONY: bump-patch
bump-patch:
	$(eval NEW_VERSION = $(shell poetry version patch | awk '{print $$6}'))
	git add pyproject.toml
	git commit -m "Bump version to $(NEW_VERSION)"
	git tag "v$(NEW_VERSION)"

.PHONY: bump-minor
bump-minor:
	$(eval NEW_VERSION = $(shell poetry version minor | awk '{print $$6}'))
	git add pyproject.toml
	git commit -m "Bump version to $(NEW_VERSION)"
	git tag "v$(NEW_VERSION)"

.PHONY: bump-major
bump-major:
	$(eval NEW_VERSION = $(shell poetry version major | awk '{print $$6}'))
	git add pyproject.toml
	git commit -m "Bump version to $(NEW_VERSION)"
	git tag "v$(NEW_VERSION)"

# Upload to pypi
.PHONY: upload
upload: verify-tag-version build

@@ -38,14 +38,6 @@ If you already have a list of packages in a file, you can add them all at once u
unhacs add --file <file_path>
```

### Add a component from a forked Home Assistant Core repository

To add a component from a fork of home-assistant/core, use the `--forked-component` flag followed by the URL of the forked repository and then specify the branch with the `--branch` flag:

```bash
unhacs add --forked-component <forked_repo_url> --branch <branch>
```

### List packages

To list all installed packages, use the `list` command:

@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "unhacs"
version = "0.7.1"
version = "0.5.2"
description = "Command line interface to install Home Assistant Community Store packages"
authors = ["Ian Fijolek <ian@iamthefij.com>"]
license = "MIT"
@@ -12,16 +12,14 @@ readme = "README.md"

[tool.poetry.dependencies]
python = "^3.11"
requests = "^2.32.0"
pyyaml = "^6.0.0"
requests = "^2.32.3"
pyyaml = "^6.0.1"

[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
isort = "^5.13.2"
mypy = "^1.10.0"
pre-commit = "^3.7.1"
types-requests = "^2.32.0"
types-pyyaml = "^6.0.0"
types-requests = "^2.32.0.20240602"

[tool.poetry.scripts]
unhacs = 'unhacs.main:main'

@@ -1,260 +0,0 @@
import os
import shutil
import tempfile
import unittest
from pathlib import Path

from unhacs.main import main
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages

INTEGRATION_URL = "https://github.com/simbaja/ha_gehome"
INTEGRATION_VERSION = "v0.6.9"

PLUGIN_URL = "https://github.com/kalkih/mini-media-player"
PLUGIN_VERSION = "v1.16.8"

THEME_URL = "https://github.com/basnijholt/lovelace-ios-themes"
THEME_VERSION = "v3.0.1"

FORK_URL = "https://github.com/ViViDboarder/home-assistant"
FORK_BRANCH = "dev"
FORK_COMPONENT = "nextbus"
FORK_VERSION = "3b2893f2f4e16f9a05d9cc4a7ba9f31984c841be"


class TestMainIntegrarion(unittest.TestCase):
    test_dir: str

    def setUp(self):
        self.test_dir = tempfile.mkdtemp()
        os.chdir(self.test_dir)

    def tearDown(self):
        shutil.rmtree(self.test_dir)
        pass

    def run_itest(
        self,
        test_name: str,
        command: str,
        expected_files: list[str] | None = None,
        expect_missing_files: list[str] | None = None,
        expected_code: int = 0,
    ):
        with self.subTest(test_name, command=command):
            self.assertEqual(main(command.split()), expected_code)

            # Verify that the package was installed by checking the filesystem
            if expected_files:
                expected_files = [
                    os.path.join(self.test_dir, file) for file in expected_files
                ]
                missing_files = [
                    file for file in expected_files if not os.path.exists(file)
                ]
                if missing_files:
                    self.fail(f"Missing files: {missing_files}")

            if expect_missing_files:
                expect_missing_files = [
                    os.path.join(self.test_dir, file) for file in expect_missing_files
                ]
                existing_files = [
                    file for file in expect_missing_files if os.path.exists(file)
                ]
                if existing_files:
                    self.fail(f"Files should not exist: {existing_files}")

    def test_integration(self):
        self.run_itest(
            "Add integration",
            f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION}",
            expected_files=[
                "custom_components/ge_home/__init__.py",
                "custom_components/ge_home/manifest.json",
                "custom_components/ge_home/switch.py",
            ],
        )

        self.run_itest(
            "List installed packages",
            "list",
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, INTEGRATION_URL)
        self.assertEqual(installed[0].version, INTEGRATION_VERSION)

        self.run_itest(
            "Double add",
            f"add {INTEGRATION_URL}",
            expected_code=1,
        )

        self.run_itest(
            "Upgrade to latest version",
            "upgrade ha_gehome --yes",
            expected_files=[
                "custom_components/ge_home/__init__.py",
                "custom_components/ge_home/manifest.json",
                "custom_components/ge_home/switch.py",
            ],
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, INTEGRATION_URL)
        self.assertNotEqual(installed[0].version, INTEGRATION_VERSION)

        self.run_itest(
            "Downgrade integration",
            f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION} --update",
            expected_files=[
                "custom_components/ge_home/__init__.py",
                "custom_components/ge_home/manifest.json",
                "custom_components/ge_home/switch.py",
            ],
        )

        self.run_itest(
            "List installed packages",
            "list",
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, INTEGRATION_URL)
        self.assertEqual(installed[0].version, INTEGRATION_VERSION)

        # Delete the custom_components folder and re-install the integration using the lock file
        shutil.rmtree(os.path.join(self.test_dir, "custom_components"))
        self.run_itest(
            "Re-install integration using lock file",
            "add --file unhacs.yaml",
            expected_files=[
                "custom_components/ge_home/__init__.py",
                "custom_components/ge_home/manifest.json",
                "custom_components/ge_home/switch.py",
            ],
        )

        # Delete the lock file and then regenerate it
        os.remove(os.path.join(self.test_dir, "unhacs.yaml"))
        self.run_itest(
            "Regenerate lock file",
            "list --freeze",
            expected_files=[
                "unhacs.yaml",
            ],
        )

        self.assertGreater(len(read_lock_packages()), 0)

        self.run_itest(
            "Remove integration",
            "remove ha_gehome --yes",
            expect_missing_files=[
                "custom_components/ge_home/__init__.py",
                "custom_components/ge_home/manifest.json",
                "custom_components/ge_home/switch.py",
            ],
        )

        installed = get_installed_packages()
        self.assertEqual(len(installed), 0)

    def test_plugin(self):
        self.run_itest(
            "Add plugin",
            f"add --plugin {PLUGIN_URL} --version {PLUGIN_VERSION}",
            expected_files=[
                "www/js/mini-media-player-bundle.js",
            ],
        )

        self.run_itest(
            "List installed packages",
            "list",
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, PLUGIN_URL)
        self.assertEqual(installed[0].version, PLUGIN_VERSION)

        self.run_itest(
            "Remove plugin",
            "remove mini-media-player --yes",
            expect_missing_files=[
                "www/js/mini-media-player-bundle.js",
            ],
        )

        installed = get_installed_packages()
        self.assertEqual(len(installed), 0)

    def test_theme(self):
        self.run_itest(
            "Add theme",
            f"add --theme {THEME_URL} --version {THEME_VERSION}",
            expected_files=[
                "themes/ios-themes.yaml",
            ],
        )

        self.run_itest(
            "List installed packages",
            "list",
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, THEME_URL)
        self.assertEqual(installed[0].version, THEME_VERSION)

        self.run_itest(
            "Remove theme",
            "remove lovelace-ios-themes --yes",
            expect_missing_files=[
                "themes/ios-themes.yaml",
            ],
        )

        installed = get_installed_packages()
        self.assertEqual(len(installed), 0)

    def test_fork(self):
        self.run_itest(
            "Add fork",
            f"add {FORK_URL} --fork-component {FORK_COMPONENT} --fork-branch {FORK_BRANCH} --version {FORK_VERSION}",
            expected_files=[
                "custom_components/nextbus/__init__.py",
                "custom_components/nextbus/manifest.json",
                "custom_components/nextbus/sensor.py",
                "custom_components/nextbus/unhacs.yaml",
            ],
        )

        self.run_itest(
            "List installed packages",
            "list",
        )
        installed = get_installed_packages()
        self.assertEqual(len(installed), 1)
        self.assertEqual(installed[0].url, FORK_URL)
        self.assertEqual(installed[0].version, FORK_VERSION)

        self.run_itest(
            "Remove fork",
            f"remove {FORK_URL} --yes",
            expect_missing_files=[
                "custom_components/nextbus/__init__.py",
                "custom_components/nextbus/manifest.json",
                "custom_components/nextbus/sensor.py",
                "custom_components/nextbus/unhacs.yaml",
            ],
        )

        installed = get_installed_packages()
        self.assertEqual(len(installed), 0)


if __name__ == "__main__":
    unittest.main()

@@ -1,4 +1,4 @@
from unhacs.main import main

if __name__ == "__main__":
    exit(main())
    main()

@@ -79,7 +79,7 @@ def get_latest_sha(repository_url: str, branch_name: str) -> str:

    for line in result.stdout.decode().split("\n"):
        if line:
            return line.partition("\t")[0]
            return line.partition(" ")[0]

    raise ValueError(f"branch name '{branch_name}' not found for {repository_url}")

@@ -87,10 +87,7 @@ def get_latest_sha(repository_url: str, branch_name: str) -> str:
def get_tag_zip(repository_url: str, tag_name: str) -> str:
    return f"{repository_url}/archive/refs/tags/{tag_name}.zip"


def get_branch_zip(repository_url: str, branch_name: str) -> str:
    return f"{repository_url}/archive/{branch_name}.zip"


def get_sha_zip(repository_url: str, sha: str) -> str:
    return f"{repository_url}/archive/{sha}.zip"

unhacs/main.py: 214 changes

@@ -1,30 +1,18 @@
import sys
from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path

from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE


class InvalidArgumentsError(ValueError):
    pass


class DuplicatePackageError(ValueError):
    pass


def parse_args(argv: list[str]):
def create_parser():
    parser = ArgumentParser(
        description="Unhacs - Command line interface for the Home Assistant Community Store"
    )
@@ -54,12 +42,6 @@ def parse_args(argv: list[str]):
    # List installed packages
    list_parser = subparsers.add_parser("list", description="List installed packages.")
    list_parser.add_argument("--verbose", "-v", action="store_true")
    list_parser.add_argument(
        "--freeze",
        "-f",
        action="store_true",
        help="Regenerate unhacs.yaml with installed packages.",
    )

    # List git tags for a given package
    list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
@@ -84,35 +66,11 @@ def parse_args(argv: list[str]):
        "--integration",
        action="store_const",
        dest="type",
        const=Integration,
        default=Integration,
        help="The package is an integration.",
        const=PackageType.INTEGRATION,
        default=PackageType.INTEGRATION,
    )
    package_type_group.add_argument(
        "--plugin",
        action="store_const",
        dest="type",
        const=Plugin,
        help="The package is a JavaScript plugin.",
    )
    package_type_group.add_argument(
        "--theme",
        action="store_const",
        dest="type",
        const=Theme,
        help="The package is a theme.",
    )
    package_type_group.add_argument(
        "--fork-component",
        type=str,
        help="Name of component from forked core repo.",
    )
    # Additional arguments for forked packages
    add_parser.add_argument(
        "--fork-branch",
        "-b",
        type=str,
        help="Name of branch of forked core repo. (Only for forked components.)",
        "--plugin", action="store_const", dest="type", const=PackageType.PLUGIN
    )

    add_parser.add_argument(
@@ -135,34 +93,15 @@ def parse_args(argv: list[str]):
    remove_parser = subparsers.add_parser(
        "remove", description="Remove installed packages."
    )
    remove_parser.add_argument(
        "--yes", "-y", action="store_true", help="Do not prompt for confirmation."
    )
    remove_parser.add_argument("packages", nargs="+")

    # Upgrade packages
    update_parser = subparsers.add_parser(
        "upgrade", description="Upgrade installed packages."
    )
    update_parser.add_argument(
        "--yes", "-y", action="store_true", help="Do not prompt for confirmation."
    )
    update_parser.add_argument("packages", nargs="*")

    args = parser.parse_args(argv)

    if args.subcommand == "add":
        # Component implies forked package
        if args.fork_component and args.type != Fork:
            args.type = Fork

        # Branch is only valid for forked packages
        if args.type != Fork and args.fork_branch:
            raise InvalidArgumentsError(
                "Branch and component can only be used with forked packages"
            )

    return args
    return parser


class Unhacs:
@@ -182,26 +121,36 @@ class Unhacs:

    def add_package(
        self,
        package: Package,
        package_url: str,
        version: str | None = None,
        update: bool = False,
        package_type: PackageType = PackageType.INTEGRATION,
        ignore_versions: set[str] | None = None,
    ):
        """Install and add a package to the lock or install a specific version."""
        package = Package(
            package_url,
            version=version,
            package_type=package_type,
            ignored_versions=ignore_versions,
        )
        packages = self.read_lock_packages()

        # Raise an error if the package is already in the list
        if existing_package := next((p for p in packages if p.same(package)), None):
        existing_package = next((p for p in packages if p.url == package.url), None)
        if existing_package:
            if update:
                # Remove old version of the package
                packages = [p for p in packages if p == existing_package]
                packages = [p for p in packages if p.url != package.url]
            else:
                raise DuplicatePackageError("Package already exists in the list")
                raise ValueError("Package already exists in the list")

        package.install(self.hass_config)

        packages.append(package)
        self.write_lock_packages(packages)

    def upgrade_packages(self, package_names: list[str], yes: bool = False):
    def upgrade_packages(self, package_names: list[str]):
        """Uograde to latest version of packages and update lock."""
        installed_packages: Iterable[Package]

@@ -225,60 +174,35 @@ class Unhacs:
                )
                outdated_packages.append(latest_package)

        confirmed = yes or input("Upgrade all packages? (y/N) ").lower() == "y"
        if outdated_packages and not confirmed:
        if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
            return

        for installed_package in outdated_packages:
            installed_package.install(self.hass_config)

        # Update lock file to latest now that we know they are uograded
        latest_lookup = {p: p for p in latest_packages}
        packages = [latest_lookup.get(p, p) for p in self.read_lock_packages()]
        latest_lookup = {p.url: p for p in latest_packages}
        packages = [latest_lookup.get(p.url, p) for p in self.read_lock_packages()]

        self.write_lock_packages(packages)

    def list_packages(self, verbose: bool = False, freeze: bool = False):
    def list_packages(self, verbose: bool = False):
        """List installed packages and their versions."""
        installed_packages = get_installed_packages()
        for package in installed_packages:
        for package in get_installed_packages():
            print(package.verbose_str() if verbose else str(package))

        if freeze:
            self.write_lock_packages(installed_packages)

    def list_tags(self, url: str, limit: int = 10):
        print(f"Tags for {url}:")
        for tag in get_repo_tags(url)[-1 * limit :]:
            print(tag)

    def remove_packages(self, package_names: list[str], yes: bool = False):
    def remove_packages(self, package_names: list[str]):
        """Remove installed packages and uodate lock."""
        packages_to_remove = [
            package
            for package in get_installed_packages()
            if (
                package.name in package_names
                or package.url in package_names
                or (
                    hasattr(package, "fork_component")
                    and getattr(package, "fork_component") in package_names
                )
            )
            if (package.name in package_names or package.url in package_names)
        ]

        if package_names and not packages_to_remove:
            print("No packages found to remove")
            return

        print("Packages to remove:")
        for package in packages_to_remove:
            print(package)

        confirmed = yes or input("Remove listed packages? (y/N) ").lower() == "y"
        if packages_to_remove and not confirmed:
            return

        remaining_packages = [
            package
            for package in self.read_lock_packages()
@@ -291,36 +215,10 @@ class Unhacs:
        self.write_lock_packages(remaining_packages)


def args_to_package(args) -> Package:
    ignore_versions = (
        {version for version in args.ignore_versions.split(",")}
        if args.ignore_versions
        else None
    )

    if args.type == Fork:
        if not args.fork_branch:
            raise InvalidArgumentsError(
                "A branch must be provided for forked components"
            )
        if not args.fork_component:
            raise InvalidArgumentsError(
                "A component must be provided for forked components"
            )

        return Fork(
            args.url,
            branch_name=args.fork_branch,
            fork_component=args.fork_component,
            version=args.version,
            ignored_versions=ignore_versions,
        )

    return args.type(args.url, version=args.version, ignored_versions=ignore_versions)


def main(argv: list[str] | None = None) -> int:
    args = parse_args(argv or sys.argv[1:])
def main():
    # If the sub command is add package, it should pass the parsed arguments to the add_package function and return
    parser = create_parser()
    args = parser.parse_args()

    unhacs = Unhacs(args.config, args.package_file)
    Package.git_tags = args.git_tags
@@ -331,40 +229,38 @@ def main(argv: list[str] | None = None) -> int:
            packages = read_lock_packages(args.file)
            for package in packages:
                unhacs.add_package(
                    package,
                    package.url,
                    package.version,
                    update=True,
                    package_type=package.package_type,
                    ignore_versions=package.ignored_versions,
                )
        elif args.url:
            try:
                new_package = args_to_package(args)
            except InvalidArgumentsError as e:
                print(e)
                return 1
            try:
                unhacs.add_package(
                    new_package,
                    update=args.update,
                )
            except DuplicatePackageError as e:
                print(e)
                return 1
            unhacs.add_package(
                args.url,
                version=args.version,
                update=args.update,
                package_type=args.type,
                ignore_versions=(
                    {version for version in args.ignore_versions.split(",")}
                    if args.ignore_versions
                    else None
                ),
            )
        else:
            print("Either a file or a URL must be provided")
            return 1
            raise ValueError("Either a file or a URL must be provided")
    elif args.subcommand == "list":
        unhacs.list_packages(args.verbose, args.freeze)
        unhacs.list_packages(args.verbose)
    elif args.subcommand == "tags":
        unhacs.list_tags(args.url, limit=args.limit)
    elif args.subcommand == "remove":
        unhacs.remove_packages(args.packages, yes=args.yes)
        unhacs.remove_packages(args.packages)
    elif args.subcommand == "upgrade":
        unhacs.upgrade_packages(args.packages, yes=args.yes)
        unhacs.upgrade_packages(args.packages)
    else:
        print(f"Command {args.subcommand} is not implemented")
        return 1

    return 0
        exit(1)


if __name__ == "__main__":
    exit(main())
    main()

unhacs/packages.py (new file): 406 changes

@@ -0,0 +1,406 @@
import json
import shutil
import tempfile
from collections.abc import Generator
from collections.abc import Iterable
from enum import StrEnum
from enum import auto
from io import BytesIO
from pathlib import Path
from typing import cast
from zipfile import ZipFile

import requests
import yaml

from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_repo_tags
from unhacs.git import get_tag_zip

DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")


def extract_zip(zip_file: ZipFile, dest_dir: Path):
    for info in zip_file.infolist():
        if info.is_dir():
            continue
        file = Path(info.filename)
        # Strip top directory from path
        file = Path(*file.parts[1:])
        path = dest_dir / file
        path.parent.mkdir(parents=True, exist_ok=True)
        with zip_file.open(info) as source, open(path, "wb") as dest:
            dest.write(source.read())


class PackageType(StrEnum):
    INTEGRATION = auto()
    PLUGIN = auto()
    FORK = auto()


class Package:
    git_tags = False

    def __init__(
        self,
        url: str,
        version: str | None = None,
        package_type: PackageType = PackageType.INTEGRATION,
        ignored_versions: set[str] | None = None,
        branch_name: str | None = None,
        fork_component: str | None = None,
    ):
        if package_type == PackageType.FORK and not fork_component:
            raise ValueError(f"Fork with no component specified {url}@{branch_name}")

        self.url = url
        self.package_type = package_type
        self.fork_component = fork_component
        self.ignored_versions = ignored_versions or set()
        self.branch_name = branch_name

        parts = self.url.split("/")
        self.owner = parts[-2]
        self.name = parts[-1]

        self.path: Path | None = None

        if not version:
            self.version = self.fetch_version_release()
        else:
            self.version = version

    def __str__(self):
        return f"{self.name} {self.version}"

    def __eq__(self, other):
        return self.url == other.url and self.version == other.version

    def verbose_str(self):
        return f"{self.name} {self.version} ({self.url})"

    @staticmethod
    def from_yaml(yaml: dict) -> "Package":
        # Convert package_type to enum
        package_type = yaml.pop("package_type", None)
        if package_type and isinstance(package_type, str):
            package_type = PackageType(package_type)
        yaml["package_type"] = package_type

        return Package(**yaml)

    def to_yaml(self: "Package") -> dict:
        data = {
            "url": self.url,
            "version": self.version,
            "package_type": str(self.package_type),
        }

        if self.branch_name:
            data["branch_name"] = self.branch_name

        return data

    def add_ignored_version(self, version: str):
        self.ignored_versions.add(version)

    def _fetch_version_release_releases(self, version: str | None = None) -> str:
        # Fetch the releases from the GitHub API
        response = requests.get(
            f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
        )
        response.raise_for_status()
        releases = response.json()

        if not releases:
            raise ValueError(f"No releases found for package {self.name}")

        # Default to latest
        desired_release = releases[0]

        # If a version is provided, check if it exists in the releases
        if version:
            for release in releases:
                if release["tag_name"] == version:
                    desired_release = release
                    break
            else:
                raise ValueError(f"Version {version} does not exist for this package")

        return cast(str, desired_release["tag_name"])

    def _fetch_version_release_git(self, version: str | None = None) -> str:
        tags = get_repo_tags(self.url)
        if not tags:
            raise ValueError(f"No tags found for package {self.name}")
        if version and version not in tags:
            raise ValueError(f"Version {version} does not exist for this package")

        tags = [tag for tag in tags if tag not in self.ignored_versions]
        if not version:
            version = tags[-1]

        return version

    def _fetch_latest_sha(self, branch_name: str) -> str:
        return get_latest_sha(self.url, branch_name)

    def fetch_version_release(self, version: str | None = None) -> str:
        if self.branch_name:
            return self._fetch_latest_sha(self.branch_name)
        elif self.git_tags:
            return self._fetch_version_release_git(version)
        else:
            return self._fetch_version_release_releases(version)

    def fetch_versions(self) -> list[str]:
        return get_repo_tags(self.url)

    def get_hacs_json(self, version: str | None = None) -> dict:
        """Fetches the hacs.json file for the package."""
        version = version or self.version
        response = requests.get(
            f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
        )

        if response.status_code == 404:
            return {}

        response.raise_for_status()
        return response.json()

    def install_plugin(self, hass_config_path: Path):
        """Installs the plugin package."""

        valid_filenames: Iterable[str]
        if filename := self.get_hacs_json().get("filename"):
            valid_filenames = (cast(str, filename),)
        else:
            valid_filenames = (
                f"{self.name.removeprefix('lovelace-')}.js",
                f"{self.name}.js",
                f"{self.name}-umd.js",
                f"{self.name}-bundle.js",
            )

        def real_get(filename) -> requests.Response | None:
            urls = [
                f"https://raw.githubusercontent.com/{self.owner}/{self.version}/dist/{filename}",
                f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
                f"https://raw.githubusercontent.com/{self.owner}/{self.version}/{filename}",
            ]

            for url in urls:
                plugin = requests.get(url)

                if int(plugin.status_code / 100) == 4:
                    continue

                plugin.raise_for_status()

                return plugin

            return None

        for filename in valid_filenames:
            plugin = real_get(filename)
            if plugin:
                break
        else:
            raise ValueError(f"No valid filename found for package {self.name}")

        js_path = hass_config_path / "www" / "js"
        js_path.mkdir(parents=True, exist_ok=True)
        js_path.joinpath(filename).write_text(plugin.text)

        yaml.dump(self.to_yaml(), js_path.joinpath(f"{filename}-unhacs.yaml").open("w"))

        # Write to resources
        resources: list[dict] = []
        resources_file = hass_config_path / "resources.yaml"
        if resources_file.exists():
            resources = yaml.safe_load(resources_file.open()) or []

        if not any(r["url"] == f"/local/js/{filename}" for r in resources):
            resources.append(
                {
                    "url": f"/local/js/{filename}",
                    "type": "module",
                }
            )

        yaml.dump(resources, resources_file.open("w"))

    def install_integration(self, hass_config_path: Path):
        """Installs the integration package."""
        zipball_url = get_tag_zip(self.url, self.version)
        response = requests.get(zipball_url)
        response.raise_for_status()

        with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
            tmpdir = Path(tempdir)
            extract_zip(ZipFile(BytesIO(response.content)), tmpdir)

            source, dest = None, None
            for custom_component in tmpdir.glob("custom_components/*"):
                source = custom_component
                dest = hass_config_path / "custom_components" / custom_component.name
                break
            else:
                hacs_json = json.loads((tmpdir / "hacs.json").read_text())
                if hacs_json.get("content_in_root"):
                    source = tmpdir
                    dest = hass_config_path / "custom_components" / self.name

            if not source or not dest:
                raise ValueError("No custom_components directory found")

            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.rmtree(dest, ignore_errors=True)
            shutil.move(source, dest)

            yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))

    def install_fork_component(self, hass_config_path: Path):
        """Installs the integration from hass fork."""
        # TODO: Replace asserts with errors
        assert self.fork_component
        assert self.branch_name
        zipball_url = get_branch_zip(self.url, self.branch_name)
        response = requests.get(zipball_url)
        response.raise_for_status()

        with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
            tmpdir = Path(tempdir)
            extract_zip(ZipFile(BytesIO(response.content)), tmpdir)

            source, dest = None, None
            source = tmpdir / "homeassistant" / "components" / self.fork_component
            if not source.exists() or not source.is_dir():
                raise ValueError(
                    f"Could not find {self.fork_component} in {self.url}@{self.version}"
                )

            # Add version to manifest
            manifest_file = source / "manifest.json"
            manifest = json.load(manifest_file.open())
            manifest["version"] = "0.0.0"
            json.dump(manifest, manifest_file.open("w"))

            dest = hass_config_path / "custom_components" / source.name

            if not source or not dest:
                raise ValueError("No custom_components directory found")

            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.rmtree(dest, ignore_errors=True)
            shutil.move(source, dest)

            yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))

    def install(self, hass_config_path: Path):
        """Installs the package."""
        if self.package_type == PackageType.PLUGIN:
            self.install_plugin(hass_config_path)
        elif self.package_type == PackageType.INTEGRATION:
            self.install_integration(hass_config_path)
        else:
            raise NotImplementedError(f"Unknown package type {self.package_type}")

    def uninstall(self, hass_config_path: Path) -> bool:
        """Uninstalls the package if it is installed, returning True if it was uninstalled."""
        if not self.path:
            print("No path found for package, searching...")
            if installed_package := self.installed_package(hass_config_path):
                installed_package.uninstall(hass_config_path)
                return True

            return False

        print("Removing", self.path)

        if self.path.is_dir():
            shutil.rmtree(self.path)
        else:
            self.path.unlink()
            self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()

            # Remove from resources
            resources_file = hass_config_path / "resources.yaml"
            if resources_file.exists():
                with resources_file.open("r") as f:
                    resources = yaml.safe_load(f) or []
                new_resources = [
                    r for r in resources if r["url"] != f"/local/js/{self.path.name}"
                ]
                if len(new_resources) != len(resources):

                    with resources_file.open("w") as f:
                        yaml.dump(new_resources, f)

        return True

    def installed_package(self, hass_config_path: Path) -> "Package|None":
        """Returns the installed package if it exists, otherwise None."""
        for package in get_installed_packages(hass_config_path, [self.package_type]):
            if package.url == self.url:
                return package

        return None

    def is_update(self, hass_config_path: Path) -> bool:
        """Returns True if the package is not installed or the installed version is different from the latest."""
        installed_package = self.installed_package(hass_config_path)
        return installed_package is None or installed_package.version != self.version

    def get_latest(self) -> "Package":
        """Returns a new Package representing the latest version of this package."""
        package = self.to_yaml()
        package.pop("version")
        return Package(**package)


def get_installed_packages(
    hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
    package_types: Iterable[PackageType] = (
        PackageType.INTEGRATION,
        PackageType.PLUGIN,
    ),
) -> Generator[Package, None, None]:
    # Integration packages
    if PackageType.INTEGRATION in package_types:
        for custom_component in (hass_config_path / "custom_components").glob("*"):
            unhacs = custom_component / "unhacs.yaml"
            if unhacs.exists():
                package = Package.from_yaml(yaml.safe_load(unhacs.open()))
                package.path = custom_component
                yield package

    # Plugin packages
    if PackageType.PLUGIN in package_types:
        for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.yaml"):
            package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
            package.path = js_unhacs.with_name(
                js_unhacs.name.removesuffix("-unhacs.yaml")
            )
            yield package


# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
    if package_file.exists():
        return [
            Package.from_yaml(p)
            for p in yaml.safe_load(package_file.open())["packages"]
        ]
    return []


# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
    packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
    yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))

@@ -1,82 +0,0 @@
from collections.abc import Iterable
from pathlib import Path
from typing import cast

import yaml

from unhacs.packages.common import Package
from unhacs.packages.common import PackageType
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE


def from_yaml(data: dict | Path | str) -> Package:
    if isinstance(data, Path):
        data = yaml.safe_load(data.open())
    elif isinstance(data, str):
        data = yaml.safe_load(data)

    data = cast(dict, data)

    # Convert package_type to enum
    package_type = data.pop("package_type", None)
    if package_type and isinstance(package_type, str):
        package_type = PackageType(package_type)

    url = data.pop("url")

    return {
        PackageType.INTEGRATION: Integration,
        PackageType.PLUGIN: Plugin,
        PackageType.THEME: Theme,
        PackageType.FORK: Fork,
    }[package_type](url, **data)


def get_installed_packages(
    hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
    package_types: Iterable[PackageType] = (
        PackageType.INTEGRATION,
        PackageType.FORK,
        PackageType.PLUGIN,
        PackageType.THEME,
    ),
) -> list[Package]:
    # Integration packages
    packages: list[Package] = []

    if PackageType.INTEGRATION in package_types:
        packages.extend(Integration.find_installed(hass_config_path))

    if PackageType.FORK in package_types:
        packages.extend(Fork.find_installed(hass_config_path))

    # Plugin packages
    if PackageType.PLUGIN in package_types:
        packages.extend(Plugin.find_installed(hass_config_path))

    # Theme packages
    if PackageType.THEME in package_types:
        packages.extend(Theme.find_installed(hass_config_path))

    return packages


# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
    if package_file.exists():
        with package_file.open() as f:
            return [from_yaml(p) for p in yaml.safe_load(f)["packages"]]
    return []


# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
    packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
    with open(package_file, "w") as f:
        yaml.dump({"packages": [p.to_yaml() for p in packages]}, f)

@@ -1,225 +0,0 @@
import shutil
from enum import StrEnum
from enum import auto
from pathlib import Path
from typing import Any
from typing import cast

import requests
import yaml

from unhacs.git import get_repo_tags


class PackageType(StrEnum):
    INTEGRATION = auto()
    PLUGIN = auto()
    FORK = auto()
    THEME = auto()


class Package:
    git_tags = False
    package_type: PackageType

    other_fields: list[str] = []

    def __init__(
        self,
        url: str,
        version: str | None = None,
        ignored_versions: set[str] | None = None,
    ):
        self.url = url
        self.ignored_versions = ignored_versions or set()

        parts = self.url.split("/")
        self.owner = parts[-2]
        self.name = parts[-1]

        self.path: Path | None = None

        if not version:
            self.version = self.fetch_version_release()
        else:
            self.version = version

    def __str__(self):
        return f"{self.package_type}: {self.name} {self.version}"

    def __eq__(self, other):
        return all(
            (
                self.same(other),
                self.version == other.version,
            )
        )

    def same(self, other):
        fields = list(["url"] + self.other_fields)

        return all((getattr(self, field) == getattr(other, field) for field in fields))

    def __hash__(self):
        fields = list(["url"] + self.other_fields)

        return hash(tuple(getattr(self, field) for field in fields))

    def verbose_str(self):
        return f"{str(self)} ({self.url})"

    @classmethod
    def from_yaml(cls, data: dict | Path | str) -> "Package":
        if isinstance(data, Path):
            with data.open() as f:
                data = yaml.safe_load(f)
        elif isinstance(data, str):
            data = yaml.safe_load(data)

        data = cast(dict, data)

        if (package_type := data.pop("package_type")) != cls.package_type:
            raise ValueError(
                f"Invalid package_type ({package_type}) for this class {cls.package_type}"
            )

        return cls(data.pop("url"), **data)

    def to_yaml(self, dest: Path | None = None) -> dict:
        data: dict[str, Any] = {
            "url": self.url,
            "version": self.version,
            "package_type": str(self.package_type),
        }

        if self.ignored_versions:
            data["ignored_versions"] = self.ignored_versions

        for field in self.other_fields:
            if hasattr(self, field):
                data[field] = getattr(self, field)

        if dest:
            with dest.open("w") as f:
                yaml.dump(self.to_yaml(), f)

        return data

    def add_ignored_version(self, version: str):
        self.ignored_versions.add(version)

    def _fetch_version_release_releases(self, version: str | None = None) -> str:
        # Fetch the releases from the GitHub API
        response = requests.get(
            f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
        )
        response.raise_for_status()
        releases = response.json()

        if not releases:
            raise ValueError(f"No releases found for package {self.name}")

        # Default to latest
        desired_release = releases[0]

        # If a version is provided, check if it exists in the releases
        if version:
            for release in releases:
                if release["tag_name"] == version:
                    desired_release = release
                    break
            else:
                raise ValueError(f"Version {version} does not exist for this package")

        return cast(str, desired_release["tag_name"])

    def _fetch_version_release_git(self, version: str | None = None) -> str:
        tags = get_repo_tags(self.url)
        if not tags:
            raise ValueError(f"No tags found for package {self.name}")
        if version and version not in tags:
            raise ValueError(f"Version {version} does not exist for this package")

        tags = [tag for tag in tags if tag not in self.ignored_versions]
        if not version:
            version = tags[-1]

        return version

    def fetch_version_release(self, version: str | None = None) -> str:
        if self.git_tags:
            return self._fetch_version_release_git(version)
        else:
            return self._fetch_version_release_releases(version)

    def _fetch_versions(self) -> list[str]:
        return get_repo_tags(self.url)

    def get_hacs_json(self, version: str | None = None) -> dict:
        """Fetches the hacs.json file for the package."""
        version = version or self.version
        response = requests.get(
            f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
        )

        if response.status_code == 404:
            return {}

        response.raise_for_status()
        return response.json()

    def install(self, hass_config_path: Path):
        raise NotImplementedError()

    @property
    def unhacs_path(self) -> Path | None:
        if self.path is None:
            return None

        return self.path / "unhacs.yaml"

    def uninstall(self, hass_config_path: Path) -> bool:
        """Uninstalls the package if it is installed, returning True if it was uninstalled."""
        if not self.path:
            if installed_package := self.installed_package(hass_config_path):
                installed_package.uninstall(hass_config_path)
                return True

            return False

        if self.path.is_dir():
            shutil.rmtree(self.path)
        else:
            self.path.unlink()
        if self.unhacs_path and self.unhacs_path.exists():
            self.unhacs_path.unlink()

        return True

    @classmethod
    def get_install_dir(cls, hass_config_path: Path) -> Path:
        raise NotImplementedError()

    @classmethod
    def find_installed(cls, hass_config_path: Path) -> list["Package"]:
        raise NotImplementedError()

    def installed_package(self, hass_config_path: Path) -> "Package|None":
        """Returns the installed package if it exists, otherwise None."""
        for package in self.find_installed(hass_config_path):
            if self.same(package):
                return package

        return None

    def is_update(self, hass_config_path: Path) -> bool:
        """Returns True if the package is not installed or the installed version is different from the latest."""
        installed_package = self.installed_package(hass_config_path)
        return installed_package is None or installed_package.version != self.version

    def get_latest(self) -> "Package":
        """Returns a new Package representing the latest version of this package."""
        package = self.to_yaml()
        package.pop("version")
        package.pop("package_type")
        return self.__class__(package.pop("url"), **package)

@@ -1,106 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

import requests
import yaml

from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_sha_zip
from unhacs.packages import PackageType
from unhacs.packages.common import Package
from unhacs.packages.integration import Integration
from unhacs.utils import extract_zip


class Fork(Integration):
    other_fields = ["fork_component", "branch_name"]
    package_type = PackageType.FORK

    def __init__(
        self,
        url: str,
        fork_component: str,
        branch_name: str,
        version: str | None = None,
        ignored_versions: set[str] | None = None,
    ):
        self.fork_component = fork_component
        self.branch_name = branch_name

        super().__init__(
            url,
            version=version,
            ignored_versions=ignored_versions,
        )

    def __str__(self):
        return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}"

    def fetch_version_release(self, version: str | None = None) -> str:
        if version:
            return version

        return get_latest_sha(self.url, self.branch_name)

    @classmethod
    def find_installed(cls, hass_config_path: Path) -> list[Package]:
        packages: list[Package] = []

        for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
            unhacs = custom_component / "unhacs.yaml"
            if unhacs.exists():
                data = yaml.safe_load(unhacs.read_text())
                if data["package_type"] != "fork":
                    continue
                package = cls.from_yaml(data)
                package.path = custom_component
                packages.append(package)

        return packages

    def install(self, hass_config_path: Path) -> None:
        """Installs the integration from hass fork."""
        if self.version:
            zipball_url = get_sha_zip(self.url, self.version)
        else:
            zipball_url = get_branch_zip(self.url, self.branch_name)

        response = requests.get(zipball_url)
        response.raise_for_status()

        with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
            tmpdir = Path(tempdir)
            extract_zip(ZipFile(BytesIO(response.content)), tmpdir)

            source, dest = None, None
            source = tmpdir / "homeassistant" / "components" / self.fork_component
            if not source.exists() or not source.is_dir():
                raise ValueError(
                    f"Could not find {self.fork_component} in {self.url}@{self.version}"
                )

            # Add version to manifest
            manifest_file = source / "manifest.json"
            manifest: dict[str, str]
            with manifest_file.open("r") as f:
                manifest = json.load(f)
            manifest["version"] = "0.0.0"
            with manifest_file.open("w") as f:
                json.dump(manifest, f)

            dest = self.get_install_dir(hass_config_path) / source.name

            if not source or not dest:
                raise ValueError("No custom_components directory found")

            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.rmtree(dest, ignore_errors=True)
            shutil.move(source, dest)
            self.path = dest

        self.to_yaml(self.unhacs_path)

@@ -1,81 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

import requests
import yaml

from unhacs.git import get_tag_zip
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.utils import extract_zip


class Integration(Package):
    package_type = PackageType.INTEGRATION

    def __init__(
        self,
        url: str,
        version: str | None = None,
        ignored_versions: set[str] | None = None,
    ):
        super().__init__(
            url,
            version=version,
            ignored_versions=ignored_versions,
        )

    @classmethod
    def get_install_dir(cls, hass_config_path: Path) -> Path:
        return hass_config_path / "custom_components"

    @classmethod
    def find_installed(cls, hass_config_path: Path) -> list[Package]:
        packages: list[Package] = []

        for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
            unhacs = custom_component / "unhacs.yaml"
            if unhacs.exists():
                data = yaml.safe_load(unhacs.read_text())
                if data["package_type"] == "fork":
                    continue
                package = cls.from_yaml(data)
                package.path = custom_component
                packages.append(package)

        return packages

    def install(self, hass_config_path: Path) -> None:
        """Installs the integration package."""
        zipball_url = get_tag_zip(self.url, self.version)
        response = requests.get(zipball_url)
        response.raise_for_status()

        with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
            tmpdir = Path(tempdir)
            extract_zip(ZipFile(BytesIO(response.content)), tmpdir)

            source, dest = None, None
            for custom_component in tmpdir.glob("custom_components/*"):
                source = custom_component
                dest = self.get_install_dir(hass_config_path) / custom_component.name
                break
            else:
                hacs_json = json.loads((tmpdir / "hacs.json").read_text())
                if hacs_json.get("content_in_root"):
                    source = tmpdir
                    dest = self.get_install_dir(hass_config_path) / self.name

            if not source or not dest:
                raise ValueError("No custom_components directory found")

            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.rmtree(dest, ignore_errors=True)
            shutil.move(source, dest)
            self.path = dest

        self.to_yaml(self.unhacs_path)

@@ -1,95 +0,0 @@
from pathlib import Path
from typing import cast

import requests

from unhacs.packages import Package
from unhacs.packages import PackageType


class Plugin(Package):
    package_type = PackageType.PLUGIN

    def __init__(
        self,
        url: str,
        version: str | None = None,
        ignored_versions: set[str] | None = None,
    ):
        super().__init__(
            url,
            version=version,
            ignored_versions=ignored_versions,
        )

    @classmethod
    def get_install_dir(cls, hass_config_path: Path) -> Path:
        return hass_config_path / "www" / "js"

    @property
    def unhacs_path(self) -> Path | None:
        if self.path is None:
            return None

        return self.path.with_name(f"{self.path.name}-unhacs.yaml")

    @classmethod
    def find_installed(cls, hass_config_path: Path) -> list["Package"]:
        packages: list[Package] = []

        for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
            package = cls.from_yaml(js_unhacs)
            package.path = js_unhacs.with_name(
                js_unhacs.name.removesuffix("-unhacs.yaml")
            )
            packages.append(package)

        return packages

    def install(self, hass_config_path: Path) -> None:
        """Installs the plugin package."""

        valid_filenames: list[str]
        if filename := self.get_hacs_json().get("filename"):
            valid_filenames = [cast(str, filename)]
        else:
            valid_filenames = [
                f"{self.name.removeprefix('lovelace-')}.js",
                f"{self.name}.js",
                f"{self.name}-umd.js",
                f"{self.name}-bundle.js",
            ]

        def real_get(filename) -> requests.Response | None:
            urls = [
                f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}",
                f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
                f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}",
            ]

            for url in urls:
                plugin = requests.get(url)

                if int(plugin.status_code / 100) == 4:
                    continue

                plugin.raise_for_status()

                return plugin

            return None

        for filename in valid_filenames:
            plugin = real_get(filename)
            if plugin:
                break
        else:
            raise ValueError(f"No valid filename found for package {self.name}")

        js_path = self.get_install_dir(hass_config_path)
        js_path.mkdir(parents=True, exist_ok=True)

        self.path = js_path.joinpath(filename)
        self.path.write_text(plugin.text)

        self.to_yaml(self.unhacs_path)

@@ -1,63 +0,0 @@
from pathlib import Path
from typing import cast

import requests

from unhacs.packages import Package
from unhacs.packages import PackageType


class Theme(Package):
    package_type = PackageType.THEME

    def __init__(
        self,
        url: str,
        version: str | None = None,
        ignored_versions: set[str] | None = None,
    ):
        super().__init__(
            url,
            version=version,
            ignored_versions=ignored_versions,
        )

    @classmethod
    def get_install_dir(cls, hass_config_path: Path) -> Path:
        return hass_config_path / "themes"

    @property
    def unhacs_path(self) -> Path | None:
        if self.path is None:
            return None

        return self.path.with_name(f"{self.path.name}.unhacs")

    @classmethod
    def find_installed(cls, hass_config_path: Path) -> list["Package"]:
        packages: list[Package] = []

        for js_unhacs in cls.get_install_dir(hass_config_path).glob("*.unhacs"):
            package = cls.from_yaml(js_unhacs)
            package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs"))
            packages.append(package)

        return packages

    def install(self, hass_config_path: Path) -> None:
        """Install theme yaml."""
        filename = self.get_hacs_json().get("filename")
        if not filename:
            raise ValueError(f"No filename found for theme {self.name}")

        filename = cast(str, filename)
        url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}"
        theme = requests.get(url)
        theme.raise_for_status()

        themes_path = self.get_install_dir(hass_config_path)
        themes_path.mkdir(parents=True, exist_ok=True)
        self.path = themes_path.joinpath(filename)
        self.path.write_text(theme.text)

        self.to_yaml(self.unhacs_path)

@@ -1,21 +0,0 @@
from pathlib import Path
from zipfile import ZipFile

DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")


def extract_zip(zip_file: ZipFile, dest_dir: Path) -> Path:
    """Extract a zip file to a directory."""
    for info in zip_file.infolist():
        if info.is_dir():
            continue
        file = Path(info.filename)
        # Strip top directory from path
        file = Path(*file.parts[1:])
        path = dest_dir / file
        path.parent.mkdir(parents=True, exist_ok=True)
        with zip_file.open(info) as source, open(path, "wb") as dest:
            dest.write(source.read())

    return dest_dir