diff --git a/unhacs/packages.py b/unhacs/packages.py index 3c5ba99..4dd544b 100644 --- a/unhacs/packages.py +++ b/unhacs/packages.py @@ -4,6 +4,8 @@ import tempfile from collections.abc import Iterable from io import BytesIO from pathlib import Path +from typing import cast +from urllib.parse import urlparse from zipfile import ZipFile import requests @@ -27,16 +29,22 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path): class Package: url: str + owner: str + repo: str version: str - zip_url: str + download_url: str name: str path: Path | None = None def __init__(self, url: str, version: str | None = None, name: str | None = None): self.url = url + parts = self.url.split("/") + self.owner = parts[-2] + self.repo = parts[-1] + if not version: - self.version, self.zip_url = self.fetch_version_release(version) + self.version, self.download_url = self.fetch_version_release(version) else: self.version = version @@ -67,59 +75,96 @@ class Package: def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: # Fetch the releases from the GitHub API - parts = self.url.split("/") - owner = parts[-2] - repo = parts[-1] - - response = requests.get(f"https://api.github.com/repos/{owner}/{repo}/releases") + response = requests.get( + f"https://api.github.com/repos/{self.owner}/{self.repo}/releases" + ) response.raise_for_status() releases = response.json() if not releases: raise ValueError(f"No releases found for package {self.name}") + # Default to latest + desired_release = releases[0] + # If a version is provided, check if it exists in the releases if version: for release in releases: if release["tag_name"] == version: - return version, release["zipball_url"] + desired_release = release + break else: raise ValueError(f"Version {version} does not exist for this package") - # If no version is provided, use the latest release - return releases[0]["tag_name"], releases[0]["zipball_url"] + + version = cast(str, desired_release["tag_name"]) + hacs_json = self.get_hacs_json(version) + + download_url = None + if hacs_json.get("content_in_root", True): + download_url = cast(str, desired_release["zipball_url"]) + elif filename := hacs_json.get("filename"): + for asset in desired_release["assets"]: + if asset["name"] == filename: + download_url = cast(str, asset["browser_download_url"]) + break + + if not download_url: + raise ValueError("No filename found in hacs.json") + + return version, download_url + + def get_hacs_json(self, version: str | None = None) -> dict: + version = version or self.version + response = requests.get( + f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{version}/hacs.json" + ) + response.raise_for_status() + return response.json() def install(self, hass_config_path: Path, replace: bool = True): - # Fetch the release zip with the specified version - if not self.zip_url: - _, self.zip_url = self.fetch_version_release(self.version) + # Fetch the download for the specified version + if not self.download_url: + _, self.download_url = self.fetch_version_release(self.version) - response = requests.get(self.zip_url) + response = requests.get(self.download_url) response.raise_for_status() - # Extract the zip to a temporary directory - with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: - tmpdir = Path(tempdir) - extract_zip(ZipFile(BytesIO(response.content)), tmpdir) + if "/zipball/" in self.download_url: + # Extract the zip to a temporary directory + with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: + tmpdir = Path(tempdir) + extract_zip(ZipFile(BytesIO(response.content)), tmpdir) - hacs = json.loads((tmpdir / "hacs.json").read_text()) - print("Hacs?", hacs) + for custom_component in tmpdir.glob("custom_components/*"): + dest = ( + hass_config_path / "custom_components" / custom_component.name + ) + dest.mkdir(parents=True, exist_ok=True) + if replace: + shutil.rmtree(dest, ignore_errors=True) - for custom_component in tmpdir.glob("custom_components/*"): - dest = hass_config_path / "custom_components" / custom_component.name - if replace: - shutil.rmtree(dest, ignore_errors=True) - - shutil.move(custom_component, dest) - dest.joinpath("unhacs.txt").write_text(self.serialize()) + shutil.move(custom_component, dest) + dest.joinpath("unhacs.txt").write_text(self.serialize()) + elif self.download_url.endswith(".js"): + basename = urlparse(self.download_url).path.split("/")[-1] + js_path = hass_config_path / "www" / "js" + js_path.mkdir(parents=True, exist_ok=True) + js_path.joinpath(basename).write_text(response.text) + js_path.joinpath(f"{basename}-unhacs.txt").write_text(self.serialize()) + else: + raise ValueError(f"Unknown download type: {self.download_url}") def uninstall(self, hass_config_path: Path) -> bool: if self.path: - shutil.rmtree(self.path) + if self.path.is_dir(): + shutil.rmtree(self.path) + else: + self.path.unlink() return True installed_package = self.installed_package(hass_config_path) - if installed_package and installed_package.path: - shutil.rmtree(installed_package.path) + if installed_package: + installed_package.uninstall(hass_config_path) return True return False @@ -135,6 +180,18 @@ class Package: and installed_package.url == self.url ): return installed_package + + for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.txt"): + installed_package = Package.deserialize(js_unhacs.read_text()) + installed_package.path = js_unhacs.with_name( + js_unhacs.name.removesuffix("-unhacs.txt") + ) + if ( + installed_package.name == self.name + and installed_package.url == self.url + ): + return installed_package + return None def is_update(self, hass_config_path: Path) -> bool: @@ -152,6 +209,10 @@ def get_installed_packages( package = Package.deserialize(unhacs.read_text()) package.path = custom_component packages.append(package) + for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.txt"): + package = Package.deserialize(js_unhacs.read_text()) + package.path = js_unhacs.with_name(js_unhacs.name.removesuffix("-unhacs.txt")) + packages.append(package) return packages