Compare commits

...

1 Commits
main ... git-ls

Author SHA1 Message Date
0fd0a99c52 Use git rather than github
All checks were successful
continuous-integration/drone/push Build is passing
2024-07-06 22:02:07 -07:00
4 changed files with 174 additions and 21 deletions

View File

@ -40,6 +40,20 @@ For a more detailed output, add the `--verbose` flag:
unhacs list --verbose unhacs list --verbose
``` ```
### List tags
To list all tags for a package, use the `tags` command followed by the name of the package:
```bash
unhacs tags <package_url>
```
The number or returned tags is limited to 10 by default. To change this, add the `--limit` flag:
```bash
unhacs tags <package_url> --limit 20
```
### Remove a package ### Remove a package
To remove a package, use the `remove` command followed by the name of the package: To remove a package, use the `remove` command followed by the name of the package:

71
unhacs/git.py Normal file
View File

@ -0,0 +1,71 @@
import re
import subprocess
from dataclasses import dataclass
@dataclass
class GitTag:
name: str
version: tuple[int, int, int]
suffix: str
@staticmethod
def parse(name: str):
if result := re.match(r"^[v]?([\d.]+)(.*)", name):
version_str = result.group(1)
suffix = result.group(2)
parts = version_str.split(".")
if len(parts) > 3:
raise ValueError(f"Invalid version tag: {name}")
try:
version = (
int(parts[0]),
int(parts[1]) if len(parts) > 1 else 0,
int(parts[2]) if len(parts) > 2 else 0,
)
except ValueError:
raise ValueError(f"Invalid version tag: {name}")
return GitTag(name, version, suffix)
def __str__(self):
return f"{self.name} {self.version}"
def __eq__(self, other):
return self.version == other.version and self.suffix == other.suffix
def __lt__(self, other):
return self.version < other.version or (
self.version == other.version and self.suffix < other.suffix
)
def get_repo_tags(repository_url: str) -> list[str]:
# Run the command
command = f"git -c 'versionsort.suffix=-' ls-remote --tags --sort='v:refname' {repository_url}"
result = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Check for errors
if result.returncode != 0:
raise Exception(f"Error running command: {command}\n{result.stderr.decode()}")
# Parse the output
tags: list[GitTag] = []
for line in result.stdout.decode().split("\n"):
if line:
if search_result := re.search(r"refs/tags/(.*)", line):
tag = search_result.group(1)
if git_tag := GitTag.parse(tag):
tags.append(git_tag)
tags.sort()
return [tag.name for tag in tags]
def get_ref_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip"

View File

