cc27208d2a
Two issues fixed: 1. Beats with internal hard cuts (e.g. man-shot then back to woman) were being approximated by a single source clip because the multi-segment path only triggered for fade-bounded multi-island beats. Added _reference_shot_segments(), which returns the shot ranges by splitting each visible island at detected internal cuts. The multi-island gate in cmd_match and the per-island loop in _match_unmatched_visual_segments now use shots, so any beat with cuts > 0 produces one MatchSegment per shot. Each shot is matched independently against the source movie. Effect on Beat 10: 1 segment (3.32 s in scene 558) -> 3 segments covering shots 0-0.88 s, 0.88-2.64 s, 2.64-3.32 s in scenes 554, 559, 556 respectively, with the previously missing "back to woman" cut now correctly placed in scene 556. 2. Targeted --beat N runs were silently dropping cache entries for other beats whose old scores no longer pass current quality gates (_normalize_cached_results runs at load time and removes them). The save path now re-loads the raw cache from disk and writes back every non-targeted beat verbatim, so a per-beat run can never regress another beat's stored match. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1874 lines
75 KiB
Python
1874 lines
75 KiB
Python
"""
|
||
cli.py — AI Trailer Generator v2 — Command-Line Interface
|
||
|
||
Usage:
|
||
python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm]
|
||
python cli.py match [--config CONFIG] [--force-reindex]
|
||
python cli.py rematch --beat N [--threshold F] [--refine]
|
||
python cli.py report [--config CONFIG]
|
||
python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm]
|
||
python cli.py export [--config CONFIG] [--format fcpxml|edl|both]
|
||
|
||
On --no-audio / --no-llm:
|
||
These flags do NOT affect matching quality.
|
||
Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX)
|
||
to beats in the export metadata. The CV pipeline is identical either way.
|
||
Use them for fast iterations: they skip large model downloads.
|
||
|
||
All heavy imports are deferred so --help is instant.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Logging setup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _setup_logging(level: str = "INFO") -> None:
    """Configure root logging to write timestamped records to stdout.

    Args:
        level: Name of a ``logging`` level (case-insensitive). Unknown names
            fall back to ``logging.INFO``.
    """
    # Force UTF-8 for Windows console emoji printing. Only real text wrappers
    # offer ``reconfigure``; replaced streams (StringIO, IDE/notebook streams)
    # or a missing stdout must not crash the CLI before logging is set up.
    stream = sys.stdout
    reconfigure = getattr(stream, "reconfigure", None)
    encoding = getattr(stream, "encoding", None) or ""
    if callable(reconfigure) and encoding.lower() != "utf-8":
        reconfigure(encoding="utf-8")
    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
        datefmt="%H:%M:%S",
        level=getattr(logging, level.upper(), logging.INFO),
        stream=sys.stdout,
    )
    # Keep the PIL logger at WARNING so its lower-level records are suppressed.
    logging.getLogger("PIL").setLevel(logging.WARNING)
|
||
|
||
|
||
def _ensure_utf8_console() -> None:
    """Make argparse help safe on Windows before logging is configured.

    Switches ``sys.stdout`` to UTF-8 when it reports another encoding.
    Streams without a ``reconfigure`` method (or a missing stdout) are left
    untouched instead of raising ``AttributeError``.
    """
    stream = sys.stdout
    reconfigure = getattr(stream, "reconfigure", None)
    # Encoding names are case-insensitive ("UTF-8" == "utf-8"); normalise
    # before comparing so an already-UTF-8 stream is not reconfigured.
    encoding = getattr(stream, "encoding", None) or ""
    if callable(reconfigure) and encoding.lower() != "utf-8":
        reconfigure(encoding="utf-8")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cache helpers (match results ↔ JSON)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _results_cache_path(cfg: "AppConfig") -> Path:  # type: ignore[name-defined]
    """Return the path of the JSON file holding cached match results."""
    cache_dir = cfg.paths.cache_dir
    return cache_dir / "match_results.json"
|
||
|
||
|
||
def _save_results(results: list, cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Serialize match results to the JSON results cache.

    Every result becomes one JSON object carrying its own fields plus a
    ``segments`` list (empty when the result has no ``segments`` attribute).

    The payload is first written to a sibling ``.tmp`` file and then moved
    over the real cache file, so an interrupted write cannot leave a
    truncated ``match_results.json`` behind.

    Args:
        results: Match-result objects exposing the attributes read below.
        cfg: Application config; only used to locate the cache file.
    """
    data = [
        {
            "beat_id": r.beat_id,
            "scene_id": r.scene_id,
            "source_path": str(r.source_path),
            "in_point_s": r.in_point_s,
            "out_point_s": r.out_point_s,
            "in_point_frame": r.in_point_frame,
            "match_score": r.match_score,
            "match_location": list(r.match_location),
            "is_confirmed": r.is_confirmed,
            "segments": [
                {
                    "trailer_offset_s": s.trailer_offset_s,
                    "duration_s": s.duration_s,
                    "scene_id": s.scene_id,
                    "in_point_s": s.in_point_s,
                    "out_point_s": s.out_point_s,
                    "match_score": s.match_score,
                    "is_confirmed": s.is_confirmed,
                }
                for s in getattr(r, "segments", ())
            ],
        }
        for r in results
    ]
    p = _results_cache_path(cfg)
    p.parent.mkdir(parents=True, exist_ok=True)

    # Write-then-rename: the temp file lives next to the target so the final
    # move stays on one filesystem.
    tmp_path = p.with_name(p.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8")
        tmp_path.replace(p)
    except Exception:
        # Do not leave a stale temp file around when the write or move fails.
        if tmp_path.exists():
            tmp_path.unlink()
        raise
    logging.getLogger(__name__).info("Match results cached → %s", p)
|
||
|
||
|
||
def _regenerate_cutter_report(cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Rebuild CUTTER_REPORT.md / CUTTER_REPORT.html next to the cache directory.

    Best-effort: if the report generator cannot be imported, or rendering or
    writing fails, a warning is logged and the function returns normally.
    """
    log = logging.getLogger(__name__)

    try:
        from scripts.generate_cutter_report import render_report
    except Exception as exc:
        log.warning("Cutter report regen skipped: %s", exc)
        return

    try:
        # Reports are written into the parent of the cache directory.
        project_root = cfg.paths.cache_dir.parent
        md, html = render_report(project_root, with_stills=True, with_clips=True)
        outputs = (
            ("CUTTER_REPORT.md", md),
            ("CUTTER_REPORT.html", html),
        )
        for file_name, text in outputs:
            (project_root / file_name).write_text(text, encoding="utf-8")
        log.info("Cutter report regenerated (md + html)")
    except Exception as exc:
        log.warning("Cutter report regen failed: %s", exc)
|
||
|
||
|
||
def _load_results(cfg: "AppConfig") -> list:  # type: ignore[name-defined]
    """Read the JSON results cache back into ``MatchResult`` objects.

    Entries without ``is_confirmed`` default to confirmed; entries without
    ``segments`` get an empty segment tuple.

    Raises:
        FileNotFoundError: if the cache file does not exist.
    """
    from src.core.models import MatchResult, MatchSegment

    cache_file = _results_cache_path(cfg)
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached results at {cache_file}. Run 'match' first.")

    def _segment(entry: dict):
        # Segment fields are coerced explicitly when loaded.
        return MatchSegment(
            trailer_offset_s=float(entry["trailer_offset_s"]),
            duration_s=float(entry["duration_s"]),
            scene_id=int(entry["scene_id"]),
            in_point_s=float(entry["in_point_s"]),
            out_point_s=float(entry["out_point_s"]),
            match_score=float(entry["match_score"]),
            is_confirmed=bool(entry.get("is_confirmed", True)),
        )

    loaded = []
    for record in json.loads(cache_file.read_text(encoding="utf-8")):
        loaded.append(
            MatchResult(
                beat_id=record["beat_id"],
                scene_id=record["scene_id"],
                source_path=Path(record["source_path"]),
                in_point_s=record["in_point_s"],
                out_point_s=record["out_point_s"],
                in_point_frame=record["in_point_frame"],
                match_score=record["match_score"],
                match_location=tuple(record["match_location"]),
                is_confirmed=record.get("is_confirmed", True),
                segments=tuple(_segment(s) for s in record.get("segments", ())),
            )
        )
    return loaded
|
||
|
||
|
||
def _load_scene_cache_light(cfg) -> list[dict]:
    """Return the parsed ``scene_index.json`` from the cache dir, or ``[]`` if absent."""
    index_file = cfg.paths.cache_dir / "scene_index.json"
    if index_file.exists():
        return json.loads(index_file.read_text(encoding="utf-8"))
    return []
|
||
|
||
|
||
def _scene_fps_light(scene: dict, cfg) -> float:
    """Derive a scene's frame rate from its cached time and frame bounds.

    Falls back to ``cfg.export.edl_frame_rate`` when the scene spans no time
    or no frames.
    """
    span_s = float(scene["end_s"]) - float(scene["start_s"])
    frames = int(scene["end_frame"]) - int(scene["start_frame"])
    if span_s > 0 and frames > 0:
        return frames / span_s
    return cfg.export.edl_frame_rate
|
||
|
||
|
||
def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None:
    """Return the cached scene containing ``t_sec``, or ``None`` if none does.

    When ``t_sec`` lies within ``scene_boundary_epsilon_s`` of the containing
    scene's end and a following entry exists, that following entry is
    returned instead.
    """
    for position, candidate in enumerate(scenes):
        if not (float(candidate["start_s"]) <= t_sec < float(candidate["end_s"])):
            continue
        hugging_end = (
            float(candidate["end_s"]) - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s
        )
        if hugging_end and position + 1 < len(scenes):
            return scenes[position + 1]
        return candidate
    return None
|
||
|
||
|
||
def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None:
    """Return the first cached scene whose ``scene_id`` equals ``scene_id``, else ``None``."""
    for entry in scenes:
        if int(entry["scene_id"]) == scene_id:
            return entry
    return None
|
||
|
||
|
||
def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float:
    """Estimate how much source time from ``in_point_s`` can be used contiguously.

    Walks the cached scenes starting at the one containing ``in_point_s``.
    A scene end may be crossed only when its offset from the in-point lies
    within ``cfg.vision.multi_shot_boundary_tolerance_s`` of one of the beat's
    internal cut offsets. Otherwise the duration stops at that scene end
    minus the configured tail trim. If the cut-offset helper cannot be
    imported or fails, no cuts are assumed.

    Returns ``0.0`` when ``matchable_duration_s`` is not positive or no scene
    contains the in-point.
    """
    if matchable_duration_s <= 0:
        return 0.0

    try:
        from src.cv.global_scan import _reference_internal_cut_offsets

        cut_offsets = _reference_internal_cut_offsets(beat, cfg)
    except Exception:
        cut_offsets = []

    first_index = next(
        (
            position
            for position, entry in enumerate(scenes)
            if float(entry["start_s"]) <= in_point_s < float(entry["end_s"])
        ),
        None,
    )
    if first_index is None:
        return 0.0

    wanted_end = in_point_s + matchable_duration_s
    reached_end = in_point_s
    for scene in scenes[first_index:]:
        boundary = float(scene["end_s"])
        if wanted_end <= boundary:
            # The whole matchable span fits before this scene ends.
            return matchable_duration_s

        offset_at_boundary = boundary - in_point_s
        crosses_known_cut = any(
            abs(offset_at_boundary - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s
            for cut_offset in cut_offsets
        )
        if crosses_known_cut:
            reached_end = boundary
            continue

        # Source cut without a matching reference cut: stop here, tail-trimmed.
        tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(scene, cfg))
        return max(0.0, boundary - in_point_s - tail_s)

    return max(0.0, reached_end - in_point_s)
|
||
|
||
|
||
def _normalize_cached_results(beats: list, results: list, cfg) -> list:
    """
    Re-apply current generic timing rules to cached results.

    This keeps old automatic cache entries from preserving obsolete scene-boundary
    or tail-trim behavior without introducing manual per-beat truth.

    Args:
        beats: trailer beats; looked up by ``beat_id`` for each result.
        results: cached match results to re-validate.
        cfg: application config (thresholds under ``cfg.cv.deep_scan`` and
            ``cfg.vision``).

    Returns:
        A new list with results that fail the current gates removed and the
        remaining ones possibly re-timed or re-scored. If no scene cache is
        available, ``results`` is returned unchanged.
    """
    from dataclasses import replace

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        # Without the scene index no timing rule can be evaluated.
        return results

    beats_by_id = {b.beat_id: b for b in beats}
    normalized = []
    for result in results:
        beat = beats_by_id.get(result.beat_id)

        # --- Multi-segment results: only re-score and gate, never re-time. ---
        if getattr(result, "segments", ()):
            segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
            # Duration-weighted mean of the segment scores; falls back to the
            # stored score when the segments cover no time.
            weighted_score = (
                sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
                / segment_duration
                if segment_duration > 0 else result.match_score
            )
            if weighted_score < cfg.cv.deep_scan.provisional_match_threshold:
                continue
            if beat is not None and beat.duration_s > 0:
                # Coverage is measured against the visible part of the beat
                # when there is one, otherwise against the whole beat.
                visible_duration = sum(
                    max(0.0, end_s - start_s)
                    for start_s, end_s in _reference_scoreable_segments(beat, cfg)
                )
                coverage_target = visible_duration if visible_duration > 0 else beat.duration_s
                coverage = segment_duration / coverage_target
                if coverage < cfg.cv.deep_scan.min_duration_coverage:
                    continue
            normalized.append(replace(result, match_score=weighted_score))
            continue

        # --- Single-range results. ---
        if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
            continue

        scene = _scene_for_time_light(scenes, result.in_point_s, cfg)
        declared_scene = _scene_by_id_light(scenes, result.scene_id)

        # If the automatic matcher selected a scene but its in-point sits just
        # before that scene's detected start, treat this as scene-boundary drift
        # and clamp to the declared scene. This is generic: no beat IDs, no
        # manual timestamps, just consistent scene/time reconciliation.
        if declared_scene is not None:
            declared_start = float(declared_scene["start_s"])
            declared_end = float(declared_scene["end_s"])
            declared_fps = _scene_fps_light(declared_scene, cfg)
            boundary_tolerance_s = (
                cfg.cv.deep_scan.scene_boundary_epsilon_s
                + cfg.cv.deep_scan.start_preroll_frames / declared_fps
            )
            if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end:
                scene = declared_scene

        if beat is None or scene is None:
            # Nothing to reconcile against: keep the cached entry as it is.
            normalized.append(result)
            continue

        fps = _scene_fps_light(scene, cfg)
        adjusted_in_s = result.in_point_s
        scene_changed = int(scene["scene_id"]) != result.scene_id
        starts_before_scene = result.in_point_s < float(scene["start_s"])
        if scene_changed or starts_before_scene or result.duration_s <= 0.12:
            # Pull the in-point back by the pre-roll, but never before the
            # start of the reconciled scene, then re-resolve scene and fps.
            # NOTE(review): nesting of the next three lines under this `if`
            # is inferred; the scraped source lost its indentation — confirm.
            adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps))
            adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s)
            scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
            fps = _scene_fps_light(scene, cfg)

        matchable_duration_s = beat.duration_s
        try:
            from src.cv.global_scan import estimate_matchable_reference_duration
            matchable_duration_s = estimate_matchable_reference_duration(beat, cfg)
        except Exception:
            # Best effort: keep the full beat duration when the estimate fails.
            pass

        tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
        single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s)
        contiguous_duration_s = _contiguous_duration_light(
            beat,
            adjusted_in_s,
            scenes,
            cfg,
            matchable_duration_s,
        )
        # Longest allowed clip: the better of "stay inside this scene" and
        # "run contiguously across allowed boundaries", capped at the beat.
        max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s))

        normalized_result = result
        if (
            scene_changed
            or starts_before_scene
            or result.duration_s <= 0.12
            or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps)
        ):
            # Re-time the entry; the last condition allows one frame of slack
            # before an over-long cached out-point is trimmed.
            normalized_result = replace(
                result,
                scene_id=int(scene["scene_id"]),
                in_point_s=adjusted_in_s,
                out_point_s=adjusted_in_s + max_duration_s,
                in_point_frame=int(adjusted_in_s * fps),
            )

        coverage = (
            max(0.0, normalized_result.duration_s) / matchable_duration_s
            if matchable_duration_s > 0 else 0.0
        )
        if coverage < cfg.cv.deep_scan.min_duration_coverage:
            continue

        try:
            from src.cv.content_align import align_cached_match_by_content
            _, content_score = align_cached_match_by_content(
                beat,
                normalized_result.in_point_s,
                cfg,
                search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds),
                fps=12.5,
            )
            # Confirmed entries use the provisional content threshold; others
            # use the lower of that and the vision content threshold.
            content_gate = (
                cfg.cv.deep_scan.provisional_content_threshold
                if normalized_result.is_confirmed
                else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold)
            )
            if content_score < content_gate:
                continue
            if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed:
                # Passes the gate but not the confirmation threshold: demote.
                normalized_result = replace(
                    normalized_result,
                    match_score=min(normalized_result.match_score, content_score),
                    is_confirmed=False,
                )
        except Exception:
            # Content re-check is best effort; on any failure the entry is
            # kept without this gate applied.
            pass

        normalized.append(normalized_result)

    return normalized
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Command handlers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_transcribe_callback(cfg):
    """Return a transcription callback bound to ``cfg``.

    The returned callable takes ``(path, start_s, end_s, offset_s)`` and
    forwards them to ``transcribe_video``. This builder always returns a
    callable; deciding whether audio is disabled is left to the caller.
    """
    from src.audio.transcriber import transcribe_video

    def _transcribe(path, start_s, end_s, offset_s):
        return transcribe_video(
            path,
            cfg,
            start_s=start_s,
            end_s=end_s,
            time_offset_s=offset_s,
        )

    return _transcribe
|
||
|
||
|
||
def _build_classify_callback(cfg):
    """Return a callable that runs ``classify_beats`` on a beat list with ``cfg``."""
    from src.llm.dramaturg import classify_beats

    def _classify(beats):
        return classify_beats(beats, cfg)

    return _classify
|
||
|
||
|
||
def cmd_analyze(args: argparse.Namespace, cfg) -> list:
    """Analyze the reference trailer and cache the resulting beats as JSON.

    ``args.no_audio`` / ``args.no_llm`` turn off the transcription and
    classification callbacks respectively. The beats are written to
    ``trailer_beats.json`` in the cache directory and also returned.
    """
    from src.pipeline.trailer_analyzer import analyze_reference_trailer

    transcribe_cb = None if args.no_audio else _build_transcribe_callback(cfg)
    classify_cb = None if args.no_llm else _build_classify_callback(cfg)

    beats = analyze_reference_trailer(
        cfg,
        transcribe_callback=transcribe_cb,
        classify_callback=classify_cb,
    )

    def _beat_record(beat) -> dict:
        # Histogram bytes are stored as hex text so they survive JSON.
        return {
            "beat_id": beat.beat_id,
            "start_s": beat.start_s,
            "end_s": beat.end_s,
            "start_frame": beat.start_frame,
            "end_frame": beat.end_frame,
            "beat_type": beat.beat_type.name,
            "dialogue": [
                {"start_s": line.start_s, "end_s": line.end_s, "text": line.text}
                for line in beat.dialogue
            ],
            "phash": beat.phash,
            "luma_hist": beat.luma_hist.hex() if beat.luma_hist else None,
            "sat_hist": beat.sat_hist.hex() if beat.sat_hist else None,
        }

    # Persist beats for downstream commands.
    beats_cache = cfg.paths.cache_dir / "trailer_beats.json"
    beats_cache.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps([_beat_record(b) for b in beats], indent=2, ensure_ascii=False)
    beats_cache.write_text(payload, encoding="utf-8")
    print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}")
    return beats
|
||
|
||
|
||
def _load_beats(cfg) -> list:
    """Rebuild ``TrailerBeat`` objects from the cached ``trailer_beats.json``.

    Histogram fields are decoded from hex text back to bytes, and every beat
    gets ``cfg.paths.reference_trailer`` as its trailer path.

    Raises:
        FileNotFoundError: if the beats cache does not exist.
    """
    from src.core.models import BeatType, DialogueLine, TrailerBeat

    cache_file = cfg.paths.cache_dir / "trailer_beats.json"
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached beats at {cache_file}. Run 'analyze' first.")

    def _hex_to_bytes(hex_text):
        # Missing or empty histogram entries stay None.
        return bytes.fromhex(hex_text) if hex_text else None

    def _beat(record: dict):
        lines = tuple(
            DialogueLine(start_s=item["start_s"], end_s=item["end_s"], text=item["text"])
            for item in record.get("dialogue", [])
        )
        return TrailerBeat(
            beat_id=record["beat_id"],
            trailer_path=cfg.paths.reference_trailer,
            start_s=record["start_s"],
            end_s=record["end_s"],
            start_frame=record["start_frame"],
            end_frame=record["end_frame"],
            beat_type=BeatType[record.get("beat_type", "UNKNOWN")],
            dialogue=lines,
            phash=record.get("phash"),
            luma_hist=_hex_to_bytes(record.get("luma_hist")),
            sat_hist=_hex_to_bytes(record.get("sat_hist")),
        )

    return [_beat(record) for record in json.loads(cache_file.read_text(encoding="utf-8"))]
|
||
|
||
|
||
def _select_beats(beats: list, beat_id: int | None) -> list:
    """Return ``beats`` itself when ``beat_id`` is None, else only the matching beats.

    Raises:
        ValueError: if ``beat_id`` is given but no beat carries it.
    """
    if beat_id is None:
        return beats
    wanted = []
    for beat in beats:
        if beat.beat_id == beat_id:
            wanted.append(beat)
    if wanted:
        return wanted
    raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.")
|
||
|
||
|
||
def _select_results(results: list, beat_ids: set[int] | None) -> list:
    """Return ``results`` itself when ``beat_ids`` is None, else those whose beat is listed."""
    if beat_ids is None:
        return results
    kept = []
    for result in results:
        if result.beat_id in beat_ids:
            kept.append(result)
    return kept
|
||
|
||
|
||
def _find_scene_for_in_point(cfg, in_point_s: float):
    """Return the indexed scene containing ``in_point_s``, or ``None``.

    If the in-point lies within ``scene_boundary_epsilon_s`` of the containing
    scene's end and a following scene exists, the following scene is returned.
    """
    from src.cv.scene_indexer import build_scene_index

    scenes = build_scene_index(cfg)
    for position, candidate in enumerate(scenes):
        if not (candidate.start_s <= in_point_s < candidate.end_s):
            continue
        hugging_end = (
            candidate.end_s - in_point_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s
        )
        if hugging_end and position + 1 < len(scenes):
            return scenes[position + 1]
        return candidate
    return None
|
||
|
||
|
||
def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]:
    """Find visible source-matchable islands inside a trailer beat.

    Works in three passes over frames sampled from ``beat.trailer_path``:

    1. Sample the beat every ``step_s`` seconds and collect runs of
       scoreable frames ("islands"), tolerating gaps up to ``bridge_gap_s``.
    2. Grow each island outwards while neighbouring frames are still visible
       (looser thresholds than "scoreable") and still correlate with the
       island's edge frame.
    3. Merge islands that ended up within ``bridge_gap_s`` of each other.

    Returns:
        ``(start, end)`` offsets in seconds relative to the beat start.
    """
    from src.cv.frame_extractor import grab_frame_at_path
    from src.cv.global_scan import (
        _corr_same_size,
        _is_scoreable_reference_frame,
        _prepare_haystack,
        _reference_visibility_stats,
    )

    def is_visible(frame) -> bool:
        # Relaxed visibility test used only while expanding island edges:
        # the scoreable luma/contrast thresholds are scaled down (0.45 / 0.50
        # / 0.30), with a contrast floor of 8.0.
        if frame is None:
            return False
        mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg)
        visible_luma = (
            mean_luma >= cfg.cv.deep_scan.scoreable_luma_mean_min * 0.45
            or p90_luma >= cfg.cv.deep_scan.scoreable_luma_p90_min * 0.50
        )
        visible_contrast = contrast >= max(8.0, cfg.cv.deep_scan.scoreable_contrast_min * 0.30)
        return visible_luma and visible_contrast

    # Sampling step, minimum island length and the largest gap that is still
    # bridged, all in seconds and all derived from the configured step.
    step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s)
    min_segment_s = max(0.32, step_s * 3.0)
    bridge_gap_s = max(0.18, step_s * 2.0)

    # --- Pass 1: collect raw islands of scoreable frames. ---
    raw: list[tuple[float, float]] = []
    start: float | None = None
    last_seen: float | None = None
    t = 0.0
    while t <= beat.duration_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        scoreable = frame is not None and _is_scoreable_reference_frame(frame, cfg)
        if scoreable:
            if start is None:
                start = t
            last_seen = t
        elif start is not None and last_seen is not None and t - last_seen > bridge_gap_s:
            # Gap too long to bridge: close the island one step after the
            # last scoreable sample and keep it if it is long enough.
            end = min(beat.duration_s, last_seen + step_s)
            if end - start >= min_segment_s:
                raw.append((start, end))
            start = None
            last_seen = None
        # Rounding keeps accumulated float error out of the sample times.
        t = round(t + step_s, 6)

    if start is not None and last_seen is not None:
        # Close an island that was still open when the beat ended.
        end = min(beat.duration_s, last_seen + step_s)
        if end - start >= min_segment_s:
            raw.append((start, end))

    # --- Pass 2: expand each island into dimmer frames of the same shot. ---
    expanded: list[tuple[float, float]] = []
    same_shot_corr_min = 0.72
    for start_s, end_s in raw:
        start_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + start_s)
        end_anchor = grab_frame_at_path(beat.trailer_path, beat.start_s + max(start_s, end_s - step_s))
        start_feature = _prepare_haystack(start_anchor, cfg) if start_anchor is not None else None
        end_feature = _prepare_haystack(end_anchor, cfg) if end_anchor is not None else None

        # Walk backwards from the island start.
        soft_start = start_s
        t = round(start_s - step_s, 6)
        while t >= 0.0:
            frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
            if not is_visible(frame):
                break
            if start_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), start_feature) < same_shot_corr_min:
                # Frame no longer resembles the island's first frame.
                break
            soft_start = max(0.0, t)
            t = round(t - step_s, 6)

        # Walk forwards from the island end.
        soft_end = end_s
        t = round(end_s, 6)
        while t <= beat.duration_s + 1e-6:
            frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
            if not is_visible(frame):
                break
            if end_feature is not None and _corr_same_size(_prepare_haystack(frame, cfg), end_feature) < same_shot_corr_min:
                break
            soft_end = min(beat.duration_s, t + step_s)
            t = round(t + step_s, 6)

        if soft_end - soft_start >= min_segment_s:
            expanded.append((soft_start, soft_end))

    # --- Pass 3: merge islands that now touch or nearly touch. ---
    merged: list[tuple[float, float]] = []
    for start_s, end_s in expanded:
        if merged and start_s - merged[-1][1] <= bridge_gap_s:
            merged[-1] = (merged[-1][0], max(merged[-1][1], end_s))
        else:
            merged.append((start_s, end_s))
    return merged
|
||
|
||
|
||
def _reference_shot_segments(beat, cfg) -> list[tuple[float, float]]:
    """Source-matchable shot ranges inside a trailer beat.

    Like ``_reference_scoreable_segments`` but additionally splits each
    visible island at detected hard cuts (frame-to-frame correlation drops
    below ``cfg.vision.multi_shot_cut_corr_threshold``). A shot is a
    fade-bounded AND cut-bounded sub-range of the trailer beat: this is
    what we want to match against an individual source clip.

    Tiny sub-shots (below ``min_shot_s``) are merged into the previous shot
    so noisy cut detection doesn't fragment a real shot into useless slivers.
    Merging only happens when the previous shot is adjacent, i.e. belongs to
    the same island. A short piece that opens an island is kept as its own
    shot and is never glued onto the last shot of an earlier island, because
    that would stretch a shot across the non-scoreable gap between islands.

    Returns:
        ``(start, end)`` offsets in seconds relative to the beat start. The
        plain islands are returned when no cuts are detected (or cut
        detection fails) or when splitting produced nothing.
    """
    from src.cv.global_scan import _reference_internal_cut_offsets

    islands = _reference_scoreable_segments(beat, cfg)
    try:
        cut_offsets = sorted(_reference_internal_cut_offsets(beat, cfg))
    except Exception:
        # Cut detection is best effort; without it the islands are the shots.
        cut_offsets = []
    if not cut_offsets:
        return islands

    min_shot_s = max(0.4, cfg.cv.deep_scan.span_sample_step_s * 4.0)
    shots: list[tuple[float, float]] = []
    for start_s, end_s in islands:
        # Only cuts strictly inside the island split it.
        inner_cuts = [cut for cut in cut_offsets if start_s + 1e-3 < cut < end_s - 1e-3]
        boundaries = [start_s, *inner_cuts, end_s]
        for seg_start, seg_end in zip(boundaries, boundaries[1:]):
            too_short = seg_end - seg_start < min_shot_s
            follows_previous = bool(shots) and shots[-1][1] >= seg_start - 1e-3
            if too_short and follows_previous:
                # Merge into the previous shot of the same island.
                shots[-1] = (shots[-1][0], seg_end)
            else:
                shots.append((seg_start, seg_end))
    return shots if shots else islands
|
||
|
||
|
||
def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]:
    """Narrow beats that have exactly one visible island down to that island.

    A beat is replaced by a trimmed copy only when it has a single island of
    positive length that starts more than 1.5 frames after the beat start or
    ends more than 1.5 frames before the beat end. All other beats pass
    through untouched.

    Returns:
        ``(beats_out, trims)`` where ``trims`` maps the id of every trimmed
        beat to ``(island_offset_s, island_duration_s)``.
    """
    from dataclasses import replace

    slack_s = (1.0 / max(1.0, float(cfg.export.edl_frame_rate))) * 1.5
    beats_out = []
    trims: dict[int, tuple[float, float]] = {}

    def _island_to_trim(beat):
        """Return the single island worth trimming to, or None."""
        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) != 1:
            return None
        island_start, island_end = islands[0]
        length_s = max(0.0, island_end - island_start)
        differs_from_beat = island_start > slack_s or beat.duration_s - island_end > slack_s
        if length_s > 0.0 and differs_from_beat:
            return island_start, island_end, length_s
        return None

    for beat in beats:
        island = _island_to_trim(beat)
        if island is None:
            beats_out.append(beat)
            continue
        island_start, island_end, length_s = island
        beats_out.append(
            replace(
                beat,
                start_s=beat.start_s + island_start,
                end_s=beat.start_s + island_end,
            )
        )
        trims[beat.beat_id] = (island_start, length_s)
    return beats_out, trims
|
||
|
||
|
||
def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list:
    """Attach beat-relative segment metadata to results matched on a trimmed island.

    For every result whose beat was trimmed and that has no segments yet, one
    ``MatchSegment`` is added at the island's offset. Its duration is the
    shorter of the island and the matched range, and the result's out-point
    is moved to match. Other results are passed through. With empty
    ``trims`` the input list is returned as is.
    """
    if not trims:
        return results

    from dataclasses import replace
    from src.core.models import MatchSegment

    def _with_island_segment(result):
        trim = trims.get(result.beat_id)
        if trim is None or getattr(result, "segments", ()):
            return result
        offset_s, island_s = trim
        covered_s = min(max(0.0, island_s), max(0.0, result.duration_s))
        island_segment = MatchSegment(
            trailer_offset_s=offset_s,
            duration_s=covered_s,
            scene_id=result.scene_id,
            in_point_s=result.in_point_s,
            out_point_s=result.in_point_s + covered_s,
            match_score=result.match_score,
            is_confirmed=result.is_confirmed,
        )
        return replace(
            result,
            out_point_s=result.in_point_s + covered_s,
            segments=(island_segment,),
        )

    return [_with_island_segment(result) for result in results]
|
||
|
||
|
||
def _merge_best_results(existing: list, candidates: list, cfg) -> list:
    """Merge two result lists per beat and return them sorted by ``beat_id``.

    A candidate replaces the stored result for its beat when it is confirmed
    and the stored one is not, when it scores higher by more than the
    tie-break delta, or when it scores within the delta and is longer.
    "Confirmed" here means the score reaches ``match_threshold`` or the
    ``is_confirmed`` flag is set.
    """
    deep_scan_of = lambda: cfg.cv.deep_scan  # noqa: E731 - resolved lazily per comparison

    def _is_confirmed(result):
        return result.match_score >= deep_scan_of().match_threshold or result.is_confirmed

    def _challenger_wins(challenger, incumbent) -> bool:
        challenger_ok = _is_confirmed(challenger)
        incumbent_ok = _is_confirmed(incumbent)
        if challenger_ok and not incumbent_ok:
            return True
        if challenger.match_score > incumbent.match_score + deep_scan_of().duration_tie_break_score_delta:
            return True
        return bool(
            challenger.match_score >= incumbent.match_score - deep_scan_of().duration_tie_break_score_delta
            and challenger.duration_s > incumbent.duration_s
        )

    best = {}
    for stored in existing:
        best[stored.beat_id] = stored

    for challenger in candidates:
        incumbent = best.get(challenger.beat_id)
        if incumbent is None or _challenger_wins(challenger, incumbent):
            best[challenger.beat_id] = challenger

    return sorted(best.values(), key=lambda r: r.beat_id)
|
||
|
||
|
||
def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list:
    """Try a vision-led search for beats that ended up without a match.

    For each unmatched beat that has scoreable visual content (i.e. not pure
    fade/title-card material), this pass:
    1. Asks the vibe-check (CV histogram + pHash) for the top-K candidate
       scenes.
    2. For each candidate, runs the semantic action-window search with the
       beat's own description, preferring windows whose phase matches the
       visible part of the beat.
    3. Refines the in-point with the regular CV content/motion aligner.
    4. Validates the resulting window with the vision phase check, exactly
       like the main filter.
    5. Adds the best validated candidate as a provisional MatchResult.

    Confirmed and provisional matches both stay subject to the same thresholds
    used elsewhere; this only adds matches that pass the same quality gates.

    Args:
        results: matches found so far; beats present here are left alone.
        beats: all trailer beats under consideration.
        cfg: application config.

    Returns:
        ``results`` unchanged when vision is disabled, there are no beats, no
        beat is unmatched, or no scene index is available; otherwise a new
        list (existing results plus recovered ones) sorted by ``beat_id``.
    """
    if not cfg.vision.enabled or not beats:
        return results

    from dataclasses import replace
    from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration
    from src.cv.scene_indexer import build_scene_index
    from src.cv.vibe_check import run_vibe_check
    from src.core.models import MatchResult
    from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision

    logger = logging.getLogger(__name__)
    matched_ids = {r.beat_id for r in results}
    unmatched = [b for b in beats if b.beat_id not in matched_ids]
    if not unmatched:
        return results

    scenes = build_scene_index(cfg)
    if not scenes:
        return results

    new_results = list(results)
    for beat in unmatched:
        try:
            islands = _reference_scoreable_segments(beat, cfg)
        except Exception:
            # Island detection is best effort; fall back to the whole beat.
            islands = []

        # Anchor selection: prefer the longest visible island; if none exists,
        # fall back to the full beat. The latter handles dark / low-contrast
        # close-ups that drop below the scoreable luma/contrast thresholds but
        # are still semantically describable. The strict vision phase
        # validation later in this pass keeps us from accepting pure title-card
        # or logo material.
        from dataclasses import replace as _replace
        if islands:
            anchor_start_s, anchor_end_s = max(islands, key=lambda iv: iv[1] - iv[0])
            anchor_beat = _replace(
                beat,
                start_s=beat.start_s + anchor_start_s,
                end_s=beat.start_s + anchor_end_s,
            )
        else:
            anchor_beat = beat

        # Candidate scenes come from the vibe-check on the full beat (not the
        # anchor), with the widest pHash distance allowed.
        try:
            hits = run_vibe_check(
                beat,
                scenes,
                top_k=max(cfg.cv.deep_scan.scene_seed_top_k, cfg.cv.vibe_check.top_k_candidates),
                hist_method=cfg.cv.vibe_check.hist_compare_method,
                phash_max_distance=64,
            )
        except Exception as exc:
            logger.warning("Beat %d: recovery vibe-check failed (%s)", beat.beat_id, exc)
            continue

        scenes_by_id = {s.scene_id: s for s in scenes}
        best = None  # (score, scene, in_s, dur_s, reason)
        seen = set()
        for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]:
            scene = scenes_by_id.get(hit.scene_id)
            if scene is None or scene.scene_id in seen:
                continue
            seen.add(scene.scene_id)

            # Step 2: semantic action-window search inside the candidate scene.
            try:
                found = find_action_window_in_scene(anchor_beat, scene, cfg)
            except Exception as exc:
                logger.debug("Beat %d: action window failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            if found is None:
                continue
            start_s, end_s, semantic_score, reason = found

            # Step 3: refine the in-point. The search window is four times
            # the found window, clamped to 3-8 seconds.
            window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
            try:
                aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
                    anchor_beat,
                    start_s,
                    cfg,
                    search_window_s=window_s,
                )
            except Exception as exc:
                logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            # Keep the in-point inside the scene, leaving room for the anchor
            # duration where the scene is long enough.
            aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s)))

            try:
                usable_duration_s, usable_score = estimate_usable_source_duration(anchor_beat, aligned_in_s, cfg)
            except Exception:
                usable_duration_s, usable_score = anchor_beat.duration_s, 0.0
            usable_duration_s = max(0.0, min(anchor_beat.duration_s, usable_duration_s))
            if usable_duration_s < max(0.32, anchor_beat.duration_s * 0.45):
                # An implausibly short estimate is replaced by the full
                # anchor duration.
                usable_duration_s = anchor_beat.duration_s

            # Step 4: vision phase validation of the final window.
            try:
                ok, verify_reason = validate_match_window_with_vision(
                    anchor_beat,
                    source_path=scene.source_path,
                    scene_id=scene.scene_id,
                    in_point_s=aligned_in_s,
                    out_point_s=aligned_in_s + usable_duration_s,
                    cfg=cfg,
                )
            except Exception as exc:
                logger.debug("Beat %d: validate failed scene=%d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            if not ok:
                continue

            # Final score: the better of the aligner's combined score and a
            # semantic-led blend capped at 0.99.
            final_score = max(
                combined_score,
                min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
            )
            if final_score < cfg.cv.deep_scan.provisional_match_threshold:
                continue
            candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}")
            if best is None or candidate[0] > best[0]:
                best = candidate

        if best is None:
            continue
        score, scene, aligned_in_s, usable_duration_s, repair_reason = best
        logger.info(
            "Beat %d: recovered via vision action search scene=%d in=%.3fs score=%.3f (%s)",
            beat.beat_id,
            scene.scene_id,
            aligned_in_s,
            score,
            repair_reason,
        )
        # Step 5: record the recovered match.
        # NOTE(review): the result carries no segments, so the island offset
        # (anchor_start_s) used for matching is not stored — confirm that
        # downstream consumers do not need it.
        # NOTE(review): in_point_frame uses the export frame rate rather than
        # the scene's own rate — confirm this is intended.
        new_results.append(MatchResult(
            beat_id=beat.beat_id,
            scene_id=scene.scene_id,
            source_path=scene.source_path,
            in_point_s=aligned_in_s,
            out_point_s=aligned_in_s + usable_duration_s,
            in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
            match_score=score,
            match_location=(0, 0),
            is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
            segments=tuple(),
        ))

    return sorted(new_results, key=lambda r: r.beat_id)
|
||
|
||
|
||
def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list:
    """Drop vision-enabled matches whose final action phase contradicts the beat.

    Every result that has a matching beat is handed to ``_filter_repair_one``,
    which appends its verdict (the original result, a repaired copy, or
    nothing) to the output list.

    Args:
        results: Match results to check.  Returned unchanged when vision is
            disabled in ``cfg`` or when the list is empty.
        beats: Beats used to look up the beat of each result by ``beat_id``.
        cfg: Project configuration; ``cfg.vision.enabled`` gates the whole pass.

    Returns:
        The list of surviving results.  Results whose ``beat_id`` has no beat
        are kept as-is.  If ``_filter_repair_one`` raises for a result, anything
        it appended is rolled back and the original result is kept instead.
    """
    if not cfg.vision.enabled or not results:
        return results

    # NOTE(review): `replace` is not referenced in this function's own body;
    # `_filter_repair_one` does its own import.
    from dataclasses import replace
    from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
    from src.cv.scene_indexer import build_scene_index
    from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration

    logger = logging.getLogger(__name__)
    beats_by_id = {beat.beat_id: beat for beat in beats}
    scenes_by_id = {scene.scene_id: scene for scene in build_scene_index(cfg)}

    def visible_content_offset(action_beat, segment_start_offset_s: float) -> float:
        """Sum the scoreable-segment time of ``action_beat`` lying before the offset.

        Walks the ``(start_s, end_s)`` pairs returned by
        ``_reference_scoreable_segments`` in order, adding whole segments that
        end at or before ``segment_start_offset_s`` and the leading part of the
        segment that straddles it, then stops.
        """
        content_offset_s = 0.0
        for start_s, end_s in _reference_scoreable_segments(action_beat, cfg):
            if end_s <= segment_start_offset_s:
                # Segment lies entirely before the offset: count all of it.
                content_offset_s += max(0.0, end_s - start_s)
            elif start_s < segment_start_offset_s:
                # Segment straddles the offset: count only the part before it.
                content_offset_s += max(0.0, segment_start_offset_s - start_s)
                break
            else:
                # Segment starts at or after the offset: nothing more to add.
                break
        return content_offset_s

    def realign_window(check_beat, scene_id: int, action_beat=None):
        """Search ``scene_id`` for a better, vision-validated window for ``check_beat``.

        Args:
            check_beat: The beat (or per-segment beat copy) being placed.
            scene_id: Scene to search; unknown ids yield ``None``.
            action_beat: Optional enclosing beat.  When given and distinct from
                ``check_beat``, its action window is also looked up and used
                instead of the segment's own window if its score (index 2 of
                the window tuple) exceeds the segment's by more than 0.06, or
                if the segment has no window at all.

        Returns:
            ``(scene, aligned_in_s, usable_duration_s, score, reason)`` on
            success, or ``None`` when the scene is unknown, no action window is
            found, or the vision validation rejects the aligned window.
        """
        scene = scenes_by_id.get(scene_id)
        if scene is None:
            return None
        segment_window = find_action_window_in_scene(check_beat, scene, cfg)
        if action_beat is not None and action_beat is not check_beat:
            beat_window = find_action_window_in_scene(action_beat, scene, cfg)
        else:
            beat_window = None
        # Choose between the segment-level and the beat-level window.
        use_beat_context = False
        if segment_window is None:
            found = beat_window
            use_beat_context = beat_window is not None
        elif beat_window is None:
            found = segment_window
        elif beat_window[2] > segment_window[2] + 0.06:
            found = beat_window
            use_beat_context = True
        else:
            found = segment_window
        if found is None:
            return None
        start_s, end_s, semantic_score, reason = found
        if use_beat_context:
            # The window was located for the whole beat; shift it by the amount
            # of visible beat content that precedes this segment.
            segment_start_offset_s = max(0.0, check_beat.start_s - action_beat.start_s)
            content_offset_s = visible_content_offset(action_beat, segment_start_offset_s)
            start_s += content_offset_s
            end_s += content_offset_s
        # Search window: four times the action window length, clamped to 3..8 s.
        window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
        aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
            check_beat,
            start_s,
            cfg,
            search_window_s=window_s,
        )
        # Clamp the in-point to [scene start, scene end - beat duration], never
        # letting the upper bound fall below the scene start.
        aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s)))
        usable_duration_s, usable_score = estimate_usable_source_duration(check_beat, aligned_in_s, cfg)
        usable_duration_s = max(0.0, min(check_beat.duration_s, usable_duration_s))
        # An estimate shorter than max(0.32 s, 45 % of the beat) is discarded in
        # favour of the full beat duration.
        if usable_duration_s < max(0.32, check_beat.duration_s * 0.45):
            usable_duration_s = check_beat.duration_s
        ok, verify_reason = validate_match_window_with_vision(
            check_beat,
            source_path=scene.source_path,
            scene_id=scene.scene_id,
            in_point_s=aligned_in_s,
            out_point_s=aligned_in_s + usable_duration_s,
            cfg=cfg,
        )
        if not ok:
            logger.info(
                "Beat %d: action-window realign rejected scene=%d in=%.3fs (%s)",
                check_beat.beat_id,
                scene.scene_id,
                aligned_in_s,
                verify_reason,
            )
            return None
        # Final score: the larger of the alignment's combined score and a
        # weighted blend of the four component scores capped at 0.99.
        score = max(
            combined_score,
            min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
        )
        return scene, aligned_in_s, usable_duration_s, score, f"{reason}; {verify_reason}"

    kept = []
    for result in results:
        beat = beats_by_id.get(result.beat_id)
        if beat is None:
            # No beat to validate against: pass the result through.
            kept.append(result)
            continue

        # Remember the list length so a failing call can be rolled back.
        kept_before = len(kept)
        try:
            _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger)
        except Exception as exc:
            logger.warning(
                "Beat %d: vision filter/repair failed (%s); keeping previous cached match.",
                result.beat_id,
                exc,
            )
            # Discard partial output of the failed call, then keep the original.
            del kept[kept_before:]
            kept.append(result)
    return kept
|
||
|
||
|
||
def _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger):
    """Validate one match with the vision check and keep, repair, or drop it.

    The verdict is delivered by appending to ``kept``; the function returns
    ``None`` on every path.

    * All windows validate: the result is kept.  Before that, a phase realign
      is attempted for windows sitting in scenes noticeably longer than the
      window; a realigned copy replaces the original only if the in-point
      moved by more than one frame and the score did not drop by more than
      0.02.
    * Some window fails validation: a realign inside the same scene(s) is
      attempted; if it succeeds the repaired copy is kept, otherwise a warning
      is logged and nothing is appended (the result is dropped).

    Args:
        result: The match result under test; may carry a ``segments`` tuple.
        beat: The beat the result belongs to.
        beats_by_id: Not referenced in this body.
        scenes_by_id: Mapping of scene id to scene object (``.get`` is used).
        kept: Output list, mutated in place.
        cfg: Project configuration (frame rate and match threshold are read).
        realign_window: Callable ``(check_beat, scene_id, action_beat=None)``
            returning ``None`` or a 5-tuple
            ``(scene, aligned_in_s, usable_duration_s, score, reason)``.
        validate_match_window_with_vision: Callable returning ``(ok, reason)``.
        logger: Logger used for info/warning messages.
    """
    from dataclasses import replace
    # NOTE(review): `if True:` is a no-op wrapper that only adds one level of
    # nesting — presumably left over from extracting this body; confirm and
    # consider removing.
    if True:
        # Build the list of (beat, scene_id, in, out) windows to validate:
        # one per segment, or a single window for an unsegmented result.
        windows = []
        if getattr(result, "segments", ()):
            for segment in result.segments:
                # Beat copy narrowed to the trailer span covered by this segment.
                segment_beat = replace(
                    beat,
                    start_s=beat.start_s + segment.trailer_offset_s,
                    end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                )
                windows.append((
                    segment_beat,
                    segment.scene_id,
                    segment.in_point_s,
                    segment.out_point_s,
                ))
        else:
            windows.append((beat, result.scene_id, result.in_point_s, result.out_point_s))

        # Validate windows in order, stopping at the first rejection.
        valid = True
        reasons: list[str] = []
        for check_beat, scene_id, in_point_s, out_point_s in windows:
            ok, reason = validate_match_window_with_vision(
                check_beat,
                source_path=result.source_path,
                scene_id=scene_id,
                in_point_s=in_point_s,
                out_point_s=out_point_s,
                cfg=cfg,
            )
            reasons.append(reason)
            if not ok:
                valid = False
                break
        if valid:
            # ---- Valid match: optionally realign, always keep something ----
            repaired = False
            if getattr(result, "segments", ()):
                new_segments = []
                repair_reasons = []
                changed = False
                for segment in result.segments:
                    scene = scenes_by_id.get(segment.scene_id)
                    # Allow phase-realign whenever the scene has any meaningful
                    # slack beyond the segment, not only for "long" scenes.
                    # Short scenes don't need realigning because the segment
                    # essentially is the scene.
                    if scene is None or scene.duration_s <= segment.duration_s + 0.5:
                        new_segments.append(segment)
                        continue
                    # For already-confirmed segments, skip the realign to avoid
                    # destabilizing a strong original match.
                    if segment.is_confirmed and scene.duration_s <= max(segment.duration_s * 1.6, 6.0):
                        new_segments.append(segment)
                        continue
                    segment_beat = replace(
                        beat,
                        start_s=beat.start_s + segment.trailer_offset_s,
                        end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                    )
                    repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
                    if repair is None:
                        new_segments.append(segment)
                        continue
                    repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    # A shift of at most one frame is treated as "unchanged".
                    if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate:
                        new_segments.append(segment)
                        continue
                    # Don't commit a repair that scores meaningfully worse than
                    # the original; phase realign should improve, not regress.
                    if score < segment.match_score - 0.02:
                        new_segments.append(segment)
                        continue
                    changed = True
                    repair_reasons.append(repair_reason)
                    new_segments.append(replace(
                        segment,
                        scene_id=repair_scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        duration_s=usable_duration_s,
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                if changed and new_segments:
                    # Result-level fields mirror the first segment; the result
                    # score is the weakest segment score.
                    first = new_segments[0]
                    repaired_score = min(seg.match_score for seg in new_segments)
                    logger.info(
                        "Beat %d: realigned semantically valid long scene by motion/action windows (%s)",
                        result.beat_id,
                        "; ".join(repair_reasons),
                    )
                    kept.append(replace(
                        result,
                        scene_id=first.scene_id,
                        in_point_s=first.in_point_s,
                        out_point_s=first.out_point_s,
                        in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
                        match_score=repaired_score,
                        is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
                        segments=tuple(new_segments),
                    ))
                    repaired = True
            else:
                scene = scenes_by_id.get(result.scene_id)
                # Same slack / confirmed-in-tight-scene gates as the segment
                # path above, applied to the whole result.
                wide_scene = (
                    scene is not None
                    and scene.duration_s > result.duration_s + 0.5
                )
                already_confirmed_in_tight_scene = (
                    result.is_confirmed
                    and scene is not None
                    and scene.duration_s <= max(result.duration_s * 1.6, 6.0)
                )
                if wide_scene and not already_confirmed_in_tight_scene:
                    repair = realign_window(beat, result.scene_id)
                    if repair is not None:
                        repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                        moved = abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate
                        # "improved" tolerates a score drop of up to 0.02.
                        improved = score >= result.match_score - 0.02
                        if moved and improved:
                            logger.info(
                                "Beat %d: realigned semantically valid long scene by motion/action window (%s)",
                                result.beat_id,
                                repair_reason,
                            )
                            kept.append(replace(
                                result,
                                scene_id=repair_scene.scene_id,
                                in_point_s=aligned_in_s,
                                out_point_s=aligned_in_s + usable_duration_s,
                                in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
                                match_score=score,
                                is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                            ))
                            repaired = True
            if not repaired:
                # No realign was committed: keep the validated original.
                kept.append(result)
        else:
            # ---- Invalid match: keep only if every window can be repaired ----
            if getattr(result, "segments", ()):
                new_segments = []
                all_repaired = True
                repair_reasons = []
                for segment in result.segments:
                    segment_beat = replace(
                        beat,
                        start_s=beat.start_s + segment.trailer_offset_s,
                        end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
                    )
                    repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
                    if repair is None:
                        # One unrepairable segment rejects the whole result.
                        all_repaired = False
                        break
                    scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    repair_reasons.append(repair_reason)
                    new_segments.append(replace(
                        segment,
                        scene_id=scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        duration_s=usable_duration_s,
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                if all_repaired and new_segments:
                    first = new_segments[0]
                    repaired_score = min(seg.match_score for seg in new_segments)
                    logger.info(
                        "Beat %d: realigned inside matched scene by vision action windows (%s)",
                        result.beat_id,
                        "; ".join(repair_reasons),
                    )
                    kept.append(replace(
                        result,
                        scene_id=first.scene_id,
                        in_point_s=first.in_point_s,
                        out_point_s=first.out_point_s,
                        in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
                        match_score=repaired_score,
                        is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
                        segments=tuple(new_segments),
                    ))
                    return
            else:
                repair = realign_window(beat, result.scene_id)
                if repair is not None:
                    scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
                    logger.info(
                        "Beat %d: realigned inside matched scene by vision action window (%s)",
                        result.beat_id,
                        repair_reason,
                    )
                    kept.append(replace(
                        result,
                        scene_id=scene.scene_id,
                        in_point_s=aligned_in_s,
                        out_point_s=aligned_in_s + usable_duration_s,
                        in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
                        match_score=score,
                        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
                    ))
                    return
            # Reached only when no repair succeeded: the result is dropped.
            logger.warning(
                "Beat %d: rejected by vision action-phase verification (%s)",
                result.beat_id,
                "; ".join(reasons),
            )
|
||
|
||
|
||
def _attach_visual_segments(results: list, beats: list, cfg) -> list:
    """Ensure every match carries a ``segments`` tuple.

    Matches without a known beat, or that already have segments, pass through
    untouched.  A beat with at most one scoreable island gets a single segment
    mirroring the match itself.  For a multi-island beat the existing match is
    used for the first island and each further island is scanned on its own;
    islands without a scan hit are left out.

    Args:
        results: Match results to expand.
        beats: Beats, looked up by ``beat_id``.
        cfg: Project configuration passed to the island lookup and the scan.

    Returns:
        A new list with one entry per input result, in the same order.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.global_scan import run_global_scan

    beat_lookup = {b.beat_id: b for b in beats}
    output: list[MatchResult] = []
    for match in results:
        beat = beat_lookup.get(match.beat_id)
        # Unknown beats and already-segmented matches are copied through.
        if beat is None or getattr(match, "segments", ()):
            output.append(match)
            continue

        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) <= 1:
            whole = MatchSegment(
                trailer_offset_s=0.0,
                duration_s=max(0.0, match.duration_s),
                scene_id=match.scene_id,
                in_point_s=match.in_point_s,
                out_point_s=match.out_point_s,
                match_score=match.match_score,
                is_confirmed=match.is_confirmed,
            )
            output.append(replace(match, segments=(whole,)))
            continue

        # First island: reuse the existing match, trimmed to the island length.
        head_start, head_end = islands[0]
        head_len = min(max(0.0, match.duration_s), max(0.0, head_end - head_start))
        parts: list[MatchSegment] = [
            MatchSegment(
                trailer_offset_s=head_start,
                duration_s=head_len,
                scene_id=match.scene_id,
                in_point_s=match.in_point_s,
                out_point_s=match.in_point_s + head_len,
                match_score=match.match_score,
                is_confirmed=match.is_confirmed,
            )
        ]

        # Remaining islands: one unseeded global scan each.
        for island_start, island_end in islands[1:]:
            island_beat = replace(
                beat,
                start_s=beat.start_s + island_start,
                end_s=beat.start_s + island_end,
            )
            hits = run_global_scan([island_beat], cfg, seed_in_points=None)
            if not hits:
                continue
            hit = hits[0]
            hit_len = min(max(0.0, island_end - island_start), max(0.0, hit.duration_s))
            parts.append(
                MatchSegment(
                    trailer_offset_s=island_start,
                    duration_s=hit_len,
                    scene_id=hit.scene_id,
                    in_point_s=hit.in_point_s,
                    out_point_s=hit.in_point_s + hit_len,
                    match_score=hit.match_score,
                    is_confirmed=hit.is_confirmed,
                )
            )

        output.append(replace(match, segments=tuple(parts)))
    return output
