Files
pvd/pvd_mezzanine.py
T
Melbar e8a553c08a fix: Ausgabedateinamen vereinheitlichen – Titel ohne Unterstriche
safe_output_stem und get_pvd_filename nutzten unterschiedliche Logik,
sodass MP4 und MOV-Dateien inkonsistente Titel hatten. Neue gemeinsame
Funktion extract_title stellt sicher, dass alle drei Ausgabedateien
denselben Titel ohne Unterstriche verwenden.

README und CLAUDE.md aktualisiert.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 15:48:41 +02:00

934 lines
30 KiB
Python

from __future__ import annotations
import configparser
import json
import math
import os
import queue
import re
import shutil
import subprocess
import sys
import threading
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
try:
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
except Exception:
tk = None
filedialog = None
messagebox = None
ttk = None
# =============================================================================
# 1. KONFIGURATION UND PFADE
# =============================================================================
APP_DIR = Path(__file__).resolve().parent
CONFIG_PATH = APP_DIR / "config.ini"
DEFAULT_CONFIG = {
"ffmpeg": {
"ffmpeg_exe": "",
"ffprobe_exe": "",
"search_dirs": [
r"C:\Tools\FFMPEG",
r"C:\Software",
],
},
"output": {
"base_dir": "",
"preferred_dirs": [
r"F:\VOD",
r"H:\VOD",
],
},
"video": {
"hd_bitrate": "30M",
"hd_maxrate": "35M",
"hd_bufsize": "50M",
"hd_level": "4.1",
"sd_bitrate": "8M",
"sd_maxrate": "10M",
"sd_bufsize": "15M",
"sd_level": "3.1",
"preset": "slow",
"tune": "film",
},
"audio": {
"mp4_bitrate": "256k",
"sample_rate": "48000",
"pcm_codec": "pcm_s24le",
},
}
def write_default_config(path: Path) -> None:
path.write_text(
"""# Amazon PVD Mezzanine Encoder Konfiguration
# Zeilen mit # sind Kommentare.
# Leere Werte bedeuten: automatisch suchen bzw. automatisch waehlen.
[ffmpeg]
# Optional: feste Pfade setzen. Leer lassen fuer automatische Suche.
ffmpeg_exe =
ffprobe_exe =
# Suchordner nach PATH. Mehrere Ordner mit Komma trennen.
search_dirs = C:\\Tools\\FFMPEG, C:\\Software
[output]
# Optional: festes Zielverzeichnis. Leer lassen fuer preferred_dirs.
base_dir =
# Erstes vorhandenes Verzeichnis wird verwendet.
preferred_dirs = F:\\VOD, H:\\VOD
[video]
# HD-PVD-MP4
hd_bitrate = 30M
hd_maxrate = 35M
hd_bufsize = 50M
hd_level = 4.1
# SD-PVD-MP4
sd_bitrate = 8M
sd_maxrate = 10M
sd_bufsize = 15M
sd_level = 3.1
# x264
preset = slow
tune = film
[audio]
# Deutscher Stereo-Ton im MP4
mp4_bitrate = 256k
sample_rate = 48000
# Audio-MOVs
pcm_codec = pcm_s24le
""",
encoding="utf-8",
)
def split_config_list(value: str) -> list[str]:
return [part.strip() for part in value.split(",") if part.strip()]
def load_config() -> dict:
if not CONFIG_PATH.exists():
write_default_config(CONFIG_PATH)
return DEFAULT_CONFIG
parser = configparser.ConfigParser()
parser.read(CONFIG_PATH, encoding="utf-8")
config = {
section: dict(values)
for section, values in DEFAULT_CONFIG.items()
}
for section, defaults in DEFAULT_CONFIG.items():
if not parser.has_section(section):
continue
for key, default_value in defaults.items():
if not parser.has_option(section, key):
continue
raw_value = parser.get(section, key).strip()
if isinstance(default_value, list):
config[section][key] = split_config_list(raw_value)
else:
config[section][key] = raw_value
return config
def find_executable(name: str, configured_path: str, search_dirs: list[str]) -> str:
if configured_path:
path = Path(configured_path)
if path.is_file():
return str(path)
raise FileNotFoundError(f"Konfigurierter Pfad fuer {name} existiert nicht: {configured_path}")
from_path = shutil.which(name)
if from_path:
return from_path
for directory in search_dirs:
candidate = Path(directory) / name
if candidate.is_file():
return str(candidate)
searched = ", ".join(["PATH", *search_dirs])
raise FileNotFoundError(f"{name} nicht gefunden. Gesucht in: {searched}")
def choose_output_base_dir(configured_path: str, preferred_dirs: list[str]) -> str:
if configured_path:
return configured_path
for directory in preferred_dirs:
if Path(directory).is_dir():
return directory
return preferred_dirs[0] if preferred_dirs else str(APP_DIR)
CONFIG = load_config()
FFMPEG_EXE = find_executable("ffmpeg.exe", CONFIG["ffmpeg"]["ffmpeg_exe"], CONFIG["ffmpeg"]["search_dirs"])
FFPROBE_EXE = find_executable("ffprobe.exe", CONFIG["ffmpeg"]["ffprobe_exe"], CONFIG["ffmpeg"]["search_dirs"])
OUTPUT_BASE_DIR = choose_output_base_dir(CONFIG["output"]["base_dir"], CONFIG["output"]["preferred_dirs"])
GERMAN_LANGS = {"DEU", "GER"}
LANGUAGE_NAMES = {
"ARA": "Arabisch",
"CHI": "Chinesisch",
"DEU": "Deutsch",
"DUT": "Niederlaendisch",
"ENG": "Englisch",
"FRA": "Franzoesisch",
"FRE": "Franzoesisch",
"GER": "Deutsch",
"ITA": "Italienisch",
"JPN": "Japanisch",
"KOR": "Koreanisch",
"POL": "Polnisch",
"POR": "Portugiesisch",
"RUS": "Russisch",
"SPA": "Spanisch",
}
@dataclass
class AudioRole:
stream_index: int
list_index: int
language: str
channels_from_name: int | None
layout_from_name: str | None
detected_channels: int
codec: str
@property
def is_german(self) -> bool:
return self.language in GERMAN_LANGS
@property
def display_language(self) -> str:
return LANGUAGE_NAMES.get(self.language, self.language)
@property
def effective_channels(self) -> int:
return self.channels_from_name or self.detected_channels
@dataclass
class VideoProfile:
width: int
height: int
fps: float
keyint: int
is_sd: bool
is_ntsc: bool
is_interlaced: bool
target_prim: str
target_trc: str
target_space: str
target_name: str
needs_conversion: bool
can_convert_colorspace: bool
@dataclass
class JobPlan:
input_file: str
output_dir: str
video_stream: dict
audio_streams: list[dict]
audio_roles: list[AudioRole]
video_profile: VideoProfile
pvd_mp4: str
german_mov: str | None
original_mov: str | None
selected_mp4_audio: AudioRole
forced_srt: str | None
commands: list[tuple[str, list[str]]]
def strip_audio_tokens(stem: str) -> str:
cleaned = stem
token_pattern = re.compile(r"(?i)(^|[_\-\s])([A-Z]{3}[1-8][0-9])(?=$|[_\-\s])")
while True:
next_cleaned = token_pattern.sub(r"\1", cleaned)
if next_cleaned == cleaned:
break
cleaned = next_cleaned
cleaned = re.sub(r"[_\-\s]+", "_", cleaned).strip("_- ")
return cleaned or stem
def extract_title(input_path: str) -> str:
"""Extrahiert den Titel einheitlich für alle Ausgabedateien (Blu-ray / ProRes / DVD)."""
path = Path(input_path)
path_parts = [p.upper() for p in path.parts]
if "BDMV" in path_parts:
bdmv_index = path_parts.index("BDMV")
try:
project_folder = path_parts[bdmv_index - 4]
except IndexError:
project_folder = path_parts[bdmv_index - 1] if bdmv_index > 0 else "UNKNOWN"
clean_name = re.sub(r"^BEST_", "", project_folder, flags=re.IGNORECASE)
words = [w.capitalize() for w in clean_name.split("_") if w.strip()]
title = "".join(words)
else:
title = strip_audio_tokens(path.stem)
title = re.sub(r"[^A-Za-z0-9]", "", title)
return title or "UNKNOWN_TITLE"
def get_pvd_filename(input_path: str) -> str:
return f"{extract_title(input_path)}_DEU20_PVD.mp4"
def safe_output_stem(input_path: str) -> str:
return extract_title(input_path)
def probe_metadata(filepath: str) -> dict:
cmd = [
FFPROBE_EXE,
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-show_format",
filepath,
]
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
if result.returncode != 0:
raise RuntimeError(f"ffprobe konnte die Datei nicht lesen.\n{result.stderr}")
return json.loads(result.stdout)
def parse_fps(rate: str) -> float:
if not rate or rate == "0/0":
return 25.0
if "/" in rate:
num, den = map(int, rate.split("/", 1))
return num / den if den else float(num)
return float(rate)
def analyze_video(v_stream: dict) -> VideoProfile:
width = int(v_stream.get("width", 0))
height = int(v_stream.get("height", 0))
fps = parse_fps(v_stream.get("avg_frame_rate") or v_stream.get("r_frame_rate", "25/1"))
keyint = max(1, math.ceil(fps))
is_sd = height <= 576
is_ntsc = is_sd and height <= 480 and fps >= 29.0
field_order = v_stream.get("field_order", "unknown")
scan_type = v_stream.get("scan_type", "unknown")
is_interlaced = (
field_order not in ("progressive", "unknown")
or scan_type == "interlaced"
)
if is_sd and is_ntsc:
target_prim = "smpte170m"
target_trc = "smpte170m"
target_space = "smpte170m"
target_name = "Rec.601 NTSC (smpte170m)"
elif is_sd:
target_prim = "bt470bg"
target_trc = "bt470bg"
target_space = "bt470bg"
target_name = "Rec.601 PAL (bt470bg)"
else:
target_prim = "bt709"
target_trc = "bt709"
target_space = "bt709"
target_name = "Rec.709 HD"
source_prim = v_stream.get("color_primaries", "unknown")
source_trc = v_stream.get("color_transfer", "unknown")
source_space = v_stream.get("color_space", "unknown")
needs_conversion = (
source_prim == "unknown"
or source_prim != target_prim
or source_trc != target_trc
or source_space != target_space
)
can_convert_colorspace = all(
value not in ("unknown", None, "")
for value in (source_prim, source_trc, source_space)
)
return VideoProfile(
width=width,
height=height,
fps=fps,
keyint=keyint,
is_sd=is_sd,
is_ntsc=is_ntsc,
is_interlaced=is_interlaced,
target_prim=target_prim,
target_trc=target_trc,
target_space=target_space,
target_name=target_name,
needs_conversion=needs_conversion,
can_convert_colorspace=can_convert_colorspace,
)
def parse_audio_tokens(input_path: str) -> list[tuple[str, int, str]]:
"""Reads tokens like DEU51_ENG20 from the filename in stream order."""
stem = Path(input_path).stem.upper()
roles = []
for match in re.finditer(r"(?<![A-Z0-9])([A-Z]{3})([1-8])([0-9])(?![A-Z0-9])", stem):
lang = match.group(1)
channels = int(match.group(2))
decimals = match.group(3)
label = f"{channels}.{decimals}"
if channels == 2 and decimals == "0":
channel_count = 2
elif channels == 5 and decimals == "1":
channel_count = 6
elif decimals == "0":
channel_count = channels
else:
channel_count = channels + 1
roles.append((lang, channel_count, label))
return roles
def build_audio_roles(input_path: str, audio_streams: list[dict]) -> list[AudioRole]:
tokens = parse_audio_tokens(input_path)
roles = []
for list_index, stream in enumerate(audio_streams):
token = tokens[list_index] if list_index < len(tokens) else None
tag_lang = stream.get("tags", {}).get("language", "und").upper()
language = token[0] if token else normalize_language(tag_lang)
channels_from_name = token[1] if token else None
layout_from_name = token[2] if token else None
roles.append(
AudioRole(
stream_index=int(stream["index"]),
list_index=list_index,
language=language,
channels_from_name=channels_from_name,
layout_from_name=layout_from_name,
detected_channels=int(stream.get("channels", 2)),
codec=stream.get("codec_name", "unknown"),
)
)
return roles
def normalize_language(language: str) -> str:
lang = (language or "UND").upper()
if lang == "GER":
return "DEU"
return lang[:3]
def select_mp4_german_audio(audio_roles: list[AudioRole]) -> AudioRole:
german_roles = [role for role in audio_roles if role.is_german]
if not german_roles:
raise RuntimeError("Keine deutsche Audiospur gefunden. Dateiname muss z.B. DEU51 oder DEU20 enthalten.")
stereo = [role for role in german_roles if role.effective_channels == 2]
return stereo[0] if stereo else german_roles[0]
def build_plan(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> JobPlan:
if not os.path.isfile(input_file):
raise FileNotFoundError(f"Eingabedatei nicht gefunden: {input_file}")
metadata = probe_metadata(input_file)
video_stream = next((s for s in metadata["streams"] if s["codec_type"] == "video"), None)
audio_streams = [s for s in metadata["streams"] if s["codec_type"] == "audio"]
if not video_stream:
raise RuntimeError("Kein Videostream gefunden.")
if not audio_streams:
raise RuntimeError("Keine Audio-Tracks gefunden.")
video_profile = analyze_video(video_stream)
audio_roles = build_audio_roles(input_file, audio_streams)
selected_mp4_audio = select_mp4_german_audio(audio_roles)
output_base = Path(output_dir)
pvd_mp4 = str(output_base / get_pvd_filename(input_file))
output_stem = safe_output_stem(input_file)
german_roles = [role for role in audio_roles if role.is_german]
original_roles = [role for role in audio_roles if not role.is_german]
german_mov = str(output_base / f"{output_stem}_DEU_AUDIO_PCM.mov") if german_roles else None
original_mov = str(output_base / f"{output_stem}_OV_AUDIO_PCM.mov") if original_roles else None
forced_srt_path = os.path.splitext(input_file)[0] + "_forced.srt"
forced_srt = forced_srt_path if os.path.exists(forced_srt_path) else None
plan = JobPlan(
input_file=input_file,
output_dir=output_dir,
video_stream=video_stream,
audio_streams=audio_streams,
audio_roles=audio_roles,
video_profile=video_profile,
pvd_mp4=pvd_mp4,
german_mov=german_mov,
original_mov=original_mov,
selected_mp4_audio=selected_mp4_audio,
forced_srt=forced_srt,
commands=[],
)
plan.commands = build_commands(plan)
return plan
def build_video_filters(plan: JobPlan) -> list[str]:
filters = []
profile = plan.video_profile
if profile.is_interlaced:
filters.append("bwdif=mode=0:parity=auto")
if profile.needs_conversion and profile.can_convert_colorspace:
filters.append(
"colorspace="
f"primaries={profile.target_prim}:"
f"trc={profile.target_trc}:"
f"space={profile.target_space}:range=tv"
)
if plan.forced_srt:
clean_srt_path = plan.forced_srt.replace("\\", "/").replace(":", "\\:")
filters.append(f"subtitles='{clean_srt_path}':force_style='Fontsize=14,MarginV=38'")
return filters
def build_commands(plan: JobPlan) -> list[tuple[str, list[str]]]:
commands = [("Amazon PVD MP4", build_pvd_mp4_command(plan))]
german_roles = [role for role in plan.audio_roles if role.is_german]
original_roles = [role for role in plan.audio_roles if not role.is_german]
if german_roles and plan.german_mov:
commands.append(("Deutsche Audio-MOV", build_audio_mov_command(plan, german_roles, plan.german_mov)))
if original_roles and plan.original_mov:
commands.append(("Originalsprachige Audio-MOV", build_audio_mov_command(plan, original_roles, plan.original_mov)))
return commands
def build_pvd_mp4_command(plan: JobPlan) -> list[str]:
profile = plan.video_profile
selected_audio = plan.selected_mp4_audio
video_config = CONFIG["video"]
audio_config = CONFIG["audio"]
if profile.is_sd:
bv = video_config["sd_bitrate"]
maxr = video_config["sd_maxrate"]
bufs = video_config["sd_bufsize"]
level = video_config["sd_level"]
else:
bv = video_config["hd_bitrate"]
maxr = video_config["hd_maxrate"]
bufs = video_config["hd_bufsize"]
level = video_config["hd_level"]
cmd = [
FFMPEG_EXE,
"-hide_banner",
"-y",
"-i",
plan.input_file,
"-map",
f"0:{plan.video_stream['index']}",
"-map",
f"0:{selected_audio.stream_index}",
]
filters = build_video_filters(plan)
if filters:
cmd.extend(["-vf", ",".join(filters)])
cmd.extend(
[
"-c:v",
"libx264",
"-profile:v",
"high",
"-level",
level,
"-pix_fmt",
"yuv420p",
"-b:v",
bv,
"-maxrate",
maxr,
"-bufsize",
bufs,
"-preset",
video_config["preset"],
"-tune",
video_config["tune"],
"-x264-params",
f"keyint={profile.keyint}:min-keyint=2:scenecut=40:bframes=3:aq-mode=2",
"-color_primaries",
profile.target_prim,
"-color_trc",
profile.target_trc,
"-colorspace",
profile.target_space,
"-c:a",
"aac",
"-b:a",
audio_config["mp4_bitrate"],
"-ac:a:0",
"2",
"-ar:a:0",
audio_config["sample_rate"],
"-f",
"mp4",
"-movflags",
"+faststart",
"-metadata:s:a:0",
"language=deu",
"-metadata:s:v:0",
"language=deu",
"-map_chapters",
"-1",
"-map_metadata",
"-1",
"-avoid_negative_ts",
"make_zero",
plan.pvd_mp4,
]
)
return cmd
def build_audio_mov_command(plan: JobPlan, roles: list[AudioRole], output_path: str) -> list[str]:
cmd = [FFMPEG_EXE, "-hide_banner", "-y", "-i", plan.input_file]
audio_config = CONFIG["audio"]
for role in roles:
cmd.extend(["-map", f"0:{role.stream_index}"])
cmd.extend(["-vn", "-map_chapters", "-1", "-map_metadata", "-1"])
for idx, role in enumerate(roles):
cmd.extend([f"-c:a:{idx}", audio_config["pcm_codec"], f"-ar:a:{idx}", audio_config["sample_rate"]])
cmd.extend([f"-metadata:s:a:{idx}", f"language={role.language.lower()}"])
title = role.display_language
if role.layout_from_name:
title = f"{title} {role.layout_from_name}"
cmd.extend([f"-metadata:s:a:{idx}", f"title={title}"])
cmd.extend(["-f", "mov", output_path])
return cmd
def run_command(cmd: list[str], log: Callable[[str], None]) -> int:
log(" ".join(f'"{part}"' if " " in part else part for part in cmd))
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
assert process.stdout is not None
for line in process.stdout:
log(line.rstrip())
return process.wait()
def run_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None:
os.makedirs(plan.output_dir, exist_ok=True)
log_plan(plan, log)
for label, cmd in plan.commands:
log("")
log(f"Starte: {label}")
returncode = run_command(cmd, log)
if returncode != 0:
raise RuntimeError(f"{label} fehlgeschlagen (FFmpeg-Code {returncode}).")
log(f"Fertig: {label}")
def log_plan(plan: JobPlan, log: Callable[[str], None] = print) -> None:
profile = plan.video_profile
log(f"Quelle: {plan.input_file}")
log(f"FFmpeg: {FFMPEG_EXE}")
log(f"FFprobe: {FFPROBE_EXE}")
log(f"Video: {profile.width}x{profile.height} @ {profile.fps:.3f} fps, {profile.target_name}")
if profile.needs_conversion and not profile.can_convert_colorspace:
log("Farbraum: Quell-Metadaten unvollstaendig; setze Ziel-Metadaten ohne Colorspace-Filter.")
log(f"MP4: {plan.pvd_mp4}")
if plan.german_mov:
log(f"Deutsche Audio-MOV: {plan.german_mov}")
if plan.original_mov:
log(f"Originalsprachige Audio-MOV: {plan.original_mov}")
log("Audio-Tracks:")
for role in plan.audio_roles:
layout = role.layout_from_name or f"{role.detected_channels}.0?"
selected = " -> MP4" if role == plan.selected_mp4_audio else ""
log(
f" Spur {role.list_index + 1}: Stream {role.stream_index}, "
f"{role.display_language}, {layout}, {role.codec}{selected}"
)
if not plan.original_mov:
log("Nur deutsche Tonspuren erkannt; originalsprachige Audio-MOV wird nicht erzeugt.")
if plan.selected_mp4_audio.effective_channels != 2:
log("MP4-Ton: keine deutsche Stereo-Spur gefunden, FFmpeg downmixt die gewaehlte deutsche Spur auf Stereo.")
def run_cli(input_file: str, output_dir: str = OUTPUT_BASE_DIR) -> int:
try:
plan = build_plan(input_file, output_dir)
run_plan(plan)
print("\nERFOLGREICH abgeschlossen.")
return 0
except Exception as exc:
print(f"\nFEHLER: {exc}")
return 1
class MezzanineApp:
def __init__(self, root: tk.Tk):
self.root = root
self.root.title("Amazon PVD Mezzanine Encoder")
self.root.geometry("980x680")
self.log_queue: queue.Queue[str] = queue.Queue()
self.worker: threading.Thread | None = None
self.plan: JobPlan | None = None
self.input_var = tk.StringVar()
self.output_var = tk.StringVar(value=OUTPUT_BASE_DIR)
self.status_var = tk.StringVar(value="Quelle auswaehlen und analysieren.")
self.build_ui()
self.root.after(100, self.flush_log_queue)
def build_ui(self) -> None:
outer = ttk.Frame(self.root, padding=14)
outer.pack(fill="both", expand=True)
file_row = ttk.Frame(outer)
file_row.pack(fill="x")
ttk.Label(file_row, text="Quelle").pack(side="left")
ttk.Entry(file_row, textvariable=self.input_var).pack(side="left", fill="x", expand=True, padx=8)
ttk.Button(file_row, text="Auswaehlen", command=self.choose_input).pack(side="left")
output_row = ttk.Frame(outer)
output_row.pack(fill="x", pady=(8, 0))
ttk.Label(output_row, text="Ausgabe").pack(side="left")
ttk.Entry(output_row, textvariable=self.output_var).pack(side="left", fill="x", expand=True, padx=8)
ttk.Button(output_row, text="Ordner", command=self.choose_output).pack(side="left")
button_row = ttk.Frame(outer)
button_row.pack(fill="x", pady=12)
self.analyze_button = ttk.Button(button_row, text="Analysieren", command=self.analyze)
self.analyze_button.pack(side="left")
self.start_button = ttk.Button(button_row, text="Encoding starten", command=self.start, state="disabled")
self.start_button.pack(side="left", padx=8)
ttk.Label(button_row, textvariable=self.status_var).pack(side="left", padx=12)
columns = ("track", "stream", "language", "layout", "codec", "usage")
self.tree = ttk.Treeview(outer, columns=columns, show="headings", height=8)
headings = {
"track": "Spur",
"stream": "Stream",
"language": "Sprache",
"layout": "Format",
"codec": "Codec",
"usage": "Verwendung",
}
widths = {"track": 60, "stream": 80, "language": 160, "layout": 100, "codec": 120, "usage": 300}
for col in columns:
self.tree.heading(col, text=headings[col])
self.tree.column(col, width=widths[col], anchor="w")
self.tree.pack(fill="x", pady=(0, 12))
self.summary = tk.Text(outer, height=7, wrap="word")
self.summary.pack(fill="x", pady=(0, 12))
self.summary.configure(state="disabled")
self.log_text = tk.Text(outer, wrap="word")
self.log_text.pack(fill="both", expand=True)
self.log_text.configure(state="disabled")
def choose_input(self) -> None:
path = filedialog.askopenfilename(
title="Basis-Video auswaehlen",
filetypes=[
("Video-Dateien", "*.mov *.mxf *.mp4 *.mkv *.ts *.m2ts *.vob"),
("Alle Dateien", "*.*"),
],
)
if path:
self.input_var.set(path)
self.start_button.configure(state="disabled")
def choose_output(self) -> None:
path = filedialog.askdirectory(title="Ausgabeordner auswaehlen", initialdir=self.output_var.get())
if path:
self.output_var.set(path)
def analyze(self) -> None:
try:
self.plan = build_plan(self.input_var.get(), self.output_var.get())
self.render_plan()
self.start_button.configure(state="normal")
self.status_var.set("Analyse fertig. Encoding kann gestartet werden.")
except Exception as exc:
self.plan = None
self.start_button.configure(state="disabled")
self.status_var.set("Analyse fehlgeschlagen.")
messagebox.showerror("Analyse fehlgeschlagen", str(exc))
def render_plan(self) -> None:
assert self.plan is not None
for item in self.tree.get_children():
self.tree.delete(item)
for role in self.plan.audio_roles:
usage = []
if role == self.plan.selected_mp4_audio:
usage.append("MP4 Stereo")
if role.is_german:
usage.append("DEU PCM-MOV")
else:
usage.append("OV PCM-MOV")
self.tree.insert(
"",
"end",
values=(
role.list_index + 1,
role.stream_index,
role.display_language,
role.layout_from_name or f"{role.detected_channels} Kan.",
role.codec,
", ".join(usage),
),
)
lines: list[str] = []
log_plan(self.plan, lines.append)
self.summary.configure(state="normal")
self.summary.delete("1.0", "end")
self.summary.insert("end", "\n".join(lines))
self.summary.configure(state="disabled")
def start(self) -> None:
if self.worker and self.worker.is_alive():
return
if self.plan is None:
self.analyze()
if self.plan is None:
return
self.start_button.configure(state="disabled")
self.analyze_button.configure(state="disabled")
self.status_var.set("Encoding laeuft...")
self.clear_log()
self.worker = threading.Thread(target=self.run_worker, daemon=True)
self.worker.start()
def run_worker(self) -> None:
try:
assert self.plan is not None
run_plan(self.plan, self.log_queue.put)
self.log_queue.put("\nERFOLGREICH abgeschlossen.")
self.log_queue.put("__DONE__")
except Exception as exc:
self.log_queue.put(f"\nFEHLER: {exc}")
self.log_queue.put("__FAILED__")
def flush_log_queue(self) -> None:
try:
while True:
message = self.log_queue.get_nowait()
if message == "__DONE__":
self.status_var.set("Fertig.")
self.start_button.configure(state="normal")
self.analyze_button.configure(state="normal")
continue
if message == "__FAILED__":
self.status_var.set("Fehler.")
self.start_button.configure(state="normal")
self.analyze_button.configure(state="normal")
continue
self.append_log(message)
except queue.Empty:
pass
self.root.after(100, self.flush_log_queue)
def append_log(self, message: str) -> None:
self.log_text.configure(state="normal")
self.log_text.insert("end", message + "\n")
self.log_text.see("end")
self.log_text.configure(state="disabled")
def clear_log(self) -> None:
self.log_text.configure(state="normal")
self.log_text.delete("1.0", "end")
self.log_text.configure(state="disabled")
def run_ui() -> int:
if tk is None:
return run_no_tk_fallback()
root = tk.Tk()
app = MezzanineApp(root)
if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]):
app.input_var.set(sys.argv[1])
root.mainloop()
return 0
def run_no_tk_fallback() -> int:
if len(sys.argv) > 1 and os.path.isfile(sys.argv[1]):
print("Tkinter ist nicht verfuegbar. Starte Drag-and-Drop-Datei direkt im CLI-Modus.")
return run_cli(sys.argv[1], OUTPUT_BASE_DIR)
print("Tkinter ist nicht verfuegbar. Oeffne Windows-Dateidialog als Fallback.")
input_file = choose_input_with_powershell()
if input_file is None:
print("Keine Quelle ausgewaehlt.")
return 1
print(f"Quelle: {input_file}")
print(f"Ausgabe: {OUTPUT_BASE_DIR}")
return run_cli(input_file, OUTPUT_BASE_DIR)
def choose_input_with_powershell() -> str | None:
script = r"""
Add-Type -AssemblyName System.Windows.Forms
$open = New-Object System.Windows.Forms.OpenFileDialog
$open.Title = 'Basis-Video auswaehlen'
$open.Filter = 'Video-Dateien (*.mov;*.mxf;*.mp4;*.mkv;*.ts;*.m2ts;*.vob)|*.mov;*.mxf;*.mp4;*.mkv;*.ts;*.m2ts;*.vob|Alle Dateien (*.*)|*.*'
if ($open.ShowDialog() -ne [System.Windows.Forms.DialogResult]::OK) { exit 2 }
Write-Output $open.FileName
"""
result = subprocess.run(
["powershell", "-NoProfile", "-STA", "-Command", script],
capture_output=True,
text=True,
check=False,
)
if result.returncode != 0:
return None
lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
if not lines:
return None
return lines[0]
def main() -> int:
if len(sys.argv) > 2 and sys.argv[1] == "--cli":
return run_cli(sys.argv[2], OUTPUT_BASE_DIR)
if len(sys.argv) > 1:
input_file = sys.argv[1]
if not os.path.isfile(input_file):
print(f"FEHLER: Eingabedatei nicht gefunden: {input_file}")
return 1
return run_cli(input_file, OUTPUT_BASE_DIR)
return run_ui()
if __name__ == "__main__":
raise SystemExit(main())