From 0a8741ea877f99ef3d241cd4e35ef22b4ae23ad9 Mon Sep 17 00:00:00 2001 From: Melbar Date: Thu, 7 May 2026 18:00:52 +0200 Subject: [PATCH] Add ProRes audio workflow and UI --- README.md | 89 +++- create_mezzanine.bat | 11 +- pvd_mezzanine.py | 1001 ++++++++++++++++++++++++++++++------------ 3 files changed, 810 insertions(+), 291 deletions(-) diff --git a/README.md b/README.md index 3fe1459..9396b64 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,59 @@ # Amazon PVD Mezzanine Encoder -**Automatisches Python-Script zur Erstellung von Amazon PVD-konformen Mezzanine-VOD-Dateien** -(Blu-ray • ProRes • DVD • PAL • NTSC • Interlaced) +**Automatisches Python-Tool zur Erstellung von Amazon PVD-konformen Mezzanine-VOD-Dateien und Audio-Master-Dateien** +(ProRes • Blu-ray • DVD • PAL • NTSC • Interlaced) --- ## Features -- **Vollautomatische Erkennung** von SD/HD, PAL/NTSC und interlaced Material -- **Bester Deinterlacer** (`bwdif mode=0`) → saubere 25 fps bei PAL-DVDs (kein Double-Framerate) -- **Automatische Farbraum-Konvertierung** (Rec.601 PAL/NTSC ↔ Rec.709) inkl. `colorspace`-Filter -- **Forced Subtitles** werden automatisch eingebrannt (wenn `_forced.srt` vorhanden) -- **Intelligente Audio-Auswahl** (automatisch deutsche Spur oder manuelle Wahl) -- **Amazon PVD optimierte Parameter** (Bitrate, Level, GOP, x264-Settings) -- **Saubere Metadaten** + `faststart` für beste Streaming-Performance -- Funktioniert mit Drag & Drop (`.bat` oder direkt auf Script ziehen) +- **Vollautomatische Erkennung** von SD/HD, PAL/NTSC und interlaced Material +- **ProRes-Quellen mit mehreren Tonspuren** werden unterstuetzt +- **Audio-Sprache und Layout aus Dateinamen** (`DEU51_ENG20`, `DEU20`, usw.) +- **HD-PVD-MP4 mit 30 Mbit/s** und deutscher Stereo-Spur +- **Automatischer Downmix** von deutscher 5.1-Spur auf Stereo, wenn keine deutsche Stereo-Spur vorhanden ist +- **Separate PCM-Audio-MOVs** fuer deutsche Tonspuren und originalsprachige Tonspuren +- **Bester Deinterlacer** (`bwdif mode=0`) → saubere 25 fps bei PAL-DVDs (kein Double-Framerate) +- **Automatische Farbraum-Konvertierung** (Rec.601 PAL/NTSC ↔ Rec.709) inkl. `colorspace`-Filter +- **Forced Subtitles** werden automatisch eingebrannt (wenn `_forced.srt` vorhanden) +- **Informative Tkinter-UI** mit Analyse, Audio-Uebersicht und Encoding-Log +- **Amazon PVD optimierte Parameter** (Bitrate, Level, GOP, x264-Settings) +- **Saubere Metadaten** + `faststart` für beste Streaming-Performance +- Funktioniert mit Drag & Drop (`.bat` oder direkt auf Script ziehen) ## Voraussetzungen - Windows (getestet) - FFmpeg + FFprobe (in `C:\Software\` oder Pfad in Script anpassen) -- Python 3.8+ +- Python 3.10+ - Schreibzugriff auf den Ausgabeordner `H:\VOD` ## Installation & Nutzung 1. Repo klonen oder herunterladen 2. `ffmpeg.exe` und `ffprobe.exe` in `C:\Software\` ablegen (oder Pfade im Script ändern) -3. Optional: `create_mezzanine.bat` anlegen: - ```bat +3. Optional: `create_mezzanine.bat` anlegen: + ```bat @echo off - python "%~dp0pvd_mezzanine.py" %* + set "PYTHON_EXE=%LocalAppData%\Programs\Python\Python311\python.exe" + if exist "%PYTHON_EXE%" ( + "%PYTHON_EXE%" "%~dp0pvd_mezzanine.py" %* + ) else ( + py -3 "%~dp0pvd_mezzanine.py" %* + ) pause ``` -4. Videodatei per Drag & Drop auf `create_mezzanine.bat` ziehen oder direkt aus der Konsole starten: +4. Tool starten: ```powershell - python .\pvd_mezzanine.py "C:\Pfad\zur\Quelle.mkv" + .\create_mezzanine.bat ``` +5. In der UI Quelle auswaehlen, analysieren und Encoding starten. + +Fuer automatisierte CLI-Laeufe kann das Encoding direkt gestartet werden: + +```powershell +py -3 .\pvd_mezzanine.py --cli "C:\Pfad\zur\Quelle.mov" +``` ## Ausgabe @@ -46,12 +63,42 @@ Das Script schreibt die fertige Mezzanine-Datei standardmäßig nach: H:\VOD ``` -Der Dateiname wird automatisch aus dem Quellpfad bzw. Blu-ray-Projektordner gebildet und endet auf: +Der MP4-Dateiname wird automatisch aus dem Quellpfad bzw. Blu-ray-Projektordner gebildet und endet auf: ```text _DEU20_PVD.mp4 ``` +Bei ProRes-Quellen werden zusaetzlich Audio-MOVs erzeugt: + +```text +Titel_DEU_AUDIO_PCM.mov +Titel_OV_AUDIO_PCM.mov +``` + +Wenn nur deutsche Tonspuren vorhanden sind, wird nur `Titel_DEU_AUDIO_PCM.mov` erzeugt. + +## ProRes-Audio-Namen + +Die Sprache und das Tonformat der Tonspuren werden aus dem Dateinamen gelesen. Die Tokens stehen in der Reihenfolge der Audiospuren im ProRes. + +Beispiele: + +```text +Film_DEU51_ENG20.mov +``` + +- Spur 1: Deutsch 5.1 +- Spur 2: Englisch 2.0 + +```text +Film_DEU20_DEU51.mov +``` + +- Spur 1: Deutsch 2.0 +- Spur 2: Deutsch 5.1 +- Es wird nur eine deutsche PCM-Audio-MOV erzeugt. + ## Untertitel Forced Subtitles werden automatisch eingebrannt, wenn neben der Quelle eine gleichnamige Datei mit dem Suffix `_forced.srt` liegt. @@ -63,9 +110,13 @@ Film.mkv Film_forced.srt ``` -## Audio-Auswahl +## Audio-Ausgabe -Das Script bevorzugt automatisch eine deutsche Audiospur (`ger` oder `deu`). Wenn keine deutsche Spur erkannt wird, fragt es interaktiv nach der gewünschten Spur. +- Das PVD-MP4 bekommt immer deutschen Stereo-Ton. +- Wenn eine deutsche 2.0-Spur vorhanden ist, wird sie verwendet. +- Wenn keine deutsche 2.0-Spur vorhanden ist, wird die erste deutsche Spur auf Stereo heruntergemischt. +- Alle deutschen Tonspuren werden als uncompressed PCM in eine separate MOV geschrieben. +- Alle nicht-deutschen Tonspuren werden als uncompressed PCM in eine separate MOV geschrieben. ## Konfiguration diff --git a/create_mezzanine.bat b/create_mezzanine.bat index 412408b..4554411 100644 --- a/create_mezzanine.bat +++ b/create_mezzanine.bat @@ -1,3 +1,8 @@ -@echo off -python "%~dp0pvd_mezzanine.py" %* -pause \ No newline at end of file +@echo off +set "PYTHON_EXE=%LocalAppData%\Programs\Python\Python311\python.exe" +if exist "%PYTHON_EXE%" ( + "%PYTHON_EXE%" "%~dp0pvd_mezzanine.py" %* +) else ( + py -3 "%~dp0pvd_mezzanine.py" %* +) +pause diff --git a/pvd_mezzanine.py b/pvd_mezzanine.py index d913c9b..0787d73 100644 --- a/pvd_mezzanine.py +++ b/pvd_mezzanine.py @@ -1,269 +1,732 @@ -import os -import sys -import json -import subprocess -import math -import re -from pathlib import Path - -# ============================================================================= -# 1. KONFIGURATION UND PFADE -# ============================================================================= -FFMPEG_EXE = r"C:\Software\ffmpeg.exe" -FFPROBE_EXE = r"C:\Software\ffprobe.exe" -OUTPUT_BASE_DIR = r"H:\VOD" - - -def get_pvd_filename(input_path: str) -> str: - """TITEL-EXTRAKTION (Blu-ray / ProRes / DVD)""" - path = Path(input_path) - path_parts = [p.upper() for p in path.parts] - - if "BDMV" in path_parts: - bdmv_index = path_parts.index("BDMV") - try: - project_folder = path_parts[bdmv_index - 4] - except IndexError: - project_folder = path_parts[bdmv_index - 1] if bdmv_index > 0 else "UNKNOWN" - - clean_name = re.sub(r'^BEST_', '', project_folder, flags=re.IGNORECASE) - words = [w.capitalize() for w in clean_name.split('_') if w.strip()] - extracted_title = "".join(words) - else: - extracted_title = path.stem - - extracted_title = re.sub(r'[^A-Za-z0-9]', '', extracted_title) - if not extracted_title: - extracted_title = "UNKNOWN_TITLE" - - return f"{extracted_title}_DEU20_PVD.mp4" - - -def probe_metadata(filepath: str) -> dict: - cmd = [ - FFPROBE_EXE, "-v", "quiet", "-print_format", "json", - "-show_streams", "-show_format", filepath - ] - result = subprocess.run(cmd, capture_output=True, text=True, check=False) - if result.returncode != 0: - print(f"CRITICAL ERROR: ffprobe konnte die Datei nicht lesen.\n{result.stderr}") - sys.exit(1) - return json.loads(result.stdout) - - -def main(): - if len(sys.argv) < 2: - print("ERROR: Ziehe eine Datei auf dieses Script oder die verknüpfte .bat") - os.system("pause") - sys.exit(1) - - input_file = sys.argv[1] - if not os.path.isfile(input_file): - print(f"ERROR: Eingabedatei nicht gefunden: {input_file}") - os.system("pause") - sys.exit(1) - - os.makedirs(OUTPUT_BASE_DIR, exist_ok=True) - - # --- SCHRITT 1: ANALYSE --- - print(f"\n[1/5] Analysiere Quelle: {os.path.basename(input_file)}") - metadata = probe_metadata(input_file) - - v_stream = next((s for s in metadata['streams'] if s['codec_type'] == 'video'), None) - a_streams = [s for s in metadata['streams'] if s['codec_type'] == 'audio'] - - if not v_stream: - print("Fehler: Kein Videostream gefunden!") - sys.exit(1) - if not a_streams: - print("Fehler: Keine Audio-Tracks gefunden!") - sys.exit(1) - - # === AUFLÖSUNG + PAL/NTSC + INTERLACED-ERKENNUNG + FARBRAUM === - width = int(v_stream.get('width', 0)) - height = int(v_stream.get('height', 0)) - fps_raw = v_stream.get('r_frame_rate', '24/1') - num, den = map(int, fps_raw.split('/')) - fps = num / den - keyint = math.ceil(fps) - - # SD / PAL / NTSC Erkennung - if height <= 576: - is_ntsc = (height <= 480 and fps >= 29.0) - print(f" -> Auflösung: {width}x{height} @ {fps:.3f} fps → {'NTSC' if is_ntsc else 'PAL'} SD-Material") - is_sd = True - else: - is_ntsc = False - is_sd = False - print(f" -> Auflösung: {width}x{height} @ {fps:.3f} fps → HD-Material") - - print(f" -> Keyint (1 Sek. GOP): {keyint}") - - # === INTERLACED-ERKENNUNG FÜR DVD === - field_order = v_stream.get('field_order', 'unknown') - scan_type = v_stream.get('scan_type', 'unknown') - is_interlaced = (field_order != 'progressive' and field_order != 'unknown') or scan_type == 'interlaced' - - if is_interlaced: - print(f" → **Interlaced erkannt** (field_order={field_order}, scan_type={scan_type}) → Deinterlacing mit bwdif (beste Qualität, 25 fps) aktiviert") - else: - print(" → Progressive Quelle → kein Deinterlacing nötig") - - # Ziel-Farbraum (Rec.601 PAL/NTSC vs Rec.709) - if is_sd: - if is_ntsc: - target_prim = "smpte170m" - target_trc = "smpte170m" - target_space = "smpte170m" - target_name = "Rec.601 NTSC (smpte170m)" - else: - target_prim = "bt470bg" - target_trc = "bt470bg" - target_space = "bt470bg" - target_name = "Rec.601 PAL (bt470bg)" - else: - target_prim = "bt709" - target_trc = "bt709" - target_space = "bt709" - target_name = "Rec.709 HD" - - # Quell-Farbraum prüfen + echte Konvertierung - source_prim = v_stream.get('color_primaries', 'unknown') - source_trc = v_stream.get('color_transfer', 'unknown') - source_space = v_stream.get('color_space', 'unknown') - - needs_conversion = ( - source_prim == 'unknown' or - source_prim != target_prim or - source_trc != target_trc or - source_space != target_space - ) - - if needs_conversion: - print(f" → Farbkonvertierung AKTIVIERT → Ziel: {target_name}") - else: - print(f" → Quell-Farbraum passt perfekt → keine Konvertierung") - - # --- SCHRITT 2: AUDIO-WAHL --- - print("\n[2/5] Vorhandene Audio-Tracks:") - for i, s in enumerate(a_streams): - lang = s.get('tags', {}).get('language', 'und').upper() - ch = s.get('channels', 2) - codec = s.get('codec_name', 'pcm') - print(f" [{i}] Index {s['index']}: {lang} ({ch} Kanäle, {codec})") - - selected_audio_list_idx = next( - (i for i, s in enumerate(a_streams) if s.get('tags', {}).get('language', 'und').lower() in ('ger', 'deu')), - None - ) - - if selected_audio_list_idx is None: - choice = input("\nKeine deutsche Spur automatisch erkannt.\nWelche Spur ist die deutsche Master-Spur? [Standard: 0]: ") or "0" - try: - selected_audio_list_idx = int(choice) - if not 0 <= selected_audio_list_idx < len(a_streams): - raise ValueError - except Exception: - print("Ungültige Eingabe → verwende Spur 0.") - selected_audio_list_idx = 0 - - selected_audio_idx = a_streams[selected_audio_list_idx]['index'] - print(f" → Gewählte Audio-Spur: Index {selected_audio_idx}") - - # --- SCHRITT 3: VF-FILTER (Deinterlace + Farbe + Subs) --- - output_name = get_pvd_filename(input_file) - output_path = os.path.join(OUTPUT_BASE_DIR, output_name) - - forced_srt = os.path.splitext(input_file)[0] + "_forced.srt" - vf_filters = [] - - # 1. Deinterlacing mit bwdif (beste Qualität, behält originale Framerate = 25 fps bei PAL) - if is_interlaced: - vf_filters.append("bwdif=mode=0:parity=auto") - - # 2. Farbkonvertierung (falls nötig) - if needs_conversion: - conv = (f"colorspace=primaries={target_prim}:trc={target_trc}:matrix={target_space}:range=tv") - vf_filters.append(conv) - - # 3. Forced SRT Burn-In - if os.path.exists(forced_srt): - print(f"\n[3/5] Forced SRT gefunden → wird eingebrannt: {os.path.basename(forced_srt)}") - clean_srt_path = forced_srt.replace("\\", "/").replace(":", "\\:") - vf_filters.append(f"subtitles='{clean_srt_path}':force_style='Fontsize=14,MarginV=38'") - else: - print("\n[3/5] Keine Forced SRT gefunden → Burn-In übersprungen.") - - # --- SCHRITT 4: ENCODING --- - print(f"\n[4/5] Starte Encoding → {output_name}") - print(f" → Ziel-Farbraum: {target_name}") - - if is_sd: - bv = "8M" - maxr = "10M" - bufs = "15M" - lvl = "3.1" - print(" → SD-Parameter (Bitrate + Level)") - else: - bv = "28M" - maxr = "35M" - bufs = "50M" - lvl = "4.1" - print(" → HD-Parameter (Bitrate + Level)") - - ffmpeg_cmd = [ - FFMPEG_EXE, "-hide_banner", "-y", "-i", input_file, - "-map", f"0:{v_stream['index']}", - "-map", f"0:{selected_audio_idx}", - ] - - if vf_filters: - vf_str = ",".join(vf_filters) - ffmpeg_cmd.extend(["-vf", vf_str]) - - ffmpeg_cmd.extend([ - "-c:v", "libx264", - "-profile:v", "high", - "-level", lvl, - "-pix_fmt", "yuv420p", - "-b:v", bv, - "-maxrate", maxr, - "-bufsize", bufs, - "-preset", "slow", - "-tune", "film", - "-x264-params", f"keyint={keyint}:min-keyint=2:scenecut=40:bframes=3:aq-mode=2", - "-color_primaries", target_prim, - "-color_trc", target_trc, - "-colorspace", target_space, - ]) - - ffmpeg_cmd.extend([ - "-c:a", "aac", - "-b:a", "256k", - "-ac", "2", - "-ar", "48000", - "-f", "mp4", - "-movflags", "+faststart", - "-metadata:s:a:0", "language=ger", - "-metadata:s:v:0", "language=ger", - "-map_chapters", "-1", - "-map_metadata", "-1", - "-avoid_negative_ts", "make_zero", - output_path - ]) - - print("\n--- FFmpeg Kommando ---") - print(" ".join(ffmpeg_cmd)) - print("------------------------\n") - - result = subprocess.run(ffmpeg_cmd, check=False) - if result.returncode == 0: - print(f"\n[5/5] ✅ ERFOLGREICH abgeschlossen!\nDatei: {output_path}") - else: - print(f"\n[5/5] ❌ FFmpeg-Fehler (Code {result.returncode})") - - os.system("pause") - - -if __name__ == "__main__": - main() \ No newline at end of file +from __future__ import annotations + +import json +import math +import os +import queue +import re +import subprocess +import sys +import threading +from dataclasses import dataclass +from pathlib import Path +from typing import Callable + +try: + import tkinter as tk + from tkinter import filedialog, messagebox, ttk +except Exception: + tk = None + filedialog = None + messagebox = None + ttk = None + + +# ============================================================================= +# 1. KONFIGURATION UND PFADE +# ============================================================================= +FFMPEG_EXE = r"C:\Software\ffmpeg.exe" +FFPROBE_EXE = r"C:\Software\ffprobe.exe" +OUTPUT_BASE_DIR = r"H:\VOD" + +GERMAN_LANGS = {"DEU", "GER"} +LANGUAGE_NAMES = { + "ARA": "Arabisch", + "CHI": "Chinesisch", + "DEU": "Deutsch", + "DUT": "Niederlaendisch", + "ENG": "Englisch", + "FRA": "Franzoesisch", + "FRE": "Franzoesisch", + "GER": "Deutsch", + "ITA": "Italienisch", + "JPN": "Japanisch", + "KOR": "Koreanisch", + "POL": "Polnisch", + "POR": "Portugiesisch", + "RUS": "Russisch", + "SPA": "Spanisch", +} + + +@dataclass +class AudioRole: + stream_index: int + list_index: int + language: str + channels_from_name: int | None + layout_from_name: str | None + detected_channels: int + codec: str + + @property + def is_german(self) -> bool: + return self.language in GERMAN_LANGS + + @property + def display_language(self) -> str: + return LANGUAGE_NAMES.get(self.language, self.language) + + @property + def effective_channels(self) -> int: + return self.channels_from_name or self.detected_channels + + +@dataclass +class VideoProfile: + width: int + height: int + fps: float + keyint: int + is_sd: bool + is_ntsc: bool + is_interlaced: bool + target_prim: str + target_trc: str + target_space: str + target_name: str + needs_conversion: bool + + +@dataclass +class JobPlan: + input_file: str + output_dir: str + video_stream: dict + audio_streams: list[dict] + audio_roles: list[AudioRole] + video_profile: VideoProfile + pvd_mp4: str + german_mov: str | None + original_mov: str | None + selected_mp4_audio: AudioRole + forced_srt: str | None + commands: list[tuple[str, list[str]]] + + +def get_pvd_filename(input_path: str) -> str: + """TITEL-EXTRAKTION (Blu-ray / ProRes / DVD).""" + path = Path(input_path) + path_parts = [p.upper() for p in path.parts] + + if "BDMV" in path_parts: + bdmv_index = path_parts.index("BDMV") + try: + project_folder = path_parts[bdmv_index - 4] + except IndexError: + project_folder = path_parts[bdmv_index - 1] if bdmv_index > 0 else "UNKNOWN" + + clean_name = re.sub(r"^BEST_", "", project_folder, flags=re.IGNORECASE) + words = [w.capitalize() for w in clean_name.split("_") if w.strip()] + extracted_title = "".join(words) + else: + extracted_title = strip_audio_tokens(path.stem) + + extracted_title = re.sub(r"[^A-Za-z0-9]", "", extracted_title) + if not extracted_title: + extracted_title = "UNKNOWN_TITLE" + + return f"{extracted_title}_DEU20_PVD.mp4" + + +def strip_audio_tokens(stem: str) -> str: + cleaned = stem + token_pattern = re.compile(r"(?i)(^|[_\-\s])([A-Z]{3}[1-8][0-9])(?=$|[_\-\s])") + while True: + next_cleaned = token_pattern.sub(r"\1", cleaned) + if next_cleaned == cleaned: + break + cleaned = next_cleaned + cleaned = re.sub(r"[_\-\s]+", "_", cleaned).strip("_- ") + return cleaned or stem + + +def safe_output_stem(input_path: str) -> str: + stem = strip_audio_tokens(Path(input_path).stem) + stem = re.sub(r"[^A-Za-z0-9]+", "_", stem).strip("_") + return stem or "UNKNOWN_TITLE" + + +def probe_metadata(filepath: str) -> dict: + cmd = [ + FFPROBE_EXE, + "-v", + "quiet", + "-print_format", + "json", + "-show_streams", + "-show_format", + filepath, + ] + result = subprocess.run(cmd, capture_output=True, text=True, check=False) + if result.returncode != 0: + raise RuntimeError(f"ffprobe konnte die Datei nicht lesen.\n{result.stderr}") + return json.loads(result.stdout) + + +def parse_fps(rate: str) -> float: + if not rate or rate == "0/0": + return 25.0 + if "/" in rate: + num, den = map(int, rate.split("/", 1)) + return num / den if den else float(num) + return float(rate) + + +def analyze_video(v_stream: dict) -> VideoProfile: + width = int(v_stream.get("width", 0)) + height = int(v_stream.get("height", 0)) + fps = parse_fps(v_stream.get("avg_frame_rate") or v_stream.get("r_frame_rate", "25/1")) + keyint = max(1, math.ceil(fps)) + + is_sd = height <= 576 + is_ntsc = is_sd and height <= 480 and fps >= 29.0 + + field_order = v_stream.get("field_order", "unknown") + scan_type = v_stream.get("scan_type", "unknown") + is_interlaced = ( + field_order not in ("progressive", "unknown") + or scan_type == "interlaced" + ) + + if is_sd and is_ntsc: + target_prim = "smpte170m" + target_trc = "smpte170m" + target_space = "smpte170m" + target_name = "Rec.601 NTSC (smpte170m)" + elif is_sd: + target_prim = "bt470bg" + target_trc = "bt470bg" + target_space = "bt470bg" + target_name = "Rec.601 PAL (bt470bg)" + else: + target_prim = "bt709" + target_trc = "bt709" + target_space = "bt709" + target_name = "Rec.709 HD" + + source_prim = v_stream.get("color_primaries", "unknown") + source_trc = v_stream.get("color_transfer", "unknown") + source_space = v_stream.get("color_space", "unknown") + needs_conversion = ( + source_prim == "unknown" + or source_prim != target_prim + or source_trc != target_trc + or source_space != target_space + ) + + return VideoProfile( + width=width, + height=height, + fps=fps, + keyint=keyint, + is_sd=is_sd, + is_ntsc=is_ntsc, + is_interlaced=is_interlaced, + target_prim=target_prim, + target_trc=target_trc, + target_space=target_space, + target_name=target_name, + needs_conversion=needs_conversion, + ) + + +def parse_audio_tokens(input_path: str) -> list[tuple[str, int, str]]: + """Reads tokens like DEU51_ENG20 from the filename in stream order.""" + stem = Path(input_path).stem.upper() + roles = [] + for match in re.finditer(r"(? list[AudioRole]: + tokens = parse_audio_tokens(input_path) + roles = [] + for list_index, stream in enumerate(audio_streams): + token = tokens[list_index] if list_index < len(tokens) else None + tag_lang = stream.get("tags", {}).get("language", "und").upper() + language = token[0] if token else normalize_language(tag_lang) + channels_from_name = token[1] if token else None + layout_from_name = token[2] if token else None + roles.append( + AudioRole( + stream_index=int(stream["index"]), + list_index=list_index, + language=language, + channels_from_name=channels_from_name, + layout_from_name=layout_from_name, + detected_channels=int(stream.get("channels", 2)), + codec=stream.get("codec_name", "unknown"), + ) + ) + return roles + + +def normalize_language(language: str) -> str: + lang = (language or "UND").upper() + if lang == "GER": + return "DEU" + return lang[:3] + + +def select_mp4_german_audio(audio_roles: list[AudioRole]) -> AudioRole: + german_roles = [role for role in audio_roles if role.is_german] + if not german_roles: + raise RuntimeError("Keine deutsche Audiospur gefunden. Dateiname muss z.B. DEU51 oder DEU20 enthalten.") + stereo = [role for role in german_roles if role.effective_channels == 2] + return stereo[0] if stereo else german_roles[0] + + +def build_plan(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> JobPlan: + if not os.path.isfile(input_file): + raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_file}") + + metadata = probe_metadata(input_file) + video_stream = next((s for s in metadata["streams"] if s["codec_type"] == "video"), None) + audio_streams = [s for s in metadata["streams"] if s["codec_type"] == "audio"] + if not video_stream: + raise RuntimeError("Kein Videostream gefunden.") + if not audio_streams: + raise RuntimeError("Keine Audio-Tracks gefunden.") + + video_profile = analyze_video(video_stream) + audio_roles = build_audio_roles(input_file, audio_streams) + selected_mp4_audio = select_mp4_german_audio(audio_roles) + + output_base = Path(output_dir) + pvd_mp4 = str(output_base / get_pvd_filename(input_file)) + output_stem = safe_output_stem(input_file) + german_roles = [role for role in audio_roles if role.is_german] + original_roles = [role for role in audio_roles if not role.is_german] + german_mov = str(output_base / f"{output_stem}_DEU_AUDIO_PCM.mov") if german_roles else None + original_mov = str(output_base / f"{output_stem}_OV_AUDIO_PCM.mov") if original_roles else None + forced_srt_path = os.path.splitext(input_file)[0] + "_forced.srt" + forced_srt = forced_srt_path if os.path.exists(forced_srt_path) else None + + plan = JobPlan( + input_file=input_file, + output_dir=output_dir, + video_stream=video_stream, + audio_streams=audio_streams, + audio_roles=audio_roles, + video_profile=video_profile, + pvd_mp4=pvd_mp4, + german_mov=german_mov, + original_mov=original_mov, + selected_mp4_audio=selected_mp4_audio, + forced_srt=forced_srt, + commands=[], + ) + plan.commands = build_commands(plan) + return plan + + +def build_video_filters(plan: JobPlan) -> list[str]: + filters = [] + profile = plan.video_profile + if profile.is_interlaced: + filters.append("bwdif=mode=0:parity=auto") + if profile.needs_conversion: + filters.append( + "colorspace=" + f"primaries={profile.target_prim}:" + f"trc={profile.target_trc}:" + f"matrix={profile.target_space}:range=tv" + ) + if plan.forced_srt: + clean_srt_path = plan.forced_srt.replace("\\", "/").replace(":", "\\:") + filters.append(f"subtitles='{clean_srt_path}':force_style='Fontsize=14,MarginV=38'") + return filters + + +def build_commands(plan: JobPlan) -> list[tuple[str, list[str]]]: + commands = [("Amazon PVD MP4", build_pvd_mp4_command(plan))] + german_roles = [role for role in plan.audio_roles if role.is_german] + original_roles = [role for role in plan.audio_roles if not role.is_german] + if german_roles and plan.german_mov: + commands.append(("Deutsche Audio-MOV", build_audio_mov_command(plan, german_roles, plan.german_mov))) + if original_roles and plan.original_mov: + commands.append(("Originalsprachige Audio-MOV", build_audio_mov_command(plan, original_roles, plan.original_mov))) + return commands + + +def build_pvd_mp4_command(plan: JobPlan) -> list[str]: + profile = plan.video_profile + selected_audio = plan.selected_mp4_audio + if profile.is_sd: + bv = "8M" + maxr = "10M" + bufs = "15M" + level = "3.1" + else: + bv = "30M" + maxr = "35M" + bufs = "50M" + level = "4.1" + + cmd = [ + FFMPEG_EXE, + "-hide_banner", + "-y", + "-i", + plan.input_file, + "-map", + f"0:{plan.video_stream['index']}", + "-map", + f"0:{selected_audio.stream_index}", + ] + filters = build_video_filters(plan) + if filters: + cmd.extend(["-vf", ",".join(filters)]) + + cmd.extend( + [ + "-c:v", + "libx264", + "-profile:v", + "high", + "-level", + level, + "-pix_fmt", + "yuv420p", + "-b:v", + bv, + "-maxrate", + maxr, + "-bufsize", + bufs, + "-preset", + "slow", + "-tune", + "film", + "-x264-params", + f"keyint={profile.keyint}:min-keyint=2:scenecut=40:bframes=3:aq-mode=2", + "-color_primaries", + profile.target_prim, + "-color_trc", + profile.target_trc, + "-colorspace", + profile.target_space, + "-c:a", + "aac", + "-b:a", + "256k", + "-ac:a:0", + "2", + "-ar:a:0", + "48000", + "-f", + "mp4", + "-movflags", + "+faststart", + "-metadata:s:a:0", + "language=deu", + "-metadata:s:v:0", + "language=deu", + "-map_chapters", + "-1", + "-map_metadata", + "-1", + "-avoid_negative_ts", + "make_zero", + plan.pvd_mp4, + ] + ) + return cmd + + +def build_audio_mov_command(plan: JobPlan, roles: list[AudioRole], output_path: str) -> list[str]: + cmd = [FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file] + for role in roles: + cmd.extend(["-map", f"0:{role.stream_index}"]) + cmd.extend(["-vn", "-map_chapters", "-1", "-map_metadata", "-1"]) + + for idx, role in enumerate(roles): + cmd.extend([f"-c:a:{idx}", "pcm_s24le", f"-ar:a:{idx}", "48000"]) + cmd.extend([f"-metadata:s:a:{idx}", f"language={role.language.lower()}"]) + title = role.display_language + if role.layout_from_name: + title = f"{title} {role.layout_from_name}" + cmd.extend([f"-metadata:s:a:{idx}", f"title={title}"]) + + cmd.extend(["-f", "mov", output_path]) + return cmd + + +def run_command(cmd: list[str], log: Callable[[str], None]) -> int: + log(" ".join(f'"{part}"' if " " in part else part for part in cmd)) + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + assert process.stdout is not None + for line in process.stdout: + log(line.rstrip()) + return process.wait() + + +def run_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None: + os.makedirs(plan.output_dir, exist_ok=True) + log_plan(plan, log) + for label, cmd in plan.commands: + log("") + log(f"Starte: {label}") + returncode = run_command(cmd, log) + if returncode != 0: + raise RuntimeError(f"{label} fehlgeschlagen (FFmpeg-Code {returncode}).") + log(f"Fertig: {label}") + + +def log_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None: + profile = plan.video_profile + log(f"Quelle: {plan.input_file}") + log(f"Video: {profile.width}x{profile.height} @ {profile.fps:.3f} fps, {profile.target_name}") + log(f"MP4: {plan.pvd_mp4}") + if plan.german_mov: + log(f"Deutsche Audio-MOV: {plan.german_mov}") + if plan.original_mov: + log(f"Originalsprachige Audio-MOV: {plan.original_mov}") + log("Audio-Tracks:") + for role in plan.audio_roles: + layout = role.layout_from_name or f"{role.detected_channels}.0?" + selected = " -> MP4" if role == plan.selected_mp4_audio else "" + log( + f" Spur {role.list_index + 1}: Stream {role.stream_index}, " + f"{role.display_language}, {layout}, {role.codec}{selected}" + ) + if not plan.original_mov: + log("Nur deutsche Tonspuren erkannt; originalsprachige Audio-MOV wird nicht erzeugt.") + if plan.selected_mp4_audio.effective_channels != 2: + log("MP4-Ton: keine deutsche Stereo-Spur gefunden, FFmpeg downmixt die gewaehlte deutsche Spur auf Stereo.") + + +def run_cli(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> int: + try: + plan = build_plan(input_file, output_dir) + run_plan(plan) + print("\nERFOLGREICH abgeschlossen.") + return 0 + except Exception as exc: + print(f"\nFEHLER: {exc}") + return 1 + finally: + if sys.stdin.isatty(): + os.system("pause") + + +class MezzanineApp: + def __init__(self, root: tk.Tk): + self.root = root + self.root.title("Amazon PVD Mezzanine Encoder") + self.root.geometry("980x680") + self.log_queue: queue.Queue[str] = queue.Queue() + self.worker: threading.Thread | None = None + self.plan: JobPlan | None = None + + self.input_var = tk.StringVar() + self.output_var = tk.StringVar(value=OUTPUT_BASE_DIR) + self.status_var = tk.StringVar(value="Quelle auswaehlen und analysieren.") + + self.build_ui() + self.root.after(100, self.flush_log_queue) + + def build_ui(self) -> None: + outer = ttk.Frame(self.root, padding=14) + outer.pack(fill="both", expand=True) + + file_row = ttk.Frame(outer) + file_row.pack(fill="x") + ttk.Label(file_row, text="Quelle").pack(side="left") + ttk.Entry(file_row, textvariable=self.input_var).pack(side="left", fill="x", expand=True, padx=8) + ttk.Button(file_row, text="Auswaehlen", command=self.choose_input).pack(side="left") + + output_row = ttk.Frame(outer) + output_row.pack(fill="x", pady=(8, 0)) + ttk.Label(output_row, text="Ausgabe").pack(side="left") + ttk.Entry(output_row, textvariable=self.output_var).pack(side="left", fill="x", expand=True, padx=8) + ttk.Button(output_row, text="Ordner", command=self.choose_output).pack(side="left") + + button_row = ttk.Frame(outer) + button_row.pack(fill="x", pady=12) + self.analyze_button = ttk.Button(button_row, text="Analysieren", command=self.analyze) + self.analyze_button.pack(side="left") + self.start_button = ttk.Button(button_row, text="Encoding starten", command=self.start, state="disabled") + self.start_button.pack(side="left", padx=8) + ttk.Label(button_row, textvariable=self.status_var).pack(side="left", padx=12) + + columns = ("track", "stream", "language", "layout", "codec", "usage") + self.tree = ttk.Treeview(outer, columns=columns, show="headings", height=8) + headings = { + "track": "Spur", + "stream": "Stream", + "language": "Sprache", + "layout": "Format", + "codec": "Codec", + "usage": "Verwendung", + } + widths = {"track": 60, "stream": 80, "language": 160, "layout": 100, "codec": 120, "usage": 300} + for col in columns: + self.tree.heading(col, text=headings[col]) + self.tree.column(col, width=widths[col], anchor="w") + self.tree.pack(fill="x", pady=(0, 12)) + + self.summary = tk.Text(outer, height=7, wrap="word") + self.summary.pack(fill="x", pady=(0, 12)) + self.summary.configure(state="disabled") + + self.log_text = tk.Text(outer, wrap="word") + self.log_text.pack(fill="both", expand=True) + self.log_text.configure(state="disabled") + + def choose_input(self) -> None: + path = filedialog.askopenfilename( + title="Basis-Video auswaehlen", + filetypes=[ + ("Video-Dateien", "*.mov *.mxf *.mp4 *.mkv *.ts *.m2ts *.vob"), + ("Alle Dateien", "*.*"), + ], + ) + if path: + self.input_var.set(path) + self.start_button.configure(state="disabled") + + def choose_output(self) -> None: + path = filedialog.askdirectory(title="Ausgabeordner auswaehlen", initialdir=self.output_var.get()) + if path: + self.output_var.set(path) + + def analyze(self) -> None: + try: + self.plan = build_plan(self.input_var.get(), self.output_var.get()) + self.render_plan() + self.start_button.configure(state="normal") + self.status_var.set("Analyse fertig. Encoding kann gestartet werden.") + except Exception as exc: + self.plan = None + self.start_button.configure(state="disabled") + self.status_var.set("Analyse fehlgeschlagen.") + messagebox.showerror("Analyse fehlgeschlagen", str(exc)) + + def render_plan(self) -> None: + assert self.plan is not None + for item in self.tree.get_children(): + self.tree.delete(item) + for role in self.plan.audio_roles: + usage = [] + if role == self.plan.selected_mp4_audio: + usage.append("MP4 Stereo") + if role.is_german: + usage.append("DEU PCM-MOV") + else: + usage.append("OV PCM-MOV") + self.tree.insert( + "", + "end", + values=( + role.list_index + 1, + role.stream_index, + role.display_language, + role.layout_from_name or f"{role.detected_channels} Kan.", + role.codec, + ", ".join(usage), + ), + ) + + lines: list[str] = [] + log_plan(self.plan, lines.append) + self.summary.configure(state="normal") + self.summary.delete("1.0", "end") + self.summary.insert("end", "\n".join(lines)) + self.summary.configure(state="disabled") + + def start(self) -> None: + if self.worker and self.worker.is_alive(): + return + if self.plan is None: + self.analyze() + if self.plan is None: + return + self.start_button.configure(state="disabled") + self.analyze_button.configure(state="disabled") + self.status_var.set("Encoding laeuft...") + self.clear_log() + self.worker = threading.Thread(target=self.run_worker, daemon=True) + self.worker.start() + + def run_worker(self) -> None: + try: + assert self.plan is not None + run_plan(self.plan, self.log_queue.put) + self.log_queue.put("\nERFOLGREICH abgeschlossen.") + self.log_queue.put("__DONE__") + except Exception as exc: + self.log_queue.put(f"\nFEHLER: {exc}") + self.log_queue.put("__FAILED__") + + def flush_log_queue(self) -> None: + try: + while True: + message = self.log_queue.get_nowait() + if message == "__DONE__": + self.status_var.set("Fertig.") + self.start_button.configure(state="normal") + self.analyze_button.configure(state="normal") + continue + if message == "__FAILED__": + self.status_var.set("Fehler.") + self.start_button.configure(state="normal") + self.analyze_button.configure(state="normal") + continue + self.append_log(message) + except queue.Empty: + pass + self.root.after(100, self.flush_log_queue) + + def append_log(self, message: str) -> None: + self.log_text.configure(state="normal") + self.log_text.insert("end", message + "\n") + self.log_text.see("end") + self.log_text.configure(state="disabled") + + def clear_log(self) -> None: + self.log_text.configure(state="normal") + self.log_text.delete("1.0", "end") + self.log_text.configure(state="disabled") + + +def run_ui() -> int: + if tk is None: + print("Tkinter ist nicht verfuegbar. Bitte Datei per CLI uebergeben.") + return 1 + root = tk.Tk() + app = MezzanineApp(root) + if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]): + app.input_var.set(sys.argv[1]) + root.mainloop() + return 0 + + +def main() -> int: + if len(sys.argv) > 2 and sys.argv[1] == "--cli": + return run_cli(sys.argv[2], OUTPUT_BASE_DIR) + return run_ui() + + +if __name__ == "__main__": + raise SystemExit(main())