Files
pvd/pvd_mezzanine.py
T
2026-05-07 18:00:52 +02:00

733 lines
24 KiB
Python

from __future__ import annotations
import json
import math
import os
import queue
import re
import subprocess
import sys
import threading
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
try:
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
except Exception:
tk = None
filedialog = None
messagebox = None
ttk = None
# =============================================================================
# 1. CONFIGURATION AND PATHS
# =============================================================================
# Absolute paths of the FFmpeg / ffprobe executables launched by this script.
FFMPEG_EXE = r"C:\Software\ffmpeg.exe"
FFPROBE_EXE = r"C:\Software\ffprobe.exe"
# Default directory that receives all generated files.
OUTPUT_BASE_DIR = r"H:\VOD"
# Three-letter language codes that are treated as German.
GERMAN_LANGS = {"DEU", "GER"}
# Three-letter language code -> German display name (used for track titles
# and for the track listing shown to the user).
LANGUAGE_NAMES = {
    "ARA": "Arabisch",
    "CHI": "Chinesisch",
    "DEU": "Deutsch",
    "DUT": "Niederlaendisch",
    "ENG": "Englisch",
    "FRA": "Franzoesisch",
    "FRE": "Franzoesisch",
    "GER": "Deutsch",
    "ITA": "Italienisch",
    "JPN": "Japanisch",
    "KOR": "Koreanisch",
    "POL": "Polnisch",
    "POR": "Portugiesisch",
    "RUS": "Russisch",
    "SPA": "Spanisch",
}
@dataclass
class AudioRole:
    """Description of one source audio stream and how it was identified.

    ``channels_from_name`` / ``layout_from_name`` carry the information taken
    from a file-name token (``None`` when no token applied to this stream),
    while ``detected_channels`` and ``codec`` come from the probed stream.
    """

    stream_index: int
    list_index: int
    language: str
    channels_from_name: int | None
    layout_from_name: str | None
    detected_channels: int
    codec: str

    @property
    def is_german(self) -> bool:
        """Return True when the language code is listed in ``GERMAN_LANGS``."""
        german = self.language in GERMAN_LANGS
        return german

    @property
    def display_language(self) -> str:
        """Return the display name for the language, or the raw code if unknown."""
        try:
            return LANGUAGE_NAMES[self.language]
        except KeyError:
            return self.language

    @property
    def effective_channels(self) -> int:
        """Return the channel count from the file name, else the detected one."""
        if self.channels_from_name:
            return self.channels_from_name
        return self.detected_channels
@dataclass
class VideoProfile:
    """Video properties and colour targets derived from the probed video stream.

    Instances are produced by ``analyze_video``.
    """

    # Frame size in pixels as reported by the probe (0 when missing).
    width: int
    height: int
    # Frame rate in frames per second.
    fps: float
    # Keyframe interval handed to x264 (frame rate rounded up, at least 1).
    keyint: int
    # True when the height is at most 576 lines.
    is_sd: bool
    # True for SD material with at most 480 lines and a frame rate >= 29 fps.
    is_ntsc: bool
    # True when the stream is flagged as interlaced.
    is_interlaced: bool
    # Target colour primaries / transfer characteristics / matrix
    # (FFmpeg identifiers such as "bt709").
    target_prim: str
    target_trc: str
    target_space: str
    # Human-readable name of the colour target, used in log output.
    target_name: str
    # True when the source colour tags differ from the target or are unknown.
    needs_conversion: bool
@dataclass
class JobPlan:
    """Everything needed to run one encoding job; assembled by ``build_plan``."""

    # Source file and output directory exactly as passed to ``build_plan``.
    input_file: str
    output_dir: str
    # First probed stream whose codec_type is "video".
    video_stream: dict
    # All probed streams whose codec_type is "audio", in probe order.
    audio_streams: list[dict]
    # One AudioRole per entry of ``audio_streams``.
    audio_roles: list[AudioRole]
    video_profile: VideoProfile
    # Output path of the PVD MP4.
    pvd_mp4: str
    # Output paths of the PCM audio MOVs; None when no matching track exists.
    german_mov: str | None
    original_mov: str | None
    # German track that is encoded into the MP4.
    selected_mp4_audio: AudioRole
    # Path of "<input without extension>_forced.srt" if that file exists, else None.
    forced_srt: str | None
    # (label, argv) pairs, executed in order by ``run_plan``.
    commands: list[tuple[str, list[str]]]
