Support for installing hass fork components

This commit is contained in:
IamTheFij 2024-07-17 21:09:36 -07:00
parent dddb8484e2
commit 3f2aeadbc5
3 changed files with 173 additions and 40 deletions

View File

@ -67,5 +67,26 @@ def get_repo_tags(repository_url: str) -> list[str]:
return [tag.name for tag in tags] return [tag.name for tag in tags]
def get_ref_zip(repository_url: str, tag_name: str) -> str: def get_latest_sha(repository_url: str, branch_name: str) -> str:
command = f"git ls-remote {repository_url} {branch_name}"
result = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Check for errors
if result.returncode != 0:
raise Exception(f"Error running command: {command}\n{result.stderr.decode()}")
for line in result.stdout.decode().split("\n"):
if line:
return line.partition("\t")[0]
raise ValueError(f"branch name '{branch_name}' not found for {repository_url}")
def get_tag_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip" return f"{repository_url}/archive/refs/tags/{tag_name}.zip"
def get_branch_zip(repository_url: str, branch_name: str) -> str:
return f"{repository_url}/archive/{branch_name}.zip"

View File

@ -12,7 +12,7 @@ from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages from unhacs.packages import write_lock_packages
def create_parser(): def parse_args():
parser = ArgumentParser( parser = ArgumentParser(
description="Unhacs - Command line interface for the Home Assistant Community Store" description="Unhacs - Command line interface for the Home Assistant Community Store"
) )
@ -68,14 +68,31 @@ def create_parser():
dest="type", dest="type",
const=PackageType.INTEGRATION, const=PackageType.INTEGRATION,
default=PackageType.INTEGRATION, default=PackageType.INTEGRATION,
help="The package is an integration.",
) )
package_type_group.add_argument( package_type_group.add_argument(
"--plugin", action="store_const", dest="type", const=PackageType.PLUGIN "--plugin",
action="store_const",
dest="type",
const=PackageType.PLUGIN,
help="The package is a JavaScript plugin.",
)
package_type_group.add_argument(
"--forked-component",
type=str,
dest="component",
help="Component name from a forked type.",
) )
add_parser.add_argument( add_parser.add_argument(
"--version", "-v", type=str, help="The version of the package." "--version", "-v", type=str, help="The version of the package."
) )
add_parser.add_argument(
"--branch",
"-b",
type=str,
help="For foked types only, branch that should be used.",
)
add_parser.add_argument( add_parser.add_argument(
"--update", "--update",
"-u", "-u",
@ -101,7 +118,20 @@ def create_parser():
) )
update_parser.add_argument("packages", nargs="*") update_parser.add_argument("packages", nargs="*")
return parser args = parser.parse_args()
if args.subcommand == "add":
# Component implies forked package
if args.component and args.type != PackageType.FORK:
args.type = PackageType.FORK
# Branch is only valid for forked packages
if args.type != PackageType.FORK and args.branch:
raise ValueError(
"Branch and component can only be used with forked packages"
)
return args
class Unhacs: class Unhacs:
@ -121,19 +151,10 @@ class Unhacs:
def add_package( def add_package(
self, self,
package_url: str, package: Package,
version: str | None = None,
update: bool = False, update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
): ):
"""Install and add a package to the lock or install a specific version.""" """Install and add a package to the lock or install a specific version."""
package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages() packages = self.read_lock_packages()
# Raise an error if the package is already in the list # Raise an error if the package is already in the list
@ -201,8 +222,20 @@ class Unhacs:
packages_to_remove = [ packages_to_remove = [
package package
for package in get_installed_packages() for package in get_installed_packages()
if (package.name in package_names or package.url in package_names) if (
package.name in package_names
or package.url in package_names
or package.fork_component in package_names
)
] ]
if packages_to_remove and input("Remove all packages? (y/N) ").lower() != "y":
return
if package_names and not packages_to_remove:
print("No packages found to remove")
return
remaining_packages = [ remaining_packages = [
package package
for package in self.read_lock_packages() for package in self.read_lock_packages()
@ -217,8 +250,7 @@ class Unhacs:
def main(): def main():
# If the sub command is add package, it should pass the parsed arguments to the add_package function and return # If the sub command is add package, it should pass the parsed arguments to the add_package function and return
parser = create_parser() args = parse_args()
args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file) unhacs = Unhacs(args.config, args.package_file)
Package.git_tags = args.git_tags Package.git_tags = args.git_tags
@ -229,23 +261,24 @@ def main():
packages = read_lock_packages(args.file) packages = read_lock_packages(args.file)
for package in packages: for package in packages:
unhacs.add_package( unhacs.add_package(
package.url, package,
package.version,
update=True, update=True,
package_type=package.package_type,
ignore_versions=package.ignored_versions,
) )
elif args.url: elif args.url:
unhacs.add_package( unhacs.add_package(
Package(
args.url, args.url,
version=args.version, version=args.version,
update=args.update,
package_type=args.type, package_type=args.type,
ignore_versions=( ignored_versions=(
{version for version in args.ignore_versions.split(",")} {version for version in args.ignore_versions.split(",")}
if args.ignore_versions if args.ignore_versions
else None else None
), ),
branch_name=args.branch,
fork_component=args.component,
),
update=args.update,
) )
else: else:
raise ValueError("Either a file or a URL must be provided") raise ValueError("Either a file or a URL must be provided")

View File

@ -7,14 +7,17 @@ from enum import StrEnum
from enum import auto from enum import auto
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from typing import Any
from typing import cast from typing import cast
from zipfile import ZipFile from zipfile import ZipFile
import requests import requests
import yaml import yaml
from unhacs.git import get_ref_zip from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_repo_tags from unhacs.git import get_repo_tags
from unhacs.git import get_tag_zip
DEFAULT_HASS_CONFIG_PATH: Path = Path(".") DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
@ -36,6 +39,7 @@ def extract_zip(zip_file: ZipFile, dest_dir: Path):
class PackageType(StrEnum): class PackageType(StrEnum):
INTEGRATION = auto() INTEGRATION = auto()
PLUGIN = auto() PLUGIN = auto()
FORK = auto()
class Package: class Package:
@ -47,10 +51,17 @@ class Package:
version: str | None = None, version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION, package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None, ignored_versions: set[str] | None = None,
branch_name: str | None = None,
fork_component: str | None = None,
): ):
if package_type == PackageType.FORK and not fork_component:
raise ValueError(f"Fork with no component specified {url}@{branch_name}")
self.url = url self.url = url
self.package_type = package_type self.package_type = package_type
self.fork_component = fork_component
self.ignored_versions = ignored_versions or set() self.ignored_versions = ignored_versions or set()
self.branch_name = branch_name
parts = self.url.split("/") parts = self.url.split("/")
self.owner = parts[-2] self.owner = parts[-2]
@ -64,31 +75,53 @@ class Package:
self.version = version self.version = version
def __str__(self): def __str__(self):
return f"{self.name} {self.version}" name = self.name
if self.fork_component:
name = f"{self.fork_component} ({name})"
version = self.version
if self.branch_name:
version = f"{version} ({self.branch_name})"
return f"{self.package_type}: {name} {version}"
def __eq__(self, other): def __eq__(self, other):
return self.url == other.url and self.version == other.version return all(
(
self.url == other.url,
self.version == other.version,
self.branch_name == other.branch_name,
self.fork_component == other.fork_component,
)
)
def verbose_str(self): def verbose_str(self):
return f"{self.name} {self.version} ({self.url})" return f"{str(self)} ({self.url})"
@staticmethod @staticmethod
def from_yaml(yaml: dict) -> "Package": def from_yaml(yml: dict) -> "Package":
# Convert package_type to enum # Convert package_type to enum
package_type = yaml.pop("package_type", None) package_type = yml.pop("package_type", None)
if package_type and isinstance(package_type, str): if package_type and isinstance(package_type, str):
package_type = PackageType(package_type) package_type = PackageType(package_type)
yaml["package_type"] = package_type yml["package_type"] = package_type
return Package(**yaml) return Package(**yml)
def to_yaml(self: "Package") -> dict: def to_yaml(self: "Package") -> dict:
return { data: dict[str, Any] = {
"url": self.url, "url": self.url,
"version": self.version, "version": self.version,
"package_type": str(self.package_type), "package_type": str(self.package_type),
} }
if self.branch_name:
data["branch_name"] = self.branch_name
if self.fork_component:
data["fork_component"] = self.fork_component
if self.ignored_versions:
data["ignored_versions"] = self.ignored_versions
return data
def add_ignored_version(self, version: str): def add_ignored_version(self, version: str):
self.ignored_versions.add(version) self.ignored_versions.add(version)
@ -130,8 +163,13 @@ class Package:
return version return version
def _fetch_latest_sha(self, branch_name: str) -> str:
return get_latest_sha(self.url, branch_name)
def fetch_version_release(self, version: str | None = None) -> str: def fetch_version_release(self, version: str | None = None) -> str:
if self.git_tags: if self.branch_name:
return self._fetch_latest_sha(self.branch_name)
elif self.git_tags:
return self._fetch_version_release_git(version) return self._fetch_version_release_git(version)
else: else:
return self._fetch_version_release_releases(version) return self._fetch_version_release_releases(version)
@ -216,7 +254,7 @@ class Package:
def install_integration(self, hass_config_path: Path): def install_integration(self, hass_config_path: Path):
"""Installs the integration package.""" """Installs the integration package."""
zipball_url = get_ref_zip(self.url, self.version) zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url) response = requests.get(zipball_url)
response.raise_for_status() response.raise_for_status()
@ -244,12 +282,53 @@ class Package:
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w")) yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install_fork_component(self, hass_config_path: Path):
"""Installs the integration from hass fork."""
if not self.fork_component:
raise ValueError(f"No fork component specified for {self.verbose_str()}")
if not self.branch_name:
raise ValueError(f"No branch name specified for {self.verbose_str()}")
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest = json.load(manifest_file.open())
manifest["version"] = "0.0.0"
json.dump(manifest, manifest_file.open("w"))
dest = hass_config_path / "custom_components" / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install(self, hass_config_path: Path): def install(self, hass_config_path: Path):
"""Installs the package.""" """Installs the package."""
if self.package_type == PackageType.PLUGIN: if self.package_type == PackageType.PLUGIN:
self.install_plugin(hass_config_path) self.install_plugin(hass_config_path)
elif self.package_type == PackageType.INTEGRATION: elif self.package_type == PackageType.INTEGRATION:
self.install_integration(hass_config_path) self.install_integration(hass_config_path)
elif self.package_type == PackageType.FORK:
self.install_fork_component(hass_config_path)
else: else:
raise NotImplementedError(f"Unknown package type {self.package_type}") raise NotImplementedError(f"Unknown package type {self.package_type}")