Compare commits

...

2 Commits

Author SHA1 Message Date
Melbar 730b5ef3c0 Auto-update cutter report 2026-05-08 11:31
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 11:31:15 +02:00
Melbar f20f89b06b Add hi-res phase refinement for intra-scene phase matching (Beat 03 investigation) 2026-05-08 10:52:11 +02:00
12 changed files with 362 additions and 23 deletions
+1 -1
View File
File diff suppressed because one or more lines are too long
+5 -5
View File
File diff suppressed because one or more lines are too long
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

File diff suppressed because one or more lines are too long
+355 -16
View File
@@ -198,6 +198,158 @@ def _fixed_content_features(frame: np.ndarray, cfg: AppConfig) -> tuple[np.ndarr
)
def _hires_phase_feature(frame: np.ndarray) -> np.ndarray:
    """Build the equalised hi-res luma image used for intra-scene phase matching.

    The frame is run through ``_trim_dark_borders``, the top and bottom 5% of
    its rows are discarded, and the remainder is converted from BGR to
    grayscale, histogram-equalised and resized with area interpolation using
    ``dsize=(320, 160)``.

    Rationale (from the original author's note): the standard 160×80 pipeline
    features lose the subtle pixel differences between talking-head phases
    (mouth open vs. closed); the larger feature keeps enough spatial detail
    to tell such phases apart within one continuous scene.

    Args:
        frame: Source or reference frame; it is converted with
            ``cv2.COLOR_BGR2GRAY``, so a BGR image is expected.

    Returns:
        The resized, equalised single-channel image.
    """
    borderless = _trim_dark_borders(frame)
    rows, _ = borderless.shape[:2]
    # Keep only the middle 90% of the rows.
    top = int(rows * 0.05)
    bottom = int(rows * 0.95)
    band = borderless[top:bottom]
    luma = cv2.equalizeHist(cv2.cvtColor(band, cv2.COLOR_BGR2GRAY))
    return cv2.resize(luma, (320, 160), interpolation=cv2.INTER_AREA)
def _hires_spatial_hist(frame_feature: np.ndarray) -> np.ndarray:
    """Concatenate per-cell intensity histograms over an 8×8 grid.

    The feature image is divided into 8 rows by 8 columns of equally sized
    cells (cell size is the integer quotient of each dimension by 8, so any
    remainder rows/columns at the bottom/right edge are ignored). Each cell
    contributes a 24-bin histogram over the value range [0, 256), normalised
    by its own sum (plus a small epsilon to avoid division by zero).

    Args:
        frame_feature: Single-channel feature image, e.g. the output of
            ``_hires_phase_feature``.

    Returns:
        A flat float32 vector of 8 * 8 * 24 values, cells ordered row by row.
    """
    cells_per_axis = 8
    rows, cols = frame_feature.shape[:2]
    tile_h = rows // cells_per_axis
    tile_w = cols // cells_per_axis

    normalised: list[np.ndarray] = []
    for row_idx in range(cells_per_axis):
        top = row_idx * tile_h
        row_band = frame_feature[top:top + tile_h]
        for col_idx in range(cells_per_axis):
            left = col_idx * tile_w
            tile = row_band[:, left:left + tile_w]
            counts = cv2.calcHist([tile], [0], None, [24], [0, 256])
            counts = counts.astype(np.float32).flatten()
            total = float(np.sum(counts)) + 1e-6
            normalised.append(counts / total)
    return np.concatenate(normalised)
