From 3f2aeadbc5eab3af044a651ffb950e09225d0a85 Mon Sep 17 00:00:00 2001 From: Ian Fijolek Date: Wed, 17 Jul 2024 21:09:36 -0700 Subject: [PATCH] Support for installing hass fork components --- unhacs/git.py | 23 ++++++++++- unhacs/main.py | 89 ++++++++++++++++++++++++++------------- unhacs/packages.py | 101 ++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 173 insertions(+), 40 deletions(-) diff --git a/unhacs/git.py b/unhacs/git.py index a01616b..9803349 100644 --- a/unhacs/git.py +++ b/unhacs/git.py @@ -67,5 +67,26 @@ def get_repo_tags(repository_url: str) -> list[str]: return [tag.name for tag in tags] -def get_ref_zip(repository_url: str, tag_name: str) -> str: +def get_latest_sha(repository_url: str, branch_name: str) -> str: + command = f"git ls-remote {repository_url} {branch_name}" + result = subprocess.run( + command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Check for errors + if result.returncode != 0: + raise Exception(f"Error running command: {command}\n{result.stderr.decode()}") + + for line in result.stdout.decode().split("\n"): + if line: + return line.partition("\t")[0] + + raise ValueError(f"branch name '{branch_name}' not found for {repository_url}") + + +def get_tag_zip(repository_url: str, tag_name: str) -> str: return f"{repository_url}/archive/refs/tags/{tag_name}.zip" + + +def get_branch_zip(repository_url: str, branch_name: str) -> str: + return f"{repository_url}/archive/{branch_name}.zip" diff --git a/unhacs/main.py b/unhacs/main.py index 4ac2e4d..a293c47 100644 --- a/unhacs/main.py +++ b/unhacs/main.py @@ -12,7 +12,7 @@ from unhacs.packages import read_lock_packages from unhacs.packages import write_lock_packages -def create_parser(): +def parse_args(): parser = ArgumentParser( description="Unhacs - Command line interface for the Home Assistant Community Store" ) @@ -68,14 +68,31 @@ def create_parser(): dest="type", const=PackageType.INTEGRATION, default=PackageType.INTEGRATION, + help="The package is an integration.", ) package_type_group.add_argument( - "--plugin", action="store_const", dest="type", const=PackageType.PLUGIN + "--plugin", + action="store_const", + dest="type", + const=PackageType.PLUGIN, + help="The package is a JavaScript plugin.", + ) + package_type_group.add_argument( + "--forked-component", + type=str, + dest="component", + help="Component name from a forked type.", ) add_parser.add_argument( "--version", "-v", type=str, help="The version of the package." ) + add_parser.add_argument( + "--branch", + "-b", + type=str, + help="For foked types only, branch that should be used.", + ) add_parser.add_argument( "--update", "-u", @@ -101,7 +118,20 @@ def create_parser(): ) update_parser.add_argument("packages", nargs="*") - return parser + args = parser.parse_args() + + if args.subcommand == "add": + # Component implies forked package + if args.component and args.type != PackageType.FORK: + args.type = PackageType.FORK + + # Branch is only valid for forked packages + if args.type != PackageType.FORK and args.branch: + raise ValueError( + "Branch and component can only be used with forked packages" + ) + + return args class Unhacs: @@ -121,19 +151,10 @@ class Unhacs: def add_package( self, - package_url: str, - version: str | None = None, + package: Package, update: bool = False, - package_type: PackageType = PackageType.INTEGRATION, - ignore_versions: set[str] | None = None, ): """Install and add a package to the lock or install a specific version.""" - package = Package( - package_url, - version=version, - package_type=package_type, - ignored_versions=ignore_versions, - ) packages = self.read_lock_packages() # Raise an error if the package is already in the list @@ -201,8 +222,20 @@ class Unhacs: packages_to_remove = [ package for package in get_installed_packages() - if (package.name in package_names or package.url in package_names) + if ( + package.name in package_names + or package.url in package_names + or package.fork_component in package_names + ) ] + + if packages_to_remove and input("Remove all packages? (y/N) ").lower() != "y": + return + + if package_names and not packages_to_remove: + print("No packages found to remove") + return + remaining_packages = [ package for package in self.read_lock_packages() @@ -217,8 +250,7 @@ class Unhacs: def main(): # If the sub command is add package, it should pass the parsed arguments to the add_package function and return - parser = create_parser() - args = parser.parse_args() + args = parse_args() unhacs = Unhacs(args.config, args.package_file) Package.git_tags = args.git_tags @@ -229,23 +261,24 @@ def main(): packages = read_lock_packages(args.file) for package in packages: unhacs.add_package( - package.url, - package.version, + package, update=True, - package_type=package.package_type, - ignore_versions=package.ignored_versions, ) elif args.url: unhacs.add_package( - args.url, - version=args.version, - update=args.update, - package_type=args.type, - ignore_versions=( - {version for version in args.ignore_versions.split(",")} - if args.ignore_versions - else None + Package( + args.url, + version=args.version, + package_type=args.type, + ignored_versions=( + {version for version in args.ignore_versions.split(",")} + if args.ignore_versions + else None + ), + branch_name=args.branch, + fork_component=args.component, ), + update=args.update, ) else: raise ValueError("Either a file or a URL must be provided") diff --git a/unhacs/packages.py b/unhacs/packages.py index 8ecb009..c275e0e 100644 --- a/unhacs/packages.py +++ b/unhacs/packages.py @@ -7,14 +7,17 @@ from enum import StrEnum from enum import auto from io import BytesIO from pathlib import Path +from typing import Any from typing import cast from zipfile import ZipFile import requests import yaml -from unhacs.git import get_ref_zip +from unhacs.git import get_branch_zip +from unhacs.git import get_latest_sha from unhacs.git import get_repo_tags +from unhacs.git import get_tag_zip DEFAULT_HASS_CONFIG_PATH: Path = Path(".") DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") @@ -36,6 +39,7 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path): class PackageType(StrEnum): INTEGRATION = auto() PLUGIN = auto() + FORK = auto() class Package: @@ -47,10 +51,17 @@ class Package: version: str | None = None, package_type: PackageType = PackageType.INTEGRATION, ignored_versions: set[str] | None = None, + branch_name: str | None = None, + fork_component: str | None = None, ): + if package_type == PackageType.FORK and not fork_component: + raise ValueError(f"Fork with no component specified {url}@{branch_name}") + self.url = url self.package_type = package_type + self.fork_component = fork_component self.ignored_versions = ignored_versions or set() + self.branch_name = branch_name parts = self.url.split("/") self.owner = parts[-2] @@ -64,31 +75,53 @@ class Package: self.version = version def __str__(self): - return f"{self.name} {self.version}" + name = self.name + if self.fork_component: + name = f"{self.fork_component} ({name})" + version = self.version + if self.branch_name: + version = f"{version} ({self.branch_name})" + return f"{self.package_type}: {name} {version}" def __eq__(self, other): - return self.url == other.url and self.version == other.version + return all( + ( + self.url == other.url, + self.version == other.version, + self.branch_name == other.branch_name, + self.fork_component == other.fork_component, + ) + ) def verbose_str(self): - return f"{self.name} {self.version} ({self.url})" + return f"{str(self)} ({self.url})" @staticmethod - def from_yaml(yaml: dict) -> "Package": + def from_yaml(yml: dict) -> "Package": # Convert package_type to enum - package_type = yaml.pop("package_type", None) + package_type = yml.pop("package_type", None) if package_type and isinstance(package_type, str): package_type = PackageType(package_type) - yaml["package_type"] = package_type + yml["package_type"] = package_type - return Package(**yaml) + return Package(**yml) def to_yaml(self: "Package") -> dict: - return { + data: dict[str, Any] = { "url": self.url, "version": self.version, "package_type": str(self.package_type), } + if self.branch_name: + data["branch_name"] = self.branch_name + if self.fork_component: + data["fork_component"] = self.fork_component + if self.ignored_versions: + data["ignored_versions"] = self.ignored_versions + + return data + def add_ignored_version(self, version: str): self.ignored_versions.add(version) @@ -130,8 +163,13 @@ class Package: return version + def _fetch_latest_sha(self, branch_name: str) -> str: + return get_latest_sha(self.url, branch_name) + def fetch_version_release(self, version: str | None = None) -> str: - if self.git_tags: + if self.branch_name: + return self._fetch_latest_sha(self.branch_name) + elif self.git_tags: return self._fetch_version_release_git(version) else: return self._fetch_version_release_releases(version) @@ -216,7 +254,7 @@ class Package: def install_integration(self, hass_config_path: Path): """Installs the integration package.""" - zipball_url = get_ref_zip(self.url, self.version) + zipball_url = get_tag_zip(self.url, self.version) response = requests.get(zipball_url) response.raise_for_status() @@ -244,12 +282,53 @@ class Package: yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w")) + def install_fork_component(self, hass_config_path: Path): + """Installs the integration from hass fork.""" + if not self.fork_component: + raise ValueError(f"No fork component specified for {self.verbose_str()}") + if not self.branch_name: + raise ValueError(f"No branch name specified for {self.verbose_str()}") + + zipball_url = get_branch_zip(self.url, self.branch_name) + response = requests.get(zipball_url) + response.raise_for_status() + + with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: + tmpdir = Path(tempdir) + extract_zip(ZipFile(BytesIO(response.content)), tmpdir) + + source, dest = None, None + source = tmpdir / "homeassistant" / "components" / self.fork_component + if not source.exists() or not source.is_dir(): + raise ValueError( + f"Could not find {self.fork_component} in {self.url}@{self.version}" + ) + + # Add version to manifest + manifest_file = source / "manifest.json" + manifest = json.load(manifest_file.open()) + manifest["version"] = "0.0.0" + json.dump(manifest, manifest_file.open("w")) + + dest = hass_config_path / "custom_components" / source.name + + if not source or not dest: + raise ValueError("No custom_components directory found") + + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.rmtree(dest, ignore_errors=True) + shutil.move(source, dest) + + yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w")) + def install(self, hass_config_path: Path): """Installs the package.""" if self.package_type == PackageType.PLUGIN: self.install_plugin(hass_config_path) elif self.package_type == PackageType.INTEGRATION: self.install_integration(hass_config_path) + elif self.package_type == PackageType.FORK: + self.install_fork_component(hass_config_path) else: raise NotImplementedError(f"Unknown package type {self.package_type}")