97a8f9e305
- New CUTTER_REPORT.md: per-beat hand-off table for the video editor doing the manual recut. Per beat: trailer SMPTE in/out, source SMPTE in/out, scene id, score, status (OK / ? / MAN.), and a one-line phase description from the cached vision text. - New scripts/generate_cutter_report.py: pure renderer that reads the current cache (match_results.json + trailer_beats.json + optional vision_descriptions.json) and writes CUTTER_REPORT.md. No side effects on the cache. - cli.py: after every successful match the cutter report is regenerated automatically (best-effort; failures are logged and do not abort). - README.md: new top-section "Fuer den Cutter" describing exactly what the editor needs (which two files to look at, how the status flag works, the recommended NLE workflow). The technical algorithm description follows below. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1816 lines
72 KiB
Python
1816 lines
72 KiB
Python
"""
|
||
cli.py — AI Trailer Generator v2 — Command-Line Interface

Usage:
    python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm]
    python cli.py match [--config CONFIG] [--force-reindex]
    python cli.py rematch --beat N [--threshold F] [--refine]
    python cli.py report [--config CONFIG]
    python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm]
    python cli.py export [--config CONFIG] [--format fcpxml|edl|both]

On --no-audio / --no-llm:
    These flags do NOT affect matching quality.
    Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX)
    to beats in the export metadata. The CV pipeline is identical either way.
    Use them for fast iterations: they skip large model downloads.

All heavy imports are deferred so --help is instant.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Logging setup
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _setup_logging(level: str = "INFO") -> None:
    """Configure root logging to stdout and make the console UTF-8 safe.

    Args:
        level: Name of a ``logging`` level (case-insensitive). Names that do
            not resolve to a numeric level fall back to ``INFO``.
    """
    # Force UTF-8 for Windows console emoji printing. ``sys.stdout`` may have
    # been replaced by an object without ``reconfigure`` (for example an
    # ``io.StringIO`` installed by a test harness), so probe for the method
    # instead of assuming a TextIOWrapper.
    encoding = (getattr(sys.stdout, "encoding", None) or "").lower()
    reconfigure = getattr(sys.stdout, "reconfigure", None)
    if encoding not in ("utf-8", "utf8") and callable(reconfigure):
        reconfigure(encoding="utf-8")

    resolved_level = getattr(logging, level.upper(), logging.INFO)
    if not isinstance(resolved_level, int):
        # Some upper-cased names resolve to non-level attributes of the
        # logging module (e.g. "BASIC_FORMAT" is a string).
        resolved_level = logging.INFO

    logging.basicConfig(
        format="%(asctime)s %(levelname)-8s %(name)s — %(message)s",
        datefmt="%H:%M:%S",
        level=resolved_level,
        stream=sys.stdout,
    )
    # The "PIL" logger is pinned to WARNING regardless of the chosen level.
    logging.getLogger("PIL").setLevel(logging.WARNING)
|
||
|
||
|
||
def _ensure_utf8_console() -> None:
    """Make argparse help safe on Windows before logging is configured.

    Switches ``sys.stdout`` to UTF-8 when it reports another encoding. Streams
    that do not offer ``reconfigure`` (e.g. ``io.StringIO`` replacements) are
    left untouched instead of raising ``AttributeError``.
    """
    encoding = (getattr(sys.stdout, "encoding", None) or "").lower()
    reconfigure = getattr(sys.stdout, "reconfigure", None)
    if encoding not in ("utf-8", "utf8") and callable(reconfigure):
        reconfigure(encoding="utf-8")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Cache helpers (match results ↔ JSON)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _results_cache_path(cfg: "AppConfig") -> Path:  # type: ignore[name-defined]
    """Return the path of the match-results JSON file inside the cache dir."""
    cache_dir = cfg.paths.cache_dir
    return cache_dir / "match_results.json"
|
||
|
||
|
||
def _save_results(results: list, cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Serialize match results to the JSON results cache.

    Each result is written as a flat dict of its timing/score fields plus a
    ``segments`` list (empty when the result has no ``segments`` attribute).

    Args:
        results: Match results to persist.
        cfg: Application config; only the cache location is used.
    """

    def _segment_record(seg) -> dict:
        return {
            "trailer_offset_s": seg.trailer_offset_s,
            "duration_s": seg.duration_s,
            "scene_id": seg.scene_id,
            "in_point_s": seg.in_point_s,
            "out_point_s": seg.out_point_s,
            "match_score": seg.match_score,
            "is_confirmed": seg.is_confirmed,
        }

    data = [
        {
            "beat_id": r.beat_id,
            "scene_id": r.scene_id,
            "source_path": str(r.source_path),
            "in_point_s": r.in_point_s,
            "out_point_s": r.out_point_s,
            "in_point_frame": r.in_point_frame,
            "match_score": r.match_score,
            "match_location": list(r.match_location),
            "is_confirmed": r.is_confirmed,
            "segments": [_segment_record(s) for s in getattr(r, "segments", ())],
        }
        for r in results
    ]
    p = _results_cache_path(cfg)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling temp file and swap it into place, so an interrupted
    # write cannot leave a truncated cache file that later fails to parse.
    tmp = p.with_name(p.name + ".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp.replace(p)
    logging.getLogger(__name__).info("Match results cached → %s", p)
|
||
|
||
|
||
def _regenerate_cutter_report(cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Rewrite CUTTER_REPORT.md next to the cache directory (best effort).

    Both a failing import of the renderer and a failing render/write are
    logged as warnings; nothing is raised to the caller.
    """
    log = logging.getLogger(__name__)

    try:
        from scripts.generate_cutter_report import render_report
    except Exception as exc:
        log.warning("Cutter report regen skipped: %s", exc)
        return

    try:
        # The project root is taken to be the parent of the cache directory.
        project_root = cfg.paths.cache_dir.parent
        out = project_root / "CUTTER_REPORT.md"
        markdown = render_report(project_root)
        out.write_text(markdown, encoding="utf-8")
        log.info("Cutter report regenerated → %s", out)
    except Exception as exc:
        log.warning("Cutter report regen failed: %s", exc)
|
||
|
||
|
||
def _load_results(cfg: "AppConfig") -> list:  # type: ignore[name-defined]
    """Rebuild ``MatchResult`` objects from the JSON results cache.

    Entries without ``is_confirmed`` are treated as confirmed, and entries
    without ``segments`` get an empty segment tuple.

    Raises:
        FileNotFoundError: If the results cache file does not exist.
    """
    from src.core.models import MatchResult, MatchSegment

    p = _results_cache_path(cfg)
    if not p.exists():
        raise FileNotFoundError(f"No cached results at {p}. Run 'match' first.")

    def _segment_from(entry: dict):
        return MatchSegment(
            trailer_offset_s=float(entry["trailer_offset_s"]),
            duration_s=float(entry["duration_s"]),
            scene_id=int(entry["scene_id"]),
            in_point_s=float(entry["in_point_s"]),
            out_point_s=float(entry["out_point_s"]),
            match_score=float(entry["match_score"]),
            is_confirmed=bool(entry.get("is_confirmed", True)),
        )

    loaded = []
    for record in json.loads(p.read_text(encoding="utf-8")):
        loaded.append(
            MatchResult(
                beat_id=record["beat_id"],
                scene_id=record["scene_id"],
                source_path=Path(record["source_path"]),
                in_point_s=record["in_point_s"],
                out_point_s=record["out_point_s"],
                in_point_frame=record["in_point_frame"],
                match_score=record["match_score"],
                match_location=tuple(record["match_location"]),
                is_confirmed=record.get("is_confirmed", True),
                segments=tuple(_segment_from(s) for s in record.get("segments", ())),
            )
        )
    return loaded
|
||
|
||
|
||
def _load_scene_cache_light(cfg) -> list[dict]:
    """Return the parsed ``scene_index.json`` cache, or ``[]`` if it is absent."""
    index_file = cfg.paths.cache_dir / "scene_index.json"
    if index_file.exists():
        return json.loads(index_file.read_text(encoding="utf-8"))
    return []
|
||
|
||
|
||
def _scene_fps_light(scene: dict, cfg) -> float:
    """Derive a scene's frame rate from its cached frame and time span.

    Falls back to ``cfg.export.edl_frame_rate`` when either the time span or
    the frame span is not positive.
    """
    span_s = max(0.0, float(scene["end_s"]) - float(scene["start_s"]))
    span_frames = max(0, int(scene["end_frame"]) - int(scene["start_frame"]))
    if span_s > 0 and span_frames > 0:
        return span_frames / span_s
    return cfg.export.edl_frame_rate
|
||
|
||
|
||
def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None:
|
||
for idx, scene in enumerate(scenes):
|
||
if float(scene["start_s"]) <= t_sec < float(scene["end_s"]):
|
||
if (
|
||
float(scene["end_s"]) - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s
|
||
and idx + 1 < len(scenes)
|
||
):
|
||
return scenes[idx + 1]
|
||
return scene
|
||
return None
|
||
|
||
|
||
def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None:
|
||
return next((s for s in scenes if int(s["scene_id"]) == scene_id), None)
|
||
|
||
|
||
def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float:
    """Return how much source time from ``in_point_s`` may be used contiguously.

    Starting in the scene that contains ``in_point_s``, scenes are walked
    forward. A scene boundary may only be crossed when its offset from the
    in-point lies within ``cfg.vision.multi_shot_boundary_tolerance_s`` of one
    of the beat's internal cut offsets; otherwise the duration stops at that
    boundary minus the configured tail trim.

    Returns ``0.0`` for a non-positive ``matchable_duration_s`` or when no
    scene contains ``in_point_s``.
    """
    if matchable_duration_s <= 0:
        return 0.0

    # Best effort: if the cut detector cannot be imported or fails, behave as
    # if the reference beat had no internal cuts.
    try:
        from src.cv.global_scan import _reference_internal_cut_offsets

        cut_offsets = _reference_internal_cut_offsets(beat, cfg)
    except Exception:
        cut_offsets = []

    first_idx = next(
        (
            i
            for i, sc in enumerate(scenes)
            if float(sc["start_s"]) <= in_point_s < float(sc["end_s"])
        ),
        None,
    )
    if first_idx is None:
        return 0.0

    target_end = in_point_s + matchable_duration_s
    covered_end = in_point_s
    for scene in scenes[first_idx:]:
        scene_end = float(scene["end_s"])
        if target_end <= scene_end:
            # The whole requested span fits before this scene ends.
            return matchable_duration_s

        boundary_offset = scene_end - in_point_s
        boundary_matches_cut = any(
            abs(boundary_offset - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s
            for cut_offset in cut_offsets
        )
        if not boundary_matches_cut:
            tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(scene, cfg))
            return max(0.0, scene_end - in_point_s - tail_s)
        covered_end = scene_end

    # Ran out of scenes before reaching the target end.
    return max(0.0, covered_end - in_point_s)
|
||
|
||
|
||
def _normalize_cached_results(beats: list, results: list, cfg) -> list:
    """
    Re-apply current generic timing rules to cached results.

    This keeps old automatic cache entries from preserving obsolete scene-boundary
    or tail-trim behavior without introducing manual per-beat truth.

    Results are dropped when their (segment-weighted) score is below the
    provisional match threshold, when they cover too little of the beat, or
    when the content re-check scores below its gate. When no scene index is
    cached, ``results`` is returned unchanged.

    Args:
        beats: Trailer beats; looked up by ``beat_id``.
        results: Cached match results to re-validate.
        cfg: Application config (``cv.deep_scan``, ``vision`` and ``export``
            settings are read).

    Returns:
        The surviving, possibly re-timed results in input order.
    """
    from dataclasses import replace

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return results

    log = logging.getLogger(__name__)
    # Results at or below this duration are always re-timed.
    # NOTE(review): presumably these are degenerate/near-empty matches; confirm.
    short_result_s = 0.12

    beats_by_id = {b.beat_id: b for b in beats}
    normalized = []
    for result in results:
        beat = beats_by_id.get(result.beat_id)

        # Multi-segment results: only re-score and re-check coverage.
        if getattr(result, "segments", ()):
            segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
            weighted_score = (
                sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
                / segment_duration
                if segment_duration > 0 else result.match_score
            )
            if weighted_score < cfg.cv.deep_scan.provisional_match_threshold:
                continue
            if beat is not None and beat.duration_s > 0:
                visible_duration = sum(
                    max(0.0, end_s - start_s)
                    for start_s, end_s in _reference_scoreable_segments(beat, cfg)
                )
                coverage_target = visible_duration if visible_duration > 0 else beat.duration_s
                coverage = segment_duration / coverage_target
                if coverage < cfg.cv.deep_scan.min_duration_coverage:
                    continue
            normalized.append(replace(result, match_score=weighted_score))
            continue

        if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
            continue

        scene = _scene_for_time_light(scenes, result.in_point_s, cfg)
        declared_scene = _scene_by_id_light(scenes, result.scene_id)

        # If the automatic matcher selected a scene but its in-point sits just
        # before that scene's detected start, treat this as scene-boundary drift
        # and clamp to the declared scene. This is generic: no beat IDs, no
        # manual timestamps, just consistent scene/time reconciliation.
        if declared_scene is not None:
            declared_start = float(declared_scene["start_s"])
            declared_end = float(declared_scene["end_s"])
            declared_fps = _scene_fps_light(declared_scene, cfg)
            boundary_tolerance_s = (
                cfg.cv.deep_scan.scene_boundary_epsilon_s
                + cfg.cv.deep_scan.start_preroll_frames / declared_fps
            )
            if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end:
                scene = declared_scene

        if beat is None or scene is None:
            # Nothing to reconcile against; keep the cached entry as it is.
            normalized.append(result)
            continue

        fps = _scene_fps_light(scene, cfg)
        adjusted_in_s = result.in_point_s
        scene_changed = int(scene["scene_id"]) != result.scene_id
        starts_before_scene = result.in_point_s < float(scene["start_s"])
        needs_retime = (
            scene_changed
            or starts_before_scene
            or result.duration_s <= short_result_s
        )
        if needs_retime:
            adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps))
            adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s)
            scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
            fps = _scene_fps_light(scene, cfg)

        matchable_duration_s = beat.duration_s
        try:
            from src.cv.global_scan import estimate_matchable_reference_duration

            matchable_duration_s = estimate_matchable_reference_duration(beat, cfg)
        except Exception as exc:
            # Best effort: keep the full beat duration, but leave a trace.
            log.debug(
                "Beat %s: matchable-duration estimate unavailable, using beat duration (%s)",
                result.beat_id,
                exc,
            )

        tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
        single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s)
        contiguous_duration_s = _contiguous_duration_light(
            beat,
            adjusted_in_s,
            scenes,
            cfg,
            matchable_duration_s,
        )
        max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s))

        normalized_result = result
        # The extra 1/fps allows the cached out-point one frame of slack.
        if needs_retime or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps):
            normalized_result = replace(
                result,
                scene_id=int(scene["scene_id"]),
                in_point_s=adjusted_in_s,
                out_point_s=adjusted_in_s + max_duration_s,
                in_point_frame=int(adjusted_in_s * fps),
            )

        coverage = (
            max(0.0, normalized_result.duration_s) / matchable_duration_s
            if matchable_duration_s > 0 else 0.0
        )
        if coverage < cfg.cv.deep_scan.min_duration_coverage:
            continue

        try:
            from src.cv.content_align import align_cached_match_by_content

            _, content_score = align_cached_match_by_content(
                beat,
                normalized_result.in_point_s,
                cfg,
                search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds),
                fps=12.5,
            )
            content_gate = (
                cfg.cv.deep_scan.provisional_content_threshold
                if normalized_result.is_confirmed
                else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold)
            )
            if content_score < content_gate:
                continue
            if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed:
                # Demote to provisional and cap the score at the content score.
                normalized_result = replace(
                    normalized_result,
                    match_score=min(normalized_result.match_score, content_score),
                    is_confirmed=False,
                )
        except Exception as exc:
            # Best effort: a failing content re-check keeps the result as-is.
            log.debug(
                "Beat %s: cached content re-check skipped (%s)",
                result.beat_id,
                exc,
            )

        normalized.append(normalized_result)

    return normalized
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Command handlers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_transcribe_callback(cfg):
    """Return a closure that transcribes a time range of a video with ``cfg`` bound.

    The transcriber is imported when this builder runs, not at module import.
    """
    from src.audio.transcriber import transcribe_video

    def _transcribe(path, start_s, end_s, offset_s):
        # The positional ``offset_s`` is forwarded as ``time_offset_s``.
        return transcribe_video(
            path,
            cfg,
            start_s=start_s,
            end_s=end_s,
            time_offset_s=offset_s,
        )

    return _transcribe
|
||
|
||
|
||
def _build_classify_callback(cfg):
    """Return a closure that classifies a list of beats with ``cfg`` bound.

    The classifier is imported when this builder runs, not at module import.
    """
    from src.llm.dramaturg import classify_beats

    def _classify(beats):
        return classify_beats(beats, cfg)

    return _classify
|
||
|
||
|
||
def cmd_analyze(args: argparse.Namespace, cfg) -> list:
    """Analyze the reference trailer and cache its beats as JSON.

    ``args.no_audio`` and ``args.no_llm`` disable the transcription and
    classification callbacks respectively. The beats are written to
    ``trailer_beats.json`` in the cache directory and also returned.
    """
    from src.pipeline.trailer_analyzer import analyze_reference_trailer

    transcribe_cb = None if args.no_audio else _build_transcribe_callback(cfg)
    classify_cb = None if args.no_llm else _build_classify_callback(cfg)

    beats = analyze_reference_trailer(
        cfg,
        transcribe_callback=transcribe_cb,
        classify_callback=classify_cb,
    )

    def _hex_or_none(blob):
        # Histogram bytes are stored as hex text; empty/missing becomes null.
        return blob.hex() if blob else None

    # Persist beats for downstream commands (including histogram bytes as hex)
    beats_cache = cfg.paths.cache_dir / "trailer_beats.json"
    beats_cache.parent.mkdir(parents=True, exist_ok=True)

    beats_data = []
    for b in beats:
        spoken = [
            {"start_s": d.start_s, "end_s": d.end_s, "text": d.text}
            for d in b.dialogue
        ]
        beats_data.append(
            {
                "beat_id": b.beat_id,
                "start_s": b.start_s,
                "end_s": b.end_s,
                "start_frame": b.start_frame,
                "end_frame": b.end_frame,
                "beat_type": b.beat_type.name,
                "dialogue": spoken,
                "phash": b.phash,
                "luma_hist": _hex_or_none(b.luma_hist),
                "sat_hist": _hex_or_none(b.sat_hist),
            }
        )

    serialized = json.dumps(beats_data, indent=2, ensure_ascii=False)
    beats_cache.write_text(serialized, encoding="utf-8")
    print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}")
    return beats
|
||
|
||
|
||
def _load_beats(cfg) -> list:
    """Rebuild ``TrailerBeat`` objects from the cached ``trailer_beats.json``.

    The trailer path is taken from ``cfg.paths.reference_trailer`` rather than
    from the cache. A missing ``beat_type`` maps to ``BeatType.UNKNOWN``, and
    hex-encoded histograms are decoded back to bytes.

    Raises:
        FileNotFoundError: If the beats cache file does not exist.
    """
    from src.core.models import BeatType, DialogueLine, TrailerBeat

    p = cfg.paths.cache_dir / "trailer_beats.json"
    if not p.exists():
        raise FileNotFoundError(f"No cached beats at {p}. Run 'analyze' first.")

    def _bytes_or_none(record: dict, key: str):
        encoded = record.get(key)
        return bytes.fromhex(encoded) if encoded else None

    def _beat_from(record: dict):
        lines = tuple(
            DialogueLine(start_s=x["start_s"], end_s=x["end_s"], text=x["text"])
            for x in record.get("dialogue", [])
        )
        return TrailerBeat(
            beat_id=record["beat_id"],
            trailer_path=cfg.paths.reference_trailer,
            start_s=record["start_s"],
            end_s=record["end_s"],
            start_frame=record["start_frame"],
            end_frame=record["end_frame"],
            beat_type=BeatType[record.get("beat_type", "UNKNOWN")],
            dialogue=lines,
            phash=record.get("phash"),
            luma_hist=_bytes_or_none(record, "luma_hist"),
            sat_hist=_bytes_or_none(record, "sat_hist"),
        )

    return [_beat_from(record) for record in json.loads(p.read_text(encoding="utf-8"))]
|
||
|
||
|
||
def _select_beats(beats: list, beat_id: int | None) -> list:
|
||
"""Return all beats or exactly one requested beat."""
|
||
if beat_id is None:
|
||
return beats
|
||
selected = [b for b in beats if b.beat_id == beat_id]
|
||
if not selected:
|
||
raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.")
|
||
return selected
|
||
|
||
|
||
def _select_results(results: list, beat_ids: set[int] | None) -> list:
|
||
"""Return all results or only results for the requested beats."""
|
||
if beat_ids is None:
|
||
return results
|
||
return [r for r in results if r.beat_id in beat_ids]
|
||
|
||
|
||
def _find_scene_for_in_point(cfg, in_point_s: float):
    """Return the indexed scene containing ``in_point_s``, or ``None``.

    A time that lies within ``scene_boundary_epsilon_s`` of its scene's end is
    attributed to the following scene when one exists.
    """
    from src.cv.scene_indexer import build_scene_index

    scenes = build_scene_index(cfg)
    for position, candidate in enumerate(scenes):
        if not (candidate.start_s <= in_point_s < candidate.end_s):
            continue
        remaining_s = candidate.end_s - in_point_s
        if remaining_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s and position + 1 < len(scenes):
            return scenes[position + 1]
        return candidate
    return None
|
||
|
||
|
||
def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]:
    """Find visible source-matchable islands inside a trailer beat.

    Works in three passes over frames sampled every ``step_s`` seconds:

    1. Collect runs of scoreable frames, bridging gaps up to ``bridge_gap_s``
       and discarding runs shorter than ``min_segment_s``.
    2. Grow each run outwards over frames that are still visible (relaxed
       luma/contrast limits) and still correlate with the run's edge frame.
    3. Merge grown runs that end up within ``bridge_gap_s`` of each other.

    Returns:
        ``(start, end)`` offsets in seconds, relative to ``beat.start_s``.
    """
    from src.cv.frame_extractor import grab_frame_at_path
    from src.cv.global_scan import (
        _corr_same_size,
        _is_scoreable_reference_frame,
        _prepare_haystack,
        _reference_visibility_stats,
    )

    def frame_at(offset_s: float):
        return grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)

    def is_visible(frame) -> bool:
        # Looser than the scoreable test: fractions of the scoreable limits.
        if frame is None:
            return False
        mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg)
        bright_enough = (
            mean_luma >= cfg.cv.deep_scan.scoreable_luma_mean_min * 0.45
            or p90_luma >= cfg.cv.deep_scan.scoreable_luma_p90_min * 0.50
        )
        contrasty_enough = contrast >= max(8.0, cfg.cv.deep_scan.scoreable_contrast_min * 0.30)
        return bright_enough and contrasty_enough

    step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s)
    min_segment_s = max(0.32, step_s * 3.0)
    bridge_gap_s = max(0.18, step_s * 2.0)
    same_shot_corr_min = 0.72

    def leaves_shot(frame, anchor_feature) -> bool:
        # Without an anchor feature there is nothing to compare against.
        return (
            anchor_feature is not None
            and _corr_same_size(_prepare_haystack(frame, cfg), anchor_feature) < same_shot_corr_min
        )

    # --- Pass 1: raw runs of scoreable frames -----------------------------
    raw: list[tuple[float, float]] = []

    def close_run(first_s: float, last_s: float) -> None:
        run_end = min(beat.duration_s, last_s + step_s)
        if run_end - first_s >= min_segment_s:
            raw.append((first_s, run_end))

    run_start: float | None = None
    last_seen: float | None = None
    t = 0.0
    while t <= beat.duration_s:
        frame = frame_at(t)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            if run_start is None:
                run_start = t
            last_seen = t
        elif run_start is not None and last_seen is not None and t - last_seen > bridge_gap_s:
            close_run(run_start, last_seen)
            run_start = None
            last_seen = None
        t = round(t + step_s, 6)

    if run_start is not None and last_seen is not None:
        close_run(run_start, last_seen)

    # --- Pass 2: grow each run over visible same-shot frames --------------
    expanded: list[tuple[float, float]] = []
    for start_s, end_s in raw:
        start_anchor = frame_at(start_s)
        end_anchor = frame_at(max(start_s, end_s - step_s))
        start_feature = _prepare_haystack(start_anchor, cfg) if start_anchor is not None else None
        end_feature = _prepare_haystack(end_anchor, cfg) if end_anchor is not None else None

        soft_start = start_s
        t = round(start_s - step_s, 6)
        while t >= 0.0:
            frame = frame_at(t)
            if not is_visible(frame):
                break
            if leaves_shot(frame, start_feature):
                break
            soft_start = max(0.0, t)
            t = round(t - step_s, 6)

        soft_end = end_s
        t = round(end_s, 6)
        while t <= beat.duration_s + 1e-6:
            frame = frame_at(t)
            if not is_visible(frame):
                break
            if leaves_shot(frame, end_feature):
                break
            soft_end = min(beat.duration_s, t + step_s)
            t = round(t + step_s, 6)

        if soft_end - soft_start >= min_segment_s:
            expanded.append((soft_start, soft_end))

    # --- Pass 3: merge neighbours separated by at most the bridge gap -----
    merged: list[tuple[float, float]] = []
    for start_s, end_s in expanded:
        if merged and start_s - merged[-1][1] <= bridge_gap_s:
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end_s))
        else:
            merged.append((start_s, end_s))
    return merged
|
||
|
||
|
||
def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]:
    """Use a single visible island as the primary match target for faded beats.

    A beat is narrowed to its island only when it has exactly one island of
    positive length that leaves more than 1.5 frames untrimmed at the start
    or the end; every other beat is passed through unchanged.

    Returns:
        ``(beats, trims)`` where ``trims`` maps ``beat_id`` to
        ``(island offset within the beat, island duration)`` in seconds.
    """
    from dataclasses import replace

    frame_s = 1.0 / max(1.0, float(cfg.export.edl_frame_rate))
    min_trim_s = frame_s * 1.5

    out_beats = []
    trims: dict[int, tuple[float, float]] = {}
    for beat in beats:
        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) != 1:
            out_beats.append(beat)
            continue

        start_s, end_s = islands[0]
        island_duration_s = max(0.0, end_s - start_s)
        has_real_trim = start_s > min_trim_s or beat.duration_s - end_s > min_trim_s
        if not (island_duration_s > 0.0 and has_real_trim):
            out_beats.append(beat)
            continue

        narrowed = replace(
            beat,
            start_s=beat.start_s + start_s,
            end_s=beat.start_s + end_s,
        )
        out_beats.append(narrowed)
        trims[beat.beat_id] = (start_s, island_duration_s)
    return out_beats, trims
|
||
|
||
|
||
def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list:
    """Restore beat-relative segment metadata after matching a trimmed island.

    Results whose beat has a recorded trim and that carry no segments yet get
    one segment describing the island; their out-point is clamped to the
    shorter of island duration and matched duration. Other results pass
    through unchanged. With no trims, ``results`` itself is returned.
    """
    if not trims:
        return results

    from dataclasses import replace
    from src.core.models import MatchSegment

    def _with_island_segment(result):
        trim = trims.get(result.beat_id)
        if trim is None or getattr(result, "segments", ()):
            return result

        trailer_offset_s, island_duration_s = trim
        duration_s = min(max(0.0, island_duration_s), max(0.0, result.duration_s))
        island_segment = MatchSegment(
            trailer_offset_s=trailer_offset_s,
            duration_s=duration_s,
            scene_id=result.scene_id,
            in_point_s=result.in_point_s,
            out_point_s=result.in_point_s + duration_s,
            match_score=result.match_score,
            is_confirmed=result.is_confirmed,
        )
        return replace(
            result,
            out_point_s=result.in_point_s + duration_s,
            segments=(island_segment,),
        )

    return [_with_island_segment(result) for result in results]
|
||
|
||
|
||
def _merge_best_results(existing: list, candidates: list, cfg) -> list:
    """Merge matches by beat, preferring confirmed or higher-scoring results.

    A candidate replaces the current entry for its beat when it is confirmed
    and the current one is not, when it scores higher by more than the
    tie-break delta, or when it scores within the delta and lasts longer.

    Returns:
        One result per beat, ordered by ``beat_id``.
    """

    def _counts_as_confirmed(match) -> bool:
        return match.match_score >= cfg.cv.deep_scan.match_threshold or match.is_confirmed

    def _supersedes(candidate, old) -> bool:
        candidate_confirmed = _counts_as_confirmed(candidate)
        old_confirmed = _counts_as_confirmed(old)
        if candidate_confirmed and not old_confirmed:
            return True
        if candidate.match_score > old.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta:
            return True
        return (
            candidate.match_score >= old.match_score - cfg.cv.deep_scan.duration_tie_break_score_delta
            and candidate.duration_s > old.duration_s
        )

    winners = {}
    for match in existing:
        winners[match.beat_id] = match

    for candidate in candidates:
        incumbent = winners.get(candidate.beat_id)
        if incumbent is None or _supersedes(candidate, incumbent):
            winners[candidate.beat_id] = candidate

    # Keys are the beat ids, so sorting the keys orders the results by beat.
    return [winners[beat_id] for beat_id in sorted(winners)]
|
||
|
||
|
||
def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list:
    """Try a vision-led search for beats that ended up without a match.

    For each unmatched beat that has scoreable visual content (i.e. not pure
    fade/title-card material), this pass:
      1. Asks the vibe-check (CV histogram + pHash) for the top-K candidate
         scenes.
      2. For each candidate, runs the semantic action-window search with the
         beat's own description, preferring windows whose phase matches the
         visible part of the beat.
      3. Refines the in-point with the regular CV content/motion aligner.
      4. Validates the resulting window with the vision phase check, exactly
         like the main filter.
      5. Adds the best validated candidate as a provisional MatchResult.

    Confirmed and provisional matches both stay subject to the same thresholds
    used elsewhere; this only adds matches that pass the same quality gates.

    Args:
        results: Matches found so far; never modified in place.
        beats: All trailer beats; those without an entry in ``results`` are
            the recovery targets.
        cfg: Application config.

    Returns:
        ``results`` itself when vision is disabled or there is nothing to
        recover (no beats, no unmatched beats, or an empty scene index);
        otherwise a new list with any recovered matches, sorted by ``beat_id``.
    """
    if not cfg.vision.enabled or not beats:
        return results

    from dataclasses import replace
    from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration
    from src.cv.scene_indexer import build_scene_index
    from src.cv.vibe_check import run_vibe_check
    from src.core.models import MatchResult
    from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision

    logger = logging.getLogger(__name__)
    matched_ids = {r.beat_id for r in results}
    unmatched = [b for b in beats if b.beat_id not in matched_ids]
    if not unmatched:
        return results

    scenes = build_scene_index(cfg)
    if not scenes:
        return results

    # The scene lookup does not depend on the beat, so build it once.
    scenes_by_id = {s.scene_id: s for s in scenes}

    new_results = list(results)
    for beat in unmatched:
        try:
            islands = _reference_scoreable_segments(beat, cfg)
        except Exception as exc:
            # Best effort: fall back to the whole beat as the anchor.
            logger.debug("Beat %d: scoreable-island scan failed (%s)", beat.beat_id, exc)
            islands = []

        # Anchor selection: prefer the longest visible island; if none exists,
        # fall back to the full beat. The latter handles dark / low-contrast
        # close-ups that drop below the scoreable luma/contrast thresholds but
        # are still semantically describable. The strict vision phase
        # validation later in this pass keeps us from accepting pure title-card
        # or logo material.
        if islands:
            anchor_start_s, anchor_end_s = max(islands, key=lambda iv: iv[1] - iv[0])
            anchor_beat = replace(
                beat,
                start_s=beat.start_s + anchor_start_s,
                end_s=beat.start_s + anchor_end_s,
            )
        else:
            anchor_beat = beat

        try:
            hits = run_vibe_check(
                beat,
                scenes,
                top_k=max(cfg.cv.deep_scan.scene_seed_top_k, cfg.cv.vibe_check.top_k_candidates),
                hist_method=cfg.cv.vibe_check.hist_compare_method,
                phash_max_distance=64,
            )
        except Exception as exc:
            logger.warning("Beat %d: recovery vibe-check failed (%s)", beat.beat_id, exc)
            continue

        best = None  # (score, scene, in_s, dur_s, reason)
        seen = set()
        for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]:
            scene = scenes_by_id.get(hit.scene_id)
            if scene is None or scene.scene_id in seen:
                continue
            seen.add(scene.scene_id)

            try:
                found = find_action_window_in_scene(anchor_beat, scene, cfg)
            except Exception as exc:
                logger.debug("Beat %d: action window failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            if found is None:
                continue
            start_s, end_s, semantic_score, reason = found

            # Search window: four times the action window, kept within 3-8 s.
            window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
            try:
                aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
                    anchor_beat,
                    start_s,
                    cfg,
                    search_window_s=window_s,
                )
            except Exception as exc:
                logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            # Keep the in-point inside the scene, leaving room for the anchor
            # duration where the scene is long enough.
            aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s)))

            try:
                usable_duration_s, usable_score = estimate_usable_source_duration(anchor_beat, aligned_in_s, cfg)
            except Exception as exc:
                logger.debug(
                    "Beat %d: usable-duration estimate failed for scene %d (%s)",
                    beat.beat_id,
                    scene.scene_id,
                    exc,
                )
                usable_duration_s, usable_score = anchor_beat.duration_s, 0.0
            usable_duration_s = max(0.0, min(anchor_beat.duration_s, usable_duration_s))
            if usable_duration_s < max(0.32, anchor_beat.duration_s * 0.45):
                # Too short an estimate: validate the full anchor duration instead.
                usable_duration_s = anchor_beat.duration_s

            try:
                ok, verify_reason = validate_match_window_with_vision(
                    anchor_beat,
                    source_path=scene.source_path,
                    scene_id=scene.scene_id,
                    in_point_s=aligned_in_s,
                    out_point_s=aligned_in_s + usable_duration_s,
                    cfg=cfg,
                )
            except Exception as exc:
                logger.debug("Beat %d: validate failed scene=%d (%s)", beat.beat_id, scene.scene_id, exc)
                continue
            if not ok:
                continue

            final_score = max(
                combined_score,
                min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
            )
            if final_score < cfg.cv.deep_scan.provisional_match_threshold:
                continue
            candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}")
            if best is None or candidate[0] > best[0]:
                best = candidate

        if best is None:
            continue
        score, scene, aligned_in_s, usable_duration_s, repair_reason = best
        logger.info(
            "Beat %d: recovered via vision action search scene=%d in=%.3fs score=%.3f (%s)",
            beat.beat_id,
            scene.scene_id,
            aligned_in_s,
            score,
            repair_reason,
        )
        new_results.append(MatchResult(
            beat_id=beat.beat_id,
            scene_id=scene.scene_id,
            source_path=scene.source_path,
            in_point_s=aligned_in_s,
            out_point_s=aligned_in_s + usable_duration_s,
            in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
            match_score=score,
            match_location=(0, 0),
            is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
            segments=tuple(),
        ))

    return sorted(new_results, key=lambda r: r.beat_id)
|
||
|
||
|
||
def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list:
|
||
"""Drop vision-enabled matches whose final action phase contradicts the beat."""
|
||
if not cfg.vision.enabled or not results:
|
||
return results
|
||
|
||
from dataclasses import replace
|
||
from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
|
||
from src.cv.scene_indexer import build_scene_index
|
||
from src.cv.global_scan import align_in_point_by_content_and_motion, estimate_usable_source_duration
|
||
|
||
logger = logging.getLogger(__name__)
|
||
beats_by_id = {beat.beat_id: beat for beat in beats}
|
||
scenes_by_id = {scene.scene_id: scene for scene in build_scene_index(cfg)}
|
||
|
||
def visible_content_offset(action_beat, segment_start_offset_s: float) -> float:
|
||
content_offset_s = 0.0
|
||
for start_s, end_s in _reference_scoreable_segments(action_beat, cfg):
|
||
if end_s <= segment_start_offset_s:
|
||
content_offset_s += max(0.0, end_s - start_s)
|
||
elif start_s < segment_start_offset_s:
|
||
content_offset_s += max(0.0, segment_start_offset_s - start_s)
|
||
break
|
||
else:
|
||
break
|
||
return content_offset_s
|
||
|
||
def realign_window(check_beat, scene_id: int, action_beat=None):
|
||
scene = scenes_by_id.get(scene_id)
|
||
if scene is None:
|
||
return None
|
||
segment_window = find_action_window_in_scene(check_beat, scene, cfg)
|
||
if action_beat is not None and action_beat is not check_beat:
|
||
beat_window = find_action_window_in_scene(action_beat, scene, cfg)
|
||
else:
|
||
beat_window = None
|
||
use_beat_context = False
|
||
if segment_window is None:
|
||
found = beat_window
|
||
use_beat_context = beat_window is not None
|
||
elif beat_window is None:
|
||
found = segment_window
|
||
elif beat_window[2] > segment_window[2] + 0.06:
|
||
found = beat_window
|
||
use_beat_context = True
|
||
else:
|
||
found = segment_window
|
||
if found is None:
|
||
return None
|
||
start_s, end_s, semantic_score, reason = found
|
||
if use_beat_context:
|
||
segment_start_offset_s = max(0.0, check_beat.start_s - action_beat.start_s)
|
||
content_offset_s = visible_content_offset(action_beat, segment_start_offset_s)
|
||
start_s += content_offset_s
|
||
end_s += content_offset_s
|
||
window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
|
||
aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
|
||
check_beat,
|
||
start_s,
|
||
cfg,
|
||
search_window_s=window_s,
|
||
)
|
||
aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s)))
|
||
usable_duration_s, usable_score = estimate_usable_source_duration(check_beat, aligned_in_s, cfg)
|
||
usable_duration_s = max(0.0, min(check_beat.duration_s, usable_duration_s))
|
||
if usable_duration_s < max(0.32, check_beat.duration_s * 0.45):
|
||
usable_duration_s = check_beat.duration_s
|
||
ok, verify_reason = validate_match_window_with_vision(
|
||
check_beat,
|
||
source_path=scene.source_path,
|
||
scene_id=scene.scene_id,
|
||
in_point_s=aligned_in_s,
|
||
out_point_s=aligned_in_s + usable_duration_s,
|
||
cfg=cfg,
|
||
)
|
||
if not ok:
|
||
logger.info(
|
||
"Beat %d: action-window realign rejected scene=%d in=%.3fs (%s)",
|
||
check_beat.beat_id,
|
||
scene.scene_id,
|
||
aligned_in_s,
|
||
verify_reason,
|
||
)
|
||
return None
|
||
score = max(
|
||
combined_score,
|
||
min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
|
||
)
|
||
return scene, aligned_in_s, usable_duration_s, score, f"{reason}; {verify_reason}"
|
||
|
||
kept = []
|
||
for result in results:
|
||
beat = beats_by_id.get(result.beat_id)
|
||
if beat is None:
|
||
kept.append(result)
|
||
continue
|
||
|
||
kept_before = len(kept)
|
||
try:
|
||
_filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger)
|
||
except Exception as exc:
|
||
logger.warning(
|
||
"Beat %d: vision filter/repair failed (%s); keeping previous cached match.",
|
||
result.beat_id,
|
||
exc,
|
||
)
|
||
del kept[kept_before:]
|
||
kept.append(result)
|
||
return kept
|
||
|
||
|
||
def _filter_repair_one(result, beat, beats_by_id, scenes_by_id, kept, cfg, realign_window, validate_match_window_with_vision, logger):
|
||
from dataclasses import replace
|
||
if True:
|
||
windows = []
|
||
if getattr(result, "segments", ()):
|
||
for segment in result.segments:
|
||
segment_beat = replace(
|
||
beat,
|
||
start_s=beat.start_s + segment.trailer_offset_s,
|
||
end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
|
||
)
|
||
windows.append((
|
||
segment_beat,
|
||
segment.scene_id,
|
||
segment.in_point_s,
|
||
segment.out_point_s,
|
||
))
|
||
else:
|
||
windows.append((beat, result.scene_id, result.in_point_s, result.out_point_s))
|
||
|
||
valid = True
|
||
reasons: list[str] = []
|
||
for check_beat, scene_id, in_point_s, out_point_s in windows:
|
||
ok, reason = validate_match_window_with_vision(
|
||
check_beat,
|
||
source_path=result.source_path,
|
||
scene_id=scene_id,
|
||
in_point_s=in_point_s,
|
||
out_point_s=out_point_s,
|
||
cfg=cfg,
|
||
)
|
||
reasons.append(reason)
|
||
if not ok:
|
||
valid = False
|
||
break
|
||
if valid:
|
||
repaired = False
|
||
if getattr(result, "segments", ()):
|
||
new_segments = []
|
||
repair_reasons = []
|
||
changed = False
|
||
for segment in result.segments:
|
||
scene = scenes_by_id.get(segment.scene_id)
|
||
# Allow phase-realign whenever the scene has any meaningful
|
||
# slack beyond the segment, not only for "long" scenes.
|
||
# Short scenes don't need realigning because the segment
|
||
# essentially is the scene.
|
||
if scene is None or scene.duration_s <= segment.duration_s + 0.5:
|
||
new_segments.append(segment)
|
||
continue
|
||
# For already-confirmed segments, skip the realign to avoid
|
||
# destabilizing a strong original match.
|
||
if segment.is_confirmed and scene.duration_s <= max(segment.duration_s * 1.6, 6.0):
|
||
new_segments.append(segment)
|
||
continue
|
||
segment_beat = replace(
|
||
beat,
|
||
start_s=beat.start_s + segment.trailer_offset_s,
|
||
end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
|
||
)
|
||
repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
|
||
if repair is None:
|
||
new_segments.append(segment)
|
||
continue
|
||
repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
|
||
if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate:
|
||
new_segments.append(segment)
|
||
continue
|
||
# Don't commit a repair that scores meaningfully worse than
|
||
# the original; phase realign should improve, not regress.
|
||
if score < segment.match_score - 0.02:
|
||
new_segments.append(segment)
|
||
continue
|
||
changed = True
|
||
repair_reasons.append(repair_reason)
|
||
new_segments.append(replace(
|
||
segment,
|
||
scene_id=repair_scene.scene_id,
|
||
in_point_s=aligned_in_s,
|
||
out_point_s=aligned_in_s + usable_duration_s,
|
||
duration_s=usable_duration_s,
|
||
match_score=score,
|
||
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
|
||
))
|
||
if changed and new_segments:
|
||
first = new_segments[0]
|
||
repaired_score = min(seg.match_score for seg in new_segments)
|
||
logger.info(
|
||
"Beat %d: realigned semantically valid long scene by motion/action windows (%s)",
|
||
result.beat_id,
|
||
"; ".join(repair_reasons),
|
||
)
|
||
kept.append(replace(
|
||
result,
|
||
scene_id=first.scene_id,
|
||
in_point_s=first.in_point_s,
|
||
out_point_s=first.out_point_s,
|
||
in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
|
||
match_score=repaired_score,
|
||
is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
|
||
segments=tuple(new_segments),
|
||
))
|
||
repaired = True
|
||
else:
|
||
scene = scenes_by_id.get(result.scene_id)
|
||
wide_scene = (
|
||
scene is not None
|
||
and scene.duration_s > result.duration_s + 0.5
|
||
)
|
||
already_confirmed_in_tight_scene = (
|
||
result.is_confirmed
|
||
and scene is not None
|
||
and scene.duration_s <= max(result.duration_s * 1.6, 6.0)
|
||
)
|
||
if wide_scene and not already_confirmed_in_tight_scene:
|
||
repair = realign_window(beat, result.scene_id)
|
||
if repair is not None:
|
||
repair_scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
|
||
moved = abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate
|
||
improved = score >= result.match_score - 0.02
|
||
if moved and improved:
|
||
logger.info(
|
||
"Beat %d: realigned semantically valid long scene by motion/action window (%s)",
|
||
result.beat_id,
|
||
repair_reason,
|
||
)
|
||
kept.append(replace(
|
||
result,
|
||
scene_id=repair_scene.scene_id,
|
||
in_point_s=aligned_in_s,
|
||
out_point_s=aligned_in_s + usable_duration_s,
|
||
in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
|
||
match_score=score,
|
||
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
|
||
))
|
||
repaired = True
|
||
if not repaired:
|
||
kept.append(result)
|
||
else:
|
||
if getattr(result, "segments", ()):
|
||
new_segments = []
|
||
all_repaired = True
|
||
repair_reasons = []
|
||
for segment in result.segments:
|
||
segment_beat = replace(
|
||
beat,
|
||
start_s=beat.start_s + segment.trailer_offset_s,
|
||
end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
|
||
)
|
||
repair = realign_window(segment_beat, segment.scene_id, action_beat=beat)
|
||
if repair is None:
|
||
all_repaired = False
|
||
break
|
||
scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
|
||
repair_reasons.append(repair_reason)
|
||
new_segments.append(replace(
|
||
segment,
|
||
scene_id=scene.scene_id,
|
||
in_point_s=aligned_in_s,
|
||
out_point_s=aligned_in_s + usable_duration_s,
|
||
duration_s=usable_duration_s,
|
||
match_score=score,
|
||
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
|
||
))
|
||
if all_repaired and new_segments:
|
||
first = new_segments[0]
|
||
repaired_score = min(seg.match_score for seg in new_segments)
|
||
logger.info(
|
||
"Beat %d: realigned inside matched scene by vision action windows (%s)",
|
||
result.beat_id,
|
||
"; ".join(repair_reasons),
|
||
)
|
||
kept.append(replace(
|
||
result,
|
||
scene_id=first.scene_id,
|
||
in_point_s=first.in_point_s,
|
||
out_point_s=first.out_point_s,
|
||
in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
|
||
match_score=repaired_score,
|
||
is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
|
||
segments=tuple(new_segments),
|
||
))
|
||
return
|
||
else:
|
||
repair = realign_window(beat, result.scene_id)
|
||
if repair is not None:
|
||
scene, aligned_in_s, usable_duration_s, score, repair_reason = repair
|
||
logger.info(
|
||
"Beat %d: realigned inside matched scene by vision action window (%s)",
|
||
result.beat_id,
|
||
repair_reason,
|
||
)
|
||
kept.append(replace(
|
||
result,
|
||
scene_id=scene.scene_id,
|
||
in_point_s=aligned_in_s,
|
||
out_point_s=aligned_in_s + usable_duration_s,
|
||
in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
|
||
match_score=score,
|
||
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
|
||
))
|
||
return
|
||
logger.warning(
|
||
"Beat %d: rejected by vision action-phase verification (%s)",
|
||
result.beat_id,
|
||
"; ".join(reasons),
|
||
)
|
||
|
||
|
||
def _attach_visual_segments(results: list, beats: list, cfg) -> list:
    """Give every match a ``segments`` tuple, scanning extra islands where needed.

    Results without a known beat, or that already carry segments, pass
    through unchanged.  A beat with at most one scoreable island gets a single
    segment mirroring the whole match.  For a multi-island beat the first
    island reuses the existing match and each further island is matched with
    its own global scan; islands without a match are left out.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.global_scan import run_global_scan

    beats_by_id = {b.beat_id: b for b in beats}
    expanded: list[MatchResult] = []

    for result in results:
        beat = beats_by_id.get(result.beat_id)
        if beat is None or getattr(result, "segments", ()):
            expanded.append(result)
            continue

        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) <= 1:
            whole_match = MatchSegment(
                trailer_offset_s=0.0,
                duration_s=max(0.0, result.duration_s),
                scene_id=result.scene_id,
                in_point_s=result.in_point_s,
                out_point_s=result.out_point_s,
                match_score=result.match_score,
                is_confirmed=result.is_confirmed,
            )
            expanded.append(replace(result, segments=(whole_match,)))
            continue

        # First island: reuse the beat-level match, capped to the island length.
        first_start, first_end = islands[0]
        first_duration = min(max(0.0, result.duration_s), max(0.0, first_end - first_start))
        segments: list[MatchSegment] = [
            MatchSegment(
                trailer_offset_s=first_start,
                duration_s=first_duration,
                scene_id=result.scene_id,
                in_point_s=result.in_point_s,
                out_point_s=result.in_point_s + first_duration,
                match_score=result.match_score,
                is_confirmed=result.is_confirmed,
            )
        ]

        # Remaining islands: one independent scan each.
        for island_start_s, island_end_s in islands[1:]:
            island_beat = replace(
                beat,
                start_s=beat.start_s + island_start_s,
                end_s=beat.start_s + island_end_s,
            )
            island_matches = run_global_scan([island_beat], cfg, seed_in_points=None)
            if not island_matches:
                continue
            hit = island_matches[0]
            hit_duration = min(max(0.0, island_end_s - island_start_s), max(0.0, hit.duration_s))
            segments.append(
                MatchSegment(
                    trailer_offset_s=island_start_s,
                    duration_s=hit_duration,
                    scene_id=hit.scene_id,
                    in_point_s=hit.in_point_s,
                    out_point_s=hit.in_point_s + hit_duration,
                    match_score=hit.match_score,
                    is_confirmed=hit.is_confirmed,
                )
            )

        expanded.append(replace(result, segments=tuple(segments)))
    return expanded
|
||
|
||
|
||
def _fast_vision_match_cfg(cfg):
|
||
"""Return a vision-seed prepass config that still keeps quality settings."""
|
||
from dataclasses import replace
|
||
|
||
return replace(
|
||
cfg,
|
||
cv=replace(
|
||
cfg.cv,
|
||
deep_scan=replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True),
|
||
),
|
||
vision=replace(
|
||
cfg.vision,
|
||
fullscan_fallback=False,
|
||
),
|
||
)
|
||
|
||
|
||
def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True):
    """Match a single visual island using the staged fast-then-full strategy.

    With vision enabled a quick seeded pass runs first.  Its matches are
    returned directly when a full scan is not allowed, or when every match is
    confirmed or reaches the match threshold.  Otherwise a full pass runs and
    both sets are merged.  With ``allow_fullscan`` false and vision disabled
    no matching is done and an empty list is returned.
    """
    from src.pipeline.matcher import run_matching

    fast_matches = []
    if cfg.vision.enabled:
        fast_matches = run_matching(
            _fast_vision_match_cfg(cfg),
            [segment_beat],
            seed_in_points=continuity,
        )
        good_enough = fast_matches and (
            not allow_fullscan
            or all(
                m.is_confirmed or m.match_score >= cfg.cv.deep_scan.match_threshold
                for m in fast_matches
            )
        )
        if good_enough:
            return fast_matches

    if not allow_fullscan:
        return fast_matches

    full_matches = run_matching(
        cfg,
        [segment_beat],
        seed_in_points=continuity,
    )
    return _merge_best_results(fast_matches, full_matches, cfg)