def _hires_phase_score(
    ref_feature: np.ndarray,
    ref_spatial: np.ndarray,
    src_frame: np.ndarray,
) -> float:
    """Score how well a source frame matches a hi-res reference template.

    The source frame is converted with ``_hires_phase_feature`` and compared
    to the reference through three signals, blended with fixed weights:

    * 0.25 — normalised cross-correlation (``TM_CCOEFF_NORMED``) of the
      whole feature image;
    * 0.45 — the same correlation restricted to a centred window spanning
      40% of the height and 40% of the width (intended to cover the face in
      talking-head shots);
    * 0.30 — ``_hist_intersection`` of the 8×8 spatial histograms.

    Args:
        ref_feature: Reference feature image from ``_hires_phase_feature``.
        ref_spatial: Reference histogram vector from ``_hires_spatial_hist``.
        src_frame: Raw source frame to evaluate.

    Returns:
        The weighted combination of the three similarity signals.
    """
    candidate = _hires_phase_feature(src_frame)

    # Same-sized image and template: read the single correlation value.
    whole_ncc = float(
        cv2.matchTemplate(candidate, ref_feature, cv2.TM_CCOEFF_NORMED)[0][0]
    )

    # Centred window: +/-20% of each dimension around the midpoint.
    rows, cols = ref_feature.shape[:2]
    mid_y, mid_x = rows // 2, cols // 2
    half_h, half_w = int(rows * 0.20), int(cols * 0.20)
    window = (
        slice(mid_y - half_h, mid_y + half_h),
        slice(mid_x - half_w, mid_x + half_w),
    )
    ref_window = ref_feature[window]
    src_window = candidate[window]
    centre_ncc = float(
        cv2.matchTemplate(src_window, ref_window, cv2.TM_CCOEFF_NORMED)[0][0]
    )

    candidate_spatial = _hires_spatial_hist(candidate)
    layout = _hist_intersection(ref_spatial, candidate_spatial)

    return whole_ncc * 0.25 + centre_ncc * 0.45 + layout * 0.30
def _hires_phase_refine(
    beat: TrailerBeat,
    in_point_s: float,
    scene_start_s: float,
    scene_end_s: float,
    cfg: AppConfig,
) -> float:
    """Re-scan the full source scene at high resolution to correct phase.

    This is applied as a final refinement step after the standard pipeline
    has identified the correct scene. It addresses the case where low-res
    features cannot distinguish between different phases of the same shot
    (e.g. mouth open vs. closed in a talking-head close-up).

    Args:
        beat: Trailer beat; ``trailer_path``, ``start_s`` and ``beat_id``
            are read here.
        in_point_s: In-point chosen by the standard pipeline; returned
            unchanged if no better position is found.
        scene_start_s: First source time (seconds) to consider.
        scene_end_s: Last source time (seconds) a template may land on.
        cfg: Application config; ``export.edl_frame_rate`` and
            ``paths.source_movie`` are read here.

    Returns:
        The source time within the scene with the highest combined hi-res
        score, or ``in_point_s`` when no reference frame qualifies as a
        template or no scan position could be fully scored.
    """
    # One frame period, floored at 0.04s, shared by the reference sampling
    # and the source scan. The ``or 24.0`` fallback keeps a missing or zero
    # frame rate from raising in either loop (previously only the scan loop
    # was guarded).
    frame_step_s = max(1.0 / (cfg.export.edl_frame_rate or 24.0), 0.04)

    # Build hi-res templates from only the stable, bright reference frames
    # before any fade begins. Fading frames have dropping brightness that
    # would penalise correct source positions where those offsets map to
    # bright content in the source.
    matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=0.04)
    ref_templates: list[tuple[float, np.ndarray, np.ndarray, float]] = []
    t = 0.0
    while t <= matchable_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            mean_l, _, contrast = _reference_visibility_stats(frame, cfg)
            # Only use clearly visible frames (skip dimming fade frames)
            if mean_l >= 50.0 and contrast >= 40.0:
                feat = _hires_phase_feature(frame)
                spatial = _hires_spatial_hist(feat)
                ref_templates.append((t, feat, spatial, mean_l))
        # Rounding keeps accumulated float error out of the time grid.
        t = round(t + frame_step_s, 6)

    if not ref_templates:
        return in_point_s

    # For very short matchable durations (fast fades / cross-dissolves),
    # keep only the brightest template. When the beat fades quickly the
    # later templates are dim and penalise every bright source candidate
    # equally, destroying phase discrimination. A single bright anchor
    # gives maximum selectivity.
    if matchable_s < 1.0 and len(ref_templates) > 1:
        ref_templates.sort(key=lambda x: x[3], reverse=True)
        ref_templates = ref_templates[:1]
        logger.debug(
            'Beat %d: hi-res using single brightest template at offset %.3fs (luma %.1f)',
            beat.beat_id, ref_templates[0][0], ref_templates[0][3],
        )

    # Strip the luma field for the scan loop
    scan_templates = [(off, feat, sp) for off, feat, sp, _ in ref_templates]
    max_ref_offset = max(off for off, _, _ in scan_templates)

    # Scan the full scene
    best_t = in_point_s
    best_score = -1.0
    with open_video(cfg.paths.source_movie) as cap:
        t = scene_start_s
        # Stop once the latest template offset would fall past the scene end.
        while t + max_ref_offset <= scene_end_s:
            scores: list[float] = []
            all_ok = True
            for off, ref_feat, ref_spatial in scan_templates:
                src_frame = grab_frame_at(cap, t + off)
                if src_frame is None:
                    # A position is only scored if every template frame is readable.
                    all_ok = False
                    break
                scores.append(_hires_phase_score(ref_feat, ref_spatial, src_frame))
            if all_ok and scores:
                avg = sum(scores) / len(scores)
                # Blend the mean with the worst template so one badly
                # matching offset still pulls the position down.
                combined = avg * 0.7 + min(scores) * 0.3
                if combined > best_score:
                    best_score = combined
                    best_t = t
            t = round(t + frame_step_s, 6)

    if best_t != in_point_s:
        logger.info(
            'Beat %d: hi-res phase refine moved in-point %.3fs -> %.3fs '
            '(delta=%.3fs, score=%.4f)',
            beat.beat_id, in_point_s, best_t,
            best_t - in_point_s, best_score,
        )
    return best_t
