Files
aitrailer/cli.py
T
Melbar 45769aa366 Refactor report pipeline: redesign HTML, add motion alignment, remove legacy reporter
- scripts/generate_cutter_report.py: complete HTML redesign with glassmorphism
  dark-mode style, compare video links in markdown output
- cli.py: cmd_report now calls _regenerate_cutter_report directly; also writes
  legacy match_report.html; removes dependency on src/pipeline/reporter.py
- src/cv/global_scan.py: add motion-phase alignment refinement step after
  initial in-point search (align_in_point_by_motion, threshold +0.015)
- Remove HANDOVER.md and src/pipeline/reporter.py (superseded)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 12:44:10 +02:00

2046 lines
83 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
cli.py — AI Trailer Generator v2 — Command-Line Interface
Usage:
python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm]
python cli.py match [--config CONFIG] [--force-reindex]
python cli.py rematch --beat N [--threshold F] [--refine]
python cli.py report [--config CONFIG]
python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm]
python cli.py export [--config CONFIG] [--format fcpxml|edl|both]
On --no-audio / --no-llm:
These flags do NOT affect matching quality.
Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX)
to beats in the export metadata. The CV pipeline is identical either way.
Use them for fast iterations: they skip large model downloads.
All heavy imports are deferred so --help is instant.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
def _setup_logging(level: str = "INFO") -> None:
# Force UTF-8 for Windows console emoji printing
if sys.stdout.encoding != 'utf-8':
sys.stdout.reconfigure(encoding='utf-8')
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(name)s%(message)s",
datefmt="%H:%M:%S",
level=getattr(logging, level.upper(), logging.INFO),
stream=sys.stdout,
)
logging.getLogger("PIL").setLevel(logging.WARNING)
def _ensure_utf8_console() -> None:
"""Make argparse help safe on Windows before logging is configured."""
if sys.stdout.encoding != "utf-8":
sys.stdout.reconfigure(encoding="utf-8")
# ---------------------------------------------------------------------------
# Cache helpers (match results ↔ JSON)
# ---------------------------------------------------------------------------
def _results_cache_path(cfg: "AppConfig") -> Path: # type: ignore[name-defined]
return cfg.paths.cache_dir / "match_results.json"
def _save_results(results: list, cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Write match results to the JSON cache file.

    Every result is flattened to plain JSON types: ``source_path`` becomes a
    string and ``match_location`` becomes a list. Results that have no
    ``segments`` attribute are stored with an empty segment list.

    Args:
        results: Match results to persist; read through attribute access only.
        cfg: Application config, used to resolve the cache file location.
    """
    # No model import is needed here: results are only read via attributes,
    # so the function stays free of the heavy project import.

    def _segment_payload(seg) -> dict:
        # One JSON object per matched segment of a multi-segment result.
        return {
            "trailer_offset_s": seg.trailer_offset_s,
            "duration_s": seg.duration_s,
            "scene_id": seg.scene_id,
            "in_point_s": seg.in_point_s,
            "out_point_s": seg.out_point_s,
            "match_score": seg.match_score,
            "is_confirmed": seg.is_confirmed,
        }

    data = [
        {
            "beat_id": r.beat_id,
            "scene_id": r.scene_id,
            "source_path": str(r.source_path),
            "in_point_s": r.in_point_s,
            "out_point_s": r.out_point_s,
            "in_point_frame": r.in_point_frame,
            "match_score": r.match_score,
            "match_location": list(r.match_location),
            "is_confirmed": r.is_confirmed,
            "segments": [_segment_payload(s) for s in getattr(r, "segments", ())],
        }
        for r in results
    ]
    p = _results_cache_path(cfg)
    # The cache directory may not exist yet on a first run.
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(data, indent=2), encoding="utf-8")
    logging.getLogger(__name__).info("Match results cached → %s", p)
def _auto_commit_push_reports(project_root: "Path") -> None:  # type: ignore[name-defined]
    """Stage changed report files, commit, and push to origin.

    Only touches report output files — never stages source or config changes,
    and never commits changes outside the report paths even if the caller has
    other files staged. Failures are logged but never propagate.

    Args:
        project_root: Directory in which the git commands are run.
    """
    import subprocess as _sp
    from datetime import datetime as _dt

    report_globs = [
        "CUTTER_REPORT.html",
        "CUTTER_REPORT.md",
        "output/report/match_report.html",
        "output/report/beat_*_compare.mp4",
        "output/report/beat_*_src.mp4",
        "output/report/beat_*_ref.mp4",
        "output/cutter_clips/beat_*_compare.mp4",
        "output/cutter_clips/beat_*_source.mp4",
        "output/cutter_clips/beat_*_source_seg*.mp4",
        "output/cutter_clips/beat_*_trailer.mp4",
        "output/cutter_stills/beat_*_source.jpg",
        "output/cutter_stills/beat_*_trailer.jpg",
    ]
    log = logging.getLogger(__name__)
    cwd = str(project_root)
    try:
        for pattern in report_globs:
            # A pattern that matches nothing makes `git add` exit non-zero;
            # that is expected here, so the return code is not checked.
            _sp.run(["git", "add", "--", pattern], capture_output=True, cwd=cwd)

        # Ask only for *staged* changes under the report paths. A plain
        # `git status --porcelain` also reports unrelated modified/untracked
        # files, which would trigger a commit attempt with nothing staged
        # (fails) or with someone else's staged changes (wrong content).
        staged = _sp.run(
            ["git", "diff", "--cached", "--name-only", "-z", "--", *report_globs],
            capture_output=True,
            text=True,
            cwd=cwd,
            check=True,
        )
        staged_reports = [name for name in staged.stdout.split("\0") if name]
        if not staged_reports:
            log.info("Auto-commit: nothing changed in report files.")
            return

        now = _dt.now().strftime("%Y-%m-%d %H:%M")
        msg = f"Auto-update cutter report {now}\n\nCo-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>"
        # Passing the paths after `--` limits the commit to the report files
        # and leaves any other staged changes in the index.
        _sp.run(
            ["git", "commit", "-m", msg, "--", *staged_reports],
            capture_output=True,
            cwd=cwd,
            check=True,
        )
        _sp.run(["git", "push", "origin", "main"], capture_output=True, cwd=cwd, check=True)
        log.info("Auto-commit+push: cutter report updated → remote.")
    except Exception as exc:
        # Best effort by design: a missing git binary, a non-repo directory or
        # a rejected push must not abort the calling command.
        log.warning("Auto-commit/push failed (non-fatal): %s", exc)
def _regenerate_cutter_report(cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Rebuild the cutter report files and publish them.

    Renders the report through ``scripts.generate_cutter_report.render_report``
    (stills and clips enabled) and writes ``CUTTER_REPORT.md``,
    ``CUTTER_REPORT.html`` and a copy of the HTML at
    ``output/report/match_report.html`` below the project root, which is the
    parent of the cache directory. Rendering errors are logged as warnings
    and never abort the run. The auto commit/push helper runs afterwards,
    whether or not rendering succeeded.
    """
    log = logging.getLogger(__name__)
    project_root = cfg.paths.cache_dir.parent
    try:
        from scripts.generate_cutter_report import render_report

        md, html = render_report(project_root, with_stills=True, with_clips=True)
        for file_name, content in (("CUTTER_REPORT.md", md), ("CUTTER_REPORT.html", html)):
            (project_root / file_name).write_text(content, encoding="utf-8")

        # The legacy location receives the same HTML document.
        legacy_report_path = project_root / "output" / "report" / "match_report.html"
        legacy_report_path.parent.mkdir(parents=True, exist_ok=True)
        legacy_report_path.write_text(html, encoding="utf-8")
        log.info("Cutter report regenerated (md + html + compare clips + legacy match_report.html)")
    except Exception as exc:
        log.warning("Cutter report regen failed: %s", exc)
    _auto_commit_push_reports(project_root)
def _load_results(cfg: "AppConfig") -> list:  # type: ignore[name-defined]
    """Read the cached match results back into ``MatchResult`` objects.

    Entries without an ``is_confirmed`` key are treated as confirmed, and
    entries without ``segments`` get an empty segment tuple.

    Raises:
        FileNotFoundError: If the results cache file does not exist.
    """
    from src.core.models import MatchResult, MatchSegment

    cache_file = _results_cache_path(cfg)
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached results at {cache_file}. Run 'match' first.")

    def _segment_from(entry: dict):
        # Segment numbers are coerced explicitly on load.
        return MatchSegment(
            trailer_offset_s=float(entry["trailer_offset_s"]),
            duration_s=float(entry["duration_s"]),
            scene_id=int(entry["scene_id"]),
            in_point_s=float(entry["in_point_s"]),
            out_point_s=float(entry["out_point_s"]),
            match_score=float(entry["match_score"]),
            is_confirmed=bool(entry.get("is_confirmed", True)),
        )

    def _result_from(entry: dict):
        return MatchResult(
            beat_id=entry["beat_id"],
            scene_id=entry["scene_id"],
            source_path=Path(entry["source_path"]),
            in_point_s=entry["in_point_s"],
            out_point_s=entry["out_point_s"],
            in_point_frame=entry["in_point_frame"],
            match_score=entry["match_score"],
            match_location=tuple(entry["match_location"]),
            is_confirmed=entry.get("is_confirmed", True),
            segments=tuple(_segment_from(seg) for seg in entry.get("segments", ())),
        )

    entries = json.loads(cache_file.read_text(encoding="utf-8"))
    return [_result_from(entry) for entry in entries]
def _load_scene_cache_light(cfg) -> list[dict]:
p = cfg.paths.cache_dir / "scene_index.json"
if not p.exists():
return []
return json.loads(p.read_text(encoding="utf-8"))
def _scene_fps_light(scene: dict, cfg) -> float:
duration_s = max(0.0, float(scene["end_s"]) - float(scene["start_s"]))
frame_count = max(0, int(scene["end_frame"]) - int(scene["start_frame"]))
return frame_count / duration_s if duration_s > 0 and frame_count > 0 else cfg.export.edl_frame_rate
def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None:
for idx, scene in enumerate(scenes):
if float(scene["start_s"]) <= t_sec < float(scene["end_s"]):
if (
float(scene["end_s"]) - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s
and idx + 1 < len(scenes)
):
return scenes[idx + 1]
return scene
return None
def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None:
return next((s for s in scenes if int(s["scene_id"]) == scene_id), None)
def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float:
if matchable_duration_s <= 0:
return 0.0
try:
from src.cv.global_scan import _reference_internal_cut_offsets
cut_offsets = _reference_internal_cut_offsets(beat, cfg)
except Exception:
cut_offsets = []
start_idx = None
for idx, scene in enumerate(scenes):
if float(scene["start_s"]) <= in_point_s < float(scene["end_s"]):
start_idx = idx
break
if start_idx is None:
return 0.0
target_end = in_point_s + matchable_duration_s
current_end = in_point_s
for scene in scenes[start_idx:]:
scene_end = float(scene["end_s"])
if target_end <= scene_end:
return matchable_duration_s
boundary_offset = scene_end - in_point_s
if not any(
abs(boundary_offset - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s
for cut_offset in cut_offsets
):
tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(scene, cfg))
return max(0.0, scene_end - in_point_s - tail_s)
current_end = scene_end
return max(0.0, current_end - in_point_s)
def _normalize_cached_results(beats: list, results: list, cfg) -> list:
"""
Re-apply current generic timing rules to cached results.
This keeps old automatic cache entries from preserving obsolete scene-boundary
or tail-trim behavior without introducing manual per-beat truth.

Returns a new list in which each cached result is kept unchanged, replaced
by an adjusted copy, or dropped. When no scene index is cached the input
list is returned as-is.
"""
from dataclasses import replace
# Without a cached scene index there is nothing to reconcile against.
scenes = _load_scene_cache_light(cfg)
if not scenes:
return results
beats_by_id = {b.beat_id: b for b in beats}
normalized = []
for result in results:
beat = beats_by_id.get(result.beat_id)
# --- Multi-segment results -------------------------------------------
# Re-score as the duration-weighted mean of the segment scores, then gate
# on the provisional score threshold and on duration coverage.
if getattr(result, "segments", ()):
segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
weighted_score = (
sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
/ segment_duration
if segment_duration > 0 else result.match_score
)
if weighted_score < cfg.cv.deep_scan.provisional_match_threshold:
continue
if beat is not None and beat.duration_s > 0:
# Coverage is measured against the visible part of the beat when any
# scoreable islands are found, otherwise against the full beat duration.
visible_duration = sum(
max(0.0, end_s - start_s)
for start_s, end_s in _reference_scoreable_segments(beat, cfg)
)
coverage_target = visible_duration if visible_duration > 0 else beat.duration_s
coverage = segment_duration / coverage_target
if coverage < cfg.cv.deep_scan.min_duration_coverage:
continue
normalized.append(replace(result, match_score=weighted_score))
continue
# --- Single-span results ---------------------------------------------
if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
continue
scene = _scene_for_time_light(scenes, result.in_point_s, cfg)
declared_scene = _scene_by_id_light(scenes, result.scene_id)
# If the automatic matcher selected a scene but its in-point sits just
# before that scene's detected start, treat this as scene-boundary drift
# and clamp to the declared scene. This is generic: no beat IDs, no
# manual timestamps, just consistent scene/time reconciliation.
if declared_scene is not None:
declared_start = float(declared_scene["start_s"])
declared_end = float(declared_scene["end_s"])
declared_fps = _scene_fps_light(declared_scene, cfg)
# Tolerance = boundary epsilon plus the preroll frames converted to seconds.
boundary_tolerance_s = (
cfg.cv.deep_scan.scene_boundary_epsilon_s
+ cfg.cv.deep_scan.start_preroll_frames / declared_fps
)
if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end:
scene = declared_scene
# Without a known beat or a resolvable scene the result is kept unchanged.
if beat is None or scene is None:
normalized.append(result)
continue
fps = _scene_fps_light(scene, cfg)
adjusted_in_s = result.in_point_s
scene_changed = int(scene["scene_id"]) != result.scene_id
starts_before_scene = result.in_point_s < float(scene["start_s"])
# Re-derive the in-point when the scene attribution is inconsistent or the
# cached span is at most 0.12 s long: back off by the preroll frames, but
# never before the start of the resolved scene.
if scene_changed or starts_before_scene or result.duration_s <= 0.12:
adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps))
adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s)
scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
fps = _scene_fps_light(scene, cfg)
# Best effort: fall back to the full beat duration when the estimator
# cannot be imported or raises.
matchable_duration_s = beat.duration_s
try:
from src.cv.global_scan import estimate_matchable_reference_duration
matchable_duration_s = estimate_matchable_reference_duration(beat, cfg)
except Exception:
pass
tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
# Longest span allowed inside the current scene (tail-trimmed) versus the
# span allowed across scene boundaries; the larger of the two is the cap.
single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s)
contiguous_duration_s = _contiguous_duration_light(
beat,
adjusted_in_s,
scenes,
cfg,
matchable_duration_s,
)
max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s))
normalized_result = result
# Rewrite the result when any trigger above fired, or when the cached
# out-point exceeds the allowed span by more than one frame.
if (
scene_changed
or starts_before_scene
or result.duration_s <= 0.12
or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps)
):
normalized_result = replace(
result,
scene_id=int(scene["scene_id"]),
in_point_s=adjusted_in_s,
out_point_s=adjusted_in_s + max_duration_s,
in_point_frame=int(adjusted_in_s * fps),
)
# Drop results that cover too little of the matchable reference duration.
coverage = (
max(0.0, normalized_result.duration_s) / matchable_duration_s
if matchable_duration_s > 0 else 0.0
)
if coverage < cfg.cv.deep_scan.min_duration_coverage:
continue
# Content re-check: results scoring below the content gate are dropped;
# confirmed results scoring below the match threshold are demoted to
# unconfirmed with a capped score. Any exception from the aligner (or its
# import) leaves the result as computed above.
try:
from src.cv.content_align import align_cached_match_by_content
_, content_score = align_cached_match_by_content(
beat,
normalized_result.in_point_s,
cfg,
search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds),
fps=12.5,
)
content_gate = (
cfg.cv.deep_scan.provisional_content_threshold
if normalized_result.is_confirmed
else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold)
)
if content_score < content_gate:
continue
if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed:
normalized_result = replace(
normalized_result,
match_score=min(normalized_result.match_score, content_score),
is_confirmed=False,
)
except Exception:
pass
normalized.append(normalized_result)
return normalized
# ---------------------------------------------------------------------------
# Command handlers
# ---------------------------------------------------------------------------
def _build_transcribe_callback(cfg):
    """Return a callback that transcribes a slice of a video using *cfg*.

    The callback takes ``(path, start_s, end_s, offset_s)`` and forwards them
    to ``transcribe_video``. This builder always returns a callable; the
    caller decides whether to use it.
    """
    from src.audio.transcriber import transcribe_video

    def _cb(path, start_s, end_s, offset_s):
        window = {"start_s": start_s, "end_s": end_s, "time_offset_s": offset_s}
        return transcribe_video(path, cfg, **window)

    return _cb
