237 lines
7.1 KiB
Python
Executable File
237 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
This Python script is intended to delete files from a library that have replacements in another library.
|
|
|
|
It should accept two arguments:
|
|
1. The old library path from where the files will be deleted.
|
|
2. The new library path where the replacements are located.
|
|
|
|
This script will recurse through subdirectories to locate all files and determine if they have replacements. It does
|
|
this by checking a checksum (sha1 or phash) for each file.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
from argparse import ArgumentParser, Namespace
|
|
from collections.abc import Generator
|
|
from pathlib import Path
|
|
|
|
# Configure logging BEFORE the optional imports below. Emitting a record through
# the root logger first (as logging.warning() does) installs a default handler,
# after which basicConfig() is a no-op and the INFO level and the format
# requested here would be silently ignored.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)

# Perceptual image hashing is optional. When the libraries are missing the
# names are bound to None so callers can test for availability.
try:
    import imagehash
    from PIL import Image, UnidentifiedImageError
except ImportError:
    Image = None
    imagehash = None
    # Fallback so `except UnidentifiedImageError` clauses remain valid syntax.
    UnidentifiedImageError = Exception
    log.warning("PIL and imagehash libraries are required for image hash comparison")

# HEIF/HEIC support is optional; registering the opener lets PIL read such files.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except ImportError:
    log.warning("pillow-heif library is required for HEIF image support")

# OpenCV is optional and only needed to grab a frame from video files.
try:
    import cv2
except ImportError:
    cv2 = None
    log.warning("opencv-python library is required for video hash comparison")
|
|
|
|
|
|
def image_hash(file_path: Path) -> str | None:
    """Return a perceptual hash (phash) of an image file.

    Args:
        file_path: Path of the image to hash.

    Returns:
        The string ``"image-"`` followed by the phash, or ``None`` when the
        file is not recognised as an image or hashing fails for any other
        reason (the failure is logged).

    Raises:
        ImportError: If PIL or imagehash could not be imported.
    """
    if imagehash is None or Image is None:
        raise ImportError(
            "PIL and imagehash libraries are required for image hash comparison"
        )

    try:
        # Use a context manager so the underlying file is closed
        # deterministically instead of whenever the image object happens to be
        # garbage-collected.
        with Image.open(file_path) as img:
            digest = imagehash.phash(img)
        return "image-" + str(digest)
    except UnidentifiedImageError:
        if file_path.suffix.lower() in [".heic", ".heif"]:
            log.warning(f"Unidentified image: {file_path}. Maybe install pillow-heif?")
        else:
            log.warning(f"Unidentified image: {file_path}.")
    except Exception as e:
        log.error(f"Error calculating image hash for {file_path}: {e}")

    return None
|
|
|
|
|
|
def video_hash(file_path: Path) -> str | None:
    """Return a perceptual hash of the first frame of a video file.

    Args:
        file_path: Path of the video to hash.

    Returns:
        The string ``"video-"`` followed by the phash of the first frame, or
        ``None`` when no frame can be read or hashing fails (the failure is
        logged).

    Raises:
        ImportError: If PIL/imagehash or opencv-python could not be imported.
    """
    if imagehash is None or Image is None:
        raise ImportError(
            "PIL and imagehash libraries are required for image hash comparison"
        )

    if not cv2:
        raise ImportError("opencv-python library is required for video hash comparison")

    try:
        cap = cv2.VideoCapture(str(file_path))
        try:
            ret, frame = cap.read()
        finally:
            # Always release the capture so the file/decoder handle is not
            # leaked, including when read() fails.
            cap.release()

        if not ret:
            log.warning(f"Error reading video frame: {file_path}")
            return None

        digest = imagehash.phash(Image.fromarray(frame))
        return "video-" + str(digest)
    except Exception as e:
        log.error(f"Error calculating video hash for {file_path}: {e}")

    return None
|
|
|
|
|
|
def calc_hash(file_path: Path, use_image_hash=False):
    """
    Compute the comparison key for a single file.

    With ``use_image_hash`` set, a perceptual hash is attempted first: files
    whose MIME type is unknown or starts with "image" go through
    ``image_hash``, files whose MIME type starts with "video" go through
    ``video_hash``. If no perceptual hash is produced (or the option is off),
    the SHA-1 hex digest of the file contents is returned instead.
    """
    if use_image_hash:
        kind, _encoding = mimetypes.guess_type(file_path)
        perceptual = None
        if not kind or kind.startswith("image"):
            perceptual = image_hash(file_path)
        elif kind.startswith("video"):
            perceptual = video_hash(file_path)
        if perceptual:
            return perceptual

    # Fall back to a content hash, streaming the file in 64 KiB chunks.
    digest = hashlib.sha1()
    with open(file_path, "rb") as stream:
        while chunk := stream.read(65536):
            digest.update(chunk)

    return digest.hexdigest()
|
|
|
|
|
|
def recurse_files(directory: Path) -> Generator[Path, None, None]:
    """
    Walk ``directory`` recursively and yield each regular file whose name
    contains a dot but does not begin with one.

    Only the file's own name is tested for a leading dot; the names of its
    parent directories are not inspected.
    """
    for candidate in directory.rglob("*.*"):
        visible = not candidate.name.startswith(".")
        if visible and candidate.is_file():
            yield candidate
