diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e6d32d3..4b9461d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -18,8 +18,9 @@ repos: rev: 5.13.2 hooks: - id: isort + args: ["--profile", "black", "--filter-files"] - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.10.0 + rev: v1.10.1 hooks: - id: mypy exclude: docs/ diff --git a/unhacs/main.py b/unhacs/main.py index ce1df5d..f9664ad 100644 --- a/unhacs/main.py +++ b/unhacs/main.py @@ -5,6 +5,7 @@ from pathlib import Path from unhacs.packages import DEFAULT_HASS_CONFIG_PATH from unhacs.packages import DEFAULT_PACKAGE_FILE from unhacs.packages import Package +from unhacs.packages import PackageType from unhacs.packages import get_installed_packages from unhacs.packages import read_lock_packages from unhacs.packages import write_lock_packages @@ -38,10 +39,13 @@ def create_parser(): add_parser.add_argument( "--file", "-f", type=Path, help="The path to a package file." ) - add_parser.add_argument("url", nargs="?", type=str, help="The URL of the package.") add_parser.add_argument( - "name", type=str, nargs="?", help="The name of the package." + "--type", + "-t", + type=PackageType, + help="The type of the package. Defaults to 'integration'.", ) + add_parser.add_argument("url", nargs="?", type=str, help="The URL of the package.") add_parser.add_argument( "--version", "-v", type=str, help="The version of the package." ) @@ -83,12 +87,12 @@ class Unhacs: def add_package( self, package_url: str, - package_name: str | None = None, version: str | None = None, update: bool = False, + package_type: PackageType = PackageType.INTEGRATION, ): """Install and add a package to the lock or install a specific version.""" - package = Package(name=package_name, url=package_url, version=version) + package = Package(url=package_url, version=version, package_type=package_type) packages = self.read_lock_packages() # Raise an error if the package is already in the list @@ -116,7 +120,7 @@ class Unhacs: ] upgrade_packages: list[Package] = [] - latest_packages = [Package(name=p.name, url=p.url) for p in installed_packages] + latest_packages = [Package(url=p.url) for p in installed_packages] for installed_package, latest_package in zip( installed_packages, latest_packages ): @@ -179,10 +183,15 @@ def main(): packages = read_lock_packages(args.file) for package in packages: unhacs.add_package( - package.url, package.name, package.version, update=True + package.url, + package.version, + update=True, + package_type=package.package_type, ) elif args.url: - unhacs.add_package(args.url, args.name, args.version, args.update) + unhacs.add_package( + args.url, args.version, args.update, package_type=args.type + ) else: raise ValueError("Either a file or a URL must be provided") elif args.subcommand == "list": diff --git a/unhacs/packages.py b/unhacs/packages.py index 4dd544b..5b90cd9 100644 --- a/unhacs/packages.py +++ b/unhacs/packages.py @@ -2,10 +2,11 @@ import json import shutil import tempfile from collections.abc import Iterable +from enum import StrEnum +from enum import auto from io import BytesIO from pathlib import Path from typing import cast -from urllib.parse import urlparse from zipfile import ZipFile import requests @@ -27,31 +28,38 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path): dest.write(source.read()) +class PackageType(StrEnum): + INTEGRATION = auto() + PLUGIN = auto() + + class Package: url: str owner: str - repo: str + name: str version: str download_url: str - name: str path: Path | None = None + package_type: PackageType = PackageType.INTEGRATION - def __init__(self, url: str, version: str | None = None, name: str | None = None): + def __init__( + self, + url: str, + version: str | None = None, + package_type: PackageType = PackageType.INTEGRATION, + ): self.url = url + self.package_type = package_type parts = self.url.split("/") self.owner = parts[-2] - self.repo = parts[-1] + self.name = parts[-1] if not version: self.version, self.download_url = self.fetch_version_release(version) else: self.version = version - parts = url.split("/") - repo = parts[-1] - self.name = name or repo - def __str__(self): return f"{self.name} {self.version}" @@ -66,17 +74,23 @@ class Package: return f"{self.name} {self.version} ({self.url})" def serialize(self) -> str: - return f"{self.url} {self.version} {self.name}" + return f"{self.url} {self.version} {self.package_type}" @staticmethod def deserialize(serialized: str) -> "Package": - url, version, name = serialized.split() - return Package(url, version, name) + url, version, package_type = serialized.split() + + # TODO: Use a less ambiguous serialization format that's still easy to read. Maybe TOML? + try: + package_type = PackageType(package_type) + except ValueError: + package_type = PackageType.INTEGRATION + return Package(url, version, package_type=package_type) def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: # Fetch the releases from the GitHub API response = requests.get( - f"https://api.github.com/repos/{self.owner}/{self.repo}/releases" + f"https://api.github.com/repos/{self.owner}/{self.name}/releases" ) response.raise_for_status() releases = response.json() @@ -99,14 +113,22 @@ class Package: version = cast(str, desired_release["tag_name"]) hacs_json = self.get_hacs_json(version) + # Based on type, if we have no hacs json, we can provide some possible paths for the download but won't know + # If a plugin: + # First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed + # Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed + # Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed + # If an integration: + # We always use the zipball_url + download_url = None - if hacs_json.get("content_in_root", True): - download_url = cast(str, desired_release["zipball_url"]) - elif filename := hacs_json.get("filename"): + if filename := hacs_json.get("filename"): for asset in desired_release["assets"]: if asset["name"] == filename: download_url = cast(str, asset["browser_download_url"]) break + else: + download_url = cast(str, desired_release["zipball_url"]) if not download_url: raise ValueError("No filename found in hacs.json") @@ -116,43 +138,96 @@ class Package: def get_hacs_json(self, version: str | None = None) -> dict: version = version or self.version response = requests.get( - f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{version}/hacs.json" + f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json" ) + + if response.status_code == 404: + return {} + response.raise_for_status() return response.json() - def install(self, hass_config_path: Path, replace: bool = True): - # Fetch the download for the specified version - if not self.download_url: - _, self.download_url = self.fetch_version_release(self.version) + def install_plugin(self, hass_config_path: Path): + # First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed + # Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed + # Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed + # If none of these are found, raise an error + # If a file is found, write it to www/js/.js and write a file www/js/-unhacs.txt with the + # serialized package - response = requests.get(self.download_url) + filename = f"{self.name.removeprefix('lovelace-')}.js" + print(filename) + + hacs_json = self.get_hacs_json() + if hacs_json.get("filename"): + filename = hacs_json["filename"] + plugin = requests.get( + f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}" + ) + else: + # Get dist file path URL + plugin = requests.get( + f"https://raw.githubusercontent.com/{self.owner}/{self.version}/dist/{filename}" + ) + if plugin.status_code == 404: + plugin = requests.get( + f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}" + ) + plugin.raise_for_status() + if plugin.status_code == 404: + plugin = requests.get( + f"https://raw.githubusercontent.com/{self.owner}/{self.version}/{filename}" + ) + + plugin.raise_for_status() + + js_path = hass_config_path / "www" / "js" + js_path.mkdir(parents=True, exist_ok=True) + js_path.joinpath(filename).write_text(plugin.text) + + js_path.joinpath(f"{filename}-unhacs.txt").write_text(self.serialize()) + + def install_integration(self, hass_config_path: Path): + zipball_url = f"https://codeload.github.com/{self.owner}/{self.name}/zip/refs/tags/{self.version}" + response = requests.get(zipball_url) response.raise_for_status() - if "/zipball/" in self.download_url: - # Extract the zip to a temporary directory - with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: - tmpdir = Path(tempdir) - extract_zip(ZipFile(BytesIO(response.content)), tmpdir) + with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: + tmpdir = Path(tempdir) + extract_zip(ZipFile(BytesIO(response.content)), tmpdir) - for custom_component in tmpdir.glob("custom_components/*"): - dest = ( - hass_config_path / "custom_components" / custom_component.name - ) - dest.mkdir(parents=True, exist_ok=True) - if replace: - shutil.rmtree(dest, ignore_errors=True) + # If an integration, check for a custom_component directory and install contents + # If not present, check the hacs.json file for content_in_root to true, if so install + # the root to custom_components/ + hacs_json = json.loads((tmpdir / "hacs.json").read_text()) - shutil.move(custom_component, dest) - dest.joinpath("unhacs.txt").write_text(self.serialize()) - elif self.download_url.endswith(".js"): - basename = urlparse(self.download_url).path.split("/")[-1] - js_path = hass_config_path / "www" / "js" - js_path.mkdir(parents=True, exist_ok=True) - js_path.joinpath(basename).write_text(response.text) - js_path.joinpath(f"{basename}-unhacs.txt").write_text(self.serialize()) + source, dest = None, None + for custom_component in tmpdir.glob("custom_components/*"): + source = custom_component + dest = hass_config_path / "custom_components" / custom_component.name + break + else: + if hacs_json.get("content_in_root"): + source = tmpdir + dest = hass_config_path / "custom_components" / self.name + + if not source or not dest: + raise ValueError("No custom_components directory found") + + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.rmtree(dest, ignore_errors=True) + shutil.move(source, dest) + + dest.joinpath("unhacs.txt").write_text(self.serialize()) + + def install(self, hass_config_path: Path): + print(self.package_type) + if self.package_type == PackageType.PLUGIN: + self.install_plugin(hass_config_path) + elif self.package_type == PackageType.INTEGRATION: + self.install_integration(hass_config_path) else: - raise ValueError(f"Unknown download type: {self.download_url}") + raise NotImplementedError(f"Unknown package type {self.package_type}") def uninstall(self, hass_config_path: Path) -> bool: if self.path: