#! /usr/bin/env python3
"""Merge numbered video parts into single files using ffmpeg's concat demuxer.

Two modes, chosen from the positional arguments:

* every path is a file: the files are concatenated in the order given and the
  result is written to the current working directory;
* otherwise: each directory is scanned, files are grouped by the slug
  extracted from their names, and every group of two or more parts is merged
  (only when ``--write`` is passed; without it the run is a dry run).
"""
import re
import subprocess
from argparse import ArgumentParser
from pathlib import Path
from subprocess import check_output, CalledProcessError
from typing import List, Dict, Optional

# NOTE: not referenced anywhere in this file; kept in case other code imports it.
FILELIST = Path("./concat-files.txt")
# Group 1 captures everything before the trailing run of digits that precedes
# the file extension, e.g. "video" for "video12.mp4".
DEFAULT_SLUG_PATTERN = r"^(.+[^0-9])[0-9]+\.[a-zA-Z0-9]+$"
# Match-expansion template (see ``re.Match.expand``) that produces the slug.
DEFAULT_NAME_PATTERN = r"\1"


def call_output(*popenargs, **kwargs):
    """Run a command like ``check_output`` but return the error instead of raising.

    Args:
        *popenargs: Positional arguments forwarded to ``check_output``.
        **kwargs: Keyword arguments forwarded to ``check_output``. ``stderr``
            defaults to ``subprocess.STDOUT`` so the complete output is
            captured in one stream.

    Returns:
        A ``(output, ex)`` tuple. ``output`` is the captured output with
        trailing newlines removed; ``ex`` is the ``CalledProcessError`` raised
        on a non-zero exit status, or ``None`` on success.
    """
    # So we can capture complete output, redirect stderr to stdout.
    kwargs.setdefault('stderr', subprocess.STDOUT)
    try:
        output, ex = check_output(*popenargs, **kwargs), None
    except CalledProcessError as e:
        output, ex = e.output, e
    if output is not None:
        # The output is ``str`` when the caller requested text mode and
        # ``bytes`` otherwise; strip with a matching newline type.
        newline = "\n" if isinstance(output, str) else b"\n"
        output = output.rstrip(newline)
    return output, ex


def _concat_entry(file_list: Path, f: Path) -> str:
    """Build one ``file '...'`` line of the concat list for *f*."""
    try:
        target = str(f.relative_to(file_list.parent))
    except ValueError:
        # *f* is not located under the list file's directory; use the path as
        # given (ffmpeg is invoked with ``-safe 0``, which permits this).
        target = str(f)
    # Inside ffmpeg's single-quoted strings a literal quote is written by
    # closing the quote, adding an escaped quote and reopening: '\''
    escaped = target.replace("'", "'\\''")
    return f"file '{escaped}'"


def write_file_list(file_list: Path, files: List[Path]):
    """Write the ffmpeg concat list *file_list* naming *files* in order.

    Each entry is written relative to the directory of *file_list* when
    possible, otherwise as given. The order of *files* is preserved (it is
    deliberately not reversed).

    Args:
        file_list: Path of the list file to create or overwrite.
        files: Input files, in the order they should be concatenated.
    """
    file_list.write_text(
        "\n".join(_concat_entry(file_list, f) for f in files),
        encoding="utf-8",
    )


def concat_files(output_file: Path, files: List[Path]) -> subprocess.CompletedProcess:
    """Concatenate *files* into *output_file* with ``ffmpeg -f concat -codec copy``.

    The concat list is written next to the output (same name, ``.txt``
    suffix) and is left in place afterwards. ffmpeg writes to a temporary
    ``<name>.tmp<suffix>`` file that is moved onto *output_file* only when
    ffmpeg exits with status 0.

    Args:
        output_file: Destination of the merged file.
        files: Parts to concatenate, in order.

    Returns:
        The ``CompletedProcess`` of the ffmpeg run; the caller decides how to
        react to a non-zero return code.
    """
    file_list = output_file.with_suffix(".txt")
    output_file_tmp = output_file.with_suffix(".tmp" + output_file.suffix)
    write_file_list(file_list, files)

    # A leftover temp file from an earlier failed run would make ffmpeg stop
    # and ask whether to overwrite it.
    if output_file_tmp.exists():
        output_file_tmp.unlink()

    result = subprocess.run([
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",
        "-i", str(file_list),
        "-codec", "copy",
        str(output_file_tmp),
    ])
    print(result)
    if result.returncode == 0:
        # ``replace`` overwrites an existing target on every platform.
        output_file_tmp.replace(output_file)
    elif output_file_tmp.exists():
        # Do not leave a partial output behind.
        output_file_tmp.unlink()
    return result


