shoestrap/assets/default/bin/concat_video
ViViDboarder 4e653980dc Add concat_video and rm-empty-dirs
Some utilities I use from time to time
2024-06-13 12:44:38 -07:00

134 lines
3.9 KiB
Python
Executable File

#! /usr/bin/env python3
import re
import subprocess
from argparse import ArgumentParser
from pathlib import Path
from subprocess import check_output, CalledProcessError
from typing import List, Dict, Optional
# Path of a concat list file relative to the working directory.
# NOTE(review): not referenced anywhere in the visible code - confirm it is
# still needed.
FILELIST = Path("./concat-files.txt")
# Regex applied to a file name: group 1 captures everything before a trailing
# run of digits that is immediately followed by the file extension, e.g.
# "Holiday part 12.mp4" -> group 1 is "Holiday part ".
DEFAULT_SLUG_PATTERN = "^(.+[^0-9])[0-9]+\\.[a-zA-Z0-9]+$"
# Template expanded against the regex match to build the output name; "\1"
# selects group 1 of the pattern above.
DEFAULT_NAME_PATTERN = "\\1"
def call_output(*popenargs, **kwargs):
    """Run a command like ``check_output`` but return the error instead of raising.

    All positional and keyword arguments are forwarded unchanged to
    ``subprocess.check_output``. Unless the caller passes ``stderr``
    explicitly, stderr is redirected to stdout so the complete output is
    captured.

    Returns:
        A ``(output, ex)`` tuple. ``output`` is the captured output with
        trailing newlines stripped: ``bytes`` by default, ``str`` when the
        caller requested text mode. ``ex`` is ``None`` when the command
        succeeded, otherwise the ``CalledProcessError`` describing the
        non-zero exit (``output`` then holds whatever was captured before
        the failure).
    """
    # So we can capture complete output, redirect stderr to stdout
    kwargs.setdefault("stderr", subprocess.STDOUT)
    ex = None
    try:
        output = check_output(*popenargs, **kwargs)
    except CalledProcessError as e:
        output, ex = e.output, e
    if output is not None:
        # Text mode yields str; stripping it with a bytes argument would raise
        # TypeError, so pick the newline of the matching type.
        newline = "\n" if isinstance(output, str) else b"\n"
        output = output.rstrip(newline)
    return output, ex
def write_file_list(file_list: Path, files: List[Path]):
    """Write an ffmpeg concat list file naming ``files`` in the given order.

    Each input becomes one ``file '...'`` line; lines are joined with a
    newline and no trailing newline is written.

    Args:
        file_list: Path of the list file to create or overwrite.
        files: Input files, in the order they should be concatenated.

    Paths are written relative to the directory containing ``file_list``
    when the input lives under it; any other input is written as an absolute
    path instead of failing.
    """
    base = file_list.parent
    lines = []
    for f in files:
        try:
            entry = str(f.relative_to(base))
        except ValueError:
            # The input is not under the list file's directory, so a relative
            # entry cannot be formed; fall back to an absolute path.
            entry = str(f.absolute())
        # A literal single quote cannot appear inside a single-quoted entry:
        # close the quote, emit an escaped quote, and reopen it.
        entry = entry.replace("'", "'\\''")
        lines.append(f"file '{entry}'")
    file_list.write_text("\n".join(lines))
def _unlink_if_present(path: Path) -> None:
    """Delete ``path``, ignoring the case where it does not exist."""
    try:
        path.unlink()
    except FileNotFoundError:
        pass


def concat_files(output_file: Path, files: List[Path]) -> subprocess.CompletedProcess:
    """Concatenate ``files`` into ``output_file`` using ffmpeg's concat demuxer.

    Streams are copied (``-codec copy``), not re-encoded. ffmpeg writes to a
    temporary file next to ``output_file`` (same name with ``.tmp`` inserted
    before the suffix); it is moved onto ``output_file`` only when ffmpeg
    exits with status 0, so a failed run never leaves a truncated output
    under the final name.

    Args:
        output_file: Final path of the merged file.
        files: Input files in concatenation order.

    Returns:
        The ``CompletedProcess`` of the ffmpeg run. A non-zero return code is
        not raised here; callers decide how to react.
    """
    file_list = output_file.with_suffix(".txt")
    output_file_tmp = output_file.with_suffix(".tmp" + output_file.suffix)

    write_file_list(file_list, files)
    try:
        result = subprocess.run([
            "ffmpeg",
            # The temp output is ours; overwrite a stale one from an
            # interrupted run instead of stopping at an interactive prompt.
            "-y",
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            str(file_list),
            "-codec",
            "copy",
            str(output_file_tmp),
        ])
    finally:
        # The list file is only an input for this one ffmpeg invocation.
        _unlink_if_present(file_list)

    print(result)

    if result.returncode == 0:
        # replace() overwrites an existing target on every platform.
        output_file_tmp.replace(output_file)
    else:
        # Do not leave a partial output behind after a failed merge.
        _unlink_if_present(output_file_tmp)

    return result