def _build_classify_callback(cfg):
    """Return a callback that classifies a list of beats using *cfg*."""
    from src.llm.dramaturg import classify_beats

    def _cb(beats):
        labelled = classify_beats(beats, cfg)
        return labelled

    return _cb
def cmd_analyze(args: argparse.Namespace, cfg) -> list:
    """Analyze the reference trailer and cache the resulting beats.

    Transcription is skipped when ``args.no_audio`` is set and beat
    classification is skipped when ``args.no_llm`` is set. The beats are
    written to ``trailer_beats.json`` in the cache directory and returned.
    """
    from src.pipeline.trailer_analyzer import analyze_reference_trailer

    transcribe_cb = None if args.no_audio else _build_transcribe_callback(cfg)
    classify_cb = None if args.no_llm else _build_classify_callback(cfg)
    beats = analyze_reference_trailer(
        cfg,
        transcribe_callback=transcribe_cb,
        classify_callback=classify_cb,
    )

    # Persist beats for downstream commands (including histogram bytes as hex)
    beats_cache = cfg.paths.cache_dir / "trailer_beats.json"
    beats_cache.parent.mkdir(parents=True, exist_ok=True)

    def _hex_or_none(blob):
        return blob.hex() if blob else None

    beats_data = []
    for beat in beats:
        spoken = [
            {"start_s": line.start_s, "end_s": line.end_s, "text": line.text}
            for line in beat.dialogue
        ]
        beats_data.append(
            {
                "beat_id": beat.beat_id,
                "start_s": beat.start_s,
                "end_s": beat.end_s,
                "start_frame": beat.start_frame,
                "end_frame": beat.end_frame,
                "beat_type": beat.beat_type.name,
                "dialogue": spoken,
                "phash": beat.phash,
                "luma_hist": _hex_or_none(beat.luma_hist),
                "sat_hist": _hex_or_none(beat.sat_hist),
            }
        )

    payload = json.dumps(beats_data, indent=2, ensure_ascii=False)
    beats_cache.write_text(payload, encoding="utf-8")
    print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}")
    return beats
def _load_beats(cfg) -> list:
    """Rebuild ``TrailerBeat`` objects from the cached ``trailer_beats.json``.

    The trailer path comes from ``cfg.paths.reference_trailer``. Histograms
    are decoded from hex, missing dialogue becomes an empty tuple, and a
    missing beat type is read as ``UNKNOWN``.

    Raises:
        FileNotFoundError: If the beats cache file does not exist.
    """
    from src.core.models import BeatType, DialogueLine, TrailerBeat

    cache_file = cfg.paths.cache_dir / "trailer_beats.json"
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached beats at {cache_file}. Run 'analyze' first.")

    def _blob(hex_text):
        return bytes.fromhex(hex_text) if hex_text else None

    def _beat_from(entry: dict):
        spoken = tuple(
            DialogueLine(start_s=line["start_s"], end_s=line["end_s"], text=line["text"])
            for line in entry.get("dialogue", [])
        )
        return TrailerBeat(
            beat_id=entry["beat_id"],
            trailer_path=cfg.paths.reference_trailer,
            start_s=entry["start_s"],
            end_s=entry["end_s"],
            start_frame=entry["start_frame"],
            end_frame=entry["end_frame"],
            beat_type=BeatType[entry.get("beat_type", "UNKNOWN")],
            dialogue=spoken,
            phash=entry.get("phash"),
            luma_hist=_blob(entry.get("luma_hist")),
            sat_hist=_blob(entry.get("sat_hist")),
        )

    entries = json.loads(cache_file.read_text(encoding="utf-8"))
    return [_beat_from(entry) for entry in entries]
def _select_beats(beats: list, beat_id: int | None) -> list:
"""Return all beats or exactly one requested beat."""
if beat_id is None:
return beats
selected = [b for b in beats if b.beat_id == beat_id]
if not selected:
raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.")
return selected
def _select_results(results: list, beat_ids: set[int] | None) -> list:
"""Return all results or only results for the requested beats."""
if beat_ids is None:
return results
return [r for r in results if r.beat_id in beat_ids]
def _find_scene_for_in_point(cfg, in_point_s: float):
    """Return the indexed scene that contains *in_point_s*, or ``None``.

    The scene index is obtained from ``build_scene_index(cfg)``. An in-point
    within ``scene_boundary_epsilon_s`` of its scene's end is attributed to
    the following scene when one exists.
    """
    from src.cv.scene_indexer import build_scene_index

    scenes = build_scene_index(cfg)
    for idx, scene in enumerate(scenes):
        if not (scene.start_s <= in_point_s < scene.end_s):
            continue
        remaining_s = scene.end_s - in_point_s
        has_next = idx + 1 < len(scenes)
        if remaining_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s and has_next:
            return scenes[idx + 1]
        return scene
    return None
def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]:
"""Find visible source-matchable islands inside a trailer beat.

Returns ``(start_s, end_s)`` offsets measured from the start of the beat
(frames are sampled at ``beat.start_s + t`` with ``t`` starting at 0).
The list is built in three passes: collect runs of scoreable frames,
expand each run outward over frames that are still visible and look like
the same shot, then merge intervals that end up close together.
"""
from src.cv.frame_extractor import grab_frame_at_path
from src.cv.global_scan import (
_corr_same_size,
_is_scoreable_reference_frame,
_prepare_haystack,
_reference_visibility_stats,
)
# Looser visibility test used only while expanding an island: the luma and
# contrast minimums are scaled down from the configured scoreable values
# (0.45x mean luma, 0.50x p90 luma, 0.30x contrast with a floor of 8.0).
def is_visible(frame) -> bool:
if frame is None:
return False
mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg)
visible_luma = (
mean_luma >= cfg.cv.deep_scan.scoreable_luma_mean_min * 0.45
or p90_luma >= cfg.cv.deep_scan.scoreable_luma_p90_min * 0.50
)
visible_contrast = contrast >= max(8.0, cfg.cv.deep_scan.scoreable_contrast_min * 0.30)
return visible_luma and visible_contrast
# Sampling step (at least 0.08 s), minimum island length (at least 0.32 s or
# three steps) and the largest gap that is bridged (at least 0.18 s or two
# steps).
step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s)
min_segment_s = max(0.32, step_s * 3.0)
bridge_gap_s = max(0.18, step_s * 2.0)
# Pass 1: walk the beat in step_s increments and collect runs of scoreable
# frames. `start` / `last_seen` track the currently open run.
raw: list[tuple[float, float]] = []
start: float | None = None
last_seen: float | None = None
t = 0.0
while t <= beat.duration_s:
frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
scoreable = frame is not None and _is_scoreable_reference_frame(frame, cfg)
if scoreable:
if start is None:
start = t
last_seen = t
# A run is only closed once the non-scoreable gap exceeds bridge_gap_s.
elif start is not None and last_seen is not None and t - last_seen > bridge_gap_s:
end = min(beat.duration_s, last_seen + step_s)
if end - start >= min_segment_s:
raw.append((start, end))
start = None
last_seen = None
# Rounding keeps float error from accumulating over many steps.
t = round(t + step_s, 6)
# Close a run that is still open when the beat ends.
if start is not None and last_seen is not None:
end = min(beat.duration_s, last_seen + step_s)
if end - start >= min_segment_s:
raw.append((start, end))
# Pass 2: expand each run backwards and forwards while frames pass
# is_visible() and correlate at least same_shot_corr_min with the run's
# first (respectively last) sampled frame.
expanded: list[tuple[float, float]] = []
same_shot_corr_min = 0.72
for start_s, end_s in raw:
start_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + start_s)
end_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + max(start_s, end_s - step_s))
start_feature = _prepare_haystack(start_anchor, cfg) if start_anchor is not None else None
end_feature = _prepare_haystack(end_anchor, cfg) if end_anchor is not None else None
soft_start = start_s
t = round(start_s - step_s, 6)
while t >= 0.0:
frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
if not is_visible(frame):
break
if start_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), start_feature) < same_shot_corr_min:
break
soft_start = max(0.0, t)
t = round(t - step_s, 6)
soft_end = end_s
t = round(end_s, 6)
# The 1e-6 slack lets a sample that lands exactly on the beat end be tested.
while t <= beat.duration_s + 1e-6:
frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
if not is_visible(frame):
break
if end_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), end_feature) < same_shot_corr_min:
break
soft_end = min(beat.duration_s, t + step_s)
t = round(t + step_s, 6)
if soft_end - soft_start >= min_segment_s:
expanded.append((soft_start, soft_end))
# Pass 3: merge expanded intervals whose gap is at most bridge_gap_s
# (this also covers intervals that overlap after expansion).
merged: list[tuple[float, float]] = []
for start_s, end_s in expanded:
if merged and start_s - merged[-1][1] <= bridge_gap_s:
merged[-1] = (merged[-1][0], max(merged[-1][1], end_s))
else:
merged.append((start_s, end_s))
return merged
def _fade_content_shots(beat, cfg) -> list[tuple[float, float]]:
    """Return dim gaps between visible islands that still show structure.

    Such gaps (e.g. a hand+knife silhouette during a cross-fade) are too dark
    for CV template matching, but vision can still read them, so the matcher
    treats them as shots of their own and routes them through the vision-led
    search path.

    A gap qualifies when it lasts at least 0.2 s and, over the frames sampled
    inside it, the highest p90 luma is ≥ 12 (not pure black) and the highest
    contrast is ≥ 8 (some structure). Pure-black or featureless fades stay
    excluded. Returned tuples are beat-relative ``(start_s, end_s)`` offsets.
    """
    from src.cv.frame_extractor import grab_frame_at_path
    from src.cv.global_scan import _reference_visibility_stats

    islands = _reference_scoreable_segments(beat, cfg)
    if not islands:
        return []

    step_s = max(0.04, cfg.cv.deep_scan.span_sample_step_s)
    min_fade_s = 0.2

    def has_content(start_s: float, end_s: float) -> bool:
        if end_s - start_s < min_fade_s:
            return False
        best_p90 = 0.0
        best_contrast = 0.0
        offset_s = start_s
        while offset_s < end_s:
            frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)
            if frame is not None:
                _, p90, contrast = _reference_visibility_stats(frame, cfg)
                best_p90 = max(best_p90, p90)
                best_contrast = max(best_contrast, contrast)
            # Rounded so float error does not accumulate across samples.
            offset_s = round(offset_s + step_s, 6)
        return best_p90 >= 12.0 and best_contrast >= 8.0

    # Between-island fades only: these are genuine cross-fade silhouettes
    # (one visible shot dissolves into another through a dim middle frame).
    # Pre-island fades are fade-from-black leaders; post-island fades are
    # fade-to-black trailers — neither is a source-matchable shot on its own.
    return [
        (gap_start, gap_end)
        for (_, gap_start), (gap_end, _) in zip(islands, islands[1:])
        if has_content(gap_start, gap_end)
    ]
def _reference_shot_segments(beat, cfg) -> list[tuple[float, float]]:
    """Return the source-matchable shot ranges inside a trailer beat.

    The result is a list of beat-relative ``(start_s, end_s)`` tuples made of:

    * each visible island, split further at internal hard cuts;
    * each between-island fade region that still carries describable
      content — these get matched via the vision-led search path because
      CV templates against the dark frames are unusable.

    Pieces shorter than the minimum shot length are folded into the
    preceding shot so noisy cut detection doesn't fragment a real shot into
    useless slivers. When there are neither cuts nor fade shots the islands
    are returned unchanged.
    """
    from src.cv.global_scan import _reference_internal_cut_offsets

    islands = _reference_scoreable_segments(beat, cfg)
    try:
        cut_offsets = sorted(_reference_internal_cut_offsets(beat, cfg))
    except Exception:
        cut_offsets = []
    fade_shots = _fade_content_shots(beat, cfg)
    if not (cut_offsets or fade_shots):
        return islands

    min_shot_s = max(0.4, cfg.cv.deep_scan.span_sample_step_s * 4.0)
    shots: list[tuple[float, float]] = []
    for island_start, island_end in islands:
        # Only cuts strictly inside the island (1 ms margin) split it.
        inner_cuts = [
            cut for cut in cut_offsets
            if island_start + 1e-3 < cut < island_end - 1e-3
        ]
        edges = [island_start, *inner_cuts, island_end]
        for piece_start, piece_end in zip(edges, edges[1:]):
            if piece_end - piece_start >= min_shot_s:
                shots.append((piece_start, piece_end))
            elif shots:
                # Too short to stand alone: extend the previous shot to
                # cover it. NOTE(review): this applies whether or not the
                # previous shot is adjacent to the short piece.
                shots[-1] = (shots[-1][0], piece_end)
            else:
                # Nothing to merge into yet, so keep the short piece.
                shots.append((piece_start, piece_end))

    if not fade_shots:
        return shots if shots else islands

    # Add fade-content shots (cross-fade silhouettes / dim shot boundaries)
    # sorted with the visible-island shots so the matcher sees them in
    # trailer-time order.
    ordered = sorted([*shots, *fade_shots], key=lambda iv: iv[0])
    cleaned: list[tuple[float, float]] = []
    for lo, hi in ordered:
        prev_end = cleaned[-1][1] if cleaned else None
        if prev_end is None or lo >= prev_end:
            cleaned.append((lo, hi))
        elif hi > prev_end:
            # Overlap with the range already kept: keep only the part that
            # extends past it. A fully contained range is dropped.
            cleaned.append((prev_end, hi))
    return cleaned
def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]:
    """Narrow faded beats to their single visible island before matching.

    A beat is trimmed only when it has exactly one visible island, that
    island has positive length, and it starts or ends more than 1.5 frames
    (at ``cfg.export.edl_frame_rate``) inside the beat.

    Returns:
        A pair ``(beats, trims)``: the beat list with trimmed copies
        substituted in place, and a mapping from beat id to
        ``(island_offset_s, island_duration_s)`` for every trimmed beat.
    """
    from dataclasses import replace

    frame_s = 1.0 / max(1.0, float(cfg.export.edl_frame_rate))
    slack_s = frame_s * 1.5
    out_beats = []
    applied: dict[int, tuple[float, float]] = {}
    for beat in beats:
        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) != 1:
            out_beats.append(beat)
            continue
        island_start, island_end = islands[0]
        island_len = max(0.0, island_end - island_start)
        trims_something = island_start > slack_s or beat.duration_s - island_end > slack_s
        if not (island_len > 0.0 and trims_something):
            out_beats.append(beat)
            continue
        narrowed = replace(
            beat,
            start_s=beat.start_s + island_start,
            end_s=beat.start_s + island_end,
        )
        out_beats.append(narrowed)
        applied[beat.beat_id] = (island_start, island_len)
    return out_beats, applied
def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list:
"""Restore beat-relative segment metadata after matching a trimmed island."""
if not trims:
return results
from dataclasses import replace
from src.core.models import MatchSegment
expanded = []
for result in results:
trim = trims.get(result.beat_id)
if trim is None or getattr(result, "segments", ()):
expanded.append(result)
continue
trailer_offset_s, island_duration_s = trim
duration_s = min(max(0.0, island_duration_s), max(0.0, result.duration_s))
segment = MatchSegment(
trailer_offset_s=trailer_offset_s,
duration_s=duration_s,
scene_id=result.scene_id,
in_point_s=result.in_point_s,
out_point_s=result.in_point_s + duration_s,
match_score=result.match_score,
is_confirmed=result.is_confirmed,
)
expanded.append(
replace(
result,
out_point_s=result.in_point_s + duration_s,
segments=(segment,),
)
)
return expanded
def _keeps_cached_match(old, new, cfg) -> bool:
"""Return True when the old cached match is better than the new one and should be kept.
Specifically protects multi-segment provisional matches from being replaced
by a weaker single-span result. The old entry wins when it has segments
(explicitly tuned multi-shot layout) and the new result has none AND is not
a score improvement.
"""
if old is None or new is None:
return False
old_segs = getattr(old, "segments", ()) or ()
new_segs = getattr(new, "segments", ()) or ()
if old_segs and not new_segs and new.match_score <= old.match_score:
return True
return False
def _merge_best_results(existing: list, candidates: list, cfg) -> list:
"""Merge matches by beat, preferring confirmed or higher-scoring results."""
by_id = {r.beat_id: r for r in existing}
for candidate in candidates:
old = by_id.get(candidate.beat_id)
if old is None:
by_id[candidate.beat_id] = candidate
continue
candidate_confirmed = candidate.match_score >= cfg.cv.deep_scan.match_threshold or candidate.is_confirmed
old_confirmed = old.match_score >= cfg.cv.deep_scan.match_threshold or old.is_confirmed
if (
candidate_confirmed and not old_confirmed
or candidate.match_score > old.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta
or (
candidate.match_score >= old.match_score - cfg.cv.deep_scan.duration_tie_break_score_delta
and candidate.duration_s > old.duration_s
)
):
by_id[candidate.beat_id] = candidate
return sorted(by_id.values(), key=lambda r: r.beat_id)
def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list:
"""Try a vision-led search for beats that ended up without a match.
For each unmatched beat that has scoreable visual content (i.e. not pure
fade/title-card material), this pass:
1. Asks the vibe-check (CV histogram + pHash) for the top-K candidate
scenes.
2. For each candidate, runs the semantic action-window search with the
beat's own description, preferring windows whose phase matches the
visible part of the beat.
3. Refines the in-point with the regular CV content/motion aligner.
4. Validates the resulting window with the vision phase check, exactly
like the main filter.
5. Adds the best validated candidate as a provisional MatchResult.
Confirmed and provisional matches both stay subject to the same thresholds
used elsewhere; this only adds matches that pass the same quality gates.

Returns the input list unchanged when vision is disabled, when there are no
beats, when every beat already has a result, or when the scene index is
empty. Otherwise returns a new list, sorted by beat id, holding the input
results plus at most one recovered result per unmatched beat.
"""
if not cfg.vision.enabled or not beats:
return results
from dataclasses import replace
from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration
from src.cv.scene_indexer import build_scene_index
from src.cv.vibe_check import run_vibe_check
from src.core.models import MatchResult
from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
logger = logging.getLogger(__name__)
matched_ids = {r.beat_id for r in results}
unmatched = [b for b in beats if b.beat_id not in matched_ids]
if not unmatched:
return results
scenes = build_scene_index(cfg)
if not scenes:
return results
new_results = list(results)
for beat in unmatched:
# Island detection is best effort; on failure the beat is treated as
# having no visible islands.
try:
islands = _reference_scoreable_segments(beat, cfg)
except Exception:
islands = []
# Anchor selection: prefer the longest visible island; if none exists,
# fall back to the full beat. The latter handles dark / low-contrast
# close-ups that drop below the scoreable luma/contrast thresholds but
# are still semantically describable. The strict vision phase
# validation later in this pass keeps us from accepting pure title-card
# or logo material.
# NOTE(review): this re-imports `replace` under another name on every
# iteration although it is already imported at the top of the function.
from dataclasses import replace as _replace
if islands:
anchor_start_s, anchor_end_s = max(islands, key=lambda iv: iv[1] - iv[0])
anchor_beat = _replace(
beat,
start_s=beat.start_s + anchor_start_s,
end_s=beat.start_s + anchor_end_s,
)
else:
anchor_beat = beat
# The vibe check is run on the full beat, not on the anchor island.
try:
hits = run_vibe_check(
beat,
scenes,
top_k=max(cfg.cv.deep_scan.scene_seed_top_k, cfg.cv.vibe_check.top_k_candidates),
hist_method=cfg.cv.vibe_check.hist_compare_method,
phash_max_distance=64,
)
except Exception as exc:
logger.warning("Beat %d: recovery vibe-check failed (%s)", beat.beat_id, exc)
continue
# NOTE(review): this lookup does not depend on the beat and is rebuilt on
# every iteration; it could be built once before the loop.
scenes_by_id = {s.scene_id: s for s in scenes}
best = None # (score, scene, in_s, dur_s, reason)
seen = set()
# Only the first scene_seed_top_k hits are examined, each scene once.
for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]:
scene = scenes_by_id.get(hit.scene_id)
if scene is None or scene.scene_id in seen:
continue
seen.add(scene.scene_id)
try:
found = find_action_window_in_scene(anchor_beat, scene, cfg)
except Exception as exc:
logger.debug("Beat %d: action window failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
continue
if found is None:
continue
start_s, end_s, semantic_score, reason = found
# Alignment search window: four times the action window length, clamped
# to the range 3-8 seconds.
window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
try:
aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
anchor_beat,
start_s,
cfg,
search_window_s=window_s,
)
except Exception as exc:
logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
continue
# Clamp the in-point into the scene so the anchor duration still fits
# before the scene end (or to the scene start if the scene is shorter).
aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s)))
try:
usable_duration_s, usable_score = estimate_usable_source_duration(anchor_beat, aligned_in_s, cfg)
except Exception:
usable_duration_s, usable_score = anchor_beat.duration_s, 0.0
usable_duration_s = max(0.0, min(anchor_beat.duration_s, usable_duration_s))
# An estimate shorter than 0.32 s or 45% of the anchor (whichever is
# larger) is replaced by the full anchor duration.
if usable_duration_s < max(0.32, anchor_beat.duration_s * 0.45):
usable_duration_s = anchor_beat.duration_s
try:
ok, verify_reason = validate_match_window_with_vision(
anchor_beat,
source_path=scene.source_path,
scene_id=scene.scene_id,
in_point_s=aligned_in_s,
out_point_s=aligned_in_s + usable_duration_s,
cfg=cfg,
)
except Exception as exc:
logger.debug("Beat %d: validate failed scene=%d (%s)", beat.beat_id, scene.scene_id, exc)
continue
if not ok:
continue
# Final score: the aligner's combined score or a weighted blend of the
# semantic, motion, content and usable-duration scores (capped at 0.99),
# whichever is higher.
final_score = max(
combined_score,
min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
)
if final_score < cfg.cv.deep_scan.provisional_match_threshold:
continue
candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}")
if best is None or candidate[0] > best[0]:
best = candidate
if best is None:
continue
score, scene, aligned_in_s, usable_duration_s, repair_reason = best
logger.info(
"Beat %d: recovered via vision action search scene=%d in=%.3fs score=%.3f (%s)",
beat.beat_id,
scene.scene_id,
aligned_in_s,
score,
repair_reason,
)
# The in-point frame is derived from the export frame rate, and the result
# is marked confirmed only when the score reaches the match threshold.
new_results.append(MatchResult(
beat_id=beat.beat_id,
scene_id=scene.scene_id,
source_path=scene.source_path,
in_point_s=aligned_in_s,
out_point_s=aligned_in_s + usable_duration_s,
in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
match_score=score,
match_location=(0, 0),
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
segments=tuple(),
))
return sorted(new_results, key=lambda r: r.beat_id)
def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list:
    """Drop vision-enabled matches whose final action phase contradicts the beat.

    When vision is disabled in ``cfg`` or ``results`` is empty, ``results`` is
    returned as-is. Otherwise each result whose beat is known is handed to
    ``_filter_repair_one``, which appends its outcome (original, realigned, or
    nothing) to the output list. Results whose beat is unknown are passed
    through unchanged.

    Args:
        results: Match results to verify.
        beats: Trailer beats; looked up by ``beat_id``.
        cfg: Project configuration (``cfg.vision.enabled`` gates the whole pass).

    Returns:
        The list of results that survived verification, in input order.
    """
    if not cfg.vision.enabled or not results:
        return results
    # NOTE(review): `replace` does not appear to be used in this function or
    # its nested helpers (`_filter_repair_one` imports its own) — confirm.
    from dataclasses import replace
    from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
    from src.cv.scene_indexer import build_scene_index
    from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration

    logger = logging.getLogger(__name__)
    beats_by_id = {beat.beat_id: beat for beat in beats}
    scenes_by_id = {scene.scene_id: scene for scene in build_scene_index(cfg)}

    def visible_content_offset(action_beat, segment_start_offset_s: float) -> float:
        """Sum the scoreable content of ``action_beat`` that lies before the offset.

        Segments ending at or before the offset count in full; a segment that
        straddles the offset counts up to the offset. Iteration stops at the
        first segment that is not entirely before the offset.
        """
        content_offset_s = 0.0
        for start_s, end_s in _reference_scoreable_segments(action_beat, cfg):
            if end_s <= segment_start_offset_s:
                content_offset_s += max(0.0, end_s - start_s)
            elif start_s < segment_start_offset_s:
                content_offset_s += max(0.0, segment_start_offset_s - start_s)
                break
            else:
                break
        return content_offset_s

    def realign_window(check_beat, scene_id: int, action_beat=None):
        """Search ``scene_id`` for a better in-point for ``check_beat``.

        Returns ``None`` when the scene is unknown, no action window is found,
        or the realigned window fails vision validation. Otherwise returns
        ``(scene, aligned_in_s, usable_duration_s, score, reason)``.
        """
        scene = scenes_by_id.get(scene_id)
        if scene is None:
            return None
        segment_window = find_action_window_in_scene(check_beat, scene, cfg)
        # The whole-beat window is only looked up when a distinct parent beat
        # was supplied.
        if action_beat is not None and action_beat is not check_beat:
            beat_window = find_action_window_in_scene(action_beat, scene, cfg)
        else:
            beat_window = None
        use_beat_context = False
        # Prefer the segment's own window; the beat-level window wins only when
        # the segment has none or when its third element (unpacked below as the
        # semantic score) is more than 0.06 higher.
        if segment_window is None:
            found = beat_window
            use_beat_context = beat_window is not None
        elif beat_window is None:
            found = segment_window
        elif beat_window[2] > segment_window[2] + 0.06:
            found = beat_window
            use_beat_context = True
        else:
            found = segment_window
        if found is None:
            return None
        start_s, end_s, semantic_score, reason = found
        if use_beat_context:
            # The window was located for the whole beat: shift it by the amount
            # of scoreable content that precedes this segment inside the beat.
            segment_start_offset_s = max(0.0, check_beat.start_s - action_beat.start_s)
            content_offset_s = visible_content_offset(action_beat, segment_start_offset_s)
            start_s += content_offset_s
            end_s += content_offset_s
        # Search window: four times the action window, clamped to 3..8 seconds.
        window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
        aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
            check_beat,
            start_s,
            cfg,
            search_window_s=window_s,
        )
        # Clamp the in-point so it is not before the scene start and, where the
        # scene is long enough, a beat-length window still ends inside it.
        aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s)))
        usable_duration_s, usable_score = estimate_usable_source_duration(check_beat, aligned_in_s, cfg)
        usable_duration_s = max(0.0, min(check_beat.duration_s, usable_duration_s))
        # A usable estimate below max(0.32 s, 45 % of the beat) is replaced by
        # the full beat duration.
        if usable_duration_s < max(0.32, check_beat.duration_s * 0.45):
            usable_duration_s = check_beat.duration_s
        ok, verify_reason = validate_match_window_with_vision(
            check_beat,
            source_path=scene.source_path,
            scene_id=scene.scene_id,
            in_point_s=aligned_in_s,
            out_point_s=aligned_in_s + usable_duration_s,
            cfg=cfg,
        )
        if not ok:
            logger.info(
                "Beat %d: action-window realign rejected scene=%d in=%.3fs (%s)",
                check_beat.beat_id,
                scene.scene_id,
                aligned_in_s,
                verify_reason,
            )
            return None
        # Final score: the better of the alignment score and a weighted blend
        # of the component scores, the blend being capped at 0.99.
        score = max(
            combined_score,
            min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
        )
        return scene, aligned_in_s, usable_duration_s, score, f"{reason}; {verify_reason}"

    kept = []
    for result in results:
        beat = beats_by_id.get(result.beat_id)
        if beat is None:
            kept.append(result)
            continue
        # Remember the list length so a failed repair can be rolled back.
        kept_before = len(kept)
        try:
            _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger)
        except Exception as exc:
            logger.warning(
                "Beat %d: vision filter/repair failed (%s); keeping previous cached match.",
                result.beat_id,
                exc,
            )
            # Discard anything appended for this result and keep it unchanged.
            del kept[kept_before:]
            kept.append(result)
    return kept
def _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger):
    """Verify one match with vision and append the outcome to ``kept``.

    Every window of the match (one per segment, or the whole match when it has
    no segments) is validated with ``validate_match_window_with_vision``;
    validation stops at the first failing window.

    * All windows valid: an optional phase realign is attempted. Either the
      realigned copy or the original ``result`` is appended to ``kept``.
    * Some window invalid: a realign is attempted. On success the repaired copy
      is appended; otherwise a warning is logged and nothing is appended, i.e.
      the match is dropped.

    Args:
        result: The match result to check.
        beat: The trailer beat belonging to ``result``.
        beats_by_id: Not referenced in this function body.
        scenes_by_id: Mapping of scene id to scene object.
        kept: Output list, mutated in place.
        cfg: Project configuration.
        realign_window: Callable ``(check_beat, scene_id, action_beat=None)``
            returning ``None`` or
            ``(scene, aligned_in_s, usable_duration_s, score, reason)``.
        validate_match_window_with_vision: Callable returning ``(ok, reason)``.
        logger: Logger used for info/warning messages.
    """
    from dataclasses import replace
    # Always-true wrapper; it only adds one indentation level to the body.
    if True:
        # Build the list of (beat, scene_id, in, out) windows to validate.
        windows = []
        if getattr(result, "segments", ()):
            for segment in result.segments:
                # Copy of the beat narrowed to this segment's trailer span.
                segment_beat = replace(
                    beat,
                    start_s=beat.start_s + segment.trailer_offset_s,
                    end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                )
                windows.append((
                    segment_beat,
                    segment.scene_id,
                    segment.in_point_s,
                    segment.out_point_s,
                ))
        else:
            windows.append((beat, result.scene_id, result.in_point_s, result.out_point_s))
        valid = True
        reasons: list[str] = []
        for check_beat, scene_id, in_point_s, out_point_s in windows:
            ok, reason = validate_match_window_with_vision(
                check_beat,
                source_path=result.source_path,
                scene_id=scene_id,
                in_point_s=in_point_s,
                out_point_s=out_point_s,
                cfg=cfg,
            )
            reasons.append(reason)
            if not ok:
                valid = False
                break
        if valid:
            # The match passed verification; try to improve its in-point only.
            repaired = False
            if getattr(result, "segments", ()):
                new_segments = []
                repair_reasons = []
                changed = False
                for segment in result.segments:
                    scene = scenes_by_id.get(segment.scene_id)
                    # Allow phase-realign whenever the scene has any meaningful
                    # slack beyond the segment, not only for "long" scenes.
                    # Short scenes don't need realigning because the segment
                    # essentially is the scene.
                    if scene is None or scene.duration_s <= segment.duration_s + 0.5:
                        new_segments.append(segment)
                        continue
                    # For already-confirmed segments, skip the realign to avoid
                    # destabilizing a strong original match.
                    if segment.is_confirmed and scene.duration_s <= max(segment.duration_s * 1.6, 6.0):
                        new_segments.append(segment)
                        continue
                    segment_beat = replace(
                        beat,
                        start_s=beat.start_s + segment.trailer_offset_s,
                        end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                    )
                    repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
                    if repair is None:
                        new_segments.append(segment)
                        continue
                    repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    # A shift of at most one frame is treated as "no change".
                    if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate:
                        new_segments.append(segment)
                        continue
                    # Don't commit a repair that scores meaningfully worse than
                    # the original; phase realign should improve, not regress.
                    if score < segment.match_score - 0.02:
                        new_segments.append(segment)
                        continue
                    changed = True
                    repair_reasons.append(repair_reason)
                    new_segments.append(replace(
                        segment,
                        scene_id=repair_scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        duration_s=usable_duration_s,
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                if changed and new_segments:
                    # The result's top-level fields mirror the first segment;
                    # its score is the weakest segment score.
                    first = new_segments[0]
                    repaired_score = min(seg.match_score for seg in new_segments)
                    logger.info(
                        "Beat %d: realigned semantically valid long scene by motion/action windows (%s)",
                        result.beat_id,
                        "; ".join(repair_reasons),
                    )
                    kept.append(replace(
                        result,
                        scene_id=first.scene_id,
                        in_point_s=first.in_point_s,
                        out_point_s=first.out_point_s,
                        in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
                        match_score=repaired_score,
                        is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
                        segments=tuple(new_segments),
                    ))
                    repaired = True
            else:
                scene = scenes_by_id.get(result.scene_id)
                # Same two gates as the per-segment path: the scene must have
                # slack beyond the match, and confirmed matches in tight scenes
                # are left alone.
                wide_scene = (
                    scene is not None
                    and scene.duration_s > result.duration_s + 0.5
                )
                already_confirmed_in_tight_scene = (
                    result.is_confirmed
                    and scene is not None
                    and scene.duration_s <= max(result.duration_s * 1.6, 6.0)
                )
                if wide_scene and not already_confirmed_in_tight_scene:
                    repair = realign_window(beat, result.scene_id)
                    if repair is not None:
                        repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                        # Commit only if the in-point moved by more than one
                        # frame and the score dropped by no more than 0.02.
                        moved = abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate
                        improved = score >= result.match_score - 0.02
                        if moved and improved:
                            logger.info(
                                "Beat %d: realigned semantically valid long scene by motion/action window (%s)",
                                result.beat_id,
                                repair_reason,
                            )
                            kept.append(replace(
                                result,
                                scene_id=repair_scene.scene_id,
                                in_point_s=aligned_in_s,
                                out_point_s=aligned_in_s + usable_duration_s,
                                in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
                                match_score=score,
                                is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                            ))
                            repaired = True
            if not repaired:
                kept.append(result)
        else:
            # Verification failed: the match survives only if it can be
            # realigned inside its scene(s).
            if getattr(result, "segments", ()):
                new_segments = []
                all_repaired = True
                repair_reasons = []
                for segment in result.segments:
                    segment_beat = replace(
                        beat,
                        start_s=beat.start_s + segment.trailer_offset_s,
                        end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                    )
                    repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
                    if repair is None:
                        # One unrepairable segment fails the whole result.
                        all_repaired = False
                        break
                    scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    repair_reasons.append(repair_reason)
                    new_segments.append(replace(
                        segment,
                        scene_id=scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        duration_s=usable_duration_s,
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                if all_repaired and new_segments:
                    first = new_segments[0]
                    repaired_score = min(seg.match_score for seg in new_segments)
                    logger.info(
                        "Beat %d: realigned inside matched scene by vision action windows (%s)",
                        result.beat_id,
                        "; ".join(repair_reasons),
                    )
                    kept.append(replace(
                        result,
                        scene_id=first.scene_id,
                        in_point_s=first.in_point_s,
                        out_point_s=first.out_point_s,
                        in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
                        match_score=repaired_score,
                        is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
                        segments=tuple(new_segments),
                    ))
                    return
            else:
                repair = realign_window(beat, result.scene_id)
                if repair is not None:
                    scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    logger.info(
                        "Beat %d: realigned inside matched scene by vision action window (%s)",
                        result.beat_id,
                        repair_reason,
                    )
                    kept.append(replace(
                        result,
                        scene_id=scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                    return
            # No repair succeeded: nothing is appended, so the match is dropped.
            logger.warning(
                "Beat %d: rejected by vision action-phase verification (%s)",
                result.beat_id,
                "; ".join(reasons),
            )
def _attach_visual_segments(results: list, beats: list, cfg) -> list:
    """Give every segment-less result a ``segments`` tuple.

    Results that already carry segments, or whose beat is unknown, are passed
    through untouched. A beat with at most one scoreable island gets a single
    segment mirroring the result. A multi-island beat keeps the existing match
    for its first island and runs a global scan for each later island; islands
    without a scan hit are left out.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.global_scan import run_global_scan

    def as_segment(match, offset_s, length_s, out_s):
        # Scene, in-point, score and confirmation are always taken from `match`.
        return MatchSegment(
            trailer_offset_s=offset_s,
            duration_s=length_s,
            scene_id=match.scene_id,
            in_point_s=match.in_point_s,
            out_point_s=out_s,
            match_score=match.match_score,
            is_confirmed=match.is_confirmed,
        )

    beat_lookup = {b.beat_id: b for b in beats}
    expanded: list[MatchResult] = []
    for result in results:
        beat = beat_lookup.get(result.beat_id)
        if beat is None or getattr(result, "segments", ()):
            expanded.append(result)
            continue

        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) <= 1:
            whole = as_segment(result, 0.0, max(0.0, result.duration_s), result.out_point_s)
            expanded.append(replace(result, segments=(whole,)))
            continue

        # First island: reuse the existing match, trimmed to the island length.
        lead_start, lead_end = islands[0]
        lead_len = min(max(0.0, result.duration_s), max(0.0, lead_end - lead_start))
        parts = [as_segment(result, lead_start, lead_len, result.in_point_s + lead_len)]

        # Later islands: each is scanned on its own.
        for start_s, end_s in islands[1:]:
            island_beat = replace(
                beat,
                start_s=beat.start_s + start_s,
                end_s=beat.start_s + end_s,
            )
            hits = run_global_scan([island_beat], cfg, seed_in_points=None)
            if not hits:
                continue
            hit = hits[0]
            hit_len = min(max(0.0, end_s - start_s), max(0.0, hit.duration_s))
            parts.append(as_segment(hit, start_s, hit_len, hit.in_point_s + hit_len))

        expanded.append(replace(result, segments=tuple(parts)))
    return expanded
def _fast_vision_match_cfg(cfg):
    """Derive the config used for the vision-seeded prepass.

    Returns a copy of ``cfg`` with
    ``cv.deep_scan.skip_coarse_scan_with_weighted_seeds`` set to ``True`` and
    ``vision.fullscan_fallback`` set to ``False``; every other setting is
    carried over. ``cfg`` itself is not modified.
    """
    from dataclasses import replace

    seeded_deep_scan = replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True)
    seeded_cv = replace(cfg.cv, deep_scan=seeded_deep_scan)
    vision_without_fullscan = replace(cfg.vision, fullscan_fallback=False)
    return replace(cfg, cv=seeded_cv, vision=vision_without_fullscan)
def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True):
    """Match one visual island: vision-seeded prepass first, full scan second.

    With vision enabled, the prepass runs on the fast config. Its matches are
    returned directly when the full scan is not allowed, or when every match is
    confirmed or reaches ``match_threshold``. Otherwise the full matcher runs
    and both result sets are combined via ``_merge_best_results``.
    """
    from src.pipeline.matcher import run_matching

    # Stays empty when vision is disabled.
    prepass = []
    if cfg.vision.enabled:
        prepass = run_matching(
            _fast_vision_match_cfg(cfg),
            [segment_beat],
            seed_in_points=continuity,
        )
        if prepass and (
            not allow_fullscan
            or all(
                hit.is_confirmed or hit.match_score >= cfg.cv.deep_scan.match_threshold
                for hit in prepass
            )
        ):
            return prepass

    if not allow_fullscan:
        return prepass

    thorough = run_matching(
        cfg,
        [segment_beat],
        seed_in_points=continuity,
    )
    return _merge_best_results(prepass, thorough, cfg)
def _match_unmatched_visual_segments(
    results: list,
    beats: list,
    cached: list,
    cfg,
    skip_global_segment_scan_for: set[int] | None = None,
) -> list:
    """Create segmented provisional matches when a whole beat has no single match.

    For every beat that has no entry in ``results``, the beat is split into
    shots and each shot is matched on its own. The shots that found a source
    clip are combined into one ``MatchResult`` whose score is the
    duration-weighted mean of the segment scores.

    Args:
        results: Matches found so far; beats present here are skipped.
        beats: All beats under consideration.
        cached: Previously cached results, used for continuity seeds and the
            local same-scene fallback.
        cfg: Project configuration.
        skip_global_segment_scan_for: Beat ids for which the per-segment
            matcher run is skipped (fallbacks still apply).

    Returns:
        A new list containing ``results`` plus any newly built segmented matches.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.frame_extractor import get_video_info
    matched_ids = {r.beat_id for r in results}
    expanded = list(results)
    skip_global_segment_scan_for = skip_global_segment_scan_for or set()
    # Use the source movie's fps for frame numbers; fall back to the EDL frame
    # rate when probing fails or reports 0.
    try:
        fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
    except Exception:
        fps = cfg.export.edl_frame_rate
    for beat in beats:
        if beat.beat_id in matched_ids:
            continue
        # Per-shot matching when the beat has either fade-bounded islands
        # OR internal hard cuts; each shot becomes its own MatchSegment.
        islands = _reference_shot_segments(beat, cfg)
        if not islands:
            continue
        segments: list[MatchSegment] = []
        for island_idx, (start_s, end_s) in enumerate(islands):
            # Island bounds are offsets relative to the beat start.
            segment_beat = replace(
                beat,
                start_s=beat.start_s + start_s,
                end_s=beat.start_s + end_s,
            )
            if island_idx == 0:
                # First island of an unmatched multi-shot beat: search globally
                # without a continuity bias from the previous beat. Continuity
                # assumes the shot follows the previous beat in the source, but
                # the lead shot of a multi-shot beat is often an insert cut from
                # a completely different scene. A wrong seed with score 0.92
                # would push the real match out of the refinement candidate pool.
                continuity = {}
            else:
                # Seeds are computed with this beat swapped for its segment,
                # against cached results plus everything matched so far.
                continuity = _continuity_seed_in_points(
                    beat.beat_id,
                    [b if b.beat_id != beat.beat_id else segment_beat for b in beats],
                    cached + expanded,
                    cfg,
                )
            segment_matches = []
            if beat.beat_id not in skip_global_segment_scan_for:
                segment_matches = _run_segment_match(segment_beat, continuity, cfg, allow_fullscan=True)
            if not segment_matches:
                # Fade-content shot fallback: when CV finds no templates
                # inside this shot (typical for cross-fade silhouettes), the
                # vibe-check + vision-action-window recovery path is the only
                # way to get a match. It's slower but works on dark frames
                # because vision can read structure where CV cannot.
                shot_islands = _reference_scoreable_segments(segment_beat, cfg)
                if not shot_islands and cfg.vision.enabled:
                    recovered = _recover_unmatched_beats_via_vision([], [segment_beat], cfg)
                    if recovered:
                        rec = recovered[0]
                        seg_dur = min(max(0.0, end_s - start_s), max(0.0, rec.duration_s))
                        if seg_dur > 0:
                            segments.append(MatchSegment(
                                trailer_offset_s=start_s,
                                duration_s=seg_dur,
                                scene_id=rec.scene_id,
                                in_point_s=rec.in_point_s,
                                out_point_s=rec.in_point_s + seg_dur,
                                match_score=rec.match_score,
                                is_confirmed=rec.is_confirmed,
                            ))
                            continue
                # Last resort: look inside the scenes of neighbouring beats.
                local_segment = _local_same_scene_segment_match(
                    segment_beat,
                    beat,
                    start_s,
                    cached + expanded,
                    cfg,
                )
                if local_segment is not None:
                    segments.append(local_segment)
                continue
            seg = segment_matches[0]
            # The segment is never longer than the island or the matched clip.
            seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
            segments.append(
                MatchSegment(
                    trailer_offset_s=start_s,
                    duration_s=seg_dur,
                    scene_id=seg.scene_id,
                    in_point_s=seg.in_point_s,
                    out_point_s=seg.in_point_s + seg_dur,
                    match_score=seg.match_score,
                    is_confirmed=seg.is_confirmed,
                )
            )
        if not segments:
            continue
        first = segments[0]
        total_segment_duration = sum(max(0.0, s.duration_s) for s in segments)
        # Duration-weighted mean score; with zero total duration, the weakest
        # segment score is used instead.
        score = (
            sum(max(0.0, s.duration_s) * s.match_score for s in segments) / total_segment_duration
            if total_segment_duration > 0 else min(s.match_score for s in segments)
        )
        expanded.append(
            MatchResult(
                beat_id=beat.beat_id,
                scene_id=first.scene_id,
                source_path=cfg.paths.source_movie,
                in_point_s=first.in_point_s,
                out_point_s=first.out_point_s,
                in_point_frame=int(max(0.0, first.in_point_s) * fps),
                match_score=score,
                is_confirmed=all(s.is_confirmed for s in segments),
                segments=tuple(segments),
            )
        )
    return expanded
def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg):
    """Look for a trailer island inside the scenes used by the neighbouring beats.

    The scenes matched by beat ``beat_id - 1`` and ``beat_id + 1`` are swept in
    small time steps and scored against the island's content templates. The
    best-scoring position is returned as a ``MatchSegment``; ``None`` is
    returned when there is no scene cache, no neighbouring match, no template,
    or the best score is below the minimum.
    """
    from src.core.models import MatchSegment
    from src.cv.frame_extractor import open_video
    from src.cv.global_scan import _content_alignment_score, _content_alignment_templates

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return None

    # Collect neighbour scene ids in first-seen order, without duplicates.
    results_by_beat = {r.beat_id: r for r in cached}
    candidate_scene_ids: list[int] = []
    for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1):
        neighbour = results_by_beat.get(neighbour_id)
        if neighbour is None:
            continue
        neighbour_scene_ids = [
            getattr(seg, "scene_id", neighbour.scene_id)
            for seg in getattr(neighbour, "segments", ())
        ]
        if not neighbour_scene_ids:
            neighbour_scene_ids = [neighbour.scene_id]
        for sid in neighbour_scene_ids:
            if sid in candidate_scene_ids:
                continue
            candidate_scene_ids.append(sid)
    if not candidate_scene_ids:
        return None

    templates = _content_alignment_templates(segment_beat, cfg)
    if not templates:
        return None

    deep_scan = cfg.cv.deep_scan
    min_score = min(
        deep_scan.provisional_content_threshold * 0.70,
        deep_scan.provisional_match_threshold,
    )
    # Step one EDL frame at a time, but never finer than 40 ms.
    step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)

    top: tuple[float, float, int] | None = None
    with open_video(cfg.paths.source_movie) as cap:
        for sid in candidate_scene_ids:
            scene = next((s for s in scenes if int(s["scene_id"]) == int(sid)), None)
            if scene is None:
                continue
            # Sweep from 0.25 s before the scene start up to the last position
            # where the island still fits, plus 0.25 s.
            scan_from = max(0.0, float(scene["start_s"]) - 0.25)
            scan_to = max(scan_from, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25)
            t = scan_from
            while t <= scan_to:
                score = _content_alignment_score(cap, t, templates, cfg)
                if top is None or score > top[0]:
                    top = (score, t, int(sid))
                t = round(t + step_s, 6)

    if top is None:
        return None
    score, in_point_s, scene_id = top
    if score < min_score:
        return None

    duration_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s))
    return MatchSegment(
        trailer_offset_s=segment_offset_s,
        duration_s=duration_s,
        scene_id=scene_id,
        in_point_s=in_point_s,
        out_point_s=in_point_s + duration_s,
        match_score=score,
        is_confirmed=score >= deep_scan.match_threshold,
    )
def cmd_match(args: argparse.Namespace, cfg) -> list:
    """Run the matching pipeline, save the results and regenerate the report.

    Steps, in order:

    1. Apply ``--vision`` / ``--no-vision`` overrides to ``cfg`` (when both are
       set, ``--no-vision`` is applied last).
    2. Match single-shot beats: a vision-seeded prepass when vision is enabled,
       then the full matcher for beats still missing or below
       ``match_threshold``.
    3. Build per-shot segment matches for the remaining beats, attach segments,
       filter with vision and run the vision recovery pass.
    4. Save: with ``--beat`` and an existing cache, only that beat's cache slot
       is replaced; otherwise the new results are written as-is.

    Returns:
        The results produced by this run (not the merged cache content).
    """
    from src.pipeline.matcher import run_matching
    from dataclasses import replace
    if getattr(args, "vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=True))
    if getattr(args, "no_vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=False))
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, getattr(args, "beat", None))
    cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg) if _results_cache_path(cfg).exists() else []
    # Multi-shot beats: either fade-bounded multiple islands, OR a single
    # island with internal hard cuts (e.g. man-shot then back to woman). Both
    # cases are routed through the per-segment match path so each shot gets
    # its own source clip instead of being approximated by one continuous
    # span.
    multi_island_beat_ids = {
        beat.beat_id
        for beat in beats
        if len(_reference_shot_segments(beat, cfg)) > 1
    }
    scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg)
    scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids]
    # Continuity seeds are only computed for a targeted one-beat run.
    seed_in_points = (
        _continuity_seed_in_points(args.beat, all_beats, cached, cfg)
        if getattr(args, "beat", None) is not None
        else None
    )
    results = []
    if cfg.vision.enabled:
        fast_cfg = _fast_vision_match_cfg(cfg)
        results = run_matching(
            fast_cfg,
            scan_beats,
            force_reindex=args.force_reindex,
            seed_in_points=seed_in_points,
        )
    # Full pass for every beat that is still unmatched, or matched but neither
    # confirmed nor at the match threshold. With vision disabled `results` is
    # empty here, so this covers all scan beats.
    if len(results) < len(scan_beats) or any(
        not r.is_confirmed and r.match_score < cfg.cv.deep_scan.match_threshold
        for r in results
    ):
        results_by_id = {r.beat_id: r for r in results}
        remaining_beats = [
            b for b in scan_beats
            if (
                b.beat_id not in results_by_id
                or (
                    not results_by_id[b.beat_id].is_confirmed
                    and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold
                )
            )
        ]
        if remaining_beats:
            full_results = run_matching(
                cfg,
                remaining_beats,
                force_reindex=args.force_reindex,
                seed_in_points=seed_in_points,
            )
            results = _merge_best_results(results, full_results, cfg)
    results = _apply_single_island_segments(results, single_island_trims)
    results = _match_unmatched_visual_segments(
        results,
        beats,
        cached,
        cfg,
        skip_global_segment_scan_for=set(single_island_trims),
    )
    results = _attach_visual_segments(results, beats, cfg)
    results = _filter_semantically_invalid_vision_matches(results, beats, cfg)
    results = _recover_unmatched_beats_via_vision(results, beats, cfg)
    # A targeted one-beat match must NEVER delete or modify any other beat's
    # cache entry. We deliberately re-load the raw cache from disk here so
    # the upstream normalisation pass (which drops entries that no longer
    # pass current quality gates) cannot leak into the save: only the
    # targeted beat's slot gets replaced, every other entry is written back
    # bit-for-bit identical to what it was before this run.
    if getattr(args, "beat", None) is not None and _results_cache_path(cfg).exists():
        raw_cached = _load_results(cfg)
        old_for_beat = next((r for r in raw_cached if r.beat_id == args.beat), None)
        # The targeted beat's old entry is removed first; it is re-added below
        # only if `_keeps_cached_match` prefers it over a new result.
        raw_cached = [r for r in raw_cached if r.beat_id != args.beat]
        for result in results:
            if _keeps_cached_match(old_for_beat, result, cfg):
                print(
                    f"️ Beat {result.beat_id}: keeping existing {len(getattr(old_for_beat, 'segments', ()) or ())}segment "
                    f"provisional match (score {old_for_beat.match_score:.3f}) over weaker new result "
                    f"(score {result.match_score:.3f}, no segments)."
                )
                raw_cached.append(old_for_beat)
            else:
                raw_cached = _update_result(result, raw_cached)
        results_to_save = sorted(raw_cached, key=lambda r: r.beat_id)
    else:
        results_to_save = results
    _save_results(results_to_save, cfg)
    _regenerate_cutter_report(cfg)
    print(f"\n{len(results)} / {len(beats)} beats matched.")
    for r in results:
        print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} "
              f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}")
    return results
def _update_result(new_result, results: list) -> list:
    """Return a new list with ``new_result`` replacing any entry of the same beat.

    ``results`` is left untouched; the returned list is ordered by ``beat_id``.
    """
    target_id = new_result.beat_id
    merged = [entry for entry in results if entry.beat_id != target_id]
    merged.append(new_result)
    merged.sort(key=lambda entry: entry.beat_id)
    return merged
def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]:
    """Predict source in-points for one beat from its nearest matched neighbours.

    The closest earlier matched beat projects forward from its out-point; the
    closest later matched beat projects backward from its in-point. Each
    projection is spread over ``continuity_seed_offsets_s``, with the seed
    score falling by 0.06 per second of offset, never below
    ``coarse_candidate_threshold``.

    Returns:
        ``{beat_id: [(time_s, score), ...]}`` sorted by time, with times
        clamped to >= 0 and rounded to milliseconds; ``{}`` when the beat is
        unknown or no neighbour has a match.
    """
    matched = {r.beat_id: r for r in results}
    target = {b.beat_id: b for b in beats}.get(beat_id)
    if target is None:
        return {}

    deep_scan = cfg.cv.deep_scan
    top_score = max(deep_scan.coarse_candidate_threshold + 0.08, 0.92)

    def score_for(offset):
        # Linear decay with distance from the predicted position.
        return max(deep_scan.coarse_candidate_threshold, top_score - abs(offset) * 0.06)

    seeds: list[tuple[float, float]] = []

    earlier = [b for b in beats if b.beat_id < beat_id and b.beat_id in matched]
    if earlier:
        prev_beat = max(earlier, key=lambda b: b.beat_id)
        gap_s = max(0.0, target.start_s - prev_beat.end_s)
        predicted = matched[prev_beat.beat_id].out_point_s + gap_s
        seeds.extend(
            (predicted + offset, score_for(offset))
            for offset in deep_scan.continuity_seed_offsets_s
        )

    later = [b for b in beats if b.beat_id > beat_id and b.beat_id in matched]
    if later:
        next_beat = min(later, key=lambda b: b.beat_id)
        gap_s = max(0.0, next_beat.start_s - target.end_s)
        predicted = matched[next_beat.beat_id].in_point_s - gap_s - target.duration_s
        seeds.extend(
            (predicted - offset, score_for(offset))
            for offset in deep_scan.continuity_seed_offsets_s
        )

    # Collapse seeds that land on the same millisecond, keeping the best score.
    best_by_time: dict[float, float] = {}
    for seed_t, seed_score in seeds:
        key = round(max(0.0, seed_t), 3)
        best_by_time[key] = max(best_by_time.get(key, 0.0), seed_score)

    points = sorted(best_by_time.items())
    if not points:
        return {}
    return {beat_id: points}
def cmd_rematch(args: argparse.Namespace, cfg) -> None:
    """
    Re-run automatic matching for ONE beat.

      python cli.py rematch --beat 5                    # re-scan CV for beat 5
      python cli.py rematch --beat 5 --threshold 0.40   # relax threshold
      python cli.py rematch --beat 5 --refine           # re-align the cached match

    Without ``--refine`` the beat is re-scanned globally (optionally with an
    overridden ``match_threshold``) and the first match replaces the cached
    entry. With ``--refine`` the cached in-point is re-aligned by image
    content and saved if it still covers enough of the beat.
    """
    beat_id = args.beat
    beats = _load_beats(cfg)
    if _results_cache_path(cfg).exists():
        results = _load_results(cfg)
    else:
        results = []

    beat = None
    for candidate in beats:
        if candidate.beat_id == beat_id:
            beat = candidate
            break
    if beat is None:
        print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.")
        return

    # ---- Re-run CV with optional threshold override ------------------------
    if not args.refine:
        from dataclasses import replace as dc_replace
        scan_cfg = cfg
        if args.threshold is not None:
            relaxed_deep_scan = dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold)
            scan_cfg = dc_replace(cfg, cv=dc_replace(cfg.cv, deep_scan=relaxed_deep_scan))
            print(f"️ threshold overridden to {args.threshold} for beat {beat_id}")
        from src.cv.global_scan import run_global_scan
        seeds = _continuity_seed_in_points(beat_id, beats, results, scan_cfg)
        matches = run_global_scan([beat], scan_cfg, seed_in_points=seeds)
        if not matches:
            print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.")
            return
        match = matches[0]
        _save_results(_update_result(match, results), cfg)
        print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})")
        return

    # ---- Refine an already acceptable cached match -------------------------
    current = next((r for r in results if r.beat_id == beat_id), None)
    if current is None:
        print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.")
        return

    from src.cv.content_align import align_cached_match_by_content
    refined_in_s, sequence_score = align_cached_match_by_content(
        beat,
        current.in_point_s,
        cfg,
        search_window_s=args.refine_window,
    )
    # Keep the cached clip length, but do not run past the end of the scene
    # that contains the refined in-point.
    cached_span_s = max(0.0, current.out_point_s - current.in_point_s)
    scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg)
    out_point_s = refined_in_s + cached_span_s
    if scene_data is not None:
        out_point_s = min(out_point_s, float(scene_data["end_s"]))

    if beat.duration_s > 0:
        duration_coverage = max(0.0, out_point_s - refined_in_s) / beat.duration_s
    else:
        duration_coverage = 0.0
    min_coverage = cfg.cv.deep_scan.min_duration_coverage
    if duration_coverage < min_coverage:
        print(
            f"❌ Beat {beat_id} refined candidate rejected: "
            f"duration coverage {duration_coverage:.0%} < "
            f"{min_coverage:.0%}"
        )
        return

    try:
        from src.cv.frame_extractor import get_video_info
        fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
    except Exception:
        fps = cfg.export.edl_frame_rate

    from src.core.models import MatchResult
    clamped_in_s = max(0.0, refined_in_s)
    refined = MatchResult(
        beat_id=beat_id,
        scene_id=current.scene_id if scene_data is None else int(scene_data["scene_id"]),
        source_path=current.source_path,
        in_point_s=clamped_in_s,
        out_point_s=out_point_s,
        in_point_frame=int(clamped_in_s * fps),
        match_score=sequence_score,
        match_location=current.match_location,
        is_confirmed=sequence_score >= cfg.cv.deep_scan.match_threshold,
    )
    _save_results(_update_result(refined, results), cfg)
    print(
        f"✅ Beat {beat_id} refined → "
        f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, "
        f"sequence_score={refined.match_score:.3f}"
    )
