From f20f89b06b69b435188500f4571a97ad9be28819 Mon Sep 17 00:00:00 2001
From: Melbar
Date: Fri, 8 May 2026 10:52:11 +0200
Subject: [PATCH] Add hi-res phase refinement for intra-scene phase matching (Beat 03 investigation)

---
Review notes (ignored by git am):
- Hunk 1 was revised in review: the template step now guards a missing/zero
  EDL frame rate, hi-res source features are cached across scan candidates,
  and the incoming in-point is scored first so it only moves on a strict
  improvement. Hunk 1 grew from 152 to 222 added lines; later new-side
  offsets are shifted by +70.
- The post-image blob id on the `index` line predates this revision.
- Whitespace of context lines was reconstructed; verify against the tree or
  apply with --ignore-whitespace.

 src/cv/global_scan.py | 441 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 425 insertions(+), 16 deletions(-)

diff --git a/src/cv/global_scan.py b/src/cv/global_scan.py
index 1b97b52..926a346 100644
--- a/src/cv/global_scan.py
+++ b/src/cv/global_scan.py
@@ -198,6 +198,228 @@ def _fixed_content_features(frame: np.ndarray, cfg: AppConfig) -> tuple[np.ndarr
     )
 
 
+def _hires_phase_feature(frame: np.ndarray) -> np.ndarray:
+    """Return the 320×160 equalised luma feature used for phase matching.
+
+    Standard pipeline features (160×80) lose the subtle pixel differences
+    between talking-head phases (mouth open vs. closed). Doubling the
+    linear resolution keeps enough detail to discriminate facial expression
+    phases within a single continuous scene.
+
+    Args:
+        frame: BGR image; it is converted with ``cv2.COLOR_BGR2GRAY``.
+
+    Returns:
+        Single-channel array with 160 rows and 320 columns.
+    """
+    trimmed = _trim_dark_borders(frame)
+    height = trimmed.shape[0]
+    # Drop the top and bottom 5% of rows before equalising the histogram.
+    cropped = trimmed[int(height * 0.05):int(height * 0.95), :]
+    gray = cv2.equalizeHist(cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY))
+    return cv2.resize(gray, (320, 160), interpolation=cv2.INTER_AREA)
+
+
+def _hires_spatial_hist(frame_feature: np.ndarray) -> np.ndarray:
+    """Return concatenated per-cell luma histograms over an 8×8 grid.
+
+    Every cell contributes a 24-bin histogram scaled to sum to roughly one,
+    so the result holds 8 * 8 * 24 values. Rows and columns left over when
+    the feature size is not a multiple of 8 are ignored.
+    """
+    h, w = frame_feature.shape[:2]
+    grid = 8
+    cell_h = h // grid
+    cell_w = w // grid
+    parts: list[np.ndarray] = []
+    for gy in range(grid):
+        for gx in range(grid):
+            cell = frame_feature[gy * cell_h:(gy + 1) * cell_h,
+                                 gx * cell_w:(gx + 1) * cell_w]
+            hist = cv2.calcHist([cell], [0], None, [24], [0, 256]).astype(np.float32).flatten()
+            parts.append(hist / (float(np.sum(hist)) + 1e-6))
+    return np.concatenate(parts)
+
+
+def _hires_feature_score(
+    ref_feature: np.ndarray,
+    ref_spatial: np.ndarray,
+    src_feature: np.ndarray,
+    src_spatial: np.ndarray,
+) -> float:
+    """Score pre-computed hi-res source features against a reference.
+
+    Blends three signals:
+    1. Full-frame NCC for overall similarity (weight 0.25)
+    2. Center-crop NCC over the middle 40% of each axis, for
+       face/expression matching (weight 0.45)
+    3. Spatial histogram intersection for structural layout (weight 0.30)
+
+    Both features must have the same shape.
+    """
+    ncc_full = float(cv2.matchTemplate(
+        src_feature, ref_feature, cv2.TM_CCOEFF_NORMED
+    )[0][0])
+    h, w = ref_feature.shape[:2]
+    cy, cx = h // 2, w // 2
+    ch, cw = int(h * 0.20), int(w * 0.20)
+    ref_center = ref_feature[cy - ch:cy + ch, cx - cw:cx + cw]
+    src_center = src_feature[cy - ch:cy + ch, cx - cw:cx + cw]
+    ncc_center = float(cv2.matchTemplate(
+        src_center, ref_center, cv2.TM_CCOEFF_NORMED
+    )[0][0])
+    spatial = _hist_intersection(ref_spatial, src_spatial)
+    return ncc_full * 0.25 + ncc_center * 0.45 + spatial * 0.30
+
+
+def _hires_phase_score(
+    ref_feature: np.ndarray,
+    ref_spatial: np.ndarray,
+    src_frame: np.ndarray,
+) -> float:
+    """Compare a raw source frame to a reference using hi-res phase features."""
+    src_feature = _hires_phase_feature(src_frame)
+    return _hires_feature_score(
+        ref_feature, ref_spatial, src_feature, _hires_spatial_hist(src_feature)
+    )
+
+
+def _hires_candidate_score(
+    cap,
+    start_s: float,
+    scan_templates: list[tuple[float, np.ndarray, np.ndarray]],
+    feature_cache: dict[float, tuple[np.ndarray, np.ndarray] | None],
+) -> float | None:
+    """Score one candidate in-point against every reference template.
+
+    Source features are memoised in ``feature_cache`` under the timestamp
+    rounded to six decimals, because neighbouring candidates request the
+    same source frames when the scan step equals the template spacing.
+
+    Returns:
+        ``mean * 0.7 + min * 0.3`` of the per-template scores, or ``None``
+        when a required source frame cannot be read.
+    """
+    scores: list[float] = []
+    for off, ref_feat, ref_spatial in scan_templates:
+        key = round(start_s + off, 6)
+        if key not in feature_cache:
+            frame = grab_frame_at(cap, start_s + off)
+            if frame is None:
+                feature_cache[key] = None
+            else:
+                feat = _hires_phase_feature(frame)
+                feature_cache[key] = (feat, _hires_spatial_hist(feat))
+        cached = feature_cache[key]
+        if cached is None:
+            return None
+        scores.append(_hires_feature_score(ref_feat, ref_spatial, cached[0], cached[1]))
+    if not scores:
+        return None
+    return (sum(scores) / len(scores)) * 0.7 + min(scores) * 0.3
+
+
+def _hires_phase_refine(
+    beat: TrailerBeat,
+    in_point_s: float,
+    scene_start_s: float,
+    scene_end_s: float,
+    cfg: AppConfig,
+) -> float:
+    """Re-scan the full source scene at high resolution to correct phase.
+
+    This is applied as a final refinement step after the standard pipeline
+    has identified the correct scene. It addresses the case where low-res
+    features cannot distinguish between different phases of the same shot
+    (e.g. mouth open vs. closed in a talking-head close-up).
+
+    Args:
+        beat: Trailer beat that supplies the reference frames.
+        in_point_s: In-point chosen by the standard pipeline, in seconds.
+        scene_start_s: Start of the source scene to scan, in seconds.
+        scene_end_s: End of the source scene to scan, in seconds.
+        cfg: Application configuration.
+
+    Returns:
+        The best-scoring in-point, or ``in_point_s`` unchanged when no usable
+        reference template exists or no candidate scores strictly higher.
+    """
+    # One EDL frame per step, never finer than 40 ms. The ``or 24.0`` guard
+    # keeps a missing or zero frame rate from raising while templates are
+    # built; the scan below reuses the same step.
+    step_s = max(1.0 / (cfg.export.edl_frame_rate or 24.0), 0.04)
+
+    # Build hi-res templates from only the stable, bright reference frames
+    # before any fade begins. Fading frames have dropping brightness that
+    # would penalise correct source positions where those offsets map to
+    # bright content in the source.
+    matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=0.04)
+    ref_templates: list[tuple[float, np.ndarray, np.ndarray, float]] = []
+    t = 0.0
+    while t <= matchable_s:
+        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
+        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
+            mean_l, _, contrast = _reference_visibility_stats(frame, cfg)
+            # Only use clearly visible frames (skip dimming fade frames)
+            if mean_l >= 50.0 and contrast >= 40.0:
+                feat = _hires_phase_feature(frame)
+                ref_templates.append((t, feat, _hires_spatial_hist(feat), mean_l))
+        t = round(t + step_s, 6)
+
+    if not ref_templates:
+        return in_point_s
+
+    # For very short matchable durations (fast fades / cross-dissolves),
+    # keep only the brightest template. When the beat fades quickly the
+    # later templates are dim and penalise every bright source candidate
+    # equally, destroying phase discrimination. A single bright anchor
+    # gives maximum selectivity.
+    if matchable_s < 1.0 and len(ref_templates) > 1:
+        ref_templates = [max(ref_templates, key=lambda tpl: tpl[3])]
+        logger.debug(
+            'Beat %d: hi-res using single brightest template at offset %.3fs (luma %.1f)',
+            beat.beat_id, ref_templates[0][0], ref_templates[0][3],
+        )
+
+    # Strip the luma field for the scan loop
+    scan_templates = [(off, feat, sp) for off, feat, sp, _ in ref_templates]
+    max_ref_offset = max(off for off, _, _ in scan_templates)
+
+    best_t = in_point_s
+    best_score = -1.0
+    feature_cache: dict[float, tuple[np.ndarray, np.ndarray] | None] = {}
+
+    with open_video(cfg.paths.source_movie) as cap:
+        # Seed with the incumbent in-point so the scan only moves it when a
+        # candidate is strictly better; the scan grid is anchored at the
+        # scene start and would otherwise not evaluate the incumbent.
+        if scene_start_s <= in_point_s and in_point_s + max_ref_offset <= scene_end_s:
+            incumbent = _hires_candidate_score(cap, in_point_s, scan_templates, feature_cache)
+            if incumbent is not None:
+                best_score = max(best_score, incumbent)
+
+        # Scan the full scene
+        t = scene_start_s
+        while t + max_ref_offset <= scene_end_s:
+            # Template offsets are >= 0, so features before ``t`` are stale.
+            for stale in [key for key in feature_cache if key < t]:
+                del feature_cache[stale]
+            combined = _hires_candidate_score(cap, t, scan_templates, feature_cache)
+            if combined is not None and combined > best_score:
+                best_score = combined
+                best_t = t
+            t = round(t + step_s, 6)
+
+    if best_t != in_point_s:
+        logger.info(
+            'Beat %d: hi-res phase refine moved in-point %.3fs -> %.3fs '
+            '(delta=%.3fs, score=%.4f)',
+            beat.beat_id, in_point_s, best_t,
+            best_t - in_point_s, best_score,
+        )
+    return best_t
+
+
 def _fixed_content_pair_score(
     ref_features: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
     source_frame: np.ndarray,
@@ -388,12 +610,36 @@ def _rerank_candidates_by_content(
     reranked: list[tuple[float, float, float]] = []
     with open_video(cfg.paths.source_movie) as cap:
         for coarse_score, t_sec in candidates:
-            content_score = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
+            # If the candidate lands just before a scene boundary, also evaluate
+            # the start of the next scene. A coarse-scan offset can place the
+            # in-point a few frames into the preceding (wrong) scene, causing
+            # the content and coverage scores to be artificially low even though
+            # the next scene is the correct visual match.
+            eval_t = t_sec
+            if scenes is not None:
+                cur_scene = _find_scene_for_time(scenes, t_sec, cfg)
+                if cur_scene is not None:
+                    remaining = float(cur_scene.end_s) - t_sec
+                    next_idx = next(
+                        (i + 1 for i, s in enumerate(scenes) if s.scene_id == cur_scene.scene_id),
+                        None,
+                    )
+                    if (
+                        remaining < cfg.cv.deep_scan.scene_boundary_epsilon_s * 4
+                        and next_idx is not None
+                        and next_idx < len(scenes)
+                    ):
+                        next_scene_start = float(scenes[next_idx].start_s)
+                        alt_content = _fixed_content_sequence_score(cap, next_scene_start, templates, cfg)
+                        cur_content = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
+                        if alt_content > cur_content:
+                            eval_t = next_scene_start
+            content_score = _fixed_content_sequence_score(cap, eval_t, templates, cfg)
             coverage_score = 1.0
             if scenes is not None and matchable_duration_s and matchable_duration_s > 0:
                 usable_s = _contiguous_scene_coverage_duration(
                     beat,
-                    t_sec,
+                    eval_t,
                     scenes,
                     matchable_duration_s,
                     cfg,
@@ -404,7 +650,7 @@ def _rerank_candidates_by_content(
                 + coarse_score * 0.18
                 + coverage_score * 0.20
             )
-            reranked.append((rank_score, coarse_score, t_sec))
+            reranked.append((rank_score, coarse_score, eval_t))
 
     return sorted(reranked, key=lambda item: item[0], reverse=True)
 
@@ -772,6 +1018,8 @@ def _content_alignment_score(
     in_point_s: float,
     templates: list[tuple[float, np.ndarray]],
     cfg: AppConfig,
+    fps: float | None = None,
+    frame_cache: dict[int, np.ndarray] | None = None,
 ) -> float:
     if not templates:
         return -1.0
@@ -782,7 +1030,13 @@ def _content_alignment_score(
     early_scores: list[float] = []
 
     for offset_s, template in templates:
-        frame = grab_frame_at(cap, in_point_s + offset_s)
+        t0 = in_point_s + offset_s
+        if frame_cache is not None and fps is not None:
+            idx = int(round(t0 * fps))
+            frame = frame_cache.get(idx)
+        else:
+            frame = grab_frame_at(cap, t0)
+
         if frame is None:
             return -1.0
 
@@ -840,6 +1094,20 @@ def align_in_point_by_content(
         end_s = estimated_in_point_s + window_s
         tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
 
+        min_offset = min(off for off, _ in templates)
+        max_offset = max(off for off, _ in templates)
+        req_start_s = max(0.0, start_s + min_offset - frame_step_s)
+        req_end_s = end_s + max_offset + frame_step_s
+
+        frame_cache = {}
+        t_req = req_start_s
+        while t_req <= req_end_s:
+            idx = int(round(t_req * fps))
+            frame = grab_frame_at(cap, t_req)
+            if frame is not None:
+                frame_cache[idx] = frame
+            t_req = round(t_req + frame_step_s, 6)
+
         best_in = estimated_in_point_s
         best_score = -1.0
         t = start_s
@@ -852,7 +1120,7 @@ def align_in_point_by_content(
                     active_templates = []
             else:
                 active_templates = templates
-            score = _content_alignment_score(cap, t, active_templates, cfg) if active_templates else -1.0
+            score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
             if score > best_score + tie_delta:
                 best_score = score
                 best_in = t
@@ -868,11 +1136,23 @@ def _motion_phase_score(
     in_point_s: float,
     motion_templates: list[tuple[float, float, np.ndarray, tuple[int, ...]]],
     cfg: AppConfig,
+    fps: float | None = None,
+    frame_cache: dict[int, np.ndarray] | None = None,
 ) -> float:
     scores: list[float] = []
     for offset_s, step_s, ref_delta, template_shape in motion_templates:
-        f0 = grab_frame_at(cap, in_point_s + offset_s)
-        f1 = grab_frame_at(cap, in_point_s + offset_s + step_s)
+        t0 = in_point_s + offset_s
+        t1 = in_point_s + offset_s + step_s
+
+        if frame_cache is not None and fps is not None:
+            idx0 = int(round(t0 * fps))
+            idx1 = int(round(t1 * fps))
+            f0 = frame_cache.get(idx0)
+            f1 = frame_cache.get(idx1)
+        else:
+            f0 = grab_frame_at(cap, t0)
+            f1 = grab_frame_at(cap, t1)
+
         if f0 is None or f1 is None:
             return -1.0
         src0 = _fixed_feature(f0, template_shape, cfg)
@@ -913,11 +1193,25 @@ def align_in_point_by_motion(
         end_s = estimated_in_point_s + window_s
         tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
 
+        min_offset = min(off for off, _, _, _ in motion_templates)
+        max_offset = max(off + step for off, step, _, _ in motion_templates)
+        req_start_s = max(0.0, start_s + min_offset - frame_step_s)
+        req_end_s = end_s + max_offset + frame_step_s
+
+        frame_cache = {}
+        t_req = req_start_s
+        while t_req <= req_end_s:
+            idx = int(round(t_req * fps))
+            frame = grab_frame_at(cap, t_req)
+            if frame is not None:
+                frame_cache[idx] = frame
+            t_req = round(t_req + frame_step_s, 6)
+
         best_in = estimated_in_point_s
         best_score = -1.0
         t = start_s
         while t <= end_s:
-            score = _motion_phase_score(cap, t, motion_templates, cfg)
+            score = _motion_phase_score(cap, t, motion_templates, cfg, fps=fps, frame_cache=frame_cache)
             if score > best_score + tie_delta:
                 best_score = score
                 best_in = t
@@ -933,6 +1227,7 @@ def align_in_point_by_content_and_motion(
     estimated_in_point_s: float,
     cfg: AppConfig,
     search_window_s: float | None = None,
+    scene_end_s: float | None = None,
 ) -> tuple[float, float, float, float]:
     """
     Align a candidate using still-frame content and motion phase together.
@@ -959,23 +1254,57 @@ def align_in_point_by_content_and_motion(
         end_s = estimated_in_point_s + window_s
         tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
 
+        min_t_offset = min(off for off, _ in templates) if templates else 0.0
+        max_t_offset = max(off for off, _ in templates) if templates else 0.0
+        min_m_offset = min(off for off, _, _, _ in motion_templates) if motion_templates else 0.0
+        max_m_offset = max(off + step for off, step, _, _ in motion_templates) if motion_templates else 0.0
+
+        min_offset = min(min_t_offset, min_m_offset)
+        max_offset = max(max_t_offset, max_m_offset)
+        req_start_s = max(0.0, start_s + min_offset - frame_step_s)
+        req_end_s = end_s + max_offset + frame_step_s
+
+        frame_cache = {}
+        t_req = req_start_s
+        while t_req <= req_end_s:
+            idx = int(round(t_req * fps))
+            frame = grab_frame_at(cap, t_req)
+            if frame is not None:
+                frame_cache[idx] = frame
+            t_req = round(t_req + frame_step_s, 6)
+
         best_in = estimated_in_point_s
         best_score = -1.0
         best_content = -1.0
         best_motion = -1.0
         t = start_s
         while t <= end_s:
-            content_score = _content_alignment_score(cap, t, templates, cfg)
+            if scene_end_s is not None:
+                avail_s = scene_end_s - t
+                if avail_s > 0:
+                    active_templates = [(off, tpl) for off, tpl in templates if off <= avail_s]
+                    active_motion = [(off, step, delta, shape) for off, step, delta, shape in motion_templates if off + step <= avail_s]
+                else:
+                    active_templates = []
+                    active_motion = []
+            else:
+                active_templates = templates
+                active_motion = motion_templates
+
+            content_score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
             motion_score = (
-                _motion_phase_score(cap, t, motion_templates, cfg)
-                if len(motion_templates) >= 2
+                _motion_phase_score(cap, t, active_motion, cfg, fps=fps, frame_cache=frame_cache)
+                if len(active_motion) >= 2
                 else content_score
             )
             if content_score < 0 or motion_score < 0:
                 t = round(t + frame_step_s, 6)
                 continue
             raw_score = content_score * 0.64 + motion_score * 0.36
-            anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.05)
+            # The previous anchor_penalty of 0.05 per second was stronger than the
+            # actual variance in raw_score, preventing phase correction. We reduce it
+            # so that it only acts as a tie-breaker.
+            anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.005)
             score = raw_score - anchor_penalty
             if score > best_score + tie_delta:
                 best_score = score
@@ -1027,6 +1356,18 @@ def estimate_usable_source_duration(
             frame = grab_frame_at(cap, in_point_s + offset_s)
             if frame is None:
                 break
+
+            # If the template is scoreable (has content) but the source frame is dark,
+            # this is a bad match. We should not let dark source frames
+            # provide high correlation to dark templates.
+            # templates are already pre-processed into feature images (grayscale/edges),
+            # so we can't use _is_scoreable_reference_frame on them directly.
+            # Instead, we rely on the fact that _prepare_beat_templates already
+            # filtered out non-scoreable frames.
+            if _is_dark_reference_frame(frame, cfg):
+                scores.append((offset_s, 0.0))
+                continue
+
             scores.append((offset_s, _match_score(frame, template, cfg)))
 
     if not scores:
@@ -1038,12 +1379,14 @@ def estimate_usable_source_duration(
 
     last_good = 0.0
     bad_run = 0
+    bad_run_start_offset: float | None = None
     good_scores: list[float] = []
 
     for offset_s, score in scores:
         if score >= min_score:
             last_good = offset_s
             bad_run = 0
+            bad_run_start_offset = None
             good_scores.append(score)
             continue
 
@@ -1051,7 +1394,34 @@ def estimate_usable_source_duration(
             continue
 
         bad_run += 1
+        if bad_run_start_offset is None:
+            bad_run_start_offset = offset_s
         if bad_run >= 3:
+            # Before killing the span, check whether the remaining scores form a
+            # stable plateau. This handles scenes where a grading/exposure
+            # difference between trailer and source causes a gradual score drop
+            # rather than a hard cut. A genuine cut produces chaotic scores;
+            # a grading mismatch produces a flat, low-but-consistent plateau.
+            # Conditions: low variance (std < 0.025), scores above pure-black
+            # (mean > 0.20), and the warmup baseline was meaningful (>= 0.30).
+            tail_scores = [s for o, s in scores if o >= bad_run_start_offset]
+            if (
+                len(tail_scores) >= 3
+                and float(np.std(tail_scores)) < 0.025
+                and float(np.mean(tail_scores)) > 0.20
+                and baseline >= 0.30
+            ):
+                logger.debug(
+                    'Beat %d: stable plateau detected at offset %.3fs '
+                    '(tail mean=%.3f std=%.3f) — extending span to full duration.',
+                    beat.beat_id, bad_run_start_offset,
+                    float(np.mean(tail_scores)), float(np.std(tail_scores)),
+                )
+                last_good = scores[-1][0]
+                good_scores.extend(tail_scores)
+                break
+            logger.debug('Beat %d: Match died at offset %.3fs. Score %.3f < min_score %.3f. Bad run count: %d',
+                         beat.beat_id, offset_s, score, min_score, bad_run)
             break
 
     tail_safety_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / source_fps)
@@ -1113,7 +1483,10 @@ def refine_in_point_with_sequence(
     Returns:
         (best_in_point_s, sequence_score)
     """
-    return align_in_point_by_content(beat, estimated_in_point_s, cfg, search_window_s, scene_end_s)
+    best_in, best_score, _, _ = align_in_point_by_content_and_motion(
+        beat, estimated_in_point_s, cfg, search_window_s, scene_end_s
+    )
+    return best_in, best_score
 
 
 def _find_scene_for_time(scenes: Sequence | None, t_sec: float, cfg: AppConfig):
@@ -1451,7 +1824,7 @@ def run_global_scan(
                     max_source_duration_s=duration_s if rough_scene_end_s is not None else None,
                 )
                 content_score = original_content_score
-                content_in_s, align_content_score = align_in_point_by_content(
+                content_in_s, _, align_content_score, _ = align_in_point_by_content_and_motion(
                     b,
                     adjusted_in_s,
                     cfg,
@@ -1495,7 +1868,7 @@ def run_global_scan(
                     cfg,
                 )
 
-                motion_in_s, align_motion_score = align_in_point_by_motion(
+                motion_in_s, _, _, align_motion_score = align_in_point_by_content_and_motion(
                     b,
                     adjusted_in_s,
                     cfg,
@@ -1504,6 +1877,7 @@ def run_global_scan(
                         if local_align_window_s is not None
                         else min(1.0, cfg.cv.deep_scan.content_align_window_seconds)
                     ),
+                    scene_end_s=rough_scene_end_s,
                 )
 
                 if align_motion_score >= original_motion_score + 0.015:
@@ -1561,7 +1935,12 @@ def run_global_scan(
                 )
                 if len(motion_templates) >= 2:
                     motion_score_clamped = max(0.0, min(1.0, motion_score))
-                    final_score = final_score * 0.82 + motion_score_clamped * 0.18
+                    blended = final_score * 0.82 + motion_score_clamped * 0.18
+                    # Do not let motion blending drag the score below the
+                    # content-validated level. A weak motion score often just
+                    # means the shot contains a camera pan or slow zoom; it
+                    # should not veto an otherwise well-supported content match.
+                    final_score = max(blended, final_score - 0.015)
                 if is_weighted_seed_candidate:
                     vision_provisional_score = (
                         content_score * 0.45
@@ -1741,6 +2120,36 @@ def run_global_scan(
                 best_result.match_score,
             )
 
+        # Final hi-res phase refinement: scan the full source scene at
+        # higher resolution to correct phase mismatches that the standard
+        # 160×80 features cannot resolve (e.g. talking-head close-ups).
+        final_in_s = best_result.in_point_s
+        final_scene = _find_scene_for_time(scenes, final_in_s, cfg)
+        if final_scene is not None:
+            refined_phase_in_s = _hires_phase_refine(
+                b,
+                final_in_s,
+                float(final_scene.start_s),
+                float(final_scene.end_s),
+                cfg,
+            )
+            if refined_phase_in_s != final_in_s:
+                final_in_s = refined_phase_in_s
+                # Recompute out-point preserving the duration
+                final_out_s = final_in_s + best_result.duration_s
+                if final_scene is not None:
+                    final_out_s = min(final_out_s, float(final_scene.end_s))
+                best_result = MatchResult(
+                    beat_id=b.beat_id,
+                    scene_id=best_result.scene_id,
+                    source_path=cfg.paths.source_movie,
+                    in_point_s=final_in_s,
+                    out_point_s=final_out_s,
+                    in_point_frame=int(final_in_s * source_fps),
+                    match_score=best_result.match_score,
+                    is_confirmed=is_confirmed,
+                )
+
         results.append(MatchResult(
             beat_id=b.beat_id,
             scene_id=best_result.scene_id,