From 0fd0a99c5242bd8d7c36b7b415349e6d9fb49278 Mon Sep 17 00:00:00 2001 From: Ian Fijolek Date: Mon, 10 Jun 2024 16:59:42 -0700 Subject: [PATCH] Use git rather than github --- README.md | 14 +++++++++ unhacs/git.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++ unhacs/main.py | 63 ++++++++++++++++++++++++++++++++-------- unhacs/packages.py | 47 ++++++++++++++++++++++++------ 4 files changed, 174 insertions(+), 21 deletions(-) create mode 100644 unhacs/git.py diff --git a/README.md b/README.md index a2ab2aa..da7b3b1 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,20 @@ For a more detailed output, add the `--verbose` flag: unhacs list --verbose ``` +### List tags + +To list all tags for a package, use the `tags` command followed by the name of the package: + +```bash +unhacs tags +``` + +The number or returned tags is limited to 10 by default. To change this, add the `--limit` flag: + +```bash +unhacs tags --limit 20 +``` + ### Remove a package To remove a package, use the `remove` command followed by the name of the package: diff --git a/unhacs/git.py b/unhacs/git.py new file mode 100644 index 0000000..a01616b --- /dev/null +++ b/unhacs/git.py @@ -0,0 +1,71 @@ +import re +import subprocess +from dataclasses import dataclass + + +@dataclass +class GitTag: + name: str + version: tuple[int, int, int] + suffix: str + + @staticmethod + def parse(name: str): + if result := re.match(r"^[v]?([\d.]+)(.*)", name): + version_str = result.group(1) + suffix = result.group(2) + + parts = version_str.split(".") + if len(parts) > 3: + raise ValueError(f"Invalid version tag: {name}") + + try: + version = ( + int(parts[0]), + int(parts[1]) if len(parts) > 1 else 0, + int(parts[2]) if len(parts) > 2 else 0, + ) + except ValueError: + raise ValueError(f"Invalid version tag: {name}") + + return GitTag(name, version, suffix) + + def __str__(self): + return f"{self.name} {self.version}" + + def __eq__(self, other): + return self.version == other.version and self.suffix == other.suffix + + def __lt__(self, other): + return self.version < other.version or ( + self.version == other.version and self.suffix < other.suffix + ) + + +def get_repo_tags(repository_url: str) -> list[str]: + # Run the command + command = f"git -c 'versionsort.suffix=-' ls-remote --tags --sort='v:refname' {repository_url}" + result = subprocess.run( + command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Check for errors + if result.returncode != 0: + raise Exception(f"Error running command: {command}\n{result.stderr.decode()}") + + # Parse the output + tags: list[GitTag] = [] + for line in result.stdout.decode().split("\n"): + if line: + if search_result := re.search(r"refs/tags/(.*)", line): + tag = search_result.group(1) + if git_tag := GitTag.parse(tag): + tags.append(git_tag) + + tags.sort() + + return [tag.name for tag in tags] + + +def get_ref_zip(repository_url: str, tag_name: str) -> str: + return f"{repository_url}/archive/refs/tags/{tag_name}.zip" diff --git a/unhacs/main.py b/unhacs/main.py index 6713508..b1239c8 100644 --- a/unhacs/main.py +++ b/unhacs/main.py @@ -2,6 +2,7 @@ from argparse import ArgumentParser from collections.abc import Iterable from pathlib import Path +from unhacs.git import get_repo_tags from unhacs.packages import DEFAULT_HASS_CONFIG_PATH from unhacs.packages import DEFAULT_PACKAGE_FILE from unhacs.packages import Package @@ -29,12 +30,24 @@ def create_parser(): default=DEFAULT_PACKAGE_FILE, help="The path to the package file.", ) + parser.add_argument( + "--use-git", + "-g", + action="store_true", + help="Use git to install packages. This will avoid GitHub API limits.", + ) subparsers = parser.add_subparsers(dest="subcommand", required=True) list_parser = subparsers.add_parser("list", description="List installed packages.") list_parser.add_argument("--verbose", "-v", action="store_true") + list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.") + list_tags_parser.add_argument("url", type=str, help="The URL of the package.") + list_tags_parser.add_argument( + "--limit", type=int, default=10, help="The number of tags to display." + ) + add_parser = subparsers.add_parser("add", description="Add or install packages.") add_parser.add_argument( "--file", "-f", type=Path, help="The path to a package file." @@ -55,6 +68,12 @@ def create_parser(): action="store_true", help="Update the package if it already exists.", ) + add_parser.add_argument( + "--ignore-versions", + "-i", + type=str, + help="The version of the package to ignore. Multiple can be split by a comma.", + ) remove_parser = subparsers.add_parser( "remove", description="Remove installed packages." @@ -90,16 +109,23 @@ class Unhacs: version: str | None = None, update: bool = False, package_type: PackageType = PackageType.INTEGRATION, + ignore_versions: set[str] | None = None, ): """Install and add a package to the lock or install a specific version.""" - package = Package(url=package_url, version=version, package_type=package_type) + package = Package( + package_url, + version=version, + package_type=package_type, + ignored_versions=ignore_versions, + ) packages = self.read_lock_packages() # Raise an error if the package is already in the list - if package in packages: + existing_package = next((p for p in packages if p.url == package.url), None) + if existing_package: if update: # Remove old version of the package - packages = [p for p in packages if p != package] + packages = [p for p in packages if p.url != package.url] else: raise ValueError("Package already exists in the list") @@ -119,7 +145,7 @@ class Unhacs: if p.name in package_names ] - upgrade_packages: list[Package] = [] + outdated_packages: list[Package] = [] latest_packages = [p.get_latest() for p in installed_packages] for installed_package, latest_package in zip( installed_packages, latest_packages @@ -128,16 +154,12 @@ class Unhacs: print( f"upgrade {installed_package.name} from {installed_package.version} to {latest_package.version}" ) - upgrade_packages.append(latest_package) + outdated_packages.append(latest_package) - if not upgrade_packages: - print("Nothing to upgrade") + if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y": return - if input("Upgrade all packages? (y/N) ").strip().lower() != "y": - return - - for installed_package in upgrade_packages: + for installed_package in outdated_packages: installed_package.install(self.hass_config) # Update lock file to latest now that we know they are uograded @@ -151,6 +173,11 @@ class Unhacs: for package in get_installed_packages(): print(package.verbose_str() if verbose else str(package)) + def list_tags(self, url: str, limit: int = 10): + print(f"Tags for {url}:") + for tag in get_repo_tags(url)[-1 * limit :]: + print(tag) + def remove_packages(self, package_names: list[str]): """Remove installed packages and uodate lock.""" packages_to_remove = [ @@ -176,6 +203,7 @@ def main(): args = parser.parse_args() unhacs = Unhacs(args.config, args.package_file) + Package.use_git = args.use_git if args.subcommand == "add": # If a file was provided, update all packages based on the lock file @@ -187,15 +215,26 @@ def main(): package.version, update=True, package_type=package.package_type, + ignore_versions=package.ignored_versions, ) elif args.url: unhacs.add_package( - args.url, args.version, args.update, package_type=args.type + args.url, + version=args.version, + update=args.update, + package_type=args.type, + ignore_versions=( + {version for version in args.ignore_versions.split(",")} + if args.ignore_versions + else None + ), ) else: raise ValueError("Either a file or a URL must be provided") elif args.subcommand == "list": unhacs.list_packages(args.verbose) + elif args.subcommand == "tags": + unhacs.list_tags(args.url, limit=args.limit) elif args.subcommand == "remove": unhacs.remove_packages(args.packages) elif args.subcommand == "upgrade": diff --git a/unhacs/packages.py b/unhacs/packages.py index 90670c4..8c05bbf 100644 --- a/unhacs/packages.py +++ b/unhacs/packages.py @@ -12,6 +12,9 @@ from zipfile import ZipFile import requests import yaml +from unhacs.git import get_ref_zip +from unhacs.git import get_repo_tags + DEFAULT_HASS_CONFIG_PATH: Path = Path(".") DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") @@ -35,29 +38,28 @@ class PackageType(StrEnum): class Package: - url: str - owner: str - name: str - version: str - download_url: str - path: Path | None = None - package_type: PackageType = PackageType.INTEGRATION + use_git = False def __init__( self, url: str, version: str | None = None, package_type: PackageType = PackageType.INTEGRATION, + ignored_versions: set[str] | None = None, ): self.url = url self.package_type = package_type + self.ignored_versions = ignored_versions or set() parts = self.url.split("/") self.owner = parts[-2] self.name = parts[-1] + self.download_url: str | None = None + self.path: Path | None = None + if not version: - self.version, self.download_url = self.fetch_version_release(version) + self.version, self.download_url = self.fetch_version_release() else: self.version = version @@ -87,7 +89,12 @@ class Package: "package_type": str(self.package_type), } - def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: + def add_ignored_version(self, version: str): + self.ignored_versions.add(version) + + def _fetch_version_release_releases( + self, version: str | None = None + ) -> tuple[str, str]: # Fetch the releases from the GitHub API response = requests.get( f"https://api.github.com/repos/{self.owner}/{self.name}/releases" @@ -135,6 +142,28 @@ class Package: return version, download_url + def _fetch_version_release_git(self, version: str | None = None) -> tuple[str, str]: + tags = get_repo_tags(self.url) + if not tags: + raise ValueError(f"No tags found for package {self.name}") + if version and version not in tags: + raise ValueError(f"Version {version} does not exist for this package") + + tags = [tag for tag in tags if tag not in self.ignored_versions] + if not version: + version = tags[-1] + + return version, get_ref_zip(self.url, version) + + def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: + if self.use_git: + return self._fetch_version_release_git(version) + else: + return self._fetch_version_release_releases(version) + + def fetch_versions(self) -> list[str]: + return get_repo_tags(self.url) + def get_hacs_json(self, version: str | None = None) -> dict: version = version or self.version response = requests.get(