Use git rather than github

This commit is contained in:
IamTheFij 2024-06-10 16:59:42 -07:00
parent 269e565f4f
commit bbef574bf8
4 changed files with 179 additions and 13 deletions

View File

@ -52,6 +52,20 @@ For a more detailed output, add the `--verbose` flag:
unhacs list --verbose
```
### List tags
To list all tags for a package, use the `tags` command followed by the name of the package:
```bash
unhacs tags <package_url>
```
The number or returned tags is limited to 10 by default. To change this, add the `--limit` flag:
```bash
unhacs tags <package_url> --limit 20
```
### Remove a package
To remove a package, use the `remove` command followed by the name of the package:
@ -74,6 +88,14 @@ To upgrade specific packages, add their names after the `upgrade` command:
unhacs upgrade <package_name_1> <package_name_2> ...
```
## Use git tags
By default, identification of releases uses the GitHub API. If you want to use git tags instead, you can add the `--git-tags` flag to the base command:
```bash
unhacs --git-tags add <package_url>
```
## License
Unhacs is licensed under the MIT License. See the LICENSE file for more details.

71
unhacs/git.py Normal file
View File

@ -0,0 +1,71 @@
import re
import subprocess
from dataclasses import dataclass
@dataclass
class GitTag:
name: str
version: tuple[int, int, int]
suffix: str
@staticmethod
def parse(name: str):
if result := re.match(r"^[v]?([\d.]+)(.*)", name):
version_str = result.group(1)
suffix = result.group(2)
parts = version_str.split(".")
if len(parts) > 3:
raise ValueError(f"Invalid version tag: {name}")
try:
version = (
int(parts[0]),
int(parts[1]) if len(parts) > 1 else 0,
int(parts[2]) if len(parts) > 2 else 0,
)
except ValueError:
raise ValueError(f"Invalid version tag: {name}")
return GitTag(name, version, suffix)
def __str__(self):
return f"{self.name} {self.version}"
def __eq__(self, other):
return self.version == other.version and self.suffix == other.suffix
def __lt__(self, other):
return self.version < other.version or (
self.version == other.version and self.suffix < other.suffix
)
def get_repo_tags(repository_url: str) -> list[str]:
# Run the command
command = f"git -c 'versionsort.suffix=-' ls-remote --tags --sort='v:refname' {repository_url}"
result = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Check for errors
if result.returncode != 0:
raise Exception(f"Error running command: {command}\n{result.stderr.decode()}")
# Parse the output
tags: list[GitTag] = []
for line in result.stdout.decode().split("\n"):
if line:
if search_result := re.search(r"refs/tags/(.*)", line):
tag = search_result.group(1)
if git_tag := GitTag.parse(tag):
tags.append(git_tag)
tags.sort()
return [tag.name for tag in tags]
def get_ref_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip"

View File

@ -2,6 +2,7 @@ from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path
from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
@ -29,6 +30,12 @@ def create_parser():
default=DEFAULT_PACKAGE_FILE,
help="The path to the package file.",
)
parser.add_argument(
"--git-tags",
"-g",
action="store_true",
help="Use git to search for version tags. This will avoid GitHub API limits.",
)
subparsers = parser.add_subparsers(dest="subcommand", required=True)
@ -36,6 +43,13 @@ def create_parser():
list_parser = subparsers.add_parser("list", description="List installed packages.")
list_parser.add_argument("--verbose", "-v", action="store_true")
# List git tags for a given package
list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
list_tags_parser.add_argument("url", type=str, help="The URL of the package.")
list_tags_parser.add_argument(
"--limit", type=int, default=10, help="The number of tags to display."
)
# Add packages
add_parser = subparsers.add_parser("add", description="Add or install packages.")
@ -68,6 +82,12 @@ def create_parser():
action="store_true",
help="Update the package if it already exists.",
)
add_parser.add_argument(
"--ignore-versions",
"-i",
type=str,
help="The version of the package to ignore. Multiple can be split by a comma.",
)
# Remove packages
remove_parser = subparsers.add_parser(
@ -105,16 +125,23 @@ class Unhacs:
version: str | None = None,
update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
):
"""Install and add a package to the lock or install a specific version."""
package = Package(url=package_url, version=version, package_type=package_type)
package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages()
# Raise an error if the package is already in the list
if package in packages:
existing_package = next((p for p in packages if p.url == package.url), None)
if existing_package:
if update:
# Remove old version of the package
packages = [p for p in packages if p != package]
packages = [p for p in packages if p.url != package.url]
else:
raise ValueError("Package already exists in the list")
@ -164,6 +191,11 @@ class Unhacs:
for package in get_installed_packages():
print(package.verbose_str() if verbose else str(package))
def list_tags(self, url: str, limit: int = 10):
print(f"Tags for {url}:")
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str]):
"""Remove installed packages and uodate lock."""
packages_to_remove = [
@ -189,6 +221,7 @@ def main():
args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file)
Package.git_tags = args.git_tags
if args.subcommand == "add":
# If a file was provided, update all packages based on the lock file
@ -200,15 +233,26 @@ def main():
package.version,
update=True,
package_type=package.package_type,
ignore_versions=package.ignored_versions,
)
elif args.url:
unhacs.add_package(
args.url, args.version, args.update, package_type=args.type
args.url,
version=args.version,
update=args.update,
package_type=args.type,
ignore_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
)
else:
raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list":
unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove":
unhacs.remove_packages(args.packages)
elif args.subcommand == "upgrade":

View File

@ -13,6 +13,9 @@ from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_ref_zip
from unhacs.git import get_repo_tags
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
@ -36,29 +39,28 @@ class PackageType(StrEnum):
class Package:
url: str
owner: str
name: str
version: str
download_url: str
path: Path | None = None
package_type: PackageType = PackageType.INTEGRATION
git_tags = False
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
):
self.url = url
self.package_type = package_type
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.download_url: str | None = None
self.path: Path | None = None
if not version:
self.version, self.download_url = self.fetch_version_release(version)
self.version, self.download_url = self.fetch_version_release()
else:
self.version = version
@ -88,7 +90,12 @@ class Package:
"package_type": str(self.package_type),
}
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(
self, version: str | None = None
) -> tuple[str, str]:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
@ -128,6 +135,28 @@ class Package:
return version, download_url
def _fetch_version_release_git(self, version: str | None = None) -> tuple[str, str]:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version, get_ref_zip(self.url, version)
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
if self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version