Use subclasses instead of one class for all package types

This commit is contained in:
IamTheFij 2024-09-18 09:00:32 -07:00
parent ee7ae5785c
commit 9d87f7748c
10 changed files with 677 additions and 523 deletions

2
.gitignore vendored
View File

@ -141,3 +141,5 @@ cython_debug/
tags tags
unhacs.txt unhacs.txt
poetry.lock poetry.lock
custom_components/
themes/

View File

@ -3,13 +3,17 @@ from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from unhacs.git import get_repo_tags from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package from unhacs.packages import Package
from unhacs.packages import PackageType from unhacs.packages import PackageType
from unhacs.packages import get_installed_packages from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages from unhacs.packages import write_lock_packages
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
def parse_args(): def parse_args():
@ -66,40 +70,40 @@ def parse_args():
"--integration", "--integration",
action="store_const", action="store_const",
dest="type", dest="type",
const=PackageType.INTEGRATION, const=Integration,
default=PackageType.INTEGRATION, default=Integration,
help="The package is an integration.", help="The package is an integration.",
) )
package_type_group.add_argument( package_type_group.add_argument(
"--plugin", "--plugin",
action="store_const", action="store_const",
dest="type", dest="type",
const=PackageType.PLUGIN, const=Plugin,
help="The package is a JavaScript plugin.", help="The package is a JavaScript plugin.",
) )
package_type_group.add_argument(
"--forked-component",
type=str,
dest="component",
help="Component name from a forked type.",
)
package_type_group.add_argument( package_type_group.add_argument(
"--theme", "--theme",
action="store_const", action="store_const",
dest="type", dest="type",
const=PackageType.THEME, const=Theme,
help="The package is a theme.", help="The package is a theme.",
) )
package_type_group.add_argument(
"--fork-component",
type=str,
help="Name of component from forked core repo.",
)
# Additional arguments for forked packages
add_parser.add_argument(
"--fork-branch",
"-b",
type=str,
help="Name of branch of forked core repo. (Only for forked components.)",
)
add_parser.add_argument( add_parser.add_argument(
"--version", "-v", type=str, help="The version of the package." "--version", "-v", type=str, help="The version of the package."
) )
add_parser.add_argument(
"--branch",
"-b",
type=str,
help="For forked types only, branch that should be used.",
)
add_parser.add_argument( add_parser.add_argument(
"--update", "--update",
"-u", "-u",
@ -129,11 +133,11 @@ def parse_args():
if args.subcommand == "add": if args.subcommand == "add":
# Component implies forked package # Component implies forked package
if args.component and args.type != PackageType.FORK: if args.fork_component and args.type != Fork:
args.type = PackageType.FORK args.type = Fork
# Branch is only valid for forked packages # Branch is only valid for forked packages
if args.type != PackageType.FORK and args.branch: if args.type != Fork and args.fork_branch:
raise ValueError( raise ValueError(
"Branch and component can only be used with forked packages" "Branch and component can only be used with forked packages"
) )
@ -231,7 +235,10 @@ class Unhacs:
if ( if (
package.name in package_names package.name in package_names
or package.url in package_names or package.url in package_names
or package.fork_component in package_names or (
hasattr(package, "fork_component")
and getattr(package, "fork_component") in package_names
)
) )
] ]
@ -261,6 +268,30 @@ class Unhacs:
self.write_lock_packages(remaining_packages) self.write_lock_packages(remaining_packages)
def args_to_package(args) -> Package:
ignore_versions = (
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
)
if args.type == Fork:
if not args.fork_branch:
raise ValueError("A branch must be provided for forked components")
if not args.fork_component:
raise ValueError("A component must be provided for forked components")
return Fork(
args.url,
branch_name=args.fork_branch,
fork_component=args.fork_component,
version=args.version,
ignored_versions=ignore_versions,
)
return args.type(args.url, version=args.version, ignored_versions=ignore_versions)
def main(): def main():
# If the sub command is add package, it should pass the parsed arguments to the add_package function and return # If the sub command is add package, it should pass the parsed arguments to the add_package function and return
args = parse_args() args = parse_args()
@ -278,19 +309,9 @@ def main():
update=True, update=True,
) )
elif args.url: elif args.url:
new_package = args_to_package(args)
unhacs.add_package( unhacs.add_package(
Package( new_package,
args.url,
version=args.version,
package_type=args.type,
ignored_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
branch_name=args.branch,
fork_component=args.component,
),
update=args.update, update=args.update,
) )
else: else:

View File