def extract_slug(
    f: Path,
    pattern: str=DEFAULT_SLUG_PATTERN,
    name: str=DEFAULT_NAME_PATTERN,
) -> Optional[str]:
    """Derive the merged-file name stem for ``f`` from its file name.

    Args:
        f: File whose name (final path component only) is inspected.
        pattern: Regular expression searched for in the file name.
        name: Template expanded against the match (backreferences such as
            ``\\1`` refer to groups of ``pattern``).

    Returns:
        The expanded template, or ``None`` when ``pattern`` does not match.
    """
    match = re.search(pattern, f.name)
    return None if match is None else match.expand(name)
def _natural_sort_key(path: Path):
    """Sort key ordering digit runs in the file name by numeric value."""
    chunks = re.split(r"([0-9]+)", path.name)
    # re.split with a capturing group alternates text, digits, text, ...;
    # odd positions are always the digit runs, so types line up when compared.
    return [int(chunk) if i % 2 else chunk for i, chunk in enumerate(chunks)]


def main():
    """Command-line entry point: merge numbered video parts with ffmpeg.

    Two modes, chosen by what the positional paths are:

    * All paths are files: they are concatenated in the order given into a
      file in the current working directory, named from the first file's
      slug and suffix. This mode always runs ffmpeg, regardless of --write.
    * All paths are directories: in each directory, files are grouped by
      slug and every group with at least two parts is merged into a file in
      that directory. Without --write only the plan is printed. When the
      merged file already exists the group is skipped, and with both --write
      and --clean its parts are deleted.

    Raises:
        ValueError: a path is neither a file nor a directory, files and
            directories are mixed, or (file mode) no name can be derived
            from the first file.
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    parser = ArgumentParser(description="Merge video files")
    parser.add_argument("paths", metavar="path", nargs="+")
    parser.add_argument("--write", action="store_true")
    parser.add_argument("--clean", action="store_true")
    parser.add_argument("--pattern", default=DEFAULT_SLUG_PATTERN)
    parser.add_argument("--name", default=DEFAULT_NAME_PATTERN)
    args = parser.parse_args()

    paths = [Path(path).resolve() for path in args.paths]

    # A mistyped path would otherwise fall through to directory mode and be
    # skipped without any message.
    unusable = [path for path in paths if not (path.is_dir() or path.is_file())]
    if unusable:
        raise ValueError(
            "Not a file or directory: " + ", ".join(str(path) for path in unusable)
        )

    if any(path.is_dir() for path in paths) and any(path.is_file() for path in paths):
        raise ValueError("Cannot mix lists of files and paths")

    if all(path.is_file() for path in paths):
        slug = extract_slug(paths[0], args.pattern, args.name)
        if not slug:
            # extract_slug returns None when the pattern does not match.
            raise ValueError(
                f"Cannot derive an output name from {paths[0].name!r} "
                f"using pattern {args.pattern!r}"
            )
        output_file = Path.cwd() / f"{slug.strip()}{paths[0].suffix}"
        result = concat_files(output_file, paths)
        result.check_returncode()
        return

    for path in paths:
        seen_slugs: Dict[str, List[Path]] = {}
        for file in path.glob("*.*"):
            # Directories whose name contains a dot also match the glob.
            if not file.is_file():
                continue
            slug = extract_slug(file, args.pattern, args.name)
            if not slug:
                continue
            seen_slugs.setdefault(slug, []).append(file)

        for slug, parts in seen_slugs.items():
            if len(parts) < 2:
                continue

            # Sort before anything else so both the concatenation order and
            # the output suffix are deterministic; numeric-aware so that
            # "part10" follows "part9" rather than "part1".
            parts.sort(key=_natural_sort_key)
            output_file = path / f"{slug.strip()}{parts[0].suffix}"

            if output_file.exists():
                print(output_file, "already exists...")
                if args.write and args.clean:
                    print("Cleaning parts...")
                    for part in parts:
                        # A custom pattern could make the merged file match
                        # as a part; never delete the result itself.
                        if part != output_file:
                            part.unlink()
                else:
                    print("dry run")
                continue

            print(f"Merging {str(output_file)}:")
            for part in parts:
                print("\t"+str(part))

            if args.write:
                result = concat_files(output_file, parts)
                result.check_returncode()
            else:
                print("dry run")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()