Initial project import

This commit is contained in:
Melbar
2026-05-02 09:07:41 +02:00
commit 8e1bcf142f
38 changed files with 7928 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
# src package
+1
View File
@@ -0,0 +1 @@
# src.audio package — Whisper / dialogue analysis
+182
View File
@@ -0,0 +1,182 @@
"""
src/audio/transcriber.py — Whisper transcription via faster-whisper
Responsibility:
- Transcribe audio from a video file into a list of DialogueLine objects
- Optionally restrict to a time window [start_s, end_s] (for single beats)
- All model config (model name, device, compute_type) comes from AppConfig
The LLM is NOT used here. This is pure audio-to-text.
"""
from __future__ import annotations
import logging
import tempfile
from pathlib import Path
from typing import Sequence
from src.core.config import AppConfig
from src.core.models import DialogueLine
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Audio extraction helper (video → wav via ffmpeg)
# ---------------------------------------------------------------------------
def _extract_audio_segment(
    video_path: Path,
    start_s: float | None,
    end_s: float | None,
    out_wav: Path,
) -> None:
    """
    Render the audio track of *video_path* as a mono 16 kHz WAV via ffmpeg.

    Args:
        video_path: Source video.
        start_s:    Start time in seconds (None = beginning of file).
        end_s:      End time in seconds (None = end of file).
        out_wav:    Destination WAV path (overwritten if present, "-y").

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero code; the message
            carries the exit code and ffmpeg's decoded stderr.
    """
    import subprocess

    # Trim options are placed before "-i" so they apply to the input.
    # With a start, the window is expressed as "-ss" + duration ("-t");
    # with only an end, as an absolute stop position ("-to").
    if start_s is None:
        trim_args = [] if end_s is None else ["-to", str(end_s)]
    else:
        trim_args = ["-ss", str(start_s)]
        if end_s is not None:
            trim_args += ["-t", str(end_s - start_s)]

    cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        *trim_args,
        "-i", str(video_path),
        "-vn",            # no video
        "-ac", "1",       # mono
        "-ar", "16000",   # 16 kHz — Whisper native rate
        "-f", "wav",
        str(out_wav),
    ]
    completed = subprocess.run(cmd, capture_output=True)
    if completed.returncode == 0:
        return
    raise RuntimeError(
        f"ffmpeg failed (code {completed.returncode}):\n"
        f"{completed.stderr.decode(errors='replace')}"
    )
# ---------------------------------------------------------------------------
# Core transcription
# ---------------------------------------------------------------------------
def transcribe_video(
    video_path: Path,
    cfg: AppConfig,
    start_s: float | None = None,
    end_s: float | None = None,
    time_offset_s: float = 0.0,
) -> list[DialogueLine]:
    """
    Transcribe dialogue from *video_path* using faster-whisper.

    The requested window is first extracted to a temporary WAV file, which
    is then passed to the Whisper model configured in ``cfg.whisper``.

    Args:
        video_path:     Path to source or trailer video.
        cfg:            Application configuration (whisper section).
        start_s:        Clip start in video-file seconds (None = beginning).
        end_s:          Clip end in video-file seconds (None = end of file).
        time_offset_s:  Added to every transcript timestamp so that beat-level
                        transcripts align with absolute movie time.

    Returns:
        One DialogueLine per segment, in the order the model yields them.

    Raises:
        ImportError:  If faster-whisper is not installed.
        RuntimeError: If the ffmpeg audio extraction fails.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "faster-whisper not installed. Run: pip install faster-whisper"
        ) from err

    w = cfg.whisper
    logger.info(
        "Transcribing %s [%.1fs-%s] with %s on %s",
        video_path.name,
        start_s or 0.0,
        # Compare against None so an explicit end of 0.0 is not reported as "end".
        f"{end_s:.1f}s" if end_s is not None else "end",
        w.model,
        w.device,
    )

    with tempfile.TemporaryDirectory() as tmp:
        wav = Path(tmp) / "audio.wav"
        _extract_audio_segment(video_path, start_s, end_s, wav)

        model = WhisperModel(w.model, device=w.device, compute_type=w.compute_type)
        segments, _ = model.transcribe(
            str(wav),
            # An empty language string is forwarded as None.
            language=w.language if w.language else None,
            beam_size=5,
        )
        # Consume the segments while the temporary WAV still exists, in case
        # the model produces them lazily.
        lines: list[DialogueLine] = [
            DialogueLine(
                start_s=seg.start + time_offset_s,
                end_s=seg.end + time_offset_s,
                text=seg.text.strip(),
            )
            for seg in segments
        ]

    logger.info("Transcription done: %d segments.", len(lines))
    return lines
# ---------------------------------------------------------------------------
# Convenience: transcribe a whole file and return grouped by scene
# ---------------------------------------------------------------------------
def transcribe_full_movie(cfg: AppConfig) -> list[DialogueLine]:
    """
    Transcribe the whole source movie configured at ``cfg.paths.source_movie``.

    No time window and no timestamp offset are applied. The result can be
    used to enrich Scenes via a dialogue_callback passed to
    build_scene_index().
    """
    movie_path = cfg.paths.source_movie
    return transcribe_video(movie_path, cfg)
def assign_dialogue_to_scenes(
    all_dialogue: Sequence[DialogueLine],
    scenes: list["src.core.models.Scene"],  # type: ignore[name-defined]
) -> list["src.core.models.Scene"]:  # type: ignore[name-defined]
    """
    Attach pre-transcribed DialogueLines to the Scenes they fall into.

    A line belongs to a scene when the line's midpoint lies in the half-open
    interval ``[scene.start_s, scene.end_s)``.

    Args:
        all_dialogue: Full-movie transcript as flat list.
        scenes:       Scene list; the input objects are not modified.

    Returns:
        New list of Scene copies (same order) with ``dialogue`` populated.
    """
    from dataclasses import replace
    from src.core.models import Scene

    # Pair every line with its midpoint once instead of once per scene.
    timed_lines = [
        ((line.start_s + line.end_s) / 2.0, line) for line in all_dialogue
    ]

    enriched: list[Scene] = []
    total_assigned = 0
    for scene in scenes:
        hits = tuple(
            line
            for midpoint, line in timed_lines
            if scene.start_s <= midpoint < scene.end_s
        )
        updated = replace(scene, dialogue=hits)
        total_assigned += len(updated.dialogue)
        enriched.append(updated)

    logger.info("Assigned %d dialogue lines across %d scenes.", total_assigned, len(enriched))
    return enriched
+1
View File
@@ -0,0 +1 @@
# src.core package
+387
View File
@@ -0,0 +1,387 @@
"""
src/core/config.py — Configuration loader for AI Trailer Generator v2
Loads config.toml and exposes typed, nested dataclasses.
All CV thresholds, paths, and model settings are sourced exclusively here.
API keys are NEVER stored in config.toml; they are loaded from .env.
"""
from __future__ import annotations
import os
import tomllib
try:
from dotenv import load_dotenv as _load_dotenv
_HAS_DOTENV = True
except ImportError: # dotenv optional — falls back to existing env vars
_HAS_DOTENV = False
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
# ---------------------------------------------------------------------------
# Leaf sections
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class PathsConfig:
    """Filesystem locations taken from the ``[paths]`` table of config.toml."""
    # load_config() resolves relative entries against the directory that
    # contains the config file; absolute entries are kept as given.
    source_movie: Path
    reference_trailer: Path
    output_dir: Path
    cache_dir: Path
    proxy_dir: Path
@dataclass(frozen=True)
class VideoConfig:
    """Values of the ``[video]`` table of config.toml (all keys required)."""
    extract_fps: float
    proxy_width: int
    proxy_height: int
@dataclass(frozen=True)
class VibeCheckConfig:
    """Values of the ``[cv.vibe_check]`` table of config.toml (all keys required)."""
    top_k_candidates: int
    # NOTE(review): presumably an OpenCV histogram-comparison constant — confirm.
    hist_compare_method: int
    hist_bins_hue: int
    hist_bins_saturation: int
    phash_max_distance: int
    crop_top_fraction: float
    crop_bottom_fraction: float
@dataclass(frozen=True)
class DeepScanConfig:
    """
    Values of the ``[cv.deep_scan]`` table of config.toml.

    load_config() requires only coarse_step_seconds, match_threshold,
    match_method and refine_step_seconds; every other field falls back to a
    default defined in load_config() when the key is absent.
    """
    coarse_step_seconds: float
    match_threshold: float
    provisional_match_threshold: float
    # Defaults to match_threshold when not configured (see load_config).
    coarse_candidate_threshold: float
    sequence_score_weight: float
    span_score_weight: float
    coarse_score_weight: float
    duration_score_weight: float
    duration_tie_break_score_delta: float
    min_duration_coverage: float
    continuity_seed_offsets_s: tuple[float, ...]
    scene_seed_top_k: int
    scene_seed_points_per_scene: int
    content_rerank_candidate_count: int
    skip_coarse_scan_with_weighted_seeds: bool
    max_refine_candidates: int
    # NOTE(review): presumably one of the MatchMethod / cv2.TM_* values — confirm.
    match_method: int
    refine_window_seconds: float
    refine_step_seconds: float
    content_align_window_seconds: float
    content_align_sample_step_s: float
    content_validation_weight: float
    provisional_content_threshold: float
    start_tie_break_score_delta: float
    start_preroll_frames: int
    sequence_candidate_count: int
    sequence_min_distance_s: float
    span_sample_step_s: float
    trim_tail_frames: int
    scene_boundary_epsilon_s: float
    scoreable_luma_mean_min: float
    scoreable_luma_p90_min: float
    scoreable_contrast_min: float
@dataclass(frozen=True)
class CVConfig:
    """Groups the two computer-vision sub-sections of the ``[cv]`` table."""
    vibe_check: VibeCheckConfig
    deep_scan: DeepScanConfig
@dataclass(frozen=True)
class SceneDetectionConfig:
    """Values of the ``[scene_detection]`` table of config.toml (all keys required)."""
    content_threshold: float
    min_scene_duration_s: float
@dataclass(frozen=True)
class WhisperConfig:
    """Values of the ``[whisper]`` table of config.toml (all keys required)."""
    # load_config() copies these values verbatim; the Literal annotations are
    # not enforced at load time.
    model: str
    language: str
    device: Literal["cuda", "cpu"]
    compute_type: Literal["float16", "int8", "float32"]
@dataclass(frozen=True)
class LLMConfig:
    """Values of the ``[llm]`` table of config.toml plus the resolved API key."""
    provider: Literal["ollama", "openai", "openrouter"]
    base_url: str
    model: str
    timeout_seconds: int
    temperature: float
    max_tokens: int
    # Loaded from .env — NEVER committed to version control.
    # load_config() fills this from OPENROUTER_API_KEY / OPENAI_API_KEY
    # (depending on provider), falling back to LLM_API_KEY.
    api_key: str = ""
@dataclass(frozen=True)
class VisionConfig:
    """
    Values of the optional ``[vision]`` table of config.toml.

    The table may be omitted entirely; load_config() then uses its own
    defaults, borrowing base_url, model and timeout_seconds from the LLM
    section.
    """
    enabled: bool
    provider: Literal["openai", "openrouter"]
    base_url: str
    model: str
    timeout_seconds: int
    temperature: float
    max_tokens: int
    scene_candidate_top_k: int
    max_new_descriptions_per_run: int
    max_seed_scenes: int
    seed_points_per_scene: int
    seed_score: float
    max_refine_candidates: int
    local_scan_step_s: float
    local_scan_max_points_per_scene: int
    local_scan_top_candidates: int
    local_scan_tie_break_score_delta: float
    multi_shot_cut_corr_threshold: float
    multi_shot_boundary_tolerance_s: float
    fullscan_fallback: bool
    content_threshold: float
    similarity_threshold: float
    # Resolved by load_config() from environment variables, never from TOML.
    api_key: str = ""
@dataclass(frozen=True)
class ExportConfig:
    """Values of the ``[export]`` table of config.toml (all keys required)."""
    fcpxml_version: str
    edl_frame_rate: float
    # Copied verbatim by load_config(); the Literal is not enforced at load time.
    output_format: Literal["fcpxml", "edl", "both"]
# ---------------------------------------------------------------------------
# Root config — single object passed through the entire application
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class AppConfig:
    """Immutable root configuration object returned by load_config()."""
    # From the [project] table (keys "name", "version", "log_level").
    project_name: str
    version: str
    log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR"]
    # One typed section per config.toml table.
    paths: PathsConfig
    video: VideoConfig
    cv: CVConfig
    scene_detection: SceneDetectionConfig
    whisper: WhisperConfig
    llm: LLMConfig
    vision: VisionConfig
    export: ExportConfig
# ---------------------------------------------------------------------------
# Loader
# ---------------------------------------------------------------------------
# Default file locations, two directory levels above this module's directory
# (Path(__file__).parents[2]); load_config() documents this as <project_root>.
_DEFAULT_CONFIG_PATH = Path(__file__).parents[2] / "config.toml"
_DEFAULT_ENV_PATH = Path(__file__).parents[2] / ".env"
def load_config(
    config_path: Path = _DEFAULT_CONFIG_PATH,
    env_path: Path = _DEFAULT_ENV_PATH,
) -> AppConfig:
    """
    Read config.toml and build the immutable, fully-typed AppConfig.

    API keys come from the .env file (or variables already present in the
    environment); they are never read from config.toml.

    Args:
        config_path: Absolute or relative path to the TOML file.
                     Defaults to <project_root>/config.toml.
        env_path:    Path to the .env file.
                     Defaults to <project_root>/.env.

    Raises:
        FileNotFoundError: If the TOML file does not exist.
        KeyError / TypeError: If a required key is missing or has the wrong type.
    """
    # Populate os.environ from .env before any key lookup below. Existing
    # environment variables win (override=False).
    if _HAS_DOTENV:
        _load_dotenv(dotenv_path=env_path, override=False)

    if not config_path.exists():
        raise FileNotFoundError(
            f"Config file not found: {config_path}\n"
            "Copy config.toml.example to config.toml and adjust your paths."
        )

    with config_path.open("rb") as fh:
        raw: dict = tomllib.load(fh)

    # Top-level tables; only [vision] is optional.
    project = raw["project"]
    paths_raw = raw["paths"]
    video_raw = raw["video"]
    cv_raw = raw["cv"]
    sd_raw = raw["scene_detection"]
    whisper_raw = raw["whisper"]
    llm_raw = raw["llm"]
    vision_raw = raw.get("vision", {})
    export_raw = raw["export"]

    # Relative paths are anchored at the config file's directory so the
    # project stays relocatable; absolute paths pass through untouched.
    config_dir = config_path.parent

    def _resolve(p: str) -> Path:
        candidate = Path(p)
        if candidate.is_absolute():
            return candidate
        return (config_dir / candidate).resolve()

    paths = PathsConfig(**{
        key: _resolve(paths_raw[key])
        for key in (
            "source_movie",
            "reference_trailer",
            "output_dir",
            "cache_dir",
            "proxy_dir",
        )
    })

    video = VideoConfig(
        extract_fps=float(video_raw["extract_fps"]),
        proxy_width=int(video_raw["proxy_width"]),
        proxy_height=int(video_raw["proxy_height"]),
    )

    vc_raw = cv_raw["vibe_check"]
    vibe_check = VibeCheckConfig(
        top_k_candidates=int(vc_raw["top_k_candidates"]),
        hist_compare_method=int(vc_raw["hist_compare_method"]),
        hist_bins_hue=int(vc_raw["hist_bins_hue"]),
        hist_bins_saturation=int(vc_raw["hist_bins_saturation"]),
        phash_max_distance=int(vc_raw["phash_max_distance"]),
        crop_top_fraction=float(vc_raw["crop_top_fraction"]),
        crop_bottom_fraction=float(vc_raw["crop_bottom_fraction"]),
    )

    ds_raw = cv_raw["deep_scan"]

    def _ds_float(key: str, default: float) -> float:
        return float(ds_raw.get(key, default))

    def _ds_int(key: str, default: int) -> int:
        return int(ds_raw.get(key, default))

    deep_scan = DeepScanConfig(
        coarse_step_seconds=float(ds_raw["coarse_step_seconds"]),
        match_threshold=float(ds_raw["match_threshold"]),
        provisional_match_threshold=_ds_float("provisional_match_threshold", 0.45),
        # Falls back to the (required) match_threshold when not configured.
        coarse_candidate_threshold=_ds_float(
            "coarse_candidate_threshold", ds_raw["match_threshold"]
        ),
        sequence_score_weight=_ds_float("sequence_score_weight", 0.55),
        span_score_weight=_ds_float("span_score_weight", 0.15),
        coarse_score_weight=_ds_float("coarse_score_weight", 0.10),
        duration_score_weight=_ds_float("duration_score_weight", 0.20),
        duration_tie_break_score_delta=_ds_float("duration_tie_break_score_delta", 0.03),
        min_duration_coverage=_ds_float("min_duration_coverage", 0.65),
        continuity_seed_offsets_s=tuple(
            float(offset)
            for offset in ds_raw.get(
                "continuity_seed_offsets_s",
                [-1.0, 0.0, 0.5, 1.0, 1.5, 2.0, 3.0],
            )
        ),
        scene_seed_top_k=_ds_int("scene_seed_top_k", 30),
        scene_seed_points_per_scene=_ds_int("scene_seed_points_per_scene", 6),
        content_rerank_candidate_count=_ds_int("content_rerank_candidate_count", 100),
        skip_coarse_scan_with_weighted_seeds=bool(
            ds_raw.get("skip_coarse_scan_with_weighted_seeds", False)
        ),
        max_refine_candidates=_ds_int("max_refine_candidates", 6),
        match_method=int(ds_raw["match_method"]),
        refine_window_seconds=_ds_float("refine_window_seconds", 0.6),
        refine_step_seconds=float(ds_raw["refine_step_seconds"]),
        content_align_window_seconds=_ds_float("content_align_window_seconds", 0.48),
        content_align_sample_step_s=_ds_float("content_align_sample_step_s", 0.28),
        content_validation_weight=_ds_float("content_validation_weight", 0.35),
        provisional_content_threshold=_ds_float("provisional_content_threshold", 0.42),
        start_tie_break_score_delta=_ds_float("start_tie_break_score_delta", 0.015),
        start_preroll_frames=_ds_int("start_preroll_frames", 0),
        sequence_candidate_count=_ds_int("sequence_candidate_count", 240),
        sequence_min_distance_s=_ds_float("sequence_min_distance_s", 1.0),
        span_sample_step_s=_ds_float("span_sample_step_s", 0.08),
        trim_tail_frames=_ds_int("trim_tail_frames", 2),
        scene_boundary_epsilon_s=_ds_float("scene_boundary_epsilon_s", 0.12),
        scoreable_luma_mean_min=_ds_float("scoreable_luma_mean_min", 24.0),
        scoreable_luma_p90_min=_ds_float("scoreable_luma_p90_min", 58.0),
        scoreable_contrast_min=_ds_float("scoreable_contrast_min", 24.0),
    )

    scene_detection = SceneDetectionConfig(
        content_threshold=float(sd_raw["content_threshold"]),
        min_scene_duration_s=float(sd_raw["min_scene_duration_s"]),
    )

    whisper = WhisperConfig(
        model=whisper_raw["model"],
        language=whisper_raw["language"],
        device=whisper_raw["device"],
        compute_type=whisper_raw["compute_type"],
    )

    # LLM API key, read from the environment only:
    #   OPENROUTER_API_KEY  → for provider = openrouter
    #   OPENAI_API_KEY      → for provider = openai
    #   LLM_API_KEY         → universal fallback (also used for other providers)
    llm_provider = llm_raw["provider"]
    if llm_provider == "openrouter":
        provider_key = os.environ.get("OPENROUTER_API_KEY", "")
    elif llm_provider == "openai":
        provider_key = os.environ.get("OPENAI_API_KEY", "")
    else:
        provider_key = ""
    llm_api_key = provider_key or os.environ.get("LLM_API_KEY", "")

    llm = LLMConfig(
        provider=llm_provider,
        base_url=llm_raw["base_url"],
        model=llm_raw["model"],
        timeout_seconds=int(llm_raw["timeout_seconds"]),
        temperature=float(llm_raw["temperature"]),
        max_tokens=int(llm_raw["max_tokens"]),
        api_key=llm_api_key,
    )

    # Vision inherits the LLM provider when that is a hosted one; otherwise
    # it defaults to openrouter.
    if llm_provider in ("openai", "openrouter"):
        default_vision_provider = llm_provider
    else:
        default_vision_provider = "openrouter"
    vision_provider = vision_raw.get("provider", default_vision_provider)

    if vision_provider == "openrouter":
        vision_provider_key = os.environ.get("OPENROUTER_API_KEY", "")
    else:
        vision_provider_key = os.environ.get("OPENAI_API_KEY", "")
    vision_api_key = (
        vision_provider_key
        or os.environ.get("VISION_API_KEY", "")
        or os.environ.get("LLM_API_KEY", "")
    )

    def _vis_float(key: str, default: float) -> float:
        return float(vision_raw.get(key, default))

    def _vis_int(key: str, default: int) -> int:
        return int(vision_raw.get(key, default))

    vision = VisionConfig(
        enabled=bool(vision_raw.get("enabled", False)),
        provider=vision_provider,
        base_url=str(vision_raw.get("base_url", llm.base_url)),
        model=str(vision_raw.get("model", llm.model)),
        timeout_seconds=_vis_int("timeout_seconds", llm.timeout_seconds),
        temperature=_vis_float("temperature", 0.0),
        max_tokens=_vis_int("max_tokens", 350),
        scene_candidate_top_k=_vis_int("scene_candidate_top_k", 8),
        max_new_descriptions_per_run=_vis_int("max_new_descriptions_per_run", 12),
        max_seed_scenes=_vis_int("max_seed_scenes", 3),
        seed_points_per_scene=_vis_int("seed_points_per_scene", 12),
        seed_score=_vis_float("seed_score", 0.88),
        max_refine_candidates=_vis_int("max_refine_candidates", 6),
        local_scan_step_s=_vis_float("local_scan_step_s", 0.12),
        local_scan_max_points_per_scene=_vis_int("local_scan_max_points_per_scene", 180),
        local_scan_top_candidates=_vis_int("local_scan_top_candidates", 18),
        local_scan_tie_break_score_delta=_vis_float("local_scan_tie_break_score_delta", 0.08),
        multi_shot_cut_corr_threshold=_vis_float("multi_shot_cut_corr_threshold", 0.20),
        multi_shot_boundary_tolerance_s=_vis_float("multi_shot_boundary_tolerance_s", 0.20),
        fullscan_fallback=bool(vision_raw.get("fullscan_fallback", False)),
        content_threshold=_vis_float("content_threshold", 0.22),
        similarity_threshold=_vis_float("similarity_threshold", 0.18),
        api_key=vision_api_key,
    )

    export = ExportConfig(
        fcpxml_version=str(export_raw["fcpxml_version"]),
        edl_frame_rate=float(export_raw["edl_frame_rate"]),
        output_format=export_raw["output_format"],
    )

    cv = CVConfig(vibe_check=vibe_check, deep_scan=deep_scan)
    return AppConfig(
        project_name=project["name"],
        version=project["version"],
        log_level=project["log_level"],
        paths=paths,
        video=video,
        cv=cv,
        scene_detection=scene_detection,
        whisper=whisper,
        llm=llm,
        vision=vision,
        export=export,
    )
+287
View File
@@ -0,0 +1,287 @@
"""
src/core/models.py — Canonical data models for AI Trailer Generator v2
Rules:
- Every model is a frozen dataclass (immutable after creation).
- All fields are strictly typed; no bare dicts or untyped lists.
- Seconds are always float; frame numbers are always int.
- Confidence scores live in [0.0, 1.0].
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
from typing import Optional
# ===========================================================================
# Enumerations
# ===========================================================================
class MatchMethod(Enum):
    """CV template matching method (mirrors cv2.TM_* constants)."""
    # Integer values 0-5, declared in the same order as the names below.
    TM_SQDIFF = 0
    TM_SQDIFF_NORMED = 1
    TM_CCORR = 2
    TM_CCORR_NORMED = 3
    TM_CCOEFF = 4
    TM_CCOEFF_NORMED = 5
