""" cli.py — AI Trailer Generator v2 — Command-Line Interface Usage: python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm] python cli.py match [--config CONFIG] [--force-reindex] python cli.py rematch --beat N [--threshold F] [--refine] python cli.py report [--config CONFIG] python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm] python cli.py export [--config CONFIG] [--format fcpxml|edl|both] On --no-audio / --no-llm: These flags do NOT affect matching quality. Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX) to beats in the export metadata. The CV pipeline is identical either way. Use them for fast iterations: they skip large model downloads. All heavy imports are deferred so --help is instant. """ from __future__ import annotations import argparse import json import logging import sys from pathlib import Path # --------------------------------------------------------------------------- # Logging setup # --------------------------------------------------------------------------- def _setup_logging(level: str = "INFO") -> None: # Force UTF-8 for Windows console emoji printing if sys.stdout.encoding != 'utf-8': sys.stdout.reconfigure(encoding='utf-8') logging.basicConfig( format="%(asctime)s %(levelname)-8s %(name)s — %(message)s", datefmt="%H:%M:%S", level=getattr(logging, level.upper(), logging.INFO), stream=sys.stdout, ) logging.getLogger("PIL").setLevel(logging.WARNING) def _ensure_utf8_console() -> None: """Make argparse help safe on Windows before logging is configured.""" if sys.stdout.encoding != "utf-8": sys.stdout.reconfigure(encoding="utf-8") # --------------------------------------------------------------------------- # Cache helpers (match results ↔ JSON) # --------------------------------------------------------------------------- def _results_cache_path(cfg: "AppConfig") -> Path: # type: ignore[name-defined] return cfg.paths.cache_dir / "match_results.json" def _save_results(results: list, cfg: "AppConfig") -> None: # type: ignore[name-defined] from src.core.models import MatchResult data = [ { "beat_id": r.beat_id, "scene_id": r.scene_id, "source_path": str(r.source_path), "in_point_s": r.in_point_s, "out_point_s": r.out_point_s, "in_point_frame": r.in_point_frame, "match_score": r.match_score, "match_location": list(r.match_location), "is_confirmed": r.is_confirmed, "segments": [ { "trailer_offset_s": s.trailer_offset_s, "duration_s": s.duration_s, "scene_id": s.scene_id, "in_point_s": s.in_point_s, "out_point_s": s.out_point_s, "match_score": s.match_score, "is_confirmed": s.is_confirmed, } for s in getattr(r, "segments", ()) ], } for r in results ] p = _results_cache_path(cfg) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(json.dumps(data, indent=2), encoding="utf-8") logging.getLogger(__name__).info("Match results cached → %s", p) def _auto_commit_push_reports(project_root: "Path") -> None: # type: ignore[name-defined] """Stage changed report files, commit, and push to origin. Only touches report output files — never stages source or config changes. Failures are logged but never propagate. """ import subprocess as _sp from datetime import datetime as _dt report_globs = [ "CUTTER_REPORT.html", "CUTTER_REPORT.md", "output/cutter_clips/beat_*_compare.mp4", "output/cutter_clips/beat_*_source.mp4", "output/cutter_clips/beat_*_source_seg*.mp4", "output/cutter_clips/beat_*_trailer.mp4", "output/cutter_stills/beat_*_source.jpg", "output/cutter_stills/beat_*_trailer.jpg", ] log = logging.getLogger(__name__) cwd = str(project_root) try: for pattern in report_globs: _sp.run(["git", "add", "--", pattern], capture_output=True, cwd=cwd) status = _sp.run( ["git", "status", "--porcelain"], capture_output=True, text=True, cwd=cwd ) if not status.stdout.strip(): log.info("Auto-commit: nothing changed in report files.") return now = _dt.now().strftime("%Y-%m-%d %H:%M") msg = f"Auto-update cutter report {now}\n\nCo-Authored-By: Claude Sonnet 4.6 " _sp.run(["git", "commit", "-m", msg], capture_output=True, cwd=cwd, check=True) _sp.run(["git", "push", "origin", "main"], capture_output=True, cwd=cwd, check=True) log.info("Auto-commit+push: cutter report updated → remote.") except Exception as exc: log.warning("Auto-commit/push failed (non-fatal): %s", exc) def _regenerate_cutter_report(cfg: "AppConfig", force_beats: set[int] | None = None) -> None: # type: ignore[name-defined] """Re-render CUTTER_REPORT.{md,html} with Frame-Locked Compare clips. Called from every match-style command after the cache is written so all cutter-facing artefacts stay in sync with `match_results.json`. After rendering, stages and pushes changed report files to the remote. Failures are logged but never abort the run. """ project_root = cfg.paths.cache_dir.parent try: import os from scripts.generate_cutter_report import render_report old_force = os.environ.get("CUTTER_REPORT_FORCE_BEATS") try: if force_beats: os.environ["CUTTER_REPORT_FORCE_BEATS"] = ",".join(str(b) for b in sorted(force_beats)) md, html = render_report(project_root, with_stills=True, with_clips=True) finally: if force_beats: if old_force is None: os.environ.pop("CUTTER_REPORT_FORCE_BEATS", None) else: os.environ["CUTTER_REPORT_FORCE_BEATS"] = old_force (project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8") (project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8") logging.getLogger(__name__).info("Cutter report regenerated (md + html + compare clips)") except Exception as exc: logging.getLogger(__name__).warning("Cutter report regen failed: %s", exc) _auto_commit_push_reports(project_root) def _load_results(cfg: "AppConfig") -> list: # type: ignore[name-defined] from src.core.models import MatchResult, MatchSegment p = _results_cache_path(cfg) if not p.exists(): raise FileNotFoundError(f"No cached results at {p}. Run 'match' first.") raw = json.loads(p.read_text(encoding="utf-8")) return [ MatchResult( beat_id=d["beat_id"], scene_id=d["scene_id"], source_path=Path(d["source_path"]), in_point_s=d["in_point_s"], out_point_s=d["out_point_s"], in_point_frame=d["in_point_frame"], match_score=d["match_score"], match_location=tuple(d["match_location"]), is_confirmed=d.get("is_confirmed", True), segments=tuple( MatchSegment( trailer_offset_s=float(s["trailer_offset_s"]), duration_s=float(s["duration_s"]), scene_id=int(s["scene_id"]), in_point_s=float(s["in_point_s"]), out_point_s=float(s["out_point_s"]), match_score=float(s["match_score"]), is_confirmed=bool(s.get("is_confirmed", True)), ) for s in d.get("segments", ()) ), ) for d in raw ] def _load_scene_cache_light(cfg) -> list[dict]: p = cfg.paths.cache_dir / "scene_index.json" if not p.exists(): return [] return json.loads(p.read_text(encoding="utf-8")) def _scene_fps_light(scene: dict, cfg) -> float: duration_s = max(0.0, float(scene["end_s"]) - float(scene["start_s"])) frame_count = max(0, int(scene["end_frame"]) - int(scene["start_frame"])) return frame_count / duration_s if duration_s > 0 and frame_count > 0 else cfg.export.edl_frame_rate def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None: for idx, scene in enumerate(scenes): if float(scene["start_s"]) <= t_sec < float(scene["end_s"]): if ( float(scene["end_s"]) - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s and idx + 1 < len(scenes) ): return scenes[idx + 1] return scene return None def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None: return next((s for s in scenes if int(s["scene_id"]) == scene_id), None) def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float: if matchable_duration_s <= 0: return 0.0 try: from src.cv.global_scan import _reference_internal_cut_offsets cut_offsets = _reference_internal_cut_offsets(beat, cfg) except Exception: cut_offsets = [] start_idx = None for idx, scene in enumerate(scenes): if float(scene["start_s"]) <= in_point_s < float(scene["end_s"]): start_idx = idx break if start_idx is None: return 0.0 target_end = in_point_s + matchable_duration_s current_end = in_point_s for scene in scenes[start_idx:]: scene_end = float(scene["end_s"]) if target_end <= scene_end: return matchable_duration_s boundary_offset = scene_end - in_point_s if not any( abs(boundary_offset - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s for cut_offset in cut_offsets ): tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(scene, cfg)) return max(0.0, scene_end - in_point_s - tail_s) current_end = scene_end return max(0.0, current_end - in_point_s) def _normalize_cached_results(beats: list, results: list, cfg) -> list: """ Re-apply current generic timing rules to cached results. This keeps old automatic cache entries from preserving obsolete scene-boundary or tail-trim behavior without introducing manual per-beat truth. """ from dataclasses import replace scenes = _load_scene_cache_light(cfg) if not scenes: return results beats_by_id = {b.beat_id: b for b in beats} normalized = [] for result in results: beat = beats_by_id.get(result.beat_id) if getattr(result, "segments", ()): segment_threshold = cfg.cv.deep_scan.multi_shot_segment_threshold current_islands = _reference_scoreable_segments(beat, cfg) if beat is not None else [] repaired_segments = [] source_segments = list(result.segments) if beat is not None and len(source_segments) == 1 and len(current_islands) == 1: island_start_s, island_end_s = current_islands[0] island_duration_s = max(0.0, island_end_s - island_start_s) segment = source_segments[0] if ( abs(float(segment.trailer_offset_s) - island_start_s) > 0.04 or abs(float(segment.duration_s) - island_duration_s) > 0.08 ): from dataclasses import replace as _replace source_segments[0] = _replace( segment, trailer_offset_s=island_start_s, duration_s=island_duration_s, out_point_s=float(segment.in_point_s) + island_duration_s, ) for segment in source_segments: if float(segment.match_score) < segment_threshold: scene = _scene_by_id_light(scenes, segment.scene_id) if beat is not None and scene is not None: segment_beat = replace( beat, start_s=beat.start_s + float(segment.trailer_offset_s), end_s=beat.start_s + float(segment.trailer_offset_s) + float(segment.duration_s), ) probe = _phase_probe_segment_in_scene( segment_beat, scene, float(segment.in_point_s), cfg, ) if probe is not None: in_point_s, _phase_score = probe segment = replace( segment, in_point_s=in_point_s, out_point_s=in_point_s + float(segment.duration_s), match_score=max(float(segment.match_score), float(_phase_score)), is_confirmed=float(_phase_score) >= cfg.cv.deep_scan.match_threshold, ) repaired_segments.append(segment) valid_segments = tuple(repaired_segments) if not valid_segments: continue segment_duration = sum(max(0.0, float(s.duration_s)) for s in valid_segments) weighted_score = ( sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in valid_segments) / segment_duration if segment_duration > 0 else result.match_score ) if weighted_score < cfg.cv.deep_scan.provisional_match_threshold: continue if beat is not None and beat.duration_s > 0: visible_duration = sum( max(0.0, end_s - start_s) for start_s, end_s in _reference_scoreable_segments(beat, cfg) ) coverage_target = visible_duration if visible_duration > 0 else beat.duration_s coverage = segment_duration / coverage_target if coverage < cfg.cv.deep_scan.min_duration_coverage: continue first_segment = valid_segments[0] normalized.append(replace( result, scene_id=first_segment.scene_id, in_point_s=first_segment.in_point_s, out_point_s=first_segment.out_point_s, match_score=weighted_score, segments=valid_segments, )) continue if result.match_score < cfg.cv.deep_scan.provisional_match_threshold: continue scene = _scene_for_time_light(scenes, result.in_point_s, cfg) declared_scene = _scene_by_id_light(scenes, result.scene_id) # If the automatic matcher selected a scene but its in-point sits just # before that scene's detected start, treat this as scene-boundary drift # and clamp to the declared scene. This is generic: no beat IDs, no # manual timestamps, just consistent scene/time reconciliation. if declared_scene is not None: declared_start = float(declared_scene["start_s"]) declared_end = float(declared_scene["end_s"]) declared_fps = _scene_fps_light(declared_scene, cfg) boundary_tolerance_s = ( cfg.cv.deep_scan.scene_boundary_epsilon_s + cfg.cv.deep_scan.start_preroll_frames / declared_fps ) if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end: scene = declared_scene if beat is None or scene is None: normalized.append(result) continue fps = _scene_fps_light(scene, cfg) adjusted_in_s = result.in_point_s scene_changed = int(scene["scene_id"]) != result.scene_id starts_before_scene = result.in_point_s < float(scene["start_s"]) if scene_changed or starts_before_scene or result.duration_s <= 0.12: adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps)) adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s) scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene fps = _scene_fps_light(scene, cfg) matchable_duration_s = beat.duration_s try: from src.cv.global_scan import estimate_matchable_reference_duration matchable_duration_s = estimate_matchable_reference_duration(beat, cfg) except Exception: pass tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps) single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s) contiguous_duration_s = _contiguous_duration_light( beat, adjusted_in_s, scenes, cfg, matchable_duration_s, ) max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s)) normalized_result = result if ( scene_changed or starts_before_scene or result.duration_s <= 0.12 or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps) ): normalized_result = replace( result, scene_id=int(scene["scene_id"]), in_point_s=adjusted_in_s, out_point_s=adjusted_in_s + max_duration_s, in_point_frame=int(adjusted_in_s * fps), ) coverage = ( max(0.0, normalized_result.duration_s) / matchable_duration_s if matchable_duration_s > 0 else 0.0 ) if coverage < cfg.cv.deep_scan.min_duration_coverage: continue try: from src.cv.content_align import align_cached_match_by_content _, content_score = align_cached_match_by_content( beat, normalized_result.in_point_s, cfg, search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds), fps=12.5, ) content_gate = ( cfg.cv.deep_scan.provisional_content_threshold if normalized_result.is_confirmed else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold) ) if content_score < content_gate: continue if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed: normalized_result = replace( normalized_result, match_score=min(normalized_result.match_score, content_score), is_confirmed=False, ) except Exception: pass normalized.append(normalized_result) return normalized # --------------------------------------------------------------------------- # Command handlers # --------------------------------------------------------------------------- def _build_transcribe_callback(cfg): """Return a transcribe_callback closure, or None if audio is disabled.""" from src.audio.transcriber import transcribe_video def _cb(path, start_s, end_s, offset_s): return transcribe_video(path, cfg, start_s=start_s, end_s=end_s, time_offset_s=offset_s) return _cb def _build_classify_callback(cfg): """Return a classify_callback closure.""" from src.llm.dramaturg import classify_beats def _cb(beats): return classify_beats(beats, cfg) return _cb def cmd_analyze(args: argparse.Namespace, cfg) -> list: from src.pipeline.trailer_analyzer import analyze_reference_trailer transcribe_cb = _build_transcribe_callback(cfg) if not args.no_audio else None classify_cb = _build_classify_callback(cfg) if not args.no_llm else None beats = analyze_reference_trailer( cfg, transcribe_callback=transcribe_cb, classify_callback=classify_cb, ) # Persist beats for downstream commands (including histogram bytes as hex) beats_cache = cfg.paths.cache_dir / "trailer_beats.json" beats_cache.parent.mkdir(parents=True, exist_ok=True) beats_data = [ { "beat_id": b.beat_id, "start_s": b.start_s, "end_s": b.end_s, "start_frame": b.start_frame, "end_frame": b.end_frame, "beat_type": b.beat_type.name, "dialogue": [{"start_s": d.start_s, "end_s": d.end_s, "text": d.text} for d in b.dialogue], "phash": b.phash, "luma_hist": b.luma_hist.hex() if b.luma_hist else None, "sat_hist": b.sat_hist.hex() if b.sat_hist else None, } for b in beats ] beats_cache.write_text(json.dumps(beats_data, indent=2, ensure_ascii=False), encoding="utf-8") print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}") return beats def _load_beats(cfg) -> list: from src.core.models import BeatType, DialogueLine, TrailerBeat p = cfg.paths.cache_dir / "trailer_beats.json" if not p.exists(): raise FileNotFoundError(f"No cached beats at {p}. Run 'analyze' first.") raw = json.loads(p.read_text(encoding="utf-8")) beats = [] for d in raw: dialogue = tuple( DialogueLine(start_s=x["start_s"], end_s=x["end_s"], text=x["text"]) for x in d.get("dialogue", []) ) beats.append(TrailerBeat( beat_id=d["beat_id"], trailer_path=cfg.paths.reference_trailer, start_s=d["start_s"], end_s=d["end_s"], start_frame=d["start_frame"], end_frame=d["end_frame"], beat_type=BeatType[d.get("beat_type", "UNKNOWN")], dialogue=dialogue, phash=d.get("phash"), luma_hist=bytes.fromhex(d["luma_hist"]) if d.get("luma_hist") else None, sat_hist= bytes.fromhex(d["sat_hist"]) if d.get("sat_hist") else None, )) return beats def _select_beats(beats: list, beat_id: int | None) -> list: """Return all beats or exactly one requested beat.""" if beat_id is None: return beats selected = [b for b in beats if b.beat_id == beat_id] if not selected: raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.") return selected def _select_results(results: list, beat_ids: set[int] | None) -> list: """Return all results or only results for the requested beats.""" if beat_ids is None: return results return [r for r in results if r.beat_id in beat_ids] def _find_scene_for_in_point(cfg, in_point_s: float): from src.cv.scene_indexer import build_scene_index scenes = build_scene_index(cfg) for idx, scene in enumerate(scenes): if scene.start_s <= in_point_s < scene.end_s: if ( scene.end_s - in_point_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s and idx + 1 < len(scenes) ): return scenes[idx + 1] return scene return None def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]: """Find visible source-matchable islands inside a trailer beat.""" from src.cv.frame_extractor import grab_frame_at_path from src.cv.global_scan import ( _corr_same_size, _is_scoreable_reference_frame, _prepare_haystack, _reference_visibility_stats, ) def is_visible(frame) -> bool: if frame is None: return False mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg) visible_luma = ( mean_luma >= cfg.cv.deep_scan.scoreable_luma_mean_min * 0.45 or p90_luma >= cfg.cv.deep_scan.scoreable_luma_p90_min * 0.50 ) visible_contrast = contrast >= max(8.0, cfg.cv.deep_scan.scoreable_contrast_min * 0.30) return visible_luma and visible_contrast step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s) min_segment_s = max(0.32, step_s * 3.0) bridge_gap_s = max(0.18, step_s * 2.0) raw: list[tuple[float, float]] = [] start: float | None = None last_seen: float | None = None t = 0.0 while t <= beat.duration_s: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) scoreable = frame is not None and is_visible(frame) if scoreable: if start is None: start = t last_seen = t elif start is not None and last_seen is not None and t - last_seen > bridge_gap_s: end = min(beat.duration_s, last_seen + step_s) if end - start >= min_segment_s: raw.append((start, end)) start = None last_seen = None t = round(t + step_s, 6) if start is not None and last_seen is not None: end = min(beat.duration_s, last_seen + step_s) if end - start >= min_segment_s: raw.append((start, end)) expanded: list[tuple[float, float]] = [] same_shot_corr_min = 0.72 for start_s, end_s in raw: start_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + start_s) end_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + max(start_s, end_s - step_s)) start_feature = _prepare_haystack(start_anchor, cfg) if start_anchor is not None else None end_feature = _prepare_haystack(end_anchor, cfg) if end_anchor is not None else None soft_start = start_s t = round(start_s - step_s, 6) while t >= 0.0: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if not is_visible(frame): break if start_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), start_feature) < same_shot_corr_min: break soft_start = max(0.0, t) t = round(t - step_s, 6) soft_end = end_s t = round(end_s, 6) while t <= beat.duration_s + 1e-6: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if not is_visible(frame): break if end_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), end_feature) < same_shot_corr_min: break soft_end = min(beat.duration_s, t + step_s) t = round(t + step_s, 6) if soft_end - soft_start >= min_segment_s: expanded.append((soft_start, soft_end)) merged: list[tuple[float, float]] = [] for start_s, end_s in expanded: if merged and start_s - merged[-1][1] <= bridge_gap_s: merged[-1] = (merged[-1][0], max(merged[-1][1], end_s)) else: merged.append((start_s, end_s)) return merged def _fade_content_shots(beat, cfg) -> list[tuple[float, float]]: """Find low-luma fade regions adjacent to visible islands that still carry describable content (e.g. a hand+knife silhouette during a cross-fade). These regions are too dark for CV template matching but vision can read structure during the fade — the matcher therefore treats them as their own shots and routes them through the vision-led search path. A fade region qualifies when, sampled inside the region, the brightest frame has p90 ≥ 12 (not pure black) and contrast ≥ 8 (some structure) AND the region duration is ≥ 0.2 s. Pure-black/featureless fades stay excluded. """ from src.cv.frame_extractor import grab_frame_at_path from src.cv.global_scan import _reference_visibility_stats islands = _reference_scoreable_segments(beat, cfg) if not islands: return [] step_s = max(0.04, cfg.cv.deep_scan.span_sample_step_s) min_fade_s = 0.2 def has_content(start_s: float, end_s: float) -> bool: if end_s - start_s < min_fade_s: return False peak_p90 = 0.0 peak_contrast = 0.0 t = start_s while t < end_s: frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t) if frame is not None: _, p90, contrast = _reference_visibility_stats(frame, cfg) peak_p90 = max(peak_p90, p90) peak_contrast = max(peak_contrast, contrast) t = round(t + step_s, 6) return peak_p90 >= 12.0 and peak_contrast >= 8.0 fades: list[tuple[float, float]] = [] # Between-island fades only: these are genuine cross-fade silhouettes # (one visible shot dissolves into another through a dim middle frame). # Pre-island fades are fade-from-black leaders; post-island fades are # fade-to-black trailers — neither is a source-matchable shot on its own. for prev_isl, next_isl in zip(islands, islands[1:]): gap_start, gap_end = prev_isl[1], next_isl[0] if has_content(gap_start, gap_end): fades.append((gap_start, gap_end)) return fades def _reference_shot_segments(beat, cfg) -> list[tuple[float, float]]: """Source-matchable shot ranges inside a trailer beat. Returns a sorted list of (start_s, end_s) tuples covering: * each visible island, further split at internal hard cuts; * each fade region adjacent to an island that still carries describable content (e.g. a silhouette during a cross-fade) — these get matched via the vision-led search path because CV templates against the dark frames are unusable. Tiny sub-shots are merged so noisy cut detection doesn't fragment a real shot into useless slivers. """ from src.cv.global_scan import _reference_internal_cut_offsets islands = _reference_scoreable_segments(beat, cfg) try: cut_offsets = sorted(_reference_internal_cut_offsets(beat, cfg)) except Exception: cut_offsets = [] fade_shots = _fade_content_shots(beat, cfg) if not cut_offsets and not fade_shots: return islands min_shot_s = max(0.4, cfg.cv.deep_scan.span_sample_step_s * 4.0) shots: list[tuple[float, float]] = [] for start_s, end_s in islands: boundaries = [start_s] for cut in cut_offsets: if start_s + 1e-3 < cut < end_s - 1e-3: boundaries.append(cut) boundaries.append(end_s) for i in range(len(boundaries) - 1): seg_start = boundaries[i] seg_end = boundaries[i + 1] if seg_end - seg_start < min_shot_s and shots and shots[-1][1] >= seg_start - 1e-3: # merge into previous if the new piece is too short shots[-1] = (shots[-1][0], seg_end) elif seg_end - seg_start >= min_shot_s: shots.append((seg_start, seg_end)) elif shots: shots[-1] = (shots[-1][0], seg_end) else: shots.append((seg_start, seg_end)) # Add fade-content shots (cross-fade silhouettes / dim shot boundaries) # sorted with the visible-island shots so the matcher sees them in # trailer-time order. if fade_shots: all_shots = sorted(list(shots) + list(fade_shots), key=lambda iv: iv[0]) # Drop overlaps in case a fade region brushes against an island # by a few frames; the island wins. cleaned: list[tuple[float, float]] = [] for s, e in all_shots: if cleaned and s < cleaned[-1][1]: if e > cleaned[-1][1]: cleaned.append((cleaned[-1][1], e)) continue cleaned.append((s, e)) return cleaned return shots if shots else islands def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]: """Use a single visible island as the primary match target for faded beats.""" from dataclasses import replace trimmed = [] trims: dict[int, tuple[float, float]] = {} frame_s = 1.0 / max(1.0, float(cfg.export.edl_frame_rate)) for beat in beats: islands = _reference_scoreable_segments(beat, cfg) if len(islands) == 1: start_s, end_s = islands[0] island_duration_s = max(0.0, end_s - start_s) has_real_trim = ( start_s > frame_s * 1.5 or beat.duration_s - end_s > frame_s * 1.5 ) if island_duration_s > 0.0 and has_real_trim: trimmed.append( replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) ) trims[beat.beat_id] = (start_s, island_duration_s) continue trimmed.append(beat) return trimmed, trims def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list: """Restore beat-relative segment metadata after matching a trimmed island.""" if not trims: return results from dataclasses import replace from src.core.models import MatchSegment expanded = [] for result in results: trim = trims.get(result.beat_id) if trim is None or getattr(result, "segments", ()): expanded.append(result) continue trailer_offset_s, island_duration_s = trim duration_s = min(max(0.0, island_duration_s), max(0.0, result.duration_s)) segment = MatchSegment( trailer_offset_s=trailer_offset_s, duration_s=duration_s, scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.in_point_s + duration_s, match_score=result.match_score, is_confirmed=result.is_confirmed, ) expanded.append( replace( result, out_point_s=result.in_point_s + duration_s, segments=(segment,), ) ) return expanded def _keeps_cached_match(old, new, cfg) -> bool: """Return True when the old cached match is better than the new one and should be kept. Specifically protects multi-segment provisional matches from being replaced by a weaker single-span result. The old entry wins when it has segments (explicitly tuned multi-shot layout) and the new result has none AND is not a score improvement. """ if old is None or new is None: return False old_segs = getattr(old, "segments", ()) or () new_segs = getattr(new, "segments", ()) or () if old_segs and not new_segs and new.match_score <= old.match_score: return True return False def _merge_best_results(existing: list, candidates: list, cfg) -> list: """Merge matches by beat, preferring confirmed or higher-scoring results.""" by_id = {r.beat_id: r for r in existing} for candidate in candidates: old = by_id.get(candidate.beat_id) if old is None: by_id[candidate.beat_id] = candidate continue candidate_confirmed = candidate.match_score >= cfg.cv.deep_scan.match_threshold or candidate.is_confirmed old_confirmed = old.match_score >= cfg.cv.deep_scan.match_threshold or old.is_confirmed if ( candidate_confirmed and not old_confirmed or candidate.match_score > old.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta or ( candidate.match_score >= old.match_score - cfg.cv.deep_scan.duration_tie_break_score_delta and candidate.duration_s > old.duration_s ) ): by_id[candidate.beat_id] = candidate return sorted(by_id.values(), key=lambda r: r.beat_id) def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list: """Try a vision-led search for beats that ended up weak or unmatched. For each unmatched beat that has scoreable visual content (i.e. not pure fade/title-card material), this pass: 1. Asks the vibe-check (CV histogram + pHash) for the top-K candidate scenes. 2. For each candidate, runs the semantic action-window search with the beat's own description, prefering windows whose phase matches the visible part of the beat. 3. Refines the in-point with the regular CV content/motion aligner. 4. Validates the resulting window with the vision phase check, exactly like the main filter. 5. Adds the best validated candidate as a provisional MatchResult. Confirmed and provisional matches both stay subject to the same thresholds used elsewhere; this only adds matches that pass the same quality gates. """ if not beats: return results from dataclasses import replace from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration from src.cv.scene_indexer import build_scene_index from src.cv.vibe_check import run_vibe_check from src.core.models import MatchResult from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision logger = logging.getLogger(__name__) results_by_id = {r.beat_id: r for r in results} recovery_targets = [ b for b in beats if ( b.beat_id not in results_by_id or ( not results_by_id[b.beat_id].is_confirmed and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold ) ) ] if not recovery_targets: return results scenes = build_scene_index(cfg) if not scenes: return results target_ids = {b.beat_id for b in recovery_targets} new_results = [r for r in results if r.beat_id not in target_ids] replaced_results = {r.beat_id: r for r in results if r.beat_id in target_ids} for beat in recovery_targets: try: islands = _reference_scoreable_segments(beat, cfg) except Exception: islands = [] # Anchor selection: prefer the longest visible island; if none exists, # fall back to the full beat. The latter handles dark / low-contrast # close-ups that drop below the scoreable luma/contrast thresholds but # are still semantically describable. The strict vision phase # validation later in this pass keeps us from accepting pure title-card # or logo material. from dataclasses import replace as _replace if islands: anchor_start_s, anchor_end_s = max(islands, key=lambda iv: iv[1] - iv[0]) anchor_beat = _replace( beat, start_s=beat.start_s + anchor_start_s, end_s=beat.start_s + anchor_end_s, ) else: anchor_beat = beat try: hits = run_vibe_check( beat, scenes, top_k=max(cfg.cv.deep_scan.scene_seed_top_k, cfg.cv.vibe_check.top_k_candidates), hist_method=cfg.cv.vibe_check.hist_compare_method, phash_max_distance=64, ) except Exception as exc: logger.warning("Beat %d: recovery vibe-check failed (%s)", beat.beat_id, exc) continue scenes_by_id = {s.scene_id: s for s in scenes} best = None # (score, scene, in_s, dur_s, reason) try: from src.llm.vision_cache import ( _load_cache, _semantic_action_groups, _semantic_match_score, _STRONG_ACTION_GROUPS, ) cache = _load_cache(cfg) items = cache.get("items", {}) beat_desc = "" if isinstance(items, dict): for item in items.values(): if ( isinstance(item, dict) and item.get("kind") == "beat" and item.get("item_id") == beat.beat_id ): beat_desc = str(item.get("description", "")) break beat_actions = _semantic_action_groups(beat_desc) & _STRONG_ACTION_GROUPS if beat_desc else set() identity_vocab = { "woman", "women", "man", "men", "girl", "boy", "child", "blonde", "hair", "face", "mouth", "eyes", "profile", "close-up", "closeup", } beat_identity = {term for term in identity_vocab if term in beat_desc.lower()} distinctive_identity = { term for term in ("woman", "women", "blonde", "mouth", "face") if term in beat_desc.lower() } if beat_actions and isinstance(items, dict): for item in items.values(): if not isinstance(item, dict) or item.get("kind") != "action_window": continue scene = scenes_by_id.get(item.get("item_id")) desc = str(item.get("description", "")) source_actions = _semantic_action_groups(desc) if scene is None or not beat_actions <= source_actions: continue source_text = desc.lower() positive_source_text = source_text.split('"negatives"', 1)[0] identity_overlap = {term for term in beat_identity if term in source_text} if len(beat_identity) >= 2 and len(identity_overlap) < 2: continue if distinctive_identity and not any(term in positive_source_text for term in distinctive_identity): continue if "mouth" in beat_desc.lower() and "mouth" not in positive_source_text: continue if "dark interior" in beat_desc.lower() and ( "interior" not in positive_source_text or "dark" not in positive_source_text ): continue score, reason = _semantic_match_score(beat_desc, desc) if score < max(0.60, cfg.cv.deep_scan.provisional_match_threshold): continue try: in_s = float(item.get("start_s")) out_s = float(item.get("end_s")) except (TypeError, ValueError): continue duration_s = max(0.32, min(anchor_beat.duration_s, out_s - in_s)) candidate = ( min(0.99, score), scene, in_s, duration_s, f"cached vision action; {reason}", ) if best is None or candidate[0] > best[0]: best = candidate except Exception as exc: logger.debug("Beat %d: cached vision fallback failed (%s)", beat.beat_id, exc) seen = set() for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]: scene = scenes_by_id.get(hit.scene_id) if scene is None or scene.scene_id in seen: continue seen.add(scene.scene_id) try: found = find_action_window_in_scene(anchor_beat, scene, cfg) except Exception as exc: logger.debug("Beat %d: action window failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc) continue if found is None: continue start_s, end_s, semantic_score, reason = found window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0)) try: aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion( anchor_beat, start_s, cfg, search_window_s=window_s, ) except Exception as exc: logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc) aligned_in_s = start_s combined_score = semantic_score content_score = 0.0 motion_score = 0.0 aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s))) try: usable_duration_s, usable_score = estimate_usable_source_duration(anchor_beat, aligned_in_s, cfg) except Exception: usable_duration_s, usable_score = anchor_beat.duration_s, 0.0 usable_duration_s = max(0.0, min(anchor_beat.duration_s, usable_duration_s)) if usable_duration_s < max(0.32, anchor_beat.duration_s * 0.45): usable_duration_s = anchor_beat.duration_s try: ok, verify_reason = validate_match_window_with_vision( anchor_beat, source_path=scene.source_path, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, cfg=cfg, ) except Exception as exc: logger.debug("Beat %d: validate failed scene=%d (%s)", beat.beat_id, scene.scene_id, exc) continue if not ok: continue final_score = max( combined_score, min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08), ) if semantic_score >= max(0.60, cfg.cv.deep_scan.provisional_match_threshold): final_score = max(final_score, semantic_score) if final_score < cfg.cv.deep_scan.provisional_match_threshold: continue candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}") if best is None or candidate[0] > best[0]: best = candidate if best is None: previous = replaced_results.get(beat.beat_id) if previous is not None: new_results.append(previous) continue score, scene, aligned_in_s, usable_duration_s, repair_reason = best logger.info( "Beat %d: recovered via vision action search scene=%d in=%.3fs score=%.3f (%s)", beat.beat_id, scene.scene_id, aligned_in_s, score, repair_reason, ) new_results.append(MatchResult( beat_id=beat.beat_id, scene_id=scene.scene_id, source_path=scene.source_path, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, match_location=(0, 0), is_confirmed=score >= cfg.cv.deep_scan.match_threshold, segments=tuple(), )) return sorted(new_results, key=lambda r: r.beat_id) def _recover_short_lowlight_vibe_matches(results: list, beats: list, cfg) -> list: """Keep obvious short low-light scene hits as provisional instead of no-match. Short blue/dark dialogue shots can be correctly ranked by scene-level histogram/pHash but then rejected by the stricter content aligner because the shot contains little texture, motion blur, or trailer timecode overlay. This fallback only accepts the top vibe scene when it has a clear margin and the local content scan still finds a usable in-point. """ from src.core.models import MatchResult, Scene from src.cv.global_scan import _content_alignment_score, _content_alignment_templates from src.cv.vibe_check import run_vibe_check from src.cv.frame_extractor import open_video matched_ids = {r.beat_id for r in results} targets = [b for b in beats if b.beat_id not in matched_ids and b.duration_s <= 2.25] if not targets: return results raw_scenes = _load_scene_cache_light(cfg) scenes = [ Scene( scene_id=int(s["scene_id"]), source_path=cfg.paths.source_movie, start_s=float(s["start_s"]), end_s=float(s["end_s"]), start_frame=int(s["start_frame"]), end_frame=int(s["end_frame"]), luma_hist=bytes.fromhex(s["luma_hist"]) if s.get("luma_hist") else None, sat_hist=bytes.fromhex(s["sat_hist"]) if s.get("sat_hist") else None, phash=s.get("phash"), ) for s in raw_scenes ] scenes_by_id = {s.scene_id: s for s in scenes} recovered = list(results) with open_video(cfg.paths.source_movie) as cap: for beat in targets: templates = _content_alignment_templates(beat, cfg) if not templates: continue hits = run_vibe_check( beat, scenes, top_k=6, hist_method=cfg.cv.vibe_check.hist_compare_method, phash_max_distance=64, ) if len(hits) < 2: continue top, second = hits[0], hits[1] if top.combined_score < 0.74 or top.combined_score - second.combined_score < 0.03: continue scene = scenes_by_id.get(top.scene_id) if scene is None or scene.duration_s < max(0.5, beat.duration_s): continue best: tuple[float, float] | None = None scan_end = max(scene.start_s, scene.end_s - beat.duration_s) step_s = 0.12 t = scene.start_s while t <= scan_end: score = _content_alignment_score(cap, t, templates, cfg) if best is None or score > best[0]: best = (score, t) t = round(t + step_s, 6) if best is None or best[0] < 0.15: continue content_score, in_point_s = best final_score = max( cfg.cv.deep_scan.provisional_match_threshold, min(0.64, top.combined_score * 0.55 + content_score * 0.45), ) recovered.append(MatchResult( beat_id=beat.beat_id, scene_id=scene.scene_id, source_path=scene.source_path, in_point_s=in_point_s, out_point_s=in_point_s + beat.duration_s, in_point_frame=int(in_point_s * cfg.export.edl_frame_rate), match_score=final_score, match_location=(0, 0), is_confirmed=False, segments=tuple(), )) return sorted(recovered, key=lambda r: r.beat_id) def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list: """Drop vision-enabled matches whose final action phase contradicts the beat.""" if not cfg.vision.enabled or not results: return results from dataclasses import replace from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision from src.cv.scene_indexer import build_scene_index from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration logger = logging.getLogger(__name__) beats_by_id = {beat.beat_id: beat for beat in beats} scenes_by_id = {scene.scene_id: scene for scene in build_scene_index(cfg)} def visible_content_offset(action_beat, segment_start_offset_s: float) -> float: content_offset_s = 0.0 for start_s, end_s in _reference_scoreable_segments(action_beat, cfg): if end_s <= segment_start_offset_s: content_offset_s += max(0.0, end_s - start_s) elif start_s < segment_start_offset_s: content_offset_s += max(0.0, segment_start_offset_s - start_s) break else: break return content_offset_s def realign_window(check_beat, scene_id: int, action_beat=None): scene = scenes_by_id.get(scene_id) if scene is None: return None segment_window = find_action_window_in_scene(check_beat, scene, cfg) if action_beat is not None and action_beat is not check_beat: beat_window = find_action_window_in_scene(action_beat, scene, cfg) else: beat_window = None use_beat_context = False if segment_window is None: found = beat_window use_beat_context = beat_window is not None elif beat_window is None: found = segment_window elif beat_window[2] > segment_window[2] + 0.06: found = beat_window use_beat_context = True else: found = segment_window if found is None: return None start_s, end_s, semantic_score, reason = found if use_beat_context: segment_start_offset_s = max(0.0, check_beat.start_s - action_beat.start_s) content_offset_s = visible_content_offset(action_beat, segment_start_offset_s) start_s += content_offset_s end_s += content_offset_s window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0)) aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion( check_beat, start_s, cfg, search_window_s=window_s, ) aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s))) usable_duration_s, usable_score = estimate_usable_source_duration(check_beat, aligned_in_s, cfg) usable_duration_s = max(0.0, min(check_beat.duration_s, usable_duration_s)) if usable_duration_s < max(0.32, check_beat.duration_s * 0.45): usable_duration_s = check_beat.duration_s ok, verify_reason = validate_match_window_with_vision( check_beat, source_path=scene.source_path, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, cfg=cfg, ) if not ok: logger.info( "Beat %d: action-window realign rejected scene=%d in=%.3fs (%s)", check_beat.beat_id, scene.scene_id, aligned_in_s, verify_reason, ) return None score = max( combined_score, min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08), ) return scene, aligned_in_s, usable_duration_s, score, f"{reason}; {verify_reason}" kept = [] for result in results: beat = beats_by_id.get(result.beat_id) if beat is None: kept.append(result) continue kept_before = len(kept) try: _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger) except Exception as exc: logger.warning( "Beat %d: vision filter/repair failed (%s); keeping previous cached match.", result.beat_id, exc, ) del kept[kept_before:] kept.append(result) return kept def _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger): from dataclasses import replace if True: windows = [] if getattr(result, "segments", ()): for segment in result.segments: segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) windows.append(( segment_beat, segment.scene_id, segment.in_point_s, segment.out_point_s, )) else: windows.append((beat, result.scene_id, result.in_point_s, result.out_point_s)) valid = True reasons: list[str] = [] for check_beat, scene_id, in_point_s, out_point_s in windows: ok, reason = validate_match_window_with_vision( check_beat, source_path=result.source_path, scene_id=scene_id, in_point_s=in_point_s, out_point_s=out_point_s, cfg=cfg, ) reasons.append(reason) if not ok: valid = False break if valid: repaired = False if getattr(result, "segments", ()): new_segments = [] repair_reasons = [] changed = False for segment in result.segments: scene = scenes_by_id.get(segment.scene_id) # Allow phase-realign whenever the scene has any meaningful # slack beyond the segment, not only for "long" scenes. # Short scenes don't need realigning because the segment # essentially is the scene. if scene is None or scene.duration_s <= segment.duration_s + 0.5: new_segments.append(segment) continue # For already-confirmed segments, skip the realign to avoid # destabilizing a strong original match. if segment.is_confirmed and scene.duration_s <= max(segment.duration_s * 1.6, 6.0): new_segments.append(segment) continue segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) repair = realign_window(segment_beat, segment.scene_id, action_beat=beat) if repair is None: new_segments.append(segment) continue repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate: new_segments.append(segment) continue # Don't commit a repair that scores meaningfully worse than # the original; phase realign should improve, not regress. if score < segment.match_score - 0.02: new_segments.append(segment) continue changed = True repair_reasons.append(repair_reason) new_segments.append(replace( segment, scene_id=repair_scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, duration_s=usable_duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) if changed and new_segments: first = new_segments[0] repaired_score = min(seg.match_score for seg in new_segments) logger.info( "Beat %d: realigned semantically valid long scene by motion/action windows (%s)", result.beat_id, "; ".join(repair_reasons), ) kept.append(replace( result, scene_id=first.scene_id, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate), match_score=repaired_score, is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold, segments=tuple(new_segments), )) repaired = True else: scene = scenes_by_id.get(result.scene_id) wide_scene = ( scene is not None and scene.duration_s > result.duration_s + 0.5 ) already_confirmed_in_tight_scene = ( result.is_confirmed and scene is not None and scene.duration_s <= max(result.duration_s * 1.6, 6.0) ) if wide_scene and not already_confirmed_in_tight_scene: repair = realign_window(beat, result.scene_id) if repair is not None: repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair moved = abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate improved = score >= result.match_score - 0.02 if moved and improved: logger.info( "Beat %d: realigned semantically valid long scene by motion/action window (%s)", result.beat_id, repair_reason, ) kept.append(replace( result, scene_id=repair_scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) repaired = True if not repaired: kept.append(result) else: if getattr(result, "segments", ()): new_segments = [] all_repaired = True repair_reasons = [] for segment in result.segments: segment_beat = replace( beat, start_s=beat.start_s + segment.trailer_offset_s, end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s, ) repair = realign_window(segment_beat, segment.scene_id, action_beat=beat) if repair is None: all_repaired = False break scene, aligned_in_s, usable_duration_s, score, repair_reason = repair repair_reasons.append(repair_reason) new_segments.append(replace( segment, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, duration_s=usable_duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) if all_repaired and new_segments: first = new_segments[0] repaired_score = min(seg.match_score for seg in new_segments) logger.info( "Beat %d: realigned inside matched scene by vision action windows (%s)", result.beat_id, "; ".join(repair_reasons), ) kept.append(replace( result, scene_id=first.scene_id, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate), match_score=repaired_score, is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold, segments=tuple(new_segments), )) return else: repair = realign_window(beat, result.scene_id) if repair is not None: scene, aligned_in_s, usable_duration_s, score, repair_reason = repair logger.info( "Beat %d: realigned inside matched scene by vision action window (%s)", result.beat_id, repair_reason, ) kept.append(replace( result, scene_id=scene.scene_id, in_point_s=aligned_in_s, out_point_s=aligned_in_s + usable_duration_s, in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate), match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, )) return logger.warning( "Beat %d: rejected by vision action-phase verification (%s)", result.beat_id, "; ".join(reasons), ) def _attach_visual_segments(results: list, beats: list, cfg) -> list: """Attach automatic sub-shot matches for multi-island trailer beats.""" from dataclasses import replace from src.core.models import MatchResult, MatchSegment from src.cv.global_scan import run_global_scan by_id = {b.beat_id: b for b in beats} expanded: list[MatchResult] = [] for result in results: beat = by_id.get(result.beat_id) if beat is None: expanded.append(result) continue if getattr(result, "segments", ()): expanded.append(result) continue islands = _reference_scoreable_segments(beat, cfg) if len(islands) <= 1: primary = MatchSegment( trailer_offset_s=0.0, duration_s=max(0.0, result.duration_s), scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.out_point_s, match_score=result.match_score, is_confirmed=result.is_confirmed, ) expanded.append(replace(result, segments=(primary,))) continue segments: list[MatchSegment] = [] first_start, first_end = islands[0] first_duration = min(max(0.0, result.duration_s), max(0.0, first_end - first_start)) segments.append( MatchSegment( trailer_offset_s=first_start, duration_s=first_duration, scene_id=result.scene_id, in_point_s=result.in_point_s, out_point_s=result.in_point_s + first_duration, match_score=result.match_score, is_confirmed=result.is_confirmed, ) ) for start_s, end_s in islands[1:]: segment_beat = replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) segment_matches = run_global_scan([segment_beat], cfg, seed_in_points=None) if not segment_matches: continue seg = segment_matches[0] if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold: repaired = _local_same_scene_segment_match( segment_beat, beat, start_s, cached + expanded, cfg, ) if ( repaired is None or repaired.match_score < max( cfg.cv.deep_scan.multi_shot_segment_threshold, seg.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta, ) ): scenes = _load_scene_cache_light(cfg) scene = _scene_by_id_light(scenes, seg.scene_id) probe = ( _phase_probe_segment_in_scene(segment_beat, scene, seg.in_point_s, cfg) if scene is not None else None ) if probe is None: continue in_point_s, _phase_score = probe from dataclasses import replace as _replace seg = _replace( seg, in_point_s=in_point_s, out_point_s=in_point_s + seg.duration_s, match_score=max(seg.match_score, _phase_score), is_confirmed=_phase_score >= cfg.cv.deep_scan.match_threshold, ) else: seg = repaired seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s)) segments.append( MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=seg.scene_id, in_point_s=seg.in_point_s, out_point_s=seg.in_point_s + seg_dur, match_score=seg.match_score, is_confirmed=seg.is_confirmed, ) ) expanded.append(replace(result, segments=tuple(segments))) return expanded def _fast_vision_match_cfg(cfg): """Return a vision-seed prepass config that still keeps quality settings.""" from dataclasses import replace return replace( cfg, cv=replace( cfg.cv, deep_scan=replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True), ), vision=replace( cfg.vision, fullscan_fallback=False, ), ) def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True): """Match one visual island with the same generic staged strategy as a beat.""" from src.pipeline.matcher import run_matching if cfg.vision.enabled: fast_cfg = _fast_vision_match_cfg(cfg) fast_matches = run_matching( fast_cfg, [segment_beat], seed_in_points=continuity, ) if fast_matches: if not allow_fullscan or all( m.is_confirmed or m.match_score >= cfg.cv.deep_scan.match_threshold for m in fast_matches ): return fast_matches if not allow_fullscan: return fast_matches if cfg.vision.enabled else [] full_matches = run_matching( cfg, [segment_beat], seed_in_points=continuity, ) return _merge_best_results(fast_matches if cfg.vision.enabled else [], full_matches, cfg) def _match_unmatched_visual_segments( results: list, beats: list, cached: list, cfg, skip_global_segment_scan_for: set[int] | None = None, ) -> list: """Create segmented provisional matches when a whole beat has no single match.""" from dataclasses import replace from src.core.models import MatchResult, MatchSegment from src.cv.frame_extractor import get_video_info matched_ids = {r.beat_id for r in results} expanded = list(results) skip_global_segment_scan_for = skip_global_segment_scan_for or set() try: fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate except Exception: fps = cfg.export.edl_frame_rate for beat in beats: if beat.beat_id in matched_ids: continue # Per-shot matching when the beat has either fade-bounded islands # OR internal hard cuts; each shot becomes its own MatchSegment. islands = _reference_shot_segments(beat, cfg) if not islands: continue segments: list[MatchSegment] = [] for island_idx, (start_s, end_s) in enumerate(islands): segment_beat = replace( beat, start_s=beat.start_s + start_s, end_s=beat.start_s + end_s, ) continuity = _continuity_seed_in_points( beat.beat_id, [b if b.beat_id != beat.beat_id else segment_beat for b in beats], cached + expanded, cfg, ) segment_matches = [] if beat.beat_id not in skip_global_segment_scan_for: segment_matches = _run_segment_match(segment_beat, continuity, cfg, allow_fullscan=True) if not segment_matches: # Fade-content shot fallback: when CV finds no templates # inside this shot (typical for cross-fade silhouettes), the # vibe-check + vision-action-window recovery path is the only # way to get a match. It's slower but works on dark frames # because vision can read structure where CV cannot. shot_islands = _reference_scoreable_segments(segment_beat, cfg) if not shot_islands and cfg.vision.enabled: recovered = _recover_unmatched_beats_via_vision([], [segment_beat], cfg) if recovered: rec = recovered[0] seg_dur = min(max(0.0, end_s - start_s), max(0.0, rec.duration_s)) if ( seg_dur > 0 and rec.match_score >= cfg.cv.deep_scan.multi_shot_segment_threshold ): segments.append(MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=rec.scene_id, in_point_s=rec.in_point_s, out_point_s=rec.in_point_s + seg_dur, match_score=rec.match_score, is_confirmed=rec.is_confirmed, )) continue local_segment = _local_same_scene_segment_match( segment_beat, beat, start_s, cached + expanded, cfg, ) if local_segment is not None: segments.append(local_segment) continue seg = segment_matches[0] if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold: continue seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s)) segments.append( MatchSegment( trailer_offset_s=start_s, duration_s=seg_dur, scene_id=seg.scene_id, in_point_s=seg.in_point_s, out_point_s=seg.in_point_s + seg_dur, match_score=seg.match_score, is_confirmed=seg.is_confirmed, ) ) if not segments: continue first = segments[0] total_segment_duration = sum(max(0.0, s.duration_s) for s in segments) score = ( sum(max(0.0, s.duration_s) * s.match_score for s in segments) / total_segment_duration if total_segment_duration > 0 else min(s.match_score for s in segments) ) expanded.append( MatchResult( beat_id=beat.beat_id, scene_id=first.scene_id, source_path=cfg.paths.source_movie, in_point_s=first.in_point_s, out_point_s=first.out_point_s, in_point_frame=int(max(0.0, first.in_point_s) * fps), match_score=score, is_confirmed=all(s.is_confirmed for s in segments), segments=tuple(segments), ) ) return expanded def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg): """Find a short trailer island inside scenes adjacent to neighbouring beat matches.""" from src.core.models import MatchSegment from src.cv.frame_extractor import open_video from src.cv.global_scan import _content_alignment_score, _content_alignment_templates scenes = _load_scene_cache_light(cfg) if not scenes: return None by_id = {r.beat_id: r for r in cached} scene_ids: list[int] = [] for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1): result = by_id.get(neighbour_id) if result is None: continue ids = [getattr(s, "scene_id", result.scene_id) for s in getattr(result, "segments", ())] or [result.scene_id] for scene_id in ids: if scene_id not in scene_ids: scene_ids.append(scene_id) if not scene_ids: return None templates = _content_alignment_templates(segment_beat, cfg) if not templates: return None min_score = min( cfg.cv.deep_scan.provisional_content_threshold * 0.70, cfg.cv.deep_scan.provisional_match_threshold, ) # Coarse repair scan over already plausible neighbouring scenes. A frame-step # sweep across long dialogue scenes is slow and can overfit static layouts. step_s = max( cfg.vision.local_scan_step_s, cfg.cv.deep_scan.content_align_sample_step_s, 0.25, ) best: tuple[float, float, int] | None = None with open_video(cfg.paths.source_movie) as cap: for scene_id in scene_ids: scene = next((s for s in scenes if int(s["scene_id"]) == int(scene_id)), None) if scene is None: continue start_s = max(0.0, float(scene["start_s"]) - 0.25) end_s = max(start_s, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25) max_points = max(4, min(48, int(cfg.vision.local_scan_max_points_per_scene))) scene_step_s = max(step_s, (end_s - start_s) / max_points) t = start_s while t <= end_s: score = _content_alignment_score(cap, t, templates, cfg) if best is None or score > best[0]: best = (score, t, int(scene_id)) t = round(t + scene_step_s, 6) if best is None or best[0] < min_score: return None score, in_point_s, scene_id = best duration_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s)) return MatchSegment( trailer_offset_s=segment_offset_s, duration_s=duration_s, scene_id=scene_id, in_point_s=in_point_s, out_point_s=in_point_s + duration_s, match_score=score, is_confirmed=score >= cfg.cv.deep_scan.match_threshold, ) def _phase_probe_segment_in_scene(segment_beat, scene: dict, original_in_s: float, cfg): """Retune a weak multi-shot segment inside its own scene using saliency-weighted frames.""" import cv2 import numpy as np offsets = [0.0, 0.16, 0.32, 0.48, 0.64, 0.80, 0.96, 1.12] size = (160, 90) def prepared_gray(frame): if frame is None: return None h, w = frame.shape[:2] frame = frame.copy() # Timecode overlays and letterbox edges are trailer/source-specific and # should not pull the phase toward the wrong moment. frame[: int(h * 0.16), : int(w * 0.32)] = 0 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray = cv2.resize(gray, size) return cv2.equalizeHist(gray).astype("float32") / 255.0 def edge(gray): return cv2.Canny((gray * 255).astype("uint8"), 45, 130).astype("float32") / 255.0 def pair_score(ref_gray, src_gray, mask): if ref_gray is None or src_gray is None: return None pixel = 1.0 - float((np.abs(ref_gray - src_gray) * mask).sum()) edge_score = 1.0 - float((np.abs(edge(ref_gray) - edge(src_gray)) * mask).sum()) return 0.65 * pixel + 0.35 * edge_score def frame_at(cap, t_s): cap.set(cv2.CAP_PROP_POS_MSEC, t_s * 1000.0) ok, frame = cap.read() return frame if ok else None trailer_cap = cv2.VideoCapture(str(cfg.paths.reference_trailer)) ref_candidates = [] fallback_items = [] for offset in offsets: if offset > segment_beat.duration_s + 0.04: continue frame = frame_at(trailer_cap, segment_beat.start_s + offset) ref = prepared_gray(frame) if ref is None: continue fallback_items.append((offset, ref)) raw_gray = cv2.resize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), size) h, w = raw_gray.shape[:2] raw_gray[: int(h * 0.16), : int(w * 0.32)] = 0 roi = raw_gray[int(h * 0.12) : int(h * 0.90), :] mean_luma = float(roi.mean() / 255.0) p90_luma = float(np.percentile(roi, 90) / 255.0) contrast = float(roi.std() / 255.0) ref_candidates.append((offset, ref, mean_luma, p90_luma, contrast)) transition_start = False ref_items = [] if ref_candidates: max_mean = max(item[2] for item in ref_candidates) max_p90 = max(item[3] for item in ref_candidates) transition_start = ( ref_candidates[0][2] < max_mean * 0.90 or ref_candidates[0][3] < max_p90 * 0.90 ) ref_items = [ (offset, ref) for offset, ref, mean_luma, p90_luma, contrast in ref_candidates if ( mean_luma >= max(0.16, max_mean * 0.82) and p90_luma >= max(0.28, max_p90 * 0.86) and contrast >= 0.035 ) ] if len(ref_items) < 4: ref_items = fallback_items if len(ref_items) < 4: return None ref_offsets = [item[0] for item in ref_items] refs = [item[1] for item in ref_items] align_offset = ref_offsets[0] ref_offsets = [offset - align_offset for offset in ref_offsets] ref_stack = np.stack(refs, axis=0) edge_stack = np.stack([edge(ref) for ref in refs], axis=0) # Static window/room edges are useful for finding the scene, but toxic for # phase retuning inside a repeated dialogue shot. Bias the mask toward # areas that actually change across the reference segment. saliency = ref_stack.std(axis=0) * 3.0 + edge_stack.std(axis=0) * 0.75 + edge_stack.mean(axis=0) * 0.15 saliency[:, : int(size[0] * 0.12)] *= 0.15 saliency[: int(size[1] * 0.16), : int(size[0] * 0.32)] = 0.0 threshold = np.quantile(saliency, 0.66) mask = (saliency >= threshold).astype("float32") mask /= mask.sum() + 1e-6 scene_start = float(scene["start_s"]) scene_end = float(scene["end_s"]) center_t = max(scene_start, min(scene_end, original_in_s + align_offset)) retune_radius_s = max(4.0, min(12.0, segment_beat.duration_s * 2.5)) scan_start = max(scene_start, center_t - retune_radius_s) scene_scan_end = min(scene_end, center_t + retune_radius_s) scan_end = max(scan_start, scene_scan_end - max(0.04, segment_beat.duration_s - align_offset)) max_points = 400 step_s = max(0.04, (scan_end - scan_start) / max_points) source_cap = cv2.VideoCapture(str(cfg.paths.source_movie)) source_fps = source_cap.get(cv2.CAP_PROP_FPS) or _scene_fps_light(scene, cfg) stride = max(1, int(round(step_s * source_fps))) start_frame = max(0, int(round(scan_start * source_fps))) end_frame = max(start_frame, int(round(scene_scan_end * source_fps))) times: list[float] = [] source_frames: list = [] frame_idx = start_frame while frame_idx <= end_frame: source_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) ok, frame = source_cap.read() if not ok: break times.append(frame_idx / source_fps) source_frames.append(prepared_gray(frame)) frame_idx += stride base_time = times[0] if times else scan_start candidates: list[tuple[float, float, float]] = [] for i, t in enumerate(times): if t > scan_end: break vals = [] src_for_offsets = [] for offset, ref in zip(ref_offsets, refs): j = int(round((t + offset - base_time) / step_s)) if 0 <= j < len(source_frames): src = source_frames[j] score = pair_score(ref, src, mask) else: src = None score = None if score is not None: vals.append(score) src_for_offsets.append(src) if len(vals) >= 4: avg_score = sum(vals) / len(vals) early_count = min(2, len(vals)) tail_count = min(2, len(vals)) early_score = sum(vals[:early_count]) / early_count tail_score = sum(vals[-tail_count:]) / tail_count motion_vals = [] for idx in range(1, min(len(refs), len(src_for_offsets))): if src_for_offsets[idx - 1] is None or src_for_offsets[idx] is None: continue ref_motion = refs[idx] - refs[idx - 1] src_motion = src_for_offsets[idx] - src_for_offsets[idx - 1] motion_vals.append(1.0 - float((np.abs(ref_motion - src_motion) * mask).sum())) motion_score = sum(motion_vals) / len(motion_vals) if motion_vals else avg_score # Phase retuning must reject "same shot, wrong moment" matches. # A plain average can hide a bad onset inside slow dialogue shots; # keep the low-water mark, onset, and frame-to-frame motion influential. phase_score = ( 0.26 * avg_score + 0.24 * min(vals) + 0.24 * early_score + 0.08 * tail_score + 0.18 * motion_score ) candidates.append((phase_score, min(vals), t)) if not candidates: return None candidates.sort(reverse=True) best_score = candidates[0][0] tie_window = 0.006 if transition_start else 0.002 near_tie = [c for c in candidates if c[0] >= best_score - tie_window] if transition_start: chosen = max(near_tie, key=lambda c: (c[1], c[0])) else: chosen = min(near_tie, key=lambda c: abs((c[2] - align_offset) - original_in_s)) return max(scene_start, chosen[2] - align_offset), chosen[0] def cmd_match(args: argparse.Namespace, cfg) -> list: from src.pipeline.matcher import run_matching from dataclasses import replace if getattr(args, "vision", False): cfg = replace(cfg, vision=replace(cfg.vision, enabled=True)) if getattr(args, "no_vision", False): cfg = replace(cfg, vision=replace(cfg.vision, enabled=False)) all_beats = _load_beats(cfg) beats = _select_beats(all_beats, getattr(args, "beat", None)) cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg) if _results_cache_path(cfg).exists() else [] # Multi-shot beats: either fade-bounded multiple islands, OR a single # island with internal hard cuts (e.g. man-shot then back to woman). Both # cases are routed through the per-segment match path so each shot gets # its own source clip instead of being approximated by one continuous # span. multi_island_beat_ids = { beat.beat_id for beat in beats if len(_reference_shot_segments(beat, cfg)) > 1 } scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg) scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids] seed_in_points = ( _continuity_seed_in_points(args.beat, all_beats, cached, cfg) if getattr(args, "beat", None) is not None else None ) results = [] if cfg.vision.enabled: fast_cfg = _fast_vision_match_cfg(cfg) results = run_matching( fast_cfg, scan_beats, force_reindex=args.force_reindex, seed_in_points=seed_in_points, ) if len(results) < len(scan_beats) or any( not r.is_confirmed and r.match_score < cfg.cv.deep_scan.match_threshold for r in results ): results_by_id = {r.beat_id: r for r in results} remaining_beats = [ b for b in scan_beats if ( b.beat_id not in results_by_id or ( not results_by_id[b.beat_id].is_confirmed and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold ) ) ] if remaining_beats: full_results = run_matching( cfg, remaining_beats, force_reindex=args.force_reindex, seed_in_points=seed_in_points, ) results = _merge_best_results(results, full_results, cfg) results = _apply_single_island_segments(results, single_island_trims) results = _match_unmatched_visual_segments( results, beats, cached, cfg, skip_global_segment_scan_for=set(single_island_trims), ) results = _attach_visual_segments(results, beats, cfg) results = _filter_semantically_invalid_vision_matches(results, beats, cfg) results = _recover_unmatched_beats_via_vision(results, beats, cfg) results = _recover_short_lowlight_vibe_matches(results, beats, cfg) # A targeted one-beat match must NEVER delete or modify any other beat's # cache entry. We deliberately re-load the raw cache from disk here so # the upstream normalisation pass (which drops entries that no longer # pass current quality gates) cannot leak into the save: only the # targeted beat's slot gets replaced, every other entry is written back # bit-for-bit identical to what it was before this run. if getattr(args, "beat", None) is not None and _results_cache_path(cfg).exists(): raw_cached = _load_results(cfg) old_for_beat = next((r for r in raw_cached if r.beat_id == args.beat), None) raw_cached = [r for r in raw_cached if r.beat_id != args.beat] for result in results: if _keeps_cached_match(old_for_beat, result, cfg): print( f"ℹ️ Beat {result.beat_id}: keeping existing {len(getattr(old_for_beat, 'segments', ()) or ())}‑segment " f"provisional match (score {old_for_beat.match_score:.3f}) over weaker new result " f"(score {result.match_score:.3f}, no segments)." ) raw_cached.append(old_for_beat) else: raw_cached = _update_result(result, raw_cached) results_to_save = sorted(raw_cached, key=lambda r: r.beat_id) else: results_to_save = results _save_results(results_to_save, cfg) force_report_beats = {int(args.beat)} if getattr(args, "beat", None) is not None else None _regenerate_cutter_report(cfg, force_beats=force_report_beats) print(f"\n✅ {len(results)} / {len(beats)} beats matched.") for r in results: print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} " f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}") return results def _update_result(new_result, results: list) -> list: """Replace or insert a MatchResult in the list (by beat_id).""" updated = [r for r in results if r.beat_id != new_result.beat_id] updated.append(new_result) return sorted(updated, key=lambda r: r.beat_id) def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]: beats_by_id = {b.beat_id: b for b in beats} results_by_id = {r.beat_id: r for r in results} target = beats_by_id.get(beat_id) if target is None: return {} seeds: list[tuple[float, float]] = [] base_score = max(cfg.cv.deep_scan.coarse_candidate_threshold + 0.08, 0.92) prev_matches = [ (b, results_by_id[b.beat_id]) for b in beats if b.beat_id < beat_id and b.beat_id in results_by_id ] if prev_matches: prev_beat, prev_result = max(prev_matches, key=lambda item: item[0].beat_id) trailer_gap_s = max(0.0, target.start_s - prev_beat.end_s) expected = prev_result.out_point_s + trailer_gap_s for offset in cfg.cv.deep_scan.continuity_seed_offsets_s: offset_score = max( cfg.cv.deep_scan.coarse_candidate_threshold, base_score - abs(offset) * 0.06, ) seeds.append((expected + offset, offset_score)) next_matches = [ (b, results_by_id[b.beat_id]) for b in beats if b.beat_id > beat_id and b.beat_id in results_by_id ] if next_matches: next_beat, next_result = min(next_matches, key=lambda item: item[0].beat_id) trailer_gap_s = max(0.0, next_beat.start_s - target.end_s) expected = next_result.in_point_s - trailer_gap_s - target.duration_s for offset in cfg.cv.deep_scan.continuity_seed_offsets_s: offset_score = max( cfg.cv.deep_scan.coarse_candidate_threshold, base_score - abs(offset) * 0.06, ) seeds.append((expected - offset, offset_score)) unique: dict[float, float] = {} for seed_t, seed_score in seeds: rounded = round(max(0.0, seed_t), 3) unique[rounded] = max(unique.get(rounded, 0.0), seed_score) points = [(seed_t, score) for seed_t, score in sorted(unique.items())] return {beat_id: points} if points else {} def cmd_rematch(args: argparse.Namespace, cfg) -> None: """ Re-run automatic matching for ONE beat. python cli.py rematch --beat 5 # re-scan CV for beat 5 python cli.py rematch --beat 5 --threshold 0.40 # relax threshold """ beat_id = args.beat beats = _load_beats(cfg) results = _load_results(cfg) if _results_cache_path(cfg).exists() else [] beat = next((b for b in beats if b.beat_id == beat_id), None) if beat is None: print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.") return # ---- Refine an already acceptable cached match ------------------------- if args.refine: current = next((r for r in results if r.beat_id == beat_id), None) if current is None: print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.") return from src.cv.content_align import align_cached_match_by_content refined_in_s, sequence_score = align_cached_match_by_content( beat, current.in_point_s, cfg, search_window_s=args.refine_window, ) usable_duration_s = max(0.0, current.out_point_s - current.in_point_s) span_score = sequence_score scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg) out_point_s = refined_in_s + usable_duration_s if scene_data is not None: out_point_s = min(out_point_s, float(scene_data["end_s"])) matchable_duration_s = beat.duration_s duration_coverage = ( max(0.0, out_point_s - refined_in_s) / matchable_duration_s if matchable_duration_s > 0 else 0.0 ) if duration_coverage < cfg.cv.deep_scan.min_duration_coverage: print( f"❌ Beat {beat_id} refined candidate rejected: " f"duration coverage {duration_coverage:.0%} < " f"{cfg.cv.deep_scan.min_duration_coverage:.0%}" ) return try: from src.cv.frame_extractor import get_video_info fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate except Exception: fps = cfg.export.edl_frame_rate from src.core.models import MatchResult refined = MatchResult( beat_id=beat_id, scene_id=int(scene_data["scene_id"]) if scene_data is not None else current.scene_id, source_path=current.source_path, in_point_s=max(0.0, refined_in_s), out_point_s=out_point_s, in_point_frame=int(max(0.0, refined_in_s) * fps), match_score=max(sequence_score, span_score), match_location=current.match_location, is_confirmed=max(sequence_score, span_score) >= cfg.cv.deep_scan.match_threshold, ) results = _update_result(refined, results) _save_results(results, cfg) print( f"✅ Beat {beat_id} refined → " f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, " f"sequence_score={refined.match_score:.3f}" ) return # ---- Re-run CV with optional threshold override ------------------------ from dataclasses import replace as dc_replace run_cfg = cfg if args.threshold is not None: run_cfg = dc_replace( cfg, cv=dc_replace( cfg.cv, deep_scan=dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold), ), ) print(f"ℹ️ threshold overridden to {args.threshold} for beat {beat_id}") from src.cv.global_scan import run_global_scan seed_in_points = _continuity_seed_in_points(beat_id, beats, results, run_cfg) matches = run_global_scan([beat], run_cfg, seed_in_points=seed_in_points) if not matches: print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.") return match = matches[0] results = _update_result(match, results) _save_results(results, cfg) print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})") def cmd_report(args: argparse.Namespace, cfg) -> None: if getattr(args, "beat", None) is not None: print(f"\n⚠️ Generating cutter report for all beats (ignoring --beat {args.beat}).") _regenerate_cutter_report(cfg) project_root = cfg.paths.cache_dir.parent print(f"\n✅ Report → {project_root / 'CUTTER_REPORT.html'} and CUTTER_REPORT.md") def cmd_export(args: argparse.Namespace, cfg) -> None: from src.export.edl_writer import write_edl from src.export.fcpxml_writer import write_fcpxml from src.pipeline.matcher import build_timeline beats = _select_beats(_load_beats(cfg), getattr(args, "beat", None)) beat_ids = {b.beat_id for b in beats} if getattr(args, "beat", None) is not None else None results = _select_results(_normalize_cached_results(_load_beats(cfg), _load_results(cfg), cfg), beat_ids) if getattr(args, "beat", None) is not None and not results: print(f"❌ Beat {args.beat} has no cached match. Run 'match --beat {args.beat}' first.") return timeline = build_timeline(beats, results, cfg) fmt = args.format or cfg.export.output_format beat_id = getattr(args, "beat", None) out_stem = ( f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}" if beat_id is not None else timeline.title ) if fmt in ("fcpxml", "both"): out = write_fcpxml(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.fcpxml") print(f"✅ FCPXML → {out}") if fmt in ("edl", "both"): out = write_edl(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.edl") print(f"✅ EDL → {out}") def cmd_run(args: argparse.Namespace, cfg) -> None: """Full pipeline: analyze → match → report → export.""" cmd_analyze(args, cfg) cmd_match(args, cfg) cmd_report(args, cfg) cmd_export(args, cfg) # --------------------------------------------------------------------------- # Argument parser # --------------------------------------------------------------------------- def _build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( prog="ai-trailer", description="AI Trailer Generator v2 — Pure CV scene matching", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "--config", type=Path, default=Path("config.toml"), metavar="CONFIG", help="Path to config.toml (default: ./config.toml)", ) parser.add_argument( "--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Logging verbosity (default: INFO)", ) sub = parser.add_subparsers(dest="command", required=True) # analyze p_analyze = sub.add_parser("analyze", help="Detect trailer beats + fingerprint") p_analyze.add_argument("--no-audio", action="store_true", help="Skip Whisper (only affects beat labels, not matching)") p_analyze.add_argument("--no-llm", action="store_true", help="Skip LLM classification (only affects beat labels)") # match p_match = sub.add_parser("match", help="Run 2-phase CV matching") p_match.add_argument("--force-reindex", action="store_true", help="Ignore scene cache and re-run PySceneDetect") p_match.add_argument("--beat", type=int, help="Match only one beat and merge it into the cached results") p_match.add_argument("--vision", action="store_true", help="Enable cached vision descriptions for extra automatic search seeds") p_match.add_argument("--no-vision", action="store_true", help="Disable vision seeding even if [vision].enabled is true") # rematch p_rematch = sub.add_parser("rematch", help="Re-run or override matching for one beat") p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch") p_rematch.add_argument("--threshold", type=float, default=None, help="Override match_threshold") p_rematch.add_argument("--refine", action="store_true", help="Refine the cached match by measuring a local image-content offset") p_rematch.add_argument("--refine-window", type=float, default=None, help="Seconds to search around the cached in-point when using --refine") # report p_report = sub.add_parser("report", help="Generate HTML visual comparison report") p_report.add_argument("--beat", type=int, help="Report only one beat") # export p_export = sub.add_parser("export", help="Export timeline from cached results") p_export.add_argument("--format", choices=["fcpxml", "edl", "both"], help="Override [export] output_format from config") p_export.add_argument("--beat", type=int, help="Export only one beat") # run p_run = sub.add_parser("run", help="Full pipeline: analyze → match → export") p_run.add_argument("--no-audio", action="store_true") p_run.add_argument("--no-llm", action="store_true") p_run.add_argument("--force-reindex", action="store_true") p_run.add_argument("--vision", action="store_true") p_run.add_argument("--no-vision", action="store_true") p_run.add_argument("--format", choices=["fcpxml", "edl", "both"]) p_run.add_argument("--beat", type=int, help="Run match/report/export for only one cached beat") return parser # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> None: _ensure_utf8_console() parser = _build_parser() args = parser.parse_args() _setup_logging(args.log_level) from src.core.config import load_config cfg = load_config(args.config) dispatch = { "analyze": cmd_analyze, "match": cmd_match, "rematch": cmd_rematch, "report": cmd_report, "export": cmd_export, "run": cmd_run, } handler = dispatch[args.command] handler(args, cfg) if __name__ == "__main__": main()