Retiming long scene matches by action phase

This commit is contained in:
Melbar
2026-05-02 20:47:59 +02:00
parent 252f710396
commit 8415516f89
3 changed files with 158 additions and 11 deletions
+9
View File
@@ -181,6 +181,10 @@ Aktionsphase verfehlt, sucht der Matcher automatisch dichter innerhalb derselben
Source-Szene nach lokalen Vision-Fenstern mit der passenden Aktion und richtet
den Inpoint mit der Motion-Phase-Prüfung darauf neu aus. Erst wenn auch diese
In-Scene-Reparatur scheitert, wird der Treffer verworfen.
Diese In-Scene-Reparatur läuft auch für semantisch gültige Treffer aus langen
Source-Szenen. Dadurch kann ein grob passender Dialogmoment nicht bestehen
bleiben, wenn ein anderes lokales Fenster derselben Szene die gesuchte
Aktionsphase und Bewegung klarer trifft.
Der gewichtete Vision-Seed-Pfad ersetzt standardmäßig keinen normalen
FFmpeg-Vollscan. Vision-Beschreibungen sind semantische Hinweise, aber keine
Beweise; der volle CV-Scan bleibt deshalb aktiv, damit falsch bewertete
@@ -200,6 +204,11 @@ Nach einem dichten Vision-Treffer darf der spätere lokale Aligner nur noch im
Bereich dieses Scan-Schritts nachjustieren. So kann ein korrekt gefundener
Bewegungsmoment nicht wieder um viele Frames in eine ähnlich aussehende Phase
derselben Szene verschoben werden.
Für Vision-Action-Fenster nutzt die finale Retiming-Prüfung eine gemeinsame
Content-und-Motion-Suche pro Frame. Content und Bewegungsphase werden dabei
nicht mehr als zwei getrennte Korrekturschritte angewendet; das verhindert,
dass eine kurze Geste erst korrekt erkannt und anschließend in eine spätere
ähnliche Körperhaltung verschoben wird.
Wenn mehrere Vision-Kandidaten in derselben Source-Szene ähnlich gut scoren
und die Beat-Dauer abdecken, bevorzugt der Matcher die frühere Phase. Das
verhindert, dass ein späterer, minimal stärkerer Standbildtreffer die
+80 -11
View File
@@ -640,7 +640,7 @@ def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg)
from dataclasses import replace
from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
from src.cv.scene_indexer import build_scene_index
from src.cv.global_scan import align_in_point_by_content, align_in_point_by_motion
from src.cv.global_scan import align_in_point_by_content_and_motion
logger = logging.getLogger(__name__)
beats_by_id = {beat.beat_id: beat for beat in beats}
@@ -654,19 +654,13 @@ def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg)
if found is None:
return None
start_s, end_s, semantic_score, reason = found
window_s = max(1.0, min(4.0, (end_s - start_s) * 1.5))
motion_in_s, motion_score = align_in_point_by_motion(
window_s = max(3.0, min(8.0, (end_s - start_s) * 4.0))
aligned_in_s, combined_score, content_score, motion_score = align_in_point_by_content_and_motion(
check_beat,
start_s,
cfg,
search_window_s=window_s,
)
aligned_in_s, content_score = align_in_point_by_content(
check_beat,
motion_in_s,
cfg,
search_window_s=min(window_s, 0.8),
)
aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - check_beat.duration_s)))
ok, verify_reason = validate_match_window_with_vision(
check_beat,
@@ -685,7 +679,7 @@ def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg)
verify_reason,
)
return None
score = max(content_score, min(0.99, semantic_score * 0.75 + motion_score * 0.25))
score = max(combined_score, min(0.99, semantic_score * 0.70 + motion_score * 0.20 + content_score * 0.10))
return scene, aligned_in_s, score, f"{reason}; {verify_reason}"
kept = []
@@ -728,7 +722,82 @@ def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg)
valid = False
break
if valid:
kept.append(result)
repaired = False
if getattr(result, "segments", ()):
new_segments = []
repair_reasons = []
changed = False
for segment in result.segments:
scene = scenes_by_id.get(segment.scene_id)
if scene is None or scene.duration_s <= max(segment.duration_s * 1.6, 6.0):
new_segments.append(segment)
continue
segment_beat = replace(
beat,
start_s=beat.start_s + segment.trailer_offset_s,
end_s=beat.start_s + segment.trailer_offset_s + segment.duration_s,
)
repair = realign_window(segment_beat, segment.scene_id)
if repair is None:
new_segments.append(segment)
continue
repair_scene, aligned_in_s, score, repair_reason = repair
if abs(aligned_in_s - segment.in_point_s) <= 1.0 / cfg.export.edl_frame_rate:
new_segments.append(segment)
continue
changed = True
repair_reasons.append(repair_reason)
new_segments.append(replace(
segment,
scene_id=repair_scene.scene_id,
in_point_s=aligned_in_s,
out_point_s=aligned_in_s + segment.duration_s,
match_score=score,
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
))
if changed and new_segments:
first = new_segments[0]
repaired_score = min(seg.match_score for seg in new_segments)
logger.info(
"Beat %d: realigned semantically valid long scene by motion/action windows (%s)",
result.beat_id,
"; ".join(repair_reasons),
)
kept.append(replace(
result,
scene_id=first.scene_id,
in_point_s=first.in_point_s,
out_point_s=first.out_point_s,
in_point_frame=int(first.in_point_s * cfg.export.edl_frame_rate),
match_score=repaired_score,
is_confirmed=repaired_score >= cfg.cv.deep_scan.match_threshold,
segments=tuple(new_segments),
))
repaired = True
else:
scene = scenes_by_id.get(result.scene_id)
if scene is not None and scene.duration_s > max(result.duration_s * 1.6, 6.0):
repair = realign_window(beat, result.scene_id)
if repair is not None:
repair_scene, aligned_in_s, score, repair_reason = repair
if abs(aligned_in_s - result.in_point_s) > 1.0 / cfg.export.edl_frame_rate:
logger.info(
"Beat %d: realigned semantically valid long scene by motion/action window (%s)",
result.beat_id,
repair_reason,
)
kept.append(replace(
result,
scene_id=repair_scene.scene_id,
in_point_s=aligned_in_s,
out_point_s=aligned_in_s + result.duration_s,
in_point_frame=int(aligned_in_s * cfg.export.edl_frame_rate),
match_score=score,
is_confirmed=score >= cfg.cv.deep_scan.match_threshold,
))
repaired = True
if not repaired:
kept.append(result)
else:
if getattr(result, "segments", ()):
new_segments = []
+69
View File
@@ -871,6 +871,75 @@ def align_in_point_by_motion(
return best_in, max(0.0, best_score)
def align_in_point_by_content_and_motion(
beat: TrailerBeat,
estimated_in_point_s: float,
cfg: AppConfig,
search_window_s: float | None = None,
) -> tuple[float, float, float, float]:
"""
Align a candidate using still-frame content and motion phase together.
Running content and motion as separate passes can overshoot short action
phases: one pass may land on the right broad gesture and the next can slide
to a visually similar but later posture. A joint score keeps the in-point
tied to the same frame hypothesis throughout the local search.
"""
templates = _prepare_beat_templates(beat, cfg)
motion_templates = _prepare_motion_templates(beat, cfg)
if not templates:
return estimated_in_point_s, 0.0, 0.0, 0.0
with open_video(cfg.paths.source_movie) as cap:
fps = float(cap.get(cv2.CAP_PROP_FPS)) or cfg.export.edl_frame_rate
frame_step_s = 1.0 / fps
window_s = (
search_window_s
if search_window_s is not None
else cfg.cv.deep_scan.content_align_window_seconds
)
start_s = max(0.0, estimated_in_point_s - window_s)
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
best_in = estimated_in_point_s
best_score = -1.0
best_content = -1.0
best_motion = -1.0
t = start_s
while t <= end_s:
content_score = _content_alignment_score(cap, t, templates, cfg)
motion_score = (
_motion_phase_score(cap, t, motion_templates, cfg)
if len(motion_templates) >= 2
else content_score
)
if content_score < 0 or motion_score < 0:
t = round(t + frame_step_s, 6)
continue
raw_score = content_score * 0.64 + motion_score * 0.36
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.05)
score = raw_score - anchor_penalty
if score > best_score + tie_delta:
best_score = score
best_in = t
best_content = content_score
best_motion = motion_score
elif score >= best_score - tie_delta:
current_distance = abs(t - estimated_in_point_s)
best_distance = abs(best_in - estimated_in_point_s)
if current_distance < best_distance or (
abs(current_distance - best_distance) <= frame_step_s * 0.5
and t < best_in
):
best_in = t
best_content = content_score
best_motion = motion_score
t = round(t + frame_step_s, 6)
return best_in, max(0.0, best_score), max(0.0, best_content), max(0.0, best_motion)
def estimate_usable_source_duration(
beat: TrailerBeat,
in_point_s: float,