def extract_slug(
    f: Path,
    pattern: str = DEFAULT_SLUG_PATTERN,
    name: str = DEFAULT_NAME_PATTERN,
) -> Optional[str]:
    """Derive the group name ("slug") of *f* from its file name.

    Args:
        f: File whose ``name`` (not the full path) is matched.
        pattern: Regular expression searched in the file name.
        name: Expansion template applied to the match (e.g. ``\\1``).

    Returns:
        The expanded template, or ``None`` when *pattern* does not match.
    """
    match = re.search(pattern, f.name)
    if match is None:
        return None
    return match.expand(name)


def _natural_key(path: Path) -> list:
    """Sort key ordering embedded numbers numerically (``part2`` < ``part10``)."""
    tokens = re.split(r"([0-9]+)", path.name)
    # ``re.split`` with a capturing group puts the digit runs at odd indices,
    # so text is always compared with text and numbers with numbers.
    return [int(tok) if idx % 2 else tok for idx, tok in enumerate(tokens)]


def main():
    """Command-line entry point; parses ``sys.argv`` and merges video parts.

    Raises:
        ValueError: If files and directories are mixed, or if (in files mode)
            the first file name does not match ``--pattern``.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    parser = ArgumentParser(description="Merge video files")
    parser.add_argument("paths", metavar="path", nargs="+")
    parser.add_argument("--write", action="store_true")
    parser.add_argument("--clean", action="store_true")
    parser.add_argument("--pattern", default=DEFAULT_SLUG_PATTERN)
    parser.add_argument("--name", default=DEFAULT_NAME_PATTERN)
    args = parser.parse_args()

    paths = [Path(path).resolve() for path in args.paths]

    if any(path.is_dir() for path in paths) and any(path.is_file() for path in paths):
        raise ValueError("Cannot mix lists of files and paths")

    # Files mode: merge exactly the given files, in the given order, into the
    # current working directory. This mode always writes (``--write`` is not
    # consulted).
    if all(path.is_file() for path in paths):
        slug = extract_slug(paths[0], args.pattern, args.name)
        if not slug:
            raise ValueError(
                f"Cannot derive an output name: {paths[0].name!r} "
                f"does not match pattern {args.pattern!r}"
            )
        output_file = Path.cwd() / f"{slug.strip()}{paths[0].suffix}"
        result = concat_files(output_file, list(paths))
        result.check_returncode()
        return

    # Directory mode: group the files of each directory by slug.
    for path in paths:
        if not path.is_dir():
            print(path, "is not a directory, skipping...")
            continue

        seen_slugs: Dict[str, List[Path]] = {}
        for file in path.glob("*.*"):
            # The glob also yields directories whose names contain a dot.
            if not file.is_file():
                continue
            slug = extract_slug(file, args.pattern, args.name)
            if not slug:
                continue
            seen_slugs.setdefault(slug, []).append(file)

        for slug, parts in seen_slugs.items():
            if len(parts) < 2:
                continue

            # Sort before anything depends on the order: glob order is
            # arbitrary, and a plain string sort would put "10" before "2".
            parts.sort(key=_natural_key)
            output_file = path / f"{slug.strip()}{parts[0].suffix}"

            if output_file.exists():
                print(output_file, "already exists...")
                if args.write and args.clean:
                    print("Cleaning parts...")
                    for part in parts:
                        # Never delete the merged result itself, even if a
                        # custom pattern makes it look like a part.
                        if part != output_file:
                            part.unlink()
                else:
                    print("dry run")
                continue

            print(f"Merging {str(output_file)}:")
            for part in parts:
                print("\t" + str(part))

            if args.write:
                result = concat_files(output_file, parts)
                result.check_returncode()
            else:
                print("dry run")


if __name__ == "__main__":
    main()