photoprism-fix/delete-replacements.py

237 lines
7.1 KiB
Python
Raw Permalink Normal View History

2024-11-05 22:17:32 +00:00
#!/usr/bin/env python3
"""
This Python script is intended to delete files from a library that have replacements in another library.
It should accept two arguments:
1. The old library path from where the files will be deleted.
2. The new library path where the replacements are located.
This script will recurse through subdirectories to locate all files and determine if they have replacements. It does
this by checking a checksum (sha1 or phash) for each file.
"""
import hashlib
import logging
import mimetypes
import os
from argparse import ArgumentParser, Namespace
from collections.abc import Generator
from pathlib import Path
# Configure logging BEFORE the optional imports below. The first call to a
# module-level logging function (e.g. logging.warning) implicitly runs
# basicConfig() with defaults, after which an explicit basicConfig() call is a
# no-op. Configuring late would therefore silently drop the INFO level and the
# custom format whenever one of the optional dependencies is missing.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
log = logging.getLogger(__name__)

# Optional dependency: perceptual hashing of images.
try:
    import imagehash
    from PIL import Image, UnidentifiedImageError
except ImportError:
    Image = None
    imagehash = None
    # Placeholder so `except UnidentifiedImageError` clauses stay valid.
    UnidentifiedImageError = Exception
    log.warning(
        "PIL and imagehash libraries are required for image hash comparison"
    )

# Optional dependency: HEIF/HEIC decoding support for PIL.
try:
    from pillow_heif import register_heif_opener

    register_heif_opener()
except ImportError:
    log.warning("pillow-heif library is required for HEIF image support")

# Optional dependency: reading video frames.
try:
    import cv2
except ImportError:
    cv2 = None
    log.warning("opencv-python library is required for video hash comparison")
def image_hash(file_path: Path) -> str | None:
    """Return a perceptual hash of an image file, prefixed with ``image-``.

    Args:
        file_path: Path of the image to hash.

    Returns:
        ``"image-<phash>"`` on success, or ``None`` when the file cannot be
        identified as an image or hashing fails (the failure is logged).

    Raises:
        ImportError: If PIL or imagehash could not be imported.
    """
    if imagehash is None or Image is None:
        raise ImportError(
            "PIL and imagehash libraries are required for image hash comparison"
        )
    try:
        # Image.open keeps the underlying file open; the context manager
        # closes it so that hashing a large library does not leak handles.
        with Image.open(file_path) as image:
            perceptual = imagehash.phash(image)
        return "image-" + str(perceptual)
    except UnidentifiedImageError:
        if file_path.suffix.lower() in [".heic", ".heif"]:
            log.warning(f"Unidentified image: {file_path}. Maybe install pillow-heif?")
        else:
            log.warning(f"Unidentified image: {file_path}.")
    except Exception as e:
        # Best effort: report the failure and let the caller fall back.
        log.error(f"Error calculating image hash for {file_path}: {e}")
    return None
def video_hash(file_path: Path) -> str | None:
    """Extract the first frame of a video and calculate its image hash.

    Args:
        file_path: Path of the video to hash.

    Returns:
        ``"video-<phash>"`` on success, or ``None`` when no frame could be
        read or hashing fails (the failure is logged).

    Raises:
        ImportError: If PIL/imagehash or opencv-python could not be imported.
    """
    if imagehash is None or Image is None:
        raise ImportError(
            "PIL and imagehash libraries are required for image hash comparison"
        )
    if not cv2:
        raise ImportError("opencv-python library is required for video hash comparison")
    cap = None
    try:
        cap = cv2.VideoCapture(str(file_path))
        ret, frame = cap.read()
        if not ret:
            log.warning(f"Error reading video frame: {file_path}")
            return None
        frame_hash = imagehash.phash(Image.fromarray(frame))
        return "video-" + str(frame_hash)
    except Exception as e:
        # Best effort: report the failure and let the caller fall back.
        log.error(f"Error calculating video hash for {file_path}: {e}")
        return None
    finally:
        # Release the capture on every path so the video file handle is not
        # left open while the rest of the library is processed.
        if cap is not None:
            cap.release()