class BeatType(Enum):
    """Narrative role of a trailer beat (for dramaturgy / LLM use only)."""
    # Values are assigned by auto(); rely on the member names, not the numbers.
    HOOK = auto()        # Opening attention grabber
    SETUP = auto()       # World / character introduction
    CONFLICT = auto()    # Inciting incident / rising tension
    CLIMAX = auto()      # Peak action / emotion
    RESOLUTION = auto()  # Cool-down / tagline
    UNKNOWN = auto()     # Default for a TrailerBeat that has not been classified
class ExportFormat(Enum):
    """Supported export targets; values are the lowercase format names."""
    FCPXML = "fcpxml"
    EDL = "edl"
    BOTH = "both"
# ===========================================================================
# Phase 0 — Source-movie scene index
# ===========================================================================
@dataclass(frozen=True)
class DialogueLine:
    """Single transcribed line from Whisper output."""
    start_s: float                 # onset in seconds
    end_s: float                   # offset in seconds
    text: str                      # verbatim transcript
    speaker: Optional[str] = None  # diarisation label if available

    @property
    def duration_s(self) -> float:
        """Length of the line in seconds (end_s - start_s)."""
        return self.end_s - self.start_s
@dataclass(frozen=True)
class Scene:
    """
    One detected scene in the source movie.

    Produced by PySceneDetect; enriched by Whisper dialogue and
    (optionally) perceptual hashes during the Vibe Check phase.
    """
    scene_id: int      # zero-based index in source movie
    source_path: Path  # absolute path to the source video file
    start_s: float     # scene start in seconds
    end_s: float       # scene end in seconds
    start_frame: int   # first frame number
    end_frame: int     # last frame number
    # Populated after Vibe Check fingerprinting
    luma_hist: Optional[bytes] = None  # serialised np.ndarray (pickle)
    sat_hist: Optional[bytes] = None
    phash: Optional[str] = None        # 64-bit hex string
    # Populated after Whisper pass
    dialogue: tuple[DialogueLine, ...] = field(default_factory=tuple)

    @property
    def duration_s(self) -> float:
        """Scene length in seconds (end_s - start_s)."""
        return self.end_s - self.start_s

    @property
    def midpoint_s(self) -> float:
        """Timestamp halfway between start_s and end_s."""
        return self.start_s + self.duration_s / 2.0

    def __repr__(self) -> str:
        # Compact form; start and end are separated by "-" so the two
        # timestamps do not run together.
        return (
            f"Scene(id={self.scene_id}, "
            f"{self.start_s:.2f}s-{self.end_s:.2f}s, "
            f"dur={self.duration_s:.2f}s)"
        )
# ===========================================================================
# Phase 1 — Reference-trailer beat
# ===========================================================================
@dataclass(frozen=True)
class TrailerBeat:
    """
    One cut / segment in the reference trailer.

    The 'beat' is the atomic unit of a trailer: it maps exactly to one
    clip that will later be sourced from the original movie.
    """
    beat_id: int
    trailer_path: Path
    start_s: float
    end_s: float
    start_frame: int
    end_frame: int
    beat_type: BeatType = BeatType.UNKNOWN  # set by LLM dramaturgy pass
    # Visual fingerprints of the *middle* frame (populated by CV pipeline)
    luma_hist: Optional[bytes] = None
    sat_hist: Optional[bytes] = None
    phash: Optional[str] = None
    # Dialogue extracted from this beat
    dialogue: tuple[DialogueLine, ...] = field(default_factory=tuple)

    @property
    def duration_s(self) -> float:
        """Beat length in seconds (end_s - start_s)."""
        return self.end_s - self.start_s

    @property
    def midpoint_s(self) -> float:
        """Timestamp halfway between start_s and end_s."""
        return self.start_s + self.duration_s / 2.0

    def __repr__(self) -> str:
        # Start and end are separated by "-" so the two timestamps do not
        # run together.
        return (
            f"TrailerBeat(id={self.beat_id}, "
            f"{self.beat_type.name}, "
            f"{self.start_s:.2f}s-{self.end_s:.2f}s)"
        )
# ===========================================================================
# Phase 2 — CV match result
# ===========================================================================
@dataclass(frozen=True)
class VibeHit:
    """
    Intermediate result from Phase 1 (Vibe Check — histogram/pHash).

    Represents a *candidate* scene that passed the coarse filter.
    Not yet a confirmed match; forwarded to Deep Scan.
    """
    beat_id: int           # trailer beat this candidate belongs to
    scene_id: int          # candidate source scene
    hist_score: float      # histogram similarity [0.0, 1.0] (CORREL method)
    phash_distance: int    # Hamming distance [0, 64]; lower = more similar
    combined_score: float  # weighted aggregate used for ranking
@dataclass(frozen=True)
class MatchSegment:
    """
    One source-backed visual island inside a trailer beat.

    Some trailer beats contain multiple shots separated by fades/title frames.
    A single continuous source in/out cannot represent those beats accurately.
    """
    # NOTE(review): presumably the offset from the beat start, in seconds — confirm.
    trailer_offset_s: float
    duration_s: float
    scene_id: int
    in_point_s: float
    out_point_s: float
    match_score: float
    is_confirmed: bool = True
@dataclass(frozen=True)
class MatchResult:
    """
    Final, confirmed match from Phase 2 (Deep Scan — template matching).

    One MatchResult per TrailerBeat: the best frame-accurate hit found
    inside the source movie.
    """
    beat_id: int      # which trailer beat was matched
    scene_id: int     # which source scene contains the match
    source_path: Path # absolute path to source video
    # Frame-accurate in-point / out-point in the SOURCE movie
    in_point_s: float     # matched frame onset in source seconds
    out_point_s: float    # computed out-point (in_point + beat duration)
    in_point_frame: int   # matched frame number in source movie
    # Match quality
    match_score: float    # cv2.matchTemplate peak value [0.0, 1.0]
    match_location: tuple[int, int] = field(default_factory=lambda: (0, 0))
    # (x, y) pixel location of the best match within the source frame
    # Provenance
    vibe_hit: Optional[VibeHit] = None  # the candidate that led here
    is_confirmed: bool = True
    # Empty by default; see MatchSegment for beats made of several shots.
    segments: tuple[MatchSegment, ...] = field(default_factory=tuple)

    @property
    def duration_s(self) -> float:
        """Matched source span in seconds (out_point_s - in_point_s)."""
        return self.out_point_s - self.in_point_s

    def __repr__(self) -> str:
        # Compact form: beat → scene mapping, in-point and score only.
        return (
            f"MatchResult(beat={self.beat_id} → scene={self.scene_id}, "
            f"in={self.in_point_s:.3f}s, score={self.match_score:.3f})"
        )
# ===========================================================================
# Phase 3 — Edit timeline (pre-export)
# ===========================================================================
@dataclass(frozen=True)
class EditClip:
    """
    One clip on the final edit timeline, ready for FCPXML / EDL export.

    Combines beat dramaturgy + the CV-confirmed source in/out points.
    """
    clip_index: int  # position on the timeline (0-based)
    beat: TrailerBeat
    match: MatchResult
    # Timeline position (in the OUTPUT trailer)
    timeline_start_s: float
    timeline_end_s: float
    # Optional explicit source length; None means "use the timeline length".
    source_duration_s: float | None = None
    trailer_tail_s: float = 0.0
    # Optional audio override (e.g. VO or music)
    audio_path: Optional[Path] = None
    audio_offset_s: float = 0.0

    @property
    def timeline_duration_s(self) -> float:
        """Length of the clip on the output timeline, in seconds."""
        return self.timeline_end_s - self.timeline_start_s

    @property
    def source_timeline_duration_s(self) -> float:
        """
        Source-backed length in seconds: source_duration_s clamped to be
        non-negative when set, otherwise the timeline duration.
        """
        if self.source_duration_s is not None:
            return max(0.0, self.source_duration_s)
        return self.timeline_duration_s

    def __repr__(self) -> str:
        # Timeline start and end are separated by "-" so the two timestamps
        # do not run together.
        return (
            f"EditClip(#{self.clip_index}, "
            f"tl={self.timeline_start_s:.2f}s-{self.timeline_end_s:.2f}s, "
            f"src={self.match.in_point_s:.3f}s)"
        )
@dataclass(frozen=True)
class EditTimeline:
    """
    Ordered collection of EditClips making up the finished trailer.

    This is the object handed to the export layer (FCPXML / EDL writer).
    """
    title: str
    frame_rate: float             # e.g. 23.976
    clips: tuple[EditClip, ...]   # ordered by clip_index

    @property
    def total_duration_s(self) -> float:
        """Latest timeline_end_s over all clips, or 0.0 for an empty timeline."""
        if self.clips:
            return max(clip.timeline_end_s for clip in self.clips)
        return 0.0

    @property
    def clip_count(self) -> int:
        """Number of clips on the timeline."""
        return len(self.clips)