|
||
|
||
|
||
def _fast_vision_match_cfg(cfg):
|
||
"""Return a vision-seed prepass config that still keeps quality settings."""
|
||
from dataclasses import replace
|
||
|
||
return replace(
|
||
cfg,
|
||
cv=replace(
|
||
cfg.cv,
|
||
deep_scan=replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True),
|
||
),
|
||
vision=replace(
|
||
cfg.vision,
|
||
fullscan_fallback=False,
|
||
),
|
||
)
|
||
|
||
|
||
def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True):
    """Match a single visual island using the staged fast-then-full strategy.

    With vision enabled, a fast prepass (config from
    ``_fast_vision_match_cfg``) runs first.  Its matches are returned directly
    when the full scan is not allowed, or when every match is confirmed or
    reaches ``cfg.cv.deep_scan.match_threshold``.  Otherwise a full match run
    follows and both result sets are combined by ``_merge_best_results``.

    Args:
        segment_beat: Beat copy narrowed to the island being matched.
        continuity: Seed in-points forwarded as ``seed_in_points``.
        cfg: Project configuration.
        allow_fullscan: When ``False`` only the fast prepass may run; with
            vision disabled that yields an empty list.

    Returns:
        The matches chosen by the staged strategy.
    """
    from src.pipeline.matcher import run_matching

    # Stays empty when vision is disabled, so no later branch needs to
    # re-check the flag.
    prepass_hits = []
    if cfg.vision.enabled:
        prepass_hits = run_matching(
            _fast_vision_match_cfg(cfg),
            [segment_beat],
            seed_in_points=continuity,
        )

    if not allow_fullscan:
        return prepass_hits

    threshold_met = prepass_hits and all(
        hit.is_confirmed or hit.match_score >= cfg.cv.deep_scan.match_threshold
        for hit in prepass_hits
    )
    if threshold_met:
        return prepass_hits

    fullscan_hits = run_matching(
        cfg,
        [segment_beat],
        seed_in_points=continuity,
    )
    return _merge_best_results(prepass_hits, fullscan_hits, cfg)