def get_pvd_filename(input_path: str) -> str:
    """Derive the PVD MP4 file name from the source path (Blu-ray / ProRes / DVD).

    For paths that contain a ``BDMV`` folder the title is taken from the folder
    four levels above ``BDMV``; when the path is not that deep, the folder
    directly above ``BDMV`` is used, and ``UNKNOWN`` when ``BDMV`` is the first
    component. A leading ``BEST_`` is dropped and the underscore-separated
    words are joined in capitalised form. For all other paths the title is the
    file stem with audio tokens removed.

    Args:
        input_path: Path of the source file.

    Returns:
        ``<Title>_DEU20_PVD.mp4`` where the title contains only ASCII letters
        and digits (``UNKNOWN_TITLE`` if nothing remains).
    """
    path = Path(input_path)
    path_parts = [p.upper() for p in path.parts]
    if "BDMV" in path_parts:
        bdmv_index = path_parts.index("BDMV")
        # Explicit bounds checks: a negative index would not raise IndexError
        # but silently wrap around to a component near the END of the path.
        if bdmv_index >= 4:
            project_folder = path_parts[bdmv_index - 4]
        elif bdmv_index > 0:
            project_folder = path_parts[bdmv_index - 1]
        else:
            project_folder = "UNKNOWN"
        clean_name = re.sub(r"^BEST_", "", project_folder, flags=re.IGNORECASE)
        words = [w.capitalize() for w in clean_name.split("_") if w.strip()]
        extracted_title = "".join(words)
    else:
        extracted_title = strip_audio_tokens(path.stem)
    # Keep only characters that are safe in the delivery file name.
    extracted_title = re.sub(r"[^A-Za-z0-9]", "", extracted_title)
    if not extracted_title:
        extracted_title = "UNKNOWN_TITLE"
    return f"{extracted_title}_DEU20_PVD.mp4"
def strip_audio_tokens(stem: str) -> str:
    """Remove audio tokens such as ``DEU51`` or ``ENG20`` from a file stem.

    A token is three letters followed by a digit 1-8 and one more digit,
    delimited by the string boundaries or by ``_``, ``-`` or whitespace.
    Runs of separators are collapsed to a single underscore afterwards.

    Returns:
        The cleaned stem, or the unchanged ``stem`` if nothing would remain.
    """
    audio_token = re.compile(r"(?i)(^|[_\-\s])([A-Z]{3}[1-8][0-9])(?=$|[_\-\s])")
    current = stem
    previous = None
    # Substitute repeatedly until the text no longer changes.
    while current != previous:
        previous = current
        current = audio_token.sub(r"\1", previous)
    collapsed = re.sub(r"[_\-\s]+", "_", current).strip("_- ")
    if collapsed:
        return collapsed
    return stem
def safe_output_stem(input_path: str) -> str:
    """Return a file-system-safe stem for output files derived from the input.

    Audio tokens are removed from the input's stem, every run of characters
    other than ASCII letters and digits becomes one underscore, and leading or
    trailing underscores are dropped. ``UNKNOWN_TITLE`` is returned when
    nothing is left.
    """
    without_tokens = strip_audio_tokens(Path(input_path).stem)
    sanitized = re.sub(r"[^A-Za-z0-9]+", "_", without_tokens).strip("_")
    if sanitized:
        return sanitized
    return "UNKNOWN_TITLE"
def probe_metadata(filepath: str) -> dict:
    """Run ffprobe on ``filepath`` and return its parsed JSON report.

    Args:
        filepath: Path of the media file to inspect.

    Returns:
        The decoded JSON document requested via ``-show_streams`` and
        ``-show_format``.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero return code.
    """
    cmd = [
        FFPROBE_EXE,
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_streams",
        "-show_format",
        filepath,
    ]
    # Decode explicitly as UTF-8: relying on the locale code page (the default
    # for text=True) garbles or fails on non-ASCII metadata under Windows.
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        check=False,
    )
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe konnte die Datei nicht lesen.\n{result.stderr}")
    return json.loads(result.stdout)
def parse_fps(rate: str) -> float:
    """Convert a frame-rate string (``"num/den"`` or a plain number) to a float.

    An empty value or ``"0/0"`` yields the default of 25.0. A fraction with a
    zero denominator yields the numerator as float.
    """
    if rate in ("", None, "0/0") or not rate:
        return 25.0
    if "/" not in rate:
        return float(rate)
    num_text, _, den_text = rate.partition("/")
    numerator = int(num_text)
    denominator = int(den_text)
    if denominator:
        return numerator / denominator
    return float(numerator)