@ -1,489 +0,0 @@
import json
import shutil
import tempfile
from collections.abc import Generator
from collections.abc import Iterable
from enum import StrEnum
from enum import auto
from io import BytesIO
from pathlib import Path
from typing import Any
from typing import cast
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_repo_tags
from unhacs.git import get_tag_zip
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path):
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
FORK = auto()
THEME = auto()
def get_install_path(hass_config_path: Path, package_type: PackageType) -> Path:
if package_type == PackageType.PLUGIN:
return hass_config_path / "www" / "js"
elif package_type in (PackageType.INTEGRATION, PackageType.FORK):
return hass_config_path / "custom_components"
elif package_type == PackageType.THEME:
return hass_config_path / "themes"
else:
raise NotImplementedError(f"Unknown package type {package_type}")
class Package:
git_tags = False
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
branch_name: str | None = None,
fork_component: str | None = None,
):
if package_type == PackageType.FORK and not fork_component:
raise ValueError(f"Fork with no component specified {url}@{branch_name}")
self.url = url
self.package_type = package_type
self.fork_component = fork_component
self.ignored_versions = ignored_versions or set()
self.branch_name = branch_name
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.path: Path | None = None
if not version:
self.version = self.fetch_version_release()
else:
self.version = version
def __str__(self):
name = self.name
if self.fork_component:
name = f"{self.fork_component} ({name})"
version = self.version
if self.branch_name:
version = f"{version} ({self.branch_name})"
return f"{self.package_type}: {name} {version}"
def __eq__(self, other):
return all(
(
self.same(other),
self.fork_component == other.fork_component,
)
)
def same(self, other):
return all(
(
self.url == other.url,
self.branch_name == other.branch_name,
self.fork_component == other.fork_component,
)
)
def __hash__(self):
return hash((self.url, self.branch_name, self.fork_component))
def verbose_str(self):
return f"{str(self)} ({self.url})"
@staticmethod
def from_yaml(yml: dict) -> "Package":
# Convert package_type to enum
package_type = yml.pop("package_type", None)
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
yml["package_type"] = package_type
return Package(**yml)
def to_yaml(self: "Package") -> dict:
data: dict[str, Any] = {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
if self.branch_name:
data["branch_name"] = self.branch_name
if self.fork_component:
data["fork_component"] = self.fork_component
if self.ignored_versions:
data["ignored_versions"] = self.ignored_versions
return data
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(self, version: str | None = None) -> str:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
return cast(str, desired_release["tag_name"])
def _fetch_version_release_git(self, version: str | None = None) -> str:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version
def _fetch_latest_sha(self, branch_name: str) -> str:
return get_latest_sha(self.url, branch_name)
def fetch_version_release(self, version: str | None = None) -> str:
if self.branch_name:
return self._fetch_latest_sha(self.branch_name)
elif self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def get_install_path(self, hass_config_path: Path) -> Path:
return get_install_path(hass_config_path, self.package_type)
def install_plugin(self, hass_config_path: Path):
"""Installs the plugin package."""
valid_filenames: Iterable[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = (cast(str, filename),)
else:
valid_filenames = (
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
)
def real_get(filename) -> requests.Response | None:
urls = [
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}",
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}",
]
for url in urls:
plugin = requests.get(url)
if int(plugin.status_code / 100) == 4:
continue
plugin.raise_for_status()
return plugin
return None
for filename in valid_filenames:
plugin = real_get(filename)
if plugin:
break
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = self.get_install_path(hass_config_path)
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
yaml.dump(self.to_yaml(), js_path.joinpath(f"{filename}-unhacs.yaml").open("w"))
# Write to resources
resources: list[dict] = []
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
resources = yaml.safe_load(resources_file.open()) or []
if not any(r["url"] == f"/local/js/{filename}" for r in resources):
resources.append(
{
"url": f"/local/js/{filename}",
"type": "module",
}
)
yaml.dump(resources, resources_file.open("w"))
def install_theme_component(self, hass_config_path: Path):
filename = self.get_hacs_json().get("filename")
if not filename:
raise ValueError(f"No filename found for theme {self.name}")
filename = cast(str, filename)
url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}"
theme = requests.get(url)
theme.raise_for_status()
themes_path = self.get_install_path(hass_config_path)
themes_path.mkdir(parents=True, exist_ok=True)
themes_path.joinpath(filename).write_text(theme.text)
yaml.dump(self.to_yaml(), themes_path.joinpath(f"{filename}.unhacs").open("w"))
def install_integration(self, hass_config_path: Path):
"""Installs the integration package."""
zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = self.get_install_path(hass_config_path) / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = self.get_install_path(hass_config_path) / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install_fork_component(self, hass_config_path: Path):
"""Installs the integration from hass fork."""
if not self.fork_component:
raise ValueError(f"No fork component specified for {self.verbose_str()}")
if not self.branch_name:
raise ValueError(f"No branch name specified for {self.verbose_str()}")
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest = json.load(manifest_file.open())
manifest["version"] = "0.0.0"
json.dump(manifest, manifest_file.open("w"))
dest = self.get_install_path(hass_config_path) / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install(self, hass_config_path: Path):
"""Installs the package."""
if self.package_type == PackageType.PLUGIN:
self.install_plugin(hass_config_path)
elif self.package_type == PackageType.INTEGRATION:
self.install_integration(hass_config_path)
elif self.package_type == PackageType.FORK:
self.install_fork_component(hass_config_path)
elif self.package_type == PackageType.THEME:
self.install_theme_component(hass_config_path)
else:
raise NotImplementedError(f"Unknown package type {self.package_type}")
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
print("No path found for package, searching...")
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
print("Removing", self.path)
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()
# Remove from resources
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
with resources_file.open("r") as f:
resources = yaml.safe_load(f) or []
new_resources = [
r for r in resources if r["url"] != f"/local/js/{self.path.name}"
]
if len(new_resources) != len(resources):
with resources_file.open("w") as f:
yaml.dump(new_resources, f)
return True
def installed_package(self, hass_config_path: Path) -> "Package|None":
"""Returns the installed package if it exists, otherwise None."""
for package in get_installed_packages(hass_config_path, [self.package_type]):
if self.same(package):
return package
return None
def is_update(self, hass_config_path: Path) -> bool:
"""Returns True if the package is not installed or the installed version is different from the latest."""
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
return Package(**package)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.PLUGIN,
PackageType.THEME,
),
) -> list[Package]:
# Integration packages
packages: list[Package] = []
if PackageType.INTEGRATION in package_types:
for custom_component in get_install_path(
hass_config_path, PackageType.INTEGRATION
).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
package = Package.from_yaml(yaml.safe_load(unhacs.open()))
package.path = custom_component
packages.append(package)
# Plugin packages
if PackageType.PLUGIN in package_types:
for js_unhacs in get_install_path(hass_config_path, PackageType.PLUGIN).glob(
"*-unhacs.yaml"
):
package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
packages.append(package)
# Theme packages
if PackageType.THEME in package_types:
for js_unhacs in get_install_path(hass_config_path, PackageType.THEME).glob(
"*.unhacs"
):
package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs"))
packages.append(package)
return packages
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
return [
Package.from_yaml(p)
for p in yaml.safe_load(package_file.open())["packages"]
]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))

