Files
aitrailer/cli.py
T
2026-05-02 14:11:27 +02:00

1219 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
cli.py — AI Trailer Generator v2 — Command-Line Interface
Usage:
python cli.py analyze [--config CONFIG] [--no-audio] [--no-llm]
python cli.py match [--config CONFIG] [--force-reindex]
python cli.py rematch --beat N [--threshold F] [--refine]
python cli.py report [--config CONFIG]
python cli.py run [--config CONFIG] [--force-reindex] [--no-audio] [--no-llm]
python cli.py export [--config CONFIG] [--format fcpxml|edl|both]
On --no-audio / --no-llm:
These flags do NOT affect matching quality.
Whisper and the LLM only assign narrative labels (HOOK/SETUP/CLIMAX)
to beats in the export metadata. The CV pipeline is identical either way.
Use them for fast iterations: they skip large model downloads.
All heavy imports are deferred so --help is instant.
"""
from __future__ import annotations
import argparse
import json
import logging
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
def _setup_logging(level: str = "INFO") -> None:
    """Configure root logging to stdout and make the console UTF-8 safe.

    Args:
        level: Logging level name, case-insensitive. Anything that does not
            resolve to an integer level on the ``logging`` module falls back
            to ``INFO``.
    """
    # Force UTF-8 for Windows console emoji printing. stdout may have been
    # replaced by an object without ``reconfigure`` (or be None), in which
    # case it is left untouched instead of raising AttributeError.
    stream = sys.stdout
    reconfigure = getattr(stream, "reconfigure", None)
    encoding = (getattr(stream, "encoding", None) or "").lower()
    if callable(reconfigure) and encoding not in ("utf-8", "utf8"):
        reconfigure(encoding="utf-8")

    # getattr alone would also accept non-level attributes of ``logging``
    # (functions, classes); only integer levels are valid here.
    resolved = getattr(logging, level.upper(), logging.INFO)
    if not isinstance(resolved, int):
        resolved = logging.INFO

    logging.basicConfig(
        # A separator between logger name and message keeps lines readable.
        format="%(asctime)s %(levelname)-8s %(name)s: %(message)s",
        datefmt="%H:%M:%S",
        level=resolved,
        stream=sys.stdout,
    )
    logging.getLogger("PIL").setLevel(logging.WARNING)
def _ensure_utf8_console() -> None:
    """Make argparse help safe on Windows before logging is configured.

    Switches ``sys.stdout`` to UTF-8 when it reports another encoding.
    Streams that cannot be reconfigured (stdout is None, or was replaced by
    an object without ``reconfigure``) are left untouched.
    """
    stream = sys.stdout
    reconfigure = getattr(stream, "reconfigure", None)
    if not callable(reconfigure):
        return
    encoding = (getattr(stream, "encoding", None) or "").lower()
    if encoding not in ("utf-8", "utf8"):
        reconfigure(encoding="utf-8")
# ---------------------------------------------------------------------------
# Cache helpers (match results ↔ JSON)
# ---------------------------------------------------------------------------
def _results_cache_path(cfg: "AppConfig") -> Path:  # type: ignore[name-defined]
    """Return the path of the match-results JSON file inside the cache directory."""
    cache_dir = cfg.paths.cache_dir
    return cache_dir / "match_results.json"
def _save_results(results: list, cfg: "AppConfig") -> None:  # type: ignore[name-defined]
    """Write match results to the JSON cache file.

    Args:
        results: Match results; each must expose the attributes serialized
            below. A missing or ``None`` ``segments`` attribute is stored as
            an empty list.
        cfg: Application config; the target path comes from
            ``_results_cache_path(cfg)``.
    """
    data = [
        {
            "beat_id": r.beat_id,
            "scene_id": r.scene_id,
            "source_path": str(r.source_path),
            "in_point_s": r.in_point_s,
            "out_point_s": r.out_point_s,
            "in_point_frame": r.in_point_frame,
            "match_score": r.match_score,
            "match_location": list(r.match_location),
            "is_confirmed": r.is_confirmed,
            "segments": [
                {
                    "trailer_offset_s": s.trailer_offset_s,
                    "duration_s": s.duration_s,
                    "scene_id": s.scene_id,
                    "in_point_s": s.in_point_s,
                    "out_point_s": s.out_point_s,
                    "match_score": s.match_score,
                    "is_confirmed": s.is_confirmed,
                }
                for s in (getattr(r, "segments", ()) or ())
            ],
        }
        for r in results
    ]
    p = _results_cache_path(cfg)
    p.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a truncated cache behind for later commands to parse.
    tmp = p.with_name(p.name + ".tmp")
    tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
    tmp.replace(p)
    logging.getLogger(__name__).info("Match results cached → %s", p)
def _load_results(cfg: "AppConfig") -> list:  # type: ignore[name-defined]
    """Read the match-results cache and rebuild ``MatchResult`` objects.

    Raises:
        FileNotFoundError: If the cache file does not exist.
    """
    from src.core.models import MatchResult, MatchSegment

    cache_file = _results_cache_path(cfg)
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached results at {cache_file}. Run 'match' first.")

    def _segment(entry: dict) -> MatchSegment:
        # Segment fields are coerced explicitly; "is_confirmed" defaults to True.
        return MatchSegment(
            trailer_offset_s=float(entry["trailer_offset_s"]),
            duration_s=float(entry["duration_s"]),
            scene_id=int(entry["scene_id"]),
            in_point_s=float(entry["in_point_s"]),
            out_point_s=float(entry["out_point_s"]),
            match_score=float(entry["match_score"]),
            is_confirmed=bool(entry.get("is_confirmed", True)),
        )

    def _result(entry: dict) -> MatchResult:
        # "is_confirmed" and "segments" are optional keys in the cache file.
        return MatchResult(
            beat_id=entry["beat_id"],
            scene_id=entry["scene_id"],
            source_path=Path(entry["source_path"]),
            in_point_s=entry["in_point_s"],
            out_point_s=entry["out_point_s"],
            in_point_frame=entry["in_point_frame"],
            match_score=entry["match_score"],
            match_location=tuple(entry["match_location"]),
            is_confirmed=entry.get("is_confirmed", True),
            segments=tuple(_segment(item) for item in entry.get("segments", ())),
        )

    payload = json.loads(cache_file.read_text(encoding="utf-8"))
    return [_result(entry) for entry in payload]
def _load_scene_cache_light(cfg) -> list[dict]:
    """Return the parsed ``scene_index.json`` from the cache dir, or ``[]`` if absent."""
    index_file = cfg.paths.cache_dir / "scene_index.json"
    if index_file.exists():
        text = index_file.read_text(encoding="utf-8")
        return json.loads(text)
    return []
def _scene_fps_light(scene: dict, cfg) -> float:
    """Return frames per second for a cached scene entry.

    The rate is the scene's frame span divided by its time span. When either
    span is not positive, ``cfg.export.edl_frame_rate`` is returned instead.
    """
    span_s = max(0.0, float(scene["end_s"]) - float(scene["start_s"]))
    frames = max(0, int(scene["end_frame"]) - int(scene["start_frame"]))
    if span_s > 0 and frames > 0:
        return frames / span_s
    return cfg.export.edl_frame_rate
def _scene_for_time_light(scenes: list[dict], t_sec: float, cfg) -> dict | None:
    """Return the cached scene containing ``t_sec``, or ``None`` if none does.

    When ``t_sec`` lies within ``cfg.cv.deep_scan.scene_boundary_epsilon_s``
    of the containing scene's end and a following entry exists, that
    following entry is returned instead.
    """
    for position, candidate in enumerate(scenes):
        if not (float(candidate["start_s"]) <= t_sec < float(candidate["end_s"])):
            continue
        remaining_s = float(candidate["end_s"]) - t_sec
        near_end = remaining_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s
        if near_end and position + 1 < len(scenes):
            return scenes[position + 1]
        return candidate
    return None
def _scene_by_id_light(scenes: list[dict], scene_id: int) -> dict | None:
    """Return the first cached scene whose ``scene_id`` equals ``scene_id``, else ``None``."""
    for candidate in scenes:
        if int(candidate["scene_id"]) == scene_id:
            return candidate
    return None
def _contiguous_duration_light(beat, in_point_s: float, scenes: list[dict], cfg, matchable_duration_s: float) -> float:
    """Return how much of ``matchable_duration_s`` is usable from ``in_point_s``.

    Walks the cached scenes from the one containing ``in_point_s``. A scene
    end may be crossed only when it lines up (within
    ``cfg.vision.multi_shot_boundary_tolerance_s``) with one of the beat's
    internal cut offsets; otherwise the span stops at that scene end minus
    the configured tail trim. Returns ``0.0`` for a non-positive duration or
    when no scene contains ``in_point_s``.
    """
    if matchable_duration_s <= 0:
        return 0.0

    # Best effort: without the cut-offset helper no scene end can be crossed.
    try:
        from src.cv.global_scan import _reference_internal_cut_offsets
        cut_offsets = _reference_internal_cut_offsets(beat, cfg)
    except Exception:
        cut_offsets = []

    first = next(
        (
            position
            for position, candidate in enumerate(scenes)
            if float(candidate["start_s"]) <= in_point_s < float(candidate["end_s"])
        ),
        None,
    )
    if first is None:
        return 0.0

    wanted_end = in_point_s + matchable_duration_s
    reached = in_point_s
    for candidate in scenes[first:]:
        boundary = float(candidate["end_s"])
        if wanted_end <= boundary:
            return matchable_duration_s
        offset_from_in = boundary - in_point_s
        for cut in cut_offsets:
            if abs(offset_from_in - cut) <= cfg.vision.multi_shot_boundary_tolerance_s:
                break
        else:
            # This scene end is not a cut inside the beat: stop here.
            tail = max(0.0, cfg.cv.deep_scan.trim_tail_frames / _scene_fps_light(candidate, cfg))
            return max(0.0, boundary - in_point_s - tail)
        reached = boundary
    return max(0.0, reached - in_point_s)
def _normalize_cached_results(beats: list, results: list, cfg) -> list:
    """
    Re-apply current generic timing rules to cached results.
    This keeps old automatic cache entries from preserving obsolete scene-boundary
    or tail-trim behavior without introducing manual per-beat truth.

    Args:
        beats: Trailer beats, looked up by ``beat_id``.
        results: Cached match results to re-validate.
        cfg: Application config (thresholds under ``cfg.cv.deep_scan`` and
            ``cfg.vision``).

    Returns:
        A new list with results that pass the current thresholds, possibly
        re-timed. ``results`` is returned unchanged when no scene cache exists.
    """
    from dataclasses import replace

    log = logging.getLogger(__name__)
    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return results
    beats_by_id = {b.beat_id: b for b in beats}
    normalized = []
    for result in results:
        beat = beats_by_id.get(result.beat_id)

        # Segmented results are judged by duration-weighted score and coverage;
        # their timing is left as cached.
        if getattr(result, "segments", ()):
            segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
            weighted_score = (
                sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
                / segment_duration
                if segment_duration > 0 else result.match_score
            )
            if weighted_score < cfg.cv.deep_scan.provisional_match_threshold:
                continue
            if beat is not None and beat.duration_s > 0:
                visible_duration = sum(
                    max(0.0, end_s - start_s)
                    for start_s, end_s in _reference_scoreable_segments(beat, cfg)
                )
                coverage_target = visible_duration if visible_duration > 0 else beat.duration_s
                coverage = segment_duration / coverage_target
                if coverage < cfg.cv.deep_scan.min_duration_coverage:
                    continue
            normalized.append(replace(result, match_score=weighted_score))
            continue

        if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
            continue
        scene = _scene_for_time_light(scenes, result.in_point_s, cfg)
        declared_scene = _scene_by_id_light(scenes, result.scene_id)
        # If the automatic matcher selected a scene but its in-point sits just
        # before that scene's detected start, treat this as scene-boundary drift
        # and clamp to the declared scene. This is generic: no beat IDs, no
        # manual timestamps, just consistent scene/time reconciliation.
        if declared_scene is not None:
            declared_start = float(declared_scene["start_s"])
            declared_end = float(declared_scene["end_s"])
            declared_fps = _scene_fps_light(declared_scene, cfg)
            boundary_tolerance_s = (
                cfg.cv.deep_scan.scene_boundary_epsilon_s
                + cfg.cv.deep_scan.start_preroll_frames / declared_fps
            )
            if declared_start - boundary_tolerance_s <= result.in_point_s < declared_end:
                scene = declared_scene
        if beat is None or scene is None:
            # Nothing to reconcile against: keep the cached entry as is.
            normalized.append(result)
            continue

        fps = _scene_fps_light(scene, cfg)
        adjusted_in_s = result.in_point_s
        scene_changed = int(scene["scene_id"]) != result.scene_id
        starts_before_scene = result.in_point_s < float(scene["start_s"])
        if scene_changed or starts_before_scene or result.duration_s <= 0.12:
            # Apply the pre-roll, but never move before the scene start.
            adjusted_in_s = max(0.0, result.in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps))
            adjusted_in_s = max(float(scene["start_s"]), adjusted_in_s)
            scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
            fps = _scene_fps_light(scene, cfg)

        matchable_duration_s = beat.duration_s
        try:
            from src.cv.global_scan import estimate_matchable_reference_duration
            matchable_duration_s = estimate_matchable_reference_duration(beat, cfg)
        except Exception:
            # Best effort: fall back to the full beat duration, but leave a trace.
            log.debug(
                "Beat %s: matchable-duration estimate unavailable; using beat duration",
                result.beat_id,
                exc_info=True,
            )

        tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
        single_scene_duration_s = max(0.0, min(beat.duration_s, float(scene["end_s"]) - adjusted_in_s) - tail_s)
        contiguous_duration_s = _contiguous_duration_light(
            beat,
            adjusted_in_s,
            scenes,
            cfg,
            matchable_duration_s,
        )
        max_duration_s = max(single_scene_duration_s, min(beat.duration_s, contiguous_duration_s))
        normalized_result = result
        if (
            scene_changed
            or starts_before_scene
            or result.duration_s <= 0.12
            or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps)
        ):
            normalized_result = replace(
                result,
                scene_id=int(scene["scene_id"]),
                in_point_s=adjusted_in_s,
                out_point_s=adjusted_in_s + max_duration_s,
                in_point_frame=int(adjusted_in_s * fps),
            )
        coverage = (
            max(0.0, normalized_result.duration_s) / matchable_duration_s
            if matchable_duration_s > 0 else 0.0
        )
        if coverage < cfg.cv.deep_scan.min_duration_coverage:
            continue

        try:
            from src.cv.content_align import align_cached_match_by_content
            _, content_score = align_cached_match_by_content(
                beat,
                normalized_result.in_point_s,
                cfg,
                search_window_s=min(0.8, cfg.cv.deep_scan.content_align_window_seconds),
                fps=12.5,
            )
            content_gate = (
                cfg.cv.deep_scan.provisional_content_threshold
                if normalized_result.is_confirmed
                else min(cfg.cv.deep_scan.provisional_content_threshold, cfg.vision.content_threshold)
            )
            if content_score < content_gate:
                continue
            if content_score < cfg.cv.deep_scan.match_threshold and normalized_result.is_confirmed:
                # Content check is weaker than the match threshold: demote.
                normalized_result = replace(
                    normalized_result,
                    match_score=min(normalized_result.match_score, content_score),
                    is_confirmed=False,
                )
        except Exception:
            # Best effort: the result is kept without the content gate, but the
            # failure is recorded so a broken gate does not go unnoticed.
            log.debug(
                "Beat %s: content alignment check skipped",
                result.beat_id,
                exc_info=True,
            )
        normalized.append(normalized_result)
    return normalized
# ---------------------------------------------------------------------------
# Command handlers
# ---------------------------------------------------------------------------
def _build_transcribe_callback(cfg):
    """Return a callback that transcribes a time window of a video using ``cfg``.

    This builder always returns a callable; it never returns ``None``.
    """
    from src.audio.transcriber import transcribe_video as _transcribe

    return lambda path, start_s, end_s, offset_s: _transcribe(
        path,
        cfg,
        start_s=start_s,
        end_s=end_s,
        time_offset_s=offset_s,
    )
def _build_classify_callback(cfg):
    """Return a callback that runs ``classify_beats`` on a beat list with ``cfg``."""
    from src.llm.dramaturg import classify_beats as _classify

    return lambda beats: _classify(beats, cfg)
def cmd_analyze(args: argparse.Namespace, cfg) -> list:
    """Analyze the reference trailer and cache its beats as JSON.

    ``args.no_audio`` and ``args.no_llm`` disable the transcribe and classify
    callbacks respectively. Returns the analyzed beats.
    """
    from src.pipeline.trailer_analyzer import analyze_reference_trailer

    transcribe_cb = None if args.no_audio else _build_transcribe_callback(cfg)
    classify_cb = None if args.no_llm else _build_classify_callback(cfg)
    beats = analyze_reference_trailer(
        cfg,
        transcribe_callback=transcribe_cb,
        classify_callback=classify_cb,
    )

    def _beat_record(beat) -> dict:
        # Histogram bytes are stored as hex text so they survive JSON.
        lines = [
            {"start_s": line.start_s, "end_s": line.end_s, "text": line.text}
            for line in beat.dialogue
        ]
        return {
            "beat_id": beat.beat_id,
            "start_s": beat.start_s,
            "end_s": beat.end_s,
            "start_frame": beat.start_frame,
            "end_frame": beat.end_frame,
            "beat_type": beat.beat_type.name,
            "dialogue": lines,
            "phash": beat.phash,
            "luma_hist": beat.luma_hist.hex() if beat.luma_hist else None,
            "sat_hist": beat.sat_hist.hex() if beat.sat_hist else None,
        }

    # Persist beats for downstream commands.
    beats_cache = cfg.paths.cache_dir / "trailer_beats.json"
    beats_cache.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps([_beat_record(b) for b in beats], indent=2, ensure_ascii=False)
    beats_cache.write_text(serialized, encoding="utf-8")
    print(f"\n\u2705 {len(beats)} beats analyzed \u2192 {beats_cache}")
    return beats
def _load_beats(cfg) -> list:
    """Rebuild ``TrailerBeat`` objects from the cached ``trailer_beats.json``.

    Raises:
        FileNotFoundError: If the beats cache does not exist.
    """
    from src.core.models import BeatType, DialogueLine, TrailerBeat

    cache_file = cfg.paths.cache_dir / "trailer_beats.json"
    if not cache_file.exists():
        raise FileNotFoundError(f"No cached beats at {cache_file}. Run 'analyze' first.")

    def _histogram(hex_text):
        # Missing or empty hex text maps back to None.
        return bytes.fromhex(hex_text) if hex_text else None

    def _beat(entry: dict) -> TrailerBeat:
        lines = tuple(
            DialogueLine(start_s=item["start_s"], end_s=item["end_s"], text=item["text"])
            for item in entry.get("dialogue", [])
        )
        return TrailerBeat(
            beat_id=entry["beat_id"],
            trailer_path=cfg.paths.reference_trailer,
            start_s=entry["start_s"],
            end_s=entry["end_s"],
            start_frame=entry["start_frame"],
            end_frame=entry["end_frame"],
            beat_type=BeatType[entry.get("beat_type", "UNKNOWN")],
            dialogue=lines,
            phash=entry.get("phash"),
            luma_hist=_histogram(entry.get("luma_hist")),
            sat_hist=_histogram(entry.get("sat_hist")),
        )

    payload = json.loads(cache_file.read_text(encoding="utf-8"))
    return [_beat(entry) for entry in payload]
def _select_beats(beats: list, beat_id: int | None) -> list:
    """Return ``beats`` unchanged, or only those whose id equals ``beat_id``.

    Raises:
        ValueError: If ``beat_id`` is given and no beat has that id.
    """
    if beat_id is None:
        return beats
    matching = list(filter(lambda candidate: candidate.beat_id == beat_id, beats))
    if matching:
        return matching
    raise ValueError(f"Beat {beat_id} not found. Run 'analyze' first.")
def _select_results(results: list, beat_ids: set[int] | None) -> list:
    """Return ``results`` unchanged, or only those whose ``beat_id`` is in ``beat_ids``."""
    if beat_ids is None:
        return results
    wanted = []
    for candidate in results:
        if candidate.beat_id in beat_ids:
            wanted.append(candidate)
    return wanted
def _find_scene_for_in_point(cfg, in_point_s: float):
    """Return the indexed scene containing ``in_point_s``, or ``None``.

    Uses ``build_scene_index(cfg)``. When the in-point is within
    ``cfg.cv.deep_scan.scene_boundary_epsilon_s`` of the containing scene's
    end and a following scene exists, the following scene is returned.
    """
    from src.cv.scene_indexer import build_scene_index

    indexed = build_scene_index(cfg)
    for position, candidate in enumerate(indexed):
        if not (candidate.start_s <= in_point_s < candidate.end_s):
            continue
        near_end = candidate.end_s - in_point_s <= cfg.cv.deep_scan.scene_boundary_epsilon_s
        if near_end and position + 1 < len(indexed):
            return indexed[position + 1]
        return candidate
    return None
def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]:
    """Find visible source-matchable islands inside a trailer beat.

    Samples the beat's trailer frames at a fixed step and groups consecutive
    scoreable samples into ``(start, end)`` offsets relative to the beat
    start. Gaps up to the bridge length are tolerated inside an island, and
    islands shorter than the minimum length are dropped.
    """
    from src.cv.frame_extractor import grab_frame_at_path
    from src.cv.global_scan import _is_scoreable_reference_frame

    step_s = max(0.08, cfg.cv.deep_scan.span_sample_step_s)
    min_segment_s = max(0.32, step_s * 3.0)
    bridge_gap_s = max(0.18, step_s * 2.0)
    islands: list[tuple[float, float]] = []

    def _close_island(opened_at: float, last_hit: float) -> None:
        # The island ends one step after its last scoreable sample, capped
        # at the beat length; too-short islands are discarded.
        closed_at = min(beat.duration_s, last_hit + step_s)
        if closed_at - opened_at >= min_segment_s:
            islands.append((opened_at, closed_at))

    opened_at: float | None = None
    last_hit: float | None = None
    offset = 0.0
    while offset <= beat.duration_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            if opened_at is None:
                opened_at = offset
            last_hit = offset
        elif opened_at is not None and last_hit is not None and offset - last_hit > bridge_gap_s:
            _close_island(opened_at, last_hit)
            opened_at = None
            last_hit = None
        # Rounding keeps the accumulated step from drifting.
        offset = round(offset + step_s, 6)

    if opened_at is not None and last_hit is not None:
        _close_island(opened_at, last_hit)
    return islands
def _trim_beats_to_single_visual_island(beats: list, cfg) -> tuple[list, dict[int, tuple[float, float]]]:
    """Use a single visible island as the primary match target for faded beats.

    Returns:
        ``(beats_out, trims)``. A beat with exactly one island that differs
        from the whole beat by more than 1.5 frames at either end is replaced
        by a copy narrowed to the island, and ``trims[beat_id]`` records
        ``(offset_into_beat, island_duration)``. All other beats pass through.
    """
    from dataclasses import replace

    frame_s = 1.0 / max(1.0, float(cfg.export.edl_frame_rate))
    min_trim_s = frame_s * 1.5
    kept = []
    offsets: dict[int, tuple[float, float]] = {}
    for beat in beats:
        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) != 1:
            kept.append(beat)
            continue
        start_s, end_s = islands[0]
        island_duration_s = max(0.0, end_s - start_s)
        has_real_trim = start_s > min_trim_s or beat.duration_s - end_s > min_trim_s
        if not (island_duration_s > 0.0 and has_real_trim):
            kept.append(beat)
            continue
        narrowed = replace(
            beat,
            start_s=beat.start_s + start_s,
            end_s=beat.start_s + end_s,
        )
        kept.append(narrowed)
        offsets[beat.beat_id] = (start_s, island_duration_s)
    return kept, offsets
def _apply_single_island_segments(results: list, trims: dict[int, tuple[float, float]]) -> list:
    """Restore beat-relative segment metadata after matching a trimmed island.

    Results whose beat has a trim entry and no segments yet get a single
    segment at the trim offset, and their out-point is shortened to match.
    With empty ``trims`` the input list is returned unchanged.
    """
    if not trims:
        return results
    from dataclasses import replace
    from src.core.models import MatchSegment

    def _expand(result):
        trim = trims.get(result.beat_id)
        if trim is None or getattr(result, "segments", ()):
            return result
        trailer_offset_s, island_duration_s = trim
        # Never longer than the island, never longer than the match itself.
        duration_s = min(max(0.0, island_duration_s), max(0.0, result.duration_s))
        island_segment = MatchSegment(
            trailer_offset_s=trailer_offset_s,
            duration_s=duration_s,
            scene_id=result.scene_id,
            in_point_s=result.in_point_s,
            out_point_s=result.in_point_s + duration_s,
            match_score=result.match_score,
            is_confirmed=result.is_confirmed,
        )
        return replace(
            result,
            out_point_s=result.in_point_s + duration_s,
            segments=(island_segment,),
        )

    return [_expand(result) for result in results]
def _attach_visual_segments(results: list, beats: list, cfg) -> list:
    """Attach automatic sub-shot matches for multi-island trailer beats.

    Results without a known beat, or that already carry segments, pass
    through. Beats with at most one island get one segment covering the whole
    match. For multi-island beats the first island reuses the existing match
    and every later island is scanned separately with ``run_global_scan``;
    islands without a scan hit are omitted.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.global_scan import run_global_scan

    beat_lookup = {b.beat_id: b for b in beats}
    out: list[MatchResult] = []
    for result in results:
        beat = beat_lookup.get(result.beat_id)
        if beat is None or getattr(result, "segments", ()):
            out.append(result)
            continue

        islands = _reference_scoreable_segments(beat, cfg)
        if len(islands) <= 1:
            whole = MatchSegment(
                trailer_offset_s=0.0,
                duration_s=max(0.0, result.duration_s),
                scene_id=result.scene_id,
                in_point_s=result.in_point_s,
                out_point_s=result.out_point_s,
                match_score=result.match_score,
                is_confirmed=result.is_confirmed,
            )
            out.append(replace(result, segments=(whole,)))
            continue

        # First island: reuse the beat-level match, capped to the island length.
        head_start, head_end = islands[0]
        head_duration = min(max(0.0, result.duration_s), max(0.0, head_end - head_start))
        parts: list[MatchSegment] = [
            MatchSegment(
                trailer_offset_s=head_start,
                duration_s=head_duration,
                scene_id=result.scene_id,
                in_point_s=result.in_point_s,
                out_point_s=result.in_point_s + head_duration,
                match_score=result.match_score,
                is_confirmed=result.is_confirmed,
            )
        ]

        # Remaining islands: scan each one as its own narrowed beat.
        for start_s, end_s in islands[1:]:
            island_beat = replace(
                beat,
                start_s=beat.start_s + start_s,
                end_s=beat.start_s + end_s,
            )
            found = run_global_scan([island_beat], cfg, seed_in_points=None)
            if not found:
                continue
            hit = found[0]
            hit_duration = min(max(0.0, end_s - start_s), max(0.0, hit.duration_s))
            parts.append(
                MatchSegment(
                    trailer_offset_s=start_s,
                    duration_s=hit_duration,
                    scene_id=hit.scene_id,
                    in_point_s=hit.in_point_s,
                    out_point_s=hit.in_point_s + hit_duration,
                    match_score=hit.match_score,
                    is_confirmed=hit.is_confirmed,
                )
            )
        out.append(replace(result, segments=tuple(parts)))
    return out