def analyze_video(v_stream: dict) -> VideoProfile:
    """Derive the encoding-relevant video profile from a probed video stream.

    Args:
        v_stream: One video stream entry of the ffprobe report.

    Returns:
        A ``VideoProfile`` with frame size, frame rate, keyframe interval,
        SD/NTSC/interlace flags and the colour target for the resolution class.
    """
    width = int(v_stream.get("width", 0))
    height = int(v_stream.get("height", 0))

    # "0/0" is a non-empty (truthy) string, so it must be rejected explicitly;
    # otherwise r_frame_rate would never be consulted and the frame rate would
    # silently fall back to the 25 fps default.
    rate = v_stream.get("avg_frame_rate")
    if not rate or rate == "0/0":
        rate = v_stream.get("r_frame_rate", "25/1")
    fps = parse_fps(rate)
    keyint = max(1, math.ceil(fps))

    is_sd = height <= 576
    is_ntsc = is_sd and height <= 480 and fps >= 29.0

    field_order = v_stream.get("field_order", "unknown")
    scan_type = v_stream.get("scan_type", "unknown")
    is_interlaced = (
        field_order not in ("progressive", "unknown")
        or scan_type == "interlaced"
    )

    # Colour target depends only on the resolution class.
    if is_sd and is_ntsc:
        target_prim = "smpte170m"
        target_trc = "smpte170m"
        target_space = "smpte170m"
        target_name = "Rec.601 NTSC (smpte170m)"
    elif is_sd:
        target_prim = "bt470bg"
        target_trc = "bt470bg"
        target_space = "bt470bg"
        target_name = "Rec.601 PAL (bt470bg)"
    else:
        target_prim = "bt709"
        target_trc = "bt709"
        target_space = "bt709"
        target_name = "Rec.709 HD"

    source_prim = v_stream.get("color_primaries", "unknown")
    source_trc = v_stream.get("color_transfer", "unknown")
    source_space = v_stream.get("color_space", "unknown")
    # Untagged sources count as "needs conversion" as well.
    needs_conversion = (
        source_prim == "unknown"
        or source_prim != target_prim
        or source_trc != target_trc
        or source_space != target_space
    )

    return VideoProfile(
        width=width,
        height=height,
        fps=fps,
        keyint=keyint,
        is_sd=is_sd,
        is_ntsc=is_ntsc,
        is_interlaced=is_interlaced,
        target_prim=target_prim,
        target_trc=target_trc,
        target_space=target_space,
        target_name=target_name,
        needs_conversion=needs_conversion,
    )
def parse_audio_tokens(input_path: str) -> list[tuple[str, int, str]]:
    """Read tokens like DEU51_ENG20 from the file name, in order of appearance.

    Each token is three letters, a digit 1-8 and one further digit, not
    directly adjacent to other letters or digits. Matching is done on the
    upper-cased file stem.

    Returns:
        A list of ``(language, channel_count, label)`` tuples, where ``label``
        is e.g. ``"5.1"``. The channel count is the first digit, plus one
        whenever the second digit is not ``0``.
    """
    upper_stem = Path(input_path).stem.upper()
    token_re = re.compile(r"(?<![A-Z0-9])([A-Z]{3})([1-8])([0-9])(?![A-Z0-9])")
    parsed = []
    for found in token_re.finditer(upper_stem):
        language, main_digit, sub_digit = found.groups()
        main_channels = int(main_digit)
        # Any non-zero second digit adds exactly one channel.
        total = main_channels if sub_digit == "0" else main_channels + 1
        parsed.append((language, total, f"{main_channels}.{sub_digit}"))
    return parsed
def build_audio_roles(input_path: str, audio_streams: list[dict]) -> list[AudioRole]:
    """Create one ``AudioRole`` per audio stream.

    File-name tokens are assigned to the streams by position. A stream without
    a token takes its language from the stream's ``language`` tag (``und`` when
    absent) and has no channel/layout information from the name.
    """
    name_tokens = parse_audio_tokens(input_path)
    result = []
    for position, stream in enumerate(audio_streams):
        tagged_language = stream.get("tags", {}).get("language", "und").upper()
        if position < len(name_tokens):
            language, name_channels, name_layout = name_tokens[position]
        else:
            language = normalize_language(tagged_language)
            name_channels = None
            name_layout = None
        role = AudioRole(
            stream_index=int(stream["index"]),
            list_index=position,
            language=language,
            channels_from_name=name_channels,
            layout_from_name=name_layout,
            detected_channels=int(stream.get("channels", 2)),
            codec=stream.get("codec_name", "unknown"),
        )
        result.append(role)
    return result