def _fixed_content_pair_score(
ref_features: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
source_frame: np.ndarray,
@@ -388,12 +540,36 @@ def _rerank_candidates_by_content(
reranked: list[tuple[float, float, float]] = []
with open_video(cfg.paths.source_movie) as cap:
for coarse_score, t_sec in candidates:
content_score = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
# If the candidate lands just before a scene boundary, also evaluate
# the start of the next scene. A coarse-scan offset can place the
# in-point a few frames into the preceding (wrong) scene, causing
# the content and coverage scores to be artificially low even though
# the next scene is the correct visual match.
eval_t = t_sec
if scenes is not None:
cur_scene = _find_scene_for_time(scenes, t_sec, cfg)
if cur_scene is not None:
remaining = float(cur_scene.end_s) - t_sec
next_idx = next(
(i + 1 for i, s in enumerate(scenes) if s.scene_id == cur_scene.scene_id),
None,
)
if (
remaining < cfg.cv.deep_scan.scene_boundary_epsilon_s * 4
and next_idx is not None
and next_idx < len(scenes)
):
next_scene_start = float(scenes[next_idx].start_s)
alt_content = _fixed_content_sequence_score(cap, next_scene_start, templates, cfg)
cur_content = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
if alt_content > cur_content:
eval_t = next_scene_start
content_score = _fixed_content_sequence_score(cap, eval_t, templates, cfg)
coverage_score = 1.0
if scenes is not None and matchable_duration_s and matchable_duration_s > 0:
usable_s = _contiguous_scene_coverage_duration(
beat,
t_sec,
eval_t,
scenes,
matchable_duration_s,
cfg,
@@ -404,7 +580,7 @@ def _rerank_candidates_by_content(
+ coarse_score * 0.18
+ coverage_score * 0.20
)
reranked.append((rank_score, coarse_score, t_sec))
reranked.append((rank_score, coarse_score, eval_t))
return sorted(reranked, key=lambda item: item[0], reverse=True)
@@ -772,6 +948,8 @@ def _content_alignment_score(
in_point_s: float,
templates: list[tuple[float, np.ndarray]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
if not templates:
return -1.0
@@ -782,7 +960,13 @@ def _content_alignment_score(
early_scores: list[float] = []
for offset_s, template in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
t0 = in_point_s + offset_s
if frame_cache is not None and fps is not None:
idx = int(round(t0 * fps))
frame = frame_cache.get(idx)
else:
frame = grab_frame_at(cap, t0)
if frame is None:
return -1.0
@@ -840,6 +1024,20 @@ def align_in_point_by_content(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _ in templates)
max_offset = max(off for off, _ in templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
@@ -852,7 +1050,7 @@ def align_in_point_by_content(
active_templates = []
else:
active_templates = templates
score = _content_alignment_score(cap, t, active_templates, cfg) if active_templates else -1.0
score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -868,11 +1066,23 @@ def _motion_phase_score(
in_point_s: float,
motion_templates: list[tuple[float, float, np.ndarray, tuple[int, ...]]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
scores: list[float] = []
for offset_s, step_s, ref_delta, template_shape in motion_templates:
f0 = grab_frame_at(cap, in_point_s + offset_s)
f1 = grab_frame_at(cap, in_point_s + offset_s + step_s)
t0 = in_point_s + offset_s
t1 = in_point_s + offset_s + step_s
if frame_cache is not None and fps is not None:
idx0 = int(round(t0 * fps))
idx1 = int(round(t1 * fps))
f0 = frame_cache.get(idx0)
f1 = frame_cache.get(idx1)
else:
f0 = grab_frame_at(cap, t0)
f1 = grab_frame_at(cap, t1)
if f0 is None or f1 is None:
return -1.0
src0 = _fixed_feature(f0, template_shape, cfg)
@@ -913,11 +1123,25 @@ def align_in_point_by_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _, _, _ in motion_templates)
max_offset = max(off + step for off, step, _, _ in motion_templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
while t <= end_s:
score = _motion_phase_score(cap, t, motion_templates, cfg)
score = _motion_phase_score(cap, t, motion_templates, cfg, fps=fps, frame_cache=frame_cache)
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -933,6 +1157,7 @@ def align_in_point_by_content_and_motion(
estimated_in_point_s: float,
cfg: AppConfig,
search_window_s: float | None = None,
scene_end_s: float | None = None,
) -> tuple[float, float, float, float]:
"""
Align a candidate using still-frame content and motion phase together.
@@ -959,23 +1184,57 @@ def align_in_point_by_content_and_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_t_offset = min(off for off, _ in templates) if templates else 0.0
max_t_offset = max(off for off, _ in templates) if templates else 0.0
min_m_offset = min(off for off, _, _, _ in motion_templates) if motion_templates else 0.0
max_m_offset = max(off + step for off, step, _, _ in motion_templates) if motion_templates else 0.0
min_offset = min(min_t_offset, min_m_offset)
max_offset = max(max_t_offset, max_m_offset)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
best_content = -1.0
best_motion = -1.0
t = start_s
while t <= end_s:
content_score = _content_alignment_score(cap, t, templates, cfg)
if scene_end_s is not None:
avail_s = scene_end_s - t
if avail_s > 0:
active_templates = [(off, tpl) for off, tpl in templates if off <= avail_s]
active_motion = [(off, step, delta, shape) for off, step, delta, shape in motion_templates if off + step <= avail_s]
else:
active_templates = []
active_motion = []
else:
active_templates = templates
active_motion = motion_templates
content_score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
motion_score = (
_motion_phase_score(cap, t, motion_templates, cfg)
if len(motion_templates) >= 2
_motion_phase_score(cap, t, active_motion, cfg, fps=fps, frame_cache=frame_cache)
if len(active_motion) >= 2
else content_score
)
if content_score < 0 or motion_score < 0:
t = round(t + frame_step_s, 6)
continue
raw_score = content_score * 0.64 + motion_score * 0.36
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.05)
# The previous anchor_penalty of 0.05 per second was stronger than the
# actual variance in raw_score, preventing phase correction. We reduce it
# so that it only acts as a tie-breaker.
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.005)
score = raw_score - anchor_penalty
if score > best_score + tie_delta:
best_score = score
@@ -1027,6 +1286,18 @@ def estimate_usable_source_duration(
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
break
# If the template is scoreable (has content) but the source frame is dark,
# this is a bad match. We should not let dark source frames
# provide high correlation to dark templates.
# templates are already pre-processed into feature images (grayscale/edges),
# so we can't use _is_scoreable_reference_frame on them directly.
# Instead, we rely on the fact that _prepare_beat_templates already
# filtered out non-scoreable frames.
if _is_dark_reference_frame(frame, cfg):
scores.append((offset_s, 0.0))
continue
scores.append((offset_s, _match_score(frame, template, cfg)))
if not scores:
@@ -1038,12 +1309,14 @@ def estimate_usable_source_duration(
last_good = 0.0
bad_run = 0
bad_run_start_offset: float | None = None
good_scores: list[float] = []
for offset_s, score in scores:
if score >= min_score:
last_good = offset_s
bad_run = 0
bad_run_start_offset = None
good_scores.append(score)
continue
@@ -1051,7 +1324,34 @@ def estimate_usable_source_duration(
continue
bad_run += 1
if bad_run_start_offset is None:
bad_run_start_offset = offset_s
if bad_run >= 3:
# Before killing the span, check whether the remaining scores form a
# stable plateau. This handles scenes where a grading/exposure
# difference between trailer and source causes a gradual score drop
# rather than a hard cut. A genuine cut produces chaotic scores;
# a grading mismatch produces a flat, low-but-consistent plateau.
# Conditions: low variance (std < 0.025), scores above pure-black
# (mean > 0.20), and the warmup baseline was meaningful (>= 0.30).
tail_scores = [s for o, s in scores if o >= bad_run_start_offset]
if (
len(tail_scores) >= 3
and float(np.std(tail_scores)) < 0.025
and float(np.mean(tail_scores)) > 0.20
and baseline >= 0.30
):
logger.debug(
'Beat %d: stable plateau detected at offset %.3fs '
'(tail mean=%.3f std=%.3f) — extending span to full duration.',
beat.beat_id, bad_run_start_offset,
float(np.mean(tail_scores)), float(np.std(tail_scores)),
)
last_good = scores[-1][0]
good_scores.extend(tail_scores)
break
logger.debug('Beat %d: Match died at offset %.3fs. Score %.3f < min_score %.3f. Bad run count: %d',
beat.beat_id, offset_s, score, min_score, bad_run)
break
tail_safety_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / source_fps)
@@ -1113,7 +1413,10 @@ def refine_in_point_with_sequence(
Returns:
(best_in_point_s, sequence_score)
"""
return align_in_point_by_content(beat, estimated_in_point_s, cfg, search_window_s, scene_end_s)
best_in, best_score, _, _ = align_in_point_by_content_and_motion(
beat, estimated_in_point_s, cfg, search_window_s, scene_end_s
)
return best_in, best_score
def _find_scene_for_time(scenes: Sequence | None, t_sec: float, cfg: AppConfig):
@@ -1451,7 +1754,7 @@ def run_global_scan(
max_source_duration_s=duration_s if rough_scene_end_s is not None else None,
)
content_score = original_content_score
content_in_s, align_content_score = align_in_point_by_content(
content_in_s, _, align_content_score, _ = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1495,7 +1798,7 @@ def run_global_scan(
cfg,
)
motion_in_s, align_motion_score = align_in_point_by_motion(
motion_in_s, _, _, align_motion_score = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1504,6 +1807,7 @@ def run_global_scan(
if local_align_window_s is not None
else min(1.0, cfg.cv.deep_scan.content_align_window_seconds)
),
scene_end_s=rough_scene_end_s,
)
if align_motion_score >= original_motion_score + 0.015:
@@ -1561,7 +1865,12 @@ def run_global_scan(
)
if len(motion_templates) >= 2:
motion_score_clamped = max(0.0, min(1.0, motion_score))
final_score = final_score * 0.82 + motion_score_clamped * 0.18
blended = final_score * 0.82 + motion_score_clamped * 0.18
# Do not let motion blending drag the score below the
# content-validated level. A weak motion score often just
# means the shot contains a camera pan or slow zoom; it
# should not veto an otherwise well-supported content match.
final_score = max(blended, final_score - 0.015)
if is_weighted_seed_candidate:
vision_provisional_score = (
content_score * 0.45
@@ -1741,6 +2050,36 @@ def run_global_scan(
best_result.match_score,
)
# Final hi-res phase refinement: scan the full source scene at
# higher resolution to correct phase mismatches that the standard
# 160×80 features cannot resolve (e.g. talking-head close-ups).
final_in_s = best_result.in_point_s
final_scene = _find_scene_for_time(scenes, final_in_s, cfg)
if final_scene is not None:
refined_phase_in_s = _hires_phase_refine(
b,
final_in_s,
float(final_scene.start_s),
float(final_scene.end_s),
cfg,
)
if refined_phase_in_s != final_in_s:
final_in_s = refined_phase_in_s
# Recompute out-point preserving the duration
final_out_s = final_in_s + best_result.duration_s
if final_scene is not None:
final_out_s = min(final_out_s, float(final_scene.end_s))
best_result = MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,
source_path=cfg.paths.source_movie,
in_point_s=final_in_s,
out_point_s=final_out_s,
in_point_frame=int(final_in_s * source_fps),
match_score=best_result.match_score,
is_confirmed=is_confirmed,
)
results.append(MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,