Compare commits

..

3 Commits

Author SHA1 Message Date
b019185380 Use branch zip for downloading forks
Some checks failed
continuous-integration/drone/push Build is failing
2024-07-21 09:42:46 -07:00
ea982ee5a9 Rename tag zip and add branch zip
Some checks failed
continuous-integration/drone/push Build is failing
2024-07-21 09:16:15 -07:00
273199a9a6 WIP: Support for installing hass fork components
All checks were successful
continuous-integration/drone/push Build is passing
2024-07-17 21:12:31 -07:00
17 changed files with 469 additions and 1137 deletions

3
.gitignore vendored
View File

@ -141,6 +141,3 @@ cython_debug/
tags
unhacs.txt
poetry.lock
custom_components/
themes/
unhacs.yaml

View File

@ -24,7 +24,7 @@ lint: devenv
# Runs tests
.PHONY: test
test: devenv
poetry run python -m unittest discover tests --pattern "*_test.py"
@echo TODO: poetry run pytest
# Builds wheel for package to upload
.PHONY: build
@ -37,27 +37,6 @@ verify-tag-version:
$(eval TAG_NAME = $(shell [ -n "$(DRONE_TAG)" ] && echo $(DRONE_TAG) || git describe --tags --exact-match))
test "v$(shell poetry version | awk '{print $$2}')" = "$(TAG_NAME)"
.PHONY: bump-patch
bump-patch:
$(eval NEW_VERSION = $(shell poetry version patch | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
.PHONY: bump-minor
bump-minor:
$(eval NEW_VERSION = $(shell poetry version minor | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
.PHONY: bump-major
bump-major:
$(eval NEW_VERSION = $(shell poetry version major | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
# Upload to pypi
.PHONY: upload
upload: verify-tag-version build

View File

@ -38,14 +38,6 @@ If you already have a list of packages in a file, you can add them all at once u
unhacs add --file <file_path>
```
### Add a component from a forked Home Assistant Core repository
To add a component from a fork of home-assistant/core, use the `--forked-component` flag followed by the URL of the forked repository and then specify the branch with the `--branch` flag:
```bash
unhacs add --forked-component <forked_repo_url> --branch <branch>
```
### List packages
To list all installed packages, use the `list` command:

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "unhacs"
version = "0.7.1"
version = "0.5.2"
description = "Command line interface to install Home Assistant Community Store packages"
authors = ["Ian Fijolek <ian@iamthefij.com>"]
license = "MIT"
@ -12,16 +12,14 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
requests = "^2.32.0"
pyyaml = "^6.0.0"
requests = "^2.32.3"
pyyaml = "^6.0.1"
[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
isort = "^5.13.2"
mypy = "^1.10.0"
pre-commit = "^3.7.1"
types-requests = "^2.32.0"
types-pyyaml = "^6.0.0"
types-requests = "^2.32.0.20240602"
[tool.poetry.scripts]
unhacs = 'unhacs.main:main'

View File

View File

@ -1,260 +0,0 @@
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from unhacs.main import main
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
INTEGRATION_URL = "https://github.com/simbaja/ha_gehome"
INTEGRATION_VERSION = "v0.6.9"
PLUGIN_URL = "https://github.com/kalkih/mini-media-player"
PLUGIN_VERSION = "v1.16.8"
THEME_URL = "https://github.com/basnijholt/lovelace-ios-themes"
THEME_VERSION = "v3.0.1"
FORK_URL = "https://github.com/ViViDboarder/home-assistant"
FORK_BRANCH = "dev"
FORK_COMPONENT = "nextbus"
FORK_VERSION = "3b2893f2f4e16f9a05d9cc4a7ba9f31984c841be"
class TestMainIntegrarion(unittest.TestCase):
test_dir: str
def setUp(self):
self.test_dir = tempfile.mkdtemp()
os.chdir(self.test_dir)
def tearDown(self):
shutil.rmtree(self.test_dir)
pass
def run_itest(
self,
test_name: str,
command: str,
expected_files: list[str] | None = None,
expect_missing_files: list[str] | None = None,
expected_code: int = 0,
):
with self.subTest(test_name, command=command):
self.assertEqual(main(command.split()), expected_code)
# Verify that the package was installed by checking the filesystem
if expected_files:
expected_files = [
os.path.join(self.test_dir, file) for file in expected_files
]
missing_files = [
file for file in expected_files if not os.path.exists(file)
]
if missing_files:
self.fail(f"Missing files: {missing_files}")
if expect_missing_files:
expect_missing_files = [
os.path.join(self.test_dir, file) for file in expect_missing_files
]
existing_files = [
file for file in expect_missing_files if os.path.exists(file)
]
if existing_files:
self.fail(f"Files should not exist: {existing_files}")
def test_integration(self):
self.run_itest(
"Add integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION}",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Double add",
f"add {INTEGRATION_URL}",
expected_code=1,
)
self.run_itest(
"Upgrade to latest version",
"upgrade ha_gehome --yes",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertNotEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Downgrade integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION} --update",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
# Delete the custom_components folder and re-install the integration using the lock file
shutil.rmtree(os.path.join(self.test_dir, "custom_components"))
self.run_itest(
"Re-install integration using lock file",
"add --file unhacs.yaml",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
# Delete the lock file and then regenerate it
os.remove(os.path.join(self.test_dir, "unhacs.yaml"))
self.run_itest(
"Regenerate lock file",
"list --freeze",
expected_files=[
"unhacs.yaml",
],
)
self.assertGreater(len(read_lock_packages()), 0)
self.run_itest(
"Remove integration",
"remove ha_gehome --yes",
expect_missing_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_plugin(self):
self.run_itest(
"Add plugin",
f"add --plugin {PLUGIN_URL} --version {PLUGIN_VERSION}",
expected_files=[
"www/js/mini-media-player-bundle.js",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, PLUGIN_URL)
self.assertEqual(installed[0].version, PLUGIN_VERSION)
self.run_itest(
"Remove plugin",
"remove mini-media-player --yes",
expect_missing_files=[
"www/js/mini-media-player-bundle.js",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_theme(self):
self.run_itest(
"Add theme",
f"add --theme {THEME_URL} --version {THEME_VERSION}",
expected_files=[
"themes/ios-themes.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, THEME_URL)
self.assertEqual(installed[0].version, THEME_VERSION)
self.run_itest(
"Remove theme",
"remove lovelace-ios-themes --yes",
expect_missing_files=[
"themes/ios-themes.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_fork(self):
self.run_itest(
"Add fork",
f"add {FORK_URL} --fork-component {FORK_COMPONENT} --fork-branch {FORK_BRANCH} --version {FORK_VERSION}",
expected_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, FORK_URL)
self.assertEqual(installed[0].version, FORK_VERSION)
self.run_itest(
"Remove fork",
f"remove {FORK_URL} --yes",
expect_missing_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -1,4 +1,4 @@
from unhacs.main import main
if __name__ == "__main__":
exit(main())
main()

View File

@ -79,7 +79,7 @@ def get_latest_sha(repository_url: str, branch_name: str) -> str:
for line in result.stdout.decode().split("\n"):
if line:
return line.partition("\t")[0]
return line.partition(" ")[0]
raise ValueError(f"branch name '{branch_name}' not found for {repository_url}")
@ -87,10 +87,7 @@ def get_latest_sha(repository_url: str, branch_name: str) -> str:
def get_tag_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip"
def get_branch_zip(repository_url: str, branch_name: str) -> str:
return f"{repository_url}/archive/{branch_name}.zip"
def get_sha_zip(repository_url: str, sha: str) -> str:
return f"{repository_url}/archive/{sha}.zip"

View File

@ -1,30 +1,18 @@
import sys
from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path
from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
class InvalidArgumentsError(ValueError):
pass
class DuplicatePackageError(ValueError):
pass
def parse_args(argv: list[str]):
def create_parser():
parser = ArgumentParser(
description="Unhacs - Command line interface for the Home Assistant Community Store"
)
@ -54,12 +42,6 @@ def parse_args(argv: list[str]):
# List installed packages
list_parser = subparsers.add_parser("list", description="List installed packages.")
list_parser.add_argument("--verbose", "-v", action="store_true")
list_parser.add_argument(
"--freeze",
"-f",
action="store_true",
help="Regenerate unhacs.yaml with installed packages.",
)
# List git tags for a given package
list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
@ -84,35 +66,11 @@ def parse_args(argv: list[str]):
"--integration",
action="store_const",
dest="type",
const=Integration,
default=Integration,
help="The package is an integration.",
const=PackageType.INTEGRATION,
default=PackageType.INTEGRATION,
)
package_type_group.add_argument(
"--plugin",
action="store_const",
dest="type",
const=Plugin,
help="The package is a JavaScript plugin.",
)
package_type_group.add_argument(
"--theme",
action="store_const",
dest="type",
const=Theme,
help="The package is a theme.",
)
package_type_group.add_argument(
"--fork-component",
type=str,
help="Name of component from forked core repo.",
)
# Additional arguments for forked packages
add_parser.add_argument(
"--fork-branch",
"-b",
type=str,
help="Name of branch of forked core repo. (Only for forked components.)",
"--plugin", action="store_const", dest="type", const=PackageType.PLUGIN
)
add_parser.add_argument(
@ -135,34 +93,15 @@ def parse_args(argv: list[str]):
remove_parser = subparsers.add_parser(
"remove", description="Remove installed packages."
)
remove_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
remove_parser.add_argument("packages", nargs="+")
# Upgrade packages
update_parser = subparsers.add_parser(
"upgrade", description="Upgrade installed packages."
)
update_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
update_parser.add_argument("packages", nargs="*")
args = parser.parse_args(argv)
if args.subcommand == "add":
# Component implies forked package
if args.fork_component and args.type != Fork:
args.type = Fork
# Branch is only valid for forked packages
if args.type != Fork and args.fork_branch:
raise InvalidArgumentsError(
"Branch and component can only be used with forked packages"
)
return args
return parser
class Unhacs:
@ -182,26 +121,36 @@ class Unhacs:
def add_package(
self,
package: Package,
package_url: str,
version: str | None = None,
update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
):
"""Install and add a package to the lock or install a specific version."""
package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages()
# Raise an error if the package is already in the list
if existing_package := next((p for p in packages if p.same(package)), None):
existing_package = next((p for p in packages if p.url == package.url), None)
if existing_package:
if update:
# Remove old version of the package
packages = [p for p in packages if p == existing_package]
packages = [p for p in packages if p.url != package.url]
else:
raise DuplicatePackageError("Package already exists in the list")
raise ValueError("Package already exists in the list")
package.install(self.hass_config)
packages.append(package)
self.write_lock_packages(packages)
def upgrade_packages(self, package_names: list[str], yes: bool = False):
def upgrade_packages(self, package_names: list[str]):
"""Uograde to latest version of packages and update lock."""
installed_packages: Iterable[Package]
@ -225,60 +174,35 @@ class Unhacs:
)
outdated_packages.append(latest_package)
confirmed = yes or input("Upgrade all packages? (y/N) ").lower() == "y"
if outdated_packages and not confirmed:
if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
return
for installed_package in outdated_packages:
installed_package.install(self.hass_config)
# Update lock file to latest now that we know they are uograded
latest_lookup = {p: p for p in latest_packages}
packages = [latest_lookup.get(p, p) for p in self.read_lock_packages()]
latest_lookup = {p.url: p for p in latest_packages}
packages = [latest_lookup.get(p.url, p) for p in self.read_lock_packages()]
self.write_lock_packages(packages)
def list_packages(self, verbose: bool = False, freeze: bool = False):
def list_packages(self, verbose: bool = False):
"""List installed packages and their versions."""
installed_packages = get_installed_packages()
for package in installed_packages:
for package in get_installed_packages():
print(package.verbose_str() if verbose else str(package))
if freeze:
self.write_lock_packages(installed_packages)
def list_tags(self, url: str, limit: int = 10):
print(f"Tags for {url}:")
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str], yes: bool = False):
def remove_packages(self, package_names: list[str]):
"""Remove installed packages and uodate lock."""
packages_to_remove = [
package
for package in get_installed_packages()
if (
package.name in package_names
or package.url in package_names
or (
hasattr(package, "fork_component")
and getattr(package, "fork_component") in package_names
)
)
if (package.name in package_names or package.url in package_names)
]
if package_names and not packages_to_remove:
print("No packages found to remove")
return
print("Packages to remove:")
for package in packages_to_remove:
print(package)
confirmed = yes or input("Remove listed packages? (y/N) ").lower() == "y"
if packages_to_remove and not confirmed:
return
remaining_packages = [
package
for package in self.read_lock_packages()
@ -291,36 +215,10 @@ class Unhacs:
self.write_lock_packages(remaining_packages)
def args_to_package(args) -> Package:
ignore_versions = (
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
)
if args.type == Fork:
if not args.fork_branch:
raise InvalidArgumentsError(
"A branch must be provided for forked components"
)
if not args.fork_component:
raise InvalidArgumentsError(
"A component must be provided for forked components"
)
return Fork(
args.url,
branch_name=args.fork_branch,
fork_component=args.fork_component,
version=args.version,
ignored_versions=ignore_versions,
)
return args.type(args.url, version=args.version, ignored_versions=ignore_versions)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:])
def main():
# If the sub command is add package, it should pass the parsed arguments to the add_package function and return
parser = create_parser()
args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file)
Package.git_tags = args.git_tags
@ -331,40 +229,38 @@ def main(argv: list[str] | None = None) -> int:
packages = read_lock_packages(args.file)
for package in packages:
unhacs.add_package(
package,
package.url,
package.version,
update=True,
package_type=package.package_type,
ignore_versions=package.ignored_versions,
)
elif args.url:
try:
new_package = args_to_package(args)
except InvalidArgumentsError as e:
print(e)
return 1
try:
unhacs.add_package(
new_package,
update=args.update,
)
except DuplicatePackageError as e:
print(e)
return 1
unhacs.add_package(
args.url,
version=args.version,
update=args.update,
package_type=args.type,
ignore_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
)
else:
print("Either a file or a URL must be provided")
return 1
raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list":
unhacs.list_packages(args.verbose, args.freeze)
unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove":
unhacs.remove_packages(args.packages, yes=args.yes)
unhacs.remove_packages(args.packages)
elif args.subcommand == "upgrade":
unhacs.upgrade_packages(args.packages, yes=args.yes)
unhacs.upgrade_packages(args.packages)
else:
print(f"Command {args.subcommand} is not implemented")
return 1
return 0
exit(1)
if __name__ == "__main__":
exit(main())
main()

406
unhacs/packages.py Normal file
View File

@ -0,0 +1,406 @@
import json
import shutil
import tempfile
from collections.abc import Generator
from collections.abc import Iterable
from enum import StrEnum
from enum import auto
from io import BytesIO
from pathlib import Path
from typing import cast
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_repo_tags
from unhacs.git import get_tag_zip
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path):
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
FORK = auto()
class Package:
git_tags = False
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
branch_name: str | None = None,
fork_component: str | None = None,
):
if package_type == PackageType.FORK and not fork_component:
raise ValueError(f"Fork with no component specified {url}@{branch_name}")
self.url = url
self.package_type = package_type
self.fork_component = fork_component
self.ignored_versions = ignored_versions or set()
self.branch_name = branch_name
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.path: Path | None = None
if not version:
self.version = self.fetch_version_release()
else:
self.version = version
def __str__(self):
return f"{self.name} {self.version}"
def __eq__(self, other):
return self.url == other.url and self.version == other.version
def verbose_str(self):
return f"{self.name} {self.version} ({self.url})"
@staticmethod
def from_yaml(yaml: dict) -> "Package":
# Convert package_type to enum
package_type = yaml.pop("package_type", None)
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
yaml["package_type"] = package_type
return Package(**yaml)
def to_yaml(self: "Package") -> dict:
data = {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
if self.branch_name:
data["branch_name"] = self.branch_name
return data
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(self, version: str | None = None) -> str:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
return cast(str, desired_release["tag_name"])
def _fetch_version_release_git(self, version: str | None = None) -> str:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version
def _fetch_latest_sha(self, branch_name: str) -> str:
return get_latest_sha(self.url, branch_name)
def fetch_version_release(self, version: str | None = None) -> str:
if self.branch_name:
return self._fetch_latest_sha(self.branch_name)
elif self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install_plugin(self, hass_config_path: Path):
"""Installs the plugin package."""
valid_filenames: Iterable[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = (cast(str, filename),)
else:
valid_filenames = (
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
)
def real_get(filename) -> requests.Response | None:
urls = [
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/dist/{filename}",
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/{filename}",
]
for url in urls:
plugin = requests.get(url)
if int(plugin.status_code / 100) == 4:
continue
plugin.raise_for_status()
return plugin
return None
for filename in valid_filenames:
plugin = real_get(filename)
if plugin:
break
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = hass_config_path / "www" / "js"
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
yaml.dump(self.to_yaml(), js_path.joinpath(f"{filename}-unhacs.yaml").open("w"))
# Write to resources
resources: list[dict] = []
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
resources = yaml.safe_load(resources_file.open()) or []
if not any(r["url"] == f"/local/js/{filename}" for r in resources):
resources.append(
{
"url": f"/local/js/{filename}",
"type": "module",
}
)
yaml.dump(resources, resources_file.open("w"))
def install_integration(self, hass_config_path: Path):
"""Installs the integration package."""
zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = hass_config_path / "custom_components" / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = hass_config_path / "custom_components" / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install_fork_component(self, hass_config_path: Path):
"""Installs the integration from hass fork."""
# TODO: Replace asserts with errors
assert self.fork_component
assert self.branch_name
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest = json.load(manifest_file.open())
manifest["version"] = "0.0.0"
json.dump(manifest, manifest_file.open("w"))
dest = hass_config_path / "custom_components" / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install(self, hass_config_path: Path):
"""Installs the package."""
if self.package_type == PackageType.PLUGIN:
self.install_plugin(hass_config_path)
elif self.package_type == PackageType.INTEGRATION:
self.install_integration(hass_config_path)
else:
raise NotImplementedError(f"Unknown package type {self.package_type}")
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
print("No path found for package, searching...")
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
print("Removing", self.path)
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()
# Remove from resources
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
with resources_file.open("r") as f:
resources = yaml.safe_load(f) or []
new_resources = [
r for r in resources if r["url"] != f"/local/js/{self.path.name}"
]
if len(new_resources) != len(resources):
with resources_file.open("w") as f:
yaml.dump(new_resources, f)
return True
def installed_package(self, hass_config_path: Path) -> "Package|None":
"""Returns the installed package if it exists, otherwise None."""
for package in get_installed_packages(hass_config_path, [self.package_type]):
if package.url == self.url:
return package
return None
def is_update(self, hass_config_path: Path) -> bool:
"""Returns True if the package is not installed or the installed version is different from the latest."""
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
return Package(**package)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.PLUGIN,
),
) -> Generator[Package, None, None]:
# Integration packages
if PackageType.INTEGRATION in package_types:
for custom_component in (hass_config_path / "custom_components").glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
package = Package.from_yaml(yaml.safe_load(unhacs.open()))
package.path = custom_component
yield package
# Plugin packages
if PackageType.PLUGIN in package_types:
for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.yaml"):
package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
yield package
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
return [
Package.from_yaml(p)
for p in yaml.safe_load(package_file.open())["packages"]
]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))

View File

@ -1,82 +0,0 @@
from collections.abc import Iterable
from pathlib import Path
from typing import cast
import yaml
from unhacs.packages.common import Package
from unhacs.packages.common import PackageType
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
def from_yaml(data: dict | Path | str) -> Package:
if isinstance(data, Path):
data = yaml.safe_load(data.open())
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
# Convert package_type to enum
package_type = data.pop("package_type", None)
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
url = data.pop("url")
return {
PackageType.INTEGRATION: Integration,
PackageType.PLUGIN: Plugin,
PackageType.THEME: Theme,
PackageType.FORK: Fork,
}[package_type](url, **data)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.FORK,
PackageType.PLUGIN,
PackageType.THEME,
),
) -> list[Package]:
# Integration packages
packages: list[Package] = []
if PackageType.INTEGRATION in package_types:
packages.extend(Integration.find_installed(hass_config_path))
if PackageType.FORK in package_types:
packages.extend(Fork.find_installed(hass_config_path))
# Plugin packages
if PackageType.PLUGIN in package_types:
packages.extend(Plugin.find_installed(hass_config_path))
# Theme packages
if PackageType.THEME in package_types:
packages.extend(Theme.find_installed(hass_config_path))
return packages
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
with package_file.open() as f:
return [from_yaml(p) for p in yaml.safe_load(f)["packages"]]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
with open(package_file, "w") as f:
yaml.dump({"packages": [p.to_yaml() for p in packages]}, f)

View File

@ -1,225 +0,0 @@
import shutil
from enum import StrEnum
from enum import auto
from pathlib import Path
from typing import Any
from typing import cast
import requests
import yaml
from unhacs.git import get_repo_tags
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
FORK = auto()
THEME = auto()
class Package:
git_tags = False
package_type: PackageType
other_fields: list[str] = []
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.url = url
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.path: Path | None = None
if not version:
self.version = self.fetch_version_release()
else:
self.version = version
def __str__(self):
return f"{self.package_type}: {self.name} {self.version}"
def __eq__(self, other):
return all(
(
self.same(other),
self.version == other.version,
)
)
def same(self, other):
fields = list(["url"] + self.other_fields)
return all((getattr(self, field) == getattr(other, field) for field in fields))
def __hash__(self):
fields = list(["url"] + self.other_fields)
return hash(tuple(getattr(self, field) for field in fields))
def verbose_str(self):
return f"{str(self)} ({self.url})"
@classmethod
def from_yaml(cls, data: dict | Path | str) -> "Package":
if isinstance(data, Path):
with data.open() as f:
data = yaml.safe_load(f)
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
if (package_type := data.pop("package_type")) != cls.package_type:
raise ValueError(
f"Invalid package_type ({package_type}) for this class {cls.package_type}"
)
return cls(data.pop("url"), **data)
def to_yaml(self, dest: Path | None = None) -> dict:
data: dict[str, Any] = {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
if self.ignored_versions:
data["ignored_versions"] = self.ignored_versions
for field in self.other_fields:
if hasattr(self, field):
data[field] = getattr(self, field)
if dest:
with dest.open("w") as f:
yaml.dump(self.to_yaml(), f)
return data
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(self, version: str | None = None) -> str:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
return cast(str, desired_release["tag_name"])
def _fetch_version_release_git(self, version: str | None = None) -> str:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version
def fetch_version_release(self, version: str | None = None) -> str:
if self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def _fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install(self, hass_config_path: Path):
raise NotImplementedError()
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path / "unhacs.yaml"
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
if self.unhacs_path and self.unhacs_path.exists():
self.unhacs_path.unlink()
return True
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
raise NotImplementedError()
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
raise NotImplementedError()
def installed_package(self, hass_config_path: Path) -> "Package|None":
"""Returns the installed package if it exists, otherwise None."""
for package in self.find_installed(hass_config_path):
if self.same(package):
return package
return None
def is_update(self, hass_config_path: Path) -> bool:
"""Returns True if the package is not installed or the installed version is different from the latest."""
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
package.pop("package_type")
return self.__class__(package.pop("url"), **package)

View File

@ -1,106 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_sha_zip
from unhacs.packages import PackageType
from unhacs.packages.common import Package
from unhacs.packages.integration import Integration
from unhacs.utils import extract_zip
class Fork(Integration):
other_fields = ["fork_component", "branch_name"]
package_type = PackageType.FORK
def __init__(
self,
url: str,
fork_component: str,
branch_name: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.fork_component = fork_component
self.branch_name = branch_name
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
def __str__(self):
return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}"
def fetch_version_release(self, version: str | None = None) -> str:
if version:
return version
return get_latest_sha(self.url, self.branch_name)
@classmethod
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] != "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration from hass fork."""
if self.version:
zipball_url = get_sha_zip(self.url, self.version)
else:
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest: dict[str, str]
with manifest_file.open("r") as f:
manifest = json.load(f)
manifest["version"] = "0.0.0"
with manifest_file.open("w") as f:
json.dump(manifest, f)
dest = self.get_install_dir(hass_config_path) / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(self.unhacs_path)

View File

@ -1,81 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_tag_zip
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.utils import extract_zip
class Integration(Package):
package_type = PackageType.INTEGRATION
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "custom_components"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] == "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration package."""
zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = self.get_install_dir(hass_config_path) / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = self.get_install_dir(hass_config_path) / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(self.unhacs_path)

View File

@ -1,95 +0,0 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Plugin(Package):
package_type = PackageType.PLUGIN
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "www" / "js"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}-unhacs.yaml")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the plugin package."""
valid_filenames: list[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = [cast(str, filename)]
else:
valid_filenames = [
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
]
def real_get(filename) -> requests.Response | None:
urls = [
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}",
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}",
]
for url in urls:
plugin = requests.get(url)
if int(plugin.status_code / 100) == 4:
continue
plugin.raise_for_status()
return plugin
return None
for filename in valid_filenames:
plugin = real_get(filename)
if plugin:
break
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = self.get_install_dir(hass_config_path)
js_path.mkdir(parents=True, exist_ok=True)
self.path = js_path.joinpath(filename)
self.path.write_text(plugin.text)
self.to_yaml(self.unhacs_path)

View File

@ -1,63 +0,0 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Theme(Package):
package_type = PackageType.THEME
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "themes"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}.unhacs")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*.unhacs"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs"))
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Install theme yaml."""
filename = self.get_hacs_json().get("filename")
if not filename:
raise ValueError(f"No filename found for theme {self.name}")
filename = cast(str, filename)
url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}"
theme = requests.get(url)
theme.raise_for_status()
themes_path = self.get_install_dir(hass_config_path)
themes_path.mkdir(parents=True, exist_ok=True)
self.path = themes_path.joinpath(filename)
self.path.write_text(theme.text)
self.to_yaml(self.unhacs_path)

View File

@ -1,21 +0,0 @@
from pathlib import Path
from zipfile import ZipFile
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path) -> Path:
"""Extract a zip file to a directory."""
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
return dest_dir