def normalize_language(language: str) -> str:
    """Return an upper-case language code of at most three characters.

    An empty or missing value becomes ``UND``; the exact code ``GER`` is mapped
    to ``DEU``; anything else is truncated to its first three characters.
    """
    code = language.upper() if language else "UND"
    return "DEU" if code == "GER" else code[:3]
def select_mp4_german_audio(audio_roles: list[AudioRole]) -> AudioRole:
    """Pick the German track for the MP4, preferring a two-channel one.

    Returns:
        The first German role with two effective channels, or the first
        German role if none is stereo.

    Raises:
        RuntimeError: If no role is German.
    """
    german_only = [candidate for candidate in audio_roles if candidate.is_german]
    if len(german_only) == 0:
        raise RuntimeError("Keine deutsche Audiospur gefunden. Dateiname muss z.B. DEU51 oder DEU20 enthalten.")
    for candidate in german_only:
        if candidate.effective_channels == 2:
            return candidate
    return german_only[0]
def build_plan(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> JobPlan:
    """Probe the input and assemble the complete job plan including commands.

    Args:
        input_file: Path of the source file.
        output_dir: Directory for all outputs.

    Returns:
        A ``JobPlan`` whose ``commands`` list is already populated.

    Raises:
        FileNotFoundError: If ``input_file`` is not an existing file.
        RuntimeError: If no video stream or no audio stream is found (further
            errors may come from probing and from the audio selection).
    """
    if not os.path.isfile(input_file):
        raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_file}")

    probe = probe_metadata(input_file)
    video_stream = next(
        (stream for stream in probe["streams"] if stream["codec_type"] == "video"),
        None,
    )
    audio_streams = [stream for stream in probe["streams"] if stream["codec_type"] == "audio"]
    if not video_stream:
        raise RuntimeError("Kein Videostream gefunden.")
    if not audio_streams:
        raise RuntimeError("Keine Audio-Tracks gefunden.")

    video_profile = analyze_video(video_stream)
    audio_roles = build_audio_roles(input_file, audio_streams)
    mp4_audio = select_mp4_german_audio(audio_roles)

    target_dir = Path(output_dir)
    pvd_mp4 = str(target_dir / get_pvd_filename(input_file))
    stem = safe_output_stem(input_file)

    # The audio MOVs are only planned when a matching track exists.
    german_mov = None
    if any(role.is_german for role in audio_roles):
        german_mov = str(target_dir / f"{stem}_DEU_AUDIO_PCM.mov")
    original_mov = None
    if any(not role.is_german for role in audio_roles):
        original_mov = str(target_dir / f"{stem}_OV_AUDIO_PCM.mov")

    # Optional forced subtitles sit next to the input as "<name>_forced.srt".
    srt_candidate = os.path.splitext(input_file)[0] + "_forced.srt"
    forced_srt = srt_candidate if os.path.exists(srt_candidate) else None

    plan = JobPlan(
        input_file=input_file,
        output_dir=output_dir,
        video_stream=video_stream,
        audio_streams=audio_streams,
        audio_roles=audio_roles,
        video_profile=video_profile,
        pvd_mp4=pvd_mp4,
        german_mov=german_mov,
        original_mov=original_mov,
        selected_mp4_audio=mp4_audio,
        forced_srt=forced_srt,
        commands=[],
    )
    # The commands are derived from the finished plan itself.
    plan.commands = build_commands(plan)
    return plan
def build_video_filters(plan: JobPlan) -> list[str]:
    """Return the FFmpeg video filter expressions required by the plan.

    In order: deinterlacing for interlaced sources, a colour-space conversion
    to the profile's target when needed, and burned-in forced subtitles when a
    forced SRT file belongs to the plan. The list may be empty.
    """
    video = plan.video_profile
    chain = []

    if video.is_interlaced:
        chain.append("bwdif=mode=0:parity=auto")

    if video.needs_conversion:
        colorspace = (
            f"colorspace=primaries={video.target_prim}:"
            f"trc={video.target_trc}:"
            f"matrix={video.target_space}:range=tv"
        )
        chain.append(colorspace)

    if plan.forced_srt:
        # Forward slashes plus an escaped colon keep the path intact inside
        # the filter expression.
        srt_path = plan.forced_srt.replace("\\", "/")
        srt_path = srt_path.replace(":", "\\:")
        chain.append(f"subtitles='{srt_path}':force_style='Fontsize=14,MarginV=38'")

    return chain
