from __future__ import annotations import json import math import os import queue import re import subprocess import sys import threading from dataclasses import dataclass from pathlib import Path from typing import Callable try: import tkinter as tk from tkinter import filedialog, messagebox, ttk except Exception: tk = None filedialog = None messagebox = None ttk = None # ============================================================================= # 1. KONFIGURATION UND PFADE # ============================================================================= FFMPEG_EXE = r"C:\Software\ffmpeg.exe" FFPROBE_EXE = r"C:\Software\ffprobe.exe" OUTPUT_BASE_DIR = r"H:\VOD" GERMAN_LANGS = {"DEU", "GER"} LANGUAGE_NAMES = { "ARA": "Arabisch", "CHI": "Chinesisch", "DEU": "Deutsch", "DUT": "Niederlaendisch", "ENG": "Englisch", "FRA": "Franzoesisch", "FRE": "Franzoesisch", "GER": "Deutsch", "ITA": "Italienisch", "JPN": "Japanisch", "KOR": "Koreanisch", "POL": "Polnisch", "POR": "Portugiesisch", "RUS": "Russisch", "SPA": "Spanisch", } @dataclass class AudioRole: stream_index: int list_index: int language: str channels_from_name: int | None layout_from_name: str | None detected_channels: int codec: str @property def is_german(self) -> bool: return self.language in GERMAN_LANGS @property def display_language(self) -> str: return LANGUAGE_NAMES.get(self.language, self.language) @property def effective_channels(self) -> int: return self.channels_from_name or self.detected_channels @dataclass class VideoProfile: width: int height: int fps: float keyint: int is_sd: bool is_ntsc: bool is_interlaced: bool target_prim: str target_trc: str target_space: str target_name: str needs_conversion: bool @dataclass class JobPlan: input_file: str output_dir: str video_stream: dict audio_streams: list[dict] audio_roles: list[AudioRole] video_profile: VideoProfile pvd_mp4: str german_mov: str | None original_mov: str | None selected_mp4_audio: AudioRole forced_srt: str | None commands: list[tuple[str, list[str]]] def get_pvd_filename(input_path: str) -> str: """TITEL-EXTRAKTION (Blu-ray / ProRes / DVD).""" path = Path(input_path) path_parts = [p.upper() for p in path.parts] if "BDMV" in path_parts: bdmv_index = path_parts.index("BDMV") try: project_folder = path_parts[bdmv_index - 4] except IndexError: project_folder = path_parts[bdmv_index - 1] if bdmv_index > 0 else "UNKNOWN" clean_name = re.sub(r"^BEST_", "", project_folder, flags=re.IGNORECASE) words = [w.capitalize() for w in clean_name.split("_") if w.strip()] extracted_title = "".join(words) else: extracted_title = strip_audio_tokens(path.stem) extracted_title = re.sub(r"[^A-Za-z0-9]", "", extracted_title) if not extracted_title: extracted_title = "UNKNOWN_TITLE" return f"{extracted_title}_DEU20_PVD.mp4" def strip_audio_tokens(stem: str) -> str: cleaned = stem token_pattern = re.compile(r"(?i)(^|[_\-\s])([A-Z]{3}[1-8][0-9])(?=$|[_\-\s])") while True: next_cleaned = token_pattern.sub(r"\1", cleaned) if next_cleaned == cleaned: break cleaned = next_cleaned cleaned = re.sub(r"[_\-\s]+", "_", cleaned).strip("_- ") return cleaned or stem def safe_output_stem(input_path: str) -> str: stem = strip_audio_tokens(Path(input_path).stem) stem = re.sub(r"[^A-Za-z0-9]+", "_", stem).strip("_") return stem or "UNKNOWN_TITLE" def probe_metadata(filepath: str) -> dict: cmd = [ FFPROBE_EXE, "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", filepath, ] result = subprocess.run(cmd, capture_output=True, text=True, check=False) if result.returncode != 0: raise RuntimeError(f"ffprobe konnte die Datei nicht lesen.\n{result.stderr}") return json.loads(result.stdout) def parse_fps(rate: str) -> float: if not rate or rate == "0/0": return 25.0 if "/" in rate: num, den = map(int, rate.split("/", 1)) return num / den if den else float(num) return float(rate) def analyze_video(v_stream: dict) -> VideoProfile: width = int(v_stream.get("width", 0)) height = int(v_stream.get("height", 0)) fps = parse_fps(v_stream.get("avg_frame_rate") or v_stream.get("r_frame_rate", "25/1")) keyint = max(1, math.ceil(fps)) is_sd = height <= 576 is_ntsc = is_sd and height <= 480 and fps >= 29.0 field_order = v_stream.get("field_order", "unknown") scan_type = v_stream.get("scan_type", "unknown") is_interlaced = ( field_order not in ("progressive", "unknown") or scan_type == "interlaced" ) if is_sd and is_ntsc: target_prim = "smpte170m" target_trc = "smpte170m" target_space = "smpte170m" target_name = "Rec.601 NTSC (smpte170m)" elif is_sd: target_prim = "bt470bg" target_trc = "bt470bg" target_space = "bt470bg" target_name = "Rec.601 PAL (bt470bg)" else: target_prim = "bt709" target_trc = "bt709" target_space = "bt709" target_name = "Rec.709 HD" source_prim = v_stream.get("color_primaries", "unknown") source_trc = v_stream.get("color_transfer", "unknown") source_space = v_stream.get("color_space", "unknown") needs_conversion = ( source_prim == "unknown" or source_prim != target_prim or source_trc != target_trc or source_space != target_space ) return VideoProfile( width=width, height=height, fps=fps, keyint=keyint, is_sd=is_sd, is_ntsc=is_ntsc, is_interlaced=is_interlaced, target_prim=target_prim, target_trc=target_trc, target_space=target_space, target_name=target_name, needs_conversion=needs_conversion, ) def parse_audio_tokens(input_path: str) -> list[tuple[str, int, str]]: """Reads tokens like DEU51_ENG20 from the filename in stream order.""" stem = Path(input_path).stem.upper() roles = [] for match in re.finditer(r"(? list[AudioRole]: tokens = parse_audio_tokens(input_path) roles = [] for list_index, stream in enumerate(audio_streams): token = tokens[list_index] if list_index < len(tokens) else None tag_lang = stream.get("tags", {}).get("language", "und").upper() language = token[0] if token else normalize_language(tag_lang) channels_from_name = token[1] if token else None layout_from_name = token[2] if token else None roles.append( AudioRole( stream_index=int(stream["index"]), list_index=list_index, language=language, channels_from_name=channels_from_name, layout_from_name=layout_from_name, detected_channels=int(stream.get("channels", 2)), codec=stream.get("codec_name", "unknown"), ) ) return roles def normalize_language(language: str) -> str: lang = (language or "UND").upper() if lang == "GER": return "DEU" return lang[:3] def select_mp4_german_audio(audio_roles: list[AudioRole]) -> AudioRole: german_roles = [role for role in audio_roles if role.is_german] if not german_roles: raise RuntimeError("Keine deutsche Audiospur gefunden. Dateiname muss z.B. DEU51 oder DEU20 enthalten.") stereo = [role for role in german_roles if role.effective_channels == 2] return stereo[0] if stereo else german_roles[0] def build_plan(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> JobPlan: if not os.path.isfile(input_file): raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_file}") metadata = probe_metadata(input_file) video_stream = next((s for s in metadata["streams"] if s["codec_type"] == "video"), None) audio_streams = [s for s in metadata["streams"] if s["codec_type"] == "audio"] if not video_stream: raise RuntimeError("Kein Videostream gefunden.") if not audio_streams: raise RuntimeError("Keine Audio-Tracks gefunden.") video_profile = analyze_video(video_stream) audio_roles = build_audio_roles(input_file, audio_streams) selected_mp4_audio = select_mp4_german_audio(audio_roles) output_base = Path(output_dir) pvd_mp4 = str(output_base / get_pvd_filename(input_file)) output_stem = safe_output_stem(input_file) german_roles = [role for role in audio_roles if role.is_german] original_roles = [role for role in audio_roles if not role.is_german] german_mov = str(output_base / f"{output_stem}_DEU_AUDIO_PCM.mov") if german_roles else None original_mov = str(output_base / f"{output_stem}_OV_AUDIO_PCM.mov") if original_roles else None forced_srt_path = os.path.splitext(input_file)[0] + "_forced.srt" forced_srt = forced_srt_path if os.path.exists(forced_srt_path) else None plan = JobPlan( input_file=input_file, output_dir=output_dir, video_stream=video_stream, audio_streams=audio_streams, audio_roles=audio_roles, video_profile=video_profile, pvd_mp4=pvd_mp4, german_mov=german_mov, original_mov=original_mov, selected_mp4_audio=selected_mp4_audio, forced_srt=forced_srt, commands=[], ) plan.commands = build_commands(plan) return plan def build_video_filters(plan: JobPlan) -> list[str]: filters = [] profile = plan.video_profile if profile.is_interlaced: filters.append("bwdif=mode=0:parity=auto") if profile.needs_conversion: filters.append( "colorspace=" f"primaries={profile.target_prim}:" f"trc={profile.target_trc}:" f"matrix={profile.target_space}:range=tv" ) if plan.forced_srt: clean_srt_path = plan.forced_srt.replace("\\", "/").replace(":", "\\:") filters.append(f"subtitles='{clean_srt_path}':force_style='Fontsize=14,MarginV=38'") return filters def build_commands(plan: JobPlan) -> list[tuple[str, list[str]]]: commands = [("Amazon PVD MP4", build_pvd_mp4_command(plan))] german_roles = [role for role in plan.audio_roles if role.is_german] original_roles = [role for role in plan.audio_roles if not role.is_german] if german_roles and plan.german_mov: commands.append(("Deutsche Audio-MOV", build_audio_mov_command(plan, german_roles, plan.german_mov))) if original_roles and plan.original_mov: commands.append(("Originalsprachige Audio-MOV", build_audio_mov_command(plan, original_roles, plan.original_mov))) return commands def build_pvd_mp4_command(plan: JobPlan) -> list[str]: profile = plan.video_profile selected_audio = plan.selected_mp4_audio if profile.is_sd: bv = "8M" maxr = "10M" bufs = "15M" level = "3.1" else: bv = "30M" maxr = "35M" bufs = "50M" level = "4.1" cmd = [ FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file, "-map", f"0:{plan.video_stream['index']}", "-map", f"0:{selected_audio.stream_index}", ] filters = build_video_filters(plan) if filters: cmd.extend(["-vf", ",".join(filters)]) cmd.extend( [ "-c:v", "libx264", "-profile:v", "high", "-level", level, "-pix_fmt", "yuv420p", "-b:v", bv, "-maxrate", maxr, "-bufsize", bufs, "-preset", "slow", "-tune", "film", "-x264-params", f"keyint={profile.keyint}:min-keyint=2:scenecut=40:bframes=3:aq-mode=2", "-color_primaries", profile.target_prim, "-color_trc", profile.target_trc, "-colorspace", profile.target_space, "-c:a", "aac", "-b:a", "256k", "-ac:a:0", "2", "-ar:a:0", "48000", "-f", "mp4", "-movflags", "+faststart", "-metadata:s:a:0", "language=deu", "-metadata:s:v:0", "language=deu", "-map_chapters", "-1", "-map_metadata", "-1", "-avoid_negative_ts", "make_zero", plan.pvd_mp4, ] ) return cmd def build_audio_mov_command(plan: JobPlan, roles: list[AudioRole], output_path: str) -> list[str]: cmd = [FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file] for role in roles: cmd.extend(["-map", f"0:{role.stream_index}"]) cmd.extend(["-vn", "-map_chapters", "-1", "-map_metadata", "-1"]) for idx, role in enumerate(roles): cmd.extend([f"-c:a:{idx}", "pcm_s24le", f"-ar:a:{idx}", "48000"]) cmd.extend([f"-metadata:s:a:{idx}", f"language={role.language.lower()}"]) title = role.display_language if role.layout_from_name: title = f"{title} {role.layout_from_name}" cmd.extend([f"-metadata:s:a:{idx}", f"title={title}"]) cmd.extend(["-f", "mov", output_path]) return cmd def run_command(cmd: list[str], log: Callable[[str], None]) -> int: log(" ".join(f'"{part}"' if " " in part else part for part in cmd)) process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) assert process.stdout is not None for line in process.stdout: log(line.rstrip()) return process.wait() def run_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None: os.makedirs(plan.output_dir, exist_ok=True) log_plan(plan, log) for label, cmd in plan.commands: log("") log(f"Starte: {label}") returncode = run_command(cmd, log) if returncode != 0: raise RuntimeError(f"{label} fehlgeschlagen (FFmpeg-Code {returncode}).") log(f"Fertig: {label}") def log_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None: profile = plan.video_profile log(f"Quelle: {plan.input_file}") log(f"Video: {profile.width}x{profile.height} @ {profile.fps:.3f} fps, {profile.target_name}") log(f"MP4: {plan.pvd_mp4}") if plan.german_mov: log(f"Deutsche Audio-MOV: {plan.german_mov}") if plan.original_mov: log(f"Originalsprachige Audio-MOV: {plan.original_mov}") log("Audio-Tracks:") for role in plan.audio_roles: layout = role.layout_from_name or f"{role.detected_channels}.0?" selected = " -> MP4" if role == plan.selected_mp4_audio else "" log( f" Spur {role.list_index + 1}: Stream {role.stream_index}, " f"{role.display_language}, {layout}, {role.codec}{selected}" ) if not plan.original_mov: log("Nur deutsche Tonspuren erkannt; originalsprachige Audio-MOV wird nicht erzeugt.") if plan.selected_mp4_audio.effective_channels != 2: log("MP4-Ton: keine deutsche Stereo-Spur gefunden, FFmpeg downmixt die gewaehlte deutsche Spur auf Stereo.") def run_cli(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> int: try: plan = build_plan(input_file, output_dir) run_plan(plan) print("\nERFOLGREICH abgeschlossen.") return 0 except Exception as exc: print(f"\nFEHLER: {exc}") return 1 finally: if sys.stdin.isatty(): os.system("pause") class MezzanineApp: def __init__(self, root: tk.Tk): self.root = root self.root.title("Amazon PVD Mezzanine Encoder") self.root.geometry("980x680") self.log_queue: queue.Queue[str] = queue.Queue() self.worker: threading.Thread | None = None self.plan: JobPlan | None = None self.input_var = tk.StringVar() self.output_var = tk.StringVar(value=OUTPUT_BASE_DIR) self.status_var = tk.StringVar(value="Quelle auswaehlen und analysieren.") self.build_ui() self.root.after(100, self.flush_log_queue) def build_ui(self) -> None: outer = ttk.Frame(self.root, padding=14) outer.pack(fill="both", expand=True) file_row = ttk.Frame(outer) file_row.pack(fill="x") ttk.Label(file_row, text="Quelle").pack(side="left") ttk.Entry(file_row, textvariable=self.input_var).pack(side="left", fill="x", expand=True, padx=8) ttk.Button(file_row, text="Auswaehlen", command=self.choose_input).pack(side="left") output_row = ttk.Frame(outer) output_row.pack(fill="x", pady=(8, 0)) ttk.Label(output_row, text="Ausgabe").pack(side="left") ttk.Entry(output_row, textvariable=self.output_var).pack(side="left", fill="x", expand=True, padx=8) ttk.Button(output_row, text="Ordner", command=self.choose_output).pack(side="left") button_row = ttk.Frame(outer) button_row.pack(fill="x", pady=12) self.analyze_button = ttk.Button(button_row, text="Analysieren", command=self.analyze) self.analyze_button.pack(side="left") self.start_button = ttk.Button(button_row, text="Encoding starten", command=self.start, state="disabled") self.start_button.pack(side="left", padx=8) ttk.Label(button_row, textvariable=self.status_var).pack(side="left", padx=12) columns = ("track", "stream", "language", "layout", "codec", "usage") self.tree = ttk.Treeview(outer, columns=columns, show="headings", height=8) headings = { "track": "Spur", "stream": "Stream", "language": "Sprache", "layout": "Format", "codec": "Codec", "usage": "Verwendung", } widths = {"track": 60, "stream": 80, "language": 160, "layout": 100, "codec": 120, "usage": 300} for col in columns: self.tree.heading(col, text=headings[col]) self.tree.column(col, width=widths[col], anchor="w") self.tree.pack(fill="x", pady=(0, 12)) self.summary = tk.Text(outer, height=7, wrap="word") self.summary.pack(fill="x", pady=(0, 12)) self.summary.configure(state="disabled") self.log_text = tk.Text(outer, wrap="word") self.log_text.pack(fill="both", expand=True) self.log_text.configure(state="disabled") def choose_input(self) -> None: path = filedialog.askopenfilename( title="Basis-Video auswaehlen", filetypes=[ ("Video-Dateien", "*.mov *.mxf *.mp4 *.mkv *.ts *.m2ts *.vob"), ("Alle Dateien", "*.*"), ], ) if path: self.input_var.set(path) self.start_button.configure(state="disabled") def choose_output(self) -> None: path = filedialog.askdirectory(title="Ausgabeordner auswaehlen", initialdir=self.output_var.get()) if path: self.output_var.set(path) def analyze(self) -> None: try: self.plan = build_plan(self.input_var.get(), self.output_var.get()) self.render_plan() self.start_button.configure(state="normal") self.status_var.set("Analyse fertig. Encoding kann gestartet werden.") except Exception as exc: self.plan = None self.start_button.configure(state="disabled") self.status_var.set("Analyse fehlgeschlagen.") messagebox.showerror("Analyse fehlgeschlagen", str(exc)) def render_plan(self) -> None: assert self.plan is not None for item in self.tree.get_children(): self.tree.delete(item) for role in self.plan.audio_roles: usage = [] if role == self.plan.selected_mp4_audio: usage.append("MP4 Stereo") if role.is_german: usage.append("DEU PCM-MOV") else: usage.append("OV PCM-MOV") self.tree.insert( "", "end", values=( role.list_index + 1, role.stream_index, role.display_language, role.layout_from_name or f"{role.detected_channels} Kan.", role.codec, ", ".join(usage), ), ) lines: list[str] = [] log_plan(self.plan, lines.append) self.summary.configure(state="normal") self.summary.delete("1.0", "end") self.summary.insert("end", "\n".join(lines)) self.summary.configure(state="disabled") def start(self) -> None: if self.worker and self.worker.is_alive(): return if self.plan is None: self.analyze() if self.plan is None: return self.start_button.configure(state="disabled") self.analyze_button.configure(state="disabled") self.status_var.set("Encoding laeuft...") self.clear_log() self.worker = threading.Thread(target=self.run_worker, daemon=True) self.worker.start() def run_worker(self) -> None: try: assert self.plan is not None run_plan(self.plan, self.log_queue.put) self.log_queue.put("\nERFOLGREICH abgeschlossen.") self.log_queue.put("__DONE__") except Exception as exc: self.log_queue.put(f"\nFEHLER: {exc}") self.log_queue.put("__FAILED__") def flush_log_queue(self) -> None: try: while True: message = self.log_queue.get_nowait() if message == "__DONE__": self.status_var.set("Fertig.") self.start_button.configure(state="normal") self.analyze_button.configure(state="normal") continue if message == "__FAILED__": self.status_var.set("Fehler.") self.start_button.configure(state="normal") self.analyze_button.configure(state="normal") continue self.append_log(message) except queue.Empty: pass self.root.after(100, self.flush_log_queue) def append_log(self, message: str) -> None: self.log_text.configure(state="normal") self.log_text.insert("end", message + "\n") self.log_text.see("end") self.log_text.configure(state="disabled") def clear_log(self) -> None: self.log_text.configure(state="normal") self.log_text.delete("1.0", "end") self.log_text.configure(state="disabled") def run_ui() -> int: if tk is None: return run_no_tk_fallback() root = tk.Tk() app = MezzanineApp(root) if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]): app.input_var.set(sys.argv[1]) root.mainloop() return 0 def run_no_tk_fallback() -> int: if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]): print("Tkinter ist nicht verfuegbar. Starte Drag-and-Drop-Datei direkt im CLI-Modus.") return run_cli(sys.argv[1], OUTPUT_BASE_DIR) print("Tkinter ist nicht verfuegbar. Oeffne Windows-Dateidialoge als Fallback.") selected = choose_paths_with_powershell() if selected is None: print("Keine Quelle ausgewaehlt.") return 1 input_file, output_dir = selected return run_cli(input_file, output_dir) def choose_paths_with_powershell() -> tuple[str, str] | None: script = r""" Add-Type -AssemblyName System.Windows.Forms $open = New-Object System.Windows.Forms.OpenFileDialog $open.Title = 'Basis-Video auswaehlen' $open.Filter = 'Video-Dateien (*.mov;*.mxf;*.mp4;*.mkv;*.ts;*.m2ts;*.vob)|*.mov;*.mxf;*.mp4;*.mkv;*.ts;*.m2ts;*.vob|Alle Dateien (*.*)|*.*' if ($open.ShowDialog() -ne [System.Windows.Forms.DialogResult]::OK) { exit 2 } $folder = New-Object System.Windows.Forms.FolderBrowserDialog $folder.Description = 'Ausgabeordner auswaehlen' $folder.SelectedPath = 'H:\VOD' if ($folder.ShowDialog() -ne [System.Windows.Forms.DialogResult]::OK) { exit 2 } Write-Output $open.FileName Write-Output $folder.SelectedPath """ result = subprocess.run( ["powershell", "-NoProfile", "-STA", "-Command", script], capture_output=True, text=True, check=False, ) if result.returncode != 0: return None lines = [line.strip() for line in result.stdout.splitlines() if line.strip()] if len(lines) < 2: return None return lines[0], lines[1] def main() -> int: if len(sys.argv) > 2 and sys.argv[1] == "--cli": return run_cli(sys.argv[2], OUTPUT_BASE_DIR) return run_ui() if __name__ == "__main__": raise SystemExit(main())