def calc_hash(file_path: Path, use_image_hash=False):
    """
    Compute the comparison hash for a single file.

    With ``use_image_hash`` set, files whose guessed MIME type is unknown or
    ``image/*`` are hashed with ``image_hash`` and ``video/*`` files with
    ``video_hash``. When that yields nothing (or the flag is off), the SHA-1
    hex digest of the file contents is returned instead.
    """
    if use_image_hash:
        kind = mimetypes.guess_type(file_path)[0]
        perceptual = None
        if not kind or kind.startswith("image"):
            perceptual = image_hash(file_path)
        elif kind.startswith("video"):
            perceptual = video_hash(file_path)
        if perceptual:
            return perceptual

    # Fallback: stream the file through SHA-1 in 64 KiB chunks.
    digest = hashlib.sha1()
    with open(file_path, "rb") as stream:
        for chunk in iter(lambda: stream.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()
def recurse_files(directory: Path) -> Generator[Path, None, None]:
    """
    Lazily walk ``directory`` and yield each regular file whose name matches
    ``*.*`` and does not begin with a dot.
    """
    yield from (
        candidate
        for candidate in directory.rglob("*.*")
        if not candidate.name.startswith(".") and candidate.is_file()
    )
def parse_args() -> Namespace:
    """Build the command line parser and return the parsed arguments."""
    parser = ArgumentParser(
        description="Delete files from a library that have replacements in another library"
    )

    # Two required positional paths, in this order.
    positionals = (
        ("old_library_path", "The path to the old library"),
        ("new_library_path", "The path to the new library"),
    )
    for dest, help_text in positionals:
        parser.add_argument(dest, type=Path, help=help_text)

    # Optional boolean switches, all off by default.
    switches = (
        ("--dry-run", "Perform a dry run without deleting any files"),
        ("--use-image-hash", "Use image hash to compare images"),
        ("--verbose", "Increase verbosity"),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action="store_true", help=help_text)

    return parser.parse_args()
def calc_hashes(directory: Path, use_image_hash: bool = False) -> dict[str, Path]:
    """Hash every file found under ``directory`` and return a hash -> path mapping.

    When two files produce the same hash a warning is logged and the file
    seen later replaces the earlier one in the mapping.
    """
    paths_by_hash: dict[str, Path] = {}
    for path in recurse_files(directory):
        digest = calc_hash(path, use_image_hash)
        log.debug(f"{digest} {path}")
        earlier = paths_by_hash.get(digest)
        if earlier is not None:
            log.warning("Hash collision: %s %s", path, earlier)
        paths_by_hash[digest] = path
    return paths_by_hash
def delete_files(
    old_library_path: Path,
    new_library_hashes: dict[str, Path],
    dry_run: bool = False,
    use_image_hash: bool = False,
) -> tuple[int, int]:
    """
    Remove every file under ``old_library_path`` whose hash is a key of
    ``new_library_hashes``; with ``dry_run`` nothing is removed from disk.

    Afterwards a warning is logged for each new-library entry whose hash was
    never matched by an old-library file.

    Returns ``(matched, kept)``: ``matched`` is the number of distinct hashes
    that were matched for deletion, ``kept`` is the number of files left alone.
    """
    matched_hashes: set[str] = set()
    kept_count = 0

    for candidate in recurse_files(old_library_path):
        digest = calc_hash(candidate, use_image_hash)
        log.debug(f"{digest} {candidate}")

        # Guard clause: no replacement known, leave the file alone.
        if digest not in new_library_hashes:
            log.debug(f"Keeping {candidate}")
            kept_count += 1
            continue

        log.debug(f"Deleting {candidate}")
        if not dry_run:
            os.remove(candidate)
        matched_hashes.add(digest)

    # Report replacements that had no counterpart in the old library.
    for new_sha, new_file in new_library_hashes.items():
        if new_sha in matched_hashes:
            continue
        log.warning(f"Replacement file not found in old library: {new_file}")

    return len(matched_hashes), kept_count
def main() -> int:
    """Script entry point.

    Parses the command line, hashes the new library, then removes matching
    files from the old library and logs a summary.

    Returns 1 when either library path does not exist, otherwise 0.
    """
    options = parse_args()
    old_root, new_root = options.old_library_path, options.new_library_path

    if options.verbose:
        log.setLevel(logging.DEBUG)

    # Validate both locations before doing any work.
    if not old_root.exists():
        log.warning("Old library path does not exist: %s", old_root)
        return 1
    if not new_root.exists():
        log.warning("New library path does not exist: %s", new_root)
        return 1

    log.info("Calculating hashes for new library")
    new_library_hashes = calc_hashes(new_root, options.use_image_hash)
    log.info(f"New library hashes calculated: {len(new_library_hashes)}")

    log.info("Deleting files from old library")
    deleted_files, kept_files = delete_files(
        old_library_path=old_root,
        new_library_hashes=new_library_hashes,
        dry_run=options.dry_run,
        use_image_hash=options.use_image_hash,
    )
    log.info(
        f"Deleted files: {deleted_files} Kept files: {kept_files}: Unknown files: {len(new_library_hashes) - deleted_files}"
    )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status; calling main()
    # bare would discard it and always exit with status 0.
    raise SystemExit(main())