def build_commands(plan: JobPlan) -> list[tuple[str, list[str]]]:
    """Return the labelled FFmpeg command lines for the plan, in run order.

    The MP4 command always comes first. An audio MOV command is added for the
    German tracks and for the non-German tracks when such tracks exist and the
    plan names the corresponding output path.
    """
    german = []
    originals = []
    for role in plan.audio_roles:
        (german if role.is_german else originals).append(role)

    jobs = [("Amazon PVD MP4", build_pvd_mp4_command(plan))]
    if german and plan.german_mov:
        german_cmd = build_audio_mov_command(plan, german, plan.german_mov)
        jobs.append(("Deutsche Audio-MOV", german_cmd))
    if originals and plan.original_mov:
        original_cmd = build_audio_mov_command(plan, originals, plan.original_mov)
        jobs.append(("Originalsprachige Audio-MOV", original_cmd))
    return jobs
def build_pvd_mp4_command(plan: JobPlan) -> list[str]:
    """Build the FFmpeg argument list that encodes the PVD MP4.

    The output contains the plan's video stream (H.264 via libx264, with rate
    settings chosen by SD/HD) and the selected German audio track as
    two-channel 48 kHz AAC. Chapters and global metadata are not copied.
    """
    profile = plan.video_profile
    audio = plan.selected_mp4_audio

    # (video bitrate, maximum rate, buffer size, H.264 level)
    if profile.is_sd:
        bitrate, max_rate, buffer_size, h264_level = "8M", "10M", "15M", "3.1"
    else:
        bitrate, max_rate, buffer_size, h264_level = "30M", "35M", "50M", "4.1"

    input_args = [FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file]
    map_args = [
        "-map", f"0:{plan.video_stream['index']}",
        "-map", f"0:{audio.stream_index}",
    ]

    filter_chain = build_video_filters(plan)
    filter_args = ["-vf", ",".join(filter_chain)] if filter_chain else []

    video_args = [
        "-c:v", "libx264",
        "-profile:v", "high",
        "-level", h264_level,
        "-pix_fmt", "yuv420p",
        "-b:v", bitrate,
        "-maxrate", max_rate,
        "-bufsize", buffer_size,
        "-preset", "slow",
        "-tune", "film",
        "-x264-params",
        f"keyint={profile.keyint}:min-keyint=2:scenecut=40:bframes=3:aq-mode=2",
    ]
    # Tag the output with the target colour description.
    color_args = [
        "-color_primaries", profile.target_prim,
        "-color_trc", profile.target_trc,
        "-colorspace", profile.target_space,
    ]
    audio_args = [
        "-c:a", "aac",
        "-b:a", "256k",
        "-ac:a:0", "2",
        "-ar:a:0", "48000",
    ]
    container_args = [
        "-f", "mp4",
        "-movflags", "+faststart",
        "-metadata:s:a:0", "language=deu",
        "-metadata:s:v:0", "language=deu",
        "-map_chapters", "-1",
        "-map_metadata", "-1",
        "-avoid_negative_ts", "make_zero",
    ]

    return (
        input_args
        + map_args
        + filter_args
        + video_args
        + color_args
        + audio_args
        + container_args
        + [plan.pvd_mp4]
    )
def build_audio_mov_command(plan: JobPlan, roles: list[AudioRole], output_path: str) -> list[str]:
    """Build the FFmpeg argument list for an audio-only PCM MOV.

    Every role in ``roles`` becomes one output audio stream (24-bit PCM,
    48 kHz) carrying a language tag and a title made of the display language
    and, if known, the layout from the file name.
    """
    args = [FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file]

    # All stream mappings come first ...
    for role in roles:
        args += ["-map", f"0:{role.stream_index}"]
    args += ["-vn", "-map_chapters", "-1", "-map_metadata", "-1"]

    # ... followed by the per-output-stream codec and metadata options.
    for out_index, role in enumerate(roles):
        if role.layout_from_name:
            track_title = f"{role.display_language} {role.layout_from_name}"
        else:
            track_title = role.display_language
        args += [
            f"-c:a:{out_index}", "pcm_s24le",
            f"-ar:a:{out_index}", "48000",
            f"-metadata:s:a:{out_index}", f"language={role.language.lower()}",
            f"-metadata:s:a:{out_index}", f"title={track_title}",
        ]

    args += ["-f", "mov", output_path]
    return args
