"""
scripts/generate_cutter_report.py — generate CUTTER_REPORT.md from current cache

Regenerates ``CUTTER_REPORT.md`` from ``.cache/match_results.json``,
``.cache/trailer_beats.json`` and ``.cache/vision_descriptions.json``.

The report is a hand-off document for a video editor (Cutter) doing the manual
recut: per beat it lists trailer timecode, the proposed source timecode, the
match score, what the vision model saw in the trailer beat, and side-by-side
preview stills (extracted via ffmpeg).

Important: trailer and source can have different frame rates (e.g. trailer
25 fps, source 23.976 fps). This script probes each file with ffprobe and
renders trailer timecodes in trailer fps and source timecodes in source fps,
so the timecode matches what the cutter sees in the NLE.

Usage (from project root):

    python scripts/generate_cutter_report.py              # text + stills
    python scripts/generate_cutter_report.py --no-stills  # text only

Stills go to ``output/cutter_stills/beat_NN_{trailer,source}.jpg`` and are
referenced from the markdown. They are only re-rendered when the underlying
match position has changed — fast on repeat runs. The position each still was
rendered at is remembered in a small ``.pos`` file next to the image.
"""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import date
from pathlib import Path


# ----------------------------------------------------------------------------
# Frame-rate handling
# ----------------------------------------------------------------------------

def probe_fps(video_path: Path) -> float | None:
    """Return the ``avg_frame_rate`` of the first video stream, or None.

    None is returned when the file does not exist, ffprobe is not installed,
    ffprobe times out (10 s), or its output cannot be parsed as a number or
    as a ``num/den`` ratio with a non-zero denominator.
    """
    if not video_path.exists():
        return None
    try:
        proc = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-select_streams", "v:0",
                "-show_entries", "stream=avg_frame_rate",
                "-of", "default=noprint_wrappers=1:nokey=1",
                str(video_path),
            ],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return None

    # Only the first token is parsed: if ffprobe prints the value on more
    # than one line, parsing the whole output would fail and silently drop
    # the caller to its default frame rate.
    tokens = proc.stdout.split()
    raw = tokens[0] if tokens else ""
    num, sep, den = raw.partition("/")
    try:
        if sep:
            n, d = float(num), float(den)
            return n / d if d else None
        return float(raw)
    except ValueError:
        return None