|
|
|
|
|
|
def parse_args() -> Namespace:
    """Build the command-line parser and return the parsed arguments."""
    parser = ArgumentParser(
        description="Delete files from a library that have replacements in another library"
    )

    # Two positional library locations, converted to Path by argparse.
    positionals = (
        ("old_library_path", "The path to the old library"),
        ("new_library_path", "The path to the new library"),
    )
    for dest, help_text in positionals:
        parser.add_argument(dest, type=Path, help=help_text)

    # Boolean switches; each defaults to False and becomes True when given.
    switches = (
        ("--dry-run", "Perform a dry run without deleting any files"),
        ("--use-image-hash", "Use image hash to compare images"),
        ("--verbose", "Increase verbosity"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    return parser.parse_args()
|
|
|
|
|
|
def calc_hashes(directory: Path, use_image_hash: bool = False) -> dict[str, Path]:
    """Hash every file found under ``directory`` and map each hash to its path.

    When two files produce the same hash a warning is logged and the file
    visited later replaces the earlier one in the mapping.
    """
    by_hash: dict[str, Path] = {}
    for path in recurse_files(directory):
        digest = calc_hash(path, use_image_hash)
        log.debug(f"{digest} {path}")

        earlier = by_hash.get(digest)
        if earlier is not None:
            log.warning("Hash collision: %s %s", path, earlier)

        by_hash[digest] = path

    return by_hash
|
|
|
|
|
|
def _is_same_file(candidate: Path, replacement: Path) -> bool:
    """Return True when both paths refer to the same file on disk."""
    try:
        return candidate.samefile(replacement)
    except OSError:
        # If either path cannot be stat'ed we cannot prove they differ, so
        # report "same" and let the caller err on the side of keeping the file.
        return True


def delete_files(
    old_library_path: Path,
    new_library_hashes: dict[str, Path],
    dry_run: bool = False,
    use_image_hash: bool = False,
) -> tuple[int, int]:
    """
    Delete files from the old library that have replacements in the new library.

    A file is deleted when its hash is a key of ``new_library_hashes`` and the
    mapped replacement is a different file on disk. A file that is its own
    replacement (possible when the two libraries are the same directory or
    overlap) is always kept, so the only remaining copy is never removed.

    Args:
        old_library_path: Root of the library to delete from.
        new_library_hashes: Mapping of hash to replacement file path.
        dry_run: When true, log what would be deleted without removing anything.
        use_image_hash: Passed through to ``calc_hash``.

    Returns:
        A tuple ``(deleted, kept)``. ``deleted`` is the number of distinct
        hashes for which at least one old file was deleted (several identical
        old files count once); ``kept`` is the number of old files kept.
    """
    deleted_hashes: set[str] = set()
    kept_files = 0
    for file in recurse_files(old_library_path):
        oldhash = calc_hash(file, use_image_hash)
        log.debug(f"{oldhash} {file}")

        replacement = new_library_hashes.get(oldhash)
        if replacement is None:
            log.debug(f"Keeping {file}")
            kept_files += 1
        elif _is_same_file(file, replacement):
            # Deleting here would destroy the replacement itself.
            log.warning("Not deleting %s: it is the same file as its replacement", file)
            kept_files += 1
        else:
            log.debug(f"Deleting {file}")
            if not dry_run:
                os.remove(file)
            deleted_hashes.add(oldhash)

    # Report new-library files that did not replace anything in the old library.
    for new_sha, new_file in new_library_hashes.items():
        if new_sha not in deleted_hashes:
            log.warning(f"Replacement file not found in old library: {new_file}")

    return len(deleted_hashes), kept_files
|
|
|
|
|
|
def main() -> int:
    """Run the command-line tool.

    Parses the arguments, hashes the new library, then deletes (or, with
    ``--dry-run``, only reports) old-library files whose hash is present in
    the new library.

    Returns:
        0 on success; 1 when a library path does not exist or when the two
        library paths are identical or nested inside one another.
    """
    args = parse_args()
    if args.verbose:
        log.setLevel(logging.DEBUG)

    if not args.old_library_path.exists():
        log.warning("Old library path does not exist: %s", args.old_library_path)
        return 1

    if not args.new_library_path.exists():
        log.warning("New library path does not exist: %s", args.new_library_path)
        return 1

    # Refuse overlapping libraries: if one tree contains the other (or they are
    # the same directory) a file can be matched against itself and deleted,
    # leaving no copy behind.
    old_root = args.old_library_path.resolve()
    new_root = args.new_library_path.resolve()
    if (
        old_root == new_root
        or old_root in new_root.parents
        or new_root in old_root.parents
    ):
        log.error(
            "Old and new library paths must not overlap: %s %s",
            args.old_library_path,
            args.new_library_path,
        )
        return 1

    log.info("Calculating hashes for new library")
    new_library_hashes = calc_hashes(args.new_library_path, args.use_image_hash)
    log.info(f"New library hashes calculated: {len(new_library_hashes)}")

    log.info("Deleting files from old library")
    deleted_files, kept_files = delete_files(
        args.old_library_path, new_library_hashes, args.dry_run, args.use_image_hash
    )
    log.info(
        f"Deleted files: {deleted_files} Kept files: {kept_files}: Unknown files: {len(new_library_hashes) - deleted_files}"
    )

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status; calling
    # main() alone would always exit with 0, even on the error paths.
    raise SystemExit(main())
|