def run_command(cmd: list[str], log: Callable[[str], None]) -> int:
    """Run ``cmd`` and forward its combined stdout/stderr to ``log`` line by line.

    The command line itself is logged first (arguments containing a space are
    shown in double quotes).

    Args:
        cmd: Program and arguments.
        log: Callback receiving each output line without its line ending.

    Returns:
        The process return code.
    """
    log(" ".join(f'"{part}"' if " " in part else part for part in cmd))
    # Decode as UTF-8 with replacement instead of the locale encoding, so a
    # byte that cannot be decoded never aborts log reading while the child is
    # still running. The context manager closes the pipe and reaps the
    # process even if ``log`` raises.
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        encoding="utf-8",
        errors="replace",
        bufsize=1,
    ) as process:
        assert process.stdout is not None
        for line in process.stdout:
            log(line.rstrip())
        return process.wait()
def run_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None:
    """Execute all commands of the plan in order, logging progress.

    The output directory is created if necessary and the plan summary is
    logged before the first command starts.

    Raises:
        RuntimeError: As soon as one command returns a non-zero code; the
            remaining commands are not run.
    """
    os.makedirs(plan.output_dir, exist_ok=True)
    log_plan(plan, log)
    for step_name, argv in plan.commands:
        log("")
        log(f"Starte: {step_name}")
        exit_code = run_command(argv, log)
        if exit_code == 0:
            log(f"Fertig: {step_name}")
            continue
        raise RuntimeError(f"{step_name} fehlgeschlagen (FFmpeg-Code {exit_code}).")
def log_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None:
    """Write a human-readable summary of the plan to ``log``, one line per call.

    The summary lists source, video properties, output paths and every audio
    track (marking the one used for the MP4), followed by notes when no
    original-language MOV is produced or the MP4 track is not stereo.
    """
    video = plan.video_profile
    log(f"Quelle: {plan.input_file}")
    log(f"Video: {video.width}x{video.height} @ {video.fps:.3f} fps, {video.target_name}")
    log(f"MP4: {plan.pvd_mp4}")
    if plan.german_mov:
        log(f"Deutsche Audio-MOV: {plan.german_mov}")
    if plan.original_mov:
        log(f"Originalsprachige Audio-MOV: {plan.original_mov}")

    log("Audio-Tracks:")
    for track in plan.audio_roles:
        # Without a layout from the file name, show the detected count as a guess.
        if track.layout_from_name:
            layout_text = track.layout_from_name
        else:
            layout_text = f"{track.detected_channels}.0?"
        marker = " -> MP4" if track == plan.selected_mp4_audio else ""
        log(
            f" Spur {track.list_index + 1}: Stream {track.stream_index}, "
            f"{track.display_language}, {layout_text}, {track.codec}{marker}"
        )

    if not plan.original_mov:
        log("Nur deutsche Tonspuren erkannt; originalsprachige Audio-MOV wird nicht erzeugt.")
    if plan.selected_mp4_audio.effective_channels != 2:
        log("MP4-Ton: keine deutsche Stereo-Spur gefunden, FFmpeg downmixt die gewaehlte deutsche Spur auf Stereo.")