def smpte(t: float | None, fps: float) -> str:
    """Format seconds as non-drop-frame ``hh:mm:ss:ff`` timecode.

    The frame number is ``round(t * fps)`` using the *real* frame rate; it is
    then displayed in the integer timecode base ``round(fps)``. For integer
    rates both are the same. For fractional rates such as 23.976 this matters:
    counting ``t * 24`` frames would drift roughly 3.6 s per hour away from
    the frame-count-based timecode shown for such material.

    Args:
        t: Position in seconds, or None for "no position".
        fps: Frame rate of the clip the position refers to.

    Returns:
        The timecode string, or ``"--:--:--:--"`` when ``t`` is None.
    """
    if t is None:
        return "--:--:--:--"
    base = max(1, int(round(fps)))
    # Guard against a non-positive rate; fall back to the timecode base.
    rate = fps if fps > 0 else float(base)
    total = int(round(t * rate))
    h = total // (3600 * base)
    m = (total // (60 * base)) % 60
    s = (total // base) % 60
    f = total % base
    return f"{h:02d}:{m:02d}:{s:02d}:{f:02d}"


# ----------------------------------------------------------------------------
# Vision-description helpers
# ----------------------------------------------------------------------------

def best_beat_description(items: dict, beat_id: int, start_s: float, end_s: float) -> str | None:
    """Return the cached description whose time range best fits the beat.

    Keys are expected in the form ``beat:<id>:<start>:<end>``. Among the keys
    for ``beat_id`` the entry with the smallest ``|start diff| + |end diff|``
    wins. Entries whose value is not a dict or whose key cannot be parsed are
    skipped.

    Returns:
        The ``"description"`` value of the best entry (``""`` when that entry
        has no description), or None when nothing matched.
    """
    prefix = f"beat:{beat_id}:"
    best, best_diff = None, 1e9
    for key, value in items.items():
        if not key.startswith(prefix) or not isinstance(value, dict):
            continue
        try:
            parts = key.split(":")
            ks, ke = float(parts[2]), float(parts[3])
        except (IndexError, ValueError):
            continue
        diff = abs(ks - start_s) + abs(ke - end_s)
        if diff < best_diff:
            best_diff = diff
            best = value
    return best.get("description", "") if best else None


def parse_field(desc: str | None, key: str) -> str:
    """Pull the string value of ``"key": "value"`` out of a description text.

    This is a plain regex lookup, not a JSON parse, so it also works on
    descriptions that are not valid JSON. Returns ``""`` when ``desc`` is
    empty or the key is not present with a non-empty string value.
    """
    if not desc:
        return ""
    # re.escape keeps a key containing regex metacharacters from being
    # interpreted as a pattern.
    match = re.search(rf'"{re.escape(key)}"\s*:\s*"([^"]+)"', desc)
    return match.group(1) if match else ""


# ----------------------------------------------------------------------------
# Stills
# ----------------------------------------------------------------------------

STILL_WIDTH = 360    # px, downscaled for fast preview in the markdown
STILL_QUALITY = 5    # ffmpeg -q:v scale 1 (best) .. 31 (worst)


def extract_still(video_path: Path, t_s: float, out: Path) -> bool:
    """Extract one JPEG frame at ``t_s`` seconds into ``out``.

    The existing image is reused only when it is non-empty, not older than
    the video, and was rendered at the same position. The position is kept in
    a sidecar file (``out`` with suffix ``.pos``); without it a changed match
    position would keep showing the still of the previous match. Deleting the
    image still forces a re-render.

    The frame is rendered into a temporary file and moved into place only on
    success, so a failed or empty ffmpeg run never validates an old image.

    Returns:
        True when ``out`` holds a still for this position, False when the
        video is missing, ffmpeg is unavailable/fails/times out, or the
        result could not be written.
    """
    if not video_path.exists():
        return False

    seek = f"{max(0.0, t_s):.3f}"
    stamp = out.with_suffix(".pos")
    try:
        fresh = (
            out.stat().st_size > 0
            and out.stat().st_mtime >= video_path.stat().st_mtime
            and stamp.read_text(encoding="utf-8").strip() == seek
        )
    except OSError:
        # Missing image or sidecar (or unreadable) simply means: re-render.
        fresh = False
    if fresh:
        return True

    out.parent.mkdir(parents=True, exist_ok=True)
    # Keep the original suffix so ffmpeg still picks the image writer.
    tmp = out.with_name(f"{out.stem}.tmp{out.suffix}")
    cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-ss", seek,
        "-i", str(video_path),
        "-frames:v", "1",
        "-vf", f"scale={STILL_WIDTH}:-2",
        "-q:v", str(STILL_QUALITY),
        str(tmp),
    ]
    try:
        subprocess.run(cmd, check=True, capture_output=True, timeout=30)
        if not (tmp.exists() and tmp.stat().st_size > 0):
            return False
        tmp.replace(out)
        stamp.write_text(seek, encoding="utf-8")
    except (OSError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
        # OSError also covers FileNotFoundError for a missing ffmpeg binary.
        return False
    finally:
        try:
            tmp.unlink()
        except OSError:
            pass  # already moved into place, or never created
    return True


def beat_still_time(start_s: float, end_s: float) -> float:
    """Pick a representative time inside the beat.

    The still is taken 30 % into the beat, but never more than 0.4 s after
    its start. Durations below 0.04 s are treated as 0.04 s.
    """
    duration = max(0.04, end_s - start_s)
    return start_s + min(0.4, duration * 0.3)


# ----------------------------------------------------------------------------
# Renderer
# ----------------------------------------------------------------------------

def _status_for(rec: dict | None) -> str:
    """Map a match record to its report status (``MAN.``, ``OK`` or ``?``)."""
    if rec is None:
        return "MAN."
    return "OK" if rec.get("is_confirmed") else "?"


def _still_cell(ok: bool, jpg: Path, project_root: Path, label: str, bid: int) -> str:
    """Return the markdown table cell for one still (image link or placeholder)."""
    if not ok:
        return "_(kein Still)_"
    rel = jpg.relative_to(project_root).as_posix()
    return f"![{label} beat {bid}]({rel})"


def render_report(project_root: Path, with_stills: bool = True) -> str:
    """Build the complete markdown text of the cutter report.

    Reads ``config.toml`` (via the project's ``src.core.config.load_config``)
    and the JSON caches under ``<project_root>/.cache``. When ``with_stills``
    is true, preview stills are extracted into ``output/cutter_stills``.

    Args:
        project_root: Root directory of the project.
        with_stills: Extract and embed preview stills.

    Returns:
        The markdown document as one string (lines joined by a newline).

    Raises:
        OSError / ValueError / KeyError: if a required cache file is missing,
            is not valid JSON, or lacks an expected key.
    """
    # The project package lives at the project root; make it importable when
    # this file is run as a standalone script.
    sys.path.insert(0, str(project_root))
    from src.core.config import load_config

    cfg = load_config(project_root / "config.toml")
    trailer_path = Path(cfg.paths.reference_trailer)
    source_path = Path(cfg.paths.source_movie)
    # Fall back to defaults when probing fails (or reports 0 fps).
    trailer_fps = probe_fps(trailer_path) or 25.0
    source_fps = probe_fps(source_path) or float(cfg.export.edl_frame_rate)

    cache = project_root / ".cache"
    results = {r["beat_id"]: r for r in json.loads((cache / "match_results.json").read_text())}
    beats = json.loads((cache / "trailer_beats.json").read_text())
    vis_path = cache / "vision_descriptions.json"
    vis_items = json.loads(vis_path.read_text())["items"] if vis_path.exists() else {}

    stills_dir = project_root / "output" / "cutter_stills"
    if with_stills:
        stills_dir.mkdir(parents=True, exist_ok=True)

    lines: list[str] = []

    # ---- Header and legend -----------------------------------------------
    lines.append("# Cutter-Report — manuelles Nachschneiden")
    lines.append("")
    lines.append(f"Stand: {date.today().isoformat()}")
    lines.append("")
    lines.append(f"- **Trailer**: `{trailer_path.name}` @ {trailer_fps:.3f} fps")
    lines.append(f"- **Source** : `{source_path.name}` @ {source_fps:.3f} fps")
    lines.append("")
    lines.append(
        "Trailer-Timecodes sind in **Trailer-Framerate** angegeben, "
        "Source-Timecodes in **Source-Framerate**. So passen sie 1:1 zu dem, "
        "was du in deinem NLE auf den jeweiligen Spuren siehst."
    )
    lines.append("")
    lines.append(
        "Diese Datei wird automatisch erzeugt — nach jedem `python cli.py match` "
        "neu generieren mit:"
    )
    lines.append("")
    lines.append("```powershell")
    lines.append("python scripts/generate_cutter_report.py")
    lines.append("```")
    lines.append("")
    lines.append("## Status-Legende")
    lines.append("")
    lines.append("| Status | Bedeutung | Was tun? |")
    lines.append("|--------|-----------|----------|")
    lines.append(
        "| `OK` | bestätigt durch CV + Vision-Phasenprüfung "
        "| übernehmen, optional stichprobenartig sichten |"
    )
    lines.append(
        "| `?` | korrekte Szene, Phase eventuell um wenige Frames verschoben "
        "| im NLE prüfen, Source-In ggf. nachjustieren |"
    )
    lines.append(
        "| `MAN.` | kein automatischer Treffer "
        "| manuell suchen oder als Schwarzfade/Titel übernehmen |"
    )
    lines.append("")

    # ---- Overview ----------------------------------------------------------
    matched = sum(1 for b in beats if b["beat_id"] in results)
    # .get keeps this consistent with _status_for: a record without the flag
    # counts as unconfirmed instead of raising.
    confirmed = sum(
        1
        for b in beats
        if b["beat_id"] in results and results[b["beat_id"]].get("is_confirmed")
    )
    lines.append("## Übersicht")
    lines.append("")
    lines.append(f"- Beats gesamt: **{len(beats)}**")
    lines.append(f"- Automatisch gefunden: **{matched}** ({confirmed} davon bestätigt)")
    lines.append(f"- Manuell zu setzen: **{len(beats) - matched}**")
    lines.append("")

    # ---- Compact table (timecode-only, no images) ------------------------
    lines.append("## Beat-Tabelle (kompakt)")
    lines.append("")
    lines.append("| Beat | Trailer In / Out | Source In / Out | Score | Status | Was im Bild zu sehen ist |")
    lines.append("|-----:|------------------|------------------|------:|:------:|---------------------------|")

    for beat in beats:
        bid = beat["beat_id"]
        rec = results.get(bid)
        ti = smpte(beat["start_s"], trailer_fps)
        to = smpte(beat["end_s"], trailer_fps)
        if rec is not None:
            si = smpte(rec["in_point_s"], source_fps)
            so = smpte(rec["out_point_s"], source_fps)
            sc = rec["match_score"]
        else:
            si = so = "—"
            sc = 0.0
        desc = best_beat_description(vis_items, bid, beat["start_s"], beat["end_s"]) or ""
        phase = (parse_field(desc, "action_phase") or parse_field(desc, "subject"))[:80]
        # A literal pipe would end the table cell early.
        phase = phase.replace("|", "\\|")
        lines.append(f"| {bid:>4} | {ti}-{to} | {si}-{so} | {sc:.3f} | {_status_for(rec)} | {phase} |")
    lines.append("")

    # ---- Detailed per-beat sections with stills --------------------------
    lines.append("## Beat-Details mit Vorschau-Stills")
    lines.append("")
    if not with_stills:
        lines.append("_Stills sind in diesem Lauf deaktiviert (`--no-stills`)._")
        lines.append("")

    for beat in beats:
        bid = beat["beat_id"]
        rec = results.get(bid)
        ti = smpte(beat["start_s"], trailer_fps)
        to = smpte(beat["end_s"], trailer_fps)
        if rec is not None:
            si = smpte(rec["in_point_s"], source_fps)
            so = smpte(rec["out_point_s"], source_fps)
            sc_str = f"{rec['match_score']:.3f}"
            scn = rec["scene_id"]
        else:
            si = so = "—"
            sc_str = "—"
            scn = "—"
        status = _status_for(rec)
        desc = best_beat_description(vis_items, bid, beat["start_s"], beat["end_s"]) or ""
        phase = (
            parse_field(desc, "action_phase")
            or parse_field(desc, "subject")
            or "(keine Vision-Beschreibung)"
        )
        composition = parse_field(desc, "composition")
        setting = parse_field(desc, "setting")

        lines.append(f"### Beat {bid:02d} — Status `{status}`")
        lines.append("")
        lines.append(f"- **Trailer**: {ti} – {to}")
        if rec is not None:
            lines.append(f"- **Source** : {si} – {so} (scene {scn}, score {sc_str})")
        else:
            lines.append("- **Source** : — (kein Treffer; manuell setzen)")
        lines.append(f"- **Phase** : {phase}")
        if composition:
            lines.append(f"- **Bild** : {composition}{', ' + setting if setting else ''}")
        lines.append("")

        if with_stills:
            trailer_jpg = stills_dir / f"beat_{bid:02d}_trailer.jpg"
            source_jpg = stills_dir / f"beat_{bid:02d}_source.jpg"

            t_still = beat_still_time(beat["start_s"], beat["end_s"])
            ok_t = extract_still(trailer_path, t_still, trailer_jpg)
            if rec is not None:
                # Same relative position inside the clip as for the trailer.
                s_t = beat_still_time(rec["in_point_s"], rec["out_point_s"])
                ok_s = extract_still(source_path, s_t, source_jpg)
            else:
                ok_s = False

            cells_h = ["Trailer", "Source"]
            cells_t = [
                _still_cell(ok_t, trailer_jpg, project_root, "Trailer", bid),
                _still_cell(ok_s, source_jpg, project_root, "Source", bid),
            ]
            lines.append("| " + " | ".join(cells_h) + " |")
            lines.append("|" + "|".join(["---"] * len(cells_h)) + "|")
            lines.append("| " + " | ".join(cells_t) + " |")
            lines.append("")

    # ---- Review hints ------------------------------------------------------
    lines.append("## Hinweise zur Prüfung")
    lines.append("")
    lines.append(
        "1. Wenn die Bewegungsphase im Source-Still nicht zum Trailer-Still passt, "
        "im NLE den Source-In um wenige Frames verschieben — "
        "innerhalb derselben Source-Szene reicht das meistens."
    )
    lines.append(
        "2. Wenn der Source-Clip kürzer ist als der Trailerbeat (Source-Out < Trailer-Out), "
        "enthält der Trailerbeat eine Blende oder Titelkarte; "
        "im Schnitt mit Schwarzfade oder dem Source-Tail auffüllen."
    )
    lines.append(
        "3. `OK`-Beats sind doppelt verifiziert (CV + Vision-Phase). "
        "Trotzdem stichprobenartig sichten."
    )
    lines.append(
        "4. Stills liegen unter `output/cutter_stills/`. "
        "Bei Bedarf einzelne neu generieren: "
        "einfach die Datei löschen und das Skript erneut laufen lassen."
    )
    lines.append("")
    return "\n".join(lines)


# ----------------------------------------------------------------------------
# CLI entry
# ----------------------------------------------------------------------------

def main() -> int:
    """Parse the command line, render the report and write CUTTER_REPORT.md.

    The project root is taken to be the parent of this script's directory.
    Returns 0 as the process exit code.
    """
    parser = argparse.ArgumentParser(description="Render CUTTER_REPORT.md from current cache")
    parser.add_argument("--no-stills", action="store_true", help="skip frame extraction")
    args = parser.parse_args()

    here = Path(__file__).resolve().parent
    project_root = here.parent
    out = project_root / "CUTTER_REPORT.md"
    out.write_text(render_report(project_root, with_stills=not args.no_stills), encoding="utf-8")
    print(f"Wrote {out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())