Add support for js installs

This commit is contained in:
IamTheFij 2024-07-05 15:25:29 -07:00
parent a77889eadb
commit 5a72d690e6

View File

@ -4,6 +4,8 @@ import tempfile
from collections.abc import Iterable from collections.abc import Iterable
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import cast
from urllib.parse import urlparse
from zipfile import ZipFile from zipfile import ZipFile
import requests import requests
@ -27,16 +29,22 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path):
class Package: class Package:
url: str url: str
owner: str
repo: str
version: str version: str
zip_url: str download_url: str
name: str name: str
path: Path | None = None path: Path | None = None
def __init__(self, url: str, version: str | None = None, name: str | None = None): def __init__(self, url: str, version: str | None = None, name: str | None = None):
self.url = url self.url = url
parts = self.url.split("/")
self.owner = parts[-2]
self.repo = parts[-1]
if not version: if not version:
self.version, self.zip_url = self.fetch_version_release(version) self.version, self.download_url = self.fetch_version_release(version)
else: else:
self.version = version self.version = version
@ -67,59 +75,96 @@ class Package:
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
# Fetch the releases from the GitHub API # Fetch the releases from the GitHub API
parts = self.url.split("/") response = requests.get(
owner = parts[-2] f"https://api.github.com/repos/{self.owner}/{self.repo}/releases"
repo = parts[-1] )
response = requests.get(f"https://api.github.com/repos/{owner}/{repo}/releases")
response.raise_for_status() response.raise_for_status()
releases = response.json() releases = response.json()
if not releases: if not releases:
raise ValueError(f"No releases found for package {self.name}") raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases # If a version is provided, check if it exists in the releases
if version: if version:
for release in releases: for release in releases:
if release["tag_name"] == version: if release["tag_name"] == version:
return version, release["zipball_url"] desired_release = release
break
else: else:
raise ValueError(f"Version {version} does not exist for this package") raise ValueError(f"Version {version} does not exist for this package")
# If no version is provided, use the latest release
return releases[0]["tag_name"], releases[0]["zipball_url"] version = cast(str, desired_release["tag_name"])
hacs_json = self.get_hacs_json(version)
download_url = None
if hacs_json.get("content_in_root", True):
download_url = cast(str, desired_release["zipball_url"])
elif filename := hacs_json.get("filename"):
for asset in desired_release["assets"]:
if asset["name"] == filename:
download_url = cast(str, asset["browser_download_url"])
break
if not download_url:
raise ValueError("No filename found in hacs.json")
return version, download_url
def get_hacs_json(self, version: str | None = None) -> dict:
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{version}/hacs.json"
)
response.raise_for_status()
return response.json()
def install(self, hass_config_path: Path, replace: bool = True): def install(self, hass_config_path: Path, replace: bool = True):
# Fetch the release zip with the specified version # Fetch the download for the specified version
if not self.zip_url: if not self.download_url:
_, self.zip_url = self.fetch_version_release(self.version) _, self.download_url = self.fetch_version_release(self.version)
response = requests.get(self.zip_url) response = requests.get(self.download_url)
response.raise_for_status() response.raise_for_status()
if "/zipball/" in self.download_url:
# Extract the zip to a temporary directory # Extract the zip to a temporary directory
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir: with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir) tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir) extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
hacs = json.loads((tmpdir / "hacs.json").read_text())
print("Hacs?", hacs)
for custom_component in tmpdir.glob("custom_components/*"): for custom_component in tmpdir.glob("custom_components/*"):
dest = hass_config_path / "custom_components" / custom_component.name dest = (
hass_config_path / "custom_components" / custom_component.name
)
dest.mkdir(parents=True, exist_ok=True)
if replace: if replace:
shutil.rmtree(dest, ignore_errors=True) shutil.rmtree(dest, ignore_errors=True)
shutil.move(custom_component, dest) shutil.move(custom_component, dest)
dest.joinpath("unhacs.txt").write_text(self.serialize()) dest.joinpath("unhacs.txt").write_text(self.serialize())
elif self.download_url.endswith(".js"):
basename = urlparse(self.download_url).path.split("/")[-1]
js_path = hass_config_path / "www" / "js"
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(basename).write_text(response.text)
js_path.joinpath(f"{basename}-unhacs.txt").write_text(self.serialize())
else:
raise ValueError(f"Unknown download type: {self.download_url}")
def uninstall(self, hass_config_path: Path) -> bool: def uninstall(self, hass_config_path: Path) -> bool:
if self.path: if self.path:
if self.path.is_dir():
shutil.rmtree(self.path) shutil.rmtree(self.path)
else:
self.path.unlink()
return True return True
installed_package = self.installed_package(hass_config_path) installed_package = self.installed_package(hass_config_path)
if installed_package and installed_package.path: if installed_package:
shutil.rmtree(installed_package.path) installed_package.uninstall(hass_config_path)
return True return True
return False return False
@ -135,6 +180,18 @@ class Package:
and installed_package.url == self.url and installed_package.url == self.url
): ):
return installed_package return installed_package
for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.txt"):
installed_package = Package.deserialize(js_unhacs.read_text())
installed_package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.txt")
)
if (
installed_package.name == self.name
and installed_package.url == self.url
):
return installed_package
return None return None
def is_update(self, hass_config_path: Path) -> bool: def is_update(self, hass_config_path: Path) -> bool:
@ -152,6 +209,10 @@ def get_installed_packages(
package = Package.deserialize(unhacs.read_text()) package = Package.deserialize(unhacs.read_text())
package.path = custom_component package.path = custom_component
packages.append(package) packages.append(package)
for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.txt"):
package = Package.deserialize(js_unhacs.read_text())
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix("-unhacs.txt"))
packages.append(package)
return packages return packages