Files
aitrailer/src/cv/global_scan.py
T
Melbar 54d3f04616 Fix matching regressions, cache guard, and multi-shot algorithm for beat 15
- config.toml: revert scoreable_luma/contrast thresholds to 24/58/24 (lowering
  them let cross-fade blend frames contaminate content-validation templates,
  dropping scores below provisional_content_threshold)
- src/cv/global_scan.py: _is_dark_reference_frame now requires contrast<30 so
  genuine dark silhouette frames are not rejected as scoreable; two-path
  _is_scoreable_reference_frame separates standard vs fade-content scoring
- cli.py: _keeps_cached_match() guard prevents a weaker single-span rematch
  from overwriting a better multi-segment provisional cache entry
- cli.py: _fade_content_shots() restricted to between-island gaps only—
  pre-island black leaders were incorrectly emitted as matchable shots
- cli.py: island[0] of _match_unmatched_visual_segments() now uses no
  continuity seed so an insert cut at the start of a multi-shot beat is not
  forced toward the previous beat's scene
- scripts/generate_cutter_report.py: fix ffmpeg concat demuxer on Windows—
  use part.absolute().as_posix() so paths in the concat txt are absolute and
  not double-resolved relative to the concat file's directory

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 00:05:37 +02:00

1675 lines
66 KiB
Python

import logging
import cv2
import numpy as np
import subprocess as sp
from typing import Sequence
import time
from dataclasses import replace
from src.core.config import AppConfig
from src.core.models import MatchResult, TrailerBeat
from src.cv.fingerprinting import text_safe_crop
from src.cv.frame_extractor import grab_frame_at_path, get_video_info, open_video, grab_frame_at
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# A seed is either a single float or a pair of floats.
# NOTE(review): presumably a source timestamp, optionally paired with a second
# value — confirm the meaning against the callers that build seeds.
SeedPoint = float | tuple[float, float]
# Memo for _reference_internal_cut_offsets. Key: (trailer path as str,
# beat start rounded to 3 decimals, beat end rounded to 3 decimals,
# cut-correlation threshold rounded to 3 decimals) -> detected cut offsets.
_REFERENCE_CUT_CACHE: dict[tuple[str, float, float, float], list[float]] = {}
def _prepare_template(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Turn a reference frame into a matching template.

    The frame goes through ``text_safe_crop`` with the vibe-check crop
    fractions, is scaled to the configured proxy size, loses a 10% margin on
    every side, and is finally converted by ``_feature_image``.
    """
    width = cfg.video.proxy_width
    height = cfg.video.proxy_height
    vibe = cfg.cv.vibe_check
    safe_area = text_safe_crop(frame, vibe.crop_top_fraction, vibe.crop_bottom_fraction)
    proxy = cv2.resize(safe_area, (width, height), interpolation=cv2.INTER_AREA)
    pad_y, pad_x = int(height * 0.10), int(width * 0.10)
    # The margin makes the template smaller than a full proxy-size haystack.
    inner = proxy[pad_y:height - pad_y, pad_x:width - pad_x]
    return _feature_image(inner)
def _prepare_haystack(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Turn a frame into a full proxy-size matching feature.

    Same pipeline as ``_prepare_template`` (text-safe crop, resize to the
    proxy size, ``_feature_image``) but without trimming any margin.
    """
    vibe = cfg.cv.vibe_check
    safe_area = text_safe_crop(frame, vibe.crop_top_fraction, vibe.crop_bottom_fraction)
    proxy_size = (cfg.video.proxy_width, cfg.video.proxy_height)
    proxy = cv2.resize(safe_area, proxy_size, interpolation=cv2.INTER_AREA)
    return _feature_image(proxy)
def _center_crop_feature(feature: np.ndarray, cfg: AppConfig) -> np.ndarray:
h, w = feature.shape[:2]
margin_y = int(h * 0.10)
margin_x = int(w * 0.10)
return feature[margin_y:h-margin_y, margin_x:w-margin_x]
def _feature_image(frame: np.ndarray) -> np.ndarray:
    """
    Convert a BGR frame into a look-tolerant matching feature.

    Trailer shots may be desaturated, contrast-shifted, or graded differently
    from the source movie. A blend of equalised luma (70%) and Canny edges
    (30%) is more stable than raw BGR pixels and rejects unrelated scenes that
    merely share similar colours.
    """
    luma = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    edge_map = cv2.Canny(luma, 60, 140)
    return cv2.addWeighted(luma, 0.70, edge_map, 0.30, 0)
def _match_score(frame: np.ndarray, template: np.ndarray, cfg: AppConfig) -> float:
    """Return the best free-placement correlation of *template* inside *frame*.

    The frame is converted with ``_prepare_haystack`` and the template is slid
    over it with ``cv2.TM_CCOEFF_NORMED``; the maximum response is returned.

    Returns:
        The peak correlation as a float, or 0.0 when the peak is NaN.
    """
    haystack = _prepare_haystack(frame, cfg)
    response = cv2.matchTemplate(haystack, template, cv2.TM_CCOEFF_NORMED)
    _, peak, _, _ = cv2.minMaxLoc(response)
    # Same guard as _corr_same_size: a NaN score would poison the min()/max()
    # aggregation and every ">" comparison performed by the callers.
    if np.isnan(peak):
        return 0.0
    return float(peak)
def _fixed_position_score(frame: np.ndarray, template: np.ndarray, cfg: AppConfig) -> float:
    """Correlate *template* against the same-position centre crop of *frame*.

    Unlike ``_match_score`` the template is not allowed to slide: the frame is
    reduced to a feature of exactly the template's shape by ``_fixed_feature``
    and the two are compared once by ``_corr_same_size``.

    Returns:
        The normalised correlation, or 0.0 when OpenCV reports NaN.
    """
    # _fixed_feature performs the crop-and-resize this function used to
    # duplicate; _corr_same_size adds the NaN guard that was missing here.
    fixed = _fixed_feature(frame, template.shape, cfg)
    return _corr_same_size(fixed, template)
def _fixed_feature(frame: np.ndarray, template_shape: tuple[int, ...], cfg: AppConfig) -> np.ndarray:
    """Return the centre-cropped haystack feature of *frame*, sized to *template_shape*.

    The feature is resized only when its shape differs from *template_shape*.
    """
    feature = _center_crop_feature(_prepare_haystack(frame, cfg), cfg)
    if feature.shape == template_shape:
        return feature
    # cv2.resize expects (width, height), i.e. the shape reversed.
    target_size = (template_shape[1], template_shape[0])
    return cv2.resize(feature, target_size, interpolation=cv2.INTER_AREA)
def _corr_same_size(a: np.ndarray, b: np.ndarray) -> float:
    """Return the normalised correlation of two images, resizing *b* to *a* if needed.

    A NaN result from OpenCV is reported as 0.0.
    """
    if a.shape == b.shape:
        other = b
    else:
        other = cv2.resize(b, (a.shape[1], a.shape[0]), interpolation=cv2.INTER_AREA)
    response = cv2.matchTemplate(a, other, cv2.TM_CCOEFF_NORMED)
    peak = cv2.minMaxLoc(response)[1]
    return 0.0 if np.isnan(peak) else float(peak)
def _validation_crop(frame: np.ndarray) -> np.ndarray:
    """Trim dark borders, then drop the top and bottom 5% of the remaining rows."""
    trimmed = _trim_dark_borders(frame)
    rows = trimmed.shape[0]
    top, bottom = int(rows * 0.05), int(rows * 0.95)
    return trimmed[top:bottom, :]
def _trim_dark_borders(frame: np.ndarray) -> np.ndarray:
    """
    Strip encoded black matte/pillarbox borders before fixed-position checks.

    The reference trailer can carry black bars that the source movie does not
    have. Whole-frame spatial validation should compare picture content, not
    container matte.

    A row or column counts as active when its 90th-percentile luma exceeds 18.
    An axis is trimmed only when at least max(8, 35% of its length) lines are
    active; the frame is returned unchanged when it is empty or when the
    trimmed picture would be narrower or shorter than 35% of the original.
    """
    if frame.size == 0:
        return frame

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    height, width = gray.shape[:2]
    min_cols = int(width * 0.35)
    min_rows = int(height * 0.35)

    lit_cols = np.where(np.percentile(gray, 90, axis=0) > 18.0)[0]
    lit_rows = np.where(np.percentile(gray, 90, axis=1) > 18.0)[0]

    # Default to the full extent; narrow each axis only when enough of it is lit.
    left, right = 0, width
    if lit_cols.size >= max(8, min_cols):
        left = max(0, int(lit_cols[0]) - 2)
        right = min(width, int(lit_cols[-1]) + 3)

    top, bottom = 0, height
    if lit_rows.size >= max(8, min_rows):
        top = max(0, int(lit_rows[0]) - 2)
        bottom = min(height, int(lit_rows[-1]) + 3)

    if right - left < min_cols or bottom - top < min_rows:
        return frame
    return frame[top:bottom, left:right]
def _fixed_luma_feature(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Return a zero-mean, unit-variance 160x80 equalised-luma patch of *frame*.

    ``cfg`` is not read by this helper.
    """
    gray = cv2.cvtColor(_validation_crop(frame), cv2.COLOR_BGR2GRAY)
    equalized = cv2.equalizeHist(gray)
    patch = cv2.resize(equalized, (160, 80), interpolation=cv2.INTER_AREA).astype(np.float32)
    centered = patch - float(np.mean(patch))
    # The epsilon keeps a perfectly flat patch from dividing by zero.
    return centered / (float(np.std(patch)) + 1e-6)
def _fixed_edge_feature(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Return a zero-mean, unit-variance 160x80 Canny edge patch of *frame*.

    ``cfg`` is not read by this helper.
    """
    gray = cv2.cvtColor(_validation_crop(frame), cv2.COLOR_BGR2GRAY)
    edge_map = cv2.Canny(cv2.equalizeHist(gray), 60, 140)
    patch = cv2.resize(edge_map, (160, 80), interpolation=cv2.INTER_AREA).astype(np.float32)
    centered = patch - float(np.mean(patch))
    # The epsilon keeps an edge-free patch from dividing by zero.
    return centered / (float(np.std(patch)) + 1e-6)
def _fixed_hist_feature(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Return concatenated, sum-normalised 32-bin histograms, one per channel.

    The histograms are taken from the validation crop resized to 160x80.
    ``cfg`` is not read by this helper.
    """
    patch = cv2.resize(_validation_crop(frame), (160, 80), interpolation=cv2.INTER_AREA)

    def _normalised_hist(plane: np.ndarray) -> np.ndarray:
        counts = cv2.calcHist([plane], [0], None, [32], [0, 256]).astype(np.float32).flatten()
        return counts / (float(np.sum(counts)) + 1e-6)

    return np.concatenate([_normalised_hist(plane) for plane in cv2.split(patch)])
def _fixed_spatial_hist_feature(frame: np.ndarray, cfg: AppConfig) -> np.ndarray:
    """Return per-cell colour histograms over a 4x4 grid of the frame.

    The validation crop is resized to 160x80 and split into 16 cells (row-major
    order). Each cell contributes one sum-normalised 16-bin histogram per
    channel; all histograms are concatenated. ``cfg`` is not read here.
    """
    grid_rows = grid_cols = 4
    patch = cv2.resize(_validation_crop(frame), (160, 80), interpolation=cv2.INTER_AREA)
    cell_h = patch.shape[0] // grid_rows
    cell_w = patch.shape[1] // grid_cols

    cells = [
        patch[row * cell_h:(row + 1) * cell_h, col * cell_w:(col + 1) * cell_w, :]
        for row in range(grid_rows)
        for col in range(grid_cols)
    ]

    histograms = []
    for cell in cells:
        for plane in cv2.split(cell):
            counts = cv2.calcHist([plane], [0], None, [16], [0, 256]).astype(np.float32).flatten()
            histograms.append(counts / (float(np.sum(counts)) + 1e-6))
    return np.concatenate(histograms)
def _array_corr(a: np.ndarray, b: np.ndarray) -> float:
if a.shape != b.shape:
return 0.0
return float(np.mean(a * b))
def _hist_intersection(a: np.ndarray, b: np.ndarray) -> float:
if a.shape != b.shape:
return 0.0
return float(np.minimum(a, b).sum() / (np.maximum(a, b).sum() + 1e-6))
def _fixed_content_features(frame: np.ndarray, cfg: AppConfig) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Return the (luma, edge, histogram, spatial-histogram) features of *frame*."""
    luma = _fixed_luma_feature(frame, cfg)
    edge = _fixed_edge_feature(frame, cfg)
    hist = _fixed_hist_feature(frame, cfg)
    spatial = _fixed_spatial_hist_feature(frame, cfg)
    return luma, edge, hist, spatial
def _fixed_content_pair_score(
    ref_features: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
    source_frame: np.ndarray,
    cfg: AppConfig,
) -> float:
    """Score a source frame against precomputed reference content features.

    The four sub-scores are blended as edge 0.24, luma 0.24, colour histogram
    0.14 and spatial histogram 0.38.
    """
    src_luma, src_edge, src_hist, src_spatial = _fixed_content_features(source_frame, cfg)
    ref_luma, ref_edge, ref_hist, ref_spatial = ref_features

    luma = _array_corr(ref_luma, src_luma)
    edge = _array_corr(ref_edge, src_edge)
    hist = _hist_intersection(ref_hist, src_hist)
    spatial = _hist_intersection(ref_spatial, src_spatial)

    return edge * 0.24 + luma * 0.24 + hist * 0.14 + spatial * 0.38
def _prepare_validation_templates(
    beat: TrailerBeat,
    cfg: AppConfig,
) -> list[tuple[float, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]]:
    """Collect (offset, content-features) templates for validating a beat.

    Reference frames are first sampled on a regular step (at least 0.20 s)
    across the matchable part of the beat. If that yields fewer than three
    scoreable frames, the offsets from ``_beat_offsets`` are sampled instead
    and whatever is scoreable there is returned.
    """

    def _sample(offset_s: float):
        # One template entry, or None when the frame is missing or not scoreable.
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)
        if frame is None or not _is_scoreable_reference_frame(frame, cfg):
            return None
        return offset_s, _fixed_content_features(frame, cfg)

    step_s = max(0.20, cfg.cv.deep_scan.content_align_sample_step_s * 1.5)
    matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=step_s)

    stepped: list[tuple[float, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]] = []
    offset_s = 0.0
    while offset_s <= matchable_s:
        entry = _sample(offset_s)
        if entry is not None:
            stepped.append(entry)
        offset_s = round(offset_s + step_s, 6)
    if len(stepped) >= 3:
        return stepped

    sparse = [_sample(offset) for offset in _beat_offsets(matchable_s)]
    return [entry for entry in sparse if entry is not None]
def _prepare_rerank_templates(
    beat: TrailerBeat,
    cfg: AppConfig,
) -> list[tuple[float, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]]:
    """Collect (offset, content-features) templates at the ``_beat_offsets`` positions.

    Offsets are spread over the matchable part of the beat; frames that cannot
    be read or are not scoreable are skipped.
    """
    span_s = estimate_matchable_reference_duration(beat, cfg)
    collected: list[tuple[float, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]] = []
    for offset_s in _beat_offsets(span_s):
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)
        if frame is None or not _is_scoreable_reference_frame(frame, cfg):
            continue
        collected.append((offset_s, _fixed_content_features(frame, cfg)))
    return collected