|
||
|
||
|
||
def _match_unmatched_visual_segments(
    results: list,
    beats: list,
    cached: list,
    cfg,
    skip_global_segment_scan_for: set[int] | None = None,
) -> list:
    """Build segmented matches for beats that have no whole-beat match yet.

    Each unmatched beat is split into the shot ranges reported by
    ``_reference_shot_segments``.  Every shot is matched on its own via
    ``_run_segment_match`` (unless the beat id is listed in
    ``skip_global_segment_scan_for``); when that yields nothing,
    ``_local_same_scene_segment_match`` is tried instead.  Shots with no hit
    are omitted, and a beat with no hit at all stays unmatched.

    Args:
        results: Matches found so far; beats already present are skipped.
        beats: All beats under consideration.
        cached: Previously cached matches, used together with the growing
            output as continuity / neighbour context.
        cfg: Project configuration.
        skip_global_segment_scan_for: Beat ids for which only the local
            same-scene fallback may be used.

    Returns:
        A new list: the input results followed by one result per recovered beat.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.frame_extractor import get_video_info

    already_matched = {r.beat_id for r in results}
    output = list(results)
    local_only_ids = skip_global_segment_scan_for or set()
    # Prefer the source movie's own frame rate; fall back to the export rate
    # when probing fails or reports 0.
    try:
        fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
    except Exception:
        fps = cfg.export.edl_frame_rate

    for beat in beats:
        if beat.beat_id in already_matched:
            continue

        # One entry per shot: fade-bounded islands further split at hard cuts.
        shots = _reference_shot_segments(beat, cfg)
        if not shots:
            continue

        parts: list[MatchSegment] = []
        for shot_start, shot_end in shots:
            shot_beat = replace(
                beat,
                start_s=beat.start_s + shot_start,
                end_s=beat.start_s + shot_end,
            )
            # Seeds are computed against a beat list in which this beat is
            # replaced by its shot-sized copy.
            beats_with_shot = [shot_beat if b.beat_id == beat.beat_id else b for b in beats]
            seeds = _continuity_seed_in_points(
                beat.beat_id,
                beats_with_shot,
                cached + output,
                cfg,
            )
            if beat.beat_id in local_only_ids:
                hits = []
            else:
                hits = _run_segment_match(shot_beat, seeds, cfg, allow_fullscan=True)

            if hits:
                hit = hits[0]
                shot_len = min(max(0.0, shot_end - shot_start), max(0.0, hit.duration_s))
                parts.append(
                    MatchSegment(
                        trailer_offset_s=shot_start,
                        duration_s=shot_len,
                        scene_id=hit.scene_id,
                        in_point_s=hit.in_point_s,
                        out_point_s=hit.in_point_s + shot_len,
                        match_score=hit.match_score,
                        is_confirmed=hit.is_confirmed,
                    )
                )
                continue

            # Fallback: look inside scenes adjacent to neighbouring matches.
            fallback = _local_same_scene_segment_match(
                shot_beat,
                beat,
                shot_start,
                cached + output,
                cfg,
            )
            if fallback is not None:
                parts.append(fallback)

        if not parts:
            continue

        lead = parts[0]
        covered_s = sum(max(0.0, part.duration_s) for part in parts)
        # Duration-weighted mean score; weakest score if nothing has length.
        if covered_s > 0:
            beat_score = sum(max(0.0, part.duration_s) * part.match_score for part in parts) / covered_s
        else:
            beat_score = min(part.match_score for part in parts)
        output.append(
            MatchResult(
                beat_id=beat.beat_id,
                scene_id=lead.scene_id,
                source_path=cfg.paths.source_movie,
                in_point_s=lead.in_point_s,
                out_point_s=lead.out_point_s,
                in_point_frame=int(max(0.0, lead.in_point_s) * fps),
                match_score=beat_score,
                is_confirmed=all(part.is_confirmed for part in parts),
                segments=tuple(parts),
            )
        )

    return output
|
||
|
||
|
||
def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg):
    """Look for a trailer island inside the scenes used by neighbouring beats.

    Candidate scenes are those referenced by the cached matches of beats
    ``beat_id - 1`` and ``beat_id + 1`` (per segment when the match is
    segmented).  Each candidate scene, padded by 0.25 s at both ends, is
    stepped through and scored against the island's content templates; the
    best-scoring position wins.

    Args:
        segment_beat: Beat copy narrowed to the island being placed.
        beat: The full beat, used for its id.
        segment_offset_s: Island start relative to the beat; stored on the
            returned segment as ``trailer_offset_s``.
        cached: Matches to draw neighbour scenes from.
        cfg: Project configuration.

    Returns:
        A ``MatchSegment`` for the best position, or ``None`` when there is no
        scene cache, no neighbour scene, no template, or the best score is
        below the minimum.
    """
    from src.core.models import MatchSegment
    from src.cv.frame_extractor import open_video
    from src.cv.global_scan import _content_alignment_score, _content_alignment_templates

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return None

    # Collect neighbour scene ids in first-seen order, without duplicates.
    match_by_beat = {r.beat_id: r for r in cached}
    candidate_scene_ids: list[int] = []
    for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1):
        neighbour = match_by_beat.get(neighbour_id)
        if neighbour is None:
            continue
        per_segment_ids = [
            getattr(seg, "scene_id", neighbour.scene_id)
            for seg in getattr(neighbour, "segments", ())
        ]
        for scene_id in per_segment_ids or [neighbour.scene_id]:
            if scene_id not in candidate_scene_ids:
                candidate_scene_ids.append(scene_id)
    if not candidate_scene_ids:
        return None

    templates = _content_alignment_templates(segment_beat, cfg)
    if not templates:
        return None

    deep_scan = cfg.cv.deep_scan
    min_score = min(
        deep_scan.provisional_content_threshold * 0.70,
        deep_scan.provisional_match_threshold,
    )
    # Step one export frame at a time, but never finer than 0.04 s.
    step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)

    best: tuple[float, float, int] | None = None
    with open_video(cfg.paths.source_movie) as cap:
        for scene_id in candidate_scene_ids:
            scene = next((s for s in scenes if int(s["scene_id"]) == int(scene_id)), None)
            if scene is None:
                continue
            scan_from = max(0.0, float(scene["start_s"]) - 0.25)
            scan_to = max(scan_from, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25)
            position = scan_from
            while position <= scan_to:
                position_score = _content_alignment_score(cap, position, templates, cfg)
                # Strict '>' keeps the earliest position on ties.
                if best is None or position_score > best[0]:
                    best = (position_score, position, int(scene_id))
                position = round(position + step_s, 6)

    if best is None or best[0] < min_score:
        return None

    best_score, best_in_s, best_scene_id = best
    length_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s))
    return MatchSegment(
        trailer_offset_s=segment_offset_s,
        duration_s=length_s,
        scene_id=best_scene_id,
        in_point_s=best_in_s,
        out_point_s=best_in_s + length_s,
        match_score=best_score,
        is_confirmed=best_score >= cfg.cv.deep_scan.match_threshold,
    )
|
||
|
||
|
||
def cmd_match(args: argparse.Namespace, cfg) -> list:
    """Match the selected beats against the source movie and save the results.

    Pipeline, in order: optional ``--vision`` / ``--no-vision`` override of
    ``cfg.vision.enabled``; a fast vision-seeded pass (vision only); a full
    pass for beats still missing or below the match threshold; then the
    segment, vision-filter and vision-recovery post-passes.  The results are
    saved, the cutter report is regenerated and a summary is printed.

    Args:
        args: Parsed CLI arguments.  ``force_reindex`` is read directly;
            ``vision``, ``no_vision`` and ``beat`` are optional.
        cfg: Project configuration.

    Returns:
        The results produced by this run (for a ``--beat`` run this is only
        the targeted selection, not the whole saved cache).
    """
    from src.pipeline.matcher import run_matching
    from dataclasses import replace

    # CLI flags override the configured vision switch; `--no-vision` is
    # applied last and therefore wins if both are set.
    if getattr(args, "vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=True))
    if getattr(args, "no_vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=False))

    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, getattr(args, "beat", None))
    # Normalised view of the cache, used only as context for seeding and
    # segment matching — never written back (see the save step below).
    cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg) if _results_cache_path(cfg).exists() else []
    # Multi-shot beats: either fade-bounded multiple islands, OR a single
    # island with internal hard cuts (e.g. man-shot then back to woman). Both
    # cases are routed through the per-segment match path so each shot gets
    # its own source clip instead of being approximated by one continuous
    # span.
    multi_island_beat_ids = {
        beat.beat_id
        for beat in beats
        if len(_reference_shot_segments(beat, cfg)) > 1
    }
    scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg)
    # Multi-shot beats are excluded from the whole-beat scan; they are picked
    # up later by _match_unmatched_visual_segments.
    scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids]
    # Continuity seeds are only computed for a targeted single-beat run.
    seed_in_points = (
        _continuity_seed_in_points(args.beat, all_beats, cached, cfg)
        if getattr(args, "beat", None) is not None
        else None
    )
    results = []
    if cfg.vision.enabled:
        # Stage 1: fast vision-seeded prepass.
        fast_cfg = _fast_vision_match_cfg(cfg)
        results = run_matching(
            fast_cfg,
            scan_beats,
            force_reindex=args.force_reindex,
            seed_in_points=seed_in_points,
        )

    # Stage 2: full pass for every beat that is still unmatched, or matched
    # but neither confirmed nor at the match threshold.  With vision disabled
    # `results` is empty, so this covers all scan beats.
    if len(results) < len(scan_beats) or any(
        not r.is_confirmed and r.match_score < cfg.cv.deep_scan.match_threshold
        for r in results
    ):
        results_by_id = {r.beat_id: r for r in results}
        remaining_beats = [
            b for b in scan_beats
            if (
                b.beat_id not in results_by_id
                or (
                    not results_by_id[b.beat_id].is_confirmed
                    and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold
                )
            )
        ]
        if remaining_beats:
            full_results = run_matching(
                cfg,
                remaining_beats,
                force_reindex=args.force_reindex,
                seed_in_points=seed_in_points,
            )
            results = _merge_best_results(results, full_results, cfg)
    # Post-passes; each takes the previous result list and returns a new one.
    results = _apply_single_island_segments(results, single_island_trims)
    results = _match_unmatched_visual_segments(
        results,
        beats,
        cached,
        cfg,
        # NOTE(review): assumes iterating `single_island_trims` yields beat
        # ids (e.g. a dict keyed by beat id) — confirm against its producer.
        skip_global_segment_scan_for=set(single_island_trims),
    )
    results = _attach_visual_segments(results, beats, cfg)
    results = _filter_semantically_invalid_vision_matches(results, beats, cfg)
    results = _recover_unmatched_beats_via_vision(results, beats, cfg)

    # A targeted one-beat match must NEVER delete or modify any other beat's
    # cache entry. We deliberately re-load the raw cache from disk here so
    # the upstream normalisation pass (which drops entries that no longer
    # pass current quality gates) cannot leak into the save: only the
    # targeted beat's slot gets replaced, every other entry is written back
    # bit-for-bit identical to what it was before this run.
    if getattr(args, "beat", None) is not None and _results_cache_path(cfg).exists():
        raw_cached = _load_results(cfg)
        # Drop the stale entry first so a failed match clears the beat's slot.
        raw_cached = [r for r in raw_cached if r.beat_id != args.beat]
        for result in results:
            raw_cached = _update_result(result, raw_cached)
        results_to_save = sorted(raw_cached, key=lambda r: r.beat_id)
    else:
        results_to_save = results

    _save_results(results_to_save, cfg)
    _regenerate_cutter_report(cfg)

    print(f"\n✅ {len(results)} / {len(beats)} beats matched.")
    for r in results:
        print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} "
              f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}")
    return results
|
||
|
||
|
||
def _update_result(new_result, results: list) -> list:
|
||
"""Replace or insert a MatchResult in the list (by beat_id)."""
|
||
updated = [r for r in results if r.beat_id != new_result.beat_id]
|
||
updated.append(new_result)
|
||
return sorted(updated, key=lambda r: r.beat_id)
|
||
|
||
|
||
def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]:
|
||
beats_by_id = {b.beat_id: b for b in beats}
|
||
results_by_id = {r.beat_id: r for r in results}
|
||
target = beats_by_id.get(beat_id)
|
||
if target is None:
|
||
return {}
|
||
|
||
seeds: list[tuple[float, float]] = []
|
||
base_score = max(cfg.cv.deep_scan.coarse_candidate_threshold + 0.08, 0.92)
|
||
prev_matches = [
|
||
(b, results_by_id[b.beat_id])
|
||
for b in beats
|
||
if b.beat_id < beat_id and b.beat_id in results_by_id
|
||
]
|
||
if prev_matches:
|
||
prev_beat, prev_result = max(prev_matches, key=lambda item: item[0].beat_id)
|
||
trailer_gap_s = max(0.0, target.start_s - prev_beat.end_s)
|
||
expected = prev_result.out_point_s + trailer_gap_s
|
||
for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
|
||
offset_score = max(
|
||
cfg.cv.deep_scan.coarse_candidate_threshold,
|
||
base_score - abs(offset) * 0.06,
|
||
)
|
||
seeds.append((expected + offset, offset_score))
|
||
|
||
next_matches = [
|
||
(b, results_by_id[b.beat_id])
|
||
for b in beats
|
||
if b.beat_id > beat_id and b.beat_id in results_by_id
|
||
]
|
||
if next_matches:
|
||
next_beat, next_result = min(next_matches, key=lambda item: item[0].beat_id)
|
||
trailer_gap_s = max(0.0, next_beat.start_s - target.end_s)
|
||
expected = next_result.in_point_s - trailer_gap_s - target.duration_s
|
||
for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
|
||
offset_score = max(
|
||
cfg.cv.deep_scan.coarse_candidate_threshold,
|
||
base_score - abs(offset) * 0.06,
|
||
)
|
||
seeds.append((expected - offset, offset_score))
|
||
|
||
unique: dict[float, float] = {}
|
||
for seed_t, seed_score in seeds:
|
||
rounded = round(max(0.0, seed_t), 3)
|
||
unique[rounded] = max(unique.get(rounded, 0.0), seed_score)
|
||
points = [(seed_t, score) for seed_t, score in sorted(unique.items())]
|
||
return {beat_id: points} if points else {}
|
||
|
||
|
||
def cmd_rematch(args: argparse.Namespace, cfg) -> None:
    """
    Re-run automatic matching for ONE beat and store it in the results cache.

    Two modes, selected by ``args.refine``:

    * refine: nudge the beat's existing cached match with a content-based
      alignment around its current in-point (search radius
      ``args.refine_window``).  The candidate is rejected when it covers less
      of the beat than ``cfg.cv.deep_scan.min_duration_coverage``.
    * default: run a fresh global scan for the beat, optionally with
      ``args.threshold`` overriding the match threshold for this run only.

    Every outcome is reported on stdout; the cache is only written on success.

        python cli.py rematch --beat 5                     # re-scan CV for beat 5
        python cli.py rematch --beat 5 --threshold 0.40    # relax threshold
    """
    beat_id = args.beat
    beats = _load_beats(cfg)
    results = _load_results(cfg) if _results_cache_path(cfg).exists() else []

    beat = None
    for candidate in beats:
        if candidate.beat_id == beat_id:
            beat = candidate
            break
    if beat is None:
        print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.")
        return

    # ---- Refine an already acceptable cached match -------------------------
    if args.refine:
        current = next((r for r in results if r.beat_id == beat_id), None)
        if current is None:
            print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.")
            return

        from src.cv.content_align import align_cached_match_by_content
        refined_in_s, sequence_score = align_cached_match_by_content(
            beat,
            current.in_point_s,
            cfg,
            search_window_s=args.refine_window,
        )
        # Keep the cached clip length, but never run past the end of the
        # scene the refined in-point lands in.
        clip_length_s = max(0.0, current.out_point_s - current.in_point_s)
        scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg)
        out_point_s = refined_in_s + clip_length_s
        if scene_data is not None:
            out_point_s = min(out_point_s, float(scene_data["end_s"]))

        beat_length_s = beat.duration_s
        if beat_length_s > 0:
            duration_coverage = max(0.0, out_point_s - refined_in_s) / beat_length_s
        else:
            duration_coverage = 0.0
        if duration_coverage < cfg.cv.deep_scan.min_duration_coverage:
            print(
                f"❌ Beat {beat_id} refined candidate rejected: "
                f"duration coverage {duration_coverage:.0%} < "
                f"{cfg.cv.deep_scan.min_duration_coverage:.0%}"
            )
            return

        # Source frame rate if it can be probed, export frame rate otherwise.
        try:
            from src.cv.frame_extractor import get_video_info
            fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
        except Exception:
            fps = cfg.export.edl_frame_rate

        from src.core.models import MatchResult
        clamped_in_s = max(0.0, refined_in_s)
        if scene_data is not None:
            refined_scene_id = int(scene_data["scene_id"])
        else:
            refined_scene_id = current.scene_id
        refined = MatchResult(
            beat_id=beat_id,
            scene_id=refined_scene_id,
            source_path=current.source_path,
            in_point_s=clamped_in_s,
            out_point_s=out_point_s,
            in_point_frame=int(clamped_in_s * fps),
            match_score=sequence_score,
            match_location=current.match_location,
            is_confirmed=sequence_score >= cfg.cv.deep_scan.match_threshold,
        )
        results = _update_result(refined, results)
        _save_results(results, cfg)
        print(
            f"✅ Beat {beat_id} refined → "
            f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, "
            f"sequence_score={refined.match_score:.3f}"
        )
        return

    # ---- Re-run CV with optional threshold override ------------------------
    from dataclasses import replace as dc_replace
    run_cfg = cfg
    if args.threshold is not None:
        relaxed_deep_scan = dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold)
        run_cfg = dc_replace(cfg, cv=dc_replace(cfg.cv, deep_scan=relaxed_deep_scan))
        print(f"ℹ️ threshold overridden to {args.threshold} for beat {beat_id}")

    from src.cv.global_scan import run_global_scan
    seeds = _continuity_seed_in_points(beat_id, beats, results, run_cfg)
    matches = run_global_scan([beat], run_cfg, seed_in_points=seeds)
    if not matches:
        print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.")
        return

    match = matches[0]
    results = _update_result(match, results)
    # Saved with the original cfg, not the threshold-overridden copy.
    _save_results(results, cfg)
    print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})")
|
||
|
||
|
||
def cmd_report(args: argparse.Namespace, cfg) -> None:
    """Generate the match report for all beats, or for one ``--beat``.

    The beats cache is loaded once and shared between beat selection and
    result normalisation (the same pattern ``cmd_match`` uses), instead of
    being read and parsed twice per invocation.

    Args:
        args: Parsed CLI arguments; the optional ``beat`` attribute restricts
            the report to a single beat.
        cfg: Project configuration.

    Prints the report path, preceded by a warning when a targeted beat has no
    cached match.
    """
    from src.pipeline.reporter import generate_report

    requested_beat = getattr(args, "beat", None)
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, requested_beat)
    # None means "no filtering" for _select_results.
    beat_ids = {b.beat_id for b in beats} if requested_beat is not None else None
    # Normalisation always sees the full beat list, even for a targeted run.
    normalized = _normalize_cached_results(all_beats, _load_results(cfg), cfg)
    results = _select_results(normalized, beat_ids)
    out = generate_report(beats, results, cfg)
    if requested_beat is not None and not results:
        print(
            f"\n⚠️ Beat {requested_beat} has no cached match yet. "
            f"Run: python cli.py match --beat {requested_beat}"
        )
    print(f"\n\u2705 Report \u2192 {out}")
|
||
|
||
|
||
def cmd_export(args: argparse.Namespace, cfg) -> None:
    """Export the matched timeline as FCPXML and/or EDL.

    The beats cache is loaded once and shared between beat selection and
    result normalisation (the same pattern ``cmd_match`` uses), instead of
    being read and parsed twice per invocation.

    Args:
        args: Parsed CLI arguments.  ``format`` picks ``fcpxml``, ``edl`` or
            ``both`` and falls back to ``cfg.export.output_format`` when
            falsy; the optional ``beat`` attribute restricts the export to a
            single beat and changes the output file stem.
        cfg: Project configuration.

    Prints the path of every file written, or an error when a targeted beat
    has no cached match (in which case nothing is written).
    """
    from src.export.edl_writer import write_edl
    from src.export.fcpxml_writer import write_fcpxml
    from src.pipeline.matcher import build_timeline

    beat_id = getattr(args, "beat", None)
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, beat_id)
    # None means "no filtering" for _select_results.
    beat_ids = {b.beat_id for b in beats} if beat_id is not None else None
    # Normalisation always sees the full beat list, even for a targeted run.
    normalized = _normalize_cached_results(all_beats, _load_results(cfg), cfg)
    results = _select_results(normalized, beat_ids)
    if beat_id is not None and not results:
        print(f"❌ Beat {beat_id} has no cached match. Run 'match --beat {beat_id}' first.")
        return
    timeline = build_timeline(beats, results, cfg)

    fmt = args.format or cfg.export.output_format
    # Single-beat exports get a beat-specific file name so they do not
    # overwrite the full-timeline export.
    out_stem = (
        f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}"
        if beat_id is not None
        else timeline.title
    )

    if fmt in ("fcpxml", "both"):
        out = write_fcpxml(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.fcpxml")
        print(f"✅ FCPXML → {out}")

    if fmt in ("edl", "both"):
        out = write_edl(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.edl")
        print(f"✅ EDL → {out}")
|
||
|
||
|
||
def cmd_run(args: argparse.Namespace, cfg) -> None:
    """Full pipeline: analyze → match → report → export.

    Each stage receives the same parsed namespace and configuration and is
    executed in order; an exception in one stage stops the stages after it.
    """
    for stage in (cmd_analyze, cmd_match, cmd_report, cmd_export):
        stage(args, cfg)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Argument parser
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_parser() -> argparse.ArgumentParser:
|
||
parser = argparse.ArgumentParser(
|
||
prog="ai-trailer",
|
||
description="AI Trailer Generator v2 — Pure CV scene matching",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
)
|
||
parser.add_argument(
|
||
"--config", type=Path, default=Path("config.toml"),
|
||
metavar="CONFIG", help="Path to config.toml (default: ./config.toml)",
|
||
)
|
||
parser.add_argument(
|
||
"--log-level", default="INFO",
|
||
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
|
||
help="Logging verbosity (default: INFO)",
|
||
)
|
||
|
||
sub = parser.add_subparsers(dest="command", required=True)
|
||
|
||
# analyze
|
||
p_analyze = sub.add_parser("analyze", help="Detect trailer beats + fingerprint")
|
||
p_analyze.add_argument("--no-audio", action="store_true",
|
||
help="Skip Whisper (only affects beat labels, not matching)")
|
||
p_analyze.add_argument("--no-llm", action="store_true",
|
||
help="Skip LLM classification (only affects beat labels)")
|
||
|
||
# match
|
||
p_match = sub.add_parser("match", help="Run 2-phase CV matching")
|
||
p_match.add_argument("--force-reindex", action="store_true",
|
||
help="Ignore scene cache and re-run PySceneDetect")
|
||
p_match.add_argument("--beat", type=int,
|
||
help="Match only one beat and merge it into the cached results")
|
||
p_match.add_argument("--vision", action="store_true",
|
||
help="Enable cached vision descriptions for extra automatic search seeds")
|
||
p_match.add_argument("--no-vision", action="store_true",
|
||
help="Disable vision seeding even if [vision].enabled is true")
|
||
|
||
# rematch
|
||
p_rematch = sub.add_parser("rematch", help="Re-run or override matching for one beat")
|
||
p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch")
|
||
p_rematch.add_argument("--threshold", type=float, default=None, help="Override match_threshold")
|
||
p_rematch.add_argument("--refine", action="store_true",
|
||
help="Refine the cached match by measuring a local image-content offset")
|
||
p_rematch.add_argument("--refine-window", type=float, default=None,
|
||
help="Seconds to search around the cached in-point when using --refine")
|
||
|
||
# report
|
||
p_report = sub.add_parser("report", help="Generate HTML visual comparison report")
|
||
p_report.add_argument("--beat", type=int, help="Report only one beat")
|
||
|
||
# export
|
||
p_export = sub.add_parser("export", help="Export timeline from cached results")
|
||
p_export.add_argument("--format", choices=["fcpxml", "edl", "both"],
|
||
help="Override [export] output_format from config")
|
||
p_export.add_argument("--beat", type=int, help="Export only one beat")
|
||
|
||
# run
|
||
p_run = sub.add_parser("run", help="Full pipeline: analyze → match → export")
|
||
p_run.add_argument("--no-audio", action="store_true")
|
||
p_run.add_argument("--no-llm", action="store_true")
|
||
p_run.add_argument("--force-reindex", action="store_true")
|
||
p_run.add_argument("--vision", action="store_true")
|
||
p_run.add_argument("--no-vision", action="store_true")
|
||
p_run.add_argument("--format", choices=["fcpxml", "edl", "both"])
|
||
p_run.add_argument("--beat", type=int,
|
||
help="Run match/report/export for only one cached beat")
|
||
|
||
return parser
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """CLI entry point: parse arguments, load the config, run the command.

    Console encoding and logging are set up before the configuration is
    loaded; the selected sub-command handler is then called with the parsed
    namespace and the loaded configuration.
    """
    _ensure_utf8_console()

    cli_args = _build_parser().parse_args()
    _setup_logging(cli_args.log_level)

    from src.core.config import load_config

    config = load_config(cli_args.config)

    # Sub-command name -> handler. The parser marks the sub-command as
    # required, so the lookup below always receives one of these keys.
    handlers = {
        "analyze": cmd_analyze,
        "match": cmd_match,
        "rematch": cmd_rematch,
        "report": cmd_report,
        "export": cmd_export,
        "run": cmd_run,
    }
    handlers[cli_args.command](cli_args, config)
|
||
|
||
|
||
# Run the CLI only when executed as a script, never on import.
if __name__ == "__main__":
    main()
|