@@ -2,7 +2,9 @@
from __future__ import annotations

import argparse
import logging
import platform
import tempfile
from collections.abc import Sequence
from dataclasses import dataclass
from io import BytesIO
@@ -20,7 +22,10 @@ from zipfile import ZipFile

import requests

__version__ = "2.5.0"
__version__ = "3.0.3"

logging.basicConfig(level=logging.WARNING)


class UnsupportedContentTypeError(ValueError):
@@ -31,6 +36,24 @@ class InvalidRemoteError(ValueError):
    pass


def removeprefix(s: str, pre: str) -> str:
    # Duplicate str.removeprefix for py<3.9
    try:
        return s.removeprefix(pre)  # type: ignore
    except AttributeError:
        # Py < 3.9
        return s[len(pre) :] if s and s.startswith(pre) else s


def removesuffix(s: str, suf: str) -> str:
    # Duplicate str.removesuffix for py<3.9
    try:
        return s.removesuffix(suf)  # type: ignore
    except AttributeError:
        # Py < 3.9
        return s[: -len(suf)] if s and s.endswith(suf) else s
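# A minimal usage sketch of the polyfills above (illustrative, not part of the diff):
# on Python 3.9+ the built-in str methods are used, and on older interpreters the
# slicing fallback produces the same result.
#
#     removeprefix("v3.0.3", "v")          # -> "3.0.3"
#     removesuffix("my-repo.git", ".git")  # -> "my-repo"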


SYSTEM_SYNONYMS: list[list[str]] = [
    ["Darwin", "darwin", "MacOS", "macos", "macOS"],
    ["Windows", "windows", "win", "win32", "win64"],
@@ -103,13 +126,13 @@ class GitRemoteInfo:
        )


def parse_git_remote(git_url: str | None = None) -> GitRemoteInfo:
    """Extract Github repo info from a git remote url"""
    if not git_url:
        git_url = (
            check_output(["git", "remote", "get-url", "origin"]).decode("UTF-8").strip()
        )
def read_git_remote() -> str:
    """Reads the git remote url from the origin"""
    return check_output(["git", "remote", "get-url", "origin"]).decode("UTF-8").strip()


def parse_git_url(git_url: str) -> GitRemoteInfo:
    """Extract Github repo info from a git remote url"""
    # Normalize Github ssh url as a proper URL
    if git_url.startswith("git@github.com:"):
        git_ssh_parts = git_url.partition(":")
@@ -129,7 +152,7 @@ def parse_git_remote(git_url: str | None = None) -> GitRemoteInfo:
            f"{path[1:3]} Could not parse owner and repo from URL {git_url}"
        )

    return GitRemoteInfo(u.hostname, path[1], path[2].removesuffix(".git"))
    return GitRemoteInfo(u.hostname, path[1], removesuffix(path[2], ".git"))
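# A rough usage sketch (assumed, not part of the diff): both SSH and HTTPS GitHub
# remotes should normalize to the same repo info, with any trailing ".git" stripped
# by the removesuffix polyfill above.
#
#     parse_git_url("git@github.com:owner/repo.git")
#     parse_git_url("https://github.com/owner/repo.git")
#     # -> GitRemoteInfo("github.com", "owner", "repo")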


def parse_cargo_version(p: Path) -> str:
@@ -156,6 +179,7 @@ def read_git_tag(fetch: bool = True) -> str | None:
def read_version(from_tags: bool = False, fetch: bool = False) -> str | None:
    """Read version information from file or from git"""
    if from_tags:
        logging.debug("Reading version from git tag")
        return read_git_tag(fetch)

    matchers = {
@@ -165,10 +189,13 @@ def read_version(from_tags: bool = False, fetch: bool = False) -> str | None:
    for name, extractor in matchers.items():
        p = Path(name)
        if p.exists():
            logging.debug(f"Reading version from {p}")
            return extractor(p)

    # TODO: Log this out to stderr
    # raise ValueError(f"Unknown project type. Didn't find any of {matchers.keys()}")
    logging.warning(
        "Unknown local project version. Didn't find any of %s", set(matchers.keys())
    )

    return None
@@ -191,6 +218,8 @@ def fetch_release(
    # Return the latest if requested
    if version is None or version == "latest":
        logging.debug("Looking for latest release")
        for release in result.json():
            if release["prerelease"] and not pre_release:
                continue
@@ -200,6 +229,8 @@ def fetch_release(
    # Return matching version
    for release in result.json():
        if release["tag_name"].endswith(version):
            logging.debug(f"Found release {release['name']} matching version {version}")
            return release

    raise ValueError(
@@ -250,13 +281,7 @@ def match_asset(
    # This should never really happen
    if version is None:
        if "{version}" in format:
            raise ValueError(
                "No version provided or found in release name but is in format"
            )
        else:
            # This should never happen, but since version isn't used anywhere, we can make it an empty string
            version = ""
        raise ValueError("No version provided or found in release name.")

    system = platform.system()
    if system_mapping:
@@ -304,8 +329,12 @@ class PackageAdapter:
            "application/zip",
            "application/x-zip-compressed",
        ):
            logging.debug("Opening zip file from response content")
            self._package = ZipFile(BytesIO(response.content))
        elif content_type == "application/x-tar":
            logging.debug("Opening tar file from response content")
            self._package = TarFile(fileobj=response.raw)
        elif content_type in (
            "application/gzip",
@@ -313,6 +342,8 @@ class PackageAdapter:
            "application/x-tar+xz",
            "application/x-compressed-tar",
        ):
            logging.debug("Opening compressed tar file from response content")
            self._package = TarFile.open(fileobj=BytesIO(response.content), mode="r:*")
        else:
            raise UnsupportedContentTypeError(
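# Note on the compressed-tar branch above: tarfile's mode "r:*" sniffs the
# compression (gzip, bz2, or xz) from the stream itself, so one open call covers
# every content type in that tuple. A standalone sketch, assuming a local archive:
#
#     with TarFile.open("release.tar.gz", mode="r:*") as tf:
#         print(tf.getnames())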
@@ -323,6 +354,7 @@ class PackageAdapter:
        """Get list of all file names in package"""
        if isinstance(self._package, ZipFile):
            return self._package.namelist()

        if isinstance(self._package, TarFile):
            return self._package.getnames()
@@ -340,19 +372,26 @@ class PackageAdapter:
        If the `file_names` list is empty, all files will be extracted"""
        if path is None:
            path = Path.cwd()

        if not members:
            logging.debug("Extracting all members to %s", path)
            self._package.extractall(path=path)
            return self.get_names()

        # TODO: Use walrus operator when dropping 3.7 support
        missing_members = set(members) - set(self.get_names())
        if missing_members:
            raise ValueError(f"Missing members: {missing_members}")

        logging.debug("Extracting members %s to %s", members, path)
        if isinstance(self._package, ZipFile):
            self._package.extractall(path=path, members=members)
        if isinstance(self._package, TarFile):
            self._package.extractall(
                path=path, members=(TarInfo(name) for name in members)
                path=path, members=(self._package.getmember(name) for name in members)
            )

        return members
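# Note on the changed TarFile.extractall call above: building bare TarInfo(name)
# objects hands extractall placeholder members with default metadata (zero size),
# so the real file contents are never written. self._package.getmember(name) looks
# up the actual member in the archive instead, and raises KeyError if it is missing.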
@@ -375,7 +414,7 @@ def get_asset_package(
            continue
    else:
        raise UnsupportedContentTypeError(
            "Cannot extract files from archive because we don't recognize the content type"
            f"Cannot extract files from archive because we don't recognize the content types {possible_content_types}"
        )
@@ -402,8 +441,10 @@ def download_asset(
    result = requests.get(asset["browser_download_url"])

    if extract_files is not None:
        logging.info("Extracting package %s", asset["name"])
        package = get_asset_package(asset, result)
        extract_files = package.extractall(path=destination, members=extract_files)

        return [destination / name for name in extract_files]

    file_name = destination / asset["name"]
@@ -450,6 +491,7 @@ class MapAddAction(argparse.Action):
def _parse_args(args: list[str] | None = None) -> argparse.Namespace:
    logging.debug("Parsing arguments: %s", args)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "format",
@@ -463,7 +505,9 @@ def _parse_args(args: list[str] | None = None) -> argparse.Namespace:
        default=Path.cwd(),
        help="Destination directory. Defaults to current directory",
    )
    parser.add_argument("-v", action="store_true", help="verbose logging")
    parser.add_argument(
        "-v", action="count", help="verbose or debug logging", default=0
    )
    parser.add_argument(
        "--hostname",
        help="Git repository hostname",
@@ -534,12 +578,29 @@ def _parse_args(args: list[str] | None = None) -> argparse.Namespace:
        action="store_true",
        help="Only print the URL and do not download",
    )
    parser.add_argument(
        "--use-temp-dir",
        action="store_true",
        help="Use a temporary directory as the destination",
    )

    parsed_args = parser.parse_args(args)

    # Merge in fields from args and git remote
    if not all((parsed_args.owner, parsed_args.repo, parsed_args.hostname)):
        remote_info = parse_git_remote(parsed_args.git_url)
        # Check to see if a git url was provided. If not, we use local directory git remote
        if parsed_args.git_url is None:
            parsed_args.git_url = read_git_remote()

        # If using a local repo, try to determine version from project files
        if parsed_args.version is None:
            parsed_args.version = read_version(
                parsed_args.version_git_tag,
                not parsed_args.version_git_no_fetch,
            )

        # Get parts from git url
        remote_info = parse_git_url(parsed_args.git_url)

        def merge_field(a, b, field):
            value = getattr(a, field)
@@ -549,15 +610,12 @@ def _parse_args(args: list[str] | None = None) -> argparse.Namespace:
        for field in ("owner", "repo", "hostname"):
            merge_field(parsed_args, remote_info, field)

    if parsed_args.version is None:
        parsed_args.version = read_version(
            parsed_args.version_git_tag,
            not parsed_args.version_git_no_fetch,
        )

    if parsed_args.extract_all:
        parsed_args.extract_files = []

    if parsed_args.use_temp_dir:
        parsed_args.destination = Path(tempfile.mkdtemp())

    return parsed_args
@@ -590,9 +648,14 @@ def download_release(
        arch_mapping=arch_mapping,
    )

    format_fields = dict(
        asset_name=asset["name"],
        **matched_values._asdict(),
    )

    formatted_files = (
        [file.format(**matched_values._asdict()) for file in extract_files]
        if extract_files
        [file.format(**format_fields) for file in extract_files]
        if extract_files is not None
        else None
    )
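# A sketch of how the new format_fields mapping is meant to be used when formatting
# extract_files entries and exec templates (the keys beyond asset_name come from
# matched_values and are assumed here to include version, system, and arch):
#
#     format_fields = dict(
#         asset_name="tool-v3.0.3-Linux-x86_64.tar.gz",
#         version="v3.0.3",
#         system="Linux",
#         arch="x86_64",
#     )
#     "bin/tool-{version}-{system}".format(**format_fields)
#     # -> "bin/tool-v3.0.3-Linux"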
@@ -603,7 +666,9 @@ def download_release(
    )

    if exec:
        check_call(exec.format(asset["name"]), shell=True, cwd=destination)
        check_call(
            exec.format(asset["name"], **format_fields), shell=True, cwd=destination
        )

    return files
@@ -611,6 +676,8 @@ def download_release(
def main():
    args = _parse_args()

    logging.getLogger().setLevel(30 - 10 * args.v)
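# Since -v now uses action="count" with a default of 0, the root logger level is
# derived directly from the flag count: no flag stays at WARNING (30), -v drops to
# INFO (20), and -vv to DEBUG (10). A quick sketch:
#
#     for v in (0, 1, 2):
#         print(v, logging.getLevelName(30 - 10 * v))
#     # 0 WARNING, 1 INFO, 2 DEBUG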

    # Fetch the release
    release = fetch_release(
        GitRemoteInfo(args.hostname, args.owner, args.repo),
@@ -618,8 +685,12 @@ def main():
        pre_release=args.prerelease,
    )

    logging.debug("Found release: %s", release["name"])

    version = args.version or release["tag_name"]

    logging.debug("Release version: %s", version)

    # Find the asset to download using mapping rules
    asset, matched_values = match_asset(
        release,
@@ -629,17 +700,21 @@ def main():
        arch_mapping=args.map_arch,
    )

    if args.v:
        print(f"Downloading {asset['name']} from release {release['name']}")
    logging.info(f"Downloading {asset['name']} from release {release['name']}")

    if args.url_only:
        print(asset["browser_download_url"])
        return

    format_fields = dict(
        asset_name=asset["name"],
        **matched_values._asdict(),
    )

    # Format files to extract with version info, as this is sometimes included
    formatted_files = (
        [file.format(**matched_values._asdict()) for file in args.extract_files]
        if args.extract_files
        [file.format(**format_fields) for file in args.extract_files]
        if args.extract_files is not None
        else None
    )
@@ -653,7 +728,11 @@ def main():
    # Optionally execute post command
    if args.exec:
        check_call(args.exec.format(asset["name"]), shell=True)
        check_call(
            args.exec.format(asset["name"], **format_fields),
            shell=True,
            cwd=args.destination,
        )


if __name__ == "__main__":