def _fixed_content_sequence_score(
cap: cv2.VideoCapture,
in_point_s: float,
templates: list[tuple[float, tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]],
cfg: AppConfig,
) -> float:
if not templates:
return 0.0
scores: list[float] = []
for offset_s, ref_features in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
return 0.0
scores.append(_fixed_content_pair_score(ref_features, frame, cfg))
if not scores:
return 0.0
return float((sum(scores) / len(scores)) * 0.68 + min(scores) * 0.32)
def _reference_internal_cut_offsets(beat: TrailerBeat, cfg: AppConfig) -> list[float]:
    """Detect hard visual cuts inside a single trailer beat.

    Scoreable reference frames are sampled every max(1 / edl_frame_rate, 0.08)
    seconds and each is correlated with the previous scoreable one. An offset
    is recorded as a cut when the correlation falls below
    ``cfg.vision.multi_shot_cut_corr_threshold``, the offset lies more than
    0.18 s from both ends of the beat, and it is more than 0.24 s after the
    previously recorded cut.

    Results are memoised in ``_REFERENCE_CUT_CACHE``.

    Returns:
        A fresh list of cut offsets in seconds, relative to the beat start and
        rounded to 3 decimals. The caller may mutate it freely; the cached
        copy is never handed out.
    """
    cut_threshold = cfg.vision.multi_shot_cut_corr_threshold
    cache_key = (
        str(beat.trailer_path),
        round(float(beat.start_s), 3),
        round(float(beat.end_s), 3),
        round(float(cut_threshold), 3),
    )
    cached = _REFERENCE_CUT_CACHE.get(cache_key)
    if cached is not None:
        # Return a copy so a caller cannot corrupt the shared cache entry.
        return list(cached)

    step_s = max(1.0 / cfg.export.edl_frame_rate, 0.08)
    previous: np.ndarray | None = None
    cuts: list[float] = []
    t = 0.0
    while t <= beat.duration_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            feature = _prepare_haystack(frame, cfg)
            if previous is not None:
                corr = _corr_same_size(previous, feature)
                away_from_edges = 0.18 < t < beat.duration_s - 0.18
                spaced_out = not cuts or t - cuts[-1] > 0.24
                if corr < cut_threshold and away_from_edges and spaced_out:
                    cuts.append(round(t, 3))
            # Non-scoreable frames are skipped, so the comparison always runs
            # between two scoreable frames.
            previous = feature
        t = round(t + step_s, 6)

    if cuts:
        logger.debug('Beat %d: detected internal trailer cuts at %s', beat.beat_id, cuts)
    _REFERENCE_CUT_CACHE[cache_key] = cuts
    return list(cuts)
def _scene_fps_estimate(scene, cfg: AppConfig) -> float:
duration_s = max(0.0, float(scene.end_s) - float(scene.start_s))
frame_count = max(0, int(scene.end_frame) - int(scene.start_frame))
if duration_s <= 0.0 or frame_count <= 0:
return cfg.export.edl_frame_rate
return frame_count / duration_s
def _contiguous_scene_coverage_duration(
beat: TrailerBeat,
in_point_s: float,
scenes: Sequence | None,
matchable_duration_s: float,
cfg: AppConfig,
) -> float:
"""
Allow a source span to cross scene boundaries only when the trailer beat has
matching internal cuts at the same relative offsets.
"""
if not scenes or matchable_duration_s <= 0:
return 0.0
start_idx = None
for idx, scene in enumerate(scenes):
if float(scene.start_s) <= in_point_s < float(scene.end_s):
start_idx = idx
break
if start_idx is None:
return 0.0
cut_offsets = _reference_internal_cut_offsets(beat, cfg)
target_end = in_point_s + matchable_duration_s
current_end = in_point_s
for scene in scenes[start_idx:]:
scene_end = float(scene.end_s)
fps = _scene_fps_estimate(scene, cfg)
tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
if target_end <= scene_end:
return matchable_duration_s
boundary_offset = scene_end - in_point_s
boundary_matches_ref_cut = any(
abs(boundary_offset - cut_offset) <= cfg.vision.multi_shot_boundary_tolerance_s
for cut_offset in cut_offsets
)
if not boundary_matches_ref_cut:
return max(0.0, scene_end - in_point_s - tail_s)
current_end = scene_end
return max(0.0, current_end - in_point_s)
def _rerank_candidates_by_content(
    beat: TrailerBeat,
    candidates: list[tuple[float, float]],
    cfg: AppConfig,
    scenes: Sequence | None = None,
    matchable_duration_s: float | None = None,
) -> list[tuple[float, float, float]]:
    """Re-order coarse candidates by fixed-position content similarity.

    Each (coarse_score, t_sec) candidate gets a rank score of
    content 0.62 + coarse 0.18 + coverage 0.20, where coverage is 1.0 unless
    scenes and a positive matchable duration are supplied.

    Returns:
        (rank_score, coarse_score, t_sec) tuples, best rank first. When no
        rerank templates can be built, the candidates are returned in their
        input order with the coarse score used as the rank score.
    """
    templates = _prepare_rerank_templates(beat, cfg)
    if not templates:
        return [(coarse, coarse, t_sec) for coarse, t_sec in candidates]

    ranked: list[tuple[float, float, float]] = []
    with open_video(cfg.paths.source_movie) as cap:
        for coarse, t_sec in candidates:
            content = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
            coverage = 1.0
            if scenes is not None and matchable_duration_s and matchable_duration_s > 0:
                covered_s = _contiguous_scene_coverage_duration(
                    beat, t_sec, scenes, matchable_duration_s, cfg
                )
                coverage = min(1.0, covered_s / matchable_duration_s)
            rank = content * 0.62 + coarse * 0.18 + coverage * 0.20
            ranked.append((rank, coarse, t_sec))

    ranked.sort(key=lambda entry: entry[0], reverse=True)
    return ranked
def _dense_weighted_seed_candidates(
beat: TrailerBeat,
seed_candidates: list[tuple[float, float]],
cfg: AppConfig,
scenes: Sequence | None,
matchable_duration_s: float,
) -> list[tuple[float, float]]:
"""Scan vision-selected source scenes densely with fixed-position content features."""
if not scenes or not seed_candidates:
return []
weighted_floor = cfg.cv.deep_scan.coarse_candidate_threshold + 0.05
seeded_scenes: dict[int, tuple[object, float]] = {}
for seed_score, seed_t in seed_candidates:
if seed_score <= weighted_floor:
continue
scene = _find_scene_for_time(scenes, seed_t, cfg)
if scene is None:
continue
previous = seeded_scenes.get(scene.scene_id)
if previous is None or seed_score > previous[1]:
seeded_scenes[scene.scene_id] = (scene, seed_score)
if not seeded_scenes:
return []
templates = _prepare_rerank_templates(beat, cfg)
if not templates:
return []
cut_offsets = _reference_internal_cut_offsets(beat, cfg)
dense: list[tuple[float, float, float, float, int]] = []
with open_video(cfg.paths.source_movie) as cap:
for scene, seed_score in seeded_scenes.values():
fps = _source_fps_from_scene(scene) or cfg.export.edl_frame_rate
tail_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / fps)
start_s = max(0.0, float(scene.start_s))
end_s = max(start_s, float(scene.end_s) - tail_s)
if end_s <= start_s:
continue
span_s = end_s - start_s
step_s = max(0.04, cfg.vision.local_scan_step_s)
max_points = max(2, cfg.vision.local_scan_max_points_per_scene)
point_count = int(span_s / step_s) + 1
if point_count > max_points:
step_s = span_s / float(max_points - 1)
t_sec = start_s
while t_sec <= end_s + 0.001:
content_score = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
usable_s = max(0.0, float(scene.end_s) - t_sec - tail_s)
coverage_score = (
min(1.0, usable_s / matchable_duration_s)
if matchable_duration_s > 0 else 0.0
)
rank_score = (
content_score * 0.50
+ coverage_score * 0.35
+ seed_score * 0.15
)
coarse_score = max(
weighted_floor,
min(0.99, seed_score * 0.80 + content_score * 0.20),
)
dense.append((rank_score, coarse_score, t_sec, content_score, scene.scene_id))
t_sec += step_s
for cut_offset in cut_offsets:
shifted_t = max(0.0, float(scene.start_s) - cut_offset)
coverage_score = (
min(
1.0,
_contiguous_scene_coverage_duration(
beat,
shifted_t,
scenes,
matchable_duration_s,
cfg,
) / matchable_duration_s,
)
if matchable_duration_s > 0 else 0.0
)
if coverage_score < 0.80:
continue
content_score = _fixed_content_sequence_score(cap, shifted_t, templates, cfg)
rank_score = (
content_score * 0.56
+ coverage_score * 0.34
+ seed_score * 0.10
)
coarse_score = max(
weighted_floor,
min(0.99, seed_score * 0.78 + content_score * 0.22),
)
dense.append((rank_score, coarse_score, shifted_t, content_score, scene.scene_id))
dense.sort(key=lambda item: item[0], reverse=True)
top = dense[: max(0, cfg.vision.local_scan_top_candidates)]
if top:
logger.info(
'Beat %d: dense vision content scan kept %d/%d candidates; best scene=%d in=%.3fs content=%.3f rank=%.3f.',
beat.beat_id,
len(top),
len(dense),
top[0][4],
top[0][2],
top[0][3],
top[0][0],
)
return [(coarse_score, t_sec) for _, coarse_score, t_sec, _, _ in top]
def _beat_offsets(duration_s: float) -> list[float]:
"""Use several frames across the beat, including the leading edge."""
if duration_s < 1.0:
return [0.0, duration_s * 0.35, duration_s * 0.70]
if duration_s < 2.5:
return [duration_s * r for r in (0.00, 0.15, 0.35, 0.55, 0.78)]
return [duration_s * r for r in (0.00, 0.12, 0.30, 0.50, 0.70, 0.88)]
def _prepare_beat_templates(beat: TrailerBeat, cfg: AppConfig) -> list[tuple[float, np.ndarray]]:
    """Build (offset, template) pairs at the ``_beat_offsets`` positions of a beat.

    Offsets cover the matchable part of the beat; unreadable or non-scoreable
    reference frames are skipped.
    """
    span_s = estimate_matchable_reference_duration(beat, cfg)
    pairs: list[tuple[float, np.ndarray]] = []
    for offset_s in _beat_offsets(span_s):
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            pairs.append((offset_s, _prepare_template(frame, cfg)))
    return pairs
def _prepare_beat_templates_stepped(
beat: TrailerBeat,
cfg: AppConfig,
step_s: float = 0.12,
) -> list[tuple[float, np.ndarray]]:
templates: list[tuple[float, np.ndarray]] = []
matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=step_s)
t = 0.0
while t <= matchable_s:
frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
if frame is not None and _is_scoreable_reference_frame(frame, cfg):
templates.append((t, _prepare_template(frame, cfg)))
t = round(t + step_s, 6)
return templates
def _prepare_motion_templates(
beat: TrailerBeat,
cfg: AppConfig,
step_s: float = 0.12,
) -> list[tuple[float, float, np.ndarray, tuple[int, ...]]]:
"""
Build reference frame-difference templates for motion-phase alignment.
Absolute image similarity can match the right shot at the wrong point in a
repeated movement. Frame-to-frame deltas make the refine pass care about the
phase and direction of motion as well.
"""
result: list[tuple[float, float, np.ndarray, tuple[int, ...]]] = []
max_offset = max(0.0, beat.duration_s - step_s)
t = 0.0
while t <= max_offset:
f0 = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
f1 = grab_frame_at_path(beat.trailer_path, beat.start_s + t + step_s)
if (
f0 is not None
and f1 is not None
and _is_scoreable_reference_frame(f0, cfg)
and _is_scoreable_reference_frame(f1, cfg)
):
feat0 = _prepare_template(f0, cfg)
feat1 = _prepare_template(f1, cfg)
result.append((t, step_s, cv2.absdiff(feat1, feat0), feat0.shape))
t = round(t + step_s, 6)
return result
def _is_dark_reference_frame(frame: np.ndarray, cfg: AppConfig) -> bool:
    """Report whether a frame is truly dark, i.e. has no usable structure.

    A cross-fade silhouette (low overall luma but visible contrast) does NOT
    count as dark here: it carries content (a hand, a knife, a face peeking
    through the fade) and should stay matchable.
    """
    mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg)
    # Real darkness needs low luma AND low contrast (no structure visible).
    return mean_luma < 28.0 and p90_luma < 58.0 and contrast < 30.0