+1
View File
@@ -0,0 +1 @@
# src.cv package — Computer Vision engine
+240
View File
@@ -0,0 +1,240 @@
from __future__ import annotations
import math
import shutil
import subprocess
from pathlib import Path
import numpy as np
from PIL import Image, ImageFilter, ImageOps
from src.core.config import AppConfig
from src.core.models import TrailerBeat
def _run(cmd: list[str]) -> None:
result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.decode(errors="replace"))
def _extract_frames(
    video_path: Path,
    start_s: float,
    duration_s: float,
    fps: float,
    out_dir: Path,
    prefix: str,
) -> None:
    """
    Dump frames of *video_path* as numbered PNGs into *out_dir* via ffmpeg.

    Frames are scaled to 640x360 and sampled at *fps*; files are named
    ``<prefix>_0001.png``, ``<prefix>_0002.png``, and so on. The start is
    clamped to >= 0 s and the duration to >= 0.04 s. *out_dir* is created
    if it does not exist.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    seek_s = max(0.0, start_s)
    length_s = max(0.04, duration_s)
    frame_pattern = out_dir / f"{prefix}_%04d.png"

    ffmpeg_cmd = ["ffmpeg", "-y", "-loglevel", "error"]
    ffmpeg_cmd += ["-ss", str(seek_s)]
    ffmpeg_cmd += ["-i", str(video_path)]
    ffmpeg_cmd += ["-t", str(length_s)]
    ffmpeg_cmd += ["-vf", f"scale=640:360,fps={fps}"]
    ffmpeg_cmd.append(str(frame_pattern))
    _run(ffmpeg_cmd)
def _cropped_image(path: Path, cfg: AppConfig) -> Image.Image:
    """
    Load *path* as grayscale, strip dark borders, and drop the top and
    bottom 5% of the remaining height. *cfg* is accepted but not read here.
    """
    trimmed = _trim_dark_borders(Image.open(path).convert("L"))
    width, height = trimmed.size
    # Final validation should see the composition. The broader text-safe crop
    # used for coarse search can remove bodies, furniture and lower-frame
    # spatial cues that distinguish otherwise similar face/window shots.
    upper = int(height * 0.05)
    lower = int(height * 0.95)
    return trimmed.crop((0, upper, width, lower))
def _trim_dark_borders(image: Image.Image) -> Image.Image:
    """
    Remove encoded black matte/pillarbox borders before content scoring.

    A column or row counts as active when the 90th percentile of its
    grayscale values exceeds 18. An axis is only trimmed when enough lines
    are active (at least 8 and at least 35% of the extent). The input image
    is returned unchanged if it is empty or if the trimmed box would be
    narrower or shorter than 35% of the original.
    """
    luma = np.asarray(image.convert("L"), dtype=np.float32)
    if luma.size == 0:
        return image
    height, width = luma.shape[:2]

    def _active_bounds(signal: np.ndarray, extent: int) -> tuple[int, int]:
        # Keep the full extent unless enough lines carry picture content.
        lit = np.where(signal > 18.0)[0]
        if lit.size < max(8, int(extent * 0.35)):
            return 0, extent
        # Pad the detected span slightly (2 px before, 2 px after).
        return max(0, int(lit[0]) - 2), min(extent, int(lit[-1]) + 3)

    left, right = _active_bounds(np.percentile(luma, 90, axis=0), width)
    top, bottom = _active_bounds(np.percentile(luma, 90, axis=1), height)

    too_narrow = right - left < int(width * 0.35)
    too_short = bottom - top < int(height * 0.35)
    if too_narrow or too_short:
        return image
    return image.crop((left, top, right, bottom))
def _feature(path: Path, cfg: AppConfig) -> np.ndarray:
    """
    Edge feature for the frame at *path*.

    The cropped frame is reduced to its central 80%, histogram-equalised,
    edge-filtered, resized to 160x62 and standardised (mean subtracted,
    divided by std + 1e-6).
    """
    frame = _cropped_image(path, cfg)
    width, height = frame.size
    inner_box = (
        int(width * 0.10),
        int(height * 0.10),
        int(width * 0.90),
        int(height * 0.90),
    )
    edges = ImageOps.equalize(frame.crop(inner_box))
    edges = edges.filter(ImageFilter.FIND_EDGES)
    edges = edges.resize((160, 62))
    pixels = np.asarray(edges, dtype=np.float32)
    centered = pixels - pixels.mean()
    return centered / (pixels.std() + 1e-6)
def _luma_feature(path: Path, cfg: AppConfig) -> np.ndarray:
    """
    Brightness feature for the frame at *path*: the cropped frame,
    histogram-equalised, resized to 160x80 and standardised.
    """
    equalized = ImageOps.equalize(_cropped_image(path, cfg))
    pixels = np.asarray(equalized.resize((160, 80)), dtype=np.float32)
    centered = pixels - pixels.mean()
    return centered / (pixels.std() + 1e-6)
def _hist_feature(path: Path, cfg: AppConfig) -> np.ndarray:
    """
    Global colour histogram for the frame at *path*.

    The RGB frame is border-trimmed, cropped to the 5%-95% height band and
    resized to 160x80. Each of the three channels yields a 32-bin histogram
    over (0, 255), normalised to sum to ~1; the three are concatenated.
    *cfg* is accepted but not read here.
    """
    rgb = _trim_dark_borders(Image.open(path).convert("RGB"))
    width, height = rgb.size
    band = rgb.crop((0, int(height * 0.05), width, int(height * 0.95)))
    pixels = np.asarray(band.resize((160, 80)), dtype=np.float32)

    def _channel_hist(channel: int) -> np.ndarray:
        counts, _ = np.histogram(pixels[:, :, channel], bins=32, range=(0, 255))
        counts = counts.astype(np.float32)
        return counts / (counts.sum() + 1e-6)

    return np.concatenate([_channel_hist(channel) for channel in range(3)])
def _spatial_hist_feature(path: Path, cfg: AppConfig) -> np.ndarray:
    """
    Spatially-binned colour histogram for the frame at *path*.

    After the same trimming, cropping and 160x80 resize as the global
    histogram, the frame is split into a 4x4 grid. Every cell contributes a
    normalised 16-bin histogram per channel, concatenated in row-major cell
    order. *cfg* is accepted but not read here.
    """
    rgb = _trim_dark_borders(Image.open(path).convert("RGB"))
    width, height = rgb.size
    band = rgb.crop((0, int(height * 0.05), width, int(height * 0.95)))
    pixels = np.asarray(band.resize((160, 80)), dtype=np.float32)

    rows, cols = 4, 4
    cell_h = pixels.shape[0] // rows
    cell_w = pixels.shape[1] // cols

    def _cell_hist(gy: int, gx: int, channel: int) -> np.ndarray:
        patch = pixels[
            gy * cell_h:(gy + 1) * cell_h,
            gx * cell_w:(gx + 1) * cell_w,
            channel,
        ]
        counts, _ = np.histogram(patch, bins=16, range=(0, 255))
        counts = counts.astype(np.float32)
        return counts / (counts.sum() + 1e-6)

    return np.concatenate([
        _cell_hist(gy, gx, channel)
        for gy in range(rows)
        for gx in range(cols)
        for channel in range(3)
    ])
def _is_dark(path: Path, cfg: AppConfig) -> bool:
    """
    Report whether the frame at *path* is too dark to be useful.

    True only when, inside the 5%-95% height band of the border-trimmed
    grayscale frame, the mean is below 28 and the 90th percentile is below
    58. *cfg* is accepted but not read here.
    """
    gray = _trim_dark_borders(Image.open(path).convert("L"))
    width, height = gray.size
    band = gray.crop((0, int(height * 0.05), width, int(height * 0.95)))
    pixels = np.asarray(band, dtype=np.float32)
    # Check the mean first; the percentile is only computed when it is low.
    if not float(pixels.mean()) < 28.0:
        return False
    return float(np.percentile(pixels, 90)) < 58.0
def _corr(a: np.ndarray, b: np.ndarray) -> float:
return float((a * b).mean())
def _hist_intersection(a: np.ndarray, b: np.ndarray) -> float:
return float(np.minimum(a, b).sum() / (np.maximum(a, b).sum() + 1e-6))
def _paired_frame_score(ref_path: Path, src_path: Path, cfg: AppConfig) -> float:
    """
    Similarity score between a reference frame and a source frame.

    Weighted sum of four per-frame comparisons: edge correlation (0.24),
    luma correlation (0.24), global histogram overlap (0.14) and spatial
    histogram overlap (0.38). All features are computed from the image
    files on every call.
    """
    edge_score = _corr(_feature(ref_path, cfg), _feature(src_path, cfg))
    luma_score = _corr(_luma_feature(ref_path, cfg), _luma_feature(src_path, cfg))
    hist_score = _hist_intersection(
        _hist_feature(ref_path, cfg),
        _hist_feature(src_path, cfg),
    )
    spatial_score = _hist_intersection(
        _spatial_hist_feature(ref_path, cfg),
        _spatial_hist_feature(src_path, cfg),
    )

    # Accumulate left to right so the floating-point result is unchanged.
    total = edge_score * 0.24
    total = total + luma_score * 0.24
    total = total + hist_score * 0.14
    total = total + spatial_score * 0.38
    return total
def align_cached_match_by_content(
    beat: TrailerBeat,
    estimated_in_point_s: float,
    cfg: AppConfig,
    search_window_s: float | None = None,
    fps: float = 25.0,
) -> tuple[float, float]:
    """
    Measure the local source offset directly from rendered frame content.

    This is intentionally independent from the global OpenCV matcher: it only
    needs FFmpeg, Pillow and numpy, and it scans a small window around an
    already plausible candidate.

    Args:
        beat:                 Trailer beat whose frames act as the reference.
        estimated_in_point_s: Candidate in-point in source-movie seconds.
        cfg:                  Application configuration (paths and
                              cv.deep_scan content-align settings are read).
        search_window_s:      Seconds scanned on either side of the estimate;
                              None uses content_align_window_seconds.
        fps:                  Frame rate used to render both frame sets.

    Returns:
        (in_point_s, score): the chosen source in-point in seconds and its
        score clamped to >= 0.0. If no frames could be rendered, or no
        candidate collected enough paired scores, the estimate is returned
        with score 0.0.
    """
    window_s = (
        search_window_s
        if search_window_s is not None
        else cfg.cv.deep_scan.content_align_window_seconds
    )
    # Never sample finer than one frame.
    sample_step_s = max(1.0 / fps, cfg.cv.deep_scan.content_align_sample_step_s)
    # Render the source from window_s before the estimate (clamped at 0) for
    # the beat length plus the window on both sides and 0.5 s of slack.
    source_start_s = max(0.0, estimated_in_point_s - window_s)
    source_duration_s = beat.duration_s + (2.0 * window_s) + 0.5
    # Scratch directory per beat; wiped before use and again in `finally`.
    tmp = cfg.paths.output_dir / "align_tmp" / f"beat_{beat.beat_id:03d}"
    shutil.rmtree(tmp, ignore_errors=True)
    tmp.mkdir(parents=True, exist_ok=True)
    try:
        ref_dir = tmp / "ref"
        src_dir = tmp / "src"
        _extract_frames(beat.trailer_path, beat.start_s, beat.duration_s, fps, ref_dir, "ref")
        _extract_frames(cfg.paths.source_movie, source_start_s, source_duration_s, fps, src_dir, "src")
        ref_frames = sorted(ref_dir.glob("ref_*.png"))
        src_frames = sorted(src_dir.glob("src_*.png"))
        if not ref_frames or not src_frames:
            return estimated_in_point_s, 0.0
        sample_frame_step = max(1, int(round(sample_step_s * fps)))
        # The last 0.24 s worth of reference frames is excluded from matching.
        min_matchable_frames = max(1, len(ref_frames) - int(round(0.24 * fps)))
        template_offsets: list[int] = []
        # NOTE(review): the feature arrays stored in `templates` are not read
        # below (only len(templates) is); _paired_frame_score recomputes the
        # features from the image files for every pair.
        templates: list[tuple[int, np.ndarray]] = []
        for idx in range(0, min_matchable_frames, sample_frame_step):
            path = ref_frames[idx]
            # Dark reference frames are skipped as templates.
            if _is_dark(path, cfg):
                continue
            template_offsets.append(idx)
            templates.append((idx, _feature(path, cfg)))
        # Fewer than three usable templates: fall back to every sampled
        # reference frame, dark ones included.
        if len(templates) < 3:
            template_offsets = list(range(0, min_matchable_frames, sample_frame_step))
            templates = [
                (idx, _feature(ref_frames[idx], cfg))
                for idx in template_offsets
            ]
        search_start_frame = 0
        search_end_frame = max(0, len(src_frames) - min_matchable_frames)
        # Index of the estimated in-point within the rendered source frames.
        estimated_frame = int(round((estimated_in_point_s - source_start_s) * fps))
        best_frame = estimated_frame
        best_score = -1.0
        for candidate_frame in range(search_start_frame, search_end_frame + 1):
            scores: list[float] = []
            for offset_frame in template_offsets:
                src_idx = candidate_frame + offset_frame
                # Stop pairing once the candidate runs off the rendered source.
                if src_idx < 0 or src_idx >= len(src_frames):
                    break
                scores.append(_paired_frame_score(ref_frames[offset_frame], src_frames[src_idx], cfg))
            # Require at least 3 pairs and 65% of the templates to be scored.
            if len(scores) < max(3, math.ceil(len(templates) * 0.65)):
                continue
            avg_score = sum(scores) / len(scores)
            min_score = min(scores)
            # Blend the average with the worst pair.
            score = (avg_score * 0.68) + (min_score * 0.32)
            if score > best_score + 0.003:
                best_score = score
                best_frame = candidate_frame
            # Within 0.003 of the best score, prefer the candidate closer to
            # the estimate; best_score itself is left unchanged.
            elif score >= best_score - 0.003 and abs(candidate_frame - estimated_frame) < abs(best_frame - estimated_frame):
                best_frame = candidate_frame
        return source_start_s + (best_frame / fps), max(0.0, best_score)
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
+253
View File
@@ -0,0 +1,253 @@
"""
src/cv/deep_scan.py — Phase 2: Frame-accurate template matching (Deep Scan)
Responsibility:
Given a TrailerBeat and a ranked list of VibeHit candidates, open the
source video and scan each candidate scene in two passes:
1. Coarse pass: step through at coarse_step_seconds intervals,
comparing via cv2.matchTemplate.
2. Refine pass: if coarse score > threshold, zoom in ± refine_window_seconds
at refine_step_seconds resolution to pin the exact in-point.
Returns a MatchResult if a confident hit is found, otherwise None.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Sequence
import cv2
import numpy as np
from src.core.config import AppConfig
from src.core.models import MatchResult, Scene, TrailerBeat, VibeHit
from src.cv.fingerprinting import text_safe_crop
from src.cv.frame_extractor import (
grab_frame_at,
grab_frame_at_path,
iter_frames_stepped,
open_video,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Template preparation
# ---------------------------------------------------------------------------
def _prepare_template(
    trailer_beat: TrailerBeat,
    cfg: AppConfig,
    proxy_w: int,
    proxy_h: int,
) -> np.ndarray | None:
    """
    Extract, crop, and resize the representative frame from the trailer beat.

    This frame becomes the cv2.matchTemplate "needle".

    Args:
        trailer_beat: Beat whose midpoint frame is used.
        cfg:          Application configuration (crop fractions are read from
                      cfg.cv.vibe_check).
        proxy_w:      Proxy width in pixels the frame is resized to.
        proxy_h:      Proxy height in pixels the frame is resized to.

    Returns:
        The template with a 10% margin removed on every side, or None when
        the midpoint frame cannot be decoded.
    """
    vc = cfg.cv.vibe_check

    beat_frame = grab_frame_at_path(
        trailer_beat.trailer_path,
        trailer_beat.midpoint_s,
    )
    if beat_frame is None:
        logger.warning("Beat %d: cannot decode midpoint frame.", trailer_beat.beat_id)
        return None

    cropped = text_safe_crop(beat_frame, vc.crop_top_fraction, vc.crop_bottom_fraction)
    resized = cv2.resize(cropped, (proxy_w, proxy_h), interpolation=cv2.INTER_AREA)

    # Crop the template by 10% on all sides to allow sliding window (translation
    # invariance) when matching against the source movie, which might have
    # slight pan/scan shifts.
    margin_y = int(proxy_h * 0.10)
    margin_x = int(proxy_w * 0.10)
    return resized[margin_y : proxy_h - margin_y, margin_x : proxy_w - margin_x]
# ---------------------------------------------------------------------------
# Single-frame match
# ---------------------------------------------------------------------------
def _match_frame(
    source_frame: np.ndarray,
    template: np.ndarray,
    method: int,
    proxy_w: int,
    proxy_h: int,
    crop_top: float,
    crop_bottom: float,
) -> tuple[float, tuple[int, int]]:
    """
    Locate *template* inside a single source frame.

    The frame is text-safe cropped and resized to the proxy size, then
    cv2.matchTemplate slides the template over it and the strongest response
    is reported.

    Args:
        source_frame: Decoded frame from the source movie.
        template:     Needle image; must not be larger than the proxy frame.
        method:       cv2.TM_* matching mode.
        proxy_w:      Proxy width the frame is resized to.
        proxy_h:      Proxy height the frame is resized to.
        crop_top:     Fraction of height removed from the top before resizing.
        crop_bottom:  Fraction of height removed from the bottom before resizing.

    Returns:
        (score, (x, y)): the maximum of the response map and its location.
        For the normalised correlation modes the score is nominally within
        [-1, 1]. NOTE(review): the maximum is used regardless of *method*,
        which assumes a higher-is-better mode (i.e. not a TM_SQDIFF variant).
    """
    proxy_size = (proxy_w, proxy_h)
    haystack = cv2.resize(
        text_safe_crop(source_frame, crop_top, crop_bottom),
        proxy_size,
        interpolation=cv2.INTER_AREA,
    )
    # The template is slightly smaller than the proxy frame, so the response
    # map has one entry per possible placement.
    response = cv2.matchTemplate(haystack, template, method)
    extrema = cv2.minMaxLoc(response)  # (min_val, max_val, min_loc, max_loc)
    peak_score = extrema[1]
    peak_x, peak_y = extrema[3]
    return float(peak_score), (int(peak_x), int(peak_y))
# ---------------------------------------------------------------------------
# Deep Scan core
# ---------------------------------------------------------------------------
def scan_scene(
    beat: TrailerBeat,
    scene: Scene,
    template: np.ndarray,
    cfg: AppConfig,
) -> tuple[float, float, tuple[int, int]] | None:
    """
    Template-match one source scene in a coarse pass followed by a refine pass.

    The coarse pass samples the whole scene at coarse_step_seconds. If its best
    score reaches match_threshold, the refine pass re-samples
    ± refine_window_seconds around that hit (clamped to the scene) at
    refine_step_seconds, starting from the coarse result.

    Args:
        beat:     Trailer beat being sourced (used for logging only).
        scene:    Source scene to scan.
        template: Needle image for cv2.matchTemplate.
        cfg:      Application configuration.

    Returns:
        (best_timestamp_s, best_score, best_location) or None if no hit.
    """
    deep_cfg = cfg.cv.deep_scan
    vibe_cfg = cfg.cv.vibe_check
    width = cfg.video.proxy_width
    height = cfg.video.proxy_height

    def _sweep(cap, lo_s, hi_s, step_s, seed):
        """Scan [lo_s, hi_s]; return the best (t, score, loc), starting from *seed*."""
        top_t, top_score, top_loc = seed
        for t, frame in iter_frames_stepped(cap, lo_s, hi_s, step_s):
            score, loc = _match_frame(
                frame, template, deep_cfg.match_method,
                width, height, vibe_cfg.crop_top_fraction, vibe_cfg.crop_bottom_fraction,
            )
            # Strictly greater: the earliest sample wins a tie.
            if score > top_score:
                top_t, top_score, top_loc = t, score, loc
        return top_t, top_score, top_loc

    with open_video(scene.source_path) as cap:
        # ---- Coarse pass ----------------------------------------------------
        coarse_t, coarse_score, coarse_loc = _sweep(
            cap,
            scene.start_s,
            scene.end_s,
            deep_cfg.coarse_step_seconds,
            (scene.start_s, 0.0, (0, 0)),
        )
        if coarse_score < deep_cfg.match_threshold:
            return None  # scene doesn't contain a match worth refining

        # ---- Refine pass ----------------------------------------------------
        window_lo = max(scene.start_s, coarse_t - deep_cfg.refine_window_seconds)
        window_hi = min(scene.end_s, coarse_t + deep_cfg.refine_window_seconds)
        refined_t, refined_score, refined_loc = _sweep(
            cap,
            window_lo,
            window_hi,
            deep_cfg.refine_step_seconds,
            (coarse_t, coarse_score, coarse_loc),
        )

        logger.debug(
            "Beat %d → Scene %d: coarse=%.3f refined=%.3f @%.3fs",
            beat.beat_id, scene.scene_id, coarse_score, refined_score, refined_t,
        )
        return refined_t, refined_score, refined_loc
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def run_deep_scan(
    beat: TrailerBeat,
    candidates: Sequence[VibeHit],
    scenes_by_id: dict[int, Scene],
    cfg: AppConfig,
) -> MatchResult | None:
    """
    Phase 2 Deep Scan: iterate over Vibe Check candidates and template-match.

    Args:
        beat:         The trailer beat to source.
        candidates:   Ranked VibeHit list from Phase 1 (best first).
        scenes_by_id: Lookup dict: scene_id → Scene.
        cfg:          Application configuration.

    Returns:
        The best-scoring MatchResult among the scenes that produced a hit,
        or None if the template could not be built or no scene matched.
    """
    # Imported at function scope: get_video_info is not among this module's
    # top-level imports. Doing it once here keeps it out of the candidate loop.
    from src.cv.frame_extractor import get_video_info

    # A score this high ends the search without trying further candidates.
    early_exit_score = 0.90

    proxy_w = cfg.video.proxy_width
    proxy_h = cfg.video.proxy_height

    template = _prepare_template(beat, cfg, proxy_w, proxy_h)
    if template is None:
        return None

    # get_video_info() opens the video file, so look each source up only once.
    fps_by_source: dict[Path, float] = {}
    best_result: MatchResult | None = None

    for vibe_hit in candidates:
        scene = scenes_by_id.get(vibe_hit.scene_id)
        if scene is None:
            logger.warning("VibeHit references unknown scene_id=%d", vibe_hit.scene_id)
            continue

        hit = scan_scene(beat, scene, template, cfg)
        if hit is None:
            continue
        in_point_s, match_score, match_loc = hit

        # Frame number: approximate via FPS (refined later if needed).
        fps = fps_by_source.get(scene.source_path)
        if fps is None:
            info = get_video_info(scene.source_path)
            fps = float(info["fps"]) or 24.0  # fall back when FPS is reported as 0
            fps_by_source[scene.source_path] = fps
        in_point_frame = int(in_point_s * fps)

        candidate_result = MatchResult(
            beat_id=beat.beat_id,
            scene_id=scene.scene_id,
            source_path=scene.source_path,
            in_point_s=in_point_s,
            out_point_s=in_point_s + beat.duration_s,
            in_point_frame=in_point_frame,
            match_score=match_score,
            match_location=match_loc,
            vibe_hit=vibe_hit,
        )

        if best_result is None or match_score > best_result.match_score:
            best_result = candidate_result

        # Early exit: if score is very high, no need to check other candidates.
        if match_score >= early_exit_score:
            logger.info(
                "Beat %d: early-exit match (score=%.3f) in scene %d @%.3fs",
                beat.beat_id, match_score, scene.scene_id, in_point_s,
            )
            break

    if best_result:
        logger.info("Beat %d → MATCH scene=%d score=%.3f in=%.3fs",
                    beat.beat_id, best_result.scene_id,
                    best_result.match_score, best_result.in_point_s)
    else:
        logger.warning("Beat %d → NO MATCH found in %d candidates.",
                       beat.beat_id, len(candidates))
    return best_result
+228
View File
@@ -0,0 +1,228 @@
"""
src/cv/fingerprinting.py — Image fingerprinting for the Vibe Check phase
Responsibilities (Single Responsibility Principle):
- Text-Safe Crop: strip top/bottom fractions to hide logos & letterbox
- Luma + Saturation histogram extraction (scale-invariant)
- Perceptual hash (pHash) via imagehash
This module is PURELY functional — no file I/O, no video decoding,
no search logic. It takes numpy arrays and returns numeric descriptors.
"""
from __future__ import annotations
import pickle
from typing import TYPE_CHECKING
import cv2
import numpy as np
try:
import imagehash
from PIL import Image as PilImage
_HAS_IMAGEHASH = True
except ImportError:
_HAS_IMAGEHASH = False
if TYPE_CHECKING:
from src.core.config import VibeCheckConfig
# ---------------------------------------------------------------------------
# Text-Safe Crop
# ---------------------------------------------------------------------------
def text_safe_crop(
    frame: np.ndarray,
    crop_top: float,
    crop_bottom: float,
) -> np.ndarray:
    """
    Slice away a band of rows at the top and at the bottom of a frame.

    Used ahead of colour analysis so that title cards / logos (top) and
    letterbox bars / subtitles (bottom) do not cause false positives.

    Args:
        frame:       Frame as an (H, W[, C]) ndarray.
        crop_top:    Fraction [0, 1) of the height to drop from the top.
        crop_bottom: Fraction [0, 1) of the height to drop from the bottom.

    Returns:
        The remaining rows as a slice of *frame* (a view, not a copy).

    Raises:
        ValueError: If either fraction is outside [0, 1) or the two add up
            to 1.0 or more.
    """
    if not (0.0 <= crop_top < 1.0):
        raise ValueError(f"crop_top must be in [0, 1); got {crop_top}")
    if not (0.0 <= crop_bottom < 1.0):
        raise ValueError(f"crop_bottom must be in [0, 1); got {crop_bottom}")
    if crop_top + crop_bottom >= 1.0:
        raise ValueError(
            f"crop_top ({crop_top}) + crop_bottom ({crop_bottom}) must be < 1.0"
        )

    height = frame.shape[0]
    # Row bounds are truncated towards zero.
    kept_rows = slice(int(height * crop_top), int(height * (1.0 - crop_bottom)))
    return frame[kept_rows]
# ---------------------------------------------------------------------------
# Histogram extraction
# ---------------------------------------------------------------------------
def extract_hs_histograms(
    frame_bgr: np.ndarray,
    bins_luma: int | None = None,
    bins_sat: int | None = None,
    *,
    bins_hue: int | None = None,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Build normalised luma and saturation histograms for a BGR frame.

    Hue is deliberately left out: it is highly sensitive to colour-grading
    differences between the trailer and the source movie.

    Args:
        frame_bgr: BGR frame (H, W, 3) uint8.
        bins_luma: Number of bins for the luma histogram over [0, 256).
        bins_sat:  Number of bins for the saturation histogram over [0, 256).
        bins_hue:  Backwards-compatible alias for bins_luma (keyword-only).

    Returns:
        (luma_hist, sat_hist) — each a 1-D float32 ndarray, L2-normalised.

    Raises:
        ValueError: If bins_luma and bins_hue are both given and differ.
        TypeError:  If the luma bin count (under either name) or bins_sat
            is missing.
    """
    if bins_luma is not None and bins_hue is not None and bins_hue != bins_luma:
        raise ValueError("bins_hue is an alias for bins_luma; pass only one value")
    luma_bins = bins_hue if bins_luma is None else bins_luma
    if luma_bins is None or bins_sat is None:
        raise TypeError("bins_luma/bins_hue and bins_sat are required")

    def _unit_hist(image: np.ndarray, channel: int, bins: int) -> np.ndarray:
        """Histogram of one channel as flat float32, scaled to unit L2 norm."""
        hist = cv2.calcHist(
            [image], [channel], None, [bins], [0, 256]
        ).flatten().astype(np.float32)
        # L2-normalise in place so frame size doesn't affect scores.
        cv2.normalize(hist, hist, alpha=1.0, norm_type=cv2.NORM_L2)
        return hist

    # Use perceptual grayscale luma rather than HSV Value. Value would make
    # saturated red and blue look identical, weakening the scene-level filter.
    gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
    hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)
    return _unit_hist(gray, 0, luma_bins), _unit_hist(hsv, 1, bins_sat)
def compare_histograms(
    hist_a: np.ndarray,
    hist_b: np.ndarray,
    method: int,
) -> float:
    """
    Compare two histograms with cv2.compareHist and return a plain float.

    Args:
        hist_a, hist_b: 1-D float32 ndarrays of identical shape.
        method:         cv2.HISTCMP_* constant (e.g. cv2.HISTCMP_CORREL = 0).

    Returns:
        The raw cv2.compareHist score; its range and direction depend on
        *method*:
            CORREL:        [-1, 1], higher = more similar.
            BHATTACHARYYA: [0, 1], lower = more similar.
    """
    raw_score = cv2.compareHist(hist_a, hist_b, method)
    return float(raw_score)
# ---------------------------------------------------------------------------
# Perceptual Hash
# ---------------------------------------------------------------------------
def compute_phash(frame_bgr: np.ndarray, hash_size: int = 8) -> str:
    """
    Compute a perceptual hash (pHash) of a BGR frame via imagehash.phash.

    The hash is meant to stay similar for visually similar frames, e.g. when
    the resolution differs between the trailer proxy and the source movie.

    Args:
        frame_bgr: BGR frame (H, W, 3) uint8.
        hash_size: Passed to imagehash.phash; the default of 8 yields a
            64-bit hash.

    Returns:
        The hash as a hex string (e.g. "f8e0e0e0...").

    Raises:
        RuntimeError: If imagehash is not installed.
    """
    if not _HAS_IMAGEHASH:
        raise RuntimeError(
            "imagehash is not installed. Run: pip install imagehash"
        )
    # Pillow expects RGB channel order, OpenCV frames are BGR.
    pil_image = PilImage.fromarray(cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB))
    return str(imagehash.phash(pil_image, hash_size=hash_size))
def phash_distance(hash_a: str, hash_b: str) -> int:
    """
    Return the Hamming distance between two pHash hex strings.

    Args:
        hash_a, hash_b: Hex strings as returned by compute_phash().

    Returns:
        Number of differing bits; 0 = identical. For the default 64-bit
        hashes the range is [0, 64].

    Raises:
        RuntimeError: If imagehash is not installed.
    """
    if not _HAS_IMAGEHASH:
        raise RuntimeError("imagehash is not installed.")
    first = imagehash.hex_to_hash(hash_a)
    second = imagehash.hex_to_hash(hash_b)
    # Subtracting two ImageHash objects yields their Hamming distance.
    return int(first - second)
# ---------------------------------------------------------------------------
# Serialisation helpers (histograms ↔ bytes for caching)
# ---------------------------------------------------------------------------
def hist_to_bytes(hist: np.ndarray) -> bytes:
    """
    Pickle a numpy histogram so it can be stored on a Scene/Beat model.

    Args:
        hist: Histogram array to serialise.

    Returns:
        The pickled array (highest pickle protocol); reverse with
        bytes_to_hist().
    """
    payload = pickle.dumps(hist, protocol=pickle.HIGHEST_PROTOCOL)
    return payload
def bytes_to_hist(data: bytes) -> np.ndarray:
    """
    Unpickle a histogram array previously produced by hist_to_bytes().

    SECURITY: this calls pickle.loads, which can execute arbitrary code.
    Only feed it bytes that this application wrote itself.

    Args:
        data: Pickled histogram bytes.

    Returns:
        The deserialised object (expected to be the original ndarray).
    """
    restored = pickle.loads(data)  # noqa: S301 (trusted internal cache only)
    return restored
# ---------------------------------------------------------------------------
# High-level convenience: fingerprint one frame using config
# ---------------------------------------------------------------------------
def fingerprint_frame(
    frame_bgr: np.ndarray,
    cfg: "VibeCheckConfig",
) -> tuple[bytes, bytes, str]:
    """
    Produce the full fingerprint of one frame in a single call.

    The frame is text-safe cropped once; histograms and pHash are both
    computed from the cropped frame.

    Args:
        frame_bgr: Full BGR frame (H, W, 3) uint8.
        cfg:       VibeCheckConfig carrying crop fractions and bin counts.
                   cfg.hist_bins_hue is used as the luma bin count.

    Returns:
        (luma_hist_bytes, sat_hist_bytes, phash_hex)
    """
    safe_area = text_safe_crop(frame_bgr, cfg.crop_top_fraction, cfg.crop_bottom_fraction)
    histograms = extract_hs_histograms(safe_area, cfg.hist_bins_hue, cfg.hist_bins_saturation)
    phash_hex = compute_phash(safe_area)
    luma_bytes, sat_bytes = (hist_to_bytes(hist) for hist in histograms)
    return luma_bytes, sat_bytes, phash_hex
+172
View File
@@ -0,0 +1,172 @@
"""
src/cv/frame_extractor.py — Low-level video frame access
Responsibility:
Provide a thin, testable wrapper around cv2.VideoCapture for:
- seeking to an exact timestamp and returning one BGR frame
- iterating frames with a configurable step size
- extracting the "representative" middle frame of a Scene / TrailerBeat
No fingerprinting, no matching — only raw frame delivery.
"""
from __future__ import annotations
import logging
from contextlib import contextmanager
from pathlib import Path
from typing import Generator, Iterator
import cv2
import numpy as np
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Context-managed VideoCapture
# ---------------------------------------------------------------------------
@contextmanager
def open_video(path: Path) -> Generator[cv2.VideoCapture, None, None]:
    """
    Open *path* as a cv2.VideoCapture for the duration of a `with` block.

    The capture is released when the block exits, whether normally or
    through an exception.

    Args:
        path: Path to the video file.

    Yields:
        The opened cv2.VideoCapture.

    Raises:
        FileNotFoundError: If the file does not exist.
        RuntimeError:      If OpenCV cannot open the file.
    """
    # Check existence first so a missing file gets a precise error instead of
    # OpenCV's generic "could not open".
    if not path.exists():
        raise FileNotFoundError(f"Video not found: {path}")

    capture = cv2.VideoCapture(str(path))
    if not capture.isOpened():
        raise RuntimeError(f"OpenCV could not open video: {path}")

    try:
        yield capture
    finally:
        capture.release()
# ---------------------------------------------------------------------------
# Video metadata
# ---------------------------------------------------------------------------
def get_video_info(path: Path) -> dict[str, float | int]:
    """
    Read basic metadata of a video; the file is closed again before returning.

    Args:
        path: Path to the video file.

    Returns:
        dict with keys: fps, frame_count, duration_s, width, height.
        duration_s is frame_count / fps, or 0.0 when fps is not positive.
    """
    with open_video(path) as cap:
        read_prop = cap.get
        fps = read_prop(cv2.CAP_PROP_FPS)
        frame_count = int(read_prop(cv2.CAP_PROP_FRAME_COUNT))
        width = int(read_prop(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(read_prop(cv2.CAP_PROP_FRAME_HEIGHT))

    if fps > 0:
        duration_s = frame_count / fps
    else:
        duration_s = 0.0  # avoid dividing by a zero/invalid frame rate

    return {
        "fps": fps,
        "frame_count": frame_count,
        "duration_s": duration_s,
        "width": width,
        "height": height,
    }
# ---------------------------------------------------------------------------
# Single frame extraction
# ---------------------------------------------------------------------------
def grab_frame_at(cap: cv2.VideoCapture, timestamp_s: float) -> np.ndarray | None:
    """
    Position *cap* at *timestamp_s* and read the next frame.

    The seek is done through CAP_PROP_POS_MSEC (time-based rather than
    frame-index-based). NOTE(review): how precisely the capture lands on
    the requested time likely depends on the OpenCV backend — confirm.

    Args:
        cap:         An already-open VideoCapture.
        timestamp_s: Target time in seconds.

    Returns:
        BGR ndarray (H, W, 3) or None if seeking / decoding failed.
    """
    cap.set(cv2.CAP_PROP_POS_MSEC, timestamp_s * 1000.0)
    ok, frame = cap.read()
    if ok and frame is not None:
        return frame
    logger.debug("grab_frame_at: failed at %.3fs", timestamp_s)
    return None
def grab_frame_at_path(path: Path, timestamp_s: float) -> np.ndarray | None:
    """
    Grab a single frame from a video file: open → seek → grab → release.

    Convenient for one-off reads; use open_video() directly when several
    frames are needed from the same file, to avoid reopening it each time.

    Args:
        path:        Path to the video file.
        timestamp_s: Target time in seconds.

    Returns:
        The frame, or None if seeking / decoding failed.
    """
    with open_video(path) as cap:
        frame = grab_frame_at(cap, timestamp_s)
    return frame
# ---------------------------------------------------------------------------
# Middle-frame extraction (representative frame for fingerprinting)
# ---------------------------------------------------------------------------
def grab_midpoint_frame(
    cap: cv2.VideoCapture,
    start_s: float,
    end_s: float,
) -> np.ndarray | None:
    """
    Return the frame halfway through the interval [start_s, end_s].

    Args:
        cap:     Open VideoCapture for the source video.
        start_s: Interval start in seconds.
        end_s:   Interval end in seconds.

    Returns:
        BGR frame or None if decoding failed.
    """
    half_span_s = (end_s - start_s) / 2.0
    return grab_frame_at(cap, start_s + half_span_s)
# ---------------------------------------------------------------------------
# Stepped-frame iterator (used by Deep Scan coarse pass)
# ---------------------------------------------------------------------------
def iter_frames_stepped(
    cap: cv2.VideoCapture,
    start_s: float,
    end_s: float,
    step_s: float,
) -> Iterator[tuple[float, np.ndarray]]:
    """
    Yield (timestamp_s, frame) for every *step_s* increment in [start_s, end_s].

    Frames that fail to decode are silently skipped.

    Each timestamp is derived from the sample index (start_s + i * step_s,
    rounded to microseconds) rather than by repeatedly adding a rounded step.
    Repeated rounding accumulates error for steps that are not exact at six
    decimals (e.g. 1/24 s), which can push the final sample past end_s, and
    it never advances at all for steps below the rounding quantum.

    Args:
        cap:     Open VideoCapture.
        start_s: Scan window start in seconds.
        end_s:   Scan window end in seconds (inclusive).
        step_s:  Step between samples in seconds.

    Yields:
        (timestamp_s, bgr_frame)

    Raises:
        ValueError: If step_s is not positive. Because this is a generator,
            the error surfaces when iteration starts, not at call time.
    """
    if step_s <= 0:
        raise ValueError(f"step_s must be > 0; got {step_s}")

    index = 0
    t = start_s
    while t <= end_s:
        frame = grab_frame_at(cap, t)
        if frame is not None:
            yield t, frame
        index += 1
        # Rounding hides representation noise such as 0.1 * 3 ==
        # 0.30000000000000004, so a sample that should land on end_s still
        # does; multiplying by the index keeps errors from accumulating.
        t = round(start_s + index * step_s, 6)
File diff suppressed because it is too large Load Diff
+229
View File
@@ -0,0 +1,229 @@
"""
src/cv/scene_indexer.py — Source-movie scene segmentation + fingerprinting
Responsibility:
1. Run PySceneDetect on the source movie → list of raw scene boundaries
2. For each scene, extract the midpoint frame and fingerprint it
3. Optionally run Whisper dialogue on each scene (injected as dependency)
4. Persist results to .cache/ as JSON for fast re-runs
Returns: list[Scene] with luma_hist, sat_hist, phash populated.
"""
from __future__ import annotations
import json
import logging
import pickle
from pathlib import Path
from typing import Callable, Sequence
import numpy as np
from src.core.config import AppConfig
from src.core.models import Scene
from src.cv.fingerprinting import fingerprint_frame
from src.cv.frame_extractor import grab_midpoint_frame, open_video
logger = logging.getLogger(__name__)
# Type alias for an optional dialogue-injection callback
DialogueCallback = Callable[[Scene], Scene]
# ---------------------------------------------------------------------------
# Cache helpers
# ---------------------------------------------------------------------------
def _cache_path(cfg: AppConfig) -> Path:
p = cfg.paths.cache_dir / "scene_index.json"
p.parent.mkdir(parents=True, exist_ok=True)
return p
def _scene_to_dict(s: Scene) -> dict:
return {
"scene_id": s.scene_id,
"source_path": str(s.source_path),
"start_s": s.start_s,
"end_s": s.end_s,
"start_frame": s.start_frame,
"end_frame": s.end_frame,
# histograms serialised as hex so JSON can hold them
"luma_hist": s.luma_hist.hex() if s.luma_hist else None,
"sat_hist": s.sat_hist.hex() if s.sat_hist else None,
"phash": s.phash,
}
def _scene_from_dict(d: dict) -> Scene:
    """
    Rebuild a Scene from the dict layout written by _scene_to_dict().

    The six boundary/identity keys are required (KeyError if absent);
    "luma_hist", "sat_hist" and "phash" are optional.
    """

    def _unhex(key: str):
        # Missing, None or empty string all map to "no histogram".
        return bytes.fromhex(d[key]) if d.get(key) else None

    return Scene(
        scene_id=d["scene_id"],
        source_path=Path(d["source_path"]),
        start_s=d["start_s"],
        end_s=d["end_s"],
        start_frame=d["start_frame"],
        end_frame=d["end_frame"],
        luma_hist=_unhex("luma_hist"),
        sat_hist=_unhex("sat_hist"),
        phash=d.get("phash"),
    )
def _save_cache(scenes: list[Scene], cfg: AppConfig) -> None:
    """Write the scene list to the scene-index cache file as indented UTF-8 JSON."""
    payload = json.dumps([_scene_to_dict(scene) for scene in scenes], indent=2)
    target = _cache_path(cfg)
    target.write_text(payload, encoding="utf-8")
    logger.info("Scene index cached → %s (%d scenes)", target, len(scenes))
def _load_cache(cfg: AppConfig) -> list[Scene] | None:
    """
    Load the cached scene index, if there is a usable one.

    Returns:
        The cached scenes, or None when the cache file is absent or cannot
        be read/parsed (the failure is logged as a warning, so the caller
        can simply re-index).
    """
    cache_file = _cache_path(cfg)
    if not cache_file.exists():
        return None

    try:
        entries = json.loads(cache_file.read_text(encoding="utf-8"))
        scenes = list(map(_scene_from_dict, entries))
        logger.info("Loaded %d scenes from cache (%s)", len(scenes), cache_file)
        return scenes
    except Exception as exc:
        # Deliberately broad: any unreadable cache is treated as "no cache".
        logger.warning("Cache corrupt, re-indexing: %s", exc)
        return None
# ---------------------------------------------------------------------------
# PySceneDetect integration
# ---------------------------------------------------------------------------
def _detect_scenes_pyscenedetect(cfg: AppConfig) -> list[tuple[float, float, int, int]]:
"""
Run PySceneDetect ContentDetector on the source movie.
Returns:
List of (start_s, end_s, start_frame, end_frame) tuples.
"""
try:
from scenedetect import open_video as sd_open_video, SceneManager
from scenedetect.detectors import ContentDetector
except ImportError:
raise ImportError(
"scenedetect is not installed. Run: pip install scenedetect[opencv]"
)
video = sd_open_video(str(cfg.paths.source_movie))
manager = SceneManager()
manager.add_detector(
ContentDetector(
threshold=cfg.scene_detection.content_threshold,
min_scene_len=int(
cfg.scene_detection.min_scene_duration_s
* video.frame_rate
),
)
)
logger.info("Detecting scenes in %s", cfg.paths.source_movie.name)
manager.detect_scenes(video=video, show_progress=True)
raw = manager.get_scene_list()
result: list[tuple[float, float, int, int]] = []
for start_tc, end_tc in raw:
result.append((
start_tc.get_seconds(),
end_tc.get_seconds(),
start_tc.get_frames(),
end_tc.get_frames(),
))
logger.info("PySceneDetect found %d scenes.", len(result))
return result
# ---------------------------------------------------------------------------
# Fingerprint enrichment
# ---------------------------------------------------------------------------
def _fingerprint_scenes(
    raw_scenes: list[tuple[float, float, int, int]],
    cfg: AppConfig,
) -> list[Scene]:
    """
    Turn raw scene boundaries into Scene objects with midpoint-frame fingerprints.

    The source movie is opened once. For every boundary the midpoint frame is
    grabbed and fingerprinted; when the frame cannot be decoded, the Scene is
    still emitted but without fingerprint fields.

    Args:
        raw_scenes: (start_s, end_s, start_frame, end_frame) tuples.
        cfg:        Application configuration.

    Returns:
        One Scene per input tuple, in input order; scene_id is the position.
    """
    total = len(raw_scenes)
    indexed: list[Scene] = []
    vibe_cfg = cfg.cv.vibe_check

    logger.info("Fingerprinting %d scenes …", total)

    with open_video(cfg.paths.source_movie) as cap:
        for scene_id, bounds in enumerate(raw_scenes):
            start_s, end_s, start_frame, end_frame = bounds
            midpoint = grab_midpoint_frame(cap, start_s, end_s)

            # Fields shared by fingerprinted and non-fingerprinted scenes.
            base_fields = dict(
                scene_id=scene_id,
                source_path=cfg.paths.source_movie,
                start_s=start_s, end_s=end_s,
                start_frame=start_frame, end_frame=end_frame,
            )

            if midpoint is None:
                logger.warning("Scene %d: midpoint frame decode failed, skipping fingerprint.", scene_id)
                indexed.append(Scene(**base_fields))
                continue

            luma_bytes, sat_bytes, phash_hex = fingerprint_frame(midpoint, vibe_cfg)
            indexed.append(Scene(
                **base_fields,
                luma_hist=luma_bytes,
                sat_hist=sat_bytes,
                phash=phash_hex,
            ))

            # Progress line every 50 scenes (only reached for decoded scenes).
            if (scene_id + 1) % 50 == 0:
                logger.info("%d / %d scenes fingerprinted", scene_id + 1, total)

    return indexed
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_scene_index(
    cfg: AppConfig,
    force_reindex: bool = False,
    dialogue_callback: DialogueCallback | None = None,
) -> list[Scene]:
    """
    Return the scene index for the source movie, from cache when possible.

    Steps:
        1. Unless force_reindex is set, try the cached index first.
        2. If there is no usable cache: detect scenes via PySceneDetect,
           fingerprint them and write the cache.
        3. If dialogue_callback is given, pass every scene through it. This
           happens after caching, so the cache holds the scenes as they were
           before the callback ran.

    Args:
        cfg:               Application configuration.
        force_reindex:     Ignore cache and re-run detection + fingerprinting.
        dialogue_callback: Optional function Scene → Scene that adds dialogue.
                           Injected here so this module stays audio-free.

    Returns:
        List of Scene objects.
    """
    scenes = None if force_reindex else _load_cache(cfg)

    if scenes is None:
        boundaries = _detect_scenes_pyscenedetect(cfg)
        scenes = _fingerprint_scenes(boundaries, cfg)
        _save_cache(scenes, cfg)

    if dialogue_callback:
        scenes = [dialogue_callback(scene) for scene in scenes]
    return scenes
+190
View File
@@ -0,0 +1,190 @@
"""
src/cv/vibe_check.py — Phase 1: Scene-level histogram / pHash filter
Responsibility:
Given ONE TrailerBeat (with pre-computed fingerprints) and a list of
source Scenes (also fingerprinted), return the Top-K candidates ranked
by a combined histogram + pHash score.
This module contains ZERO file I/O and ZERO frame decoding — those live
in the pipeline layer. Input = model objects, output = sorted VibeHit list.
"""
from __future__ import annotations
import logging
from dataclasses import replace
from typing import Sequence
import cv2
import numpy as np
from src.core.models import Scene, TrailerBeat, VibeHit
from src.cv.fingerprinting import bytes_to_hist, phash_distance
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Scoring
# ---------------------------------------------------------------------------
# Weights applied to the histogram score and the pHash score in the combined
# metric (they sum to 1.0).
# pHash gets less weight because it's sensitive to text overlays on source.
_HIST_WEIGHT = 0.70
_PHASH_WEIGHT = 0.30
# Used to scale a Hamming distance into a [0, 1] similarity, and as the
# distance reported when one of the two hashes is missing.
_PHASH_MAX_BITS = 64 # maximum possible Hamming distance
def _hist_combined_score(
beat: TrailerBeat,
scene: Scene,
hist_method: int,
) -> float:
"""
Average CORREL score of luma + saturation histograms.
Returns a value in [-1, 1] (CORREL) or [0, 1] depending on method.
Higher is always more similar (we invert BHATTACHARYYA if needed).
"""
if beat.luma_hist is None or scene.luma_hist is None:
return 0.0
if beat.sat_hist is None or scene.sat_hist is None:
return 0.0
luma_score = cv2.compareHist(
bytes_to_hist(beat.luma_hist),
bytes_to_hist(scene.luma_hist),
hist_method,
)
sat_score = cv2.compareHist(
bytes_to_hist(beat.sat_hist),
bytes_to_hist(scene.sat_hist),
hist_method,
)
# Normalise BHATTACHARYYA to [0, 1] similarity (invert distance)
if hist_method == cv2.HISTCMP_BHATTACHARYYA:
luma_score = 1.0 - float(luma_score)
sat_score = 1.0 - float(sat_score)
return float((luma_score + sat_score) / 2.0)
def _phash_score(beat: TrailerBeat, scene: Scene) -> float:
"""
Convert Hamming distance to a [0, 1] similarity score.
0 Hamming distance → 1.0 (identical)
64 Hamming distance → 0.0 (completely different)
"""
if beat.phash is None or scene.phash is None:
return 0.0
dist = phash_distance(beat.phash, scene.phash)
return 1.0 - (dist / _PHASH_MAX_BITS)
def _combined_score(
    beat: TrailerBeat,
    scene: Scene,
    hist_method: int,
) -> float:
    """
    Blend histogram and pHash similarity into one score.

    Returns:
        _HIST_WEIGHT * histogram score + _PHASH_WEIGHT * pHash score.
    """
    weighted_hist = _HIST_WEIGHT * _hist_combined_score(beat, scene, hist_method)
    weighted_phash = _PHASH_WEIGHT * _phash_score(beat, scene)
    return weighted_hist + weighted_phash
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def run_vibe_check(
    beat: TrailerBeat,
    scenes: Sequence[Scene],
    top_k: int,
    hist_method: int,
    phash_max_distance: int,
) -> list[VibeHit]:
    """
    Phase 1: Score all source scenes against one trailer beat and return
    the top-K candidates for Deep Scan.

    The pHash Hamming distance is computed once per scene and reused for the
    hard filter, the similarity score and the VibeHit record.

    Args:
        beat:               The trailer beat to match (must have fingerprints).
        scenes:             All detected scenes from the source movie.
        top_k:              Maximum number of candidates to return.
        hist_method:        cv2.HISTCMP_* constant (e.g. 0 = CORREL).
        phash_max_distance: Scenes with pHash Hamming distance > this value
                            are excluded before ranking (hard filter).

    Returns:
        List of VibeHit, sorted by combined_score descending, length ≤ top_k.
        Empty list if beat has no fingerprints or no scenes pass the filter.
    """
    if beat.luma_hist is None and beat.phash is None:
        logger.warning(
            "Beat %d has no fingerprints — skipping Vibe Check.", beat.beat_id
        )
        return []

    candidates: list[VibeHit] = []

    for scene in scenes:
        # None means "not comparable": at least one side has no pHash.
        dist = (
            phash_distance(beat.phash, scene.phash)
            if beat.phash and scene.phash
            else None
        )

        # Hard pHash filter: skip scenes that are too visually distant.
        # This fast rejection avoids the full histogram compare.
        if dist is not None and dist > phash_max_distance:
            continue

        hist = _hist_combined_score(beat, scene, hist_method)
        # Same mapping as _phash_score(): 0 bits → 1.0, all bits → 0.0.
        phash = 0.0 if dist is None else 1.0 - (dist / _PHASH_MAX_BITS)
        combined = _HIST_WEIGHT * hist + _PHASH_WEIGHT * phash

        candidates.append(VibeHit(
            beat_id=beat.beat_id,
            scene_id=scene.scene_id,
            hist_score=round(hist, 4),
            # A missing hash is reported as the maximum distance.
            phash_distance=_PHASH_MAX_BITS if dist is None else dist,
            combined_score=round(combined, 4),
        ))

    # Sort by combined score, descending; return top-K.
    candidates.sort(key=lambda h: h.combined_score, reverse=True)
    top = candidates[:top_k]

    logger.info(
        "Vibe Check beat=%d: %d scenes scored, %d candidates forwarded to Deep Scan. "
        "Best score: %.3f (scene %s)",
        beat.beat_id,
        len(candidates),
        len(top),
        top[0].combined_score if top else 0.0,
        top[0].scene_id if top else "",
    )
    return top
def batch_vibe_check(
    beats: Sequence[TrailerBeat],
    scenes: Sequence[Scene],
    top_k: int,
    hist_method: int,
    phash_max_distance: int,
) -> dict[int, list[VibeHit]]:
    """
    Run the Vibe Check for each beat in turn.

    Convenience wrapper for the pipeline layer; all arguments other than
    *beats* are forwarded unchanged to run_vibe_check().

    Returns:
        Mapping beat_id → [VibeHit]. If two beats share a beat_id, the later
        one's result replaces the earlier one's.
    """
    hits_by_beat: dict[int, list[VibeHit]] = {}
    for beat in beats:
        hits_by_beat[beat.beat_id] = run_vibe_check(
            beat, scenes, top_k, hist_method, phash_max_distance
        )
    return hits_by_beat
+1
View File
@@ -0,0 +1 @@
# src.export package — FCPXML / EDL export
+114
View File
@@ -0,0 +1,114 @@
"""
src/export/edl_writer.py — EditTimeline → CMX 3600 EDL
Generates a standard CMX 3600 Edit Decision List compatible with
Avid, DaVinci Resolve, Premiere Pro, and most NLEs.
CMX 3600 format reference:
https://en.wikipedia.org/wiki/Edit_decision_list#CMX_3600
"""
from __future__ import annotations
import logging
from pathlib import Path
from src.core.config import AppConfig
from src.core.models import EditClip, EditTimeline
from src.export.timecode import seconds_to_smpte
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# EDL line builders
# ---------------------------------------------------------------------------
def _edl_header(title: str) -> str:
    """Return the two EDL header lines (TITLE and FCM), each newline-terminated."""
    header_lines = (f"TITLE: {title}", "FCM: NON-DROP FRAME")
    return "".join(line + "\n" for line in header_lines)
def _edl_event(
    event_num: int,
    clip: EditClip,
    fps: float,
) -> str:
    """
    Render a single clip as one CMX 3600 event block.

    The block has three lines followed by an empty string, so the joined
    result ends with a newline:
        NNN AX V C <SRC_IN> <SRC_OUT> <REC_IN> <REC_OUT>
        * FROM CLIP NAME: ...
        * BEAT ... | <beat type> | score=...

    Both the source and the record range span
    clip.source_timeline_duration_s seconds.
    """
    match = clip.match
    beat = clip.beat
    matched_len_s = clip.source_timeline_duration_s

    # Source in/out first, then record in/out — the column order of the event line.
    boundaries_s = (
        match.in_point_s,
        match.in_point_s + matched_len_s,
        clip.timeline_start_s,
        clip.timeline_start_s + matched_len_s,
    )
    src_in, src_out, rec_in, rec_out = (
        seconds_to_smpte(t, fps) for t in boundaries_s
    )

    block_lines = [
        f"{event_num:03d} AX V C {src_in} {src_out} {rec_in} {rec_out}",
        f"* FROM CLIP NAME: {match.source_path.name}",
        (
            f"* BEAT {beat.beat_id:03d} | {beat.beat_type.name} | "
            f"score={match.match_score:.3f}"
        ),
        "",
    ]
    return "\n".join(block_lines)
def _edl_black_tail_event(event_num: int, clip: EditClip, fps: float) -> str:
    """
    Render a black (BL) event for the part of a clip with no source footage.

    The record range runs from the end of the matched source material
    (timeline_start_s + source_timeline_duration_s) to clip.timeline_end_s.
    The source columns are fixed at 00:00:00:00.
    """
    tail_start_s = clip.timeline_start_s + clip.source_timeline_duration_s
    rec_in = seconds_to_smpte(tail_start_s, fps)
    rec_out = seconds_to_smpte(clip.timeline_end_s, fps)

    block_lines = [
        f"{event_num:03d} BL V C 00:00:00:00 00:00:00:00 {rec_in} {rec_out}",
        "* FROM CLIP NAME: BLACK",
        (
            f"* BEAT {clip.beat.beat_id:03d} TRAILER-ONLY TAIL | "
            "add fade/dissolve to black"
        ),
        "",
    ]
    return "\n".join(block_lines)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def write_edl(
    timeline: EditTimeline,
    cfg: AppConfig,
    output_path: Path | None = None,
) -> Path:
    """
    Write the EditTimeline as a CMX 3600 EDL file.

    Clips are emitted in clip_index order. A clip with trailer_tail_s > 0 is
    followed by an extra black (BL) event, so the number of events written
    can exceed the number of clips.

    Args:
        timeline: EditTimeline from build_timeline().
        cfg: Application configuration.
        output_path: Override destination. Defaults to
            <output_dir>/<timeline.title>.edl.

    Returns:
        Path to the written .edl file.
    """
    if output_path is None:
        output_path = cfg.paths.output_dir / f"{timeline.title}.edl"
    output_path.parent.mkdir(parents=True, exist_ok=True)

    fps = timeline.frame_rate
    lines = [_edl_header(timeline.title), "\n"]
    event_num = 1
    for clip in sorted(timeline.clips, key=lambda c: c.clip_index):
        lines.append(_edl_event(event_num, clip, fps))
        event_num += 1
        if clip.trailer_tail_s > 0:
            lines.append("\n")
            lines.append(_edl_black_tail_event(event_num, clip, fps))
            event_num += 1
        lines.append("\n")

    edl_text = "\n".join(lines)
    output_path.write_text(edl_text, encoding="utf-8")

    # event_num is the NEXT free number, so the count actually written is one
    # less. Logging timeline.clip_count here would under-report whenever a
    # black-tail event was added.
    logger.info("EDL written → %s (%d events)", output_path, event_num - 1)
    return output_path
+222
View File
@@ -0,0 +1,222 @@
"""
src/export/fcpxml_writer.py — EditTimeline → Final Cut Pro XML (FCPXML 1.10)
Generates a standards-compliant FCPXML file that can be imported directly
into Final Cut Pro X, DaVinci Resolve, or Premiere Pro (via FCPXML plugin).
Spec reference: https://developer.apple.com/documentation/professional_video_applications/fcpxml_reference
"""
from __future__ import annotations
import logging
from pathlib import Path
from urllib.parse import quote
from xml.etree import ElementTree as ET
from xml.etree.ElementTree import Element, SubElement
from src.core.config import AppConfig
from src.core.models import EditClip, EditTimeline
from src.export.timecode import (
fcpxml_format_name,
fcpxml_frame_duration,
seconds_to_fcpxml,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Asset registry — one <asset> per unique source file
# ---------------------------------------------------------------------------
class _AssetRegistry:
    """Assign one stable resource id ("r2", "r3", ...) per distinct source path."""

    def __init__(self) -> None:
        # Numbering starts at 2 because "r1" is reserved for the format resource.
        self._counter = 2
        self._assets: dict[Path, str] = {}  # path → asset id

    def get_or_create(self, path: Path) -> str:
        """Return the id registered for *path*, allocating the next one if new."""
        try:
            return self._assets[path]
        except KeyError:
            new_id = f"r{self._counter}"
            self._counter += 1
            self._assets[path] = new_id
            return new_id

    @property
    def items(self) -> dict[Path, str]:
        """A shallow copy of the path → id mapping, in registration order."""
        return self._assets.copy()
# ---------------------------------------------------------------------------
# Builder
# ---------------------------------------------------------------------------
def _path_to_url(path: Path) -> str:
    """
    Build a file:// URL from *path* for use in FCPXML.

    The POSIX form of the path is percent-quoted, keeping "/", ":" and "@"
    literal. A path whose POSIX form does not start with "/" (for example a
    Windows drive path such as C:/foo) gets a leading slash first.
    """
    posix_form = path.as_posix()
    rooted = posix_form if posix_form.startswith("/") else f"/{posix_form}"
    quoted = quote(rooted, safe="/:@")
    return f"file://{quoted}"
def build_fcpxml(
    timeline: EditTimeline,
    cfg: AppConfig,
    source_duration_s: float = 7200.0,  # 2-hour fallback if not probed
) -> ET.ElementTree:
    """
    Build a complete FCPXML ElementTree from an EditTimeline.

    The document has one <format> resource ("r1"), one <asset> per distinct
    source path, and a single library/event/project/sequence whose spine
    holds one <clip> per EditClip in clip_index order. A clip with
    trailer_tail_s > 0 is followed by a <gap> carrying a marker.

    Args:
        timeline: Ordered sequence of EditClips.
        cfg: Application configuration.
        source_duration_s: Fallback duration for an <asset> whose real
            duration cannot be probed. Each source is probed with
            get_video_info(); this value is used only when that fails.

    Returns:
        xml.etree.ElementTree.ElementTree — call .write() to serialise.
    """
    fps = timeline.frame_rate

    # ---- root ---------------------------------------------------------------
    root = Element("fcpxml", version=cfg.export.fcpxml_version)
    root.set("xmlns", "http://www.apple.com/dt/FCPXML/1_10")

    # ---- resources ----------------------------------------------------------
    resources = SubElement(root, "resources")
    format_id = "r1"
    format_name = fcpxml_format_name(fps)
    SubElement(resources, "format",
        id=format_id,
        name=format_name,
        frameDuration=fcpxml_frame_duration(fps),
        width="1920",
        height="1080",
        colorSpace="1-1-1 (Rec. 709)",
    )

    registry = _AssetRegistry()
    # Register every source path up front so all <asset> elements are emitted
    # inside <resources>, before the <library> block.
    for clip in timeline.clips:
        registry.get_or_create(clip.match.source_path)
    assets = registry.items

    # Probe the real duration of each source; fall back to the caller-supplied
    # default when probing is unavailable or fails. The import stays inside
    # the try so a missing CV dependency also takes the fallback path.
    durations: dict[Path, float] = {}
    for path in assets:
        try:
            from src.cv.frame_extractor import get_video_info
            info = get_video_info(path)
            durations[path] = float(info["duration_s"])
        except Exception as exc:
            logger.warning(
                "Could not probe duration of %s (%s); using %.1fs fallback.",
                path, exc, source_duration_s,
            )
            durations[path] = source_duration_s

    for path, rid in assets.items():
        SubElement(resources, "asset",
            id=rid,
            name=path.stem,
            src=_path_to_url(path),
            start="0s",
            duration=seconds_to_fcpxml(durations[path], fps),
            hasVideo="1",
            hasAudio="1",
            format=format_id,
        )

    # ---- library / event / project ------------------------------------------
    library = SubElement(root, "library")
    event = SubElement(library, "event", name=timeline.title)
    project = SubElement(event, "project", name=timeline.title)
    sequence = SubElement(project, "sequence",
        duration=seconds_to_fcpxml(timeline.total_duration_s, fps),
        format=format_id,
        tcStart="0s",
        tcFormat="NDF",
        audioLayout="stereo",
        audioRate="48k",
    )
    spine = SubElement(sequence, "spine")

    # ---- clips --------------------------------------------------------------
    for clip in sorted(timeline.clips, key=lambda c: c.clip_index):
        asset_id = registry.get_or_create(clip.match.source_path)
        # Separate name: this is the matched portion of the clip, not the
        # asset-level fallback passed in as source_duration_s.
        matched_duration_s = clip.source_timeline_duration_s
        clip_elem = SubElement(spine, "clip",
            name=f"Beat_{clip.beat.beat_id:03d}_{clip.beat.beat_type.name}",
            ref=asset_id,
            # offset = position on the timeline
            offset=seconds_to_fcpxml(clip.timeline_start_s, fps),
            # duration = matched source part only; trailer-only tails become gaps.
            duration=seconds_to_fcpxml(matched_duration_s, fps),
            # start = in-point inside the source asset
            start=seconds_to_fcpxml(clip.match.in_point_s, fps),
        )
        # Inline audio role
        SubElement(clip_elem, "audio",
            role="dialogue",
            srcCh="1, 2",
            outCh="L, R",
        )
        if clip.trailer_tail_s > 0:
            gap = SubElement(spine, "gap",
                name=f"Beat_{clip.beat.beat_id:03d}_TRAILER_TAIL_BLACK_FADE",
                offset=seconds_to_fcpxml(clip.timeline_start_s + matched_duration_s, fps),
                duration=seconds_to_fcpxml(clip.trailer_tail_s, fps),
                start="0s",
            )
            SubElement(gap, "marker",
                start="0s",
                value="Trailer-only tail: add fade/dissolve to black here",
                completed="0",
            )
    return ET.ElementTree(root)
# ---------------------------------------------------------------------------
# Writer
# ---------------------------------------------------------------------------
def write_fcpxml(
    timeline: EditTimeline,
    cfg: AppConfig,
    output_path: Path | None = None,
) -> Path:
    """
    Build the FCPXML document for *timeline* and save it to disk.

    Args:
        timeline: EditTimeline from build_timeline().
        cfg: Application configuration.
        output_path: Override destination. When omitted the file goes to
            <output_dir>/<timeline.title>.fcpxml.

    Returns:
        Path to the written .fcpxml file.
    """
    destination = output_path
    if destination is None:
        destination = cfg.paths.output_dir / f"{timeline.title}.fcpxml"
    destination.parent.mkdir(parents=True, exist_ok=True)

    document_root = build_fcpxml(timeline, cfg).getroot()
    # encoding="unicode" yields a str, not bytes. The XML declaration and the
    # DOCTYPE are prepended by hand because ElementTree cannot emit a DOCTYPE.
    xml_text = ET.tostring(document_root, encoding="unicode", xml_declaration=False)
    prolog = "".join((
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<!DOCTYPE fcpxml>\n',
    ))
    destination.write_text(prolog + xml_text, encoding="utf-8")

    logger.info("FCPXML written → %s (%d clips)", destination, timeline.clip_count)
    return destination
+146
View File
@@ -0,0 +1,146 @@
"""
src/export/timecode.py — Timecode / rational-time conversion helpers
FCPXML uses rational fractions ("1001/24000s") for all time values.
EDL uses SMPTE timecode strings ("HH:MM:SS:FF").
All conversion functions are pure — no I/O, no state.
"""
from __future__ import annotations
import math
from fractions import Fraction
# ---------------------------------------------------------------------------
# Common frame-rate denominators
# ---------------------------------------------------------------------------
# Decimal frame-rate label → exact rate as (numerator, denominator).
_FPS_RATIONAL: dict[float, tuple[int, int]] = {
    23.976: (24000, 1001),
    24.0: (24, 1),
    25.0: (25, 1),
    29.97: (30000, 1001),
    30.0: (30, 1),
    50.0: (50, 1),
    59.94: (60000, 1001),
    60.0: (60, 1),
}
_TOLERANCE = 0.01  # fps match tolerance


def _fps_to_rational(fps: float) -> tuple[int, int]:
    """
    Express *fps* as an integer pair (numerator, denominator).

    The first table entry within _TOLERANCE of *fps* wins. Any other rate is
    approximated by the closest fraction whose denominator is at most 1001.
    """
    known = next(
        (
            ratio
            for label, ratio in _FPS_RATIONAL.items()
            if abs(fps - label) < _TOLERANCE
        ),
        None,
    )
    if known is not None:
        return known
    # Not a tabulated rate: best rational approximation (denominator ≤ 1001).
    approx = Fraction(fps).limit_denominator(1001)
    return approx.numerator, approx.denominator
# ---------------------------------------------------------------------------
# Seconds → FCPXML rational string
# ---------------------------------------------------------------------------
def seconds_to_fcpxml(seconds: float, fps: float) -> str:
    """
    Convert *seconds* to an FCPXML rational time string.

    The value is snapped to the nearest whole frame at *fps* and written as
    a reduced fraction of seconds, so every emitted time sits exactly on a
    frame boundary.

    Example: 10.0s @23.976fps → 240 frames → 240240/24000 → "1001/100s"

    Args:
        seconds: Time in seconds (float).
        fps: Project frame rate.

    Returns:
        FCPXML time string such as "1001/100s", or "0s" when the time is
        zero or rounds to zero frames.
    """
    if seconds == 0.0:
        return "0s"
    num, den = _fps_to_rational(fps)  # frames per second = num/den
    # seconds × (num/den) = frames (float); round to nearest frame
    frames = round(seconds * num / den)
    if frames == 0:
        # A sub-frame duration would otherwise be written as "0/1s"; use the
        # same spelling as the exact-zero case above.
        return "0s"
    # frames ÷ (num/den) = frames × den/num → rational seconds
    total_num = frames * den
    total_den = num
    # Reduce fraction
    g = math.gcd(total_num, total_den)
    return f"{total_num // g}/{total_den // g}s"
def seconds_to_frame_count(seconds: float, fps: float) -> int:
    """Return the whole number of frames nearest to *seconds* at *fps*."""
    exact_frames = seconds * fps
    return round(exact_frames)
# ---------------------------------------------------------------------------
# Seconds → SMPTE timecode (for EDL)
# ---------------------------------------------------------------------------
def seconds_to_smpte(seconds: float, fps: float, drop_frame: bool = False) -> str:
    """
    Format *seconds* as a non-drop-frame SMPTE timecode "HH:MM:SS:FF".

    The frame total comes from seconds_to_frame_count() at the real rate and
    is then split into fields using the rounded rate (23.976 → 24).
    Drop-frame timecode (;) is not implemented.

    Args:
        seconds: Time in float seconds.
        fps: Frame rate (23.976, 24, 25, etc.).
        drop_frame: Ignored; placeholder for future DF support.

    Returns:
        "HH:MM:SS:FF" string.
    """
    frames_per_tc_second = round(fps)  # e.g. 23.976 → 24
    whole_seconds, ff = divmod(
        seconds_to_frame_count(seconds, fps), frames_per_tc_second
    )
    whole_minutes, ss = divmod(whole_seconds, 60)
    hh, mm = divmod(whole_minutes, 60)
    return f"{hh:02d}:{mm:02d}:{ss:02d}:{ff:02d}"
# ---------------------------------------------------------------------------
# FCPXML format ID helpers
# ---------------------------------------------------------------------------
def fcpxml_format_name(fps: float, width: int = 1920, height: int = 1080) -> str:
    """
    Return an FCPXML format name string for a given frame rate and resolution.

    Known rates are matched with a 0.01 fps tolerance, so a probed value such
    as 23.976023976 (24000/1001) gets the same tag as the nominal 23.976. Any
    other rate falls back to int(fps * 100) as the tag.

    Example: fps=23.976, 1080p → "FFVideoFormat1080p2398"

    Args:
        fps: Project frame rate.
        width: Accepted for signature compatibility; not part of the name.
        height: Frame height, used for the "<height>p" part of the name.

    Returns:
        Format name of the form "FFVideoFormat<height>p<fps tag>".
    """
    known_tags = (
        (23.976, "2398"),
        (24.0, "24"),
        (25.0, "25"),
        (29.97, "2997"),
        (30.0, "30"),
        (50.0, "50"),
        (59.94, "5994"),
        (60.0, "60"),
    )
    # An exact dict lookup on floats misses rates that differ from the nominal
    # label only in the far decimals; match by proximity instead. The known
    # rates are more than 0.02 apart, so at most one entry can match.
    tolerance = 0.01
    fps_tag = next(
        (tag for ref_fps, tag in known_tags if abs(fps - ref_fps) < tolerance),
        None,
    )
    if fps_tag is None:
        fps_tag = str(int(fps * 100))
    return f"FFVideoFormat{height}p{fps_tag}"
def fcpxml_frame_duration(fps: float) -> str:
    """
    Return the FCPXML frameDuration attribute value for *fps*.

    One frame lasts 1/fps seconds. With fps expressed as num/den this is
    den/num seconds, written here as a reduced fraction.

    Example: 23.976fps → num=24000, den=1001 → "1001/24000s"
    """
    rate_num, rate_den = _fps_to_rational(fps)  # fps = rate_num/rate_den
    divisor = math.gcd(rate_den, rate_num)
    seconds_num = rate_den // divisor
    seconds_den = rate_num // divisor
    return f"{seconds_num}/{seconds_den}s"
+1
View File
@@ -0,0 +1 @@
# src.llm package — Thematic segmentation / dramaturgy (NO vision matching)
+202
View File
@@ -0,0 +1,202 @@
"""
src/llm/dramaturg.py — LLM-based thematic beat classification (OpenRouter)
Responsibility:
- Receive a list of TrailerBeat objects (with dialogue lines attached)
- Send a single structured prompt to the LLM
- Parse the JSON response to assign BeatType to each beat
IMPORTANT: This module does ZERO visual analysis.
It classifies narrative dramaturgy from dialogue text only.
Visual matching is handled exclusively by the CV engine.
"""
from __future__ import annotations
import json
import logging
from dataclasses import replace
from typing import Sequence
from src.core.config import AppConfig
from src.core.models import BeatType, TrailerBeat
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Prompt builder
# ---------------------------------------------------------------------------
# System message sent with every classification request (see _call_llm).
# It lists the role names the model may answer with; _parse_response maps any
# answer that is not a BeatType member name to BeatType.UNKNOWN.
_SYSTEM_PROMPT = """You are a film trailer editor and narrative analyst.
Your task is to classify each beat of a trailer into one of these dramatic roles:
HOOK - Opening attention grabber (first impression, shocking image, logo)
SETUP - World/character introduction
CONFLICT - Inciting incident, rising tension, threat revealed
CLIMAX - Peak action/emotion, highest stakes
RESOLUTION - Cool-down, tagline, final title card
You will receive a JSON array of beats with their index and dialogue text.
Respond ONLY with a valid JSON array, one object per beat, with keys:
"beat_id" (int) and "beat_type" (one of the strings above).
Do NOT include any explanation or markdown fences."""
# User message template; classify_beats fills {n} with the beat count and
# {beats_json} with the output of _build_beats_payload.
_USER_TEMPLATE = """Classify the following {n} trailer beats:
{beats_json}"""
def _build_beats_payload(beats: Sequence[TrailerBeat]) -> str:
    """
    Serialise *beats* to the JSON array embedded in the user prompt.

    Each entry carries the beat id, the duration rounded to two decimals and
    the beat's dialogue lines joined with " / ". A beat with no dialogue
    text gets the placeholder "(no dialogue)".
    """

    def _entry(beat: TrailerBeat) -> dict:
        spoken = " / ".join(line.text for line in beat.dialogue)
        return {
            "beat_id": beat.beat_id,
            "duration": round(beat.duration_s, 2),
            "dialogue": spoken or "(no dialogue)",
        }

    entries = [_entry(beat) for beat in beats]
    return json.dumps(entries, ensure_ascii=False, indent=2)
# ---------------------------------------------------------------------------
# OpenRouter / OpenAI-compatible HTTP client
# ---------------------------------------------------------------------------
def _call_llm(prompt_user: str, cfg: AppConfig) -> str:
    """
    Send a chat completion request to the configured LLM provider.

    Supports: openrouter, openai, ollama (all use the OpenAI-compatible API).
    The request is POSTed to <base_url>/chat/completions with _SYSTEM_PROMPT
    as the system message and *prompt_user* as the user message.

    Returns:
        The content of the first choice's message in the response.

    Raises:
        RuntimeError: If an API key is required but missing, on an HTTP error
            status, on a connection failure or timeout, or when the response
            is not valid JSON or lacks choices[0].message.content.
    """
    import urllib.request
    import urllib.error

    llm = cfg.llm
    if llm.provider in ("openrouter", "openai") and not llm.api_key:
        raise RuntimeError(
            f"LLM provider is '{llm.provider}' but no API key found. "
            "Set OPENROUTER_API_KEY (or OPENAI_API_KEY) in your .env file."
        )

    headers = {"Content-Type": "application/json"}
    # Only send credentials when there are any; a keyless provider would
    # otherwise receive the literal header value "Bearer None".
    if llm.api_key:
        headers["Authorization"] = f"Bearer {llm.api_key}"
    if llm.provider == "openrouter":
        headers["HTTP-Referer"] = "https://github.com/ai-trailer-2026"
        headers["X-Title"] = "AI Trailer Generator v2"

    body = json.dumps({
        "model": llm.model,
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": prompt_user},
        ],
        "temperature": llm.temperature,
        "max_tokens": llm.max_tokens,
    }).encode("utf-8")
    url = f"{llm.base_url.rstrip('/')}/chat/completions"
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")

    try:
        with urllib.request.urlopen(req, timeout=llm.timeout_seconds) as resp:
            raw_bytes = resp.read()
    except urllib.error.HTTPError as exc:
        body_text = exc.read().decode(errors="replace")
        raise RuntimeError(
            f"LLM HTTP {exc.code} from {url}:\n{body_text}"
        ) from exc
    except OSError as exc:
        # URLError and socket timeouts are OSError subclasses: connection
        # refused, DNS failure, timeout, ...
        raise RuntimeError(f"LLM request to {url} failed: {exc}") from exc

    try:
        data = json.loads(raw_bytes.decode("utf-8"))
        return data["choices"][0]["message"]["content"]
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        preview = raw_bytes[:300].decode(errors="replace")
        raise RuntimeError(
            f"Unexpected LLM response from {url}: {preview}"
        ) from exc
# ---------------------------------------------------------------------------
# Response parser
# ---------------------------------------------------------------------------
_BEAT_TYPE_MAP: dict[str, BeatType] = {bt.name: bt for bt in BeatType}


def _parse_response(raw: str, beats: Sequence[TrailerBeat]) -> dict[int, BeatType]:
    """
    Parse the LLM JSON array response into a beat_id → BeatType mapping.

    Every beat in *beats* starts as BeatType.UNKNOWN and is overwritten only
    by a well-formed entry of the response. Malformed entries are skipped one
    by one, so a single bad item does not discard the rest. If the response
    is not a JSON array at all, every beat stays UNKNOWN.

    Args:
        raw: Raw text returned by the LLM (optionally wrapped in ``` fences).
        beats: The beats that were sent for classification.

    Returns:
        Mapping with an entry for every beat in *beats*, plus any additional
        beat ids the response mentioned. This function never raises.
    """
    # Strip accidental markdown fences
    clean = raw.strip()
    if clean.startswith("```"):
        clean = "\n".join(clean.split("\n")[1:])
    if clean.endswith("```"):
        clean = clean[: clean.rfind("```")]
    clean = clean.strip()

    result: dict[int, BeatType] = {b.beat_id: BeatType.UNKNOWN for b in beats}
    try:
        parsed = json.loads(clean)
        if not isinstance(parsed, list):
            raise ValueError("Expected JSON array at top level.")
    except ValueError as exc:  # includes json.JSONDecodeError
        logger.warning("LLM response parse error (%s) — all beats → UNKNOWN.\nRaw: %s", exc, raw[:300])
        return result

    for item in parsed:
        try:
            bid = int(item["beat_id"])
            name = str(item.get("beat_type", "UNKNOWN")).strip().upper()
        except (AttributeError, KeyError, TypeError, ValueError) as exc:
            # Non-dict item, missing/null/non-numeric beat_id, ...
            logger.warning("Skipping malformed LLM beat entry %r (%s).", item, exc)
            continue
        result[bid] = _BEAT_TYPE_MAP.get(name, BeatType.UNKNOWN)
    return result
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def classify_beats(
    beats: Sequence[TrailerBeat],
    cfg: AppConfig,
) -> list[TrailerBeat]:
    """
    Ask the LLM for a BeatType per beat and return updated copies.

    Args:
        beats: TrailerBeat list (dialogue should be populated for best results).
        cfg: Application configuration (llm section + api key).

    Returns:
        A new list. With no beats, or when the LLM call raises, the input
        beats are returned unchanged (no exception is propagated). Otherwise
        each beat is replaced by a copy whose beat_type is the parsed value,
        or BeatType.UNKNOWN when the response had none for it.
    """
    if not beats:
        return list(beats)

    total = len(beats)
    logger.info(
        "Classifying %d beats via %s / %s",
        total, cfg.llm.provider, cfg.llm.model,
    )
    prompt = _USER_TEMPLATE.format(
        n=total,
        beats_json=_build_beats_payload(beats),
    )

    try:
        raw_response = _call_llm(prompt, cfg)
    except Exception as exc:
        logger.error("LLM classification failed: %s — keeping BeatType.UNKNOWN.", exc)
        return list(beats)

    type_map = _parse_response(raw_response, beats)
    enriched: list[TrailerBeat] = []
    for beat in beats:
        assigned = type_map.get(beat.beat_id, BeatType.UNKNOWN)
        enriched.append(replace(beat, beat_type=assigned))

    classified = len([b for b in enriched if b.beat_type != BeatType.UNKNOWN])
    logger.info("Beat classification done: %d / %d classified.", classified, len(beats))
    return enriched
+316
View File
@@ -0,0 +1,316 @@
"""
Cached vision descriptions for ambiguous trailer/source matching.
This module is deliberately conservative: it never writes a final match and it
does not replace CV. It describes a small number of 3-frame beat/scene samples,
caches those descriptions, and returns extra source in-point seeds for the CV
scanner to verify.
"""
from __future__ import annotations
import base64
import json
import logging
import re
import urllib.error
import urllib.request
from dataclasses import asdict
from pathlib import Path
from typing import Sequence
import cv2
from src.core.config import AppConfig
from src.core.models import Scene, TrailerBeat
logger = logging.getLogger(__name__)
# Version tag stored in the cache file; _load_cache ignores any file whose
# stored version differs and starts from an empty cache instead.
_CACHE_VERSION = 1
# Words that _terms drops before two descriptions are compared.
_STOPWORDS = {
"the", "and", "with", "from", "that", "this", "there", "their", "into",
"scene", "frame", "image", "shot", "video", "visible", "looks", "appears",
"eine", "einer", "einem", "einen", "und", "oder", "mit", "der", "die", "das",
}
# System message sent with every vision request (see _call_vision_model).
_SYSTEM_PROMPT = """You describe film shots for automatic matching.
Return only compact JSON with these keys:
subject, setting, composition, action_phase, distinctive_objects, lighting_color, negatives.
Focus on stable visual facts and spatial layout. Ignore timecode overlays, subtitles, logos, compression, aspect ratio, and color grading differences."""
def _cache_path(cfg: AppConfig) -> Path:
    """Return the location of the vision description cache inside cache_dir."""
    cache_dir = cfg.paths.cache_dir
    return cache_dir / "vision_descriptions.json"
def _load_cache(cfg: AppConfig) -> dict:
    """
    Load the vision description cache from disk.

    Returns:
        The parsed cache when the file exists, decodes as UTF-8 JSON, is a
        JSON object with the current _CACHE_VERSION and has a dict under
        "items". In every other case a fresh empty cache
        {"version": _CACHE_VERSION, "items": {}} is returned.
    """
    path = _cache_path(cfg)
    if not path.exists():
        return {"version": _CACHE_VERSION, "items": {}}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        logger.warning("Vision cache is unreadable; rebuilding: %s", path)
        return {"version": _CACHE_VERSION, "items": {}}
    # Valid JSON that is not an object (list, string, number, null) has no
    # .get(); treat it like any other unusable cache instead of crashing.
    if (
        not isinstance(data, dict)
        or data.get("version") != _CACHE_VERSION
        or not isinstance(data.get("items"), dict)
    ):
        return {"version": _CACHE_VERSION, "items": {}}
    return data
def _save_cache(cfg: AppConfig, cache: dict) -> None:
    """Write *cache* to the cache file as indented UTF-8 JSON, creating parent dirs."""
    target = _cache_path(cfg)
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(cache, indent=2, ensure_ascii=False)
    target.write_text(serialized, encoding="utf-8")
def _sample_times(start_s: float, end_s: float) -> list[float]:
    """
    Pick three timestamps (near start, middle, near end) inside a time span.

    The span length is clamped to at least 0.04 s. The first sample sits 12%
    into the span, but no later than 0.04 s before its end. The last sample
    sits 12% of the span (capped at 0.20 s) before its end.
    """
    duration_s = max(0.04, end_s - start_s)
    edge_s = duration_s * 0.12

    head_offset = min(edge_s, max(0.0, duration_s - 0.04))
    mid_offset = duration_s * 0.50
    tail_offset = max(0.0, duration_s - min(edge_s, 0.20))

    return [start_s + offset for offset in (head_offset, mid_offset, tail_offset)]
def _frame_data_url(video_path: Path, t_s: float) -> str | None:
    """
    Grab the frame at *t_s* seconds and return it as a JPEG data URL.

    Frames wider than 640 px are scaled down to 640 px width, keeping the
    aspect ratio. The JPEG is encoded at quality 72.

    Returns:
        "data:image/jpeg;base64,..." string, or None when the video cannot
        be opened, the frame cannot be read, or JPEG encoding fails.
    """
    capture = cv2.VideoCapture(str(video_path))
    try:
        if not capture.isOpened():
            return None

        # Negative times are clamped to the start of the file.
        position_ms = max(0.0, t_s) * 1000.0
        capture.set(cv2.CAP_PROP_POS_MSEC, position_ms)
        grabbed, frame = capture.read()
        if not grabbed or frame is None:
            return None

        height, width = frame.shape[:2]
        if width > 640:
            scaled_height = int(height * (640 / width))
            frame = cv2.resize(frame, (640, scaled_height), interpolation=cv2.INTER_AREA)

        jpeg_params = [int(cv2.IMWRITE_JPEG_QUALITY), 72]
        encoded_ok, jpeg = cv2.imencode(".jpg", frame, jpeg_params)
        if not encoded_ok:
            return None

        b64_text = base64.b64encode(jpeg.tobytes()).decode("ascii")
        return f"data:image/jpeg;base64,{b64_text}"
    finally:
        capture.release()
def _call_vision_model(label: str, image_urls: list[str], cfg: AppConfig) -> str:
    """
    Ask the configured vision model to describe a set of frames.

    The request is POSTed to <base_url>/chat/completions with _SYSTEM_PROMPT
    as the system message. The user message holds one text part naming
    *label*, followed by one low-detail image part per URL.

    Args:
        label: Short human-readable name of the sample, embedded in the prompt.
        image_urls: Image URLs (data URLs in this module) to attach.
        cfg: Application configuration (vision section).

    Returns:
        The first choice's message content, converted to str and stripped.

    Raises:
        RuntimeError: If an API key is required but missing, on an HTTP error
            status, on a connection failure or timeout, or when the response
            is not valid JSON or lacks choices[0].message.content.
    """
    vision = cfg.vision
    if vision.provider in ("openai", "openrouter") and not vision.api_key:
        raise RuntimeError(
            "Vision is enabled but no API key is available. Set VISION_API_KEY, "
            "OPENROUTER_API_KEY, OPENAI_API_KEY, or LLM_API_KEY."
        )

    content: list[dict] = [{
        "type": "text",
        "text": (
            f"Describe this 3-frame sample for matching. Label: {label}. "
            "The frames are start, middle, and end of the same beat/scene."
        ),
    }]
    content.extend({
        "type": "image_url",
        "image_url": {"url": url, "detail": "low"},
    } for url in image_urls)

    headers = {"Content-Type": "application/json"}
    # Only send credentials when there are any; a keyless provider would
    # otherwise receive the literal header value "Bearer None".
    if vision.api_key:
        headers["Authorization"] = f"Bearer {vision.api_key}"
    if vision.provider == "openrouter":
        headers["HTTP-Referer"] = "https://github.com/ai-trailer-2026"
        headers["X-Title"] = "AI Trailer Generator v2"

    body = json.dumps({
        "model": vision.model,
        "messages": [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": content},
        ],
        "temperature": vision.temperature,
        "max_tokens": vision.max_tokens,
    }).encode("utf-8")
    url = f"{vision.base_url.rstrip('/')}/chat/completions"
    req = urllib.request.Request(url, data=body, headers=headers, method="POST")

    try:
        with urllib.request.urlopen(req, timeout=vision.timeout_seconds) as resp:
            raw_bytes = resp.read()
    except urllib.error.HTTPError as exc:
        body_text = exc.read().decode(errors="replace")
        raise RuntimeError(f"Vision HTTP {exc.code} from {url}:\n{body_text}") from exc
    except OSError as exc:
        # URLError and socket timeouts are OSError subclasses: connection
        # refused, DNS failure, timeout, ...
        raise RuntimeError(f"Vision request to {url} failed: {exc}") from exc

    try:
        data = json.loads(raw_bytes.decode("utf-8"))
        return str(data["choices"][0]["message"]["content"]).strip()
    except (ValueError, KeyError, IndexError, TypeError) as exc:
        preview = raw_bytes[:300].decode(errors="replace")
        raise RuntimeError(
            f"Unexpected vision response from {url}: {preview}"
        ) from exc
def _description_key(kind: str, item_id: int, start_s: float, end_s: float, cfg: AppConfig) -> str:
    """
    Build the cache key for one described sample.

    The key joins, with ":", the kind, the item id, both times at millisecond
    precision, the vision provider and model, and the whole-second mtime of
    the underlying video: the reference trailer for kind "beat", the source
    movie for any other kind. A file that cannot be stat'ed contributes 0.
    """
    if kind == "beat":
        media_path = cfg.paths.reference_trailer
    else:
        media_path = cfg.paths.source_movie

    try:
        mtime_stamp = int(media_path.stat().st_mtime)
    except OSError:
        mtime_stamp = 0

    parts = (
        f"{kind}",
        f"{item_id}",
        f"{start_s:.3f}",
        f"{end_s:.3f}",
        f"{cfg.vision.provider}",
        f"{cfg.vision.model}",
        f"{mtime_stamp}",
    )
    return ":".join(parts)
def _describe_sample(
    *,
    kind: str,
    item_id: int,
    label: str,
    video_path: Path,
    start_s: float,
    end_s: float,
    cfg: AppConfig,
    cache: dict,
    budget: list[int],
) -> str | None:
    """
    Return a description for one beat/scene sample, using the cache first.

    A cache hit is returned immediately and costs no budget. On a miss, and
    only while budget[0] is positive, up to three frames are sampled and sent
    to the vision model. The new description is stored in *cache* and
    budget[0] is decremented by one.

    Args:
        kind, item_id, start_s, end_s: Identify the sample (part of the key).
        label: Prompt label passed to the vision model.
        video_path: Video to sample frames from.
        cfg: Application configuration.
        cache: Cache dict as returned by _load_cache(); mutated in place.
        budget: Single-element list holding the remaining number of new
            descriptions allowed; mutated in place.

    Returns:
        The description text, or None when the budget is exhausted or fewer
        than two frames could be decoded.
    """
    cache_key = _description_key(kind, item_id, start_s, end_s, cfg)
    entry = cache["items"].get(cache_key)
    if entry:
        return str(entry.get("description", ""))

    if budget[0] <= 0:
        return None

    frame_urls: list[str] = []
    for t_s in _sample_times(start_s, end_s):
        data_url = _frame_data_url(video_path, t_s)
        if data_url is not None:
            frame_urls.append(data_url)
    if len(frame_urls) < 2:
        return None

    description = _call_vision_model(label, frame_urls, cfg)
    cache["items"][cache_key] = dict(
        kind=kind,
        item_id=item_id,
        start_s=start_s,
        end_s=end_s,
        label=label,
        description=description,
    )
    budget[0] -= 1
    return description
def _terms(text: str) -> set[str]:
    """Return the distinct lower-cased words of *text* (3+ chars, starting with a letter) minus stopwords."""
    tokens = re.findall(r"[a-zA-Z][a-zA-Z0-9_'-]{2,}", text.lower())
    return set(tokens) - _STOPWORDS
def _text_similarity(a: str, b: str) -> float:
    """
    Score the term overlap between two descriptions.

    The score is the number of shared terms divided by the size of the
    smaller term set, with the divisor floored at 8. It is 0.0 when either
    text yields no terms.
    """
    terms_a, terms_b = _terms(a), _terms(b)
    if not (terms_a and terms_b):
        return 0.0
    shared = len(terms_a & terms_b)
    divisor = max(8, min(len(terms_a), len(terms_b)))
    return float(shared / divisor)
def _scene_seed_points(scene: Scene, max_points: int) -> list[float]:
    """
    Spread *max_points* evenly spaced in-point candidates over a scene.

    Points run from scene.start_s to 0.2 s before scene.end_s, both ends
    included. Only the scene start is returned when max_points is at most 1,
    the scene has no positive duration, or the usable range is empty.
    """
    first = scene.start_s
    if max_points <= 1 or scene.duration_s <= 0:
        return [first]

    last = max(first, scene.end_s - 0.2)
    if last <= first:
        return [first]

    step = (last - first) / max(1, max_points - 1)
    return [first + step * idx for idx in range(max_points)]
def build_vision_seed_in_points(
    beats: Sequence[TrailerBeat],
    scenes: Sequence[Scene],
    cfg: AppConfig,
) -> dict[int, list[tuple[float, float]]]:
    """
    Return extra in-point seeds from cached vision descriptions.

    The function is intentionally small-budget: for each beat it describes the
    beat once and only a few top scene-level candidates (the Vibe Check top-K).
    Existing descriptions are read from cache and cost nothing. Scenes whose
    description is similar enough to the beat's contribute evenly spaced seed
    points, each tagged with a weighted score.

    The cache is written back even when a vision call fails part-way, so
    descriptions already obtained in this run are not lost.

    Args:
        beats: Trailer beats to find seeds for.
        scenes: Source scenes to consider.
        cfg: Application configuration (vision and cv sections).

    Returns:
        Mapping beat_id → sorted list of (in_point_s, score). Beats without
        any seed are omitted. Empty when vision is disabled or either input
        is empty.

    Raises:
        RuntimeError: Propagated from the vision model call.
    """
    if not cfg.vision.enabled:
        return {}
    if not beats or not scenes:
        return {}

    from src.cv.vibe_check import run_vibe_check

    cache = _load_cache(cfg)
    budget = [cfg.vision.max_new_descriptions_per_run]
    scenes_by_id = {scene.scene_id: scene for scene in scenes}
    seeds: dict[int, list[tuple[float, float]]] = {}

    try:
        for beat in beats:
            beat_desc = _describe_sample(
                kind="beat",
                item_id=beat.beat_id,
                label=f"trailer beat {beat.beat_id}",
                video_path=beat.trailer_path,
                start_s=beat.start_s,
                end_s=beat.end_s,
                cfg=cfg,
                cache=cache,
                budget=budget,
            )
            if not beat_desc:
                continue

            hits = run_vibe_check(
                beat,
                scenes,
                top_k=cfg.vision.scene_candidate_top_k,
                hist_method=cfg.cv.vibe_check.hist_compare_method,
                phash_max_distance=64,
            )
            ranked: list[tuple[float, Scene]] = []
            for hit in hits:
                scene = scenes_by_id.get(hit.scene_id)
                if scene is None:
                    continue
                scene_desc = _describe_sample(
                    kind="scene",
                    item_id=scene.scene_id,
                    label=f"source scene {scene.scene_id}",
                    video_path=scene.source_path,
                    start_s=scene.start_s,
                    end_s=scene.end_s,
                    cfg=cfg,
                    cache=cache,
                    budget=budget,
                )
                if not scene_desc:
                    continue
                score = _text_similarity(beat_desc, scene_desc)
                if score >= cfg.vision.similarity_threshold:
                    ranked.append((score, scene))

            # Sort on the score only; Scene objects are never compared.
            ranked.sort(key=lambda item: item[0], reverse=True)
            points: list[tuple[float, float]] = []
            for score, scene in ranked[:cfg.vision.max_seed_scenes]:
                logger.info(
                    "Beat %d: vision seed scene=%d score=%.3f",
                    beat.beat_id,
                    scene.scene_id,
                    score,
                )
                # Scale seed_score by 75%..100% depending on text similarity,
                # capped at 0.98 and never below the coarse candidate threshold.
                weighted_score = max(
                    cfg.cv.deep_scan.coarse_candidate_threshold,
                    min(0.98, cfg.vision.seed_score * (0.75 + min(1.0, score) * 0.25)),
                )
                points.extend(
                    (point, weighted_score)
                    for point in _scene_seed_points(scene, cfg.vision.seed_points_per_scene)
                )

            if points:
                # De-duplicate on the millisecond-rounded time, keeping the best score.
                merged: dict[float, float] = {}
                for point, weighted_score in points:
                    key = round(max(0.0, point), 3)
                    merged[key] = max(weighted_score, merged.get(key, 0.0))
                seeds[beat.beat_id] = sorted(merged.items())
    finally:
        # Persist whatever was described so far, even if a later call raised.
        _save_cache(cfg, cache)
    return seeds
+3
View File
@@ -0,0 +1,3 @@
"""
src/pipeline/__init__.py — Orchestration layer
"""
+291
View File
@@ -0,0 +1,291 @@
"""
src/pipeline/matcher.py — Top-level CV matching orchestrator
This is the single entry point for the full 2-phase CV pipeline:
Phase 0: Load / build scene index (PySceneDetect + fingerprinting)
Phase 1: Vibe Check — histogram + pHash filter → Top-K candidates per beat
Phase 2: Deep Scan — template matching → frame-accurate MatchResult per beat
Usage:
from src.core.config import load_config
from src.pipeline.matcher import run_matching
cfg = load_config()
beats = [...] # list[TrailerBeat] from trailer analysis
results = run_matching(cfg, beats)
"""
from __future__ import annotations
import logging
from typing import Sequence
from src.core.config import AppConfig
from src.core.models import MatchResult, Scene, TrailerBeat
logger = logging.getLogger(__name__)
# A seed in-point: either a bare time in seconds, or a (time_s, score) pair.
# _merge_seed_in_points accepts both forms and keeps whichever it was given.
SeedPoint = float | tuple[float, float]
def _scene_seed_points(scene: Scene, max_points: int) -> list[float]:
    """
    Return up to *max_points* evenly spaced candidate in-points for a scene.

    The candidates cover [scene.start_s, scene.end_s - 0.2]. A single
    candidate (the scene start) is returned for max_points <= 1, for a scene
    without positive duration, and when that range is empty.
    """
    origin = scene.start_s
    too_few = max_points <= 1
    if too_few or scene.duration_s <= 0:
        return [origin]

    usable_end = max(origin, scene.end_s - 0.2)
    if not usable_end > origin:
        return [origin]

    step = (usable_end - origin) / max(1, max_points - 1)
    candidates: list[float] = []
    for idx in range(max_points):
        candidates.append(origin + step * idx)
    return candidates
def _build_scene_seed_in_points(
    beats: Sequence[TrailerBeat],
    scenes: Sequence[Scene],
    cfg: AppConfig,
) -> dict[int, list[float]]:
    """
    Derive scene-level seed in-points for every beat from a Vibe Check.

    For each beat the top scene_seed_top_k scenes are taken from
    run_vibe_check(), called with a pHash distance limit of 64. Every such
    scene contributes scene_seed_points_per_scene evenly spaced points. The
    points are clamped at 0, rounded to milliseconds, de-duplicated and sorted.

    Returns:
        Mapping beat_id → sorted seed times. Beats that produced no point are
        omitted.
    """
    from src.cv.vibe_check import run_vibe_check

    scene_lookup = {scene.scene_id: scene for scene in scenes}
    seeds: dict[int, list[float]] = {}

    for beat in beats:
        hits = run_vibe_check(
            beat,
            scenes,
            top_k=cfg.cv.deep_scan.scene_seed_top_k,
            hist_method=cfg.cv.vibe_check.hist_compare_method,
            phash_max_distance=64,
        )

        unique_points: set[float] = set()
        for hit in hits:
            scene = scene_lookup.get(hit.scene_id)
            if scene is None:
                continue
            raw_points = _scene_seed_points(
                scene, cfg.cv.deep_scan.scene_seed_points_per_scene
            )
            unique_points.update(round(max(0.0, p), 3) for p in raw_points)

        if not unique_points:
            continue
        seeds[beat.beat_id] = sorted(unique_points)
        logger.info(
            "Beat %d: added %d scene-level seed candidates from %d source scenes.",
            beat.beat_id,
            len(seeds[beat.beat_id]),
            len(hits),
        )
    return seeds
def _merge_seed_in_points(
    *seed_maps: dict[int, Sequence[SeedPoint]] | None,
) -> dict[int, list[SeedPoint]]:
    """
    Combine several beat_id → seed-point maps into one.

    Times are clamped at 0 and rounded to milliseconds; seeds that land on
    the same time are merged. A scored seed beats an unscored one, and the
    higher of two scores wins. Empty or None maps are ignored.

    Returns:
        Mapping beat_id → seeds sorted by time. A seed is a (time, score)
        tuple when a score is known, otherwise a bare time.
    """
    best_by_beat: dict[int, dict[float, float | None]] = {}

    for seed_map in filter(None, seed_maps):
        for beat_id, points in seed_map.items():
            slot = best_by_beat.setdefault(beat_id, {})
            for point in points:
                scored = isinstance(point, tuple)
                t_sec = round(max(0.0, float(point[0] if scored else point)), 3)
                score = float(point[1]) if scored else None

                previous = slot.get(t_sec)
                if previous is None:
                    # New time, or only an unscored seed so far.
                    slot[t_sec] = score
                elif score is not None:
                    slot[t_sec] = max(previous, score)

    return {
        beat_id: [
            t_sec if score is None else (t_sec, score)
            for t_sec, score in sorted(slot.items())
        ]
        for beat_id, slot in best_by_beat.items()
    }
# ---------------------------------------------------------------------------
# Beat fingerprinting
# ---------------------------------------------------------------------------
def fingerprint_beats(
    beats: Sequence[TrailerBeat],
    cfg: AppConfig,
) -> list[TrailerBeat]:
    """
    Attach a visual fingerprint (luma/saturation histograms + pHash) to beats.

    For each beat the frame at ``beat.midpoint_s`` is grabbed from
    ``beat.trailer_path`` and fingerprinted with the ``cfg.cv.vibe_check``
    settings. Beats whose frame cannot be decoded are passed through
    unchanged and a warning is logged.

    Args:
        beats: TrailerBeat list to enrich.
        cfg:   Application configuration.

    Returns:
        New list, same length and order as *beats*; fingerprinted entries are
        copies with luma_hist, sat_hist and phash set.
    """
    from dataclasses import replace
    from src.cv.fingerprinting import fingerprint_frame
    from src.cv.frame_extractor import grab_frame_at_path

    vibe_cfg = cfg.cv.vibe_check
    fingerprinted: list[TrailerBeat] = []

    for beat in beats:
        frame = grab_frame_at_path(beat.trailer_path, beat.midpoint_s)
        if frame is not None:
            luma, sat, phash = fingerprint_frame(frame, vibe_cfg)
            fingerprinted.append(replace(beat, luma_hist=luma, sat_hist=sat, phash=phash))
        else:
            logger.warning("Beat %d: cannot decode midpoint frame, leaving unfingerpinted.", beat.beat_id)
            fingerprinted.append(beat)

    with_phash = sum(1 for b in fingerprinted if b.phash)
    logger.info("Fingerprinted %d / %d beats.", with_phash, len(beats))
    return fingerprinted
# ---------------------------------------------------------------------------
# Main pipeline entry point
# ---------------------------------------------------------------------------
def run_matching(
    cfg: AppConfig,
    beats: Sequence[TrailerBeat],
    force_reindex: bool = False,
    seed_in_points: dict[int, Sequence[SeedPoint]] | None = None,
) -> list[MatchResult]:
    """
    Execute the CV matching pipeline: scene index → beat fingerprints →
    seeded global scan.

    Args:
        cfg:            Application configuration (loaded from config.toml).
        beats:          All trailer beats to source (must have trailer_path set).
        force_reindex:  Passed through to ``build_scene_index``; if True the
                        scene cache is ignored and the index is rebuilt.
        seed_in_points: Optional caller-supplied seeds, beat_id → seed points
                        (bare timestamps or ``(timestamp, score)`` tuples).
                        They are merged with the scene-level seeds and, when
                        ``cfg.vision.enabled``, the vision seeds.

    Returns:
        The MatchResult list produced by ``run_global_scan``. The closing log
        line reports its length against the number of input beats, so it may
        be shorter than *beats* when some beats stay unmatched.
    """
    from src.cv.scene_indexer import build_scene_index

    logger.info("=" * 60)
    logger.info("AI Trailer Generator v2 — CV Matching Pipeline")
    logger.info("Source : %s", cfg.paths.source_movie.name)
    logger.info("Trailer: %s", cfg.paths.reference_trailer.name)
    logger.info("Beats  : %d", len(beats))
    logger.info("=" * 60)

    # ------------------------------------------------------------------
    # Phase 0: Scene index
    # ------------------------------------------------------------------
    logger.info("[Phase 0] Building scene index …")
    scenes: list[Scene] = build_scene_index(cfg, force_reindex=force_reindex)
    logger.info("[Phase 0] %d scenes indexed.", len(scenes))

    # ------------------------------------------------------------------
    # Phase 0b: Fingerprint the beats
    # ------------------------------------------------------------------
    logger.info("[Phase 0b] Fingerprinting %d trailer beats …", len(beats))
    beats = fingerprint_beats(beats, cfg)

    # ------------------------------------------------------------------
    # Phase 1 & 2: Global Scan. The scene index / vibe check are not used
    # for the match itself, only to propose seed in-points for the scan.
    # ------------------------------------------------------------------
    logger.info("[Phase 1 & 2] Running FFmpeg Global Scan for %d beats ...", len(beats))
    from src.cv.global_scan import run_global_scan

    scene_seed_in_points = _build_scene_seed_in_points(beats, scenes, cfg)

    vision_seed_in_points: dict[int, Sequence[SeedPoint]] = {}
    if cfg.vision.enabled:
        try:
            from src.llm.vision_cache import build_vision_seed_in_points

            vision_seed_in_points = build_vision_seed_in_points(beats, scenes, cfg)
        except Exception as exc:
            # Vision seeding is optional: degrade to CV-only seeds instead of
            # aborting the whole matching run.
            logger.error("Vision seeding failed: %s — continuing with CV-only seeds.", exc)

    results = run_global_scan(
        beats,
        cfg,
        scenes=scenes,
        seed_in_points=_merge_seed_in_points(seed_in_points, scene_seed_in_points, vision_seed_in_points),
    )

    logger.info("[Phase 1 & 2] Done. %d / %d beats matched.", len(results), len(beats))
    logger.info("=" * 60)
    return results
# ---------------------------------------------------------------------------
# Convenience: build an EditTimeline from match results
# ---------------------------------------------------------------------------
def build_timeline(
    beats: Sequence[TrailerBeat],
    results: Sequence[MatchResult],
    cfg: AppConfig,
) -> "src.core.models.EditTimeline":  # type: ignore[name-defined]
    """
    Lay the matched beats out on a timeline that keeps the trailer's timing.

    Every beat advances the timeline by its full ``duration_s``. A matched
    beat becomes an EditClip filling that slot; an unmatched beat produces no
    clip and leaves a gap of the same length. When the match is shorter than
    the beat, the shortfall is recorded as ``trailer_tail_s`` on the clip.

    Args:
        beats:   All trailer beats (defines order + durations).
        results: MatchResult list from run_matching().
        cfg:     Application configuration (title + frame rate are read
                 from it).

    Returns:
        EditTimeline holding one clip per matched beat.
    """
    from src.core.models import EditClip, EditTimeline

    match_for_beat: dict[int, MatchResult] = {result.beat_id: result for result in results}
    clips: list[EditClip] = []
    playhead = 0.0

    for beat in beats:
        slot_start = playhead
        playhead += beat.duration_s  # the slot is consumed whether or not we matched

        match = match_for_beat.get(beat.beat_id)
        if match is None:
            logger.warning("Beat %d has no match — gap in timeline.", beat.beat_id)
            continue

        match_duration = max(0.0, match.duration_s)
        if match_duration > 0:
            source_duration = min(beat.duration_s, match_duration)
        else:
            # A non-positive match duration falls back to the full beat length.
            source_duration = beat.duration_s

        trailer_tail_s = max(0.0, beat.duration_s - source_duration)
        if trailer_tail_s > 0:
            logger.warning(
                "Beat %d uses %.2fs source + %.2fs generated trailer tail.",
                beat.beat_id,
                source_duration,
                trailer_tail_s,
            )

        clips.append(
            EditClip(
                clip_index=len(clips),
                beat=beat,
                match=match,
                timeline_start_s=slot_start,
                timeline_end_s=playhead,
                source_duration_s=source_duration,
                trailer_tail_s=trailer_tail_s,
            )
        )

    timeline = EditTimeline(
        title=cfg.paths.reference_trailer.stem,
        frame_rate=cfg.export.edl_frame_rate,
        clips=tuple(clips),
    )
    logger.info(
        "Timeline built: %d clips, total duration %.2fs",
        timeline.clip_count, timeline.total_duration_s,
    )
    return timeline
+427
View File
@@ -0,0 +1,427 @@
"""
src/pipeline/reporter.py — Visual Match Report Generator
Generates an HTML file containing side-by-side video clips of:
Left: The original beat from the reference trailer
Right: The matched scene from the source movie
This allows instant visual verification of the CV pipeline's results.
"""
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
from src.core.config import AppConfig
logger = logging.getLogger(__name__)
def _extract_clip(video_path: Path, start_s: float, duration_s: float, out_path: Path) -> None:
    """
    Cut a silent, 640px-wide H.264 preview of *video_path* with ffmpeg.

    Args:
        video_path: Video to cut from.
        start_s:    Clip start in seconds.
        duration_s: Clip length in seconds.
        out_path:   Destination file; its parent directory is created.

    A non-zero ffmpeg exit status is logged as an error, not raised.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Two-stage seek: a fast input seek lands up to 2 s before the target,
    # then an accurate output seek covers the remainder. Seeking only before
    # "-i" can stop on a nearby keyframe and leave the preview several frames
    # out of sync.
    lead_in_s = 2.0 if start_s >= 2.0 else 0.0
    coarse_seek_s = max(0.0, start_s - lead_in_s)
    fine_seek_s = start_s - coarse_seek_s

    seek_args = [
        "-ss", str(coarse_seek_s),
        "-i", str(video_path),
        "-ss", str(fine_seek_s),
        "-t", str(duration_s),
    ]
    encode_args = [
        "-map", "0:v:0",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "28",
        "-vf", "scale=640:-2",  # scale down for lightweight report
        "-an",  # no audio
        "-movflags", "+faststart",
    ]
    command = ["ffmpeg", "-y", "-loglevel", "error", *seek_args, *encode_args, str(out_path)]

    completed = subprocess.run(command, capture_output=True)
    if completed.returncode == 0:
        return
    logger.error(
        "ffmpeg clip extraction failed for %s:\n%s",
        out_path.name, completed.stderr.decode(errors="replace")
    )
def _extract_clip_with_black_tail(
    video_path: Path,
    start_s: float,
    source_duration_s: float,
    total_duration_s: float,
    out_path: Path,
) -> None:
    """
    Extract a source preview and append black frames for trailer-only tails.

    Args:
        video_path:        Source video to cut from.
        start_s:           In-point of the matched source material (seconds).
        source_duration_s: Length of source material to show (seconds).
        total_duration_s:  Desired length of the finished preview (seconds).
        out_path:          Destination file; its parent directory is created.

    When the tail is 0.02 s or shorter the call is delegated to
    ``_extract_clip``. Otherwise the source part and the black tail are
    rendered to sibling temp files and concatenated into *out_path*. ffmpeg
    failures are logged, not raised, and the temp files are removed on every
    exit path.
    """
    tail_s = max(0.0, total_duration_s - source_duration_s)
    if tail_s <= 0.02:
        _extract_clip(video_path, start_s, source_duration_s, out_path)
        return

    out_path.parent.mkdir(parents=True, exist_ok=True)
    source_tmp = out_path.with_name(f"{out_path.stem}_source_tmp.mp4")
    tail_tmp = out_path.with_name(f"{out_path.stem}_tail_tmp.mp4")

    preroll_s = 2.0 if start_s >= 2.0 else 0.0
    input_seek_s = max(0.0, start_s - preroll_s)
    accurate_seek_s = start_s - input_seek_s

    # First render the matched source portion with the same accurate seek path
    # as _extract_clip(). Using trim=start=... after an input seek is brittle
    # because FFmpeg may preserve non-zero packet timestamps around keyframes.
    source_cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-ss", str(input_seek_s),
        "-i", str(video_path),
        "-ss", str(accurate_seek_s),
        "-t", str(source_duration_s),
        "-map", "0:v:0",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "28",
        "-vf", "scale=640:360,setsar=1,fps=25,setpts=PTS-STARTPTS",
        "-an",
        "-movflags", "+faststart",
        str(source_tmp),
    ]
    # Black filler rendered at the same 640x360 / 25 fps as the source part.
    tail_cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-f", "lavfi",
        "-i", f"color=c=black:s=640x360:r=25:d={tail_s}",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "28",
        "-an",
        "-movflags", "+faststart",
        str(tail_tmp),
    ]
    concat_cmd = [
        "ffmpeg", "-y", "-loglevel", "error",
        "-i", str(source_tmp),
        "-i", str(tail_tmp),
        "-filter_complex", "[0:v][1:v]concat=n=2:v=1:a=0[v]",
        "-map", "[v]",
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-crf", "28",
        "-an",
        "-movflags", "+faststart",
        str(out_path),
    ]
    steps = [
        (source_cmd, "ffmpeg source preview extraction failed for %s:\n%s"),
        (tail_cmd, "ffmpeg black tail render failed for %s:\n%s"),
        (concat_cmd, "ffmpeg tailed preview concat failed for %s:\n%s"),
    ]

    try:
        for cmd, failure_msg in steps:
            result = subprocess.run(cmd, capture_output=True)
            if result.returncode != 0:
                logger.error(failure_msg, out_path.name, result.stderr.decode(errors="replace"))
                break  # later steps depend on this one's output
    finally:
        # Runs on success, on a failed step and on exceptions alike, so a
        # failed render no longer leaves *_tmp.mp4 files in the report dir.
        for tmp in (source_tmp, tail_tmp):
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not fail the report.
                logger.debug("Could not remove temporary preview file %s", tmp)
def _extract_segmented_clip(
    video_path: Path,
    segments: list,
    total_duration_s: float,
    out_path: Path,
) -> None:
    """
    Render a beat-length source preview from multiple matched source islands.

    Segments are laid out by ``trailer_offset_s``; each contributes
    ``duration_s`` seconds of *video_path* starting at ``in_point_s``. Gaps
    before, between and after segments longer than 0.02 s are filled with
    black. The parts are rendered to sibling temp files and concatenated
    into *out_path*.

    Args:
        video_path:       Source video to cut from.
        segments:         Objects exposing ``trailer_offset_s``,
                          ``duration_s`` and ``in_point_s``. An empty list
                          delegates to ``_extract_clip_with_black_tail``.
        total_duration_s: Desired length of the finished preview (seconds).
        out_path:         Destination file; its parent directory is created.

    ffmpeg failures are logged, not raised. Temp files are removed on every
    exit path, including those of parts whose render failed.
    """
    if not segments:
        _extract_clip_with_black_tail(video_path, 0.0, 0.0, total_duration_s, out_path)
        return

    out_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_paths: list[Path] = []  # parts rendered successfully, in playback order
    scratch: list[Path] = []  # every temp file ffmpeg was asked to write

    def render_part(cmd: list[str], tmp: Path, failure_msg: str) -> None:
        # Track the path before running so a partial file from a failed
        # render is still cleaned up.
        scratch.append(tmp)
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode == 0 and tmp.exists():
            tmp_paths.append(tmp)
        else:
            logger.error(failure_msg, result.stderr.decode(errors="replace"))

    def add_black(duration_s: float) -> None:
        if duration_s <= 0.02:
            return
        tmp = out_path.with_name(f"{out_path.stem}_part_{len(tmp_paths):03d}_black.mp4")
        cmd = [
            "ffmpeg", "-y", "-loglevel", "error",
            "-f", "lavfi",
            "-i", f"color=c=black:s=640x360:r=25:d={duration_s}",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
            "-an", "-movflags", "+faststart",
            str(tmp),
        ]
        render_part(cmd, tmp, "ffmpeg black segment render failed:\n%s")

    def add_source(start_s: float, duration_s: float) -> None:
        if duration_s <= 0.02:
            return
        tmp = out_path.with_name(f"{out_path.stem}_part_{len(tmp_paths):03d}_src.mp4")
        # Same fast-input-seek + accurate-output-seek scheme as _extract_clip().
        preroll_s = 2.0 if start_s >= 2.0 else 0.0
        input_seek_s = max(0.0, start_s - preroll_s)
        accurate_seek_s = start_s - input_seek_s
        cmd = [
            "ffmpeg", "-y", "-loglevel", "error",
            "-ss", str(input_seek_s),
            "-i", str(video_path),
            "-ss", str(accurate_seek_s),
            "-t", str(duration_s),
            "-map", "0:v:0",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
            "-vf", "scale=640:360,setsar=1,fps=25,setpts=PTS-STARTPTS",
            "-an", "-movflags", "+faststart",
            str(tmp),
        ]
        render_part(cmd, tmp, "ffmpeg source segment render failed:\n%s")

    try:
        cursor = 0.0  # trailer-relative time already covered by rendered parts
        for segment in sorted(segments, key=lambda s: s.trailer_offset_s):
            offset_s = max(0.0, float(segment.trailer_offset_s))
            duration_s = max(0.0, float(segment.duration_s))
            add_black(offset_s - cursor)
            add_source(float(segment.in_point_s), duration_s)
            cursor = max(cursor, offset_s + duration_s)
        add_black(total_duration_s - cursor)

        if not tmp_paths:
            # Nothing rendered (all parts failed or were too short); a concat
            # with n=0 and no inputs would only produce a confusing ffmpeg error.
            logger.error("No preview parts could be rendered for %s.", out_path.name)
            return

        if len(tmp_paths) == 1:
            # A single part already is the finished preview; just move it.
            tmp_paths[0].replace(out_path)
            return

        inputs: list[str] = []
        for tmp in tmp_paths:
            inputs += ["-i", str(tmp)]
        labels = "".join(f"[{idx}:v]" for idx in range(len(tmp_paths)))
        filter_complex = f"{labels}concat=n={len(tmp_paths)}:v=1:a=0[v]"
        cmd = [
            "ffmpeg", "-y", "-loglevel", "error",
            *inputs,
            "-filter_complex", filter_complex,
            "-map", "[v]",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
            "-an", "-movflags", "+faststart",
            str(out_path),
        ]
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode != 0:
            logger.error("ffmpeg segmented preview concat failed:\n%s", result.stderr.decode(errors="replace"))
    finally:
        for tmp in scratch:
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                # Best-effort cleanup: a leftover temp file must not fail the report.
                logger.debug("Could not remove temporary preview file %s", tmp)
def _build_frame_locked_compare(ref_path: Path, src_path: Path, out_path: Path) -> None:
    """
    Render reference and source into one side-by-side video stream.

    Both inputs are normalised to 25 fps and letterboxed into 640x360 before
    being stacked horizontally (reference left, source right), so one player
    shows both clips in a single stream.

    Args:
        ref_path: Reference-trailer preview clip.
        src_path: Matched-source preview clip.
        out_path: Destination file; its parent directory is created.

    A non-zero ffmpeg exit status is logged as an error, not raised.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Identical per-input filter chain so both halves share frame rate,
    # geometry and a zero-based timestamp origin.
    per_input_chain = ",".join(
        [
            "fps=25",
            "scale=640:360:force_original_aspect_ratio=decrease",
            "pad=640:360:(ow-iw)/2:(oh-ih)/2",
            "setsar=1",
            "setpts=PTS-STARTPTS",
        ]
    )
    filter_graph = ";".join(
        [
            f"[0:v]{per_input_chain}[ref]",
            f"[1:v]{per_input_chain}[src]",
            "[ref][src]hstack=inputs=2[v]",
        ]
    )

    command = ["ffmpeg", "-y", "-loglevel", "error"]
    command += ["-i", str(ref_path), "-i", str(src_path)]
    command += ["-filter_complex", filter_graph, "-map", "[v]"]
    command += ["-c:v", "libx264", "-preset", "ultrafast", "-crf", "28"]
    command += ["-an", "-movflags", "+faststart", str(out_path)]

    completed = subprocess.run(command, capture_output=True)
    if completed.returncode == 0:
        return
    logger.error(
        "ffmpeg compare render failed for %s:\n%s",
        out_path.name,
        completed.stderr.decode(errors="replace"),
    )
def generate_report(beats: list, results: list, cfg: AppConfig) -> Path:
    """
    Generate the HTML match report and the preview clips it embeds.

    For every beat a reference clip is cut from the trailer. Matched beats
    additionally get a source preview (segmented or black-tailed) and a
    side-by-side compare video, which is what the page embeds; unmatched
    beats show the reference clip next to a "No Match" placeholder.

    Args:
        beats:   Trailer beats, in report order.
        results: Match results; looked up per beat via ``beat_id``.
        cfg:     Application configuration (``paths.output_dir`` is read).

    Returns:
        Path to ``<output_dir>/report/match_report.html``.
    """
    report_dir = cfg.paths.output_dir / "report"
    report_dir.mkdir(parents=True, exist_ok=True)
    html_path = report_dir / "match_report.html"
    match_by_beat_id = {r.beat_id: r for r in results}
    logger.info("Generating report clips in %s (this might take a moment) ...", report_dir)

    # Static page head: styles, title, summary line and the playback-sync script.
    page = [
        "<!DOCTYPE html>",
        "<html><head><meta charset='utf-8'><title>AI Trailer Match Report</title>",
        "<style>",
        "body { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; background: #0f0f0f; color: #e0e0e0; margin: 40px; }",
        "h1 { color: #fff; border-bottom: 1px solid #333; padding-bottom: 10px; }",
        ".stats { font-size: 1.2em; margin-bottom: 30px; color: #aaa; }",
        ".beat-row { display: flex; margin-bottom: 30px; background: #1a1a1a; padding: 20px; border-radius: 12px; border: 1px solid #333; }",
        ".info { width: 250px; padding-right: 20px; flex-shrink: 0; }",
        ".info h3 { margin-top: 0; color: #fff; }",
        ".video-container { display: flex; gap: 20px; flex-grow: 1; }",
        ".videos { flex-grow: 1; }",
        ".compare { margin-bottom: 18px; }",
        ".video-col { flex: 1; }",
        ".video-col p { margin-top: 0; font-weight: bold; color: #888; }",
        "video { width: 100%; border-radius: 6px; box-shadow: 0 4px 6px rgba(0,0,0,0.5); background: #000; }",
        ".status-match { color: #4ade80; font-weight: bold; font-size: 1.1em; }",
        ".status-miss { color: #f87171; font-weight: bold; font-size: 1.1em; }",
        ".score { font-family: monospace; font-size: 1.1em; color: #60a5fa; }",
        ".code-hint { background: #000; padding: 10px; border-radius: 4px; font-family: monospace; font-size: 0.9em; margin-top: 15px; color: #a3e635; }",
        "</style></head><body>",
        "<h1>AI Trailer Generator — Match Report</h1>",
        f"<div class='stats'>Total Beats: {len(beats)} | Matched: {len(results)}</div>",
        "<script>",
        "function syncBeat(row) {",
        " const vids = row.querySelectorAll('video');",
        " if (vids.length < 2) return;",
        " const ref = vids[0];",
        " const src = vids[1];",
        " let syncing = false;",
        " function align() {",
        " if (syncing) return;",
        " syncing = true;",
        " const target = Math.min(ref.currentTime, Math.max(0, (src.duration || ref.currentTime) - 0.02));",
        " if (Math.abs(src.currentTime - target) > 0.035) src.currentTime = target;",
        " if (ref.paused && !src.paused) src.pause();",
        " if (!ref.paused && src.paused) src.play().catch(() => {});",
        " syncing = false;",
        " }",
        " ref.addEventListener('play', () => { src.currentTime = Math.min(ref.currentTime, Math.max(0, (src.duration || ref.currentTime) - 0.02)); src.play().catch(() => {}); });",
        " ref.addEventListener('pause', () => src.pause());",
        " ref.addEventListener('seeked', () => { src.currentTime = Math.min(ref.currentTime, Math.max(0, (src.duration || ref.currentTime) - 0.02)); });",
        " ref.addEventListener('timeupdate', align);",
        "}",
        "document.addEventListener('DOMContentLoaded', () => document.querySelectorAll('.beat-row').forEach(syncBeat));",
        "</script>"
    ]

    for beat in beats:
        res = match_by_beat_id.get(beat.beat_id)
        clip_stem = f"beat_{beat.beat_id:03d}"

        # Reference clip is rendered for every beat, matched or not.
        ref_mp4 = report_dir / f"{clip_stem}_ref.mp4"
        _extract_clip(beat.trailer_path, beat.start_s, beat.duration_s, ref_mp4)

        # --- info panel ---------------------------------------------------
        page += [
            "<div class='beat-row'>",
            "<div class='info'>",
            f"<h3>Beat {beat.beat_id:03d}</h3>",
            f"<p><b>Type:</b> {beat.beat_type.name}</p>",
            f"<p><b>Trailer:</b> {beat.start_s:.2f}s &rarr; {beat.end_s:.2f}s</p>",
        ]

        compare_mp4 = None
        if res:
            segments = list(getattr(res, "segments", ()) or [])
            if segments:
                source_duration = sum(max(0.0, float(s.duration_s)) for s in segments)
            else:
                source_duration = max(0.0, res.out_point_s - res.in_point_s)

            if source_duration > 0:
                preview_duration = min(beat.duration_s, source_duration)
            else:
                preview_duration = beat.duration_s

            # With segments the covered span ends where the last island ends;
            # without them it ends after the single preview.
            last_segment_end = max(
                (float(s.trailer_offset_s) + float(s.duration_s) for s in segments),
                default=preview_duration,
            )
            trailer_tail_s = max(0.0, beat.duration_s - last_segment_end)
            has_tail = trailer_tail_s > 0

            if getattr(res, "is_confirmed", True):
                status_html = "<p class='status-match'>MATCHED</p>"
            else:
                status_html = "<p style='color: #fbbf24; font-weight: bold; font-size: 1.1em;'>PROVISIONAL MATCH</p>"

            page += [
                status_html,
                f"<p><b>Scene ID:</b> {res.scene_id}</p>",
                f"<p><b>Movie In:</b> {res.in_point_s:.2f}s</p>",
                f"<p><b>Source Dur:</b> {source_duration:.2f}s</p>",
            ]
            if len(segments) > 1:
                page.append(f"<p><b>Segments:</b> {len(segments)} matched visual islands</p>")
            if has_tail:
                page.append(f"<p><b>Unmatched Tail:</b> {trailer_tail_s:.2f}s placeholder</p>")
            page.append(f"<p><b>Score:</b> <span class='score'>{res.match_score:.3f}</span></p>")
            if has_tail:
                page.append("<p style='color: #fbbf24; font-size: 0.9em;'>Some trailer frames are still unmatched; report fills only those gaps with placeholder black.</p>")
            # Warn if score is low
            if res.match_score < 0.80:
                page.append("<p style='color: #fbbf24; font-size: 0.9em;'>⚠️ Score below 0.80. Verify visually.</p>")

            # Source preview first, then the side-by-side built from both clips.
            src_mp4 = report_dir / f"{clip_stem}_src.mp4"
            compare_mp4 = report_dir / f"{clip_stem}_compare.mp4"
            if segments:
                _extract_segmented_clip(res.source_path, segments, beat.duration_s, src_mp4)
            else:
                _extract_clip_with_black_tail(
                    res.source_path,
                    res.in_point_s,
                    preview_duration,
                    beat.duration_s,
                    src_mp4,
                )
            _build_frame_locked_compare(ref_mp4, src_mp4, compare_mp4)
        else:
            page.append("<p class='status-miss'>NO MATCH</p>")

        page += [
            f"<div class='code-hint'>python cli.py rematch --beat {beat.beat_id}</div>",
            "</div>",  # /info
        ]

        # --- video panel --------------------------------------------------
        page.append("<div class='videos'>")
        if compare_mp4 is None:
            page += [
                "<div class='video-container'>",
                f"<div class='video-col'><p>Reference Trailer</p><video src='{ref_mp4.name}' controls loop muted autoplay></video></div>",
                "<div class='video-col'><p>Matched Source</p><div style='width: 100%; aspect-ratio: 16/9; background: #222; display: flex; align-items: center; justify-content: center; border-radius: 6px; color: #555;'>No Match</div></div>",
                "</div>",  # /video-container
            ]
        else:
            page.append(f"<div class='compare'><p>Frame-Locked Compare</p><video src='{compare_mp4.name}' controls loop muted autoplay></video></div>")
        page += [
            "</div>",  # /videos
            "</div>",  # /beat-row
        ]

    page.append("</body></html>")
    html_path.write_text("\n".join(page), encoding="utf-8")
    return html_path
+175
View File
@@ -0,0 +1,175 @@
"""
src/pipeline/trailer_analyzer.py — Reference trailer → list[TrailerBeat]
Responsibility:
1. Run PySceneDetect on the REFERENCE TRAILER (not the source movie)
to detect cut boundaries → raw beat intervals
2. Fingerprint the midpoint frame of each beat (for Vibe Check)
3. Transcribe dialogue per beat via Whisper (optional, injected)
4. Optionally classify BeatType via the LLM dramaturg (injected)
Returns: list[TrailerBeat] ready to feed into run_matching().
"""
from __future__ import annotations
import logging
from dataclasses import replace
from pathlib import Path
from typing import Callable, Sequence
from src.core.config import AppConfig
from src.core.models import BeatType, DialogueLine, TrailerBeat
from src.cv.fingerprinting import fingerprint_frame
from src.cv.frame_extractor import grab_midpoint_frame, open_video
logger = logging.getLogger(__name__)
# Injection type aliases — keeps this module free of hard audio/LLM imports
TranscribeCallback = Callable[[Path, float, float, float], list[DialogueLine]]
ClassifyCallback = Callable[[list[TrailerBeat]], list[TrailerBeat]]
# ---------------------------------------------------------------------------
# Step 1: Scene detection on the reference trailer
# ---------------------------------------------------------------------------
def _detect_trailer_beats(cfg: AppConfig) -> list[tuple[float, float, int, int]]:
"""
Run PySceneDetect on the reference trailer.
Returns list of (start_s, end_s, start_frame, end_frame).
Uses the same ContentDetector thresholds as the source movie.
"""
try:
from scenedetect import open_video as sd_open_video, SceneManager
from scenedetect.detectors import ContentDetector
except ImportError:
raise ImportError("pip install scenedetect[opencv]")
trailer_path = cfg.paths.reference_trailer
video = sd_open_video(str(trailer_path))
manager = SceneManager()
manager.add_detector(
ContentDetector(
threshold=cfg.scene_detection.content_threshold,
min_scene_len=int(
cfg.scene_detection.min_scene_duration_s * video.frame_rate
),
)
)
logger.info("Detecting beats in reference trailer: %s", trailer_path.name)
manager.detect_scenes(video=video, show_progress=False)
raw = manager.get_scene_list()
result = [
(s.get_seconds(), e.get_seconds(), s.get_frames(), e.get_frames())
for s, e in raw
]
logger.info("Detected %d beats in reference trailer.", len(result))
return result
# ---------------------------------------------------------------------------
# Step 2: Fingerprint beats
# ---------------------------------------------------------------------------
def _fingerprint_beats(
    raw_beats: list[tuple[float, float, int, int]],
    cfg: AppConfig,
) -> list[TrailerBeat]:
    """
    Turn raw beat intervals into TrailerBeat objects with fingerprints.

    The reference trailer is opened once; for each interval the midpoint
    frame is grabbed and fingerprinted with the ``cfg.cv.vibe_check``
    settings. If the frame cannot be decoded a warning is logged and the
    beat is created without fingerprint fields.

    Args:
        raw_beats: ``(start_s, end_s, start_frame, end_frame)`` tuples.
        cfg:       Application configuration.

    Returns:
        One TrailerBeat per input tuple; ``beat_id`` is the tuple's index.
    """
    vibe_cfg = cfg.cv.vibe_check
    trailer_path = cfg.paths.reference_trailer
    beats: list[TrailerBeat] = []

    with open_video(trailer_path) as cap:
        for beat_id, (start_s, end_s, start_frame, end_frame) in enumerate(raw_beats):
            fields = {
                "beat_id": beat_id,
                "trailer_path": trailer_path,
                "start_s": start_s,
                "end_s": end_s,
                "start_frame": start_frame,
                "end_frame": end_frame,
            }
            frame = grab_midpoint_frame(cap, start_s, end_s)
            if frame is None:
                # Fingerprint fields are left out so the dataclass defaults apply.
                logger.warning("Beat %d: midpoint frame decode failed.", beat_id)
            else:
                luma, sat, phash = fingerprint_frame(frame, vibe_cfg)
                fields.update(luma_hist=luma, sat_hist=sat, phash=phash)
            beats.append(TrailerBeat(**fields))

    return beats
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def analyze_reference_trailer(
    cfg: AppConfig,
    transcribe_callback: TranscribeCallback | None = None,
    classify_callback: ClassifyCallback | None = None,
) -> list[TrailerBeat]:
    """
    Run the reference-trailer analysis: cut detection, fingerprinting and
    the optional dialogue / classification passes.

    Args:
        cfg:                 Application configuration.
        transcribe_callback: Optional fn(path, start_s, end_s, offset_s)
                             → list[DialogueLine], called once per beat with
                             the beat's start as offset. A failing call is
                             logged and that beat keeps its current dialogue.
        classify_callback:   Optional fn(beats) → beats, called once with
                             the full list. On failure the unclassified
                             beats are kept and a warning is logged.

    Returns:
        List of TrailerBeat objects from the fingerprinting step, updated by
        whichever optional passes were supplied and succeeded.
    """
    # Steps 1 + 2 — cut detection, then one fingerprinted beat per cut.
    beats = _fingerprint_beats(_detect_trailer_beats(cfg), cfg)

    # Step 3 — dialogue (optional)
    if transcribe_callback is not None:
        with_dialogue: list[TrailerBeat] = []
        for beat in beats:
            try:
                lines = transcribe_callback(
                    beat.trailer_path,
                    beat.start_s,
                    beat.end_s,
                    beat.start_s,  # time_offset so timestamps are absolute
                )
                updated = replace(beat, dialogue=tuple(lines))
            except Exception as exc:
                # One bad beat must not abort the whole analysis.
                logger.warning("Beat %d transcription failed: %s", beat.beat_id, exc)
                updated = beat
            with_dialogue.append(updated)
        beats = with_dialogue

    # Step 4 — LLM dramaturgy (optional)
    if classify_callback is not None:
        try:
            beats = classify_callback(beats)
        except Exception as exc:
            logger.warning("Beat classification failed: %s — keeping UNKNOWN.", exc)

    dialogue_count = sum(1 for b in beats if b.dialogue)
    classified_count = sum(1 for b in beats if b.beat_type != BeatType.UNKNOWN)
    logger.info(
        "Trailer analysis complete: %d beats, %d with dialogue, %d classified.",
        len(beats),
        dialogue_count,
        classified_count,
    )
    return beats