def _fast_vision_match_cfg(cfg):
    """Return a copy of ``cfg`` tuned for the vision-seed prepass.

    The copy sets ``cv.deep_scan.skip_coarse_scan_with_weighted_seeds`` to
    True and ``vision.fullscan_fallback`` to False; every other setting is
    carried over unchanged.
    """
    from dataclasses import replace

    deep_scan = replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=True)
    cv = replace(cfg.cv, deep_scan=deep_scan)
    vision = replace(cfg.vision, fullscan_fallback=False)
    return replace(cfg, cv=cv, vision=vision)
def _run_segment_match(segment_beat, continuity, cfg, allow_fullscan: bool = True):
    """Match one visual island with the same generic staged strategy as a beat.

    With vision enabled, the fast prepass config is tried first and its
    matches are returned when non-empty. Otherwise the regular config is used,
    unless ``allow_fullscan`` is False, in which case ``[]`` is returned.
    """
    from src.pipeline.matcher import run_matching

    if cfg.vision.enabled:
        quick_matches = run_matching(
            _fast_vision_match_cfg(cfg),
            [segment_beat],
            seed_in_points=continuity,
        )
        if quick_matches:
            return quick_matches
    if allow_fullscan:
        return run_matching(
            cfg,
            [segment_beat],
            seed_in_points=continuity,
        )
    return []