def _reference_visibility_stats(frame: np.ndarray, cfg: AppConfig) -> tuple[float, float, float]:
    """Return (mean luma, 90th-percentile luma, p90 - p10 contrast) of a frame.

    Statistics are taken on the grayscale text-safe crop of the frame.
    """
    vibe = cfg.cv.vibe_check
    safe_area = text_safe_crop(frame, vibe.crop_top_fraction, vibe.crop_bottom_fraction)
    luma = cv2.cvtColor(safe_area, cv2.COLOR_BGR2GRAY)
    low = float(np.percentile(luma, 10))
    high = float(np.percentile(luma, 90))
    return float(np.mean(luma)), high, high - low
def _is_scoreable_reference_frame(frame: np.ndarray, cfg: AppConfig) -> bool:
    """Decide whether a reference frame can carry a usable match template.

    Dark frames (see ``_is_dark_reference_frame``) are rejected outright.
    Otherwise a frame is accepted through either of two paths:

    * Standard: a regular daylight / interior shot. Mean luma or p90 luma
      reaches its configured minimum AND contrast reaches the configured
      minimum.
    * Fade-content: low overall luma but strong local contrast, i.e. a
      cross-fade silhouette where structure is clearly visible (hand and
      knife against dark, a face emerging from black). Without this path the
      matcher would silently drop content-bearing fades and mis-match the
      visible portion alone.
    """
    if _is_dark_reference_frame(frame, cfg):
        return False

    mean_luma, p90_luma, contrast = _reference_visibility_stats(frame, cfg)
    limits = cfg.cv.deep_scan

    bright_enough = (
        mean_luma >= limits.scoreable_luma_mean_min
        or p90_luma >= limits.scoreable_luma_p90_min
    )
    standard_ok = bright_enough and contrast >= limits.scoreable_contrast_min

    # Fade-content needs contrast well above a uniformly dim frame plus at
    # least some bright pixels (p90 above pure black), so a featureless dark
    # wash is not accepted. These fixed limits are deliberately strict so
    # smooth fades do not pollute scoring.
    return standard_ok or (contrast >= 40.0 and p90_luma >= 30.0)
def estimate_matchable_reference_duration(
    beat: TrailerBeat,
    cfg: AppConfig,
    sample_step_s: float | None = None,
) -> float:
    """
    Estimate the part of a trailer beat that should be source-matchable.

    Trailer beats often include trailing black/title/credit frames that do not
    exist in the source movie. Those frames should not force the source match
    to cover the full beat duration.

    Args:
        beat: The trailer beat to inspect.
        cfg: Application configuration.
        sample_step_s: Sampling step in seconds; defaults to
            ``cfg.cv.deep_scan.span_sample_step_s``.

    Returns:
        The full beat duration when no frame can be read or no dark run
        follows visible content; otherwise the start of the first such dark
        run plus one step, clamped to [step, beat duration].

    Raises:
        ValueError: if the effective step is not positive. A zero or negative
            step would never advance the sampling loop.
    """
    step_s = sample_step_s if sample_step_s is not None else cfg.cv.deep_scan.span_sample_step_s
    if not step_s > 0:
        raise ValueError(f"sample step must be positive, got {step_s!r}")

    samples: list[tuple[float, bool]] = []
    t = 0.0
    while t <= beat.duration_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        if frame is not None:
            samples.append((t, _is_dark_reference_frame(frame, cfg)))
        t = round(t + step_s, 6)
    if not samples:
        return beat.duration_s

    dark_run_start: float | None = None
    saw_visible = False
    min_dark_break_s = max(0.24, step_s * 2.0)
    for offset_s, is_dark in samples:
        if not is_dark:
            saw_visible = True
            dark_run_start = None
            continue
        # Leading darkness (before any visible frame) is ignored.
        if saw_visible:
            if dark_run_start is None:
                dark_run_start = offset_s
            if offset_s - dark_run_start >= min_dark_break_s:
                break
    if dark_run_start is None:
        return beat.duration_s

    # Keep a small buffer before the first sustained dark/title break so the
    # source clip does not visibly end before the trailer begins its fade/card.
    # Long beats can contain later credit/title islands; those should not force
    # one source clip to validate unrelated images.
    return max(step_s, min(beat.duration_s, dark_run_start + step_s))
def _sequence_score(
cap: cv2.VideoCapture,
in_point_s: float,
templates: list[tuple[float, np.ndarray]],
cfg: AppConfig,
) -> float:
weighted_scores: list[float] = []
raw_scores: list[float] = []
for offset_s, template in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
return -1.0
floating_score = _match_score(frame, template, cfg)
fixed_score = _fixed_position_score(frame, template, cfg)
score = (floating_score * 0.55) + (fixed_score * 0.45)
# The first frames matter most for perceived sync. Weight them higher
# so a match that begins a few frames early loses to a better aligned hit.
weight = 1.35 if offset_s <= 0.16 else 1.0
weighted_scores.append(score * weight)
raw_scores.append(score)
if not raw_scores:
return -1.0
# Reward consistently good temporal alignment. A single strong frame is not
# enough if the other beat frames drift away.
weighted_avg = sum(weighted_scores) / (len(raw_scores) + 0.35 * sum(1 for o, _ in templates if o <= 0.16))
return float(weighted_avg * 0.70 + min(raw_scores) * 0.30)
def _content_alignment_templates(
    beat: TrailerBeat,
    cfg: AppConfig,
) -> list[tuple[float, np.ndarray]]:
    """Build densely spaced (offset, template) pairs for local in-point alignment.

    Offsets start at 0 and advance by the larger of one EDL frame and the
    configured content-align step, stopping one step before the end of the
    matchable span; that final position is added explicitly when the regular
    grid misses it. Falls back to ``_prepare_beat_templates`` when no sampled
    frame is scoreable.
    """
    deep = cfg.cv.deep_scan
    matchable_s = estimate_matchable_reference_duration(
        beat,
        cfg,
        sample_step_s=deep.content_align_sample_step_s,
    )
    step_s = max(1.0 / cfg.export.edl_frame_rate, deep.content_align_sample_step_s)
    last_offset_s = max(0.0, min(beat.duration_s, matchable_s) - step_s)

    offsets = [0.0]
    cursor = step_s
    while cursor <= last_offset_s:
        offsets.append(round(cursor, 6))
        cursor = round(cursor + step_s, 6)
    if matchable_s > step_s and offsets[-1] < last_offset_s:
        offsets.append(round(last_offset_s, 6))

    templates: list[tuple[float, np.ndarray]] = []
    for offset_s in offsets:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + offset_s)
        if frame is None or not _is_scoreable_reference_frame(frame, cfg):
            continue
        templates.append((offset_s, _prepare_template(frame, cfg)))

    return templates or _prepare_beat_templates(beat, cfg)
def _content_alignment_score(
cap: cv2.VideoCapture,
in_point_s: float,
templates: list[tuple[float, np.ndarray]],
cfg: AppConfig,
) -> float:
if not templates:
return -1.0
weighted_total = 0.0
weight_total = 0.0
raw_scores: list[float] = []
early_scores: list[float] = []
for offset_s, template in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
return -1.0
# For offset detection the fixed frame position is intentionally more
# important than free template placement. Free placement can make the
# right shot look acceptable even when the movement is a few frames off.
fixed_score = _fixed_position_score(frame, template, cfg)
floating_score = _match_score(frame, template, cfg)
score = fixed_score * 0.72 + floating_score * 0.28
weight = 1.45 if offset_s <= 0.20 else 1.0
weighted_total += score * weight
weight_total += weight
raw_scores.append(score)
if offset_s <= 0.36:
early_scores.append(score)
avg_score = weighted_total / weight_total if weight_total > 0 else -1.0
min_score = min(raw_scores) if raw_scores else -1.0
early_score = sum(early_scores) / len(early_scores) if early_scores else avg_score
return float(avg_score * 0.55 + min_score * 0.25 + early_score * 0.20)
def align_in_point_by_content(
    beat: TrailerBeat,
    estimated_in_point_s: float,
    cfg: AppConfig,
    search_window_s: float | None = None,
) -> tuple[float, float]:
    """
    Find the in-point directly from image content around a rough match.

    This is deliberately local: once a candidate shot is plausible, scanning a
    small window around it with many reference frames is faster and more
    robust than repeating a global scan or applying a fixed frame preroll.

    Returns:
        (best_in_point_s, best_score). The score is floored at 0.0. When no
        templates can be built, the estimate is returned with a score of 0.0.
    """
    templates = _content_alignment_templates(beat, cfg)
    if not templates:
        return estimated_in_point_s, 0.0

    chosen_t = estimated_in_point_s
    top_score = -1.0
    with open_video(cfg.paths.source_movie) as cap:
        source_fps = float(cap.get(cv2.CAP_PROP_FPS)) or cfg.export.edl_frame_rate
        frame_step_s = 1.0 / source_fps
        if search_window_s is None:
            window_s = cfg.cv.deep_scan.content_align_window_seconds
        else:
            window_s = search_window_s
        scan_from = max(0.0, estimated_in_point_s - window_s)
        scan_to = estimated_in_point_s + window_s
        tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta

        probe_t = scan_from
        while probe_t <= scan_to:
            score = _content_alignment_score(cap, probe_t, templates, cfg)
            if score > top_score + tie_delta:
                top_score, chosen_t = score, probe_t
            elif score >= top_score - tie_delta:
                # Near-ties go to whichever in-point is closer to the estimate;
                # the recorded best score is left as it was.
                if abs(probe_t - estimated_in_point_s) < abs(chosen_t - estimated_in_point_s):
                    chosen_t = probe_t
            probe_t = round(probe_t + frame_step_s, 6)
    return chosen_t, max(0.0, top_score)
