Add tests and fix things

This commit is contained in:
IamTheFij 2024-09-18 11:33:27 -07:00
parent 9d87f7748c
commit 5db03fee66
14 changed files with 394 additions and 72 deletions

1
.gitignore vendored
View File

@ -143,3 +143,4 @@ unhacs.txt
poetry.lock
custom_components/
themes/
unhacs.yaml

View File

@ -24,7 +24,7 @@ lint: devenv
# Runs tests
.PHONY: test
test: devenv
@echo TODO: poetry run pytest
poetry run python -m unittest discover tests
# Builds wheel for package to upload
.PHONY: build

View File

@ -12,14 +12,15 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
requests = "^2.32.3"
pyyaml = "^6.0.1"
requests = "^2.32.0"
pyyaml = "^6.0.0"
[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
mypy = "^1.10.0"
pre-commit = "^3.7.1"
types-requests = "^2.32.0.20240602"
types-requests = "^2.32.0"
types-pyyaml = "^6.0.0"
[tool.poetry.scripts]
unhacs = 'unhacs.main:main'

0
tests/__init__.py Normal file
View File

235
tests/main_test.py Normal file
View File

@ -0,0 +1,235 @@
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from unhacs.main import main
from unhacs.packages import get_installed_packages
INTEGRATION_URL = "https://github.com/simbaja/ha_gehome"
INTEGRATION_VERSION = "v0.6.9"
PLUGIN_URL = "https://github.com/kalkih/mini-media-player"
PLUGIN_VERSION = "v1.16.8"
THEME_URL = "https://github.com/basnijholt/lovelace-ios-themes"
THEME_VERSION = "v3.0.1"
FORK_URL = "https://github.com/ViViDboarder/home-assistant"
FORK_BRANCH = "dev"
FORK_COMPONENT = "nextbus"
FORK_VERSION = "3b2893f2f4e16f9a05d9cc4a7ba9f31984c841be"
class TestMainIntegrarion(unittest.TestCase):
test_dir: str
def setUp(self):
self.test_dir = tempfile.mkdtemp()
os.chdir(self.test_dir)
def tearDown(self):
shutil.rmtree(self.test_dir)
pass
def run_itest(
self,
test_name: str,
command: str,
expected_files: list[str] | None = None,
expect_missing_files: list[str] | None = None,
expected_code: int = 0,
):
with self.subTest(test_name, command=command):
self.assertEqual(main(command.split()), expected_code)
# Verify that the package was installed by checking the filesystem
if expected_files:
expected_files = [
os.path.join(self.test_dir, file) for file in expected_files
]
missing_files = [
file for file in expected_files if not os.path.exists(file)
]
if missing_files:
self.fail(f"Missing files: {missing_files}")
if expect_missing_files:
expect_missing_files = [
os.path.join(self.test_dir, file) for file in expect_missing_files
]
existing_files = [
file for file in expect_missing_files if os.path.exists(file)
]
if existing_files:
self.fail(f"Files should not exist: {existing_files}")
def test_integration(self):
self.run_itest(
"Add integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION}",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Double add",
f"add {INTEGRATION_URL}",
expected_code=1,
)
self.run_itest(
"Upgrade to latest version",
"upgrade ha_gehome --yes",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertNotEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Downgrade integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION} --update",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Remove integration",
"remove ha_gehome --yes",
expect_missing_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_plugin(self):
self.run_itest(
"Add plugin",
f"add --plugin {PLUGIN_URL} --version {PLUGIN_VERSION}",
expected_files=[
"www/js/mini-media-player-bundle.js",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, PLUGIN_URL)
self.assertEqual(installed[0].version, PLUGIN_VERSION)
self.run_itest(
"Remove plugin",
"remove mini-media-player --yes",
expect_missing_files=[
"www/js/mini-media-player-bundle.js",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_theme(self):
self.run_itest(
"Add theme",
f"add --theme {THEME_URL} --version {THEME_VERSION}",
expected_files=[
"themes/ios-themes.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, THEME_URL)
self.assertEqual(installed[0].version, THEME_VERSION)
self.run_itest(
"Remove theme",
"remove lovelace-ios-themes --yes",
expect_missing_files=[
"themes/ios-themes.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_fork(self):
self.run_itest(
"Add fork",
f"add {FORK_URL} --fork-component {FORK_COMPONENT} --fork-branch {FORK_BRANCH} --version {FORK_VERSION}",
expected_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, FORK_URL)
self.assertEqual(installed[0].version, FORK_VERSION)
self.run_itest(
"Remove fork",
f"remove {FORK_URL} --yes",
expect_missing_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -1,4 +1,4 @@
from unhacs.main import main
if __name__ == "__main__":
main()
exit(main())

View File

@ -90,3 +90,7 @@ def get_tag_zip(repository_url: str, tag_name: str) -> str:
def get_branch_zip(repository_url: str, branch_name: str) -> str:
return f"{repository_url}/archive/{branch_name}.zip"
def get_sha_zip(repository_url: str, sha: str) -> str:
return f"{repository_url}/archive/{sha}.zip"

View File

@ -1,3 +1,4 @@
import sys
from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path
@ -16,7 +17,15 @@ from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
def parse_args():
class InvalidArgumentsError(ValueError):
pass
class DuplicatePackageError(ValueError):
pass
def parse_args(argv: list[str]):
parser = ArgumentParser(
description="Unhacs - Command line interface for the Home Assistant Community Store"
)
@ -121,15 +130,21 @@ def parse_args():
remove_parser = subparsers.add_parser(
"remove", description="Remove installed packages."
)
remove_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
remove_parser.add_argument("packages", nargs="+")
# Upgrade packages
update_parser = subparsers.add_parser(
"upgrade", description="Upgrade installed packages."
)
update_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
update_parser.add_argument("packages", nargs="*")
args = parser.parse_args()
args = parser.parse_args(argv)
if args.subcommand == "add":
# Component implies forked package
@ -138,7 +153,7 @@ def parse_args():
# Branch is only valid for forked packages
if args.type != Fork and args.fork_branch:
raise ValueError(
raise InvalidArgumentsError(
"Branch and component can only be used with forked packages"
)
@ -174,14 +189,14 @@ class Unhacs:
# Remove old version of the package
packages = [p for p in packages if p == existing_package]
else:
raise ValueError("Package already exists in the list")
raise DuplicatePackageError("Package already exists in the list")
package.install(self.hass_config)
packages.append(package)
self.write_lock_packages(packages)
def upgrade_packages(self, package_names: list[str]):
def upgrade_packages(self, package_names: list[str], yes: bool = False):
"""Uograde to latest version of packages and update lock."""
installed_packages: Iterable[Package]
@ -205,7 +220,8 @@ class Unhacs:
)
outdated_packages.append(latest_package)
if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
confirmed = yes or input("Upgrade all packages? (y/N) ").lower() == "y"
if outdated_packages and not confirmed:
return
for installed_package in outdated_packages:
@ -227,7 +243,7 @@ class Unhacs:
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str]):
def remove_packages(self, package_names: list[str], yes: bool = False):
"""Remove installed packages and uodate lock."""
packages_to_remove = [
package
@ -250,10 +266,8 @@ class Unhacs:
for package in packages_to_remove:
print(package)
if (
packages_to_remove
and input("Remove listed packages? (y/N) ").lower() != "y"
):
confirmed = yes or input("Remove listed packages? (y/N) ").lower() == "y"
if packages_to_remove and not confirmed:
return
remaining_packages = [
@ -277,9 +291,13 @@ def args_to_package(args) -> Package:
if args.type == Fork:
if not args.fork_branch:
raise ValueError("A branch must be provided for forked components")
raise InvalidArgumentsError(
"A branch must be provided for forked components"
)
if not args.fork_component:
raise ValueError("A component must be provided for forked components")
raise InvalidArgumentsError(
"A component must be provided for forked components"
)
return Fork(
args.url,
@ -292,9 +310,8 @@ def args_to_package(args) -> Package:
return args.type(args.url, version=args.version, ignored_versions=ignore_versions)
def main():
# If the sub command is add package, it should pass the parsed arguments to the add_package function and return
args = parse_args()
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:])
unhacs = Unhacs(args.config, args.package_file)
Package.git_tags = args.git_tags
@ -309,25 +326,36 @@ def main():
update=True,
)
elif args.url:
new_package = args_to_package(args)
unhacs.add_package(
new_package,
update=args.update,
)
try:
new_package = args_to_package(args)
except InvalidArgumentsError as e:
print(e)
return 1
try:
unhacs.add_package(
new_package,
update=args.update,
)
except DuplicatePackageError as e:
print(e)
return 1
else:
raise ValueError("Either a file or a URL must be provided")
print("Either a file or a URL must be provided")
return 1
elif args.subcommand == "list":
unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove":
unhacs.remove_packages(args.packages)
unhacs.remove_packages(args.packages, yes=args.yes)
elif args.subcommand == "upgrade":
unhacs.upgrade_packages(args.packages)
unhacs.upgrade_packages(args.packages, yes=args.yes)
else:
print(f"Command {args.subcommand} is not implemented")
exit(1)
return 1
return 0
if __name__ == "__main__":
main()
exit(main())

View File

@ -41,6 +41,7 @@ def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.FORK,
PackageType.PLUGIN,
PackageType.THEME,
),
@ -51,6 +52,9 @@ def get_installed_packages(
if PackageType.INTEGRATION in package_types:
packages.extend(Integration.find_installed(hass_config_path))
if PackageType.FORK in package_types:
packages.extend(Fork.find_installed(hass_config_path))
# Plugin packages
if PackageType.PLUGIN in package_types:
packages.extend(Plugin.find_installed(hass_config_path))
@ -65,7 +69,8 @@ def get_installed_packages(
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
return [from_yaml(p) for p in yaml.safe_load(package_file.open())["packages"]]
with package_file.open() as f:
return [from_yaml(p) for p in yaml.safe_load(f)["packages"]]
return []
@ -73,4 +78,5 @@ def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Packag
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))
with open(package_file, "w") as f:
yaml.dump({"packages": [p.to_yaml() for p in packages]}, f)

View File

@ -51,7 +51,7 @@ class Package:
return all(
(
self.same(other),
# TODO: Should this match versions?
self.version == other.version,
)
)
@ -71,14 +71,17 @@ class Package:
@classmethod
def from_yaml(cls, data: dict | Path | str) -> "Package":
if isinstance(data, Path):
data = yaml.safe_load(data.open())
with data.open() as f:
data = yaml.safe_load(f)
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
if data["package_type"] != cls.package_type:
raise ValueError("Invalid package_type")
if (package_type := data.pop("package_type")) != cls.package_type:
raise ValueError(
f"Invalid package_type ({package_type}) for this class {cls.package_type}"
)
return cls(data.pop("url"), **data)
@ -97,7 +100,8 @@ class Package:
data[field] = getattr(self, field)
if dest:
yaml.dump(self.to_yaml(), dest.open("w"))
with dest.open("w") as f:
yaml.dump(self.to_yaml(), f)
return data
@ -167,36 +171,28 @@ class Package:
def install(self, hass_config_path: Path):
raise NotImplementedError()
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path / "unhacs.yaml"
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
print("No path found for package, searching...")
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
print("Removing", self.path)
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()
# Remove from resources
resources_file = hass_config_path / "resources.yaml"
if resources_file.exists():
with resources_file.open("r") as f:
resources = yaml.safe_load(f) or []
new_resources = [
r for r in resources if r["url"] != f"/local/js/{self.path.name}"
]
if len(new_resources) != len(resources):
with resources_file.open("w") as f:
yaml.dump(new_resources, f)
if self.unhacs_path and self.unhacs_path.exists():
self.unhacs_path.unlink()
return True
@ -225,4 +221,5 @@ class Package:
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
return Package(**package)
package.pop("package_type")
return self.__class__(package.pop("url"), **package)

View File

@ -6,10 +6,13 @@ from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_sha_zip
from unhacs.packages import PackageType
from unhacs.packages.common import Package
from unhacs.packages.integration import Integration
from unhacs.utils import extract_zip
@ -39,11 +42,34 @@ class Fork(Integration):
return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}"
def fetch_version_release(self, version: str | None = None) -> str:
if version:
return version
return get_latest_sha(self.url, self.branch_name)
@classmethod
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] != "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration from hass fork."""
zipball_url = get_branch_zip(self.url, self.branch_name)
if self.version:
zipball_url = get_sha_zip(self.url, self.version)
else:
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
@ -60,9 +86,12 @@ class Fork(Integration):
# Add version to manifest
manifest_file = source / "manifest.json"
manifest = json.load(manifest_file.open())
manifest["version"] = "0.0.0"
json.dump(manifest, manifest_file.open("w"))
manifest: dict[str, str]
with manifest_file.open("r") as f:
manifest = json.load(f)
manifest["version"] = "0.0.0"
with manifest_file.open("w") as f:
json.dump(manifest, f)
dest = self.get_install_dir(hass_config_path) / source.name
@ -72,5 +101,6 @@ class Fork(Integration):
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(dest.joinpath("unhacs.yaml"))
self.to_yaml(self.unhacs_path)

View File

@ -6,6 +6,7 @@ from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_tag_zip
from unhacs.packages import Package
@ -33,13 +34,16 @@ class Integration(Package):
return hass_config_path / "custom_components"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
package = cls.from_yaml(unhacs)
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] == "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
@ -72,5 +76,6 @@ class Integration(Package):
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(dest.joinpath("unhacs.yaml"))
self.to_yaml(self.unhacs_path)

View File

@ -26,6 +26,13 @@ class Plugin(Package):
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "www" / "js"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}-unhacs.yaml")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
@ -81,6 +88,8 @@ class Plugin(Package):
js_path = self.get_install_dir(hass_config_path)
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
self.to_yaml(js_path.joinpath(f"{filename}-unhacs.yaml"))
self.path = js_path.joinpath(filename)
self.path.write_text(plugin.text)
self.to_yaml(self.unhacs_path)

View File

@ -26,15 +26,20 @@ class Theme(Package):
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "themes"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}.unhacs")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*.unhacs"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs"))
packages.append(package)
return packages
@ -52,6 +57,7 @@ class Theme(Package):
themes_path = self.get_install_dir(hass_config_path)
themes_path.mkdir(parents=True, exist_ok=True)
themes_path.joinpath(filename).write_text(theme.text)
self.path = themes_path.joinpath(filename)
self.path.write_text(theme.text)
self.to_yaml(themes_path.joinpath(f"{filename}.unhacs"))
self.to_yaml(self.unhacs_path)