def _match_unmatched_visual_segments(
    results: list,
    beats: list,
    cached: list,
    cfg,
    skip_global_segment_scan_for: set[int] | None = None,
) -> list:
    """Create segmented provisional matches when a whole beat has no single match.

    Args:
        results: Matches found so far; beats already present here are skipped.
        beats: Beats to consider.
        cached: Previously cached results, used together with the growing
            output list as neighbours for continuity seeds and local search.
        cfg: Application config.
        skip_global_segment_scan_for: Beat ids whose islands must not be
            sent through ``_run_segment_match``; only the local same-scene
            search is tried for them.

    Returns:
        A new list: ``results`` plus one segmented ``MatchResult`` per
        previously unmatched beat for which at least one island matched.
    """
    from dataclasses import replace
    from src.core.models import MatchResult, MatchSegment
    from src.cv.frame_extractor import get_video_info
    matched_ids = {r.beat_id for r in results}
    expanded = list(results)
    skip_global_segment_scan_for = skip_global_segment_scan_for or set()
    # Source fps is best effort; the export frame rate is the fallback (also
    # used when the reported fps is 0).
    try:
        fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
    except Exception:
        fps = cfg.export.edl_frame_rate
    for beat in beats:
        if beat.beat_id in matched_ids:
            continue
        islands = _reference_scoreable_segments(beat, cfg)
        if not islands:
            continue
        segments: list[MatchSegment] = []
        for start_s, end_s in islands:
            # Narrow the beat to this island so it can be matched on its own.
            segment_beat = replace(
                beat,
                start_s=beat.start_s + start_s,
                end_s=beat.start_s + end_s,
            )
            # Seeds are derived with the narrowed beat substituted for the
            # original; `cached + expanded` lets results added earlier in
            # this loop take part (later entries win on duplicate beat ids).
            continuity = _continuity_seed_in_points(
                beat.beat_id,
                [b if b.beat_id != beat.beat_id else segment_beat for b in beats],
                cached + expanded,
                cfg,
            )
            segment_matches = []
            if beat.beat_id not in skip_global_segment_scan_for:
                segment_matches = _run_segment_match(segment_beat, continuity, cfg, allow_fullscan=True)
            if not segment_matches:
                # Fall back to a local search in the neighbours' scenes; an
                # island with no hit at all is simply left out.
                local_segment = _local_same_scene_segment_match(
                    segment_beat,
                    beat,
                    start_s,
                    cached + expanded,
                    cfg,
                )
                if local_segment is not None:
                    segments.append(local_segment)
                continue
            seg = segment_matches[0]
            # Never longer than the island, never longer than the match.
            seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
            segments.append(
                MatchSegment(
                    trailer_offset_s=start_s,
                    duration_s=seg_dur,
                    scene_id=seg.scene_id,
                    in_point_s=seg.in_point_s,
                    out_point_s=seg.in_point_s + seg_dur,
                    match_score=seg.match_score,
                    is_confirmed=seg.is_confirmed,
                )
            )
        if not segments:
            continue
        # The beat-level result mirrors the first segment's timing; its score
        # is the duration-weighted mean of the segment scores.
        first = segments[0]
        total_segment_duration = sum(max(0.0, s.duration_s) for s in segments)
        score = (
            sum(max(0.0, s.duration_s) * s.match_score for s in segments) / total_segment_duration
            if total_segment_duration > 0 else min(s.match_score for s in segments)
        )
        expanded.append(
            MatchResult(
                beat_id=beat.beat_id,
                scene_id=first.scene_id,
                source_path=cfg.paths.source_movie,
                in_point_s=first.in_point_s,
                out_point_s=first.out_point_s,
                in_point_frame=int(max(0.0, first.in_point_s) * fps),
                match_score=score,
                is_confirmed=all(s.is_confirmed for s in segments),
                segments=tuple(segments),
            )
        )
    return expanded
