Use git rather than github
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
IamTheFij 2024-06-10 16:59:42 -07:00
parent e45dc59d1f
commit 0fd0a99c52
4 changed files with 174 additions and 21 deletions

View File

@ -40,6 +40,20 @@ For a more detailed output, add the `--verbose` flag:
unhacs list --verbose
```
### List tags
To list all tags for a package, use the `tags` command followed by the name of the package:
```bash
unhacs tags <package_url>
```
The number or returned tags is limited to 10 by default. To change this, add the `--limit` flag:
```bash
unhacs tags <package_url> --limit 20
```
### Remove a package
To remove a package, use the `remove` command followed by the name of the package:

71
unhacs/git.py Normal file
View File

@ -0,0 +1,71 @@
import re
import subprocess
from dataclasses import dataclass
@dataclass
class GitTag:
name: str
version: tuple[int, int, int]
suffix: str
@staticmethod
def parse(name: str):
if result := re.match(r"^[v]?([\d.]+)(.*)", name):
version_str = result.group(1)
suffix = result.group(2)
parts = version_str.split(".")
if len(parts) > 3:
raise ValueError(f"Invalid version tag: {name}")
try:
version = (
int(parts[0]),
int(parts[1]) if len(parts) > 1 else 0,
int(parts[2]) if len(parts) > 2 else 0,
)
except ValueError:
raise ValueError(f"Invalid version tag: {name}")
return GitTag(name, version, suffix)
def __str__(self):
return f"{self.name} {self.version}"
def __eq__(self, other):
return self.version == other.version and self.suffix == other.suffix
def __lt__(self, other):
return self.version < other.version or (
self.version == other.version and self.suffix < other.suffix
)
def get_repo_tags(repository_url: str) -> list[str]:
# Run the command
command = f"git -c 'versionsort.suffix=-' ls-remote --tags --sort='v:refname' {repository_url}"
result = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Check for errors
if result.returncode != 0:
raise Exception(f"Error running command: {command}\n{result.stderr.decode()}")
# Parse the output
tags: list[GitTag] = []
for line in result.stdout.decode().split("\n"):
if line:
if search_result := re.search(r"refs/tags/(.*)", line):
tag = search_result.group(1)
if git_tag := GitTag.parse(tag):
tags.append(git_tag)
tags.sort()
return [tag.name for tag in tags]
def get_ref_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip"

View File

@ -2,6 +2,7 @@ from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path
from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
@ -29,12 +30,24 @@ def create_parser():
default=DEFAULT_PACKAGE_FILE,
help="The path to the package file.",
)
parser.add_argument(
"--use-git",
"-g",
action="store_true",
help="Use git to install packages. This will avoid GitHub API limits.",
)
subparsers = parser.add_subparsers(dest="subcommand", required=True)
list_parser = subparsers.add_parser("list", description="List installed packages.")
list_parser.add_argument("--verbose", "-v", action="store_true")
list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
list_tags_parser.add_argument("url", type=str, help="The URL of the package.")
list_tags_parser.add_argument(
"--limit", type=int, default=10, help="The number of tags to display."
)
add_parser = subparsers.add_parser("add", description="Add or install packages.")
add_parser.add_argument(
"--file", "-f", type=Path, help="The path to a package file."
@ -55,6 +68,12 @@ def create_parser():
action="store_true",
help="Update the package if it already exists.",
)
add_parser.add_argument(
"--ignore-versions",
"-i",
type=str,
help="The version of the package to ignore. Multiple can be split by a comma.",
)
remove_parser = subparsers.add_parser(
"remove", description="Remove installed packages."
@ -90,16 +109,23 @@ class Unhacs:
version: str | None = None,
update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
):
"""Install and add a package to the lock or install a specific version."""
package = Package(url=package_url, version=version, package_type=package_type)
package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages()
# Raise an error if the package is already in the list
if package in packages:
existing_package = next((p for p in packages if p.url == package.url), None)
if existing_package:
if update:
# Remove old version of the package
packages = [p for p in packages if p != package]
packages = [p for p in packages if p.url != package.url]
else:
raise ValueError("Package already exists in the list")
@ -119,7 +145,7 @@ class Unhacs:
if p.name in package_names
]
upgrade_packages: list[Package] = []
outdated_packages: list[Package] = []
latest_packages = [p.get_latest() for p in installed_packages]
for installed_package, latest_package in zip(
installed_packages, latest_packages
@ -128,16 +154,12 @@ class Unhacs:
print(
f"upgrade {installed_package.name} from {installed_package.version} to {latest_package.version}"
)
upgrade_packages.append(latest_package)
outdated_packages.append(latest_package)
if not upgrade_packages:
print("Nothing to upgrade")
if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
return
if input("Upgrade all packages? (y/N) ").strip().lower() != "y":
return
for installed_package in upgrade_packages:
for installed_package in outdated_packages:
installed_package.install(self.hass_config)
# Update lock file to latest now that we know they are uograded
@ -151,6 +173,11 @@ class Unhacs:
for package in get_installed_packages():
print(package.verbose_str() if verbose else str(package))
def list_tags(self, url: str, limit: int = 10):
print(f"Tags for {url}:")
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str]):
"""Remove installed packages and uodate lock."""
packages_to_remove = [
@ -176,6 +203,7 @@ def main():
args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file)
Package.use_git = args.use_git
if args.subcommand == "add":
# If a file was provided, update all packages based on the lock file
@ -187,15 +215,26 @@ def main():
package.version,
update=True,
package_type=package.package_type,
ignore_versions=package.ignored_versions,
)
elif args.url:
unhacs.add_package(
args.url, args.version, args.update, package_type=args.type
args.url,
version=args.version,
update=args.update,
package_type=args.type,
ignore_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
)
else:
raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list":
unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove":
unhacs.remove_packages(args.packages)
elif args.subcommand == "upgrade":

View File

@ -12,6 +12,9 @@ from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_ref_zip
from unhacs.git import get_repo_tags
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
@ -35,29 +38,28 @@ class PackageType(StrEnum):
class Package:
url: str
owner: str
name: str
version: str
download_url: str
path: Path | None = None
package_type: PackageType = PackageType.INTEGRATION
use_git = False
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
):
self.url = url
self.package_type = package_type
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.download_url: str | None = None
self.path: Path | None = None
if not version:
self.version, self.download_url = self.fetch_version_release(version)
self.version, self.download_url = self.fetch_version_release()
else:
self.version = version
@ -87,7 +89,12 @@ class Package:
"package_type": str(self.package_type),
}
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(
self, version: str | None = None
) -> tuple[str, str]:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
@ -135,6 +142,28 @@ class Package:
return version, download_url
def _fetch_version_release_git(self, version: str | None = None) -> tuple[str, str]:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version, get_ref_zip(self.url, version)
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
if self.use_git:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
version = version or self.version
response = requests.get(