def _motion_phase_score(
cap: cv2.VideoCapture,
in_point_s: float,
motion_templates: list[tuple[float, float, np.ndarray, tuple[int, ...]]],
cfg: AppConfig,
) -> float:
scores: list[float] = []
for offset_s, step_s, ref_delta, template_shape in motion_templates:
f0 = grab_frame_at(cap, in_point_s + offset_s)
f1 = grab_frame_at(cap, in_point_s + offset_s + step_s)
if f0 is None or f1 is None:
return -1.0
src0 = _fixed_feature(f0, template_shape, cfg)
src1 = _fixed_feature(f1, template_shape, cfg)
scores.append(_corr_same_size(cv2.absdiff(src1, src0), ref_delta))
if not scores:
return 0.0
return float((sum(scores) / len(scores)) * 0.65 + min(scores) * 0.35)
def align_in_point_by_motion(
    beat: TrailerBeat,
    estimated_in_point_s: float,
    cfg: AppConfig,
    search_window_s: float | None = None,
) -> tuple[float, float]:
    """
    Align a candidate by matching the frame-to-frame motion pattern.

    This catches the common failure mode where the right source scene is
    found, but the in-point is a few seconds too early or late inside a
    repeated conversation/action beat.

    Returns:
        (best_in_point_s, best_score). The score is floored at 0.0. With fewer
        than two motion templates the estimate is returned with a score of 0.0.
    """
    motion_templates = _prepare_motion_templates(beat, cfg)
    if len(motion_templates) < 2:
        return estimated_in_point_s, 0.0

    def _closer_to_estimate(candidate_t: float, incumbent_t: float) -> bool:
        return abs(candidate_t - estimated_in_point_s) < abs(incumbent_t - estimated_in_point_s)

    with open_video(cfg.paths.source_movie) as cap:
        source_fps = float(cap.get(cv2.CAP_PROP_FPS)) or cfg.export.edl_frame_rate
        frame_step_s = 1.0 / source_fps
        window_s = (
            cfg.cv.deep_scan.content_align_window_seconds
            if search_window_s is None
            else search_window_s
        )
        scan_from = max(0.0, estimated_in_point_s - window_s)
        scan_to = estimated_in_point_s + window_s
        tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta

        chosen_t = estimated_in_point_s
        top_score = -1.0
        probe_t = scan_from
        while probe_t <= scan_to:
            score = _motion_phase_score(cap, probe_t, motion_templates, cfg)
            if score > top_score + tie_delta:
                top_score = score
                chosen_t = probe_t
            elif score >= top_score - tie_delta and _closer_to_estimate(probe_t, chosen_t):
                # Near-tie: prefer the in-point closer to the estimate without
                # touching the recorded best score.
                chosen_t = probe_t
            probe_t = round(probe_t + frame_step_s, 6)
    return chosen_t, max(0.0, top_score)
def align_in_point_by_content_and_motion(
    beat: TrailerBeat,
    estimated_in_point_s: float,
    cfg: AppConfig,
    search_window_s: float | None = None,
) -> tuple[float, float, float, float]:
    """
    Align a candidate using still-frame content and motion phase together.

    Running content and motion as separate passes can overshoot short action
    phases: one pass may land on the right broad gesture and the next can
    slide to a visually similar but later posture. A joint score keeps the
    in-point tied to the same frame hypothesis throughout the local search.

    The joint score is content 0.64 + motion 0.36 minus an anchor penalty of
    0.05 per second of distance from the estimate, capped at 0.18. With fewer
    than two motion templates the content score stands in for motion.

    Returns:
        (best_in_point_s, joint_score, content_score, motion_score), each
        score floored at 0.0. When no content templates can be built the
        estimate is returned with all scores 0.0.
    """
    templates = _prepare_beat_templates(beat, cfg)
    motion_templates = _prepare_motion_templates(beat, cfg)
    if not templates:
        return estimated_in_point_s, 0.0, 0.0, 0.0

    use_motion = len(motion_templates) >= 2

    def _scan_times(first_s: float, last_s: float, step: float):
        # Yields the probe times of the original frame-by-frame scan.
        current = first_s
        while current <= last_s:
            yield current
            current = round(current + step, 6)

    with open_video(cfg.paths.source_movie) as cap:
        source_fps = float(cap.get(cv2.CAP_PROP_FPS)) or cfg.export.edl_frame_rate
        frame_step_s = 1.0 / source_fps
        if search_window_s is None:
            window_s = cfg.cv.deep_scan.content_align_window_seconds
        else:
            window_s = search_window_s
        scan_from = max(0.0, estimated_in_point_s - window_s)
        scan_to = estimated_in_point_s + window_s
        tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta

        chosen_t = estimated_in_point_s
        top_score = -1.0
        top_content = -1.0
        top_motion = -1.0
        for probe_t in _scan_times(scan_from, scan_to, frame_step_s):
            content = _content_alignment_score(cap, probe_t, templates, cfg)
            if use_motion:
                motion = _motion_phase_score(cap, probe_t, motion_templates, cfg)
            else:
                motion = content
            if content < 0 or motion < 0:
                # A negative score marks an unreadable frame; skip this probe.
                continue

            distance = abs(probe_t - estimated_in_point_s)
            joint = (content * 0.64 + motion * 0.36) - min(0.18, distance * 0.05)

            if joint > top_score + tie_delta:
                top_score = joint
                chosen_t, top_content, top_motion = probe_t, content, motion
                continue
            if not joint >= top_score - tie_delta:
                continue

            # Near-tie: prefer the probe closer to the estimate; at (almost)
            # equal distance prefer the earlier one. The joint score on record
            # is left untouched.
            incumbent_distance = abs(chosen_t - estimated_in_point_s)
            same_distance = abs(distance - incumbent_distance) <= frame_step_s * 0.5
            if distance < incumbent_distance or (same_distance and probe_t < chosen_t):
                chosen_t, top_content, top_motion = probe_t, content, motion

    return chosen_t, max(0.0, top_score), max(0.0, top_content), max(0.0, top_motion)
def estimate_usable_source_duration(
    beat: TrailerBeat,
    in_point_s: float,
    cfg: AppConfig,
    sample_step_s: float | None = None,
    min_keep_s: float = 0.5,
) -> tuple[float, float]:
    """
    Measure how much of the source, starting at ``in_point_s``, still matches the beat.

    Reference templates are sampled across the beat at a fixed step and each
    one is scored against the source frame at the same offset. Scanning stops
    once three samples past ``min_keep_s`` in a row fall below a threshold
    derived from the early ("warm-up") scores. This is meant to catch a source
    that cuts or dissolves away while the trailer beat continues into a title
    card or a fade to black.

    Args:
        beat: Trailer beat whose reference frames are used as templates.
        in_point_s: Candidate source in-point, in seconds.
        cfg: Application configuration.
        sample_step_s: Sampling step in seconds; falls back to
            ``cfg.cv.deep_scan.span_sample_step_s`` when ``None``.
        min_keep_s: Samples earlier than this offset never count towards the
            bad run, and it is the floor used when the usable span would
            otherwise come out shorter than this.

    Returns:
        ``(usable_duration_s, average_good_score)``. When no templates can be
        prepared the full beat duration is returned with score 0.0; when no
        source frame can be read the result is ``(0.0, 0.0)``.
    """
    deep_scan = cfg.cv.deep_scan
    step_s = deep_scan.span_sample_step_s if sample_step_s is None else sample_step_s
    templates = _prepare_beat_templates_stepped(beat, cfg, step_s)
    if not templates:
        return beat.duration_s, 0.0

    fallback_fps = cfg.export.edl_frame_rate
    samples: list[tuple[float, float]] = []
    with open_video(cfg.paths.source_movie) as cap:
        source_fps = float(cap.get(cv2.CAP_PROP_FPS)) or fallback_fps
        for offset_s, template in templates:
            frame = grab_frame_at(cap, in_point_s + offset_s)
            if frame is None:
                # Ran off the end of the source (or a decode failure): stop sampling.
                break
            samples.append((offset_s, _match_score(frame, template, cfg)))
    if not samples:
        return 0.0, 0.0

    # The acceptance threshold is relative to the best score seen early in the
    # beat, with an absolute floor of 0.34.
    warmup_limit_s = min(1.0, beat.duration_s * 0.35)
    warmup = [value for offset, value in samples if offset <= warmup_limit_s]
    if warmup:
        baseline = max(warmup)
    else:
        baseline = max(value for _, value in samples)
    min_score = max(0.34, baseline * 0.48)

    last_good = 0.0
    consecutive_bad = 0
    good_scores: list[float] = []
    for offset_s, value in samples:
        if value >= min_score:
            last_good = offset_s
            consecutive_bad = 0
            good_scores.append(value)
        elif offset_s >= min_keep_s:
            consecutive_bad += 1
            if consecutive_bad >= 3:
                break

    tail_safety_s = max(0.0, deep_scan.trim_tail_frames / source_fps)
    usable = min(beat.duration_s, max(0.0, last_good - tail_safety_s))
    if usable < min_keep_s:
        # Too short to be useful: keep at least min_keep_s (capped by the beat).
        first_offset_s = samples[0][0]
        usable = min(
            beat.duration_s,
            max(min_keep_s, first_offset_s + step_s - tail_safety_s),
        )

    if good_scores:
        avg_good = float(sum(good_scores) / len(good_scores))
    else:
        avg_good = 0.0
    return usable, avg_good
def refine_timestamp(template: np.ndarray, t_sec: float, cfg: AppConfig) -> float:
    """
    Search +/-0.5 s around ``t_sec`` frame by frame for the best template match.

    A later frame replaces the current best only when its score beats the best
    score by more than ``cfg.cv.deep_scan.start_tie_break_score_delta``; a
    frame within that delta is taken only if it is earlier than the current
    best timestamp.

    Args:
        template: Prepared reference template passed on to ``_match_score``.
        t_sec: Rough source timestamp in seconds to refine.
        cfg: Application configuration (source path, tie-break delta, fallback
            frame rate).

    Returns:
        The refined timestamp in seconds, or ``t_sec`` unchanged when no frame
        in the window could be read.
    """
    best_score = -1.0
    best_t = t_sec
    tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
    with open_video(cfg.paths.source_movie) as cap:
        # CAP_PROP_FPS can come back as 0 for containers without frame-rate
        # metadata; fall back to the configured rate (as the other scanners in
        # this module do) instead of dividing by zero.
        fps = float(cap.get(cv2.CAP_PROP_FPS)) or cfg.export.edl_frame_rate
        step = 1.0 / fps
        start_t = max(0.0, t_sec - 0.5)
        end_t = t_sec + 0.5
        t = start_t
        while t <= end_t:
            frame = grab_frame_at(cap, t)
            if frame is not None:
                max_val = _match_score(frame, template, cfg)
                if max_val > best_score + tie_delta:
                    best_score = max_val
                    best_t = t
                elif max_val >= best_score - tie_delta and t < best_t:
                    best_t = t
            t += step
    return best_t
def refine_in_point_with_sequence(
    beat: TrailerBeat,
    estimated_in_point_s: float,
    cfg: AppConfig,
    search_window_s: float | None = None,
) -> tuple[float, float]:
    """
    Refine a rough source in-point using several frames sampled across the beat.

    This is a thin entry point that forwards all arguments unchanged to
    ``align_in_point_by_content``.

    Returns:
        (best_in_point_s, sequence_score)
    """
    aligned = align_in_point_by_content(
        beat,
        estimated_in_point_s,
        cfg,
        search_window_s,
    )
    return aligned
