Retune weak multi-shot segment phases

This commit is contained in:
Melbar
2026-05-09 04:45:56 +02:00
parent fab6c53698
commit a275b2efb6
8 changed files with 178 additions and 11 deletions
+156 -6
View File
@@ -270,9 +270,38 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
for result in results:
beat = beats_by_id.get(result.beat_id)
if getattr(result, "segments", ()):
segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
segment_threshold = cfg.cv.deep_scan.multi_shot_segment_threshold
repaired_segments = []
for segment in result.segments:
if float(segment.match_score) < segment_threshold:
scene = _scene_by_id_light(scenes, segment.scene_id)
if beat is not None and scene is not None:
segment_beat = replace(
beat,
start_s=beat.start_s + float(segment.trailer_offset_s),
end_s=beat.start_s + float(segment.trailer_offset_s) + float(segment.duration_s),
)
probe = _phase_probe_segment_in_scene(
segment_beat,
scene,
float(segment.in_point_s),
cfg,
)
if probe is not None:
in_point_s, _phase_score = probe
segment = replace(
segment,
in_point_s=in_point_s,
out_point_s=in_point_s + float(segment.duration_s),
)
repaired_segments.append(segment)
valid_segments = tuple(repaired_segments)
if not valid_segments:
continue
segment_duration = sum(max(0.0, float(s.duration_s)) for s in valid_segments)
weighted_score = (
sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in valid_segments)
/ segment_duration
if segment_duration > 0 else result.match_score
)
@@ -287,7 +316,15 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
coverage = segment_duration / coverage_target
if coverage < cfg.cv.deep_scan.min_duration_coverage:
continue
normalized.append(replace(result, match_score=weighted_score))
first_segment = valid_segments[0]
normalized.append(replace(
result,
scene_id=first_segment.scene_id,
in_point_s=first_segment.in_point_s,
out_point_s=first_segment.out_point_s,
match_score=weighted_score,
segments=valid_segments,
))
continue
if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
@@ -1363,6 +1400,39 @@ def _attach_visual_segments(results: list, beats: list, cfg) -> list:
if not segment_matches:
continue
seg = segment_matches[0]
if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold:
repaired = _local_same_scene_segment_match(
segment_beat,
beat,
start_s,
cached + expanded,
cfg,
)
if (
repaired is None
or repaired.match_score
< max(
cfg.cv.deep_scan.multi_shot_segment_threshold,
seg.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta,
)
):
scenes = _load_scene_cache_light(cfg)
scene = _scene_by_id_light(scenes, seg.scene_id)
probe = (
_phase_probe_segment_in_scene(segment_beat, scene, seg.in_point_s, cfg)
if scene is not None else None
)
if probe is None:
continue
in_point_s, _phase_score = probe
from dataclasses import replace as _replace
seg = _replace(
seg,
in_point_s=in_point_s,
out_point_s=in_point_s + seg.duration_s,
)
else:
seg = repaired
seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
segments.append(
MatchSegment(
@@ -1484,7 +1554,10 @@ def _match_unmatched_visual_segments(
if recovered:
rec = recovered[0]
seg_dur = min(max(0.0, end_s - start_s), max(0.0, rec.duration_s))
if seg_dur > 0:
if (
seg_dur > 0
and rec.match_score >= cfg.cv.deep_scan.multi_shot_segment_threshold
):
segments.append(MatchSegment(
trailer_offset_s=start_s,
duration_s=seg_dur,
@@ -1506,6 +1579,8 @@ def _match_unmatched_visual_segments(
segments.append(local_segment)
continue
seg = segment_matches[0]
if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold:
continue
seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
segments.append(
MatchSegment(
@@ -1577,7 +1652,13 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
cfg.cv.deep_scan.provisional_content_threshold * 0.70,
cfg.cv.deep_scan.provisional_match_threshold,
)
step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)
# Coarse repair scan over already plausible neighbouring scenes. A frame-step
# sweep across long dialogue scenes is slow and can overfit static layouts.
step_s = max(
cfg.vision.local_scan_step_s,
cfg.cv.deep_scan.content_align_sample_step_s,
0.25,
)
best: tuple[float, float, int] | None = None
with open_video(cfg.paths.source_movie) as cap:
for scene_id in scene_ids:
@@ -1586,12 +1667,14 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
continue
start_s = max(0.0, float(scene["start_s"]) - 0.25)
end_s = max(start_s, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25)
max_points = max(4, min(48, int(cfg.vision.local_scan_max_points_per_scene)))
scene_step_s = max(step_s, (end_s - start_s) / max_points)
t = start_s
while t <= end_s:
score = _content_alignment_score(cap, t, templates, cfg)
if best is None or score > best[0]:
best = (score, t, int(scene_id))
t = round(t + step_s, 6)
t = round(t + scene_step_s, 6)
if best is None or best[0] < min_score:
return None
@@ -1609,6 +1692,73 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
)
def _phase_probe_segment_in_scene(segment_beat, scene: dict, original_in_s: float, cfg):
    """Retune a weak multi-shot segment inside its own scene using cheap frame features.

    A few frames are sampled from the reference trailer at fixed offsets from
    ``segment_beat.start_s``. The source movie is then scanned across the scene
    and each candidate in-point is scored by the mean correlation between the
    trailer frames and the source frames at the same offsets.

    Args:
        segment_beat: Object exposing ``start_s`` and ``duration_s`` (seconds
            on the trailer timeline) for the segment being retuned.
        scene: Mapping with ``"start_s"`` and ``"end_s"`` (seconds in the
            source movie) bounding the scan.
        original_in_s: The segment's current in-point; used only to break
            near-ties in favour of the closest candidate.
        cfg: Config exposing ``paths.reference_trailer`` and
            ``paths.source_movie``.

    Returns:
        ``(in_point_s, score)`` for the chosen candidate, or ``None`` when
        fewer than three trailer reference frames could be decoded or no scan
        position produced at least three comparable frames.
    """
    import cv2
    import numpy as np

    offsets = [0.0, 0.28, 0.56, 0.84, 1.12]
    size = (160, 90)

    def feature(frame):
        """Return a z-scored grayscale + Canny-edge vector, or None for a missing frame."""
        if frame is None:
            return None
        h, w = frame.shape[:2]
        frame = frame.copy()
        # Blank the top-left corner before extracting features.
        # NOTE(review): presumably masks a logo/watermark overlay - confirm.
        frame[: int(h * 0.16), : int(w * 0.28)] = 0
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray, size)
        edges = cv2.Canny(gray, 40, 120)
        vec = np.concatenate([
            gray.reshape(-1).astype("float32") / 255.0,
            edges.reshape(-1).astype("float32") / 255.0,
        ])
        # Zero mean / unit variance so dot(a, b) / len(a) acts as a correlation.
        return (vec - vec.mean()) / (vec.std() + 1e-6)

    def frame_at(cap, t_s):
        """Seek ``cap`` to ``t_s`` seconds and return the decoded frame, or None."""
        cap.set(cv2.CAP_PROP_POS_MSEC, t_s * 1000.0)
        ok, frame = cap.read()
        return frame if ok else None

    trailer_cap = cv2.VideoCapture(str(cfg.paths.reference_trailer))
    try:
        sampled = [
            (offset, feature(frame_at(trailer_cap, segment_beat.start_s + offset)))
            for offset in offsets
            if offset <= segment_beat.duration_s + 0.04
        ]
    finally:
        # Release the decoder even when a read raises or we return early below.
        trailer_cap.release()

    # Keep each reference paired with the offset it was sampled at, so that a
    # dropped (undecodable) trailer frame cannot shift the remaining references
    # onto the wrong source offsets.
    ref_pairs = [(offset, ref) for offset, ref in sampled if ref is not None]
    if len(ref_pairs) < 3:
        return None

    scene_start = float(scene["start_s"])
    scene_end = float(scene["end_s"])
    # Last in-point at which the whole segment still fits inside the scene.
    scan_end = max(scene_start, scene_end - max(0.04, segment_beat.duration_s))
    max_points = 96
    # Cap the number of scan positions for long scenes; never go below 80 ms.
    step_s = max(0.08, (scan_end - scene_start) / max_points)

    # Each candidate is (mean score, worst per-frame score, in-point seconds).
    candidates: list[tuple[float, float, float]] = []
    source_cap = cv2.VideoCapture(str(cfg.paths.source_movie))
    try:
        t = scene_start
        while t <= scan_end:
            vals = []
            for offset, ref in ref_pairs:
                src = feature(frame_at(source_cap, t + offset))
                if src is not None:
                    vals.append(float(np.dot(ref, src) / len(ref)))
            if len(vals) >= 3:
                candidates.append((sum(vals) / len(vals), min(vals), t))
            t = round(t + step_s, 6)
    finally:
        source_cap.release()

    if not candidates:
        return None

    candidates.sort(reverse=True)
    best_score = candidates[0][0]
    # Among candidates within 0.01 of the best score, prefer the one closest
    # to the original in-point so the retune stays conservative.
    near_tie = [c for c in candidates if c[0] >= best_score - 0.01]
    chosen = min(near_tie, key=lambda c: abs(c[2] - original_in_s))
    return chosen[2], chosen[0]
def cmd_match(args: argparse.Namespace, cfg) -> list:
from src.pipeline.matcher import run_matching
from dataclasses import replace