def run_cli(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> int:
    """Plan and run one job from the command line.

    Errors are printed rather than raised. When stdin is a terminal, the
    console ``pause`` command is invoked before returning.

    Returns:
        0 on success, 1 if planning or encoding failed.
    """
    exit_code = 1
    try:
        run_plan(build_plan(input_file, output_dir))
        print("\nERFOLGREICH abgeschlossen.")
        exit_code = 0
    except Exception as exc:
        print(f"\nFEHLER: {exc}")
    finally:
        # Keep the console window open for interactive runs.
        if sys.stdin.isatty():
            os.system("pause")
    return exit_code
class MezzanineApp:
    """Tkinter front end: choose a source, analyse it, then run the encoding.

    The encoding runs in a background thread. That thread talks to the UI only
    through ``log_queue``, which the Tk main loop drains every 100 ms.
    """

    def __init__(self, root: tk.Tk):
        """Create the widgets inside ``root`` and start polling the log queue."""
        self.root = root
        self.root.title("Amazon PVD Mezzanine Encoder")
        self.root.geometry("980x680")
        self.log_queue: queue.Queue[str] = queue.Queue()
        self.worker: threading.Thread | None = None
        self.plan: JobPlan | None = None
        self.input_var = tk.StringVar()
        self.output_var = tk.StringVar(value=OUTPUT_BASE_DIR)
        self.status_var = tk.StringVar(value="Quelle auswaehlen und analysieren.")
        self.build_ui()
        self.root.after(100, self.flush_log_queue)

    def build_ui(self) -> None:
        """Build path rows, action buttons, track table, summary and log view."""
        outer = ttk.Frame(self.root, padding=14)
        outer.pack(fill="both", expand=True)

        file_row = ttk.Frame(outer)
        file_row.pack(fill="x")
        ttk.Label(file_row, text="Quelle").pack(side="left")
        ttk.Entry(file_row, textvariable=self.input_var).pack(side="left", fill="x", expand=True, padx=8)
        ttk.Button(file_row, text="Auswaehlen", command=self.choose_input).pack(side="left")

        output_row = ttk.Frame(outer)
        output_row.pack(fill="x", pady=(8, 0))
        ttk.Label(output_row, text="Ausgabe").pack(side="left")
        ttk.Entry(output_row, textvariable=self.output_var).pack(side="left", fill="x", expand=True, padx=8)
        ttk.Button(output_row, text="Ordner", command=self.choose_output).pack(side="left")

        button_row = ttk.Frame(outer)
        button_row.pack(fill="x", pady=12)
        self.analyze_button = ttk.Button(button_row, text="Analysieren", command=self.analyze)
        self.analyze_button.pack(side="left")
        self.start_button = ttk.Button(button_row, text="Encoding starten", command=self.start, state="disabled")
        self.start_button.pack(side="left", padx=8)
        ttk.Label(button_row, textvariable=self.status_var).pack(side="left", padx=12)

        columns = ("track", "stream", "language", "layout", "codec", "usage")
        self.tree = ttk.Treeview(outer, columns=columns, show="headings", height=8)
        headings = {
            "track": "Spur",
            "stream": "Stream",
            "language": "Sprache",
            "layout": "Format",
            "codec": "Codec",
            "usage": "Verwendung",
        }
        widths = {"track": 60, "stream": 80, "language": 160, "layout": 100, "codec": 120, "usage": 300}
        for col in columns:
            self.tree.heading(col, text=headings[col])
            self.tree.column(col, width=widths[col], anchor="w")
        self.tree.pack(fill="x", pady=(0, 12))

        # Both text widgets stay read-only; they are unlocked only while the
        # application itself writes to them.
        self.summary = tk.Text(outer, height=7, wrap="word")
        self.summary.pack(fill="x", pady=(0, 12))
        self.summary.configure(state="disabled")
        self.log_text = tk.Text(outer, wrap="word")
        self.log_text.pack(fill="both", expand=True)
        self.log_text.configure(state="disabled")

    def choose_input(self) -> None:
        """Ask for the source file; a new choice requires a fresh analysis."""
        path = filedialog.askopenfilename(
            title="Basis-Video auswaehlen",
            filetypes=[
                ("Video-Dateien", "*.mov *.mxf *.mp4 *.mkv *.ts *.m2ts *.vob"),
                ("Alle Dateien", "*.*"),
            ],
        )
        if path:
            self.input_var.set(path)
            self.start_button.configure(state="disabled")

    def choose_output(self) -> None:
        """Ask for the output directory; a new choice requires a fresh analysis."""
        path = filedialog.askdirectory(title="Ausgabeordner auswaehlen", initialdir=self.output_var.get())
        if path:
            self.output_var.set(path)
            # The current plan still points at the previous directory, so
            # treat it like a changed input.
            self.start_button.configure(state="disabled")

    def analyze(self) -> None:
        """Build a plan from the current fields and display it.

        On failure the plan is cleared, the start button is disabled and the
        error is shown in a message box.
        """
        try:
            self.plan = build_plan(self.input_var.get(), self.output_var.get())
            self.render_plan()
            self.start_button.configure(state="normal")
            self.status_var.set("Analyse fertig. Encoding kann gestartet werden.")
        except Exception as exc:
            self.plan = None
            self.start_button.configure(state="disabled")
            self.status_var.set("Analyse fehlgeschlagen.")
            messagebox.showerror("Analyse fehlgeschlagen", str(exc))

    def render_plan(self) -> None:
        """Fill the track table and the summary box from ``self.plan``."""
        assert self.plan is not None
        for item in self.tree.get_children():
            self.tree.delete(item)
        for role in self.plan.audio_roles:
            usage = []
            if role == self.plan.selected_mp4_audio:
                usage.append("MP4 Stereo")
            if role.is_german:
                usage.append("DEU PCM-MOV")
            else:
                usage.append("OV PCM-MOV")
            self.tree.insert(
                "",
                "end",
                values=(
                    role.list_index + 1,
                    role.stream_index,
                    role.display_language,
                    role.layout_from_name or f"{role.detected_channels} Kan.",
                    role.codec,
                    ", ".join(usage),
                ),
            )
        lines: list[str] = []
        log_plan(self.plan, lines.append)
        self.summary.configure(state="normal")
        self.summary.delete("1.0", "end")
        self.summary.insert("end", "\n".join(lines))
        self.summary.configure(state="disabled")

    def _plan_is_stale(self) -> bool:
        """Return True if there is no plan or it no longer matches the path fields."""
        plan = self.plan
        return (
            plan is None
            or plan.input_file != self.input_var.get()
            or plan.output_dir != self.output_var.get()
        )

    def start(self) -> None:
        """Start the encoding in a background thread unless one is running.

        The plan is rebuilt first when it is missing or when the input or
        output field was edited after the last analysis, so the encoding never
        runs with outdated paths.
        """
        if self.worker and self.worker.is_alive():
            return
        if self._plan_is_stale():
            self.analyze()
        if self.plan is None:
            return
        self.start_button.configure(state="disabled")
        self.analyze_button.configure(state="disabled")
        self.status_var.set("Encoding laeuft...")
        self.clear_log()
        self.worker = threading.Thread(target=self.run_worker, daemon=True)
        self.worker.start()

    def run_worker(self) -> None:
        """Thread body: run the plan and report the outcome through the queue."""
        try:
            assert self.plan is not None
            run_plan(self.plan, self.log_queue.put)
            self.log_queue.put("\nERFOLGREICH abgeschlossen.")
            self.log_queue.put("__DONE__")
        except Exception as exc:
            self.log_queue.put(f"\nFEHLER: {exc}")
            self.log_queue.put("__FAILED__")

    def flush_log_queue(self) -> None:
        """Drain the queue into the log view and reschedule itself.

        The sentinel messages ``__DONE__`` and ``__FAILED__`` are not shown;
        they update the status text and re-enable the buttons.
        """
        try:
            while True:
                message = self.log_queue.get_nowait()
                if message == "__DONE__":
                    self.status_var.set("Fertig.")
                    self.start_button.configure(state="normal")
                    self.analyze_button.configure(state="normal")
                    continue
                if message == "__FAILED__":
                    self.status_var.set("Fehler.")
                    self.start_button.configure(state="normal")
                    self.analyze_button.configure(state="normal")
                    continue
                self.append_log(message)
        except queue.Empty:
            pass
        self.root.after(100, self.flush_log_queue)

    def append_log(self, message: str) -> None:
        """Append one line to the log view and scroll to the end."""
        self.log_text.configure(state="normal")
        self.log_text.insert("end", message + "\n")
        self.log_text.see("end")
        self.log_text.configure(state="disabled")

    def clear_log(self) -> None:
        """Remove all text from the log view."""
        self.log_text.configure(state="normal")
        self.log_text.delete("1.0", "end")
        self.log_text.configure(state="disabled")
def run_ui() -> int:
    """Open the graphical interface and block until the window is closed.

    If the first command-line argument is an existing file, it is pre-filled
    as the source.

    Returns:
        0 after the main loop ends, 1 if Tkinter could not be imported.
    """
    if tk is None:
        print("Tkinter ist nicht verfuegbar. Bitte Datei per CLI uebergeben.")
        return 1

    root = tk.Tk()
    app = MezzanineApp(root)
    cli_args = sys.argv[1:]
    if cli_args and os.path.isfile(cli_args[0]):
        app.input_var.set(cli_args[0])
    root.mainloop()
    return 0
def main() -> int:
    """Entry point: ``--cli <file>`` runs headless, anything else opens the UI.

    Returns:
        The exit code of the selected mode.
    """
    args = sys.argv[1:]
    wants_cli = len(args) > 1 and args[0] == "--cli"
    if not wants_cli:
        return run_ui()
    return run_cli(args[1], OUTPUT_BASE_DIR)


if __name__ == "__main__":
    raise SystemExit(main())