def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float, cached: list, cfg):
    """Find a short trailer island inside scenes adjacent to neighbouring beat matches.

    Candidate scenes are those used by the cached results of the beats with
    ids ``beat.beat_id - 1`` and ``beat.beat_id + 1``. Each candidate scene is
    swept in small time steps and the best content-alignment score wins.
    Returns a ``MatchSegment``, or ``None`` when there is no scene cache, no
    candidate scene, no template, or the best score is below the minimum.
    """
    from src.core.models import MatchSegment
    from src.cv.frame_extractor import open_video
    from src.cv.global_scan import _content_alignment_score, _content_alignment_templates

    scenes = _load_scene_cache_light(cfg)
    if not scenes:
        return None

    # Collect the neighbours' scene ids, de-duplicated in first-seen order.
    results_by_beat = {r.beat_id: r for r in cached}
    candidate_ids: list[int] = []
    for neighbour_id in (beat.beat_id - 1, beat.beat_id + 1):
        neighbour = results_by_beat.get(neighbour_id)
        if neighbour is None:
            continue
        segment_ids = [
            getattr(part, "scene_id", neighbour.scene_id)
            for part in getattr(neighbour, "segments", ())
        ]
        for candidate_id in segment_ids or [neighbour.scene_id]:
            if candidate_id not in candidate_ids:
                candidate_ids.append(candidate_id)
    if not candidate_ids:
        return None

    templates = _content_alignment_templates(segment_beat, cfg)
    if not templates:
        return None

    min_score = min(
        cfg.cv.deep_scan.provisional_content_threshold * 0.70,
        cfg.cv.deep_scan.provisional_match_threshold,
    )
    step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)
    best: tuple[float, float, int] | None = None
    with open_video(cfg.paths.source_movie) as cap:
        for candidate_id in candidate_ids:
            scene = next(
                (entry for entry in scenes if int(entry["scene_id"]) == int(candidate_id)),
                None,
            )
            if scene is None:
                continue
            # The sweep window extends 0.25 s beyond both ends of the scene.
            window_start = max(0.0, float(scene["start_s"]) - 0.25)
            window_end = max(
                window_start,
                float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25,
            )
            probe_t = window_start
            while probe_t <= window_end:
                probe_score = _content_alignment_score(cap, probe_t, templates, cfg)
                if best is None or probe_score > best[0]:
                    best = (probe_score, probe_t, int(candidate_id))
                probe_t = round(probe_t + step_s, 6)

    if best is None or best[0] < min_score:
        return None
    score, in_point_s, scene_id = best
    duration_s = max(0.0, min(segment_beat.duration_s, segment_beat.end_s - segment_beat.start_s))
    return MatchSegment(
        trailer_offset_s=segment_offset_s,
        duration_s=duration_s,
        scene_id=scene_id,
        in_point_s=in_point_s,
        out_point_s=in_point_s + duration_s,
        match_score=score,
        is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
    )
