Support non-integration packages

This commit is contained in:
IamTheFij 2024-07-06 14:21:30 -07:00
parent a424e22236
commit 31286add39
3 changed files with 136 additions and 51 deletions

View File

@ -18,8 +18,9 @@ repos:
rev: 5.13.2
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.10.0
rev: v1.10.1
hooks:
- id: mypy
exclude: docs/

View File

@ -5,6 +5,7 @@ from pathlib import Path
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages
@ -38,10 +39,13 @@ def create_parser():
add_parser.add_argument(
"--file", "-f", type=Path, help="The path to a package file."
)
add_parser.add_argument("url", nargs="?", type=str, help="The URL of the package.")
add_parser.add_argument(
"name", type=str, nargs="?", help="The name of the package."
"--type",
"-t",
type=PackageType,
help="The type of the package. Defaults to 'integration'.",
)
add_parser.add_argument("url", nargs="?", type=str, help="The URL of the package.")
add_parser.add_argument(
"--version", "-v", type=str, help="The version of the package."
)
@ -83,12 +87,12 @@ class Unhacs:
def add_package(
self,
package_url: str,
package_name: str | None = None,
version: str | None = None,
update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
):
"""Install and add a package to the lock or install a specific version."""
package = Package(name=package_name, url=package_url, version=version)
package = Package(url=package_url, version=version, package_type=package_type)
packages = self.read_lock_packages()
# Raise an error if the package is already in the list
@ -116,7 +120,7 @@ class Unhacs:
]
upgrade_packages: list[Package] = []
latest_packages = [Package(name=p.name, url=p.url) for p in installed_packages]
latest_packages = [Package(url=p.url) for p in installed_packages]
for installed_package, latest_package in zip(
installed_packages, latest_packages
):
@ -179,10 +183,15 @@ def main():
packages = read_lock_packages(args.file)
for package in packages:
unhacs.add_package(
package.url, package.name, package.version, update=True
package.url,
package.version,
update=True,
package_type=package.package_type,
)
elif args.url:
unhacs.add_package(args.url, args.name, args.version, args.update)
unhacs.add_package(
args.url, args.version, args.update, package_type=args.type
)
else:
raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list":

View File

@ -2,10 +2,11 @@ import json
import shutil
import tempfile
from collections.abc import Iterable
from enum import StrEnum
from enum import auto
from io import BytesIO
from pathlib import Path
from typing import cast
from urllib.parse import urlparse
from zipfile import ZipFile
import requests
@ -27,31 +28,38 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path):
dest.write(source.read())
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
class Package:
url: str
owner: str
repo: str
name: str
version: str
download_url: str
name: str
path: Path | None = None
package_type: PackageType = PackageType.INTEGRATION
def __init__(self, url: str, version: str | None = None, name: str | None = None):
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
):
self.url = url
self.package_type = package_type
parts = self.url.split("/")
self.owner = parts[-2]
self.repo = parts[-1]
self.name = parts[-1]
if not version:
self.version, self.download_url = self.fetch_version_release(version)
else:
self.version = version
parts = url.split("/")
repo = parts[-1]
self.name = name or repo
def __str__(self):
return f"{self.name} {self.version}"
@ -66,17 +74,23 @@ class Package:
return f"{self.name} {self.version} ({self.url})"
def serialize(self) -> str:
return f"{self.url} {self.version} {self.name}"
return f"{self.url} {self.version} {self.package_type}"
@staticmethod
def deserialize(serialized: str) -> "Package":
url, version, name = serialized.split()
return Package(url, version, name)
url, version, package_type = serialized.split()
# TODO: Use a less ambiguous serialization format that's still easy to read. Maybe TOML?
try:
package_type = PackageType(package_type)
except ValueError:
package_type = PackageType.INTEGRATION
return Package(url, version, package_type=package_type)
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.repo}/releases"
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
@ -99,14 +113,22 @@ class Package:
version = cast(str, desired_release["tag_name"])
hacs_json = self.get_hacs_json(version)
# Based on type, if we have no hacs json, we can provide some possible paths for the download but won't know
# If a plugin:
# First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed
# Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed
# Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed
# If an integration:
# We always use the zipball_url
download_url = None
if hacs_json.get("content_in_root", True):
download_url = cast(str, desired_release["zipball_url"])
elif filename := hacs_json.get("filename"):
if filename := hacs_json.get("filename"):
for asset in desired_release["assets"]:
if asset["name"] == filename:
download_url = cast(str, asset["browser_download_url"])
break
else:
download_url = cast(str, desired_release["zipball_url"])
if not download_url:
raise ValueError("No filename found in hacs.json")
@ -116,43 +138,96 @@ class Package:
def get_hacs_json(self, version: str | None = None) -> dict:
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.repo}/{version}/hacs.json"
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install(self, hass_config_path: Path, replace: bool = True):
# Fetch the download for the specified version
if not self.download_url:
_, self.download_url = self.fetch_version_release(self.version)
def install_plugin(self, hass_config_path: Path):
# First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed
# Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed
# Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed
# If none of these are found, raise an error
# If a file is found, write it to www/js/<filename>.js and write a file www/js/<filename>-unhacs.txt with the
# serialized package
response = requests.get(self.download_url)
filename = f"{self.name.removeprefix('lovelace-')}.js"
print(filename)
hacs_json = self.get_hacs_json()
if hacs_json.get("filename"):
filename = hacs_json["filename"]
plugin = requests.get(
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}"
)
else:
# Get dist file path URL
plugin = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/dist/{filename}"
)
if plugin.status_code == 404:
plugin = requests.get(
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}"
)
plugin.raise_for_status()
if plugin.status_code == 404:
plugin = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/{filename}"
)
plugin.raise_for_status()
js_path = hass_config_path / "www" / "js"
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
js_path.joinpath(f"{filename}-unhacs.txt").write_text(self.serialize())
def install_integration(self, hass_config_path: Path):
zipball_url = f"https://codeload.github.com/{self.owner}/{self.name}/zip/refs/tags/{self.version}"
response = requests.get(zipball_url)
response.raise_for_status()
if "/zipball/" in self.download_url:
# Extract the zip to a temporary directory
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
for custom_component in tmpdir.glob("custom_components/*"):
dest = (
hass_config_path / "custom_components" / custom_component.name
)
dest.mkdir(parents=True, exist_ok=True)
if replace:
shutil.rmtree(dest, ignore_errors=True)
# If an integration, check for a custom_component directory and install contents
# If not present, check the hacs.json file for content_in_root to true, if so install
# the root to custom_components/<package_name>
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
shutil.move(custom_component, dest)
dest.joinpath("unhacs.txt").write_text(self.serialize())
elif self.download_url.endswith(".js"):
basename = urlparse(self.download_url).path.split("/")[-1]
js_path = hass_config_path / "www" / "js"
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(basename).write_text(response.text)
js_path.joinpath(f"{basename}-unhacs.txt").write_text(self.serialize())
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = hass_config_path / "custom_components" / custom_component.name
break
else:
raise ValueError(f"Unknown download type: {self.download_url}")
if hacs_json.get("content_in_root"):
source = tmpdir
dest = hass_config_path / "custom_components" / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
dest.joinpath("unhacs.txt").write_text(self.serialize())
def install(self, hass_config_path: Path):
print(self.package_type)
if self.package_type == PackageType.PLUGIN:
self.install_plugin(hass_config_path)
elif self.package_type == PackageType.INTEGRATION:
self.install_integration(hass_config_path)
else:
raise NotImplementedError(f"Unknown package type {self.package_type}")
def uninstall(self, hass_config_path: Path) -> bool:
if self.path: