Add hi-res phase refinement for intra-scene phase matching (Beat 03 investigation)

This commit is contained in:
Melbar
2026-05-08 10:52:11 +02:00
parent 18c8c89ee6
commit f20f89b06b
+355 -16
View File
@@ -198,6 +198,158 @@ def _fixed_content_features(frame: np.ndarray, cfg: AppConfig) -> tuple[np.ndarr
)
def _hires_phase_feature(frame: np.ndarray) -> np.ndarray:
    """Build the equalised luma image used for hi-res phase matching.

    The standard pipeline features (160×80) lose the subtle pixel
    differences between phases of a talking-head shot (mouth open vs.
    closed).  This feature keeps a 320×160 image so that those differences
    survive.

    Processing order: `_trim_dark_borders`, drop the top and bottom 5% of
    rows, BGR -> grayscale, histogram equalisation, area-interpolated resize.

    Returns:
        A single-channel image resized to width 320, height 160.
    """
    borderless = _trim_dark_borders(frame)
    height, _ = borderless.shape[:2]
    top = int(height * 0.05)
    bottom = int(height * 0.95)
    luma = cv2.cvtColor(borderless[top:bottom, :], cv2.COLOR_BGR2GRAY)
    equalised = cv2.equalizeHist(luma)
    # cv2.resize takes (width, height).
    return cv2.resize(equalised, (320, 160), interpolation=cv2.INTER_AREA)
def _hires_spatial_hist(frame_feature: np.ndarray) -> np.ndarray:
    """Concatenate per-cell intensity histograms over an 8×8 grid.

    The feature is split into 8 rows × 8 columns of equally sized cells
    (any remainder rows/columns past a multiple of 8 are ignored).  Each
    cell yields a 24-bin histogram over the range [0, 256), normalised by
    its own sum, and the cells are emitted in row-major order.

    Returns:
        A 1-D float32 vector of 8 * 8 * 24 values.
    """
    cells_per_axis = 8
    rows, cols = frame_feature.shape[:2]
    step_y = rows // cells_per_axis
    step_x = cols // cells_per_axis

    histograms: list[np.ndarray] = []
    for row_idx in range(cells_per_axis):
        y0 = row_idx * step_y
        for col_idx in range(cells_per_axis):
            x0 = col_idx * step_x
            block = frame_feature[y0:y0 + step_y, x0:x0 + step_x]
            counts = cv2.calcHist([block], [0], None, [24], [0, 256])
            counts = counts.astype(np.float32).flatten()
            # The epsilon keeps an empty cell from dividing by zero.
            total = float(np.sum(counts))
            histograms.append(counts / (total + 1e-6))
    return np.concatenate(histograms)
def _hires_phase_score(
    ref_feature: np.ndarray,
    ref_spatial: np.ndarray,
    src_frame: np.ndarray,
) -> float:
    """Score how well a source frame matches a hi-res reference template.

    The result is a weighted blend of three signals:

    * 25% – normalised cross-correlation over the whole feature image,
    * 45% – the same correlation restricted to the central window
      (±20% of height/width around the centre, where a talking head's
      face is expected),
    * 30% – `_hist_intersection` of the 8×8 spatial histograms.

    Args:
        ref_feature: Reference image from `_hires_phase_feature`.
        ref_spatial: Reference vector from `_hires_spatial_hist`.
        src_frame: Raw source frame; its hi-res feature is computed here.
    """
    candidate = _hires_phase_feature(src_frame)

    # Both images have the same size, so matchTemplate yields a 1×1 result.
    whole_frame_ncc = float(
        cv2.matchTemplate(candidate, ref_feature, cv2.TM_CCOEFF_NORMED)[0][0]
    )

    rows, cols = ref_feature.shape[:2]
    mid_y, mid_x = rows // 2, cols // 2
    half_h, half_w = int(rows * 0.20), int(cols * 0.20)
    window = (
        slice(mid_y - half_h, mid_y + half_h),
        slice(mid_x - half_w, mid_x + half_w),
    )
    centre_ncc = float(
        cv2.matchTemplate(
            candidate[window], ref_feature[window], cv2.TM_CCOEFF_NORMED
        )[0][0]
    )

    layout_similarity = _hist_intersection(
        ref_spatial, _hires_spatial_hist(candidate)
    )

    return whole_frame_ncc * 0.25 + centre_ncc * 0.45 + layout_similarity * 0.30
def _hires_phase_refine(
    beat: TrailerBeat,
    in_point_s: float,
    scene_start_s: float,
    scene_end_s: float,
    cfg: AppConfig,
) -> float:
    """Re-scan the full source scene at high resolution to correct phase.

    This is applied as a final refinement step after the standard pipeline
    has identified the correct scene.  It addresses the case where low-res
    features cannot distinguish between different phases of the same shot
    (e.g. mouth open vs. closed in a talking-head close-up).

    Args:
        beat: Trailer beat whose reference frames are read from
            ``beat.trailer_path`` starting at ``beat.start_s``.
        in_point_s: Current source in-point; returned unchanged when no
            reference template or no scan position is usable.
        scene_start_s: First source time considered as a candidate in-point.
        scene_end_s: Source time that the last template offset must not pass.
        cfg: Application configuration (frame rate, source movie path).

    Returns:
        The candidate in-point with the highest combined score
        (70% mean + 30% minimum of the per-template scores).  Ties keep the
        earliest candidate because the comparison is strict.
    """
    # Build hi-res templates from only the stable, bright reference frames
    # before any fade begins.  Fading frames have dropping brightness that
    # would penalise correct source positions where those offsets map to
    # bright content in the source.
    matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=0.04)

    # One step per EDL frame, but never finer than 40 ms.  The ``or 24.0``
    # fallback keeps an unset or zero frame rate from raising here; the same
    # guarded value is used for template sampling and for the source scan.
    frame_step_s = max(1.0 / (cfg.export.edl_frame_rate or 24.0), 0.04)

    ref_templates: list[tuple[float, np.ndarray, np.ndarray, float]] = []
    t = 0.0
    while t <= matchable_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            mean_l, _p90_l, contrast = _reference_visibility_stats(frame, cfg)
            # Only use clearly visible frames (skip dimming fade frames)
            if mean_l >= 50.0 and contrast >= 40.0:
                feat = _hires_phase_feature(frame)
                spatial = _hires_spatial_hist(feat)
                ref_templates.append((t, feat, spatial, mean_l))
        t = round(t + frame_step_s, 6)

    if not ref_templates:
        return in_point_s

    # For very short matchable durations (fast fades / cross-dissolves),
    # keep only the brightest template.  When the beat fades quickly the
    # later templates are dim and penalise every bright source candidate
    # equally, destroying phase discrimination.  A single bright anchor
    # gives maximum selectivity.
    if matchable_s < 1.0 and len(ref_templates) > 1:
        # max() returns the first of equally bright templates, matching a
        # stable descending sort followed by taking element 0.
        ref_templates = [max(ref_templates, key=lambda tpl: tpl[3])]
        logger.debug(
            'Beat %d: hi-res using single brightest template at offset %.3fs (luma %.1f)',
            beat.beat_id, ref_templates[0][0], ref_templates[0][3],
        )

    # Strip the luma field for the scan loop
    scan_templates = [(off, feat, sp) for off, feat, sp, _ in ref_templates]
    max_ref_offset = max(off for off, _, _ in scan_templates)

    # Scan the full scene
    best_t = in_point_s
    best_score = -1.0
    with open_video(cfg.paths.source_movie) as cap:
        t = scene_start_s
        # Stop once the furthest template offset would pass the scene end.
        while t + max_ref_offset <= scene_end_s:
            scores: list[float] = []
            all_ok = True
            for off, ref_feat, ref_spatial in scan_templates:
                src_frame = grab_frame_at(cap, t + off)
                if src_frame is None:
                    # A missing frame invalidates this candidate entirely.
                    all_ok = False
                    break
                scores.append(_hires_phase_score(ref_feat, ref_spatial, src_frame))
            if all_ok and scores:
                avg = sum(scores) / len(scores)
                # Blend in the worst template so one bad offset cannot hide
                # behind a good average.
                combined = avg * 0.7 + min(scores) * 0.3
                if combined > best_score:
                    best_score = combined
                    best_t = t
            t = round(t + frame_step_s, 6)

    if best_t != in_point_s:
        logger.info(
            'Beat %d: hi-res phase refine moved in-point %.3fs -> %.3fs '
            '(delta=%.3fs, score=%.4f)',
            beat.beat_id, in_point_s, best_t,
            best_t - in_point_s, best_score,
        )
    return best_t
def _fixed_content_pair_score(
ref_features: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
source_frame: np.ndarray,
@@ -388,12 +540,36 @@ def _rerank_candidates_by_content(
reranked: list[tuple[float, float, float]] = []
with open_video(cfg.paths.source_movie) as cap:
for coarse_score, t_sec in candidates:
content_score = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
# If the candidate lands just before a scene boundary, also evaluate
# the start of the next scene. A coarse-scan offset can place the
# in-point a few frames into the preceding (wrong) scene, causing
# the content and coverage scores to be artificially low even though
# the next scene is the correct visual match.
eval_t = t_sec
if scenes is not None:
cur_scene = _find_scene_for_time(scenes, t_sec, cfg)
if cur_scene is not None:
remaining = float(cur_scene.end_s) - t_sec
next_idx = next(
(i + 1 for i, s in enumerate(scenes) if s.scene_id == cur_scene.scene_id),
None,
)
if (
remaining < cfg.cv.deep_scan.scene_boundary_epsilon_s * 4
and next_idx is not None
and next_idx < len(scenes)
):
next_scene_start = float(scenes[next_idx].start_s)
alt_content = _fixed_content_sequence_score(cap, next_scene_start, templates, cfg)
cur_content = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
if alt_content > cur_content:
eval_t = next_scene_start
content_score = _fixed_content_sequence_score(cap, eval_t, templates, cfg)
coverage_score = 1.0
if scenes is not None and matchable_duration_s and matchable_duration_s > 0:
usable_s = _contiguous_scene_coverage_duration(
beat,
t_sec,
eval_t,
scenes,
matchable_duration_s,
cfg,
@@ -404,7 +580,7 @@ def _rerank_candidates_by_content(
+ coarse_score * 0.18
+ coverage_score * 0.20
)
reranked.append((rank_score, coarse_score, t_sec))
reranked.append((rank_score, coarse_score, eval_t))
return sorted(reranked, key=lambda item: item[0], reverse=True)
@@ -772,6 +948,8 @@ def _content_alignment_score(
in_point_s: float,
templates: list[tuple[float, np.ndarray]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
if not templates:
return -1.0
@@ -782,7 +960,13 @@ def _content_alignment_score(
early_scores: list[float] = []
for offset_s, template in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
t0 = in_point_s + offset_s
if frame_cache is not None and fps is not None:
idx = int(round(t0 * fps))
frame = frame_cache.get(idx)
else:
frame = grab_frame_at(cap, t0)
if frame is None:
return -1.0
@@ -840,6 +1024,20 @@ def align_in_point_by_content(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _ in templates)
max_offset = max(off for off, _ in templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
@@ -852,7 +1050,7 @@ def align_in_point_by_content(
active_templates = []
else:
active_templates = templates
score = _content_alignment_score(cap, t, active_templates, cfg) if active_templates else -1.0
score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -868,11 +1066,23 @@ def _motion_phase_score(
in_point_s: float,
motion_templates: list[tuple[float, float, np.ndarray, tuple[int, ...]]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
scores: list[float] = []
for offset_s, step_s, ref_delta, template_shape in motion_templates:
f0 = grab_frame_at(cap, in_point_s + offset_s)
f1 = grab_frame_at(cap, in_point_s + offset_s + step_s)
t0 = in_point_s + offset_s
t1 = in_point_s + offset_s + step_s
if frame_cache is not None and fps is not None:
idx0 = int(round(t0 * fps))
idx1 = int(round(t1 * fps))
f0 = frame_cache.get(idx0)
f1 = frame_cache.get(idx1)
else:
f0 = grab_frame_at(cap, t0)
f1 = grab_frame_at(cap, t1)
if f0 is None or f1 is None:
return -1.0
src0 = _fixed_feature(f0, template_shape, cfg)
@@ -913,11 +1123,25 @@ def align_in_point_by_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _, _, _ in motion_templates)
max_offset = max(off + step for off, step, _, _ in motion_templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
while t <= end_s:
score = _motion_phase_score(cap, t, motion_templates, cfg)
score = _motion_phase_score(cap, t, motion_templates, cfg, fps=fps, frame_cache=frame_cache)
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -933,6 +1157,7 @@ def align_in_point_by_content_and_motion(
estimated_in_point_s: float,
cfg: AppConfig,
search_window_s: float | None = None,
scene_end_s: float | None = None,
) -> tuple[float, float, float, float]:
"""
Align a candidate using still-frame content and motion phase together.
@@ -959,23 +1184,57 @@ def align_in_point_by_content_and_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_t_offset = min(off for off, _ in templates) if templates else 0.0
max_t_offset = max(off for off, _ in templates) if templates else 0.0
min_m_offset = min(off for off, _, _, _ in motion_templates) if motion_templates else 0.0
max_m_offset = max(off + step for off, step, _, _ in motion_templates) if motion_templates else 0.0
min_offset = min(min_t_offset, min_m_offset)
max_offset = max(max_t_offset, max_m_offset)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
best_content = -1.0
best_motion = -1.0
t = start_s
while t <= end_s:
content_score = _content_alignment_score(cap, t, templates, cfg)
if scene_end_s is not None:
avail_s = scene_end_s - t
if avail_s > 0:
active_templates = [(off, tpl) for off, tpl in templates if off <= avail_s]
active_motion = [(off, step, delta, shape) for off, step, delta, shape in motion_templates if off + step <= avail_s]
else:
active_templates = []
active_motion = []
else:
active_templates = templates
active_motion = motion_templates
content_score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
motion_score = (
_motion_phase_score(cap, t, motion_templates, cfg)
if len(motion_templates) >= 2
_motion_phase_score(cap, t, active_motion, cfg, fps=fps, frame_cache=frame_cache)
if len(active_motion) >= 2
else content_score
)
if content_score < 0 or motion_score < 0:
t = round(t + frame_step_s, 6)
continue
raw_score = content_score * 0.64 + motion_score * 0.36
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.05)
# The previous anchor_penalty of 0.05 per second was stronger than the
# actual variance in raw_score, preventing phase correction. We reduce it
# so that it only acts as a tie-breaker.
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.005)
score = raw_score - anchor_penalty
if score > best_score + tie_delta:
best_score = score
@@ -1027,6 +1286,18 @@ def estimate_usable_source_duration(
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
break
# If the template is scoreable (has content) but the source frame is dark,
# this is a bad match. We should not let dark source frames
# provide high correlation to dark templates.
# templates are already pre-processed into feature images (grayscale/edges),
# so we can't use _is_scoreable_reference_frame on them directly.
# Instead, we rely on the fact that _prepare_beat_templates already
# filtered out non-scoreable frames.
if _is_dark_reference_frame(frame, cfg):
scores.append((offset_s, 0.0))
continue
scores.append((offset_s, _match_score(frame, template, cfg)))
if not scores:
@@ -1038,12 +1309,14 @@ def estimate_usable_source_duration(
last_good = 0.0
bad_run = 0
bad_run_start_offset: float | None = None
good_scores: list[float] = []
for offset_s, score in scores:
if score >= min_score:
last_good = offset_s
bad_run = 0
bad_run_start_offset = None
good_scores.append(score)
continue
@@ -1051,7 +1324,34 @@ def estimate_usable_source_duration(
continue
bad_run += 1
if bad_run_start_offset is None:
bad_run_start_offset = offset_s
if bad_run >= 3:
# Before killing the span, check whether the remaining scores form a
# stable plateau. This handles scenes where a grading/exposure
# difference between trailer and source causes a gradual score drop
# rather than a hard cut. A genuine cut produces chaotic scores;
# a grading mismatch produces a flat, low-but-consistent plateau.
# Conditions: low variance (std < 0.025), scores above pure-black
# (mean > 0.20), and the warmup baseline was meaningful (>= 0.30).
tail_scores = [s for o, s in scores if o >= bad_run_start_offset]
if (
len(tail_scores) >= 3
and float(np.std(tail_scores)) < 0.025
and float(np.mean(tail_scores)) > 0.20
and baseline >= 0.30
):
logger.debug(
'Beat %d: stable plateau detected at offset %.3fs '
'(tail mean=%.3f std=%.3f) — extending span to full duration.',
beat.beat_id, bad_run_start_offset,
float(np.mean(tail_scores)), float(np.std(tail_scores)),
)
last_good = scores[-1][0]
good_scores.extend(tail_scores)
break
logger.debug('Beat %d: Match died at offset %.3fs. Score %.3f < min_score %.3f. Bad run count: %d',
beat.beat_id, offset_s, score, min_score, bad_run)
break
tail_safety_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / source_fps)
@@ -1113,7 +1413,10 @@ def refine_in_point_with_sequence(
Returns:
(best_in_point_s, sequence_score)
"""
return align_in_point_by_content(beat, estimated_in_point_s, cfg, search_window_s, scene_end_s)
best_in, best_score, _, _ = align_in_point_by_content_and_motion(
beat, estimated_in_point_s, cfg, search_window_s, scene_end_s
)
return best_in, best_score
def _find_scene_for_time(scenes: Sequence | None, t_sec: float, cfg: AppConfig):
@@ -1451,7 +1754,7 @@ def run_global_scan(
max_source_duration_s=duration_s if rough_scene_end_s is not None else None,
)
content_score = original_content_score
content_in_s, align_content_score = align_in_point_by_content(
content_in_s, _, align_content_score, _ = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1495,7 +1798,7 @@ def run_global_scan(
cfg,
)
motion_in_s, align_motion_score = align_in_point_by_motion(
motion_in_s, _, _, align_motion_score = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1504,6 +1807,7 @@ def run_global_scan(
if local_align_window_s is not None
else min(1.0, cfg.cv.deep_scan.content_align_window_seconds)
),
scene_end_s=rough_scene_end_s,
)
if align_motion_score >= original_motion_score + 0.015:
@@ -1561,7 +1865,12 @@ def run_global_scan(
)
if len(motion_templates) >= 2:
motion_score_clamped = max(0.0, min(1.0, motion_score))
final_score = final_score * 0.82 + motion_score_clamped * 0.18
blended = final_score * 0.82 + motion_score_clamped * 0.18
# Do not let motion blending drag the score below the
# content-validated level. A weak motion score often just
# means the shot contains a camera pan or slow zoom; it
# should not veto an otherwise well-supported content match.
final_score = max(blended, final_score - 0.015)
if is_weighted_seed_candidate:
vision_provisional_score = (
content_score * 0.45
@@ -1741,6 +2050,36 @@ def run_global_scan(
best_result.match_score,
)
# Final hi-res phase refinement: scan the full source scene at
# higher resolution to correct phase mismatches that the standard
# 160×80 features cannot resolve (e.g. talking-head close-ups).
final_in_s = best_result.in_point_s
final_scene = _find_scene_for_time(scenes, final_in_s, cfg)
if final_scene is not None:
refined_phase_in_s = _hires_phase_refine(
b,
final_in_s,
float(final_scene.start_s),
float(final_scene.end_s),
cfg,
)
if refined_phase_in_s != final_in_s:
final_in_s = refined_phase_in_s
# Recompute out-point preserving the duration
final_out_s = final_in_s + best_result.duration_s
if final_scene is not None:
final_out_s = min(final_out_s, float(final_scene.end_s))
best_result = MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,
source_path=cfg.paths.source_movie,
in_point_s=final_in_s,
out_point_s=final_out_s,
in_point_frame=int(final_in_s * source_fps),
match_score=best_result.match_score,
is_confirmed=is_confirmed,
)
results.append(MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,