diff --git a/.gitignore b/.gitignore index 61ae24a..0850c69 100644 --- a/.gitignore +++ b/.gitignore @@ -141,3 +141,5 @@ cython_debug/ tags unhacs.txt poetry.lock +custom_components/ +themes/ diff --git a/unhacs/main.py b/unhacs/main.py index b4d214b..9f2b0fe 100644 --- a/unhacs/main.py +++ b/unhacs/main.py @@ -3,13 +3,17 @@ from collections.abc import Iterable from pathlib import Path from unhacs.git import get_repo_tags -from unhacs.packages import DEFAULT_HASS_CONFIG_PATH -from unhacs.packages import DEFAULT_PACKAGE_FILE from unhacs.packages import Package from unhacs.packages import PackageType from unhacs.packages import get_installed_packages from unhacs.packages import read_lock_packages from unhacs.packages import write_lock_packages +from unhacs.packages.fork import Fork +from unhacs.packages.integration import Integration +from unhacs.packages.plugin import Plugin +from unhacs.packages.theme import Theme +from unhacs.utils import DEFAULT_HASS_CONFIG_PATH +from unhacs.utils import DEFAULT_PACKAGE_FILE def parse_args(): @@ -66,40 +70,40 @@ def parse_args(): "--integration", action="store_const", dest="type", - const=PackageType.INTEGRATION, - default=PackageType.INTEGRATION, + const=Integration, + default=Integration, help="The package is an integration.", ) package_type_group.add_argument( "--plugin", action="store_const", dest="type", - const=PackageType.PLUGIN, + const=Plugin, help="The package is a JavaScript plugin.", ) - package_type_group.add_argument( - "--forked-component", - type=str, - dest="component", - help="Component name from a forked type.", - ) package_type_group.add_argument( "--theme", action="store_const", dest="type", - const=PackageType.THEME, + const=Theme, help="The package is a theme.", ) + package_type_group.add_argument( + "--fork-component", + type=str, + help="Name of component from forked core repo.", + ) + # Additional arguments for forked packages + add_parser.add_argument( + "--fork-branch", + "-b", + type=str, + help="Name of branch of forked core repo. (Only for forked components.)", + ) add_parser.add_argument( "--version", "-v", type=str, help="The version of the package." ) - add_parser.add_argument( - "--branch", - "-b", - type=str, - help="For forked types only, branch that should be used.", - ) add_parser.add_argument( "--update", "-u", @@ -129,11 +133,11 @@ def parse_args(): if args.subcommand == "add": # Component implies forked package - if args.component and args.type != PackageType.FORK: - args.type = PackageType.FORK + if args.fork_component and args.type != Fork: + args.type = Fork # Branch is only valid for forked packages - if args.type != PackageType.FORK and args.branch: + if args.type != Fork and args.fork_branch: raise ValueError( "Branch and component can only be used with forked packages" ) @@ -231,7 +235,10 @@ class Unhacs: if ( package.name in package_names or package.url in package_names - or package.fork_component in package_names + or ( + hasattr(package, "fork_component") + and getattr(package, "fork_component") in package_names + ) ) ] @@ -261,6 +268,30 @@ class Unhacs: self.write_lock_packages(remaining_packages) +def args_to_package(args) -> Package: + ignore_versions = ( + {version for version in args.ignore_versions.split(",")} + if args.ignore_versions + else None + ) + + if args.type == Fork: + if not args.fork_branch: + raise ValueError("A branch must be provided for forked components") + if not args.fork_component: + raise ValueError("A component must be provided for forked components") + + return Fork( + args.url, + branch_name=args.fork_branch, + fork_component=args.fork_component, + version=args.version, + ignored_versions=ignore_versions, + ) + + return args.type(args.url, version=args.version, ignored_versions=ignore_versions) + + def main(): # If the sub command is add package, it should pass the parsed arguments to the add_package function and return args = parse_args() @@ -278,19 +309,9 @@ def main(): update=True, ) elif args.url: + new_package = args_to_package(args) unhacs.add_package( - Package( - args.url, - version=args.version, - package_type=args.type, - ignored_versions=( - {version for version in args.ignore_versions.split(",")} - if args.ignore_versions - else None - ), - branch_name=args.branch, - fork_component=args.component, - ), + new_package, update=args.update, ) else: diff --git a/unhacs/packages.py b/unhacs/packages.py deleted file mode 100644 index 1c2b5c4..0000000 --- a/unhacs/packages.py +++ /dev/null @@ -1,489 +0,0 @@ -import json -import shutil -import tempfile -from collections.abc import Generator -from collections.abc import Iterable -from enum import StrEnum -from enum import auto -from io import BytesIO -from pathlib import Path -from typing import Any -from typing import cast -from zipfile import ZipFile - -import requests -import yaml - -from unhacs.git import get_branch_zip -from unhacs.git import get_latest_sha -from unhacs.git import get_repo_tags -from unhacs.git import get_tag_zip - -DEFAULT_HASS_CONFIG_PATH: Path = Path(".") -DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") - - -def extract_zip(zip_file: ZipFile, dest_dir: Path): - for info in zip_file.infolist(): - if info.is_dir(): - continue - file = Path(info.filename) - # Strip top directory from path - file = Path(*file.parts[1:]) - path = dest_dir / file - path.parent.mkdir(parents=True, exist_ok=True) - with zip_file.open(info) as source, open(path, "wb") as dest: - dest.write(source.read()) - - -class PackageType(StrEnum): - INTEGRATION = auto() - PLUGIN = auto() - FORK = auto() - THEME = auto() - - -def get_install_path(hass_config_path: Path, package_type: PackageType) -> Path: - if package_type == PackageType.PLUGIN: - return hass_config_path / "www" / "js" - elif package_type in (PackageType.INTEGRATION, PackageType.FORK): - return hass_config_path / "custom_components" - elif package_type == PackageType.THEME: - return hass_config_path / "themes" - else: - raise NotImplementedError(f"Unknown package type {package_type}") - - -class Package: - git_tags = False - - def __init__( - self, - url: str, - version: str | None = None, - package_type: PackageType = PackageType.INTEGRATION, - ignored_versions: set[str] | None = None, - branch_name: str | None = None, - fork_component: str | None = None, - ): - if package_type == PackageType.FORK and not fork_component: - raise ValueError(f"Fork with no component specified {url}@{branch_name}") - - self.url = url - self.package_type = package_type - self.fork_component = fork_component - self.ignored_versions = ignored_versions or set() - self.branch_name = branch_name - - parts = self.url.split("/") - self.owner = parts[-2] - self.name = parts[-1] - - self.path: Path | None = None - - if not version: - self.version = self.fetch_version_release() - else: - self.version = version - - def __str__(self): - name = self.name - if self.fork_component: - name = f"{self.fork_component} ({name})" - version = self.version - if self.branch_name: - version = f"{version} ({self.branch_name})" - return f"{self.package_type}: {name} {version}" - - def __eq__(self, other): - return all( - ( - self.same(other), - self.fork_component == other.fork_component, - ) - ) - - def same(self, other): - return all( - ( - self.url == other.url, - self.branch_name == other.branch_name, - self.fork_component == other.fork_component, - ) - ) - - def __hash__(self): - return hash((self.url, self.branch_name, self.fork_component)) - - def verbose_str(self): - return f"{str(self)} ({self.url})" - - @staticmethod - def from_yaml(yml: dict) -> "Package": - # Convert package_type to enum - package_type = yml.pop("package_type", None) - if package_type and isinstance(package_type, str): - package_type = PackageType(package_type) - yml["package_type"] = package_type - - return Package(**yml) - - def to_yaml(self: "Package") -> dict: - data: dict[str, Any] = { - "url": self.url, - "version": self.version, - "package_type": str(self.package_type), - } - - if self.branch_name: - data["branch_name"] = self.branch_name - if self.fork_component: - data["fork_component"] = self.fork_component - if self.ignored_versions: - data["ignored_versions"] = self.ignored_versions - - return data - - def add_ignored_version(self, version: str): - self.ignored_versions.add(version) - - def _fetch_version_release_releases(self, version: str | None = None) -> str: - # Fetch the releases from the GitHub API - response = requests.get( - f"https://api.github.com/repos/{self.owner}/{self.name}/releases" - ) - response.raise_for_status() - releases = response.json() - - if not releases: - raise ValueError(f"No releases found for package {self.name}") - - # Default to latest - desired_release = releases[0] - - # If a version is provided, check if it exists in the releases - if version: - for release in releases: - if release["tag_name"] == version: - desired_release = release - break - else: - raise ValueError(f"Version {version} does not exist for this package") - - return cast(str, desired_release["tag_name"]) - - def _fetch_version_release_git(self, version: str | None = None) -> str: - tags = get_repo_tags(self.url) - if not tags: - raise ValueError(f"No tags found for package {self.name}") - if version and version not in tags: - raise ValueError(f"Version {version} does not exist for this package") - - tags = [tag for tag in tags if tag not in self.ignored_versions] - if not version: - version = tags[-1] - - return version - - def _fetch_latest_sha(self, branch_name: str) -> str: - return get_latest_sha(self.url, branch_name) - - def fetch_version_release(self, version: str | None = None) -> str: - if self.branch_name: - return self._fetch_latest_sha(self.branch_name) - elif self.git_tags: - return self._fetch_version_release_git(version) - else: - return self._fetch_version_release_releases(version) - - def fetch_versions(self) -> list[str]: - return get_repo_tags(self.url) - - def get_hacs_json(self, version: str | None = None) -> dict: - """Fetches the hacs.json file for the package.""" - version = version or self.version - response = requests.get( - f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json" - ) - - if response.status_code == 404: - return {} - - response.raise_for_status() - return response.json() - - def get_install_path(self, hass_config_path: Path) -> Path: - return get_install_path(hass_config_path, self.package_type) - - def install_plugin(self, hass_config_path: Path): - """Installs the plugin package.""" - - valid_filenames: Iterable[str] - if filename := self.get_hacs_json().get("filename"): - valid_filenames = (cast(str, filename),) - else: - valid_filenames = ( - f"{self.name.removeprefix('lovelace-')}.js", - f"{self.name}.js", - f"{self.name}-umd.js", - f"{self.name}-bundle.js", - ) - - def real_get(filename) -> requests.Response | None: - urls = [ - f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}", - f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}", - f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}", - ] - - for url in urls: - plugin = requests.get(url) - - if int(plugin.status_code / 100) == 4: - continue - - plugin.raise_for_status() - - return plugin - - return None - - for filename in valid_filenames: - plugin = real_get(filename) - if plugin: - break - else: - raise ValueError(f"No valid filename found for package {self.name}") - - js_path = self.get_install_path(hass_config_path) - js_path.mkdir(parents=True, exist_ok=True) - js_path.joinpath(filename).write_text(plugin.text) - - yaml.dump(self.to_yaml(), js_path.joinpath(f"{filename}-unhacs.yaml").open("w")) - - # Write to resources - resources: list[dict] = [] - resources_file = hass_config_path / "resources.yaml" - if resources_file.exists(): - resources = yaml.safe_load(resources_file.open()) or [] - - if not any(r["url"] == f"/local/js/{filename}" for r in resources): - resources.append( - { - "url": f"/local/js/{filename}", - "type": "module", - } - ) - - yaml.dump(resources, resources_file.open("w")) - - def install_theme_component(self, hass_config_path: Path): - filename = self.get_hacs_json().get("filename") - if not filename: - raise ValueError(f"No filename found for theme {self.name}") - - filename = cast(str, filename) - url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}" - theme = requests.get(url) - theme.raise_for_status() - - themes_path = self.get_install_path(hass_config_path) - themes_path.mkdir(parents=True, exist_ok=True) - themes_path.joinpath(filename).write_text(theme.text) - - yaml.dump(self.to_yaml(), themes_path.joinpath(f"{filename}.unhacs").open("w")) - - def install_integration(self, hass_config_path: Path): - """Installs the integration package.""" - zipball_url = get_tag_zip(self.url, self.version) - response = requests.get(zipball_url) - response.raise_for_status() - - with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: - tmpdir = Path(tempdir) - extract_zip(ZipFile(BytesIO(response.content)), tmpdir) - - source, dest = None, None - for custom_component in tmpdir.glob("custom_components/*"): - source = custom_component - dest = self.get_install_path(hass_config_path) / custom_component.name - break - else: - hacs_json = json.loads((tmpdir / "hacs.json").read_text()) - if hacs_json.get("content_in_root"): - source = tmpdir - dest = self.get_install_path(hass_config_path) / self.name - - if not source or not dest: - raise ValueError("No custom_components directory found") - - dest.parent.mkdir(parents=True, exist_ok=True) - shutil.rmtree(dest, ignore_errors=True) - shutil.move(source, dest) - - yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w")) - - def install_fork_component(self, hass_config_path: Path): - """Installs the integration from hass fork.""" - if not self.fork_component: - raise ValueError(f"No fork component specified for {self.verbose_str()}") - if not self.branch_name: - raise ValueError(f"No branch name specified for {self.verbose_str()}") - - zipball_url = get_branch_zip(self.url, self.branch_name) - response = requests.get(zipball_url) - response.raise_for_status() - - with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: - tmpdir = Path(tempdir) - extract_zip(ZipFile(BytesIO(response.content)), tmpdir) - - source, dest = None, None - source = tmpdir / "homeassistant" / "components" / self.fork_component - if not source.exists() or not source.is_dir(): - raise ValueError( - f"Could not find {self.fork_component} in {self.url}@{self.version}" - ) - - # Add version to manifest - manifest_file = source / "manifest.json" - manifest = json.load(manifest_file.open()) - manifest["version"] = "0.0.0" - json.dump(manifest, manifest_file.open("w")) - - dest = self.get_install_path(hass_config_path) / source.name - - if not source or not dest: - raise ValueError("No custom_components directory found") - - dest.parent.mkdir(parents=True, exist_ok=True) - shutil.rmtree(dest, ignore_errors=True) - shutil.move(source, dest) - - yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w")) - - def install(self, hass_config_path: Path): - """Installs the package.""" - if self.package_type == PackageType.PLUGIN: - self.install_plugin(hass_config_path) - elif self.package_type == PackageType.INTEGRATION: - self.install_integration(hass_config_path) - elif self.package_type == PackageType.FORK: - self.install_fork_component(hass_config_path) - elif self.package_type == PackageType.THEME: - self.install_theme_component(hass_config_path) - else: - raise NotImplementedError(f"Unknown package type {self.package_type}") - - def uninstall(self, hass_config_path: Path) -> bool: - """Uninstalls the package if it is installed, returning True if it was uninstalled.""" - if not self.path: - print("No path found for package, searching...") - if installed_package := self.installed_package(hass_config_path): - installed_package.uninstall(hass_config_path) - return True - - return False - - print("Removing", self.path) - - if self.path.is_dir(): - shutil.rmtree(self.path) - else: - self.path.unlink() - self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink() - - # Remove from resources - resources_file = hass_config_path / "resources.yaml" - if resources_file.exists(): - with resources_file.open("r") as f: - resources = yaml.safe_load(f) or [] - new_resources = [ - r for r in resources if r["url"] != f"/local/js/{self.path.name}" - ] - if len(new_resources) != len(resources): - - with resources_file.open("w") as f: - yaml.dump(new_resources, f) - - return True - - def installed_package(self, hass_config_path: Path) -> "Package|None": - """Returns the installed package if it exists, otherwise None.""" - for package in get_installed_packages(hass_config_path, [self.package_type]): - if self.same(package): - return package - - return None - - def is_update(self, hass_config_path: Path) -> bool: - """Returns True if the package is not installed or the installed version is different from the latest.""" - installed_package = self.installed_package(hass_config_path) - return installed_package is None or installed_package.version != self.version - - def get_latest(self) -> "Package": - """Returns a new Package representing the latest version of this package.""" - package = self.to_yaml() - package.pop("version") - return Package(**package) - - -def get_installed_packages( - hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH, - package_types: Iterable[PackageType] = ( - PackageType.INTEGRATION, - PackageType.PLUGIN, - PackageType.THEME, - ), -) -> list[Package]: - # Integration packages - packages: list[Package] = [] - - if PackageType.INTEGRATION in package_types: - for custom_component in get_install_path( - hass_config_path, PackageType.INTEGRATION - ).glob("*"): - unhacs = custom_component / "unhacs.yaml" - if unhacs.exists(): - package = Package.from_yaml(yaml.safe_load(unhacs.open())) - package.path = custom_component - packages.append(package) - - # Plugin packages - if PackageType.PLUGIN in package_types: - for js_unhacs in get_install_path(hass_config_path, PackageType.PLUGIN).glob( - "*-unhacs.yaml" - ): - package = Package.from_yaml(yaml.safe_load(js_unhacs.open())) - package.path = js_unhacs.with_name( - js_unhacs.name.removesuffix("-unhacs.yaml") - ) - packages.append(package) - - # Theme packages - if PackageType.THEME in package_types: - for js_unhacs in get_install_path(hass_config_path, PackageType.THEME).glob( - "*.unhacs" - ): - package = Package.from_yaml(yaml.safe_load(js_unhacs.open())) - package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs")) - packages.append(package) - - return packages - - -# Read a list of Packages from a text file in the plain text format "URL version name" -def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]: - if package_file.exists(): - return [ - Package.from_yaml(p) - for p in yaml.safe_load(package_file.open())["packages"] - ] - return [] - - -# Write a list of Packages to a text file in the format URL version name -def write_lock_packages( - packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE -): - yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w")) diff --git a/unhacs/packages/__init__.py b/unhacs/packages/__init__.py new file mode 100644 index 0000000..c38fdad --- /dev/null +++ b/unhacs/packages/__init__.py @@ -0,0 +1,76 @@ +from collections.abc import Iterable +from pathlib import Path +from typing import cast + +import yaml + +from unhacs.packages.common import Package +from unhacs.packages.common import PackageType +from unhacs.packages.fork import Fork +from unhacs.packages.integration import Integration +from unhacs.packages.plugin import Plugin +from unhacs.packages.theme import Theme +from unhacs.utils import DEFAULT_HASS_CONFIG_PATH +from unhacs.utils import DEFAULT_PACKAGE_FILE + + +def from_yaml(data: dict | Path | str) -> Package: + if isinstance(data, Path): + data = yaml.safe_load(data.open()) + elif isinstance(data, str): + data = yaml.safe_load(data) + + data = cast(dict, data) + + # Convert package_type to enum + package_type = data.pop("package_type", None) + if package_type and isinstance(package_type, str): + package_type = PackageType(package_type) + + url = data.pop("url") + + return { + PackageType.INTEGRATION: Integration, + PackageType.PLUGIN: Plugin, + PackageType.THEME: Theme, + PackageType.FORK: Fork, + }[package_type](url, **data) + + +def get_installed_packages( + hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH, + package_types: Iterable[PackageType] = ( + PackageType.INTEGRATION, + PackageType.PLUGIN, + PackageType.THEME, + ), +) -> list[Package]: + # Integration packages + packages: list[Package] = [] + + if PackageType.INTEGRATION in package_types: + packages.extend(Integration.find_installed(hass_config_path)) + + # Plugin packages + if PackageType.PLUGIN in package_types: + packages.extend(Plugin.find_installed(hass_config_path)) + + # Theme packages + if PackageType.THEME in package_types: + packages.extend(Theme.find_installed(hass_config_path)) + + return packages + + +# Read a list of Packages from a text file in the plain text format "URL version name" +def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]: + if package_file.exists(): + return [from_yaml(p) for p in yaml.safe_load(package_file.open())["packages"]] + return [] + + +# Write a list of Packages to a text file in the format URL version name +def write_lock_packages( + packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE +): + yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w")) diff --git a/unhacs/packages/common.py b/unhacs/packages/common.py new file mode 100644 index 0000000..0fe1279 --- /dev/null +++ b/unhacs/packages/common.py @@ -0,0 +1,228 @@ +import shutil +from enum import StrEnum +from enum import auto +from pathlib import Path +from typing import Any +from typing import cast + +import requests +import yaml + +from unhacs.git import get_repo_tags + + +class PackageType(StrEnum): + INTEGRATION = auto() + PLUGIN = auto() + FORK = auto() + THEME = auto() + + +class Package: + git_tags = False + package_type: PackageType + + other_fields: list[str] = [] + + def __init__( + self, + url: str, + version: str | None = None, + ignored_versions: set[str] | None = None, + ): + self.url = url + self.ignored_versions = ignored_versions or set() + + parts = self.url.split("/") + self.owner = parts[-2] + self.name = parts[-1] + + self.path: Path | None = None + + if not version: + self.version = self.fetch_version_release() + else: + self.version = version + + def __str__(self): + return f"{self.package_type}: {self.name} {self.version}" + + def __eq__(self, other): + return all( + ( + self.same(other), + # TODO: Should this match versions? + ) + ) + + def same(self, other): + fields = list(["url"] + self.other_fields) + + return all((getattr(self, field) == getattr(other, field) for field in fields)) + + def __hash__(self): + fields = list(["url"] + self.other_fields) + + return hash(tuple(getattr(self, field) for field in fields)) + + def verbose_str(self): + return f"{str(self)} ({self.url})" + + @classmethod + def from_yaml(cls, data: dict | Path | str) -> "Package": + if isinstance(data, Path): + data = yaml.safe_load(data.open()) + elif isinstance(data, str): + data = yaml.safe_load(data) + + data = cast(dict, data) + + if data["package_type"] != cls.package_type: + raise ValueError("Invalid package_type") + + return cls(data.pop("url"), **data) + + def to_yaml(self, dest: Path | None = None) -> dict: + data: dict[str, Any] = { + "url": self.url, + "version": self.version, + "package_type": str(self.package_type), + } + + if self.ignored_versions: + data["ignored_versions"] = self.ignored_versions + + for field in self.other_fields: + if hasattr(self, field): + data[field] = getattr(self, field) + + if dest: + yaml.dump(self.to_yaml(), dest.open("w")) + + return data + + def add_ignored_version(self, version: str): + self.ignored_versions.add(version) + + def _fetch_version_release_releases(self, version: str | None = None) -> str: + # Fetch the releases from the GitHub API + response = requests.get( + f"https://api.github.com/repos/{self.owner}/{self.name}/releases" + ) + response.raise_for_status() + releases = response.json() + + if not releases: + raise ValueError(f"No releases found for package {self.name}") + + # Default to latest + desired_release = releases[0] + + # If a version is provided, check if it exists in the releases + if version: + for release in releases: + if release["tag_name"] == version: + desired_release = release + break + else: + raise ValueError(f"Version {version} does not exist for this package") + + return cast(str, desired_release["tag_name"]) + + def _fetch_version_release_git(self, version: str | None = None) -> str: + tags = get_repo_tags(self.url) + if not tags: + raise ValueError(f"No tags found for package {self.name}") + if version and version not in tags: + raise ValueError(f"Version {version} does not exist for this package") + + tags = [tag for tag in tags if tag not in self.ignored_versions] + if not version: + version = tags[-1] + + return version + + def fetch_version_release(self, version: str | None = None) -> str: + if self.git_tags: + return self._fetch_version_release_git(version) + else: + return self._fetch_version_release_releases(version) + + def _fetch_versions(self) -> list[str]: + return get_repo_tags(self.url) + + def get_hacs_json(self, version: str | None = None) -> dict: + """Fetches the hacs.json file for the package.""" + version = version or self.version + response = requests.get( + f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json" + ) + + if response.status_code == 404: + return {} + + response.raise_for_status() + return response.json() + + def install(self, hass_config_path: Path): + raise NotImplementedError() + + def uninstall(self, hass_config_path: Path) -> bool: + """Uninstalls the package if it is installed, returning True if it was uninstalled.""" + if not self.path: + print("No path found for package, searching...") + if installed_package := self.installed_package(hass_config_path): + installed_package.uninstall(hass_config_path) + return True + + return False + + print("Removing", self.path) + + if self.path.is_dir(): + shutil.rmtree(self.path) + else: + self.path.unlink() + self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink() + + # Remove from resources + resources_file = hass_config_path / "resources.yaml" + if resources_file.exists(): + with resources_file.open("r") as f: + resources = yaml.safe_load(f) or [] + new_resources = [ + r for r in resources if r["url"] != f"/local/js/{self.path.name}" + ] + if len(new_resources) != len(resources): + + with resources_file.open("w") as f: + yaml.dump(new_resources, f) + + return True + + @classmethod + def get_install_dir(cls, hass_config_path: Path) -> Path: + raise NotImplementedError() + + @classmethod + def find_installed(cls, hass_config_path: Path) -> list["Package"]: + raise NotImplementedError() + + def installed_package(self, hass_config_path: Path) -> "Package|None": + """Returns the installed package if it exists, otherwise None.""" + for package in self.find_installed(hass_config_path): + if self.same(package): + return package + + return None + + def is_update(self, hass_config_path: Path) -> bool: + """Returns True if the package is not installed or the installed version is different from the latest.""" + installed_package = self.installed_package(hass_config_path) + return installed_package is None or installed_package.version != self.version + + def get_latest(self) -> "Package": + """Returns a new Package representing the latest version of this package.""" + package = self.to_yaml() + package.pop("version") + return Package(**package) diff --git a/unhacs/packages/fork.py b/unhacs/packages/fork.py new file mode 100644 index 0000000..b96266c --- /dev/null +++ b/unhacs/packages/fork.py @@ -0,0 +1,76 @@ +import json +import shutil +import tempfile +from io import BytesIO +from pathlib import Path +from zipfile import ZipFile + +import requests + +from unhacs.git import get_branch_zip +from unhacs.git import get_latest_sha +from unhacs.packages import PackageType +from unhacs.packages.integration import Integration +from unhacs.utils import extract_zip + + +class Fork(Integration): + other_fields = ["fork_component", "branch_name"] + package_type = PackageType.FORK + + def __init__( + self, + url: str, + fork_component: str, + branch_name: str, + version: str | None = None, + ignored_versions: set[str] | None = None, + ): + self.fork_component = fork_component + self.branch_name = branch_name + + super().__init__( + url, + version=version, + ignored_versions=ignored_versions, + ) + + def __str__(self): + return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}" + + def fetch_version_release(self, version: str | None = None) -> str: + return get_latest_sha(self.url, self.branch_name) + + def install(self, hass_config_path: Path) -> None: + """Installs the integration from hass fork.""" + zipball_url = get_branch_zip(self.url, self.branch_name) + response = requests.get(zipball_url) + response.raise_for_status() + + with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: + tmpdir = Path(tempdir) + extract_zip(ZipFile(BytesIO(response.content)), tmpdir) + + source, dest = None, None + source = tmpdir / "homeassistant" / "components" / self.fork_component + if not source.exists() or not source.is_dir(): + raise ValueError( + f"Could not find {self.fork_component} in {self.url}@{self.version}" + ) + + # Add version to manifest + manifest_file = source / "manifest.json" + manifest = json.load(manifest_file.open()) + manifest["version"] = "0.0.0" + json.dump(manifest, manifest_file.open("w")) + + dest = self.get_install_dir(hass_config_path) / source.name + + if not source or not dest: + raise ValueError("No custom_components directory found") + + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.rmtree(dest, ignore_errors=True) + shutil.move(source, dest) + + self.to_yaml(dest.joinpath("unhacs.yaml")) diff --git a/unhacs/packages/integration.py b/unhacs/packages/integration.py new file mode 100644 index 0000000..9f0466b --- /dev/null +++ b/unhacs/packages/integration.py @@ -0,0 +1,76 @@ +import json +import shutil +import tempfile +from io import BytesIO +from pathlib import Path +from zipfile import ZipFile + +import requests + +from unhacs.git import get_tag_zip +from unhacs.packages import Package +from unhacs.packages import PackageType +from unhacs.utils import extract_zip + + +class Integration(Package): + package_type = PackageType.INTEGRATION + + def __init__( + self, + url: str, + version: str | None = None, + ignored_versions: set[str] | None = None, + ): + super().__init__( + url, + version=version, + ignored_versions=ignored_versions, + ) + + @classmethod + def get_install_dir(cls, hass_config_path: Path) -> Path: + return hass_config_path / "custom_components" + + @classmethod + def find_installed(cls, hass_config_path: Path) -> list["Package"]: + packages: list[Package] = [] + + for custom_component in cls.get_install_dir(hass_config_path).glob("*"): + unhacs = custom_component / "unhacs.yaml" + if unhacs.exists(): + package = cls.from_yaml(unhacs) + package.path = custom_component + packages.append(package) + + return packages + + def install(self, hass_config_path: Path) -> None: + """Installs the integration package.""" + zipball_url = get_tag_zip(self.url, self.version) + response = requests.get(zipball_url) + response.raise_for_status() + + with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: + tmpdir = Path(tempdir) + extract_zip(ZipFile(BytesIO(response.content)), tmpdir) + + source, dest = None, None + for custom_component in tmpdir.glob("custom_components/*"): + source = custom_component + dest = self.get_install_dir(hass_config_path) / custom_component.name + break + else: + hacs_json = json.loads((tmpdir / "hacs.json").read_text()) + if hacs_json.get("content_in_root"): + source = tmpdir + dest = self.get_install_dir(hass_config_path) / self.name + + if not source or not dest: + raise ValueError("No custom_components directory found") + + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.rmtree(dest, ignore_errors=True) + shutil.move(source, dest) + + self.to_yaml(dest.joinpath("unhacs.yaml")) diff --git a/unhacs/packages/plugin.py b/unhacs/packages/plugin.py new file mode 100644 index 0000000..c48648e --- /dev/null +++ b/unhacs/packages/plugin.py @@ -0,0 +1,86 @@ +from pathlib import Path +from typing import cast + +import requests + +from unhacs.packages import Package +from unhacs.packages import PackageType + + +class Plugin(Package): + package_type = PackageType.PLUGIN + + def __init__( + self, + url: str, + version: str | None = None, + ignored_versions: set[str] | None = None, + ): + super().__init__( + url, + version=version, + ignored_versions=ignored_versions, + ) + + @classmethod + def get_install_dir(cls, hass_config_path: Path) -> Path: + return hass_config_path / "www" / "js" + + @classmethod + def find_installed(cls, hass_config_path: Path) -> list["Package"]: + packages: list[Package] = [] + + for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"): + package = cls.from_yaml(js_unhacs) + package.path = js_unhacs.with_name( + js_unhacs.name.removesuffix("-unhacs.yaml") + ) + packages.append(package) + + return packages + + def install(self, hass_config_path: Path) -> None: + """Installs the plugin package.""" + + valid_filenames: list[str] + if filename := self.get_hacs_json().get("filename"): + valid_filenames = [cast(str, filename)] + else: + valid_filenames = [ + f"{self.name.removeprefix('lovelace-')}.js", + f"{self.name}.js", + f"{self.name}-umd.js", + f"{self.name}-bundle.js", + ] + + def real_get(filename) -> requests.Response | None: + urls = [ + f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}", + f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}", + f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}", + ] + + for url in urls: + plugin = requests.get(url) + + if int(plugin.status_code / 100) == 4: + continue + + plugin.raise_for_status() + + return plugin + + return None + + for filename in valid_filenames: + plugin = real_get(filename) + if plugin: + break + else: + raise ValueError(f"No valid filename found for package {self.name}") + + js_path = self.get_install_dir(hass_config_path) + js_path.mkdir(parents=True, exist_ok=True) + js_path.joinpath(filename).write_text(plugin.text) + + self.to_yaml(js_path.joinpath(f"{filename}-unhacs.yaml")) diff --git a/unhacs/packages/theme.py b/unhacs/packages/theme.py new file mode 100644 index 0000000..81178ac --- /dev/null +++ b/unhacs/packages/theme.py @@ -0,0 +1,57 @@ +from pathlib import Path +from typing import cast + +import requests + +from unhacs.packages import Package +from unhacs.packages import PackageType + + +class Theme(Package): + package_type = PackageType.THEME + + def __init__( + self, + url: str, + version: str | None = None, + ignored_versions: set[str] | None = None, + ): + super().__init__( + url, + version=version, + ignored_versions=ignored_versions, + ) + + @classmethod + def get_install_dir(cls, hass_config_path: Path) -> Path: + return hass_config_path / "themes" + + @classmethod + def find_installed(cls, hass_config_path: Path) -> list["Package"]: + packages: list[Package] = [] + + for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"): + package = cls.from_yaml(js_unhacs) + package.path = js_unhacs.with_name( + js_unhacs.name.removesuffix("-unhacs.yaml") + ) + packages.append(package) + + return packages + + def install(self, hass_config_path: Path) -> None: + """Install theme yaml.""" + filename = self.get_hacs_json().get("filename") + if not filename: + raise ValueError(f"No filename found for theme {self.name}") + + filename = cast(str, filename) + url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}" + theme = requests.get(url) + theme.raise_for_status() + + themes_path = self.get_install_dir(hass_config_path) + themes_path.mkdir(parents=True, exist_ok=True) + themes_path.joinpath(filename).write_text(theme.text) + + self.to_yaml(themes_path.joinpath(f"{filename}.unhacs")) diff --git a/unhacs/utils.py b/unhacs/utils.py new file mode 100644 index 0000000..ae78cd5 --- /dev/null +++ b/unhacs/utils.py @@ -0,0 +1,21 @@ +from pathlib import Path +from zipfile import ZipFile + +DEFAULT_HASS_CONFIG_PATH: Path = Path(".") +DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") + + +def extract_zip(zip_file: ZipFile, dest_dir: Path) -> Path: + """Extract a zip file to a directory.""" + for info in zip_file.infolist(): + if info.is_dir(): + continue + file = Path(info.filename) + # Strip top directory from path + file = Path(*file.parts[1:]) + path = dest_dir / file + path.parent.mkdir(parents=True, exist_ok=True) + with zip_file.open(info) as source, open(path, "wb") as dest: + dest.write(source.read()) + + return dest_dir