Improve multi-shot phase retune

This commit is contained in:
Melbar
2026-05-09 09:36:11 +02:00
parent a275b2efb6
commit c08ba97d37
8 changed files with 138 additions and 44 deletions
+78 -25
View File
@@ -131,7 +131,7 @@ def _auto_commit_push_reports(project_root: "Path") -> None: # type: ignore[nam
log.warning("Auto-commit/push failed (non-fatal): %s", exc)
def _regenerate_cutter_report(cfg: "AppConfig") -> None: # type: ignore[name-defined]
def _regenerate_cutter_report(cfg: "AppConfig", force_beats: set[int] | None = None) -> None: # type: ignore[name-defined]
"""Re-render CUTTER_REPORT.{md,html} with Frame-Locked Compare clips.
Called from every match-style command after the cache is written so all
@@ -141,8 +141,19 @@ def _regenerate_cutter_report(cfg: "AppConfig") -> None: # type: ignore[name-de
"""
project_root = cfg.paths.cache_dir.parent
try:
import os
from scripts.generate_cutter_report import render_report
md, html = render_report(project_root, with_stills=True, with_clips=True)
old_force = os.environ.get("CUTTER_REPORT_FORCE_BEATS")
try:
if force_beats:
os.environ["CUTTER_REPORT_FORCE_BEATS"] = ",".join(str(b) for b in sorted(force_beats))
md, html = render_report(project_root, with_stills=True, with_clips=True)
finally:
if force_beats:
if old_force is None:
os.environ.pop("CUTTER_REPORT_FORCE_BEATS", None)
else:
os.environ["CUTTER_REPORT_FORCE_BEATS"] = old_force
(project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8")
(project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8")
@@ -293,6 +304,8 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
segment,
in_point_s=in_point_s,
out_point_s=in_point_s + float(segment.duration_s),
match_score=max(float(segment.match_score), float(_phase_score)),
is_confirmed=float(_phase_score) >= cfg.cv.deep_scan.match_threshold,
)
repaired_segments.append(segment)
@@ -1430,6 +1443,8 @@ def _attach_visual_segments(results: list, beats: list, cfg) -> list:
seg,
in_point_s=in_point_s,
out_point_s=in_point_s + seg.duration_s,
match_score=max(seg.match_score, _phase_score),
is_confirmed=_phase_score >= cfg.cv.deep_scan.match_threshold,
)
else:
seg = repaired
@@ -1693,27 +1708,34 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
def _phase_probe_segment_in_scene(segment_beat, scene: dict, original_in_s: float, cfg):
"""Retune a weak multi-shot segment inside its own scene using cheap frame features."""
"""Retune a weak multi-shot segment inside its own scene using saliency-weighted frames."""
import cv2
import numpy as np
offsets = [0.0, 0.28, 0.56, 0.84, 1.12]
offsets = [0.0, 0.16, 0.32, 0.48, 0.64, 0.80, 0.96, 1.12]
size = (160, 90)
def feature(frame):
def prepared_gray(frame):
if frame is None:
return None
h, w = frame.shape[:2]
frame = frame.copy()
frame[: int(h * 0.16), : int(w * 0.28)] = 0
# Timecode overlays and letterbox edges are trailer/source-specific and
# should not pull the phase toward the wrong moment.
frame[: int(h * 0.16), : int(w * 0.32)] = 0
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.resize(gray, size)
edges = cv2.Canny(gray, 40, 120)
vec = np.concatenate([
gray.reshape(-1).astype("float32") / 255.0,
edges.reshape(-1).astype("float32") / 255.0,
])
return (vec - vec.mean()) / (vec.std() + 1e-6)
return cv2.equalizeHist(gray).astype("float32") / 255.0
def edge(gray):
return cv2.Canny((gray * 255).astype("uint8"), 45, 130).astype("float32") / 255.0
def pair_score(ref_gray, src_gray, mask):
if ref_gray is None or src_gray is None:
return None
pixel = 1.0 - float((np.abs(ref_gray - src_gray) * mask).sum())
edge_score = 1.0 - float((np.abs(edge(ref_gray) - edge(src_gray)) * mask).sum())
return 0.65 * pixel + 0.35 * edge_score
def frame_at(cap, t_s):
cap.set(cv2.CAP_PROP_POS_MSEC, t_s * 1000.0)
@@ -1722,39 +1744,69 @@ def _phase_probe_segment_in_scene(segment_beat, scene: dict, original_in_s: floa
trailer_cap = cv2.VideoCapture(str(cfg.paths.reference_trailer))
refs = [
feature(frame_at(trailer_cap, segment_beat.start_s + offset))
prepared_gray(frame_at(trailer_cap, segment_beat.start_s + offset))
for offset in offsets
if offset <= segment_beat.duration_s + 0.04
]
refs = [ref for ref in refs if ref is not None]
if len(refs) < 3:
if len(refs) < 4:
return None
ref_stack = np.stack(refs, axis=0)
edge_stack = np.stack([edge(ref) for ref in refs], axis=0)
saliency = ref_stack.std(axis=0) * 1.25 + edge_stack.mean(axis=0) * 0.75
saliency[:, : int(size[0] * 0.12)] *= 0.15
saliency[: int(size[1] * 0.16), : int(size[0] * 0.32)] = 0.0
threshold = np.quantile(saliency, 0.72)
mask = (saliency >= threshold).astype("float32")
mask /= mask.sum() + 1e-6
scene_start = float(scene["start_s"])
scene_end = float(scene["end_s"])
scan_end = max(scene_start, scene_end - max(0.04, segment_beat.duration_s))
max_points = 96
max_points = 400
step_s = max(0.08, (scan_end - scene_start) / max_points)
source_cap = cv2.VideoCapture(str(cfg.paths.source_movie))
source_fps = source_cap.get(cv2.CAP_PROP_FPS) or _scene_fps_light(scene, cfg)
stride = max(1, int(round(step_s * source_fps)))
start_frame = max(0, int(round(scene_start * source_fps)))
end_frame = max(start_frame, int(round(scene_end * source_fps)))
times: list[float] = []
source_frames: list = []
frame_idx = start_frame
while frame_idx <= end_frame:
source_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ok, frame = source_cap.read()
if not ok:
break
times.append(frame_idx / source_fps)
source_frames.append(prepared_gray(frame))
frame_idx += stride
candidates: list[tuple[float, float, float]] = []
t = scene_start
while t <= scan_end:
for i, t in enumerate(times):
if t > scan_end:
break
vals = []
for offset, ref in zip(offsets, refs):
src = feature(frame_at(source_cap, t + offset))
if src is not None:
vals.append(float(np.dot(ref, src) / len(ref)))
if len(vals) >= 3:
candidates.append((sum(vals) / len(vals), min(vals), t))
t = round(t + step_s, 6)
j = int(round((t + offset - scene_start) / step_s))
if 0 <= j < len(source_frames):
score = pair_score(ref, source_frames[j], mask)
else:
score = None
if score is not None:
vals.append(score)
if len(vals) >= 4:
avg_score = sum(vals) / len(vals)
candidates.append((0.55 * avg_score + 0.45 * min(vals), min(vals), t))
if not candidates:
return None
candidates.sort(reverse=True)
best_score = candidates[0][0]
near_tie = [c for c in candidates if c[0] >= best_score - 0.01]
near_tie = [c for c in candidates if c[0] >= best_score - 0.002]
chosen = min(near_tie, key=lambda c: abs(c[2] - original_in_s))
return chosen[2], chosen[0]
@@ -1858,7 +1910,8 @@ def cmd_match(args: argparse.Namespace, cfg) -> list:
results_to_save = results
_save_results(results_to_save, cfg)
_regenerate_cutter_report(cfg)
force_report_beats = {int(args.beat)} if getattr(args, "beat", None) is not None else None
_regenerate_cutter_report(cfg, force_beats=force_report_beats)
print(f"\n{len(results)} / {len(beats)} beats matched.")
for r in results: