From 4e653980dc0972e97e9109475f57dbc82d89983e Mon Sep 17 00:00:00 2001 From: ViViDboarder Date: Thu, 13 Jun 2024 12:43:11 -0700 Subject: [PATCH] Add concat_video and rm-empty-dirs Some utilities I use from time to time --- assets/default/bin/concat_video | 133 +++++++++++++++++++++++++++++++ assets/default/bin/rm-empty-dirs | 41 ++++++++++ 2 files changed, 174 insertions(+) create mode 100755 assets/default/bin/concat_video create mode 100755 assets/default/bin/rm-empty-dirs diff --git a/assets/default/bin/concat_video b/assets/default/bin/concat_video new file mode 100755 index 0000000..e74457a --- /dev/null +++ b/assets/default/bin/concat_video @@ -0,0 +1,133 @@ +#! /usr/bin/env python3 + +import re +import subprocess +from argparse import ArgumentParser +from pathlib import Path +from subprocess import check_output, CalledProcessError +from typing import List, Dict, Optional + + +FILELIST = Path("./concat-files.txt") +DEFAULT_SLUG_PATTERN = "^(.+[^0-9])[0-9]+\\.[a-zA-Z0-9]+$" +DEFAULT_NAME_PATTERN = "\\1" + + +def call_output(*popenargs, **kwargs): + """Similar to check_output, but instead returns output and exception""" + # So we can capture complete output, redirect sderr to stdout + kwargs.setdefault('stderr', subprocess.STDOUT) + output, ex = None, None + try: + output = check_output(*popenargs, **kwargs) + except CalledProcessError as e: + output, ex = e.output, e + + output = output.rstrip(b'\n') + return output, ex + + +def write_file_list(file_list: Path, files: List[Path]): + file_list.write_text("\n".join(list(( + f"file '{str(f.relative_to(file_list.parent))}'" + # for path, don't reverse the files + # for f in reversed(files) + for f in files + )))) + + +def concat_files(output_file: Path, files: List[Path]) -> subprocess.CompletedProcess: + file_list = output_file.with_suffix(".txt") + output_file_tmp = output_file.with_suffix(".tmp" + output_file.suffix) + write_file_list(file_list, files) + result = subprocess.run([ + "ffmpeg", + "-f", + "concat", + "-safe", + "0", + "-i", + str(file_list), + "-codec", + "copy", + str(output_file_tmp), + ]) + + print(result) + if result.returncode == 0: + output_file_tmp.rename(output_file) + + return result + + +def extract_slug( + f: Path, + pattern: str=DEFAULT_SLUG_PATTERN, + name: str=DEFAULT_NAME_PATTERN, +) -> Optional[str]: + result = re.search(pattern, f.name) + if not result: + return None + + return result.expand(name) + + +def main(): + parser = ArgumentParser(description="Merge video files") + parser.add_argument("paths", metavar="path", nargs="+") + parser.add_argument("--write", action="store_true") + parser.add_argument("--clean", action="store_true") + parser.add_argument("--pattern", default=DEFAULT_SLUG_PATTERN) + parser.add_argument("--name", default=DEFAULT_NAME_PATTERN) + args = parser.parse_args() + + paths = [Path(path).resolve() for path in args.paths] + # print(paths) + + if any(path.is_dir() for path in paths) and any(path.is_file() for path in paths): + raise ValueError("Cannot mix lists of files and paths") + + if all(path.is_file() for path in paths): + slug = extract_slug(paths[0], args.pattern, args.name) + output_file = Path.cwd() / f"{slug.strip()}{paths[0].suffix}" + result = concat_files(output_file, list(paths)) + result.check_returncode() + return + + for path in paths: + seen_slugs: Dict[str, List[Path]] = {} + for file in path.glob("*.*"): + slug = extract_slug(file, args.pattern, args.name) + if not slug: + continue + seen_slugs.setdefault(slug, []).append(file) + + for slug, parts in seen_slugs.items(): + if len(parts) < 2: + continue + output_file = path / f"{slug.strip()}{parts[0].suffix}" + if output_file.exists(): + print(output_file, "already exists...") + if args.write and args.clean: + print("Cleaning parts...") + for part in parts: + part.unlink() + else: + print("dry run") + continue + + print(f"Merging {str(output_file)}:") + parts.sort(key=lambda x: str(x)) + for part in parts: + print("\t"+str(part)) + + if args.write: + result = concat_files(output_file, parts) + result.check_returncode() + pass + else: + print("dry run") + + +if __name__ == "__main__": + main() diff --git a/assets/default/bin/rm-empty-dirs b/assets/default/bin/rm-empty-dirs new file mode 100755 index 0000000..66e2a78 --- /dev/null +++ b/assets/default/bin/rm-empty-dirs @@ -0,0 +1,41 @@ +#! /usr/bin/env python3 +import sys +from pathlib import Path + + +def delete_empty(p: Path) -> bool: + if p.is_file(): + print(f"{p} is a file, will not delete parent dirs") + return False + + if not p.is_dir(): + raise ValueError(f"unknown type {p}") + + for child in p.iterdir(): + if not delete_empty(child): + print(f"Skipping delete of {p}") + return False + + p.rmdir() + + return True + + +def main() -> None: + paths = [Path(arg) for arg in sys.argv[1:]] + if not paths: + print( + "\n".join( + ( + "Usage: rm-empty-dirs path [path...]", + "", + "Recursively deletes any empty dirs using a depth first search.", + ) + ) + ) + for path in paths: + delete_empty(path) + + +if __name__ == "__main__": + main()