Compare commits

..

1 Commits
main ... git-ls

Author SHA1 Message Date
0fd0a99c52 Use git rather than github
All checks were successful
continuous-integration/drone/push Build is passing
2024-07-06 22:02:07 -07:00
17 changed files with 423 additions and 1204 deletions

3
.gitignore vendored
View File

@ -141,6 +141,3 @@ cython_debug/
tags
unhacs.txt
poetry.lock
custom_components/
themes/
unhacs.yaml

View File

@ -24,7 +24,7 @@ lint: devenv
# Runs tests
.PHONY: test
test: devenv
poetry run python -m unittest discover tests --pattern "*_test.py"
@echo TODO: poetry run pytest
# Builds wheel for package to upload
.PHONY: build
@ -37,27 +37,6 @@ verify-tag-version:
$(eval TAG_NAME = $(shell [ -n "$(DRONE_TAG)" ] && echo $(DRONE_TAG) || git describe --tags --exact-match))
test "v$(shell poetry version | awk '{print $$2}')" = "$(TAG_NAME)"
.PHONY: bump-patch
bump-patch:
$(eval NEW_VERSION = $(shell poetry version patch | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
.PHONY: bump-minor
bump-minor:
$(eval NEW_VERSION = $(shell poetry version minor | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
.PHONY: bump-major
bump-major:
$(eval NEW_VERSION = $(shell poetry version major | awk '{print $$6}'))
git add pyproject.toml
git commit -m "Bump version to $(NEW_VERSION)"
git tag "v$(NEW_VERSION)"
# Upload to pypi
.PHONY: upload
upload: verify-tag-version build

View File

@ -17,33 +17,13 @@ Unhacs provides several commands to manage your Home Assistant packages:
To add a package, use the `add` command followed by the URL of the package. Optionally, you can specify the package name and version:
```bash
unhacs add <package_url> --version <version>
unhacs add --url <package_url> --name <package_name> --version <version>
```
If the package already exists, you can update it by adding the `--update` flag:
```bash
unhacs add <package_url> --update
```
If the package is a Lovelace plugin, you must specify it using the `--plugin` flag:
```bash
unhacs add --plugin <package_url>
```
If you already have a list of packages in a file, you can add them all at once using the `--file` flag:
```bash
unhacs add --file <file_path>
```
### Add a component from a forked Home Assistant Core repository
To add a component from a fork of home-assistant/core, use the `--forked-component` flag followed by the URL of the forked repository and then specify the branch with the `--branch` flag:
```bash
unhacs add --forked-component <forked_repo_url> --branch <branch>
unhacs add --url <package_url> --update
```
### List packages
@ -96,14 +76,6 @@ To upgrade specific packages, add their names after the `upgrade` command:
unhacs upgrade <package_name_1> <package_name_2> ...
```
## Use git tags
By default, identification of releases uses the GitHub API. If you want to use git tags instead, you can add the `--git-tags` flag to the base command:
```bash
unhacs --git-tags add <package_url>
```
## License
Unhacs is licensed under the MIT License. See the LICENSE file for more details.

View File

@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "unhacs"
version = "0.7.1"
version = "0.4.0"
description = "Command line interface to install Home Assistant Community Store packages"
authors = ["Ian Fijolek <ian@iamthefij.com>"]
license = "MIT"
@ -12,16 +12,14 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.11"
requests = "^2.32.0"
pyyaml = "^6.0.0"
requests = "^2.32.3"
pyyaml = "^6.0.1"
[tool.poetry.group.dev.dependencies]
black = "^24.4.2"
isort = "^5.13.2"
mypy = "^1.10.0"
pre-commit = "^3.7.1"
types-requests = "^2.32.0"
types-pyyaml = "^6.0.0"
types-requests = "^2.32.0.20240602"
[tool.poetry.scripts]
unhacs = 'unhacs.main:main'

View File

View File

@ -1,260 +0,0 @@
import os
import shutil
import tempfile
import unittest
from pathlib import Path
from unhacs.main import main
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
INTEGRATION_URL = "https://github.com/simbaja/ha_gehome"
INTEGRATION_VERSION = "v0.6.9"
PLUGIN_URL = "https://github.com/kalkih/mini-media-player"
PLUGIN_VERSION = "v1.16.8"
THEME_URL = "https://github.com/basnijholt/lovelace-ios-themes"
THEME_VERSION = "v3.0.1"
FORK_URL = "https://github.com/ViViDboarder/home-assistant"
FORK_BRANCH = "dev"
FORK_COMPONENT = "nextbus"
FORK_VERSION = "3b2893f2f4e16f9a05d9cc4a7ba9f31984c841be"
class TestMainIntegrarion(unittest.TestCase):
test_dir: str
def setUp(self):
self.test_dir = tempfile.mkdtemp()
os.chdir(self.test_dir)
def tearDown(self):
shutil.rmtree(self.test_dir)
pass
def run_itest(
self,
test_name: str,
command: str,
expected_files: list[str] | None = None,
expect_missing_files: list[str] | None = None,
expected_code: int = 0,
):
with self.subTest(test_name, command=command):
self.assertEqual(main(command.split()), expected_code)
# Verify that the package was installed by checking the filesystem
if expected_files:
expected_files = [
os.path.join(self.test_dir, file) for file in expected_files
]
missing_files = [
file for file in expected_files if not os.path.exists(file)
]
if missing_files:
self.fail(f"Missing files: {missing_files}")
if expect_missing_files:
expect_missing_files = [
os.path.join(self.test_dir, file) for file in expect_missing_files
]
existing_files = [
file for file in expect_missing_files if os.path.exists(file)
]
if existing_files:
self.fail(f"Files should not exist: {existing_files}")
def test_integration(self):
self.run_itest(
"Add integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION}",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Double add",
f"add {INTEGRATION_URL}",
expected_code=1,
)
self.run_itest(
"Upgrade to latest version",
"upgrade ha_gehome --yes",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertNotEqual(installed[0].version, INTEGRATION_VERSION)
self.run_itest(
"Downgrade integration",
f"add {INTEGRATION_URL} --version {INTEGRATION_VERSION} --update",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, INTEGRATION_URL)
self.assertEqual(installed[0].version, INTEGRATION_VERSION)
# Delete the custom_components folder and re-install the integration using the lock file
shutil.rmtree(os.path.join(self.test_dir, "custom_components"))
self.run_itest(
"Re-install integration using lock file",
"add --file unhacs.yaml",
expected_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
# Delete the lock file and then regenerate it
os.remove(os.path.join(self.test_dir, "unhacs.yaml"))
self.run_itest(
"Regenerate lock file",
"list --freeze",
expected_files=[
"unhacs.yaml",
],
)
self.assertGreater(len(read_lock_packages()), 0)
self.run_itest(
"Remove integration",
"remove ha_gehome --yes",
expect_missing_files=[
"custom_components/ge_home/__init__.py",
"custom_components/ge_home/manifest.json",
"custom_components/ge_home/switch.py",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_plugin(self):
self.run_itest(
"Add plugin",
f"add --plugin {PLUGIN_URL} --version {PLUGIN_VERSION}",
expected_files=[
"www/js/mini-media-player-bundle.js",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, PLUGIN_URL)
self.assertEqual(installed[0].version, PLUGIN_VERSION)
self.run_itest(
"Remove plugin",
"remove mini-media-player --yes",
expect_missing_files=[
"www/js/mini-media-player-bundle.js",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_theme(self):
self.run_itest(
"Add theme",
f"add --theme {THEME_URL} --version {THEME_VERSION}",
expected_files=[
"themes/ios-themes.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, THEME_URL)
self.assertEqual(installed[0].version, THEME_VERSION)
self.run_itest(
"Remove theme",
"remove lovelace-ios-themes --yes",
expect_missing_files=[
"themes/ios-themes.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
def test_fork(self):
self.run_itest(
"Add fork",
f"add {FORK_URL} --fork-component {FORK_COMPONENT} --fork-branch {FORK_BRANCH} --version {FORK_VERSION}",
expected_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
self.run_itest(
"List installed packages",
"list",
)
installed = get_installed_packages()
self.assertEqual(len(installed), 1)
self.assertEqual(installed[0].url, FORK_URL)
self.assertEqual(installed[0].version, FORK_VERSION)
self.run_itest(
"Remove fork",
f"remove {FORK_URL} --yes",
expect_missing_files=[
"custom_components/nextbus/__init__.py",
"custom_components/nextbus/manifest.json",
"custom_components/nextbus/sensor.py",
"custom_components/nextbus/unhacs.yaml",
],
)
installed = get_installed_packages()
self.assertEqual(len(installed), 0)
if __name__ == "__main__":
unittest.main()

View File

@ -1,4 +1,4 @@
from unhacs.main import main
if __name__ == "__main__":
exit(main())
main()

View File

@ -67,30 +67,5 @@ def get_repo_tags(repository_url: str) -> list[str]:
return [tag.name for tag in tags]
def get_latest_sha(repository_url: str, branch_name: str) -> str:
command = f"git ls-remote {repository_url} {branch_name}"
result = subprocess.run(
command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Check for errors
if result.returncode != 0:
raise Exception(f"Error running command: {command}\n{result.stderr.decode()}")
for line in result.stdout.decode().split("\n"):
if line:
return line.partition("\t")[0]
raise ValueError(f"branch name '{branch_name}' not found for {repository_url}")
def get_tag_zip(repository_url: str, tag_name: str) -> str:
def get_ref_zip(repository_url: str, tag_name: str) -> str:
return f"{repository_url}/archive/refs/tags/{tag_name}.zip"
def get_branch_zip(repository_url: str, branch_name: str) -> str:
return f"{repository_url}/archive/{branch_name}.zip"
def get_sha_zip(repository_url: str, sha: str) -> str:
return f"{repository_url}/archive/{sha}.zip"

View File

@ -1,30 +1,18 @@
import sys
from argparse import ArgumentParser
from collections.abc import Iterable
from pathlib import Path
from unhacs.git import get_repo_tags
from unhacs.packages import DEFAULT_HASS_CONFIG_PATH
from unhacs.packages import DEFAULT_PACKAGE_FILE
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.packages import get_installed_packages
from unhacs.packages import read_lock_packages
from unhacs.packages import write_lock_packages
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
class InvalidArgumentsError(ValueError):
pass
class DuplicatePackageError(ValueError):
pass
def parse_args(argv: list[str]):
def create_parser():
parser = ArgumentParser(
description="Unhacs - Command line interface for the Home Assistant Community Store"
)
@ -43,78 +31,34 @@ def parse_args(argv: list[str]):
help="The path to the package file.",
)
parser.add_argument(
"--git-tags",
"--use-git",
"-g",
action="store_true",
help="Use git to search for version tags. This will avoid GitHub API limits.",
help="Use git to install packages. This will avoid GitHub API limits.",
)
subparsers = parser.add_subparsers(dest="subcommand", required=True)
# List installed packages
list_parser = subparsers.add_parser("list", description="List installed packages.")
list_parser.add_argument("--verbose", "-v", action="store_true")
list_parser.add_argument(
"--freeze",
"-f",
action="store_true",
help="Regenerate unhacs.yaml with installed packages.",
)
# List git tags for a given package
list_tags_parser = subparsers.add_parser("tags", help="List tags for a package.")
list_tags_parser.add_argument("url", type=str, help="The URL of the package.")
list_tags_parser.add_argument(
"--limit", type=int, default=10, help="The number of tags to display."
)
# Add packages
add_parser = subparsers.add_parser("add", description="Add or install packages.")
package_group = add_parser.add_mutually_exclusive_group(required=True)
package_group.add_argument(
add_parser.add_argument(
"--file", "-f", type=Path, help="The path to a package file."
)
package_group.add_argument(
"url", nargs="?", type=str, help="The URL of the package."
)
package_type_group = add_parser.add_mutually_exclusive_group()
package_type_group.add_argument(
"--integration",
action="store_const",
dest="type",
const=Integration,
default=Integration,
help="The package is an integration.",
)
package_type_group.add_argument(
"--plugin",
action="store_const",
dest="type",
const=Plugin,
help="The package is a JavaScript plugin.",
)
package_type_group.add_argument(
"--theme",
action="store_const",
dest="type",
const=Theme,
help="The package is a theme.",
)
package_type_group.add_argument(
"--fork-component",
type=str,
help="Name of component from forked core repo.",
)
# Additional arguments for forked packages
add_parser.add_argument(
"--fork-branch",
"-b",
type=str,
help="Name of branch of forked core repo. (Only for forked components.)",
"--type",
"-t",
type=PackageType,
help="The type of the package. Defaults to 'integration'.",
)
add_parser.add_argument("url", nargs="?", type=str, help="The URL of the package.")
add_parser.add_argument(
"--version", "-v", type=str, help="The version of the package."
)
@ -131,38 +75,17 @@ def parse_args(argv: list[str]):
help="The version of the package to ignore. Multiple can be split by a comma.",
)
# Remove packages
remove_parser = subparsers.add_parser(
"remove", description="Remove installed packages."
)
remove_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
remove_parser.add_argument("packages", nargs="+")
# Upgrade packages
update_parser = subparsers.add_parser(
"upgrade", description="Upgrade installed packages."
)
update_parser.add_argument(
"--yes", "-y", action="store_true", help="Do not prompt for confirmation."
)
update_parser.add_argument("packages", nargs="*")
args = parser.parse_args(argv)
if args.subcommand == "add":
# Component implies forked package
if args.fork_component and args.type != Fork:
args.type = Fork
# Branch is only valid for forked packages
if args.type != Fork and args.fork_branch:
raise InvalidArgumentsError(
"Branch and component can only be used with forked packages"
)
return args
return parser
class Unhacs:
@ -182,29 +105,37 @@ class Unhacs:
def add_package(
self,
package: Package,
package_url: str,
version: str | None = None,
update: bool = False,
package_type: PackageType = PackageType.INTEGRATION,
ignore_versions: set[str] | None = None,
):
"""Install and add a package to the lock or install a specific version."""
package = Package(
package_url,
version=version,
package_type=package_type,
ignored_versions=ignore_versions,
)
packages = self.read_lock_packages()
# Raise an error if the package is already in the list
if existing_package := next((p for p in packages if p.same(package)), None):
existing_package = next((p for p in packages if p.url == package.url), None)
if existing_package:
if update:
# Remove old version of the package
packages = [p for p in packages if p == existing_package]
packages = [p for p in packages if p.url != package.url]
else:
raise DuplicatePackageError("Package already exists in the list")
raise ValueError("Package already exists in the list")
package.install(self.hass_config)
packages.append(package)
self.write_lock_packages(packages)
def upgrade_packages(self, package_names: list[str], yes: bool = False):
def upgrade_packages(self, package_names: list[str]):
"""Uograde to latest version of packages and update lock."""
installed_packages: Iterable[Package]
if not package_names:
installed_packages = get_installed_packages(self.hass_config)
else:
@ -225,60 +156,35 @@ class Unhacs:
)
outdated_packages.append(latest_package)
confirmed = yes or input("Upgrade all packages? (y/N) ").lower() == "y"
if outdated_packages and not confirmed:
if outdated_packages and input("Upgrade all packages? (y/N) ").lower() != "y":
return
for installed_package in outdated_packages:
installed_package.install(self.hass_config)
# Update lock file to latest now that we know they are uograded
latest_lookup = {p: p for p in latest_packages}
packages = [latest_lookup.get(p, p) for p in self.read_lock_packages()]
latest_lookup = {p.url: p for p in latest_packages}
packages = [latest_lookup.get(p.url, p) for p in self.read_lock_packages()]
self.write_lock_packages(packages)
def list_packages(self, verbose: bool = False, freeze: bool = False):
def list_packages(self, verbose: bool = False):
"""List installed packages and their versions."""
installed_packages = get_installed_packages()
for package in installed_packages:
for package in get_installed_packages():
print(package.verbose_str() if verbose else str(package))
if freeze:
self.write_lock_packages(installed_packages)
def list_tags(self, url: str, limit: int = 10):
print(f"Tags for {url}:")
for tag in get_repo_tags(url)[-1 * limit :]:
print(tag)
def remove_packages(self, package_names: list[str], yes: bool = False):
def remove_packages(self, package_names: list[str]):
"""Remove installed packages and uodate lock."""
packages_to_remove = [
package
for package in get_installed_packages()
if (
package.name in package_names
or package.url in package_names
or (
hasattr(package, "fork_component")
and getattr(package, "fork_component") in package_names
)
)
if package.name in package_names
]
if package_names and not packages_to_remove:
print("No packages found to remove")
return
print("Packages to remove:")
for package in packages_to_remove:
print(package)
confirmed = yes or input("Remove listed packages? (y/N) ").lower() == "y"
if packages_to_remove and not confirmed:
return
remaining_packages = [
package
for package in self.read_lock_packages()
@ -291,39 +197,13 @@ class Unhacs:
self.write_lock_packages(remaining_packages)
def args_to_package(args) -> Package:
ignore_versions = (
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
)
if args.type == Fork:
if not args.fork_branch:
raise InvalidArgumentsError(
"A branch must be provided for forked components"
)
if not args.fork_component:
raise InvalidArgumentsError(
"A component must be provided for forked components"
)
return Fork(
args.url,
branch_name=args.fork_branch,
fork_component=args.fork_component,
version=args.version,
ignored_versions=ignore_versions,
)
return args.type(args.url, version=args.version, ignored_versions=ignore_versions)
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:])
def main():
# If the sub command is add package, it should pass the parsed arguments to the add_package function and return
parser = create_parser()
args = parser.parse_args()
unhacs = Unhacs(args.config, args.package_file)
Package.git_tags = args.git_tags
Package.use_git = args.use_git
if args.subcommand == "add":
# If a file was provided, update all packages based on the lock file
@ -331,40 +211,38 @@ def main(argv: list[str] | None = None) -> int:
packages = read_lock_packages(args.file)
for package in packages:
unhacs.add_package(
package,
package.url,
package.version,
update=True,
package_type=package.package_type,
ignore_versions=package.ignored_versions,
)
elif args.url:
try:
new_package = args_to_package(args)
except InvalidArgumentsError as e:
print(e)
return 1
try:
unhacs.add_package(
new_package,
update=args.update,
)
except DuplicatePackageError as e:
print(e)
return 1
unhacs.add_package(
args.url,
version=args.version,
update=args.update,
package_type=args.type,
ignore_versions=(
{version for version in args.ignore_versions.split(",")}
if args.ignore_versions
else None
),
)
else:
print("Either a file or a URL must be provided")
return 1
raise ValueError("Either a file or a URL must be provided")
elif args.subcommand == "list":
unhacs.list_packages(args.verbose, args.freeze)
unhacs.list_packages(args.verbose)
elif args.subcommand == "tags":
unhacs.list_tags(args.url, limit=args.limit)
elif args.subcommand == "remove":
unhacs.remove_packages(args.packages, yes=args.yes)
unhacs.remove_packages(args.packages)
elif args.subcommand == "upgrade":
unhacs.upgrade_packages(args.packages, yes=args.yes)
unhacs.upgrade_packages(args.packages)
else:
print(f"Command {args.subcommand} is not implemented")
return 1
return 0
exit(1)
if __name__ == "__main__":
exit(main())
main()

353
unhacs/packages.py Normal file
View File

@ -0,0 +1,353 @@
import json
import shutil
import tempfile
from collections.abc import Iterable
from enum import StrEnum
from enum import auto
from io import BytesIO
from pathlib import Path
from typing import cast
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_ref_zip
from unhacs.git import get_repo_tags
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path):
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
class Package:
use_git = False
def __init__(
self,
url: str,
version: str | None = None,
package_type: PackageType = PackageType.INTEGRATION,
ignored_versions: set[str] | None = None,
):
self.url = url
self.package_type = package_type
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.download_url: str | None = None
self.path: Path | None = None
if not version:
self.version, self.download_url = self.fetch_version_release()
else:
self.version = version
def __str__(self):
return f"{self.name} {self.version}"
def __eq__(self, other):
return self.url == other.url and self.version == other.version
def verbose_str(self):
return f"{self.name} {self.version} ({self.url})"
@staticmethod
def from_yaml(yaml: dict) -> "Package":
# Convert package_type to enum
package_type = yaml.pop("package_type")
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
yaml["package_type"] = package_type
return Package(**yaml)
def to_yaml(self: "Package") -> dict:
return {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(
self, version: str | None = None
) -> tuple[str, str]:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
version = cast(str, desired_release["tag_name"])
hacs_json = self.get_hacs_json(version)
# Based on type, if we have no hacs json, we can provide some possible paths for the download but won't know
# If a plugin:
# First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed
# Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed
# Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed
# If an integration:
# We always use the zipball_url
download_url = None
if filename := hacs_json.get("filename"):
for asset in desired_release["assets"]:
if asset["name"] == filename:
download_url = cast(str, asset["browser_download_url"])
break
else:
download_url = cast(str, desired_release["zipball_url"])
if not download_url:
raise ValueError("No filename found in hacs.json")
return version, download_url
def _fetch_version_release_git(self, version: str | None = None) -> tuple[str, str]:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version, get_ref_zip(self.url, version)
def fetch_version_release(self, version: str | None = None) -> tuple[str, str]:
if self.use_git:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install_plugin(self, hass_config_path: Path):
# First, check in root/dist/ for a js file named the same as the repo or with "lovelace-" prefix removed
# Second will be looking for a realeases for a js file named the same name as the repo or with loveace- prefix removed
# Third will be looking in the root dir for a js file named the same as the repo or with loveace- prefix removed
# If none of these are found, raise an error
# If a file is found, write it to www/js/<filename>.js and write a file www/js/<filename>-unhacs.txt with the
# serialized package
valid_filenames: Iterable[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = (cast(str, filename),)
else:
valid_filenames = (
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
)
def real_get(filename) -> requests.Response:
plugin = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/dist/{filename}"
)
if plugin.status_code == 404:
plugin = requests.get(
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}"
)
if plugin.status_code == 404:
plugin = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.version}/{filename}"
)
plugin.raise_for_status()
return plugin
for filename in valid_filenames:
try:
plugin = real_get(filename)
break
except requests.HTTPError:
pass
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = hass_config_path / "www" / "js"
js_path.mkdir(parents=True, exist_ok=True)
js_path.joinpath(filename).write_text(plugin.text)
yaml.dump(self.to_yaml(), js_path.joinpath(f"{filename}-unhacs.yaml").open("w"))
def install_integration(self, hass_config_path: Path):
zipball_url = f"https://codeload.github.com/{self.owner}/{self.name}/zip/refs/tags/{self.version}"
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = hass_config_path / "custom_components" / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = hass_config_path / "custom_components" / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
yaml.dump(self.to_yaml(), dest.joinpath("unhacs.yaml").open("w"))
def install(self, hass_config_path: Path):
if self.package_type == PackageType.PLUGIN:
self.install_plugin(hass_config_path)
elif self.package_type == PackageType.INTEGRATION:
self.install_integration(hass_config_path)
else:
raise NotImplementedError(f"Unknown package type {self.package_type}")
def uninstall(self, hass_config_path: Path) -> bool:
if self.path:
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
self.path.with_name(f"{self.path.name}-unhacs.yaml").unlink()
return True
installed_package = self.installed_package(hass_config_path)
if installed_package:
installed_package.uninstall(hass_config_path)
return True
return False
def installed_package(self, hass_config_path: Path) -> "Package|None":
for custom_component in (hass_config_path / "custom_components").glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
installed_package = Package.from_yaml(yaml.safe_load(unhacs.open()))
installed_package.path = custom_component
if (
installed_package.name == self.name
and installed_package.url == self.url
):
return installed_package
for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.yaml"):
installed_package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
installed_package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
if (
installed_package.name == self.name
and installed_package.url == self.url
):
return installed_package
return None
def is_update(self, hass_config_path: Path) -> bool:
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
package = self.to_yaml()
package.pop("version")
return Package(**package)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
) -> list[Package]:
packages = []
# Integration packages
for custom_component in (hass_config_path / "custom_components").glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
package = Package.from_yaml(yaml.safe_load(unhacs.open()))
package.path = custom_component
packages.append(package)
# Plugin packages
for js_unhacs in (hass_config_path / "www" / "js").glob("*-unhacs.yaml"):
package = Package.from_yaml(yaml.safe_load(js_unhacs.open()))
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix("-unhacs.yaml"))
packages.append(package)
return packages
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
return [
Package.from_yaml(p)
for p in yaml.safe_load(package_file.open())["packages"]
]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
yaml.dump({"packages": [p.to_yaml() for p in packages]}, package_file.open("w"))

View File

@ -1,82 +0,0 @@
from collections.abc import Iterable
from pathlib import Path
from typing import cast
import yaml
from unhacs.packages.common import Package
from unhacs.packages.common import PackageType
from unhacs.packages.fork import Fork
from unhacs.packages.integration import Integration
from unhacs.packages.plugin import Plugin
from unhacs.packages.theme import Theme
from unhacs.utils import DEFAULT_HASS_CONFIG_PATH
from unhacs.utils import DEFAULT_PACKAGE_FILE
def from_yaml(data: dict | Path | str) -> Package:
if isinstance(data, Path):
data = yaml.safe_load(data.open())
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
# Convert package_type to enum
package_type = data.pop("package_type", None)
if package_type and isinstance(package_type, str):
package_type = PackageType(package_type)
url = data.pop("url")
return {
PackageType.INTEGRATION: Integration,
PackageType.PLUGIN: Plugin,
PackageType.THEME: Theme,
PackageType.FORK: Fork,
}[package_type](url, **data)
def get_installed_packages(
hass_config_path: Path = DEFAULT_HASS_CONFIG_PATH,
package_types: Iterable[PackageType] = (
PackageType.INTEGRATION,
PackageType.FORK,
PackageType.PLUGIN,
PackageType.THEME,
),
) -> list[Package]:
# Integration packages
packages: list[Package] = []
if PackageType.INTEGRATION in package_types:
packages.extend(Integration.find_installed(hass_config_path))
if PackageType.FORK in package_types:
packages.extend(Fork.find_installed(hass_config_path))
# Plugin packages
if PackageType.PLUGIN in package_types:
packages.extend(Plugin.find_installed(hass_config_path))
# Theme packages
if PackageType.THEME in package_types:
packages.extend(Theme.find_installed(hass_config_path))
return packages
# Read a list of Packages from a text file in the plain text format "URL version name"
def read_lock_packages(package_file: Path = DEFAULT_PACKAGE_FILE) -> list[Package]:
if package_file.exists():
with package_file.open() as f:
return [from_yaml(p) for p in yaml.safe_load(f)["packages"]]
return []
# Write a list of Packages to a text file in the format URL version name
def write_lock_packages(
packages: Iterable[Package], package_file: Path = DEFAULT_PACKAGE_FILE
):
with open(package_file, "w") as f:
yaml.dump({"packages": [p.to_yaml() for p in packages]}, f)

View File

@ -1,225 +0,0 @@
import shutil
from enum import StrEnum
from enum import auto
from pathlib import Path
from typing import Any
from typing import cast
import requests
import yaml
from unhacs.git import get_repo_tags
class PackageType(StrEnum):
INTEGRATION = auto()
PLUGIN = auto()
FORK = auto()
THEME = auto()
class Package:
git_tags = False
package_type: PackageType
other_fields: list[str] = []
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.url = url
self.ignored_versions = ignored_versions or set()
parts = self.url.split("/")
self.owner = parts[-2]
self.name = parts[-1]
self.path: Path | None = None
if not version:
self.version = self.fetch_version_release()
else:
self.version = version
def __str__(self):
return f"{self.package_type}: {self.name} {self.version}"
def __eq__(self, other):
return all(
(
self.same(other),
self.version == other.version,
)
)
def same(self, other):
fields = list(["url"] + self.other_fields)
return all((getattr(self, field) == getattr(other, field) for field in fields))
def __hash__(self):
fields = list(["url"] + self.other_fields)
return hash(tuple(getattr(self, field) for field in fields))
def verbose_str(self):
return f"{str(self)} ({self.url})"
@classmethod
def from_yaml(cls, data: dict | Path | str) -> "Package":
if isinstance(data, Path):
with data.open() as f:
data = yaml.safe_load(f)
elif isinstance(data, str):
data = yaml.safe_load(data)
data = cast(dict, data)
if (package_type := data.pop("package_type")) != cls.package_type:
raise ValueError(
f"Invalid package_type ({package_type}) for this class {cls.package_type}"
)
return cls(data.pop("url"), **data)
def to_yaml(self, dest: Path | None = None) -> dict:
data: dict[str, Any] = {
"url": self.url,
"version": self.version,
"package_type": str(self.package_type),
}
if self.ignored_versions:
data["ignored_versions"] = self.ignored_versions
for field in self.other_fields:
if hasattr(self, field):
data[field] = getattr(self, field)
if dest:
with dest.open("w") as f:
yaml.dump(self.to_yaml(), f)
return data
def add_ignored_version(self, version: str):
self.ignored_versions.add(version)
def _fetch_version_release_releases(self, version: str | None = None) -> str:
# Fetch the releases from the GitHub API
response = requests.get(
f"https://api.github.com/repos/{self.owner}/{self.name}/releases"
)
response.raise_for_status()
releases = response.json()
if not releases:
raise ValueError(f"No releases found for package {self.name}")
# Default to latest
desired_release = releases[0]
# If a version is provided, check if it exists in the releases
if version:
for release in releases:
if release["tag_name"] == version:
desired_release = release
break
else:
raise ValueError(f"Version {version} does not exist for this package")
return cast(str, desired_release["tag_name"])
def _fetch_version_release_git(self, version: str | None = None) -> str:
tags = get_repo_tags(self.url)
if not tags:
raise ValueError(f"No tags found for package {self.name}")
if version and version not in tags:
raise ValueError(f"Version {version} does not exist for this package")
tags = [tag for tag in tags if tag not in self.ignored_versions]
if not version:
version = tags[-1]
return version
def fetch_version_release(self, version: str | None = None) -> str:
if self.git_tags:
return self._fetch_version_release_git(version)
else:
return self._fetch_version_release_releases(version)
def _fetch_versions(self) -> list[str]:
return get_repo_tags(self.url)
def get_hacs_json(self, version: str | None = None) -> dict:
"""Fetches the hacs.json file for the package."""
version = version or self.version
response = requests.get(
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{version}/hacs.json"
)
if response.status_code == 404:
return {}
response.raise_for_status()
return response.json()
def install(self, hass_config_path: Path):
raise NotImplementedError()
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path / "unhacs.yaml"
def uninstall(self, hass_config_path: Path) -> bool:
"""Uninstalls the package if it is installed, returning True if it was uninstalled."""
if not self.path:
if installed_package := self.installed_package(hass_config_path):
installed_package.uninstall(hass_config_path)
return True
return False
if self.path.is_dir():
shutil.rmtree(self.path)
else:
self.path.unlink()
if self.unhacs_path and self.unhacs_path.exists():
self.unhacs_path.unlink()
return True
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
raise NotImplementedError()
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
raise NotImplementedError()
def installed_package(self, hass_config_path: Path) -> "Package|None":
"""Returns the installed package if it exists, otherwise None."""
for package in self.find_installed(hass_config_path):
if self.same(package):
return package
return None
def is_update(self, hass_config_path: Path) -> bool:
"""Returns True if the package is not installed or the installed version is different from the latest."""
installed_package = self.installed_package(hass_config_path)
return installed_package is None or installed_package.version != self.version
def get_latest(self) -> "Package":
"""Returns a new Package representing the latest version of this package."""
package = self.to_yaml()
package.pop("version")
package.pop("package_type")
return self.__class__(package.pop("url"), **package)

View File

@ -1,106 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_branch_zip
from unhacs.git import get_latest_sha
from unhacs.git import get_sha_zip
from unhacs.packages import PackageType
from unhacs.packages.common import Package
from unhacs.packages.integration import Integration
from unhacs.utils import extract_zip
class Fork(Integration):
other_fields = ["fork_component", "branch_name"]
package_type = PackageType.FORK
def __init__(
self,
url: str,
fork_component: str,
branch_name: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
self.fork_component = fork_component
self.branch_name = branch_name
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
def __str__(self):
return f"{self.package_type}: {self.fork_component} ({self.owner}/{self.name}@{self.branch_name}) {self.version}"
def fetch_version_release(self, version: str | None = None) -> str:
if version:
return version
return get_latest_sha(self.url, self.branch_name)
@classmethod
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] != "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration from hass fork."""
if self.version:
zipball_url = get_sha_zip(self.url, self.version)
else:
zipball_url = get_branch_zip(self.url, self.branch_name)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
source = tmpdir / "homeassistant" / "components" / self.fork_component
if not source.exists() or not source.is_dir():
raise ValueError(
f"Could not find {self.fork_component} in {self.url}@{self.version}"
)
# Add version to manifest
manifest_file = source / "manifest.json"
manifest: dict[str, str]
with manifest_file.open("r") as f:
manifest = json.load(f)
manifest["version"] = "0.0.0"
with manifest_file.open("w") as f:
json.dump(manifest, f)
dest = self.get_install_dir(hass_config_path) / source.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(self.unhacs_path)

View File

@ -1,81 +0,0 @@
import json
import shutil
import tempfile
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import requests
import yaml
from unhacs.git import get_tag_zip
from unhacs.packages import Package
from unhacs.packages import PackageType
from unhacs.utils import extract_zip
class Integration(Package):
package_type = PackageType.INTEGRATION
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "custom_components"
@classmethod
def find_installed(cls, hass_config_path: Path) -> list[Package]:
packages: list[Package] = []
for custom_component in cls.get_install_dir(hass_config_path).glob("*"):
unhacs = custom_component / "unhacs.yaml"
if unhacs.exists():
data = yaml.safe_load(unhacs.read_text())
if data["package_type"] == "fork":
continue
package = cls.from_yaml(data)
package.path = custom_component
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the integration package."""
zipball_url = get_tag_zip(self.url, self.version)
response = requests.get(zipball_url)
response.raise_for_status()
with tempfile.TemporaryDirectory(prefix="unhacs-") as tempdir:
tmpdir = Path(tempdir)
extract_zip(ZipFile(BytesIO(response.content)), tmpdir)
source, dest = None, None
for custom_component in tmpdir.glob("custom_components/*"):
source = custom_component
dest = self.get_install_dir(hass_config_path) / custom_component.name
break
else:
hacs_json = json.loads((tmpdir / "hacs.json").read_text())
if hacs_json.get("content_in_root"):
source = tmpdir
dest = self.get_install_dir(hass_config_path) / self.name
if not source or not dest:
raise ValueError("No custom_components directory found")
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.rmtree(dest, ignore_errors=True)
shutil.move(source, dest)
self.path = dest
self.to_yaml(self.unhacs_path)

View File

@ -1,95 +0,0 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Plugin(Package):
package_type = PackageType.PLUGIN
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "www" / "js"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}-unhacs.yaml")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*-unhacs.yaml"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(
js_unhacs.name.removesuffix("-unhacs.yaml")
)
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Installs the plugin package."""
valid_filenames: list[str]
if filename := self.get_hacs_json().get("filename"):
valid_filenames = [cast(str, filename)]
else:
valid_filenames = [
f"{self.name.removeprefix('lovelace-')}.js",
f"{self.name}.js",
f"{self.name}-umd.js",
f"{self.name}-bundle.js",
]
def real_get(filename) -> requests.Response | None:
urls = [
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/dist/{filename}",
f"https://github.com/{self.owner}/{self.name}/releases/download/{self.version}/{filename}",
f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/{filename}",
]
for url in urls:
plugin = requests.get(url)
if int(plugin.status_code / 100) == 4:
continue
plugin.raise_for_status()
return plugin
return None
for filename in valid_filenames:
plugin = real_get(filename)
if plugin:
break
else:
raise ValueError(f"No valid filename found for package {self.name}")
js_path = self.get_install_dir(hass_config_path)
js_path.mkdir(parents=True, exist_ok=True)
self.path = js_path.joinpath(filename)
self.path.write_text(plugin.text)
self.to_yaml(self.unhacs_path)

View File

@ -1,63 +0,0 @@
from pathlib import Path
from typing import cast
import requests
from unhacs.packages import Package
from unhacs.packages import PackageType
class Theme(Package):
package_type = PackageType.THEME
def __init__(
self,
url: str,
version: str | None = None,
ignored_versions: set[str] | None = None,
):
super().__init__(
url,
version=version,
ignored_versions=ignored_versions,
)
@classmethod
def get_install_dir(cls, hass_config_path: Path) -> Path:
return hass_config_path / "themes"
@property
def unhacs_path(self) -> Path | None:
if self.path is None:
return None
return self.path.with_name(f"{self.path.name}.unhacs")
@classmethod
def find_installed(cls, hass_config_path: Path) -> list["Package"]:
packages: list[Package] = []
for js_unhacs in cls.get_install_dir(hass_config_path).glob("*.unhacs"):
package = cls.from_yaml(js_unhacs)
package.path = js_unhacs.with_name(js_unhacs.name.removesuffix(".unhacs"))
packages.append(package)
return packages
def install(self, hass_config_path: Path) -> None:
"""Install theme yaml."""
filename = self.get_hacs_json().get("filename")
if not filename:
raise ValueError(f"No filename found for theme {self.name}")
filename = cast(str, filename)
url = f"https://raw.githubusercontent.com/{self.owner}/{self.name}/{self.version}/themes/{filename}"
theme = requests.get(url)
theme.raise_for_status()
themes_path = self.get_install_dir(hass_config_path)
themes_path.mkdir(parents=True, exist_ok=True)
self.path = themes_path.joinpath(filename)
self.path.write_text(theme.text)
self.to_yaml(self.unhacs_path)

View File

@ -1,21 +0,0 @@
from pathlib import Path
from zipfile import ZipFile
DEFAULT_HASS_CONFIG_PATH: Path = Path(".")
DEFAULT_PACKAGE_FILE = Path("unhacs.yaml")
def extract_zip(zip_file: ZipFile, dest_dir: Path) -> Path:
"""Extract a zip file to a directory."""
for info in zip_file.infolist():
if info.is_dir():
continue
file = Path(info.filename)
# Strip top directory from path
file = Path(*file.parts[1:])
path = dest_dir / file
path.parent.mkdir(parents=True, exist_ok=True)
with zip_file.open(info) as source, open(path, "wb") as dest:
dest.write(source.read())
return dest_dir