|
||
|
||
|
||
def _match_unmatched_visual_segments(
    results: list,
    beats: list,
    cached: list,
    cfg,
    skip_global_segment_scan_for: set[int] | None = None,
) -> list:
    """Build segmented provisional matches for beats that have no match yet.

    Each scoreable island of an unmatched beat is matched on its own: first
    with the staged segment match (unless the beat id is in
    ``skip_global_segment_scan_for``), then, if that finds nothing, with a
    local search in the scenes used by neighbouring beats.  Beats for which at
    least one island matched get a ``MatchResult`` whose score is the
    duration-weighted mean of its segments.

    Returns a new list containing ``results`` plus the added matches.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.frame_extractor import get_video_info

    already_matched = {r.beat_id for r in results}
    expanded = list(results)
    skip_global_segment_scan_for = skip_global_segment_scan_for or set()

    # Prefer the real source frame rate; fall back to the export rate.
    try:
        fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
    except Exception:
        fps = cfg.export.edl_frame_rate

    for beat in beats:
        if beat.beat_id in already_matched:
            continue
        islands = _reference_scoreable_segments(beat, cfg)
        if not islands:
            continue

        segments: list[MatchSegment] = []
        for island_start_s, island_end_s in islands:
            island_beat = replace(
                beat,
                start_s=beat.start_s + island_start_s,
                end_s=beat.start_s + island_end_s,
            )
            # Seed from neighbouring matches, with this beat narrowed to the island.
            beats_with_island = [
                island_beat if b.beat_id == beat.beat_id else b
                for b in beats
            ]
            continuity = _continuity_seed_in_points(
                beat.beat_id,
                beats_with_island,
                cached + expanded,
                cfg,
            )

            island_matches = []
            if beat.beat_id not in skip_global_segment_scan_for:
                island_matches = _run_segment_match(island_beat, continuity, cfg, allow_fullscan=True)

            if not island_matches:
                # Last resort: look inside the scenes of neighbouring beats.
                local_segment = _local_same_scene_segment_match(
                    island_beat,
                    beat,
                    island_start_s,
                    cached + expanded,
                    cfg,
                )
                if local_segment is not None:
                    segments.append(local_segment)
                continue

            hit = island_matches[0]
            hit_duration = min(max(0.0, island_end_s - island_start_s), max(0.0, hit.duration_s))
            segments.append(
                MatchSegment(
                    trailer_offset_s=island_start_s,
                    duration_s=hit_duration,
                    scene_id=hit.scene_id,
                    in_point_s=hit.in_point_s,
                    out_point_s=hit.in_point_s + hit_duration,
                    match_score=hit.match_score,
                    is_confirmed=hit.is_confirmed,
                )
            )

        if not segments:
            continue

        lead = segments[0]
        total_segment_duration = sum(max(0.0, s.duration_s) for s in segments)
        if total_segment_duration > 0:
            weighted = sum(max(0.0, s.duration_s) * s.match_score for s in segments)
            score = weighted / total_segment_duration
        else:
            # All segments have zero length: use the weakest score.
            score = min(s.match_score for s in segments)

        expanded.append(
            MatchResult(
                beat_id=beat.beat_id,
                scene_id=lead.scene_id,
                source_path=cfg.paths.source_movie,
                in_point_s=lead.in_point_s,
                out_point_s=lead.out_point_s,
                in_point_frame=int(max(0.0, lead.in_point_s) * fps),
                match_score=score,
                is_confirmed=all(s.is_confirmed for s in segments),
                segments=tuple(segments),
            )
        )

    return expanded
|
||
|
||
|
||
def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg):
    """Search the scenes of the neighbouring beats' matches for a trailer island.

    Candidate scenes are those used by the matches of beats ``beat_id - 1``
    and ``beat_id + 1`` in ``cached``.  Each is scanned in small time steps
    and the best content-alignment score wins.  Returns a ``MatchSegment``,
    or ``None`` when there are no scenes, neighbours or templates, or the best
    score stays below the provisional threshold.
    """
    from src.core.models import MatchSegment
    from src.cv.frame_extractor import open_video
    from src.cv.global_scan import _content_alignment_score, _content_alignment_templates

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return None

    # Collect neighbouring scene ids in first-seen order, without duplicates.
    results_by_beat = {r.beat_id: r for r in cached}
    candidate_scene_ids: list[int] = []
    for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1):
        neighbour = results_by_beat.get(neighbour_id)
        if neighbour is None:
            continue
        neighbour_scene_ids = [
            getattr(s, "scene_id", neighbour.scene_id)
            for s in getattr(neighbour, "segments", ())
        ]
        if not neighbour_scene_ids:
            neighbour_scene_ids = [neighbour.scene_id]
        for scene_id in neighbour_scene_ids:
            if scene_id not in candidate_scene_ids:
                candidate_scene_ids.append(scene_id)
    if not candidate_scene_ids:
        return None

    templates = _content_alignment_templates(segment_beat, cfg)
    if not templates:
        return None

    min_score = min(
        cfg.cv.deep_scan.provisional_content_threshold * 0.70,
        cfg.cv.deep_scan.provisional_match_threshold,
    )
    # Step one export frame at a time, but never finer than 40 ms.
    step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)

    best: tuple[float, float, int] | None = None
    with open_video(cfg.paths.source_movie) as cap:
        for scene_id in candidate_scene_ids:
            scene = next((s for s in scenes if int(s["scene_id"]) == int(scene_id)), None)
            if scene is None:
                continue
            # Scan the scene with a 0.25 s margin on both sides.
            scan_start_s = max(0.0, float(scene["start_s"]) - 0.25)
            scan_end_s = max(
                scan_start_s,
                float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25,
            )
            t = scan_start_s
            while t <= scan_end_s:
                candidate_score = _content_alignment_score(cap, t, templates, cfg)
                if best is None or candidate_score > best[0]:
                    best = (candidate_score, t, int(scene_id))
                t = round(t + step_s, 6)

    if best is None or best[0] < min_score:
        return None

    score, in_point_s, scene_id = best
    duration_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s))
    return MatchSegment(
        trailer_offset_s=segment_offset_s,
        duration_s=duration_s,
        scene_id=scene_id,
        in_point_s=in_point_s,
        out_point_s=in_point_s + duration_s,
        match_score=score,
        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
    )
|
||
|
||
|
||
def cmd_match(args: argparse.Namespace, cfg) -> list:
    """Run matching for all beats (or one with ``--beat``) and save the results.

    Stages: optional fast vision-seeded pass, full pass for beats still
    missing or below the threshold, single-island segment application,
    per-island matching for unmatched beats, segment attachment, vision
    filtering/repair and vision recovery.  With ``--beat`` the new result is
    merged into the cached results instead of replacing them.  The cutter
    report is regenerated after saving.

    Returns the matches produced by this run (not the merged cache).
    """
    from src.pipeline.matcher import run_matching
    from dataclasses import replace

    # CLI switches override [vision].enabled; --no-vision wins over --vision.
    if getattr(args, "vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=True))
    if getattr(args, "no_vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=False))

    target_beat_id = getattr(args, "beat", None)
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, target_beat_id)
    if _results_cache_path(cfg).exists():
        cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg)
    else:
        cached = []

    # Multi-island beats are matched island by island later on.
    multi_island_beat_ids = {
        beat.beat_id
        for beat in beats
        if len(_reference_scoreable_segments(beat, cfg)) > 1
    }
    scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg)
    scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids]

    seed_in_points = None
    if target_beat_id is not None:
        seed_in_points = _continuity_seed_in_points(target_beat_id, all_beats, cached, cfg)

    def is_weak(match) -> bool:
        """True for a match that is neither confirmed nor at the threshold."""
        return not match.is_confirmed and match.match_score < cfg.cv.deep_scan.match_threshold

    results = []
    if cfg.vision.enabled:
        results = run_matching(
            _fast_vision_match_cfg(cfg),
            scan_beats,
            force_reindex=args.force_reindex,
            seed_in_points=seed_in_points,
        )

    if len(results) < len(scan_beats) or any(is_weak(r) for r in results):
        results_by_id = {r.beat_id: r for r in results}
        remaining_beats = [
            b for b in scan_beats
            if b.beat_id not in results_by_id or is_weak(results_by_id[b.beat_id])
        ]
        if remaining_beats:
            full_results = run_matching(
                cfg,
                remaining_beats,
                force_reindex=args.force_reindex,
                seed_in_points=seed_in_points,
            )
            results = _merge_best_results(results, full_results, cfg)

    results = _apply_single_island_segments(results, single_island_trims)
    results = _match_unmatched_visual_segments(
        results,
        beats,
        cached,
        cfg,
        skip_global_segment_scan_for=set(single_island_trims),
    )
    results = _attach_visual_segments(results, beats, cfg)
    results = _filter_semantically_invalid_vision_matches(results, beats, cfg)
    results = _recover_unmatched_beats_via_vision(results, beats, cfg)

    # A targeted one-beat match should improve the cache without deleting
    # automatic matches for other beats.
    if target_beat_id is not None and _results_cache_path(cfg).exists():
        cached = [r for r in cached if r.beat_id != target_beat_id]
        for fresh in results:
            cached = _update_result(fresh, cached)
        results_to_save = cached
    else:
        results_to_save = results

    _save_results(results_to_save, cfg)
    _regenerate_cutter_report(cfg)

    print(f"\n✅ {len(results)} / {len(beats)} beats matched.")
    for r in results:
        print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} "
              f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}")
    return results
|
||
|
||
|
||
def _update_result(new_result, results: list) -> list:
|
||
"""Replace or insert a MatchResult in the list (by beat_id)."""
|
||
updated = [r for r in results if r.beat_id != new_result.beat_id]
|
||
updated.append(new_result)
|
||
return sorted(updated, key=lambda r: r.beat_id)
|
||
|
||
|
||
def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]:
|
||
beats_by_id = {b.beat_id: b for b in beats}
|
||
results_by_id = {r.beat_id: r for r in results}
|
||
target = beats_by_id.get(beat_id)
|
||
if target is None:
|
||
return {}
|
||
|
||
seeds: list[tuple[float, float]] = []
|
||
base_score = max(cfg.cv.deep_scan.coarse_candidate_threshold + 0.08, 0.92)
|
||
prev_matches = [
|
||
(b, results_by_id[b.beat_id])
|
||
for b in beats
|
||
if b.beat_id < beat_id and b.beat_id in results_by_id
|
||
]
|
||
if prev_matches:
|
||
prev_beat, prev_result = max(prev_matches, key=lambda item: item[0].beat_id)
|
||
trailer_gap_s = max(0.0, target.start_s - prev_beat.end_s)
|
||
expected = prev_result.out_point_s + trailer_gap_s
|
||
for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
|
||
offset_score = max(
|
||
cfg.cv.deep_scan.coarse_candidate_threshold,
|
||
base_score - abs(offset) * 0.06,
|
||
)
|
||
seeds.append((expected + offset, offset_score))
|
||
|
||
next_matches = [
|
||
(b, results_by_id[b.beat_id])
|
||
for b in beats
|
||
if b.beat_id > beat_id and b.beat_id in results_by_id
|
||
]
|
||
if next_matches:
|
||
next_beat, next_result = min(next_matches, key=lambda item: item[0].beat_id)
|
||
trailer_gap_s = max(0.0, next_beat.start_s - target.end_s)
|
||
expected = next_result.in_point_s - trailer_gap_s - target.duration_s
|
||
for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
|
||
offset_score = max(
|
||
cfg.cv.deep_scan.coarse_candidate_threshold,
|
||
base_score - abs(offset) * 0.06,
|
||
)
|
||
seeds.append((expected - offset, offset_score))
|
||
|
||
unique: dict[float, float] = {}
|
||
for seed_t, seed_score in seeds:
|
||
rounded = round(max(0.0, seed_t), 3)
|
||
unique[rounded] = max(unique.get(rounded, 0.0), seed_score)
|
||
points = [(seed_t, score) for seed_t, score in sorted(unique.items())]
|
||
return {beat_id: points} if points else {}
|
||
|
||
|
||
def cmd_rematch(args: argparse.Namespace, cfg) -> None:
    """
    Re-run automatic matching for ONE beat.

        python cli.py rematch --beat 5                    # re-scan CV for beat 5
        python cli.py rematch --beat 5 --threshold 0.40   # relax threshold
        python cli.py rematch --beat 5 --refine           # nudge the cached in-point

    With ``--refine`` the cached match keeps its length but its in-point is
    re-aligned by local image content; the candidate is rejected when it
    covers too little of the beat.  Without it, a global scan is run for the
    beat, optionally with an overridden match threshold.  On success the
    result replaces the cached entry for that beat, the cache is saved and
    the cutter report is regenerated.
    """

    beat_id = args.beat
    beats = _load_beats(cfg)
    results = _load_results(cfg) if _results_cache_path(cfg).exists() else []

    beat = next((b for b in beats if b.beat_id == beat_id), None)
    if beat is None:
        print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.")
        return

    # ---- Refine an already acceptable cached match -------------------------
    if args.refine:
        current = next((r for r in results if r.beat_id == beat_id), None)
        if current is None:
            print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.")
            return

        from src.cv.content_align import align_cached_match_by_content
        refined_in_s, sequence_score = align_cached_match_by_content(
            beat,
            current.in_point_s,
            cfg,
            search_window_s=args.refine_window,
        )
        # Clamp once, up front, so that the scene lookup, the out-point and
        # the coverage check all use the in-point that is actually stored.
        refined_in_s = max(0.0, refined_in_s)
        usable_duration_s = max(0.0, current.out_point_s - current.in_point_s)
        scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg)
        out_point_s = refined_in_s + usable_duration_s
        if scene_data is not None:
            # Never run past the end of the scene the new in-point falls into.
            out_point_s = min(out_point_s, float(scene_data["end_s"]))

        matchable_duration_s = beat.duration_s
        duration_coverage = (
            max(0.0, out_point_s - refined_in_s) / matchable_duration_s
            if matchable_duration_s > 0 else 0.0
        )
        if duration_coverage < cfg.cv.deep_scan.min_duration_coverage:
            print(
                f"❌ Beat {beat_id} refined candidate rejected: "
                f"duration coverage {duration_coverage:.0%} < "
                f"{cfg.cv.deep_scan.min_duration_coverage:.0%}"
            )
            return

        # Prefer the real source frame rate; fall back to the export rate.
        try:
            from src.cv.frame_extractor import get_video_info
            fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
        except Exception:
            fps = cfg.export.edl_frame_rate

        from src.core.models import MatchResult
        refined = MatchResult(
            beat_id=beat_id,
            scene_id=int(scene_data["scene_id"]) if scene_data is not None else current.scene_id,
            source_path=current.source_path,
            in_point_s=refined_in_s,
            out_point_s=out_point_s,
            in_point_frame=int(refined_in_s * fps),
            match_score=sequence_score,
            match_location=current.match_location,
            is_confirmed=sequence_score >= cfg.cv.deep_scan.match_threshold,
        )
        results = _update_result(refined, results)
        _save_results(results, cfg)
        # Keep the editor hand-off in sync with the cache, as cmd_match does.
        _regenerate_cutter_report(cfg)
        print(
            f"✅ Beat {beat_id} refined → "
            f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, "
            f"sequence_score={refined.match_score:.3f}"
        )
        return

    # ---- Re-run CV with optional threshold override ------------------------
    from dataclasses import replace as dc_replace
    run_cfg = cfg
    if args.threshold is not None:
        run_cfg = dc_replace(
            cfg,
            cv=dc_replace(
                cfg.cv,
                deep_scan=dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold),
            ),
        )
        print(f"ℹ️ threshold overridden to {args.threshold} for beat {beat_id}")

    from src.cv.global_scan import run_global_scan
    seed_in_points = _continuity_seed_in_points(beat_id, beats, results, run_cfg)
    matches = run_global_scan([beat], run_cfg, seed_in_points=seed_in_points)

    if not matches:
        print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.")
        return

    match = matches[0]
    results = _update_result(match, results)
    _save_results(results, cfg)
    # Keep the editor hand-off in sync with the cache, as cmd_match does.
    _regenerate_cutter_report(cfg)
    print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})")
|
||
|
||
|
||
def cmd_report(args: argparse.Namespace, cfg) -> None:
    """Generate the match report for all beats, or for one beat with ``--beat``.

    The report is written even when the selected beat has no cached match; in
    that case a hint on how to create the match is printed as well.
    """
    from src.pipeline.reporter import generate_report

    selected_beat_id = getattr(args, "beat", None)
    # Load the beat cache once and reuse it for selection and normalization.
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, selected_beat_id)
    beat_ids = {b.beat_id for b in beats} if selected_beat_id is not None else None
    results = _select_results(_normalize_cached_results(all_beats, _load_results(cfg), cfg), beat_ids)
    out = generate_report(beats, results, cfg)
    if selected_beat_id is not None and not results:
        print(
            f"\n⚠️ Beat {args.beat} has no cached match yet. "
            f"Run: python cli.py match --beat {args.beat}"
        )
    print(f"\n\u2705 Report \u2192 {out}")
|
||
|
||
|
||
def cmd_export(args: argparse.Namespace, cfg) -> None:
    """Write the matched timeline as FCPXML and/or EDL.

    The format comes from ``--format`` or, if not given, from
    ``cfg.export.output_format``.  With ``--beat`` only that beat is exported
    and the file is named ``<trailer stem>_beat_NNN``; otherwise the timeline
    title is used.  Nothing is written when the selected beat has no cached
    match or the format is not one of ``fcpxml``, ``edl`` or ``both``.
    """
    from src.export.edl_writer import write_edl
    from src.export.fcpxml_writer import write_fcpxml
    from src.pipeline.matcher import build_timeline

    beat_id = getattr(args, "beat", None)
    # Load the beat cache once and reuse it for selection and normalization.
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, beat_id)
    beat_ids = {b.beat_id for b in beats} if beat_id is not None else None
    results = _select_results(_normalize_cached_results(all_beats, _load_results(cfg), cfg), beat_ids)
    if beat_id is not None and not results:
        print(f"❌ Beat {args.beat} has no cached match. Run 'match --beat {args.beat}' first.")
        return
    timeline = build_timeline(beats, results, cfg)

    fmt = args.format or cfg.export.output_format
    if fmt not in ("fcpxml", "edl", "both"):
        # Report a bad config value instead of silently writing nothing.
        print(f"❌ Unknown export format {fmt!r}; expected 'fcpxml', 'edl' or 'both'.")
        return

    out_stem = (
        f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}"
        if beat_id is not None
        else timeline.title
    )

    if fmt in ("fcpxml", "both"):
        out = write_fcpxml(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.fcpxml")
        print(f"✅ FCPXML → {out}")

    if fmt in ("edl", "both"):
        out = write_edl(timeline, cfg, output_path=cfg.paths.output_dir / f"{out_stem}.edl")
        print(f"✅ EDL → {out}")
|
||
|
||
|
||
def cmd_run(args: argparse.Namespace, cfg) -> None:
    """Run the whole pipeline in order: analyze, match, report, export."""
    for stage in (cmd_analyze, cmd_match, cmd_report, cmd_export):
        stage(args, cfg)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Argument parser
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with one sub-parser per pipeline command.

    The global options ``--config`` and ``--log-level`` are accepted both
    before and after the sub-command name (``cli.py --config X match`` and
    ``cli.py match --config X``), matching the usage shown in the module
    docstring.

    Returns:
        The configured parser. After parsing, ``args.command`` holds the
        selected sub-command name.
    """

    def add_global_options(target: argparse.ArgumentParser, *, suppress_defaults: bool) -> None:
        """Register ``--config`` and ``--log-level`` on *target*."""
        target.add_argument(
            "--config", type=Path,
            default=argparse.SUPPRESS if suppress_defaults else Path("config.toml"),
            metavar="CONFIG", help="Path to config.toml (default: ./config.toml)",
        )
        target.add_argument(
            "--log-level",
            default=argparse.SUPPRESS if suppress_defaults else "INFO",
            choices=["DEBUG", "INFO", "WARNING", "ERROR"],
            help="Logging verbosity (default: INFO)",
        )

    parser = argparse.ArgumentParser(
        prog="ai-trailer",
        description="AI Trailer Generator v2 — Pure CV scene matching",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    add_global_options(parser, suppress_defaults=False)

    # Parent parser that lets every sub-command accept the global options too.
    # Its defaults are suppressed so that a sub-command which does NOT receive
    # the option leaves the value parsed by the top-level parser untouched.
    common = argparse.ArgumentParser(add_help=False)
    add_global_options(common, suppress_defaults=True)

    # Help texts shared between the individual commands and ``run``.
    no_audio_help = "Skip Whisper (only affects beat labels, not matching)"
    no_llm_help = "Skip LLM classification (only affects beat labels)"
    force_reindex_help = "Ignore scene cache and re-run PySceneDetect"
    vision_help = "Enable cached vision descriptions for extra automatic search seeds"
    no_vision_help = "Disable vision seeding even if [vision].enabled is true"
    format_help = "Override [export] output_format from config"
    format_choices = ["fcpxml", "edl", "both"]

    sub = parser.add_subparsers(dest="command", required=True)

    # analyze
    p_analyze = sub.add_parser("analyze", parents=[common],
                               help="Detect trailer beats + fingerprint")
    p_analyze.add_argument("--no-audio", action="store_true", help=no_audio_help)
    p_analyze.add_argument("--no-llm", action="store_true", help=no_llm_help)

    # match
    p_match = sub.add_parser("match", parents=[common],
                             help="Run 2-phase CV matching")
    p_match.add_argument("--force-reindex", action="store_true", help=force_reindex_help)
    p_match.add_argument("--beat", type=int,
                         help="Match only one beat and merge it into the cached results")
    p_match.add_argument("--vision", action="store_true", help=vision_help)
    p_match.add_argument("--no-vision", action="store_true", help=no_vision_help)

    # rematch
    p_rematch = sub.add_parser("rematch", parents=[common],
                               help="Re-run or override matching for one beat")
    p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch")
    p_rematch.add_argument("--threshold", type=float, default=None,
                           help="Override match_threshold")
    p_rematch.add_argument("--refine", action="store_true",
                           help="Refine the cached match by measuring a local image-content offset")
    p_rematch.add_argument("--refine-window", type=float, default=None,
                           help="Seconds to search around the cached in-point when using --refine")

    # report
    p_report = sub.add_parser("report", parents=[common],
                              help="Generate HTML visual comparison report")
    p_report.add_argument("--beat", type=int, help="Report only one beat")

    # export
    p_export = sub.add_parser("export", parents=[common],
                              help="Export timeline from cached results")
    p_export.add_argument("--format", choices=format_choices, help=format_help)
    p_export.add_argument("--beat", type=int, help="Export only one beat")

    # run — the help now lists the report stage, which cmd_run executes as well.
    p_run = sub.add_parser("run", parents=[common],
                           help="Full pipeline: analyze → match → report → export")
    p_run.add_argument("--no-audio", action="store_true", help=no_audio_help)
    p_run.add_argument("--no-llm", action="store_true", help=no_llm_help)
    p_run.add_argument("--force-reindex", action="store_true", help=force_reindex_help)
    p_run.add_argument("--vision", action="store_true", help=vision_help)
    p_run.add_argument("--no-vision", action="store_true", help=no_vision_help)
    p_run.add_argument("--format", choices=format_choices, help=format_help)
    p_run.add_argument("--beat", type=int,
                       help="Run match/report/export for only one cached beat")

    return parser
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Parse the command line, load the config and run the chosen command."""
    _ensure_utf8_console()
    args = _build_parser().parse_args()

    _setup_logging(args.log_level)

    # Imported only after argument parsing so that ``--help`` stays instant.
    from src.core.config import load_config

    cfg = load_config(args.config)

    # Sub-command name -> handler; every handler takes (args, cfg).
    handlers = {
        "analyze": cmd_analyze,
        "match": cmd_match,
        "rematch": cmd_rematch,
        "report": cmd_report,
        "export": cmd_export,
        "run": cmd_run,
    }
    handlers[args.command](args, cfg)
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|