def _find_scene_for_time(scenes: Sequence | None, t_sec: float, cfg: AppConfig):
if not scenes:
return None
for idx, scene in enumerate(scenes):
if scene.start_s <= t_sec < scene.end_s:
if (
scene.end_s - t_sec <= cfg.cv.deep_scan.scene_boundary_epsilon_s
and idx + 1 < len(scenes)
):
return scenes[idx + 1]
return scene
return None
def _source_fps_from_scene(scene) -> float:
duration_s = max(0.0, scene.end_s - scene.start_s)
frame_count = max(0, scene.end_frame - scene.start_frame)
return frame_count / duration_s if duration_s > 0 and frame_count > 0 else 0.0
def _apply_start_preroll(in_point_s: float, source_fps: float, cfg: AppConfig) -> float:
if cfg.cv.deep_scan.start_preroll_frames <= 0:
return in_point_s
fps = source_fps or cfg.export.edl_frame_rate
return max(0.0, in_point_s - (cfg.cv.deep_scan.start_preroll_frames / fps))
def _clamp_to_scene_start(in_point_s: float, scene) -> float:
if scene is None:
return in_point_s
return max(float(scene.start_s), in_point_s)
def _add_top_candidate(
candidates: list[tuple[float, float]],
score: float,
t_sec: float,
max_candidates: int,
min_distance_s: float,
) -> list[tuple[float, float]]:
"""
Keep diverse coarse candidates as (score, midpoint_time).
A single best midpoint frame is too brittle: repeated actors, similar color
palettes, cars, forests, and title-card darkness can all create plausible
false positives. Keeping a ranked pool lets the multi-frame sequence pass
choose the temporally consistent match.
"""
for idx, (old_score, old_t) in enumerate(candidates):
if abs(old_t - t_sec) < min_distance_s:
if score > old_score:
candidates[idx] = (score, t_sec)
return sorted(candidates, key=lambda item: item[0], reverse=True)[:max_candidates]
candidates.append((score, t_sec))
return sorted(candidates, key=lambda item: item[0], reverse=True)[:max_candidates]
def run_global_scan(
    beats: Sequence[TrailerBeat],
    cfg: AppConfig,
    scenes: Sequence | None = None,
    seed_in_points: dict[int, Sequence[SeedPoint]] | None = None,
) -> list[MatchResult]:
    """
    Locate each trailer beat in the source movie and return the accepted matches.

    Stages, in order:

    1. Build a midpoint template and a list of ``(offset, template)`` pairs per
       beat from the reference trailer. Beats without usable templates are
       skipped.
    2. Register caller-supplied seed in-points as candidates. A seed given as a
       tuple is read as ``(time, score)`` and marks the run as having weighted
       seeds; a bare number gets the coarse candidate threshold as its score.
    3. Unless weighted seeds allow skipping it, stream the source movie through
       FFmpeg at 2 fps and collect the best coarse template-match candidates.
    4. For every beat: rerank candidates by content, refine the in-point,
       estimate the usable duration, validate content and motion, blend the
       scores, and keep the best candidate. A too-short candidate is kept only
       as a provisional fallback.
    5. If the seed-only pass produced no result and
       ``cfg.vision.fullscan_fallback`` is set, rerun with the coarse scan
       forced on.

    Args:
        beats: Trailer beats to match.
        cfg: Application configuration.
        scenes: Optional source scene list used to clamp in/out points.
        seed_in_points: Optional mapping of ``beat_id`` to seed in-points.

    Returns:
        One ``MatchResult`` per beat that passed the provisional threshold,
        with ``is_confirmed`` set when the score also reaches the match
        threshold. Beats without an acceptable match are omitted.
    """
    logger.info('[Global Scan] Preparing templates for %d beats...', len(beats))
    templates = []
    midpoint_templates = []
    beat_valid = []
    for b in beats:
        bf = grab_frame_at_path(cfg.paths.reference_trailer, b.start_s + (b.end_s - b.start_s)/2)
        if bf is None:
            midpoint_templates.append(None)
            templates.append([])
            beat_valid.append(False)
            continue
        midpoint_templates.append(_prepare_template(bf, cfg))
        beat_templates = _prepare_beat_templates(b, cfg)
        templates.append(beat_templates)
        beat_valid.append(bool(beat_templates))

    top_candidates: list[list[tuple[float, float]]] = [[] for _ in beats]
    seed_candidates: list[list[tuple[float, float]]] = [[] for _ in beats]
    has_weighted_seeds = False
    for idx, beat in enumerate(beats):
        for seed in (seed_in_points or {}).get(beat.beat_id, ()):
            if isinstance(seed, tuple):
                # Weighted seed: (time, score), score clamped to [threshold, 0.99].
                seed_t = float(seed[0])
                seed_score = max(
                    cfg.cv.deep_scan.coarse_candidate_threshold,
                    min(0.99, float(seed[1])),
                )
                has_weighted_seeds = True
            else:
                seed_t = float(seed)
                seed_score = cfg.cv.deep_scan.coarse_candidate_threshold
            seed_candidate = (
                seed_score,
                max(0.0, seed_t),
            )
            seed_candidates[idx].append(seed_candidate)
            top_candidates[idx] = _add_top_candidate(
                top_candidates[idx],
                seed_candidate[0],
                seed_candidate[1],
                max_candidates=cfg.cv.deep_scan.sequence_candidate_count,
                min_distance_s=cfg.cv.deep_scan.sequence_min_distance_s,
            )
        if (seed_in_points or {}).get(beat.beat_id):
            logger.info(
                'Beat %d: added %d seeded in-point candidates.',
                beat.beat_id,
                len((seed_in_points or {}).get(beat.beat_id, ())),
            )

    # The full coarse scan is skipped only when every valid beat already has
    # at least one candidate and weighted seeds are present.
    skip_coarse_scan = (
        cfg.vision.enabled
        and cfg.cv.deep_scan.skip_coarse_scan_with_weighted_seeds
        and has_weighted_seeds
        and all(top_candidates[i] for i, valid in enumerate(beat_valid) if valid)
    )
    if skip_coarse_scan:
        logger.info('[Global Scan] Weighted vision seeds present; skipping full FFmpeg coarse scan.')
    else:
        fps = 2.0
        cmd = [
            'ffmpeg', '-i', str(cfg.paths.source_movie),
            '-vf', f'scale={cfg.video.proxy_width}:{cfg.video.proxy_height},fps={fps}',
            '-f', 'image2pipe', '-vcodec', 'rawvideo', '-pix_fmt', 'bgr24', '-'
        ]
        logger.info('[Global Scan] Streaming %s via FFmpeg (%.1f fps) ...', cfg.paths.source_movie.name, fps)
        p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.DEVNULL)
        frame_size = cfg.video.proxy_width * cfg.video.proxy_height * 3
        frame_idx = 0
        start_t = time.time()
        try:
            while True:
                raw = p.stdout.read(frame_size)
                if len(raw) != frame_size:
                    # Short read: end of stream (or FFmpeg exited).
                    break
                frame = np.frombuffer(raw, dtype=np.uint8).reshape((cfg.video.proxy_height, cfg.video.proxy_width, 3))
                haystack = _prepare_haystack(frame, cfg)
                source_t = frame_idx / fps
                for i, beat_templates in enumerate(templates):
                    if not beat_valid[i]:
                        continue
                    for beat_offset_s, template in beat_templates:
                        res = cv2.matchTemplate(haystack, template, cv2.TM_CCOEFF_NORMED)
                        _, max_val, _, _ = cv2.minMaxLoc(res)
                        # Convert the hit at this template offset back to a beat in-point.
                        candidate_in_s = source_t - beat_offset_s
                        if candidate_in_s < 0.0:
                            continue
                        top_candidates[i] = _add_top_candidate(
                            top_candidates[i],
                            float(max_val),
                            candidate_in_s,
                            max_candidates=cfg.cv.deep_scan.sequence_candidate_count,
                            min_distance_s=cfg.cv.deep_scan.sequence_min_distance_s,
                        )
                frame_idx += 1
                if frame_idx % 1000 == 0:
                    logger.info('[Global Scan] Processed %d frames (%.1fs movie time)...', frame_idx, frame_idx / fps)
        except BaseException:
            # Do not leave FFmpeg decoding in the background when matching fails.
            p.kill()
            raise
        finally:
            # Always release the pipe and reap the child, on success or failure.
            p.stdout.close()
            p.wait()
        logger.info('[Global Scan] Finished streaming %d frames in %.1fs.', frame_idx, time.time() - start_t)

    results = []
    source_info = get_video_info(cfg.paths.source_movie)
    source_fps = float(source_info['fps']) or 24.0
    for i, b in enumerate(beats):
        if not beat_valid[i]:
            continue
        candidates = top_candidates[i]
        if not candidates:
            continue
        score = float(candidates[0][0])
        if score >= cfg.cv.deep_scan.coarse_candidate_threshold:
            matchable_duration_s = estimate_matchable_reference_duration(b, cfg)
            logger.info(
                'Beat %d: refining %d temporal candidates (best offset score %.3f, matchable %.2fs / beat %.2fs).',
                b.beat_id,
                len(candidates),
                score,
                matchable_duration_s,
                b.duration_s,
            )
            best_result: MatchResult | None = None
            best_short_result: MatchResult | None = None
            best_short_coverage = -1.0
            best_duration_coverage = -1.0
            best_content_score = -1.0
            rejected_short_candidates = 0
            rejected_content_candidates = 0
            scan_cfg = cfg.cv.deep_scan
            # In seed-only mode the content gate is the smaller of the two thresholds.
            content_gate = (
                min(scan_cfg.provisional_content_threshold, cfg.vision.content_threshold)
                if skip_coarse_scan and has_weighted_seeds
                else scan_cfg.provisional_content_threshold
            )
            # Slicing copies the list, so pool edits do not touch top_candidates.
            candidate_pool = candidates[:scan_cfg.content_rerank_candidate_count]
            for seed_candidate in seed_candidates[i]:
                candidate_pool = _add_top_candidate(
                    candidate_pool,
                    seed_candidate[0],
                    seed_candidate[1],
                    max_candidates=scan_cfg.content_rerank_candidate_count + len(seed_candidates[i]),
                    min_distance_s=scan_cfg.sequence_min_distance_s,
                )
            if skip_coarse_scan and has_weighted_seeds:
                dense_candidates = _dense_weighted_seed_candidates(
                    b,
                    seed_candidates[i],
                    cfg,
                    scenes,
                    matchable_duration_s,
                )
                for dense_candidate in dense_candidates:
                    candidate_pool = _add_top_candidate(
                        candidate_pool,
                        dense_candidate[0],
                        dense_candidate[1],
                        max_candidates=(
                            scan_cfg.content_rerank_candidate_count
                            + len(seed_candidates[i])
                            + len(dense_candidates)
                        ),
                        min_distance_s=max(0.04, cfg.vision.local_scan_step_s * 0.5),
                    )
            reranked_candidates = _rerank_candidates_by_content(
                b,
                candidate_pool,
                cfg,
                scenes=scenes,
                matchable_duration_s=matchable_duration_s,
            )
            refine_limit = (
                min(scan_cfg.max_refine_candidates, cfg.vision.max_refine_candidates)
                if skip_coarse_scan and has_weighted_seeds
                else scan_cfg.max_refine_candidates
            )
            refine_candidates = [
                (coarse_score, in_point_s)
                for _, coarse_score, in_point_s in reranked_candidates[:refine_limit]
            ]
            validation_templates = _prepare_validation_templates(b, cfg)
            motion_templates = _prepare_motion_templates(b, cfg)
            logger.info(
                'Beat %d: content-reranked top %d / %d candidates.',
                b.beat_id,
                len(refine_candidates),
                len(candidate_pool),
            )
            for coarse_score, coarse_in_s in refine_candidates:
                rough_in_s = coarse_in_s
                # In seed-only mode, a coarse score clearly above the threshold
                # marks the candidate as coming from a weighted seed.
                is_weighted_seed_candidate = (
                    skip_coarse_scan
                    and has_weighted_seeds
                    and coarse_score > scan_cfg.coarse_candidate_threshold + 0.05
                )
                if midpoint_templates[i] is not None and not is_weighted_seed_candidate:
                    midpoint_t = coarse_in_s + (b.duration_s / 2)
                    fine_t = refine_timestamp(midpoint_templates[i], midpoint_t, cfg)
                    rough_in_s = max(0.0, fine_t - (b.duration_s / 2))
                local_align_window_s = (
                    min(cfg.vision.local_scan_step_s, cfg.cv.deep_scan.content_align_window_seconds)
                    if is_weighted_seed_candidate
                    else None
                )
                refined_in_s, sequence_score = refine_in_point_with_sequence(
                    b,
                    rough_in_s,
                    cfg,
                    search_window_s=local_align_window_s,
                )
                scene = _find_scene_for_time(scenes, refined_in_s, cfg)
                scene_fps = _source_fps_from_scene(scene) if scene is not None else source_fps
                adjusted_in_s = _apply_start_preroll(refined_in_s, scene_fps, cfg)
                adjusted_in_s = _clamp_to_scene_start(adjusted_in_s, scene)
                # Look the scene up again for the adjusted in-point.
                scene = _find_scene_for_time(scenes, adjusted_in_s, cfg)
                usable_duration_s, span_score = estimate_usable_source_duration(b, adjusted_in_s, cfg)
                out_s = adjusted_in_s + usable_duration_s
                if scene is not None:
                    out_s = min(out_s, scene.end_s)
                duration_s = max(0.0, out_s - adjusted_in_s)
                duration_coverage = min(1.0, duration_s / matchable_duration_s) if matchable_duration_s > 0 else 0.0
                with open_video(cfg.paths.source_movie) as validation_cap:
                    original_content_score = _fixed_content_sequence_score(
                        validation_cap,
                        adjusted_in_s,
                        validation_templates,
                        cfg,
                    )
                content_score = original_content_score
                content_in_s, align_content_score = align_in_point_by_content(
                    b,
                    adjusted_in_s,
                    cfg,
                    search_window_s=(
                        local_align_window_s
                        if local_align_window_s is not None
                        else min(0.8, cfg.cv.deep_scan.content_align_window_seconds)
                    ),
                )
                if abs(content_in_s - adjusted_in_s) <= cfg.cv.deep_scan.content_align_window_seconds:
                    with open_video(cfg.paths.source_movie) as validation_cap:
                        aligned_content_score = _fixed_content_sequence_score(
                            validation_cap,
                            content_in_s,
                            validation_templates,
                            cfg,
                        )
                    # Adopt the content-aligned in-point only when validation
                    # improves by at least 0.01.
                    if aligned_content_score >= original_content_score + 0.01:
                        adjusted_in_s = content_in_s
                        content_score = min(align_content_score, aligned_content_score)
                        scene = _find_scene_for_time(scenes, adjusted_in_s, cfg)
                        usable_duration_s = max(0.0, duration_s)
                        out_s = adjusted_in_s + usable_duration_s
                        if scene is not None:
                            out_s = min(out_s, scene.end_s)
                        duration_s = max(0.0, out_s - adjusted_in_s)
                        duration_coverage = (
                            min(1.0, duration_s / matchable_duration_s)
                            if matchable_duration_s > 0 else 0.0
                        )
                motion_score = 0.0
                if len(motion_templates) >= 2:
                    with open_video(cfg.paths.source_movie) as motion_cap:
                        motion_score = _motion_phase_score(
                            motion_cap,
                            adjusted_in_s,
                            motion_templates,
                            cfg,
                        )
                if is_weighted_seed_candidate and scene is not None and content_score >= content_gate:
                    contiguous_usable_s = _contiguous_scene_coverage_duration(
                        b,
                        adjusted_in_s,
                        scenes,
                        matchable_duration_s,
                        cfg,
                    )
                    scene_duration_s = min(b.duration_s, contiguous_usable_s)
                    if scene_duration_s > duration_s:
                        # Contiguous scene coverage allows a longer span than
                        # the sampled estimate; extend and rescore coverage.
                        usable_duration_s = scene_duration_s
                        out_s = adjusted_in_s + usable_duration_s
                        duration_s = usable_duration_s
                        duration_coverage = (
                            min(1.0, duration_s / matchable_duration_s)
                            if matchable_duration_s > 0 else 0.0
                        )
                        span_score = max(span_score, content_score)
                # Weighted blend of the per-stage scores, then mixed with the
                # content validation score and (when available) the motion score.
                final_score = (
                    sequence_score * scan_cfg.sequence_score_weight
                    + span_score * scan_cfg.span_score_weight
                    + coarse_score * scan_cfg.coarse_score_weight
                    + duration_coverage * scan_cfg.duration_score_weight
                )
                final_score = (
                    final_score * (1.0 - scan_cfg.content_validation_weight)
                    + content_score * scan_cfg.content_validation_weight
                )
                if len(motion_templates) >= 2:
                    motion_score_clamped = max(0.0, min(1.0, motion_score))
                    final_score = final_score * 0.82 + motion_score_clamped * 0.18
                if is_weighted_seed_candidate:
                    vision_provisional_score = (
                        content_score * 0.45
                        + duration_coverage * 0.33
                        + coarse_score * 0.12
                        + max(0.0, min(1.0, motion_score)) * 0.10
                    )
                    final_score = max(final_score, vision_provisional_score)
                if content_score < scan_cfg.match_threshold and not is_weighted_seed_candidate:
                    # Weak content validation caps the final score.
                    final_score = min(final_score, content_score)
                if content_score < content_gate:
                    logger.debug(
                        'Beat %d rejected by content validation in=%.3fs scene=%s content=%.3f min=%.3f',
                        b.beat_id,
                        adjusted_in_s,
                        scene.scene_id if scene is not None else 'none',
                        content_score,
                        content_gate,
                    )
                    rejected_content_candidates += 1
                    continue
                candidate_result = MatchResult(
                    beat_id=b.beat_id,
                    scene_id=scene.scene_id if scene is not None else 0,
                    source_path=cfg.paths.source_movie,
                    in_point_s=max(0.0, adjusted_in_s),
                    out_point_s=out_s,
                    in_point_frame=int(max(0.0, adjusted_in_s) * source_fps),
                    match_score=final_score,
                )
                if duration_coverage < scan_cfg.min_duration_coverage:
                    rejected_short_candidates += 1
                    logger.debug(
                        'Beat %d short candidate in=%.3fs scene=%s sequence=%.3f span=%.3f coarse=%.3f content=%.3f motion=%.3f coverage=%.2f final=%.3f',
                        b.beat_id,
                        adjusted_in_s,
                        scene.scene_id if scene is not None else 'none',
                        sequence_score,
                        span_score,
                        coarse_score,
                        content_score,
                        motion_score,
                        duration_coverage,
                        final_score,
                    )
                    # Too short for a full match, but possibly worth keeping as
                    # a provisional fallback for this beat.
                    long_enough_for_review = duration_s >= max(0.5, matchable_duration_s * 0.45)
                    visually_plausible = (
                        sequence_score >= scan_cfg.provisional_match_threshold
                        or final_score >= scan_cfg.provisional_match_threshold
                    )
                    if long_enough_for_review and visually_plausible:
                        if (
                            best_short_result is None
                            or candidate_result.match_score
                            > best_short_result.match_score + scan_cfg.duration_tie_break_score_delta
                            or (
                                candidate_result.match_score
                                >= best_short_result.match_score - scan_cfg.duration_tie_break_score_delta
                                and duration_coverage > best_short_coverage
                            )
                        ):
                            best_short_result = candidate_result
                            best_short_coverage = duration_coverage
                    continue
                logger.debug(
                    'Beat %d candidate in=%.3fs scene=%s sequence=%.3f span=%.3f coarse=%.3f content=%.3f motion=%.3f coverage=%.2f final=%.3f',
                    b.beat_id,
                    adjusted_in_s,
                    scene.scene_id if scene is not None else 'none',
                    sequence_score,
                    span_score,
                    coarse_score,
                    content_score,
                    motion_score,
                    duration_coverage,
                    final_score,
                )
                clearly_better_score = (
                    best_result is None
                    or candidate_result.match_score
                    > best_result.match_score + scan_cfg.duration_tie_break_score_delta
                )
                similar_score_better_duration = (
                    best_result is not None
                    and candidate_result.match_score
                    >= best_result.match_score - scan_cfg.duration_tie_break_score_delta
                    and duration_coverage > best_duration_coverage + 0.03
                )
                similar_vision_score_earlier_phase = (
                    is_weighted_seed_candidate
                    and best_result is not None
                    and candidate_result.scene_id == best_result.scene_id
                    and candidate_result.match_score
                    >= best_result.match_score - cfg.vision.local_scan_tie_break_score_delta
                    and content_score >= best_content_score - 0.005
                    and duration_coverage >= best_duration_coverage - 0.03
                    and candidate_result.in_point_s < best_result.in_point_s
                )
                similar_vision_score_better_phase = (
                    is_weighted_seed_candidate
                    and best_result is not None
                    and candidate_result.scene_id == best_result.scene_id
                    and candidate_result.match_score
                    >= best_result.match_score - cfg.vision.local_scan_tie_break_score_delta
                    and content_score > best_content_score + 0.008
                    and duration_coverage >= best_duration_coverage - 0.03
                )
                if (
                    clearly_better_score
                    or similar_score_better_duration
                    or similar_vision_score_earlier_phase
                    or similar_vision_score_better_phase
                ):
                    best_result = candidate_result
                    best_duration_coverage = duration_coverage
                    best_content_score = content_score
            if best_result is None:
                if best_short_result is not None:
                    logger.warning(
                        'Beat %d: using short provisional automatic match scene=%d in=%.3fs dur=%.3fs coverage=%.2f score=%.3f',
                        b.beat_id,
                        best_short_result.scene_id,
                        best_short_result.in_point_s,
                        best_short_result.duration_s,
                        best_short_coverage,
                        best_short_result.match_score,
                    )
                    best_result = best_short_result
                    best_duration_coverage = best_short_coverage
                else:
                    if rejected_content_candidates > 0 and rejected_short_candidates == 0:
                        logger.warning(
                            'Beat %d: NO MATCH after refinement (%d candidates rejected by content validation)',
                            b.beat_id,
                            rejected_content_candidates,
                        )
                    else:
                        logger.warning(
                            'Beat %d: NO MATCH after refinement (%d candidates rejected below %.0f%% duration coverage, %d by content validation)',
                            b.beat_id,
                            rejected_short_candidates,
                            scan_cfg.min_duration_coverage * 100.0,
                            rejected_content_candidates,
                        )
                    continue
            is_confirmed = best_result.match_score >= cfg.cv.deep_scan.match_threshold
            if best_result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
                logger.warning(
                    'Beat %d: NO MATCH after refinement (best final score %.3f, provisional threshold %.3f)',
                    b.beat_id,
                    best_result.match_score,
                    cfg.cv.deep_scan.provisional_match_threshold,
                )
                continue
            if not is_confirmed:
                logger.warning(
                    'Beat %d: provisional automatic match scene=%d in=%.3fs score=%.3f (confirmed threshold %.3f)',
                    b.beat_id,
                    best_result.scene_id,
                    best_result.in_point_s,
                    best_result.match_score,
                    cfg.cv.deep_scan.match_threshold,
                )
            logger.info(
                'Beat %d: best automatic match scene=%d in=%.3fs dur=%.3fs coverage=%.2f score=%.3f',
                b.beat_id,
                best_result.scene_id,
                best_result.in_point_s,
                best_result.duration_s,
                best_duration_coverage,
                best_result.match_score,
            )
            results.append(MatchResult(
                beat_id=b.beat_id,
                scene_id=best_result.scene_id,
                source_path=cfg.paths.source_movie,
                in_point_s=best_result.in_point_s,
                out_point_s=best_result.out_point_s,
                in_point_frame=best_result.in_point_frame,
                match_score=best_result.match_score,
                is_confirmed=is_confirmed,
            ))
        else:
            logger.warning(
                'Beat %d: NO MATCH (best coarse score %.3f, coarse threshold %.3f)',
                b.beat_id,
                score,
                cfg.cv.deep_scan.coarse_candidate_threshold,
            )

    if skip_coarse_scan and not results and cfg.vision.fullscan_fallback:
        logger.warning(
            '[Global Scan] Weighted vision-seed pass found no valid matches; retrying with full FFmpeg coarse scan.'
        )
        # Rerun once with the skip flag cleared so the coarse scan is forced.
        retry_cfg = replace(
            cfg,
            cv=replace(
                cfg.cv,
                deep_scan=replace(cfg.cv.deep_scan, skip_coarse_scan_with_weighted_seeds=False),
            ),
        )
        return run_global_scan(beats, retry_cfg, scenes=scenes, seed_in_points=seed_in_points)
    return results