def cmd_match(args: argparse.Namespace, cfg) -> list:
    """Match trailer beats against the source movie and cache the results.

    Reads ``args.vision``, ``args.no_vision``, ``args.beat`` (all optional)
    and ``args.force_reindex``. With ``args.beat`` set, only that beat is
    matched and its result is merged into the existing cache; otherwise the
    cache is replaced by this run's results.

    Returns:
        The results produced by this run (not the merged cache).
    """
    from src.pipeline.matcher import run_matching
    from dataclasses import replace
    # Command-line overrides for the vision flag; if both are set,
    # no_vision is applied last and wins.
    if getattr(args, "vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=True))
    if getattr(args, "no_vision", False):
        cfg = replace(cfg, vision=replace(cfg.vision, enabled=False))
    all_beats = _load_beats(cfg)
    beats = _select_beats(all_beats, getattr(args, "beat", None))
    cached = _normalize_cached_results(all_beats, _load_results(cfg), cfg) if _results_cache_path(cfg).exists() else []
    # Beats with several visible islands are excluded from the whole-beat
    # scan below; they are handled per island by
    # _match_unmatched_visual_segments.
    multi_island_beat_ids = {
        beat.beat_id
        for beat in beats
        if len(_reference_scoreable_segments(beat, cfg)) > 1
    }
    scan_beats, single_island_trims = _trim_beats_to_single_visual_island(beats, cfg)
    scan_beats = [b for b in scan_beats if b.beat_id not in multi_island_beat_ids]
    # Continuity seeds are only derived for a targeted single-beat run.
    seed_in_points = (
        _continuity_seed_in_points(args.beat, all_beats, cached, cfg)
        if getattr(args, "beat", None) is not None
        else None
    )
    results = []
    # Stage 1: fast vision-seed prepass (only when vision is enabled).
    if cfg.vision.enabled:
        fast_cfg = _fast_vision_match_cfg(cfg)
        results = run_matching(
            fast_cfg,
            scan_beats,
            force_reindex=args.force_reindex,
            seed_in_points=seed_in_points,
        )
    # Stage 2: regular matching for whatever stage 1 left unmatched.
    # NOTE(review): force_reindex is passed to both run_matching calls;
    # confirm whether that rebuilds the index twice when vision is enabled.
    if len(results) < len(scan_beats):
        matched_ids = {r.beat_id for r in results}
        remaining_beats = [b for b in scan_beats if b.beat_id not in matched_ids]
        if remaining_beats:
            full_results = run_matching(
                cfg,
                remaining_beats,
                force_reindex=args.force_reindex,
                seed_in_points=seed_in_points,
            )
            results = sorted([*results, *full_results], key=lambda r: r.beat_id)
    # Stage 3: restore beat-relative segment data, then match remaining
    # beats island by island, then attach segments to everything else.
    results = _apply_single_island_segments(results, single_island_trims)
    results = _match_unmatched_visual_segments(
        results,
        beats,
        cached,
        cfg,
        skip_global_segment_scan_for=set(single_island_trims),
    )
    results = _attach_visual_segments(results, beats, cfg)
    # A targeted one-beat match should improve the cache without deleting
    # automatic matches for other beats.
    if getattr(args, "beat", None) is not None and _results_cache_path(cfg).exists():
        cached = [r for r in cached if r.beat_id != args.beat]
        for result in results:
            cached = _update_result(result, cached)
        results_to_save = cached
    else:
        results_to_save = results
    _save_results(results_to_save, cfg)
    print(f"\n{len(results)} / {len(beats)} beats matched.")
    for r in results:
        print(f" Beat {r.beat_id:03d} → scene {r.scene_id:04d} "
              f"in={r.in_point_s:>8.3f}s score={r.match_score:.3f}")
    return results
def _update_result(new_result, results: list) -> list:
    """Return a new list, sorted by ``beat_id``, where ``new_result`` replaces
    any entry with the same ``beat_id`` (or is inserted if there is none)."""
    merged = []
    for existing in results:
        if existing.beat_id != new_result.beat_id:
            merged.append(existing)
    merged.append(new_result)
    merged.sort(key=lambda item: item.beat_id)
    return merged
def _continuity_seed_in_points(beat_id: int, beats: list, results: list, cfg) -> dict[int, list[float | tuple[float, float]]]:
    """Derive seed in-points for one beat from its nearest matched neighbours.

    The closest matched beat before the target projects forward from its
    out-point, and the closest matched beat after it projects backward from
    its in-point, each shifted by the configured seed offsets. Larger offsets
    score lower, never below the coarse candidate threshold.

    Returns:
        ``{beat_id: [(time_s, score), ...]}`` sorted by time, with times
        clamped to >= 0 and rounded to milliseconds, or ``{}`` when the beat
        is unknown or no seed could be derived.
    """
    beat_lookup = {b.beat_id: b for b in beats}
    result_lookup = {r.beat_id: r for r in results}
    target = beat_lookup.get(beat_id)
    if target is None:
        return {}

    base_score = max(cfg.cv.deep_scan.coarse_candidate_threshold + 0.08, 0.92)
    best_by_time: dict[float, float] = {}

    def _add_seed(seed_t: float, offset: float) -> None:
        # Keep the highest score when several seeds round to the same time.
        score = max(
            cfg.cv.deep_scan.coarse_candidate_threshold,
            base_score - abs(offset) * 0.06,
        )
        key = round(max(0.0, seed_t), 3)
        best_by_time[key] = max(best_by_time.get(key, 0.0), score)

    # Nearest matched beat on each side of the target.
    nearest_prev = None
    nearest_next = None
    for candidate in beats:
        if candidate.beat_id < beat_id and candidate.beat_id in result_lookup:
            if nearest_prev is None or candidate.beat_id > nearest_prev.beat_id:
                nearest_prev = candidate
    for candidate in beats:
        if candidate.beat_id > beat_id and candidate.beat_id in result_lookup:
            if nearest_next is None or candidate.beat_id < nearest_next.beat_id:
                nearest_next = candidate

    if nearest_prev is not None:
        prev_result = result_lookup[nearest_prev.beat_id]
        trailer_gap_s = max(0.0, target.start_s - nearest_prev.end_s)
        expected = prev_result.out_point_s + trailer_gap_s
        for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
            _add_seed(expected + offset, offset)

    if nearest_next is not None:
        next_result = result_lookup[nearest_next.beat_id]
        trailer_gap_s = max(0.0, nearest_next.start_s - target.end_s)
        expected = next_result.in_point_s - trailer_gap_s - target.duration_s
        for offset in cfg.cv.deep_scan.continuity_seed_offsets_s:
            _add_seed(expected - offset, offset)

    points = sorted(best_by_time.items())
    if not points:
        return {}
    return {beat_id: points}
def cmd_rematch(args: argparse.Namespace, cfg) -> None:
    """
    Re-run automatic matching for ONE beat.
    python cli.py rematch --beat 5 # re-scan CV for beat 5
    python cli.py rematch --beat 5 --threshold 0.40 # relax threshold

    With ``args.refine`` set, no new scan is run: the cached match for the
    beat is re-aligned by content around its current in-point (search window
    ``args.refine_window``) and written back if its duration coverage is
    sufficient. Failures are reported with a printed message and an early
    return; nothing is raised for a missing beat or match.
    """
    beat_id = args.beat
    beats = _load_beats(cfg)
    results = _load_results(cfg) if _results_cache_path(cfg).exists() else []
    beat = next((b for b in beats if b.beat_id == beat_id), None)
    if beat is None:
        print(f"\u274c Beat {beat_id} not found. Run 'analyze' first.")
        return
    # ---- Refine an already acceptable cached match -------------------------
    if args.refine:
        current = next((r for r in results if r.beat_id == beat_id), None)
        if current is None:
            print(f"❌ Beat {beat_id} has no cached match to refine. Run 'match --beat {beat_id}' first.")
            return
        from src.cv.content_align import align_cached_match_by_content
        refined_in_s, sequence_score = align_cached_match_by_content(
            beat,
            current.in_point_s,
            cfg,
            search_window_s=args.refine_window,
        )
        # Keep the cached match length, then cap the out-point at the end of
        # the scene that contains the refined in-point (if one is found).
        usable_duration_s = max(0.0, current.out_point_s - current.in_point_s)
        # span_score is just an alias of sequence_score here, so the max()
        # calls below reduce to sequence_score.
        span_score = sequence_score
        scene_data = _scene_for_time_light(_load_scene_cache_light(cfg), refined_in_s, cfg)
        out_point_s = refined_in_s + usable_duration_s
        if scene_data is not None:
            out_point_s = min(out_point_s, float(scene_data["end_s"]))
        matchable_duration_s = beat.duration_s
        duration_coverage = (
            max(0.0, out_point_s - refined_in_s) / matchable_duration_s
            if matchable_duration_s > 0 else 0.0
        )
        if duration_coverage < cfg.cv.deep_scan.min_duration_coverage:
            print(
                f"❌ Beat {beat_id} refined candidate rejected: "
                f"duration coverage {duration_coverage:.0%} < "
                f"{cfg.cv.deep_scan.min_duration_coverage:.0%}"
            )
            return
        # Source fps is best effort; the export frame rate is the fallback.
        try:
            from src.cv.frame_extractor import get_video_info
            fps = float(get_video_info(cfg.paths.source_movie)["fps"]) or cfg.export.edl_frame_rate
        except Exception:
            fps = cfg.export.edl_frame_rate
        from src.core.models import MatchResult
        # NOTE(review): the refined result is built without `segments`, so any
        # segments on the cached match are not carried over; confirm intended.
        # NOTE(review): in_point_s is clamped to >= 0 but out_point_s was
        # computed from the unclamped refined_in_s.
        refined = MatchResult(
            beat_id=beat_id,
            scene_id=int(scene_data["scene_id"]) if scene_data is not None else current.scene_id,
            source_path=current.source_path,
            in_point_s=max(0.0, refined_in_s),
            out_point_s=out_point_s,
            in_point_frame=int(max(0.0, refined_in_s) * fps),
            match_score=max(sequence_score, span_score),
            match_location=current.match_location,
            is_confirmed=max(sequence_score, span_score) >= cfg.cv.deep_scan.match_threshold,
        )
        results = _update_result(refined, results)
        _save_results(results, cfg)
        print(
            f"✅ Beat {beat_id} refined → "
            f"in={refined.in_point_s:.3f}s, out={refined.out_point_s:.3f}s, "
            f"sequence_score={refined.match_score:.3f}"
        )
        return
    # ---- Re-run CV with optional threshold override ------------------------
    from dataclasses import replace as dc_replace
    run_cfg = cfg
    if args.threshold is not None:
        # The override applies to this run only; `cfg` itself is untouched
        # and is what _save_results receives below.
        run_cfg = dc_replace(
            cfg,
            cv=dc_replace(
                cfg.cv,
                deep_scan=dc_replace(cfg.cv.deep_scan, match_threshold=args.threshold),
            ),
        )
        print(f"️ threshold overridden to {args.threshold} for beat {beat_id}")
    from src.cv.global_scan import run_global_scan
    seed_in_points = _continuity_seed_in_points(beat_id, beats, results, run_cfg)
    matches = run_global_scan([beat], run_cfg, seed_in_points=seed_in_points)
    if not matches:
        print(f"❌ Beat {beat_id}: no match. Try --threshold 0.40.")
        return
    match = matches[0]
    results = _update_result(match, results)
    _save_results(results, cfg)
    print(f"✅ Beat {beat_id} rematched → (in={match.in_point_s:.3f}s, score={match.match_score:.3f})")
def cmd_report(args: argparse.Namespace, cfg) -> None:
    """Generate the visual comparison report, optionally for a single beat.

    If ``args`` carries a non-None ``beat`` attribute, only that beat and its
    cached result are handed to the reporter, and a hint is printed when no
    cached match exists for it. The path returned by the reporter is printed
    at the end in every case.
    """
    from src.pipeline.reporter import generate_report

    requested_beat = getattr(args, "beat", None)
    single_beat_mode = requested_beat is not None

    selected_beats = _select_beats(_load_beats(cfg), requested_beat)
    wanted_ids = None
    if single_beat_mode:
        wanted_ids = {beat.beat_id for beat in selected_beats}

    # Normalisation is given a freshly loaded, unfiltered beat list.
    normalized = _normalize_cached_results(_load_beats(cfg), _load_results(cfg), cfg)
    selected_results = _select_results(normalized, wanted_ids)

    report_path = generate_report(selected_beats, selected_results, cfg)

    if single_beat_mode and not selected_results:
        print(
            f"\n⚠️ Beat {args.beat} has no cached match yet. "
            f"Run: python cli.py match --beat {args.beat}"
        )
    print(f"\n\u2705 Report \u2192 {report_path}")
def cmd_export(args: argparse.Namespace, cfg) -> None:
    """Export a timeline built from cached match results.

    The output format comes from ``args.format`` when set, otherwise from
    ``cfg.export.output_format``. When a single beat is requested, the output
    file stem is ``<reference trailer stem>_beat_NNN``; otherwise the timeline
    title is used. Nothing is written if the requested beat has no cached
    result.
    """
    from src.export.edl_writer import write_edl
    from src.export.fcpxml_writer import write_fcpxml
    from src.pipeline.matcher import build_timeline

    beat_id = getattr(args, "beat", None)
    single_beat_mode = beat_id is not None

    beats = _select_beats(_load_beats(cfg), beat_id)
    wanted_ids = None
    if single_beat_mode:
        wanted_ids = {beat.beat_id for beat in beats}

    # Normalisation is given a freshly loaded, unfiltered beat list.
    normalized = _normalize_cached_results(_load_beats(cfg), _load_results(cfg), cfg)
    results = _select_results(normalized, wanted_ids)

    if single_beat_mode and not results:
        print(f"❌ Beat {args.beat} has no cached match. Run 'match --beat {args.beat}' first.")
        return

    timeline = build_timeline(beats, results, cfg)
    fmt = args.format or cfg.export.output_format

    if single_beat_mode:
        out_stem = f"{cfg.paths.reference_trailer.stem}_beat_{beat_id:03d}"
    else:
        out_stem = timeline.title

    if fmt in ("fcpxml", "both"):
        fcpxml_target = cfg.paths.output_dir / f"{out_stem}.fcpxml"
        out = write_fcpxml(timeline, cfg, output_path=fcpxml_target)
        print(f"✅ FCPXML → {out}")

    if fmt in ("edl", "both"):
        edl_target = cfg.paths.output_dir / f"{out_stem}.edl"
        out = write_edl(timeline, cfg, output_path=edl_target)
        print(f"✅ EDL → {out}")
def cmd_run(args: argparse.Namespace, cfg) -> None:
    """Run every pipeline stage in order: analyze, match, report, export.

    Each stage receives the same ``args`` and ``cfg`` objects.
    """
    stages = (cmd_analyze, cmd_match, cmd_report, cmd_export)
    for stage in stages:
        stage(args, cfg)
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="ai-trailer",
description="AI Trailer Generator v2 — Pure CV scene matching",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--config", type=Path, default=Path("config.toml"),
metavar="CONFIG", help="Path to config.toml (default: ./config.toml)",
)
parser.add_argument(
"--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging verbosity (default: INFO)",
)
sub = parser.add_subparsers(dest="command", required=True)
# analyze
p_analyze = sub.add_parser("analyze", help="Detect trailer beats + fingerprint")
p_analyze.add_argument("--no-audio", action="store_true",
help="Skip Whisper (only affects beat labels, not matching)")
p_analyze.add_argument("--no-llm", action="store_true",
help="Skip LLM classification (only affects beat labels)")
# match
p_match = sub.add_parser("match", help="Run 2-phase CV matching")
p_match.add_argument("--force-reindex", action="store_true",
help="Ignore scene cache and re-run PySceneDetect")
p_match.add_argument("--beat", type=int,
help="Match only one beat and merge it into the cached results")
p_match.add_argument("--vision", action="store_true",
help="Enable cached vision descriptions for extra automatic search seeds")
p_match.add_argument("--no-vision", action="store_true",
help="Disable vision seeding even if [vision].enabled is true")
# rematch
p_rematch = sub.add_parser("rematch", help="Re-run or override matching for one beat")
p_rematch.add_argument("--beat", type=int, required=True, help="Beat ID to rematch")
p_rematch.add_argument("--threshold", type=float, default=None, help="Override match_threshold")
p_rematch.add_argument("--refine", action="store_true",
help="Refine the cached match by measuring a local image-content offset")
p_rematch.add_argument("--refine-window", type=float, default=None,
help="Seconds to search around the cached in-point when using --refine")
# report
p_report = sub.add_parser("report", help="Generate HTML visual comparison report")
p_report.add_argument("--beat", type=int, help="Report only one beat")
# export
p_export = sub.add_parser("export", help="Export timeline from cached results")
p_export.add_argument("--format", choices=["fcpxml", "edl", "both"],
help="Override [export] output_format from config")
p_export.add_argument("--beat", type=int, help="Export only one beat")
# run
p_run = sub.add_parser("run", help="Full pipeline: analyze → match → export")
p_run.add_argument("--no-audio", action="store_true")
p_run.add_argument("--no-llm", action="store_true")
p_run.add_argument("--force-reindex", action="store_true")
p_run.add_argument("--vision", action="store_true")
p_run.add_argument("--no-vision", action="store_true")
p_run.add_argument("--format", choices=["fcpxml", "edl", "both"])
p_run.add_argument("--beat", type=int,
help="Run match/report/export for only one cached beat")
return parser
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse the command line, load the configuration and run the sub-command."""
    # The console must be UTF-8 capable before argparse can print help text.
    _ensure_utf8_console()
    args = _build_parser().parse_args()
    _setup_logging(args.log_level)

    # Deferred import: argument parsing (and --help) stays fast.
    from src.core.config import load_config

    cfg = load_config(args.config)

    handlers = {
        "analyze": cmd_analyze,
        "match": cmd_match,
        "rematch": cmd_rematch,
        "report": cmd_report,
        "export": cmd_export,
        "run": cmd_run,
    }
    handlers[args.command](args, cfg)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()