""" cli.py — AI Trailer Generator v2 — Command-Line Interface Usage: python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm] python cli.py match [--config CONFIG] [--force-reindex] python cli.py rematch --beat N [--threshold F] [--refine] python cli.py report [--config CONFIG] python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm] python cli.py export [--config CONFIG] [--format fcpxml|edl|both] On --no-audio / --no-llm: These flags do NOT affect matching quality. Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX) to beats in the export metadata. The CV pipeline is identical either way. Use them for fast iterations: they skip large model downloads. All heavy imports are deferred so --help is instant. """ from __future__ import annotations import argparse import json import logging import sys from pathlib import Path # --------------------------------------------------------------------------- # Logging setup # --------------------------------------------------------------------------- def _setup_logging(level: str = "INFO") -> None: # Force UTF-8 for Windows console emoji printing if sys.stdout.encoding != 'utf-8': sys.stdout.reconfigure(encoding='utf-8') logging.basicConfig( format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", datefmt="%H:%M:%S", level=getattr(logging, level.upper(), logging.INFO), stream=sys.stdout, ) logging.getLogger("PIL").setLevel(logging.WARNING) def _ensure_utf8_console() -> None: """Make argparse help safe on Windows before logging is configured.""" if sys.stdout.encoding != "utf-8": sys.stdout.reconfigure(encoding="utf-8") # --------------------------------------------------------------------------- # Cache helpers (match results ↔ JSON) # --------------------------------------------------------------------------- def _results_cache_path(cfg: "AppConfig") -> Path: # type: ignore[name-defined] return cfg.paths.cache_dir / "match_results.json" def _save_results(results: list, cfg: "AppConfig") -> None: # type: ignore[name-defined] from src.core.models import MatchResult data = [ { "beat_id": r.beat_id, "scene_id": r.scene_id, "source_path": str(r.source_path), "in_point_s": r.in_point_s, "out_point_s": r.out_point_s, "in_point_frame": r.in_point_frame, "match_score": r.match_score, "match_location": list(r.match_location), "is_confirmed": r.is_confirmed, "segments": [ { "trailer_offset_s": s.trailer_offset_s, "duration_s": s.duration_s, "scene_id": s.scene_id, "in_point_s": s.in_point_s, "out_point_s": s.out_point_s, "match_score": s.match_score, "is_confirmed": s.is_confirmed, } for s in getattr(r, "segments", ()) ], } for r in results ] p = _results_cache_path(cfg) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(json.dumps(data, indent=2), encoding="utf-8") logging.getLogger(__name__).info("Match results cached → %s", p) def _auto_commit_push_reports(project_root: "Path") -> None: # type: ignore[name-defined] """Stage changed report files, commit, and push to origin. Only touches report output files — never stages source or config changes. Failures are logged but never propagate. """ import subprocess as _sp from datetime import datetime as _dt report_globs = [ "CUTTER_REPORT.html", "CUTTER_REPORT.md", "output/report/match_report.html", "output/report/beat_*_compare.mp4", "output/report/beat_*_src.mp4", "output/report/beat_*_ref.mp4", "output/cutter_clips/beat_*_compare.mp4", "output/cutter_clips/beat_*_source.mp4", "output/cutter_clips/beat_*_source_seg*.mp4", "output/cutter_clips/beat_*_trailer.mp4", "output/cutter_stills/beat_*_source.jpg", "output/cutter_stills/beat_*_trailer.jpg", ] log = logging.getLogger(__name__) cwd = str(project_root) try: for pattern in report_globs: _sp.run(["git", "add", "--", pattern], capture_output=True, cwd=cwd) status = _sp.run( ["git", "status", "--porcelain"], capture_output=True, text=True, cwd=cwd ) if not status.stdout.strip(): log.info("Auto-commit: nothing changed in report files.") return now = _dt.now().strftime("%Y-%m-%d %H:%M") msg = f"Auto-update cutter report {now}\n\nCo-Authored-By: Claude Sonnet 4.6 " _sp.run(["git", "commit", "-m", msg], capture_output=True, cwd=cwd, check=True) _sp.run(["git", "push", "origin", "main"], capture_output=True, cwd=cwd, check=True) log.info("Auto-commit+push: cutter report updated → remote.") except Exception as exc: log.warning("Auto-commit/push failed (non-fatal): %s", exc) def _regenerate_cutter_report(cfg: "AppConfig") -> None: # type: ignore[name-defined] """Re-render CUTTER_REPORT.{md,html} with Frame-Locked Compare clips. Called from every match-style command after the cache is written so all cutter-facing artefacts stay in sync with `match_results.json`. After rendering, stages and pushes changed report files to the remote. Failures are logged but never abort the run. """ project_root = cfg.paths.cache_dir.parent try: from scripts.generate_cutter_report import render_report md, html = render_report(project_root, with_stills=True, with_clips=True) (project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8") (project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8") legacy_report_path = project_root / "output" / "report" / "match_report.html" legacy_report_path.parent.mkdir(parents=True, exist_ok=True) legacy_report_path.write_text(html, encoding="utf-8") logging.getLogger(__name__).info("Cutter report regenerated (md + html + compare clips + legacy match_report.html)") except Exception as exc: logging.getLogger(__name__).warning("Cutter report regen failed: %s", exc) _auto_commit_push_reports(project_root) def _load_results(cfg: "AppConfig") -> list: # type: ignore[name-defined] from src.core.models import MatchResult, MatchSegment p = _results_cache_path(cfg) if not p.exists(): raise FileNotFoundError(f"No cached results at {p}. Run 'match' first.") raw = json.loads(p.read_text(encoding="utf-8")) return [ MatchResult( beat_id=d["beat_id"], scene_id=d["scene_id"], source_path=Path(d["source_path"]), in_point_s=d["in_point_s"], out_point_s=d["out_point_s"], in_point_frame=d["in_point_frame"], match_score=d["match_score"], match_location=tuple(d["match_location"]), is_confirmed=d.get("is_confirmed", True), segments=tuple( MatchSegment( trailer_offset_s=float(s["trailer_offset_s"]), duration_s=float(s["duration_s"]), scene_id=int(s["scene_id"]), in_point_s=float(s["in_point_s"]), out_point_s=float(s["out_point_s"]), match_score=float(s["match_score"]), is_confirmed=bool(s.get("is_confirmed", True)), ) for s in d.get("segments", ()) ), ) for d in raw ] def _load_scene_cache_light(cfg) -> list[dict]: p = cfg.paths.cache_dir / "scene_index.json" if not p.exists(): return [] return json.loads(p.read_text(encoding="utf-8")) def _scene_fps_light(scene: dict, cfg) -> float: duration_s = max(0.0, float(scene["end_s"]) - float(scene["start_s"])) frame_count = max(0, int(scene["end_frame"]) - int(scene["start_frame"])) return frame_count / duration_s if duration_s > 0 and frame_count > 0 else cfg.export.edl_frame_rate def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None: for idx, scene in enumerate(scenes): if float(scene["start_s"]) <= t_sec < float(scene["end_s"]): if ( float(scene["end_s"]) - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s and idx + 1 < len(scenes) ): return scenes[idx + 1] return scene return None def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None: return next((s for s in scenes if int(s["scene_id"]) == scene_id), None) def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float: if matchable_duration_s <= 0: return 0.0 try: from src.cv.global_scan import _reference_internal_cut_offsets cut_offsets = _reference_internal_cut_offsets(beat, cfg) except Exception: cut_offsets = [] start_idx = None for idx, scene in enumerate(scenes): if float(scene["start_s"]) <= in_point_s < float(scene["end_s"]): start_idx = idx break if start_idx is None: return 0.0 target_end = in_point_s + matchable_duration_s current_end = in_point_s for scene in scenes[start_idx:]: scene_end = float(scene["end_s"]) if target_end <= scene_end: return matchable_duration_s boundary_offset = scene_end - in_point_s if not any( abs(boundary_offset - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s for cut_offset in cut_offsets ): tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(scene, cfg)) return max(0.0, scene_end - in_point_s - tail_s) current_end = scene_end return max(0.0, current_end - in_point_s) def _normalize_cached_results(beats: list, results: list, cfg) -> list: """ Re-apply current generic timing rules to cached results. This keeps old automatic cache entries from preserving obsolete scene-boundary or tail-trim behavior without introducing manual per-beat truth. """ from dataclasses import replace scenes = _load_scene_cache_light(cfg) if not scenes: return results beats_by_id = {b.beat_id: b for b in beats} normalized = [] for result in results: beat = beats_by_id.get(result.beat_id) if getattr(result, "segments", ()): segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments) weighted_score = ( sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments) / segment_duration if segment_duration > 0 else result.match_score ) if weighted_score < cfg.cv.deep_scan.provisional_match_threshold: continue if beat is not None and beat.duration_s > 0: visible_duration = sum( max(0.0, end_s - start_s) for start_s, end_s in _reference_scoreable_segments(beat, cfg) ) coverage_target = visible_duration if visible_duration > 0 else beat.duration_s coverage = segment_duration / coverage_target if coverage < cfg.cv.deep_scan.min_duration_coverage: continue normalized.append(replace(result, match_score=weighted_score)) continue if result.match_score < cfg.cv.deep_scan.provisional_match_threshold: continue scene = _scene_for_time_light(scenes, result.in_point_s, cfg) declared_scene = _scene_by_id_light(scenes, result.scene_id) # If the automatic matcher selected a scene but its in-point sits just # before that scene's detected start, treat this as scene-boundary drift # and clamp to the declared scene. This is generic: no beat IDs, no # manual timestamps, just consistent scene/time reconciliation. if declared_scene is not None: declared_start = float(declared_scene["start_s"]) declared_end = float(declared_scene["end_s"]) declared_fps = _scene_fps_light(declared_scene, cfg) boundary_tolerance_s = ( cfg.cv.deep_scan.scene_boundary_epsilon_s + cfg.cv.deep_scan.start_preroll_frames / declared_fps ) if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end: scene = declared_scene if beat is None or scene is None: normalized.append(result) continue fps = _scene_fps_light(scene, cfg) adjusted_in_s = result.in_point_s scene_changed = int(scene["scene_id"]) != result.scene_id starts_before_scene = result.in_point_s < float(scene["start_s"]) if scene_changed or starts_before_scene or result.duration_s <= 0.12: adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps)) adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s) scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene fps = _scene_fps_light(scene, cfg) matchable_duration_s = beat.duration_s try: from src.cv.global_scan import estimate_matchable_reference_duration matchable_duration_s = estimate_matchable_reference_duration(beat, cfg) except Exception: pass tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps) single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s) contiguous_duration_s = _contiguous_duration_light( beat, adjusted_in_s, scenes, cfg, matchable_duration_s, ) max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s)) normalized_result = result if ( scene_changed or starts_before_scene or result.duration_s <= 0.12 or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps) ): normalized_result = replace( result, scene_id=int(scene["scene_id"]), in_point_s=adjusted_in_s, out_point_s=adjusted_in_s + max_duration_s, in_point_frame=int(adjusted_in_s * fps), ) coverage = ( max(0.0, normalized_result.duration_s) / matchable_duration_s if matchable_duration_s > 0 else 0.0 ) if coverage < cfg.cv.deep_scan.min_duration_coverage: continue try: from src.cv.content_align import align_cached_match_by_content _, content_score = align_cached_match_by_content( beat, normalized_result.in_point_s, cfg, search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds), fps=12.5, ) content_gate = ( cfg.cv.deep_scan.provisional_content_threshold if normalized_result.is_confirmed else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold) ) if content_score < content_gate: continue if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed: normalized_result = replace( normalized_result, match_score=min(normalized_result.match_score, content_score), is_confirmed=False, ) except Exception: pass normalized.append(normalized_result) return normalized # --------------------------------------------------------------------------- # Command handlers # --------------------------------------------------------------------------- def _build_transcribe_callback(cfg): """Return a transcribe_callback closure, or None if audio is disabled.""" from src.audio.transcriber import transcribe_video def _cb(path, start_s, end_s, offset_s): return transcribe_video(path, cfg, start_s=start_s, end_s=end_s, time_offset_s=offset_s) return _cb def _build_classify_callback(cfg): """Return a classify_callback closure.""" from src.llm.dramaturg import classify_beats def _cb(beats): return classify_beats(beats, cfg) return _cb def cmd_analyze(args: argparse.Namespace, cfg) -> list: from src.pipeline.trailer_analyzer import analyze_reference_trailer transcribe_cb = _build_transcribe_callback(cfg) if not args.no_audio else None classify_cb = _build_classify_callback(cfg) if not args.no_llm else None beats = analyze_reference_trailer( cfg, transcribe_callback=transcribe_cb, classify_callback=classify_cb, ) # Persist beats for downstream commands (including histogram bytes as hex) beats_cache = cfg.paths.cache_dir / "trailer_beats.json" beats_cache.parent.mkdir(parents=True, exist_ok=True) beats_data = [ { "beat_id": b.beat_id, "start_s": b.start_s, "end_s": b.end_s, "start_frame": b.start_frame, "end_frame": b.end_frame, "beat_type": b.beat_type.name, "dialogue": [{"start_s": d.start_s, "end_s": d.end_s, "text": d.text} for d in b.dialogue], "phash": b.phash, "luma_hist": b.luma_hist.hex() if b.luma_hist else None, "sat_hist": b.sat_hist.hex() if b.sat_hist else None, } for b in beats ] beats_cache.write_text(json.dumps(beats_data, indent=2, ensure_ascii=False), encoding="utf-8") print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}") return beats def _load_beats(cfg) -> list: from src.core.models import BeatType, DialogueLine, TrailerBeat p = cfg.paths.cache_dir / "trailer_beats.json" if not p.exists(): raise FileNotFoundError(f"No cached beats at {p}. Run 'analyze' first.") raw = json.loads(p.read_text(encoding="utf-8")) beats = [] for d in raw: dialogue = tuple( DialogueLine(start_s=x["start_s"], end_s=x["end_s"], text=x["text"]) for x in d.get("dialogue", []) ) beats.append(TrailerBeat( beat_id=d["beat_id"], trailer_path=cfg.paths.reference_trailer, start_s=d["start_s"], end_s=d["end_s"], start_frame=d["start_frame"], end_frame=d["end_frame"], beat_type=BeatType[d.get("beat_type", "UNKNOWN")], dialogue=dialogue, phash=d.get("phash"), luma_hist=bytes.fromhex(d["luma_hist"]) if d.get("luma_hist") else None, sat_hist= bytes.fromhex(d["sat_hist"]) if d.get("sat_hist") else None, )) return beats def _select_beats(beats: list, beat_id: int | None) -> list: """Return all beats or exactly one requested beat.""" if beat_id is None: return beats selected = [b for b in beats if b.beat_id == beat_id] if not selected: raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.") return selected def _select_results(results: list, beat_ids: set[int] | None) -> list: """Return all results or only results for the requested beats.""" if beat_ids is None: return results return [r for r in results if r.beat_id in beat_ids] def _find_scene_for_in_point(cfg, in_point_s: float): from src.cv.scene_indexer import build_scene_index scenes = build_scene_index(cfg) for idx, scene in enumerate(scenes): if scene.start_s <= in_point_s < scene.end_s: if ( scene.end_s - in_point_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s and idx + 1 < len(scenes) ): return scenes[idx + 1] return scene return None def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]: """Find visible source-matchable islands inside a trailer beat.""" from src.cv.frame_extractor import grab_frame_at_path from src.cv.global_scan import ( _corr_same_size, _is_scoreable_reference_frame, _prepare_haystack, _reference_visibility_stats, ) def is_visible(frame) -> bool: if frame is None: return False mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg) visible_luma = ( mean_luma >= cfg.cv.deep_scan.scoreable_luma_mean_min * 0.45 or p90_luma >= cfg.cv.deep_scan.scoreable_luma_p90_min * 0.50 ) visible_contrast = contrast >= max(8.0, cfg.cv.deep_scan.scoreable_contrast_min * 0.30) return visible_luma and visible_contrast step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s) min_segment_s = max(0.32, step_s * 3.0) bridge_gap_s = max(0.18, step_s * 2.0) raw: list[tuple[float, float]] = [] start: float | None = None last_seen: float | None = None t = 0.0 while t <= beat.duration_s: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) scoreable = frame is not None and _is_scoreable_reference_frame(frame, cfg) if scoreable: if start is None: start = t last_seen = t elif start is not None and last_seen is not None and t - last_seen > bridge_gap_s: end = min(beat.duration_s, last_seen + step_s) if end - start >= min_segment_s: raw.append((start, end)) start = None last_seen = None t = round(t + step_s, 6) if start is not None and last_seen is not None: end = min(beat.duration_s, last_seen + step_s) if end - start >= min_segment_s: raw.append((start, end)) expanded: list[tuple[float, float]] = [] same_shot_corr_min = 0.72 for start_s, end_s in raw: start_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + start_s) end_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + max(start_s, end_s - step_s)) start_feature = _prepare_haystack(start_anchor, cfg) if start_anchor is not None else None end_feature = _prepare_haystack(end_anchor, cfg) if end_anchor is not None else None soft_start = start_s t = round(start_s - step_s, 6) while t >= 0.0: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if not is_visible(frame): break if start_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), start_feature) < same_shot_corr_min: break soft_start = max(0.0, t) t = round(t - step_s, 6) soft_end = end_s t = round(end_s, 6) while t <= beat.duration_s + 1e-6: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if not is_visible(frame): break if end_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), end_feature) < same_shot_corr_min: break soft_end = min(beat.duration_s, t + step_s) t = round(t + step_s, 6) if soft_end - soft_start >= min_segment_s: expanded.append((soft_start, soft_end)) merged: list[tuple[float, float]] = [] for start_s, end_s in expanded: if merged and start_s - merged[-1][1] <= bridge_gap_s: merged[-1] = (merged[-1][0], max(merged[-1][1], end_s)) else: merged.append((start_s, end_s)) return merged def _fade_content_shots(beat, cfg) -> list[tuple[float, float]]: """Find low-luma fade regions adjacent to visible islands that still carry describable content (e.g. a hand+knife silhouette during a cross-fade). These regions are too dark for CV template matching but vision can read structure during the fade — the matcher therefore treats them as their own shots and routes them through the vision-led search path. A fade region qualifies when, sampled inside the region, the brightest frame has p90 ≥ 12 (not pure black) and contrast ≥ 8 (some structure) AND the region duration is ≥ 0.2 s. Pure-black/featureless fades stay excluded. """ from src.cv.frame_extractor import grab_frame_at_path from src.cv.global_scan import _reference_visibility_stats islands = _reference_scoreable_segments(beat, cfg) if not islands: return [] step_s = max(0.04, cfg.cv.deep_scan.span_sample_step_s) min_fade_s = 0.2 def has_content(start_s: float, end_s: float) -> bool: if end_s - start_s < min_fade_s: return False peak_p90 = 0.0 peak_contrast = 0.0 t = start_s while t < end_s: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if frame is not None: _, p90, contrast = _reference_visibility_stats(frame, cfg) peak_p90 = max(peak_p90, p90) peak_contrast = max(peak_contrast, contrast) t = round(t + step_s, 6) return peak_p90 >= 12.0 and peak_contrast >= 8.0 fades: list[tuple[float, float]] = [] # Between-island fades only: these are genuine cross-fade silhouettes # (one visible shot dissolves into another through a dim middle frame). # Pre-island fades are fade-from-black leaders; post-island fades are # fade-to-black trailers — neither is a source-matchable shot on its own. for prev_isl, next_isl in zip(islands, islands[1:]): gap_start, gap_end = prev_isl[1], next_isl[0] if has_content(gap_start, gap_end): fades.append((gap_start, gap_end)) return fades def _reference_shot_segments(beat, cfg) -> list[tuple[float, float]]: """Source-matchable shot ranges inside a trailer beat. Returns a sorted list of (start_s, end_s) tuples covering: * each visible island, further split at internal hard cuts; * each fade region adjacent to an island that still carries describable content (e.g. a silhouette during a cross-fade) — these get matched via the vision-led search path because CV templates against the dark frames are unusable. Tiny sub-shots are merged so noisy cut detection doesn't fragment a real shot into useless slivers. """ from src.cv.global_scan import _reference_internal_cut_offsets islands = _reference_scoreable_segments(beat, cfg) try: cut_offsets = sorted(_reference_internal_cut_offsets(beat, cfg)) except Exception: cut_offsets = [] fade_shots = _fade_content_shots(beat, cfg) if not cut_offsets and not fade_shots: return islands min_shot_s = max(0.4, cfg.cv.deep_scan.span_sample_step_s * 4.0) shots: list[tuple[float, float]] = [] for start_s, end_s in islands: boundaries = [start_s] for cut in cut_offsets: if start_s + 1e-3 < cut < end_s - 1e-3: boundaries.append(cut) boundaries.append(end_s) for i in range(len(boundaries) - 1): seg_start = boundaries[i] seg_end = boundaries[i + 1] if seg_end - seg_start < min_shot_s and shots and shots[-1][1] >= seg_start - 1e-3: # merge into previous if the new piece is too short shots[-1] = (shots[-1][0], seg_end) elif seg_end - seg_start >= min_shot_s: shots.append((seg_start, seg_end)) elif shots: shots[-1] = (shots[-1][0], seg_end) else: shots.append((seg_start, seg_end)) # Add fade-content shots (cross-fade silhouettes / dim shot boundaries) # sorted with the visible-island shots so the matcher sees them in # trailer-time order. if fade_shots: all_shots = sorted(list(shots) + list(fade_shots), key=lambda iv: iv[0]) # Drop overlaps in case a fade region brushes against an island # by a few frames; the island wins. cleaned: list[tuple[float, float]] = [] for s, e in all_shots: if cleaned and s < cleaned[-1][1]: if e > cleaned[-1][1]: cleaned.append((cleaned[-1][1], e)) continue cleaned.append((s, e)) return cleaned return shots if shots else islands def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]: """Use a single visible island as the primary match target for faded beats.""" from dataclasses import replace trimmed = [] trims: dict[int, tuple[float, float]] = {} frame_s = 1.0 / max(1.0, float(cfg.export.edl_frame_rate)) for beat in beats: islands = _reference_scoreable_segments(beat, cfg) if len(islands) == 1: start_s, end_s = islands[0] island_duration_s = max(0.0, end_s - start_s) has_real_trim = ( start_s > frame_s * 1.5 or beat.duration_s - end_s > frame_s * 1.5 ) if island_duration_s > 0.0 and has_real_trim: trimmed.append( replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) ) trims[beat.beat_id] = (start_s, island_duration_s) continue trimmed.append(beat) return trimmed, trims def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list: """Restore beat-relative segment metadata after matching a trimmed island.""" if not trims: return results from dataclasses import replace from src.core.models import MatchSegment expanded = [] for result in results: trim = trims.get(result.beat_id) if trim is None or getattr(result, "segments", ()): expanded.append(result) continue trailer_offset_s, island_duration_s = trim duration_s = min(max(0.0, island_duration_s), max(0.0, result.duration_s)) segment = MatchSegment( trailer_offset_s=trailer_offset_s, duration_s=duration_s, scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.in_point_s + duration_s, match_score=result.match_score, is_confirmed=result.is_confirmed, ) expanded.append( replace( result, out_point_s=result.in_point_s + duration_s, segments=(segment,), ) ) return expanded def _keeps_cached_match(old, new, cfg) -> bool: """Return True when the old cached match is better than the new one and should be kept. Specifically protects multi-segment provisional matches from being replaced by a weaker single-span result. The old entry wins when it has segments (explicitly tuned multi-shot layout) and the new result has none AND is not a score improvement. """ if old is None or new is None: return False old_segs = getattr(old, "segments", ()) or () new_segs = getattr(new, "segments", ()) or () if old_segs and not new_segs and new.match_score <= old.match_score: return True return False def _merge_best_results(existing: list, candidates: list, cfg) -> list: """Merge matches by beat, preferring confirmed or higher-scoring results.""" by_id = {r.beat_id: r for r in existing} for candidate in candidates: old = by_id.get(candidate.beat_id) if old is None: by_id[candidate.beat_id] = candidate continue candidate_confirmed = candidate.match_score >= cfg.cv.deep_scan.match_threshold or candidate.is_confirmed old_confirmed = old.match_score >= cfg.cv.deep_scan.match_threshold or old.is_confirmed if ( candidate_confirmed and not old_confirmed or candidate.match_score > old.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta or ( candidate.match_score >= old.match_score - cfg.cv.deep_scan.duration_tie_break_score_delta and candidate.duration_s > old.duration_s ) ): by_id[candidate.beat_id] = candidate return sorted(by_id.values(), key=lambda r: r.beat_id) def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list: """Try a vision-led search for beats that ended up without a match. For each unmatched beat that has scoreable visual content (i.e. not pure fade/title-card material), this pass: 1. Asks the vibe-check (CV histogram + pHash) for the top-K candidate scenes. 2. For each candidate, runs the semantic action-window search with the beat's own description, prefering windows whose phase matches the visible part of the beat. 3. Refines the in-point with the regular CV content/motion aligner. 4. Validates the resulting window with the vision phase check, exactly like the main filter. 5. Adds the best validated candidate as a provisional MatchResult. Confirmed and provisional matches both stay subject to the same thresholds used elsewhere; this only adds matches that pass the same quality gates. """ if not cfg.vision.enabled or not beats: return results from dataclasses import replace from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration from src.cv.scene_indexer import build_scene_index from src.cv.vibe_check import run_vibe_check from src.core.models import MatchResult from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision logger = logging.getLogger(__name__) matched_ids = {r.beat_id for r in results} unmatched = [b for b in beats if b.beat_id not in matched_ids] if not unmatched: return results scenes = build_scene_index(cfg) if not scenes: return results new_results = list(results) for beat in unmatched: try: islands = _reference_scoreable_segments(beat, cfg) except Exception: islands = [] # Anchor selection: prefer the longest visible island; if none exists, # fall back to the full beat. The latter handles dark / low-contrast # close-ups that drop below the scoreable luma/contrast thresholds but # are still semantically describable. The strict vision phase # validation later in this pass keeps us from accepting pure title-card # or logo material. from dataclasses import replace as _replace if islands: anchor_start_s, anchor_end_s = max(islands, key=lambda iv: iv[1] - iv[0]) anchor_beat = _replace( beat, start_s=beat.start_s + anchor_start_s, end_s=beat.start_s + anchor_end_s, ) else: anchor_beat = beat try: hits = run_vibe_check( beat, scenes, top_k=max(cfg.cv.deep_scan.scene_seed_top_k, cfg.cv.vibe_check.top_k_candidates), hist_method=cfg.cv.vibe_check.hist_compare_method, phash_max_distance=64, ) except Exception as exc: logger.warning("Beat %d: recovery vibe-check failed (%s)", beat.beat_id, exc) continue scenes_by_id = {s.scene_id: s for s in scenes} best = None # (score, scene, in_s, dur_s, reason) seen = set() for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]: scene = scenes_by_id.get(hit.scene_id) if scene is None or scene.scene_id in seen: continue seen.add(scene.scene_id) try: found = find_action_window_in_scene(anchor_beat, scene, cfg) except Exception as exc: logger.debug("Beat %d: action window failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc) continue if found is None: continue start_s, end_s, semantic_score, reason = found window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0)) try: aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion( anchor_beat, start_s, cfg, search_window_s=window_s, ) except Exception as exc: logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc) continue aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s))) try: usable_duration_s, usable_score = estimate_usable_source_duration(anchor_beat, aligned_in_s, cfg) except Exception: usable_duration_s, usable_score = anchor_beat.duration_s, 0.0 usable_duration_s = max(0.0, min(anchor_beat.duration_s, usable_duration_s)) if usable_duration_s < max(0.32, anchor_beat.duration_s * 0.45): usable_duration_s = anchor_beat.duration_s try: ok, verify_reason = validate_match_window_with_vision( anchor_beat, source_path=scene.source_path, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, cfg=cfg, ) except Exception as exc: logger.debug("Beat %d: validate failed scene=%d (%s)", beat.beat_id, scene.scene_id, exc) continue if not ok: continue final_score = max( combined_score, min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08), ) if final_score < cfg.cv.deep_scan.provisional_match_threshold: continue candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}") if best is None or candidate[0] > best[0]: best = candidate if best is None: continue score, scene, aligned_in_s, usable_duration_s, repair_reason = best logger.info( "Beat %d: recovered via vision action search scene=%d in=%.3fs score=%.3f (%s)", beat.beat_id, scene.scene_id, aligned_in_s, score, repair_reason, ) new_results.append(MatchResult( beat_id=beat.beat_id, scene_id=scene.scene_id, source_path=scene.source_path, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, match_location=(0, 0), is_confirmed=score >= cfg.cv.deep_scan.match_threshold, segments=tuple(), )) return sorted(new_results, key=lambda r: r.beat_id) def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list: """Drop vision-enabled matches whose final action phase contradicts the beat.""" if not cfg.vision.enabled or not results: return results from dataclasses import replace from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision from src.cv.scene_indexer import build_scene_index from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration logger = logging.getLogger(__name__) beats_by_id = {beat.beat_id: beat for beat in beats} scenes_by_id = {scene.scene_id: scene for scene in build_scene_index(cfg)} def visible_content_offset(action_beat, segment_start_offset_s: float) -> float: content_offset_s = 0.0 for start_s, end_s in _reference_scoreable_segments(action_beat, cfg): if end_s <= segment_start_offset_s: content_offset_s += max(0.0, end_s - start_s) elif start_s < segment_start_offset_s: content_offset_s += max(0.0, segment_start_offset_s - start_s) break else: break return content_offset_s def realign_window(check_beat, scene_id: int, action_beat=None): scene = scenes_by_id.get(scene_id) if scene is None: return None segment_window = find_action_window_in_scene(check_beat, scene, cfg) if action_beat is not None and action_beat is not check_beat: beat_window = find_action_window_in_scene(action_beat, scene, cfg) else: beat_window = None use_beat_context = False if segment_window is None: found = beat_window use_beat_context = beat_window is not None elif beat_window is None: found = segment_window elif beat_window[2] > segment_window[2] + 0.06: found = beat_window use_beat_context = True else: found = segment_window if found is None: return None start_s, end_s, semantic_score, reason = found if use_beat_context: segment_start_offset_s = max(0.0, check_beat.start_s - action_beat.start_s) content_offset_s = visible_content_offset(action_beat, segment_start_offset_s) start_s += content_offset_s end_s += content_offset_s window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0)) aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion( check_beat, start_s, cfg, search_window_s=window_s, ) aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s))) usable_duration_s, usable_score = estimate_usable_source_duration(check_beat, aligned_in_s, cfg) usable_duration_s = max(0.0, min(check_beat.duration_s, usable_duration_s)) if usable_duration_s < max(0.32, check_beat.duration_s * 0.45): usable_duration_s = check_beat.duration_s ok, verify_reason = validate_match_window_with_vision( check_beat, source_path=scene.source_path, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, cfg=cfg, ) if not ok: logger.info( "Beat %d: action-window realign rejected scene=%d in=%.3fs (%s)", check_beat.beat_id, scene.scene_id, aligned_in_s, verify_reason, ) return None score = max( combined_score, min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08), ) return scene, aligned_in_s, usable_duration_s, score, f"{reason}; {verify_reason}" kept = [] for result in results: beat = beats_by_id.get(result.beat_id) if beat is None: kept.append(result) continue kept_before = len(kept) try: _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger) except Exception as exc: logger.warning( "Beat %d: vision filter/repair failed (%s); keeping previous cached match.", result.beat_id, exc, ) del kept[kept_before:] kept.append(result) return kept def _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger): from dataclasses import replace if True: windows = [] if getattr(result, "segments", ()): for segment in result.segments: segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) windows.append(( segment_beat, segment.scene_id, segment.in_point_s, segment.out_point_s, )) else: windows.append((beat, result.scene_id, result.in_point_s, result.out_point_s)) valid = True reasons: list[str] = [] for check_beat, scene_id, in_point_s, out_point_s in windows: ok, reason = validate_match_window_with_vision( check_beat, source_path=result.source_path, scene_id=scene_id, in_point_s=in_point_s, out_point_s=out_point_s, cfg=cfg, ) reasons.append(reason) if not ok: valid = False break if valid: repaired = False if getattr(result, "segments", ()): new_segments = [] repair_reasons = [] changed = False for segment in result.segments: scene = scenes_by_id.get(segment.scene_id) # Allow phase-realign whenever the scene has any meaningful # slack beyond the segment, not only for "long" scenes. # Short scenes don't need realigning because the segment # essentially is the scene. if scene is None or scene.duration_s <= segment.duration_s + 0.5: new_segments.append(segment) continue # For already-confirmed segments, skip the realign to avoid # destabilizing a strong original match. if segment.is_confirmed and scene.duration_s <= max(segment.duration_s * 1.6, 6.0): new_segments.append(segment) continue segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) repair = realign_window(segment_beat, segment.scene_id, action_beat=beat) if repair is None: new_segments.append(segment) continue repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate: new_segments.append(segment) continue # Don't commit a repair that scores meaningfully worse than # the original; phase realign should improve, not regress. if score < segment.match_score - 0.02: new_segments.append(segment) continue changed = True repair_reasons.append(repair_reason) new_segments.append(replace( segment, scene_id=repair_scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, duration_s=usable_duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) if changed and new_segments: first = new_segments[0] repaired_score = min(seg.match_score for seg in new_segments) logger.info( "Beat %d: realigned semantically valid long scene by motion/action windows (%s)", result.beat_id, "; ".join(repair_reasons), ) kept.append(replace( result, scene_id=first.scene_id, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate), match_score=repaired_score, is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold, segments=tuple(new_segments), )) repaired = True else: scene = scenes_by_id.get(result.scene_id) wide_scene = ( scene is not None and scene.duration_s > result.duration_s + 0.5 ) already_confirmed_in_tight_scene = ( result.is_confirmed and scene is not None and scene.duration_s <= max(result.duration_s * 1.6, 6.0) ) if wide_scene and not already_confirmed_in_tight_scene: repair = realign_window(beat, result.scene_id) if repair is not None: repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair moved = abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate improved = score >= result.match_score - 0.02 if moved and improved: logger.info( "Beat %d: realigned semantically valid long scene by motion/action window (%s)", result.beat_id, repair_reason, ) kept.append(replace( result, scene_id=repair_scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) repaired = True if not repaired: kept.append(result) else: if getattr(result, "segments", ()): new_segments = [] all_repaired = True repair_reasons = [] for segment in result.segments: segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) repair = realign_window(segment_beat, segment.scene_id, action_beat=beat) if repair is None: all_repaired = False break scene, aligned_in_s, usable_duration_s, score, repair_reason = repair repair_reasons.append(repair_reason) new_segments.append(replace( segment, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, duration_s=usable_duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) if all_repaired and new_segments: first = new_segments[0] repaired_score = min(seg.match_score for seg in new_segments) logger.info( "Beat %d: realigned inside matched scene by vision action windows (%s)", result.beat_id, "; ".join(repair_reasons), ) kept.append(replace( result, scene_id=first.scene_id, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate), match_score=repaired_score, is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold, segments=tuple(new_segments), )) return else: repair = realign_window(beat, result.scene_id) if repair is not None: scene, aligned_in_s, usable_duration_s, score, repair_reason = repair logger.info( "Beat %d: realigned inside matched scene by vision action window (%s)", result.beat_id, repair_reason, ) kept.append(replace( result, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) return logger.warning( "Beat %d: rejected by vision action-phase verification (%s)", result.beat_id, "; ".join(reasons), ) def _attach_visual_segments(results: list, beats: list, cfg) -> list: """Attach automatic sub-shot matches for multi-island trailer beats.""" from dataclasses import replace from src.core.models import MatchResult, MatchSegment from src.cv.global_scan import run_global_scan by_id = {b.beat_id: b for b in beats} expanded: list[MatchResult] = [] for result in results: beat = by_id.get(result.beat_id) if beat is None: expanded.append(result) continue if getattr(result, "segments", ()): expanded.append(result) continue islands = _reference_scoreable_segments(beat, cfg) if len(islands) <= 1: primary = MatchSegment( trailer_offset_s=0.0, duration_s=max(0.0, result.duration_s), scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.out_point_s, match_score=result.match_score, is_confirmed=result.is_confirmed, ) expanded.append(replace(result, segments=(primary,))) continue segments: list[MatchSegment] = [] first_start, first_end = islands[0] first_duration = min(max(0.0, result.duration_s), max(0.0, first_end - first_start)) segments.append( MatchSegment( trailer_offset_s=first_start, duration_s=first_duration, scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.in_point_s + first_duration, match_score=result.match_score, is_confirmed=result.is_confirmed, ) ) for start_s, end_s in islands[1:]: segment_beat = replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) segment_matches = run_global_scan([segment_beat], cfg, seed_in_points=None) if not segment_matches: continue seg = segment_matches[0] seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s)) segments.append( MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=seg.scene_id, in_point_s=seg.in_point_s, out_point_s=seg.in_point_s + seg_dur, match_score=seg.match_score, is_confirmed=seg.is_confirmed, ) ) expanded.append(replace(result, segments=tuple(segments))) return expanded def _fast_vision_match_cfg(cfg): """Return a vision-seed prepass config that still keeps quality settings.""" from dataclasses import replace return replace( cfg, cv=replace( cfg.cv, deep_scan=replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True), ), vision=replace( cfg.vision, fullscan_fallback=False, ), ) def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True): """Match one visual island with the same generic staged strategy as a beat.""" from src.pipeline.matcher import run_matching if cfg.vision.enabled: fast_cfg = _fast_vision_match_cfg(cfg) fast_matches = run_matching( fast_cfg, [segment_beat], seed_in_points=continuity, ) if fast_matches: if not allow_fullscan or all( m.is_confirmed or m.match_score >= cfg.cv.deep_scan.match_threshold for m in fast_matches ): return fast_matches if not allow_fullscan: return fast_matches if cfg.vision.enabled else [] full_matches = run_matching( cfg, [segment_beat], seed_in_points=continuity, ) return _merge_best_results(fast_matches if cfg.vision.enabled else [], full_matches, cfg) def _match_unmatched_visual_segments( results: list, beats: list, cached: list, cfg, skip_global_segment_scan_for: set[int] | None = None, ) -> list: """Create segmented provisional matches when a whole beat has no single match.""" from dataclasses import replace from src.core.models import MatchResult, MatchSegment from src.cv.frame_extractor import get_video_info matched_ids = {r.beat_id for r in results} expanded = list(results) skip_global_segment_scan_for = skip_global_segment_scan_for or set() try: fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate except Exception: fps = cfg.export.edl_frame_rate for beat in beats: if beat.beat_id in matched_ids: continue # Per-shot matching when the beat has either fade-bounded islands # OR internal hard cuts; each shot becomes its own MatchSegment. islands = _reference_shot_segments(beat, cfg) if not islands: continue segments: list[MatchSegment] = [] for island_idx, (start_s, end_s) in enumerate(islands): segment_beat = replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) if island_idx == 0: # First island of an unmatched multi-shot beat: search globally # without a continuity bias from the previous beat. Continuity # assumes the shot follows the previous beat in the source, but # the lead shot of a multi-shot beat is often an insert cut from # a completely different scene. A wrong seed with score 0.92 # would push the real match out of the refinement candidate pool. continuity = {} else: continuity = _continuity_seed_in_points( beat.beat_id, [b if b.beat_id != beat.beat_id else segment_beat for b in beats], cached + expanded, cfg, ) segment_matches = [] if beat.beat_id not in skip_global_segment_scan_for: segment_matches = _run_segment_match(segment_beat, continuity, cfg, allow_fullscan=True) if not segment_matches: # Fade-content shot fallback: when CV finds no templates # inside this shot (typical for cross-fade silhouettes), the # vibe-check + vision-action-window recovery path is the only # way to get a match. It's slower but works on dark frames # because vision can read structure where CV cannot. shot_islands = _reference_scoreable_segments(segment_beat, cfg) if not shot_islands and cfg.vision.enabled: recovered = _recover_unmatched_beats_via_vision([], [segment_beat], cfg) if recovered: rec = recovered[0] seg_dur = min(max(0.0, end_s - start_s), max(0.0, rec.duration_s)) if seg_dur > 0: segments.append(MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=rec.scene_id, in_point_s=rec.in_point_s, out_point_s=rec.in_point_s + seg_dur, match_score=rec.match_score, is_confirmed=rec.is_confirmed, )) continue local_segment = _local_same_scene_segment_match( segment_beat, beat, start_s, cached + expanded, cfg, ) if local_segment is not None: segments.append(local_segment) continue seg = segment_matches[0] seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s)) segments.append( MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=seg.scene_id, in_point_s=seg.in_point_s, out_point_s=seg.in_point_s + seg_dur, match_score=seg.match_score, is_confirmed=seg.is_confirmed, ) ) if not segments: continue first = segments[0] total_segment_duration = sum(max(0.0, s.duration_s) for s in segments) score = ( sum(max(0.0, s.duration_s) * s.match_score for s in segments) / total_segment_duration if total_segment_duration > 0 else min(s.match_score for s in segments) ) expanded.append( MatchResult( beat_id=beat.beat_id, scene_id=first.scene_id, source_path=cfg.paths.source_movie, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(max(0.0, first.in_point_s) * fps), match_score=score, is_confirmed=all(s.is_confirmed for s in segments), segments=tuple(segments), ) ) return expanded def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg): """Find a short trailer island inside scenes adjacent to neighbouring beat matches.""" from src.core.models import MatchSegment from src.cv.frame_extractor import open_video from src.cv.global_scan import _content_alignment_score, _content_alignment_templates scenes = _load_scene_cache_light(cfg) if not scenes: return None by_id = {r.beat_id: r for r in cached} scene_ids: list[int] = [] for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1): result = by_id.get(neighbour_id) if result is None: continue ids = [getattr(s, "scene_id", result.scene_id) for s in getattr(result, "segments", ())] or [result.scene_id] for scene_id in ids: if scene_id not in scene_ids: scene_ids.append(scene_id) if not scene_ids: return None templates = _content_alignment_templates(segment_beat, cfg) if not templates: return None min_score = min( cfg.cv.deep_scan.provisional_content_threshold * 0.70, cfg.cv.deep_scan.provisional_match_threshold, ) step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04) best: tuple[float, float, int] | None = None with open_video(cfg.paths.source_movie) as cap: for scene_id in scene_ids: scene = next((s for s in scenes if int(s["scene_id"]) == int(scene_id)), None) if scene is None: continue start_s = max(0.0, float(scene["start_s"]) - 0.25) end_s = max(start_s, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25) t = start_s while t <= end_s: score = _content_alignment_score(cap, t, templates, cfg) if best is None or score > best[0]: best = (score, t, int(scene_id)) t = round(t + step_s, 6) if best is None or best[0] < min_score: return None score, in_point_s, scene_id = best duration_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s)) return MatchSegment( trailer_offset_s=segment_offset_s, duration_s=duration_s, scene_id=scene_id, in_point_s=in_point_s, out_point_s=in_point_s + duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, ) def cmd_match(args: argparse.Namespace, cfg) -> list: from src.pipeline.matcher import run_matching from dataclasses import replace if getattr(args, "vision", False): cfg = replace(cfg, vision=replace(cfg.vision, enabled=True)) if getattr(args, "no_vision", False): cfg = replace(cfg, vision=replace(cfg.vision, enabled=False)) all_beats = _load_beats(cfg) beats = _select_beats(all_beats, getattr(args, "beat", None)) cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg) if _results_cache_path(cfg).exists() else [] # Multi-shot beats: either fade-bounded multiple islands, OR a single # island with internal hard cuts (e.g. man-shot then back to woman). Both # cases are routed through the per-segment match path so each shot gets # its own source clip instead of being approximated by one continuous # span. multi_island_beat_ids = { beat.beat_id for beat in beats if len(_reference_shot_segments(beat, cfg)) > 1 } scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg) scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids] seed_in_points = ( _continuity_seed_in_points(args.beat, all_beats, cached, cfg) if getattr(args, "beat", None) is not None else None ) results = [] if cfg.vision.enabled: fast_cfg = _fast_vision_match_cfg(cfg) results = run_matching( fast_cfg, scan_beats, force_reindex=args.force_reindex, seed_in_points=seed_in_points, ) if len(results) < len(scan_beats) or any( not r.is_confirmed and r.match_score < cfg.cv.deep_scan.match_threshold for r in results ): results_by_id = {r.beat_id: r for r in results} remaining_beats = [ b for b in scan_beats if ( b.beat_id not in results_by_id or ( not results_by_id[b.beat_id].is_confirmed and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold ) ) ] if remaining_beats: full_results = run_matching( cfg, remaining_beats, force_reindex=args.force_reindex, seed_in_points=seed_in_points, ) results = _merge_best_results(results, full_results, cfg) results = _apply_single_island_segments(results, single_island_trims) results = _match_unmatched_visual_segments( results, beats, cached, cfg, skip_global_segment_scan_for=set(single_island_trims), ) results = _attach_visual_segments(results, beats, cfg) results = _filter_semantically_invalid_vision_matches(results, beats, cfg) results = _recover_unmatched_beats_via_vision(results, beats, cfg) # A targeted one-beat match must NEVER delete or modify any other beat's # cache entry. We deliberately re-load the raw cache from disk here so # the upstream normalisation pass (which drops entries that no longer # pass current quality gates) cannot leak into the save: only the # targeted beat's slot gets replaced, every other entry is written back # bit-for-bit identical to what it was before this run. if getattr(args, "beat", None) is not None and _results_cache_path(cfg).exists(): raw_cached = _load_results(cfg) old_for_beat = next((r for r in raw_cached if r.beat_id == args.beat), None) raw_cached = [r for r in raw_cached if r.beat_id != args.beat] for result in results: if _keeps_cached_match(old_for_beat, result, cfg): print( f"ℹ️ Beat {result.beat_id}: keeping existing {len(getattr(old_for_beat, 'segments', ()) or ())}‑segment " f"provisional match (score {old_for_beat.match_score:.3f}) over weaker new result " f"(score {result.match_score:.3f}, no segments)." ) raw_cached.append(old_for_beat) else: raw_cached = _update_result(result, raw_cached) results_to_save = sorted(raw_cached, key=lambda r: r.beat_id) else: results_to_save = results _save_results(results_to_save, cfg) _regenerate_cutter_report(cfg) print(f"\n✅ {len(results)} / {len(beats)} beats matched.") for r in results: print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} " f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}") return results def _update_result(new_result, results: list) -> list: """Replace or insert a MatchResult in the list (by beat_id).""" updated = [r for r in results if r.beat_id != new_result.beat_id] updated.append(new_result) return sorted(updated, key=lambda r: r.beat_id) def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]: beats_by_id = {b.beat_id: b for b in beats} results_by_id = {r.beat_id: r for r in results} target = beats_by_id.get(beat_id) if target is None: return {} seeds: list[tuple[float, float]] = [] base_score = max(cfg.cv.deep_scan.coarse_candidate_threshold + 0.08, 0.92) prev_matches = [ (b, results_by_id[b.beat_id]) for b in beats if b.beat_id < beat_id and b.beat_id in results_by_id ] if prev_matches: prev_beat, prev_result = max(prev_matches, key=lambda item: item[0].beat_id) trailer_gap_s = max(0.0, target.start_s - prev_beat.end_s) expected = prev_result.out_point_s + trailer_gap_s for offset in cfg.cv.deep_scan.continuity_seed_offsets_s: offset_score = max( cfg.cv.deep_scan.coarse_candidate_threshold, base_score - abs(offset) * 0.06, ) seeds.append((expected + offset, offset_score)) next_matches = [ (b, results_by_id[b.beat_id]) for b in beats if b.beat_id > beat_id and b.beat_id in results_by_id ] if next_matches: next_beat, next_result = min(next_matches, key=lambda item: item[0].beat_id) trailer_gap_s = max(0.0, next_beat.start_s - target.end_s) expected = next_result.in_point_s - trailer_gap_s - target.duration_s for offset in cfg.cv.deep_scan.continuity_seed_offsets_s: offset_score = max( cfg.cv.deep_scan.coarse_candidate_threshold, base_score - abs(offset) * 0.06, ) seeds.append((expected - offset, offset_score)) unique: dict[float, float] = {} for seed_t, seed_score in seeds: rounded = round(max(0.0, seed_t), 3) unique[rounded] = max(unique.get(rounded, 0.0), seed_score) points = [(seed_t, score) for seed_t, score in sorted(unique.items())] return {beat_id: points} if points else {} def cmd_rematch(args: argparse.Namespace, cfg) -> None: """ Re-run automatic matching for ONE beat. python cli.py rematch --beat 5 # re-scan CV for beat 5 python cli.py rematch --beat 5 --threshold 0.40 # relax threshold """ beat_id = args.beat beats = _load_beats(cfg) results = _load_results(cfg) if _results_cache_path(cfg).exists() else [] beat = next((b for b in beats if b.beat_id == beat_id), None) if beat is None: print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.") return # ---- Refine an already acceptable cached match ------------------------- if args.refine: current = next((r for r in results if r.beat_id == beat_id), None) if current is None: print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.") return from src.cv.content_align import align_cached_match_by_content refined_in_s, sequence_score = align_cached_match_by_content( beat, current.in_point_s, cfg, search_window_s=args.refine_window, ) usable_duration_s = max(0.0, current.out_point_s - current.in_point_s) span_score = sequence_score scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg) out_point_s = refined_in_s + usable_duration_s if scene_data is not None: out_point_s = min(out_point_s, float(scene_data["end_s"])) matchable_duration_s = beat.duration_s duration_coverage = ( max(0.0, out_point_s - refined_in_s) / matchable_duration_s if matchable_duration_s > 0 else 0.0 ) if duration_coverage < cfg.cv.deep_scan.min_duration_coverage: print( f"❌ Beat {beat_id} refined candidate rejected: " f"duration coverage {duration_coverage:.0%} < " f"{cfg.cv.deep_scan.min_duration_coverage:.0%}" ) return try: from src.cv.frame_extractor import get_video_info fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate except Exception: fps = cfg.export.edl_frame_rate from src.core.models import MatchResult refined = MatchResult( beat_id=beat_id, scene_id=int(scene_data["scene_id"]) if scene_data is not None else current.scene_id, source_path=current.source_path, in_point_s=max(0.0, refined_in_s), out_point_s=out_point_s, in_point_frame=int(max(0.0, refined_in_s) * fps), match_score=max(sequence_score, span_score), match_location=current.match_location, is_confirmed=max(sequence_score, span_score) >= cfg.cv.deep_scan.match_threshold, ) results = _update_result(refined, results) _save_results(results, cfg) print( f"✅ Beat {beat_id} refined → " f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, " f"sequence_score={refined.match_score:.3f}" ) return # ---- Re-run CV with optional threshold override ------------------------ from dataclasses import replace as dc_replace run_cfg = cfg if args.threshold is not None: run_cfg = dc_replace( cfg, cv=dc_replace( cfg.cv, deep_scan=dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold), ), ) print(f"ℹ️ threshold overridden to {args.threshold} for beat {beat_id}") from src.cv.global_scan import run_global_scan seed_in_points = _continuity_seed_in_points(beat_id, beats, results, run_cfg) matches = run_global_scan([beat], run_cfg, seed_in_points=seed_in_points) if not matches: print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.") return match = matches[0] results = _update_result(match, results) _save_results(results, cfg) print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})") def cmd_report(args: argparse.Namespace, cfg) -> None: if getattr(args, "beat", None) is not None: print(f"\n⚠️ Generating cutter report for all beats (ignoring --beat {args.beat}).") _regenerate_cutter_report(cfg) project_root = cfg.paths.cache_dir.parent print(f"\n✅ Report → {project_root / 'CUTTER_REPORT.html'} and CUTTER_REPORT.md") def cmd_export(args: argparse.Namespace, cfg) -> None: from src.export.edl_writer import write_edl from src.export.fcpxml_writer import write_fcpxml from src.pipeline.matcher import build_timeline beats = _select_beats(_load_beats(cfg), getattr(args, "beat", None)) beat_ids = {b.beat_id for b in beats} if getattr(args, "beat", None) is not None else None results = _select_results(_normalize_cached_results(_load_beats(cfg), _load_results(cfg), cfg), beat_ids) if getattr(args, "beat", None) is not None and not results: print(f"❌ Beat {args.beat} has no cached match. Run 'match --beat {args.beat}' first.") return timeline = build_timeline(beats, results, cfg) fmt = args.format or cfg.export.output_format beat_id = getattr(args, "beat", None) out_stem = ( f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}" if beat_id is not None else timeline.title ) if fmt in ("fcpxml", "both"): out = write_fcpxml(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.fcpxml") print(f"✅ FCPXML → {out}") if fmt in ("edl", "both"): out = write_edl(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.edl") print(f"✅ EDL → {out}") def cmd_run(args: argparse.Namespace, cfg) -> None: """Full pipeline: analyze → match → report → export.""" cmd_analyze(args, cfg) cmd_match(args, cfg) cmd_report(args, cfg) cmd_export(args, cfg) # --------------------------------------------------------------------------- # Argument parser # --------------------------------------------------------------------------- def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="ai-trailer", description="AI Trailer Generator v2 — Pure CV scene matching", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--config", type=Path, default=Path("config.toml"), metavar="CONFIG", help="Path to config.toml (default: ./config.toml)", ) parser.add_argument( "--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Logging verbosity (default: INFO)", ) sub = parser.add_subparsers(dest="command", required=True) # analyze p_analyze = sub.add_parser("analyze", help="Detect trailer beats + fingerprint") p_analyze.add_argument("--no-audio", action="store_true", help="Skip Whisper (only affects beat labels, not matching)") p_analyze.add_argument("--no-llm", action="store_true", help="Skip LLM classification (only affects beat labels)") # match p_match = sub.add_parser("match", help="Run 2-phase CV matching") p_match.add_argument("--force-reindex", action="store_true", help="Ignore scene cache and re-run PySceneDetect") p_match.add_argument("--beat", type=int, help="Match only one beat and merge it into the cached results") p_match.add_argument("--vision", action="store_true", help="Enable cached vision descriptions for extra automatic search seeds") p_match.add_argument("--no-vision", action="store_true", help="Disable vision seeding even if [vision].enabled is true") # rematch p_rematch = sub.add_parser("rematch", help="Re-run or override matching for one beat") p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch") p_rematch.add_argument("--threshold", type=float, default=None, help="Override match_threshold") p_rematch.add_argument("--refine", action="store_true", help="Refine the cached match by measuring a local image-content offset") p_rematch.add_argument("--refine-window", type=float, default=None, help="Seconds to search around the cached in-point when using --refine") # report p_report = sub.add_parser("report", help="Generate HTML visual comparison report") p_report.add_argument("--beat", type=int, help="Report only one beat") # export p_export = sub.add_parser("export", help="Export timeline from cached results") p_export.add_argument("--format", choices=["fcpxml", "edl", "both"], help="Override [export] output_format from config") p_export.add_argument("--beat", type=int, help="Export only one beat") # run p_run = sub.add_parser("run", help="Full pipeline: analyze → match → export") p_run.add_argument("--no-audio", action="store_true") p_run.add_argument("--no-llm", action="store_true") p_run.add_argument("--force-reindex", action="store_true") p_run.add_argument("--vision", action="store_true") p_run.add_argument("--no-vision", action="store_true") p_run.add_argument("--format", choices=["fcpxml", "edl", "both"]) p_run.add_argument("--beat", type=int, help="Run match/report/export for only one cached beat") return parser # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> None: _ensure_utf8_console() parser = _build_parser() args = parser.parse_args() _setup_logging(args.log_level) from src.core.config import load_config cfg = load_config(args.config) dispatch = { "analyze": cmd_analyze, "match": cmd_match, "rematch": cmd_rematch, "report": cmd_report, "export": cmd_export, "run": cmd_run, } handler = dispatch[args.command] handler(args, cfg) if __name__ == "__main__": main()