@ -2,6 +2,7 @@ from argparse import ArgumentParser
from collections.abc import Iterable from collections.abc import Iterable
from pathlib import Path from pathlib import Path
from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package from unhacs.packages import Package
@ -29,12 +30,24 @@ def create_parser():
default=DEFAULT_PACKAGE_FILE, default=DEFAULT_PACKAGE_FILE,
help="The path to the package file.", help="The path to the package file.",
) )
parser.add_argument(
"--use-git",
"-g",
action="store_true",
help="Use git to install packages. This will avoid GitHub API limits.",
)
subparsers = parser.add_subparsers(dest="subcommand", required=True) subparsers = parser.add_subparsers(dest="subcommand", required=True)
list_parser = subparsers.add_parser("list", description="List installed packages.") list_parser = subparsers.add_parser("list", description="List installed packages.")
list_parser.add_argument("--verbose", "-v", action="store_true") list_parser.add_argument("--verbose", "-v", action="store_true")
list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
list_tags_parser.add_argument("url", type=str, help="The URL of the package.")
list_tags_parser.add_argument(
"--limit", type=int, default=10, help="The number of tags to display."
)
add_parser = subparsers.add_parser("add", description="Add or install packages.") add_parser = subparsers.add_parser("add", description="Add or install packages.")
add_parser.add_argument( add_parser.add_argument(
"--file", "-f", type=Path, help="The path to a package file." "--file", "-f", type=Path, help="The path to a package file."
@ -55,6 +68,12 @@ def create_parser():
action="store_true", action="store_true",
help="Update the package if it already exists.", help="Update the package if it already exists.",
) )
add_parser.add_argument(
"--ignore-versions",
"-i",
type=str,
help="The version of the package to ignore. Multiple can be split by a comma.",
)
remove_parser = subparsers.add_parser( remove_parser = subparsers.add_parser(
"remove", description="Remove installed packages." "remove", description="Remove installed packages."
@ -90,16 +109,23 @@ class Unhacs:
version: str | None = None, version: str | None = None,
update: bool = False, update: bool = False,
package_type: PackageType = PackageType.INTEGRATION, package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
): ):
"""Install and add a package to the lock or install a specific version.""" """Install and add a package to the lock or install a specific version."""
package = Package(url=package_url, version=version, package_type=package_type) package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages() packages = self.read_lock_packages()
# Raise an error if the package is already in the list # Raise an error if the package is already in the list
if package in packages: existing_package = next((p for p in packages if p.url == package.url), None)
if existing_package:
if update: if update:
# Remove old version of the package # Remove old version of the package
packages = [p for p in packages if p != package] packages = [p for p in packages if p.url != package.url]
else: else:
raise ValueError("Package already exists in the list") raise ValueError("Package already exists in the list")
@ -119,7 +145,7 @@ class Unhacs:
if p.name in package_names if p.name in package_names
] ]
upgrade_packages: list[Package] = [] outdated_packages: list[Package] = []
latest_packages = [p.get_latest() for p in installed_packages] latest_packages = [p.get_latest() for p in installed_packages]
for installed_package, latest_package in zip( for installed_package, latest_package in zip(
installed_packages, latest_packages installed_packages, latest_packages
@ -128,16 +154,12 @@ class Unhacs:
print( print(
f"upgrade {installed_package.name} from {installed_package.version} to {latest_package.version}" f"upgrade {installed_package.name} from {installed_package.version} to {latest_package.version}"
) )
upgrade_packages.append(latest_package) outdated_packages.append(latest_package)
if not upgrade_packages: if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
print("Nothing to upgrade")
return return
if input("Upgrade all packages? (y/N) ").strip().lower() != "y": for installed_package in outdated_packages:
return
for installed_package in upgrade_packages:
installed_package.install(self.hass_config) installed_package.install(self.hass_config)
# Update lock file to latest now that we know they are uograded # Update lock file to latest now that we know they are uograded
@ -151,6 +173,11 @@ class Unhacs:
for package in get_installed_packages(): for package in get_installed_packages():
print(package.verbose_str() if verbose else str(package)) print(package.verbose_str() if verbose else str(package))
def list_tags(self, url: str, limit: int = 10):
print(f"Tags for {url}:")
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str]): def remove_packages(self, package_names: list[str]):
"""Remove installed packages and uodate lock.""" """Remove installed packages and uodate lock."""
packages_to_remove = [ packages_to_remove = [
@ -176,6 +203,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file) unhacs = Unhacs(args.config, args.package_file)
Package.use_git = args.use_git
if args.subcommand == "add": if args.subcommand == "add":
# If a file was provided, update all packages based on the lock file # If a file was provided, update all packages based on the lock file
@ -187,15 +215,26 @@ def main():
package.version, package.version,
update=True, update=True,
package_type=package.package_type, package_type=package.package_type,
ignore_versions=package.ignored_versions,
) )
elif args.url: elif args.url:
unhacs.add_package( unhacs.add_package(
args.url, args.version, args.update, package_type=args.type args.url,
version=args.version,
update=args.update,
package_type=args.type,
ignore_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
) )
else: else:
raise ValueError("Either a file or a URL must be provided") raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list": elif args.subcommand == "list":
unhacs.list_packages(args.verbose) unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove": elif args.subcommand == "remove":
unhacs.remove_packages(args.packages) unhacs.remove_packages(args.packages)
elif args.subcommand == "upgrade": elif args.subcommand == "upgrade":

View File

@ -12,6 +12,9 @@ from zipfile import ZipFile
import requests import requests
import yaml import yaml
from unhacs.git import get_ref_zip
from unhacs.git import get_repo_tags
DEFAULT_HASS_CONFIG_PATH: Path = Path(".") DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml") DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
@ -35,29 +38,28 @@ class PackageType(StrEnum):
class Package: class Package:
url: str use_git = False
owner: str
name: str
version: str
download_url: str
path: Path | None = None
package_type: PackageType = PackageType.INTEGRATION
def __init__( def __init__(
self, self,
url: str, url: str,
version: str | None = None, version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION, package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
): ):
self.url = url self.url = url
self.package_type = package_type self.package_type = package_type
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/") parts = self.url.split("/")
self.owner = parts[-2] self.owner = parts[-2]
self.name = parts[-1] self.name = parts[-1]
self.download_url: str | None = None
self.path: Path | None = None
if not version: if not version:
self.version, self.download_url = self.fetch_version_release(version) self.version, self.download_url = self.fetch_version_release()
else: else:
self.version = version self.version = version
@ -87,7 +89,12 @@ class Package:
"package_type": str(self.package_type), "package_type": str(self.package_type),
} }
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]: def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(
self, version: str | None = None
) -> tuple[str, str]:
# Fetch the releases from the GitHub API # Fetch the releases from the GitHub API
response = requests.get( response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases" f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
@ -135,6 +142,28 @@ class Package:
return version, download_url return version, download_url
def _fetch_version_release_git(self, version: str | None = None) -> tuple[str, str]:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version, get_ref_zip(self.url, version)
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
if self.use_git:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict: def get_hacs_json(self, version: str | None = None) -> dict:
version = version or self.version version = version or self.version
response = requests.get( response = requests.get(