def cmd_report(args: argparse.Namespace, cfg) -> None:
    """Regenerate the cutter report and print where it was written.

    The report always covers all beats; a ``--beat`` value on ``args`` is only
    announced as ignored.
    """
    requested_beat = getattr(args, "beat", None)
    if requested_beat is not None:
        print(f"\n⚠️ Generating cutter report for all beats (ignoring --beat {args.beat}).")
    _regenerate_cutter_report(cfg)
    html_path = cfg.paths.cache_dir.parent / 'CUTTER_REPORT.html'
    print(f"\n✅ Report → {html_path} and CUTTER_REPORT.md")
def cmd_export(args: argparse.Namespace, cfg) -> None:
    """Write the cached matches as an FCPXML and/or EDL timeline.

    With ``--beat`` only that beat is exported and the output file stem
    becomes ``<reference trailer stem>_beat_NNN``; otherwise the timeline title
    is used. The format comes from ``--format`` or, when that is unset, from
    ``cfg.export.output_format``.
    """
    from src.export.edl_writer import write_edl
    from src.export.fcpxml_writer import write_fcpxml
    from src.pipeline.matcher import build_timeline

    beat_id = getattr(args, "beat", None)
    single_beat = beat_id is not None

    beats = _select_beats(_load_beats(cfg), beat_id)
    wanted_ids = {b.beat_id for b in beats} if single_beat else None
    normalized = _normalize_cached_results(_load_beats(cfg), _load_results(cfg), cfg)
    results = _select_results(normalized, wanted_ids)
    if single_beat and not results:
        print(f"❌ Beat {args.beat} has no cached match. Run 'match --beat {args.beat}' first.")
        return

    timeline = build_timeline(beats, results, cfg)
    fmt = args.format or cfg.export.output_format
    if single_beat:
        out_stem = f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}"
    else:
        out_stem = timeline.title

    if fmt in ("fcpxml", "both"):
        fcpxml_target = cfg.paths.output_dir / f"{out_stem}.fcpxml"
        out = write_fcpxml(timeline, cfg, output_path=fcpxml_target)
        print(f"✅ FCPXML → {out}")
    if fmt in ("edl", "both"):
        edl_target = cfg.paths.output_dir / f"{out_stem}.edl"
        out = write_edl(timeline, cfg, output_path=edl_target)
        print(f"✅ EDL → {out}")
def cmd_run(args: argparse.Namespace, cfg) -> None:
    """Full pipeline: analyze → match → report → export."""
    # Every stage receives the same parsed arguments and configuration.
    for stage in (cmd_analyze, cmd_match, cmd_report, cmd_export):
        stage(args, cfg)
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per pipeline stage.

    Global options (``--config``, ``--log-level``) come before the
    sub-command. The chosen sub-command name is stored in ``args.command``
    and is required.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(
        prog="ai-trailer",
        description="AI Trailer Generator v2 — Pure CV scene matching",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--config", type=Path, default=Path("config.toml"),
        metavar="CONFIG", help="Path to config.toml (default: ./config.toml)",
    )
    parser.add_argument(
        "--log-level", default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity (default: INFO)",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # analyze
    p_analyze = sub.add_parser("analyze", help="Detect trailer beats + fingerprint")
    p_analyze.add_argument("--no-audio", action="store_true",
                           help="Skip Whisper (only affects beat labels, not matching)")
    p_analyze.add_argument("--no-llm", action="store_true",
                           help="Skip LLM classification (only affects beat labels)")

    # match
    p_match = sub.add_parser("match", help="Run 2-phase CV matching")
    p_match.add_argument("--force-reindex", action="store_true",
                         help="Ignore scene cache and re-run PySceneDetect")
    p_match.add_argument("--beat", type=int,
                         help="Match only one beat and merge it into the cached results")
    p_match.add_argument("--vision", action="store_true",
                         help="Enable cached vision descriptions for extra automatic search seeds")
    p_match.add_argument("--no-vision", action="store_true",
                         help="Disable vision seeding even if [vision].enabled is true")

    # rematch
    p_rematch = sub.add_parser("rematch", help="Re-run or override matching for one beat")
    p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch")
    p_rematch.add_argument("--threshold", type=float, default=None, help="Override match_threshold")
    p_rematch.add_argument("--refine", action="store_true",
                           help="Refine the cached match by measuring a local image-content offset")
    p_rematch.add_argument("--refine-window", type=float, default=None,
                           help="Seconds to search around the cached in-point when using --refine")

    # report
    p_report = sub.add_parser("report", help="Generate HTML visual comparison report")
    # cmd_report always regenerates the report for all beats, so the help text
    # must not promise a single-beat report.
    p_report.add_argument("--beat", type=int,
                          help="Accepted for symmetry with other commands but ignored: "
                               "the report always covers all beats")

    # export
    p_export = sub.add_parser("export", help="Export timeline from cached results")
    p_export.add_argument("--format", choices=["fcpxml", "edl", "both"],
                          help="Override [export] output_format from config")
    p_export.add_argument("--beat", type=int, help="Export only one beat")

    # run — mirrors the flags of the stages it chains (see cmd_run).
    p_run = sub.add_parser("run", help="Full pipeline: analyze → match → report → export")
    p_run.add_argument("--no-audio", action="store_true",
                       help="Skip Whisper (only affects beat labels, not matching)")
    p_run.add_argument("--no-llm", action="store_true",
                       help="Skip LLM classification (only affects beat labels)")
    p_run.add_argument("--force-reindex", action="store_true",
                       help="Ignore scene cache and re-run PySceneDetect")
    p_run.add_argument("--vision", action="store_true",
                       help="Enable cached vision descriptions for extra automatic search seeds")
    p_run.add_argument("--no-vision", action="store_true",
                       help="Disable vision seeding even if [vision].enabled is true")
    p_run.add_argument("--format", choices=["fcpxml", "edl", "both"],
                       help="Override [export] output_format from config")
    p_run.add_argument("--beat", type=int,
                       help="Run match/export for only one cached beat "
                            "(the report always covers all beats)")
    return parser
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line, load the configuration and run the chosen command."""
    _ensure_utf8_console()
    args = _build_parser().parse_args()
    _setup_logging(args.log_level)

    # Imported only after argument parsing has succeeded.
    from src.core.config import load_config
    cfg = load_config(args.config)

    commands = {
        "analyze": cmd_analyze,
        "match": cmd_match,
        "rematch": cmd_rematch,
        "report": cmd_report,
        "export": cmd_export,
        "run": cmd_run,
    }
    commands[args.command](args, cfg)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()