View File

@ -0,0 +1,76 @@
from collections.abc import Iterable
from pathlib import Path
from typing import cast
import yaml
from unhacs.packages.common import Package
from unhacs.packages.common import PackageType
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
def from_yaml(data: dict | Path | str) -> Package:
if isinstance(data, Path):
data = yaml.safe_load(data.open())
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
# Convert package_type to enum
package_type = data.pop("package_type", None)
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
url = data.pop("url")
return {
PackageType.INTEGRATION: Integration,
PackageType.PLUGIN: Plugin,
PackageType.THEME: Theme,
PackageType.FORK: Fork,
}[package_type](url, **data)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.PLUGIN,
PackageType.THEME,
),
) -> list[Package]:
# Integration packages
packages: list[Package] = []
if PackageType.INTEGRATION in package_types:
packages.extend(Integration.find_installed(hass_config_path))
# Plugin packages
if PackageType.PLUGIN in package_types:
packages.extend(Plugin.find_installed(hass_config_path))
# Theme packages
if PackageType.THEME in package_types:
packages.extend(Theme.find_installed(hass_config_path))
return packages
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
return [from_yaml(p) for p in yaml.safe_load(package_file.open())["packages"]]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))

228
unhacs/packages/common.py Normal file
View File

@ -0,0 +1,228 @@
import shutil
from enum import StrEnum
from enum import auto
from pathlib import Path
from typing import Any
from typing import cast
import requests
import yaml
from unhacs.git import get_repo_tags
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
FORK = auto()
THEME = auto()
class Package:
git_tags = False
package_type: PackageType
other_fields: list[str] = []
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.url = url
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.path: Path | None = None
if not version:
self.version = self.fetch_version_release()
else:
self.version = version
def __str__(self):
return f"{self.package_type}: {self.name} {self.version}"
def __eq__(self, other):
return all(
(
self.same(other),
# TODO: Should this match versions?
)
)
def same(self, other):
fields = list(["url"] + self.other_fields)
return all((getattr(self, field) == getattr(other, field) for field in fields))
def __hash__(self):
fields = list(["url"] + self.other_fields)
return hash(tuple(getattr(self, field) for field in fields))
def verbose_str(self):
return f"{str(self)} ({self.url})"
@classmethod
def from_yaml(cls, data: dict | Path | str) -> "Package":
if isinstance(data, Path):
data = yaml.safe_load(data.open())
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
if data["package_type"] != cls.package_type:
raise ValueError("Invalid package_type")
return cls(data.pop("url"), **data)
def to_yaml(self, dest: Path | None = None) -> dict:
data: dict[str, Any] = {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
if self.ignored_versions:
data["ignored_versions"] = self.ignored_versions
for field in self.other_fields:
if hasattr(self, field):
data[field] = getattr(self, field)
if dest:
yaml.dump(self.to_yaml(), dest.open("w"))
return data
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(self, version: str | None = None) -> str:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
return cast(str, desired_release["tag_name"])
def _fetch_version_release_git(self, version: str | None = None) -> str:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version
def fetch_version_release(self, version: str | None = None) -> str:
if self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def _fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install(self, hass_config_path: Path):
raise NotImplementedError()
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
print("No path found for package, searching...")
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
print("Removing", self.path)
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()
# Remove from resources
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
with resources_file.open("r") as f:
resources = yaml.safe_load(f) or []
new_resources = [
r for r in resources if r["url"] != f"/local/js/{self.path.name}"
]
if len(new_resources) != len(resources):
with resources_file.open("w") as f:
yaml.dump(new_resources, f)
return True
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
raise NotImplementedError()
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
raise NotImplementedError()
def installed_package(self, hass_config_path: Path) -> "Package|None":
"""Returns the installed package if it exists, otherwise None."""
for package in self.find_installed(hass_config_path):
if self.same(package):
return package
return None
def is_update(self, hass_config_path: Path) -> bool:
"""Returns True if the package is not installed or the installed version is different from the latest."""
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
return Package(**package)

76
unhacs/packages/fork.py Normal file
View File

@ -0,0 +1,76 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.packages import PackageType
from unhacs.packages.integration import Integration
from unhacs.utils import extract_zip
class Fork(Integration):
other_fields = ["fork_component", "branch_name"]
package_type = PackageType.FORK
def __init__(
self,
url: str,
fork_component: str,
branch_name: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.fork_component = fork_component
self.branch_name = branch_name
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
def __str__(self):
return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}"
def fetch_version_release(self, version: str | None = None) -> str:
return get_latest_sha(self.url, self.branch_name)
def install(self, hass_config_path: Path) -> None:
"""Installs the integration from hass fork."""
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest = json.load(manifest_file.open())
manifest["version"] = "0.0.0"
json.dump(manifest, manifest_file.open("w"))
dest = self.get_install_dir(hass_config_path) / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.to_yaml(dest.joinpath("unhacs.yaml"))

View File

@ -0,0 +1,76 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
from unhacs.git import get_tag_zip
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.utils import extract_zip
class Integration(Package):
package_type = PackageType.INTEGRATION
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "custom_components"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
package = cls.from_yaml(unhacs)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration package."""
zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = self.get_install_dir(hass_config_path) / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = self.get_install_dir(hass_config_path) / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.to_yaml(dest.joinpath("unhacs.yaml"))

86
unhacs/packages/plugin.py Normal file
View File

@ -0,0 +1,86 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Plugin(Package):
package_type = PackageType.PLUGIN
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "www" / "js"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the plugin package."""
valid_filenames: list[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = [cast(str, filename)]
else:
valid_filenames = [
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
]
def real_get(filename) -> requests.Response | None:
urls = [
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}",
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}",
]
for url in urls:
plugin = requests.get(url)
if int(plugin.status_code / 100) == 4:
continue
plugin.raise_for_status()
return plugin
return None
for filename in valid_filenames:
plugin = real_get(filename)
if plugin:
break
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = self.get_install_dir(hass_config_path)
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
self.to_yaml(js_path.joinpath(f"{filename}-unhacs.yaml"))

57
unhacs/packages/theme.py Normal file
View File

@ -0,0 +1,57 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Theme(Package):
package_type = PackageType.THEME
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "themes"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Install theme yaml."""
filename = self.get_hacs_json().get("filename")
if not filename:
raise ValueError(f"No filename found for theme {self.name}")
filename = cast(str, filename)
url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}"
theme = requests.get(url)
theme.raise_for_status()
themes_path = self.get_install_dir(hass_config_path)
themes_path.mkdir(parents=True, exist_ok=True)
themes_path.joinpath(filename).write_text(theme.text)
self.to_yaml(themes_path.joinpath(f"{filename}.unhacs"))

21
unhacs/utils.py Normal file
View File

@ -0,0 +1,21 @@
from pathlib import Path
from zipfile import ZipFile
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path) -> Path:
"""Extract a zip file to a directory."""
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
return dest_dir