""" scripts/generate_cutter_report.py — generate CUTTER_REPORT.{md,html} from cache Renders two reports for the video editor: * ``CUTTER_REPORT.md`` — text + base64-embedded preview stills. Self- contained (no broken image links on git server), opens in any markdown viewer. * ``CUTTER_REPORT.html`` — same data with HTML layout, side-by-side preview stills, and optionally side-by-side 3-second MP4 video clips per beat for sight-checking phase agreement. Frame rates: * Trailer fps is taken from ``config.toml`` if ``[paths] trailer_frame_rate`` is set, otherwise from ffprobe on the trailer file. * Source fps is taken from ``[export] edl_frame_rate`` in config.toml. This is the value the EDL/FCPXML uses, so it matches what the cutter sees in the NLE timeline. ffprobe is used only as a last-resort fallback. Usage (from project root): python scripts/generate_cutter_report.py # full report python scripts/generate_cutter_report.py --no-stills # text-only md python scripts/generate_cutter_report.py --with-clips # also render # video previews """ from __future__ import annotations import argparse import base64 import json import re import subprocess import sys from dataclasses import dataclass from datetime import date from pathlib import Path # ---------------------------------------------------------------------------- # Frame-rate handling # ---------------------------------------------------------------------------- def probe_fps(video_path: Path) -> float | None: """Return the file's frame rate via ffprobe. Tries r_frame_rate first.""" if not video_path.exists(): return None for key in ("r_frame_rate", "avg_frame_rate"): try: proc = subprocess.run( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", f"stream={key}", "-of", "default=noprint_wrappers=1:nokey=1", str(video_path), ], capture_output=True, text=True, timeout=10, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None raw = proc.stdout.strip() if not raw or raw == "0/0": continue if "/" in raw: num, _, den = raw.partition("/") try: n, d = float(num), float(den) if d: return n / d except ValueError: continue try: return float(raw) except ValueError: continue return None def smpte(t: float | None, fps: float) -> str: """Format seconds as h:mm:ss:ff using nearest-int frame counter.""" if t is None: return "--:--:--:--" fps_int = max(1, int(round(fps))) total = int(round(t * fps_int)) h = total // (3600 * fps_int) m = (total // (60 * fps_int)) % 60 s = (total // fps_int) % 60 f = total % fps_int return f"{h:02d}:{m:02d}:{s:02d}:{f:02d}" # ---------------------------------------------------------------------------- # Vision-description helpers # ---------------------------------------------------------------------------- def best_beat_description(items: dict, beat_id: int, start_s: float, end_s: float) -> str | None: best, best_diff = None, 1e9 for key, value in items.items(): if not key.startswith(f"beat:{beat_id}:") or not isinstance(value, dict): continue try: parts = key.split(":") ks, ke = float(parts[2]), float(parts[3]) except (IndexError, ValueError): continue diff = abs(ks - start_s) + abs(ke - end_s) if diff < best_diff: best_diff = diff best = value return best.get("description", "") if best else None def parse_field(desc: str | None, key: str) -> str: if not desc: return "" match = re.search(rf'"{key}"\s*:\s*"([^"]+)"', desc) return match.group(1) if match else "" # ---------------------------------------------------------------------------- # Stills / clips # ---------------------------------------------------------------------------- STILL_WIDTH = 480 STILL_QUALITY = 4 CLIP_WIDTH = 480 # Clips run for the full beat / match duration, with this cap as a safety net # so a runaway match doesn't pull a 60 s preview. Most beats are below 10 s # anyway. The cap should never silently truncate a normal beat — set it well # above any realistic beat length. CLIP_MAX_DURATION_S = 30.0 def _stale(out: Path, src: Path) -> bool: try: return not (out.exists() and out.stat().st_mtime >= src.stat().st_mtime and out.stat().st_size > 0) except OSError: return True def extract_still(video_path: Path, t_s: float, out: Path) -> bool: if not video_path.exists(): return False if not _stale(out, video_path): return True out.parent.mkdir(parents=True, exist_ok=True) cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-ss", f"{max(0.0, t_s):.3f}", "-i", str(video_path), "-frames:v", "1", "-vf", f"scale={STILL_WIDTH}:-2", "-q:v", str(STILL_QUALITY), str(out), ] try: subprocess.run(cmd, check=True, capture_output=True, timeout=30) except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): return False return out.exists() and out.stat().st_size > 0 def extract_clip(video_path: Path, start_s: float, duration_s: float, out: Path) -> bool: if not video_path.exists(): return False if not _stale(out, video_path): return True out.parent.mkdir(parents=True, exist_ok=True) cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-ss", f"{max(0.0, start_s):.3f}", "-i", str(video_path), "-t", f"{max(0.04, duration_s):.3f}", "-vf", f"scale={CLIP_WIDTH}:-2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "26", "-pix_fmt", "yuv420p", "-an", "-movflags", "+faststart", str(out), ] try: subprocess.run(cmd, check=True, capture_output=True, timeout=60) except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): return False return out.exists() and out.stat().st_size > 0 def extract_concat_clip( video_path: Path, segments: list[tuple[float, float]], out: Path, ) -> bool: """Render each (start_s, duration_s) segment then concat into one MP4. Used for multi-shot source matches so the cutter sees the assembled source side-by-side with the trailer beat instead of just one segment. """ if not video_path.exists() or not segments: return False out.parent.mkdir(parents=True, exist_ok=True) parts: list[Path] = [] for idx, (seg_start, seg_dur) in enumerate(segments): part = out.with_name(f"{out.stem}_seg{idx:02d}.mp4") if not extract_clip(video_path, seg_start, seg_dur, part): return False parts.append(part) if not parts: return False # Single segment: just rename / copy. if len(parts) == 1: if parts[0].resolve() != out.resolve(): try: if out.exists(): out.unlink() parts[0].rename(out) except OSError: return False return out.exists() and out.stat().st_size > 0 # Multi-segment: concat via ffmpeg concat demuxer (codec params match # because every segment is rendered through extract_clip with identical # encoder settings). list_file = out.with_name(f"{out.stem}_concat.txt") list_file.write_text( "\n".join(f"file '{part.as_posix()}'" for part in parts) + "\n", encoding="utf-8", ) cmd = [ "ffmpeg", "-y", "-loglevel", "error", "-f", "concat", "-safe", "0", "-i", str(list_file), "-c", "copy", "-movflags", "+faststart", str(out), ] try: subprocess.run(cmd, check=True, capture_output=True, timeout=60) finally: try: list_file.unlink() except OSError: pass return out.exists() and out.stat().st_size > 0 def beat_still_time(start_s: float, end_s: float) -> float: duration = max(0.04, end_s - start_s) return start_s + min(0.4, duration * 0.3) def data_uri(path: Path, mime: str) -> str | None: if not path.exists() or path.stat().st_size == 0: return None payload = base64.b64encode(path.read_bytes()).decode("ascii") return f"data:{mime};base64,{payload}" # ---------------------------------------------------------------------------- # Per-beat data # ---------------------------------------------------------------------------- @dataclass class BeatRow: bid: int trailer_in_s: float trailer_out_s: float source_in_s: float | None source_out_s: float | None scene_id: int | None score: float confirmed: bool matched: bool phase: str composition: str setting: str trailer_still: Path | None source_still: Path | None trailer_clip: Path | None source_clip: Path | None @property def status(self) -> str: if not self.matched: return "MAN." return "OK" if self.confirmed else "?" def collect_rows( project_root: Path, beats: list[dict], results: dict[int, dict], vis_items: dict, trailer_path: Path, source_path: Path, with_stills: bool, with_clips: bool, ) -> list[BeatRow]: stills_dir = project_root / "output" / "cutter_stills" clips_dir = project_root / "output" / "cutter_clips" if with_stills: stills_dir.mkdir(parents=True, exist_ok=True) if with_clips: clips_dir.mkdir(parents=True, exist_ok=True) rows: list[BeatRow] = [] for beat in beats: bid = beat["beat_id"] rec = results.get(bid) desc = best_beat_description(vis_items, bid, beat["start_s"], beat["end_s"]) or "" trailer_still = source_still = trailer_clip = source_clip = None if with_stills: t_still = beat_still_time(beat["start_s"], beat["end_s"]) tjpg = stills_dir / f"beat_{bid:02d}_trailer.jpg" if extract_still(trailer_path, t_still, tjpg): trailer_still = tjpg if rec is not None: src_dur = max(0.04, rec["out_point_s"] - rec["in_point_s"]) s_still = rec["in_point_s"] + min(0.4, src_dur * 0.3) sjpg = stills_dir / f"beat_{bid:02d}_source.jpg" if extract_still(source_path, s_still, sjpg): source_still = sjpg if with_clips: # Trailer clip: full beat duration so the cutter sees the entire # reference beat, not an arbitrary-length excerpt. tdur = max(0.5, min(CLIP_MAX_DURATION_S, beat["end_s"] - beat["start_s"])) tmp4 = clips_dir / f"beat_{bid:02d}_trailer.mp4" if extract_clip(trailer_path, beat["start_s"], tdur, tmp4): trailer_clip = tmp4 # Source clip: # - segmented match (multi-shot beat): concatenate each # segment back-to-back so the cutter sees the assembled # source (e.g. man-shot + reaction shot) at the same total # length as the trailer beat. # - single match: extract the full matched span. # May be shorter than the beat when a match drops before the # beat ends (fade / shot change in source) — intentional. if rec is not None: segs = rec.get("segments") or [] smp4 = clips_dir / f"beat_{bid:02d}_source.mp4" if len(segs) >= 2: seg_specs = [ ( float(s["in_point_s"]), max(0.04, float(s["out_point_s"]) - float(s["in_point_s"])), ) for s in segs if float(s["out_point_s"]) > float(s["in_point_s"]) ] if seg_specs and extract_concat_clip(source_path, seg_specs, smp4): source_clip = smp4 else: sdur = max(0.5, min(CLIP_MAX_DURATION_S, rec["out_point_s"] - rec["in_point_s"])) if extract_clip(source_path, rec["in_point_s"], sdur, smp4): source_clip = smp4 rows.append(BeatRow( bid=bid, trailer_in_s=beat["start_s"], trailer_out_s=beat["end_s"], source_in_s=rec["in_point_s"] if rec else None, source_out_s=rec["out_point_s"] if rec else None, scene_id=rec["scene_id"] if rec else None, score=rec["match_score"] if rec else 0.0, confirmed=bool(rec and rec.get("is_confirmed")), matched=rec is not None, phase=parse_field(desc, "action_phase") or parse_field(desc, "subject"), composition=parse_field(desc, "composition"), setting=parse_field(desc, "setting"), trailer_still=trailer_still, source_still=source_still, trailer_clip=trailer_clip, source_clip=source_clip, )) return rows # ---------------------------------------------------------------------------- # Markdown renderer # ---------------------------------------------------------------------------- def render_markdown( rows: list[BeatRow], trailer_fps: float, source_fps: float, trailer_path: Path, source_path: Path, ) -> str: matched = sum(1 for r in rows if r.matched) confirmed = sum(1 for r in rows if r.confirmed) out: list[str] = [] out.append("# Cutter-Report — manuelles Nachschneiden") out.append("") out.append(f"Stand: {date.today().isoformat()}") out.append("") out.append(f"- **Trailer**: `{trailer_path.name}` @ {trailer_fps:.3f} fps") out.append(f"- **Source** : `{source_path.name}` @ {source_fps:.3f} fps") out.append("") out.append( "Trailer-TC in Trailer-Framerate, Source-TC in Source-Framerate — " "passt 1:1 zum Schnittplatz." ) out.append("") out.append( "Bilder sind base64-eingebettet (kein toter Link). Für Video-Vorschau " "siehe `CUTTER_REPORT.html` (gleiche Daten, mit Clips)." ) out.append("") out.append("## Status-Legende") out.append("") out.append("| Status | Bedeutung | Was tun? |") out.append("|--------|-----------|----------|") out.append("| `OK` | bestätigt durch CV + Vision-Phasenprüfung | übernehmen |") out.append("| `?` | korrekte Szene, Phase ggf. um wenige Frames verschoben | im NLE prüfen |") out.append("| `MAN.` | kein automatischer Treffer | manuell setzen oder Schwarzfade |") out.append("") out.append(f"**Beats:** {len(rows)} gesamt · **{matched}** automatisch (**{confirmed}** bestätigt) · **{len(rows)-matched}** manuell.") out.append("") out.append("## Beat-Tabelle") out.append("") out.append("| Beat | Trailer In / Out | Source In / Out | Score | Status | Bild laut Vision |") out.append("|-----:|------------------|------------------|------:|:------:|-------------------|") for r in rows: ti = smpte(r.trailer_in_s, trailer_fps) to = smpte(r.trailer_out_s, trailer_fps) si = smpte(r.source_in_s, source_fps) if r.matched else "—" so = smpte(r.source_out_s, source_fps) if r.matched else "—" sc = f"{r.score:.3f}" if r.matched else "—" out.append(f"| {r.bid:>4} | {ti}-{to} | {si}-{so} | {sc} | {r.status} | {r.phase[:80]} |") out.append("") out.append("## Beat-Details") out.append("") for r in rows: ti = smpte(r.trailer_in_s, trailer_fps) to = smpte(r.trailer_out_s, trailer_fps) out.append(f"### Beat {r.bid:02d} — Status `{r.status}`") out.append("") out.append(f"- **Trailer**: {ti} – {to}") if r.matched: si = smpte(r.source_in_s, source_fps) so = smpte(r.source_out_s, source_fps) out.append(f"- **Source** : {si} – {so} (scene {r.scene_id}, score {r.score:.3f})") else: out.append("- **Source** : — (manuell setzen)") if r.phase: out.append(f"- **Phase** : {r.phase}") if r.composition: extra = f", {r.setting}" if r.setting else "" out.append(f"- **Bild** : {r.composition}{extra}") out.append("") # Inline base64 stills t_uri = data_uri(r.trailer_still, "image/jpeg") if r.trailer_still else None s_uri = data_uri(r.source_still, "image/jpeg") if r.source_still else None if t_uri or s_uri: out.append("| Trailer | Source |") out.append("|:---:|:---:|") t_cell = f"![Trailer beat {r.bid}]({t_uri})" if t_uri else "_(kein Still)_" s_cell = f"![Source beat {r.bid}]({s_uri})" if s_uri else "_(MAN.)_" out.append(f"| {t_cell} | {s_cell} |") out.append("") return "\n".join(out) # ---------------------------------------------------------------------------- # HTML renderer # ---------------------------------------------------------------------------- HTML_HEAD = """ Cutter-Report
""" HTML_FOOT = """
""" def html_escape(s: str) -> str: return (s.replace("&", "&").replace("<", "<") .replace(">", ">").replace('"', """)) def render_html( rows: list[BeatRow], trailer_fps: float, source_fps: float, trailer_path: Path, source_path: Path, with_clips: bool, ) -> str: matched = sum(1 for r in rows if r.matched) confirmed = sum(1 for r in rows if r.confirmed) parts: list[str] = [HTML_HEAD] parts.append(f'

Cutter-Report — {date.today().isoformat()}

') parts.append('
') parts.append(f"
Trailer {html_escape(trailer_path.name)} @ {trailer_fps:.3f} fps
") parts.append(f"
Source {html_escape(source_path.name)} @ {source_fps:.3f} fps
") parts.append("
Trailer-TC in Trailer-Framerate, Source-TC in Source-Framerate.
") parts.append("
") parts.append(f'
{len(rows)} Beats — {matched} automatisch ({confirmed} bestätigt) — {len(rows)-matched} manuell.
') parts.append("

Status-Legende

") parts.append('') parts.append('') parts.append('') parts.append('') parts.append('
StatusBedeutungWas tun?
OKbestätigt durch CV + Vision-Phasenprüfungübernehmen, optional sichten
?korrekte Szene, Phase ggf. um wenige Frames verschobenim NLE prüfen, Source-In nachjustieren
MAN.kein automatischer Treffermanuell suchen oder Schwarzfade
') # Compact table parts.append("

Beat-Tabelle

") parts.append('') for r in rows: ti = smpte(r.trailer_in_s, trailer_fps) to = smpte(r.trailer_out_s, trailer_fps) si = smpte(r.source_in_s, source_fps) if r.matched else "—" so = smpte(r.source_out_s, source_fps) if r.matched else "—" sc = f"{r.score:.3f}" if r.matched else "—" bcls = {"OK": "ok", "?": "q", "MAN.": "man"}[r.status] parts.append( f'' f'' f'' f'' f'' f'' ) parts.append("
BeatTrailer In / OutSource In / OutScoreStatusPhase
{r.bid}{ti}–{to}{si}–{so}{sc}{r.status}{html_escape(r.phase[:120])}
") # Per-beat detail cards parts.append("

Beat-Details

") for r in rows: ti = smpte(r.trailer_in_s, trailer_fps) to = smpte(r.trailer_out_s, trailer_fps) bcls = {"OK": "ok", "?": "q", "MAN.": "man"}[r.status] parts.append('
') parts.append(f'

Beat {r.bid:02d} {r.status}

') # Trailer column parts.append('
') parts.append('

Trailer

') clip_uri = data_uri(r.trailer_clip, "video/mp4") if (with_clips and r.trailer_clip) else None if clip_uri: parts.append(f'') elif r.trailer_still: uri = data_uri(r.trailer_still, "image/jpeg") or "" parts.append(f'Trailer beat {r.bid}') else: parts.append('
— kein Vorschaubild —
') parts.append(f'
TC {ti} – {to}
') if r.phase: parts.append(f'
Phase {html_escape(r.phase)}
') if r.composition: extra = f", {r.setting}" if r.setting else "" parts.append(f'
Bild {html_escape(r.composition + extra)}
') parts.append('
') # Source column parts.append('
') parts.append('

Source

') clip_uri = data_uri(r.source_clip, "video/mp4") if (with_clips and r.source_clip) else None if clip_uri: parts.append(f'') elif r.source_still: uri = data_uri(r.source_still, "image/jpeg") or "" parts.append(f'Source beat {r.bid}') else: parts.append('
— manuell setzen —
') if r.matched: si = smpte(r.source_in_s, source_fps) so = smpte(r.source_out_s, source_fps) parts.append(f'
TC {si} – {so}
') parts.append(f'
Scene {r.scene_id} · Score {r.score:.3f}
') else: parts.append('
— kein automatischer Treffer —
') parts.append('
') parts.append('
') # .beat parts.append(HTML_FOOT) return "".join(parts) # ---------------------------------------------------------------------------- # Top-level # ---------------------------------------------------------------------------- def render_report( project_root: Path, with_stills: bool = True, with_clips: bool = False, ) -> tuple[str, str]: """Return (markdown, html). Both written by main().""" sys.path.insert(0, str(project_root)) from src.core.config import load_config cfg = load_config(project_root / "config.toml") trailer_path = Path(cfg.paths.reference_trailer) source_path = Path(cfg.paths.source_movie) # Source fps: trust config.toml's edl_frame_rate (it's what the EDL/FCPXML # uses, hence what the cutter sees in the NLE). Fall back to ffprobe only # if no value is configured. source_fps = float(getattr(cfg.export, "edl_frame_rate", 0.0)) or probe_fps(source_path) or 23.976 # Trailer fps: optional config override, else ffprobe, else fallback to # source fps so the two sides at least share a number. trailer_fps_cfg = getattr(cfg.paths, "trailer_frame_rate", None) trailer_fps = float(trailer_fps_cfg) if trailer_fps_cfg else (probe_fps(trailer_path) or source_fps) cache = project_root / ".cache" results = {r["beat_id"]: r for r in json.loads((cache / "match_results.json").read_text())} beats = json.loads((cache / "trailer_beats.json").read_text()) vis_path = cache / "vision_descriptions.json" vis_items = json.loads(vis_path.read_text())["items"] if vis_path.exists() else {} rows = collect_rows( project_root, beats, results, vis_items, trailer_path, source_path, with_stills, with_clips, ) md = render_markdown(rows, trailer_fps, source_fps, trailer_path, source_path) html = render_html(rows, trailer_fps, source_fps, trailer_path, source_path, with_clips) return md, html def main() -> int: parser = argparse.ArgumentParser(description="Render CUTTER_REPORT.{md,html} from current cache") parser.add_argument("--no-stills", action="store_true", help="skip frame extraction (markdown stays text-only)") parser.add_argument("--with-clips", action="store_true", help="also render 3 s MP4 previews per beat (slow)") args = parser.parse_args() here = Path(__file__).resolve().parent project_root = here.parent md, html = render_report( project_root, with_stills=not args.no_stills, with_clips=args.with_clips, ) (project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8") (project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8") print(f"Wrote {project_root / 'CUTTER_REPORT.md'}") print(f"Wrote {project_root / 'CUTTER_REPORT.html'}") return 0 if __name__ == "__main__": raise SystemExit(main())