Compare commits

...

39 Commits

Author SHA1 Message Date
Melbar fa40821319 Update cutter report 2026-05-18 08:48:26 +02:00
Melbar 68ec775916 Auto-update cutter report 2026-05-09 19:06
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 19:06:02 +02:00
Melbar 3b42c5d018 Mark trailer title cards as graphics 2026-05-09 18:48:24 +02:00
Melbar f3c3a9cfd4 Auto-update cutter report 2026-05-09 18:46
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:46:52 +02:00
Melbar e966a4c321 Filter cached vision action windows 2026-05-09 18:30:13 +02:00
Melbar 45b5376cef Auto-update cutter report 2026-05-09 18:28
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:28:33 +02:00
Melbar 4b3894a812 Auto-update cutter report 2026-05-09 18:22
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:22:35 +02:00
Melbar 3ad2b51e56 Auto-update cutter report 2026-05-09 18:04
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:04:24 +02:00
Melbar c16e46fb9d Auto-update cutter report 2026-05-09 18:03
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:03:14 +02:00
Melbar 8ca6d4b696 Auto-update cutter report 2026-05-09 18:02
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:02:11 +02:00
Melbar b771c6792b Auto-update cutter report 2026-05-09 18:01
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 18:01:01 +02:00
Melbar 6bf3ab6626 Auto-update cutter report 2026-05-09 17:59
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:59:15 +02:00
Melbar 9a5abd5312 Auto-update cutter report 2026-05-09 17:55
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:55:55 +02:00
Melbar b2abdafc7a Auto-update cutter report 2026-05-09 17:53
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:53:39 +02:00
Melbar 02e9fee982 Auto-update cutter report 2026-05-09 17:36
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:36:04 +02:00
Melbar 5425939a84 Auto-update cutter report 2026-05-09 17:29
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 17:29:01 +02:00
Melbar ed7b083dca Recover weak low-light matches via vision 2026-05-09 17:26:10 +02:00
Melbar ae3c2b1b13 Improve local phase retuning 2026-05-09 12:35:33 +02:00
Melbar 71117a8a3b Auto-update cutter report 2026-05-09 12:30
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 12:30:17 +02:00
Melbar c1425003c1 Normalize visible island segments 2026-05-09 11:29:07 +02:00
Melbar bcaf0417b3 Recover short low-light vibe matches 2026-05-09 10:38:57 +02:00
Melbar f63d65fcd2 Handle fade-led segment phase ties 2026-05-09 10:11:36 +02:00
Melbar c08ba97d37 Improve multi-shot phase retune 2026-05-09 09:36:11 +02:00
Melbar a275b2efb6 Retune weak multi-shot segment phases 2026-05-09 05:10:38 +02:00
Melbar fab6c53698 Remove legacy match report 2026-05-09 04:33:53 +02:00
Melbar c5b7d61451 Restore visible beat 14 cutter candidate 2026-05-09 04:31:14 +02:00
Melbar acafe538b2 Tighten cutter phase span validation 2026-05-08 14:56:44 +02:00
Melbar 10e27afc8d Make cutter report the only generated review report 2026-05-08 14:29:49 +02:00
Melbar e335fffe92 Mask timecode in phase refine and guard cutter scene starts 2026-05-08 14:18:27 +02:00
Melbar bdc9e4ab31 Clamp cutter clips to source scene start 2026-05-08 14:11:02 +02:00
Melbar 430a81a988 Constrain hi-res phase refine and update beat 14 2026-05-08 13:45:09 +02:00
Melbar 5611902eb5 Update cutter report for beat 14 compare clip 2026-05-08 13:21:35 +02:00
Melbar 4eeecca80d Fix cutter compare fallback for single-shot matches 2026-05-08 13:18:56 +02:00
Melbar 5407f08fbc Auto-update cutter report 2026-05-08 12:46
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 12:46:57 +02:00
Melbar 0baedb3a17 Auto-update cutter report 2026-05-08 12:22
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 12:22:35 +02:00
Melbar d83fced8d2 Fix multi-shot matching: increase cut correlation threshold to properly segment multi-island beats 2026-05-08 12:16:09 +02:00
Melbar 4fe1d35f1a Fix multi-shot matching: Always use continuity seed for first island to prevent wrong scene jumps 2026-05-08 11:50:13 +02:00
Melbar 730b5ef3c0 Auto-update cutter report 2026-05-08 11:31
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-08 11:31:15 +02:00
Melbar f20f89b06b Add hi-res phase refinement for intra-scene phase matching (Beat 03 investigation) 2026-05-08 10:52:11 +02:00
39 changed files with 1474 additions and 904 deletions
+6
View File
@@ -0,0 +1,6 @@
* text=auto
.gitattributes text eol=lf
*.py text eol=lf
*.md text eol=lf
*.html text eol=lf
*.ps1 text eol=crlf
+2 -1
View File
File diff suppressed because one or more lines are too long
+36 -35
View File
File diff suppressed because one or more lines are too long
+10 -2
View File
@@ -36,6 +36,10 @@ Was du bekommst sind zwei Dateien, mit denen du arbeitest:
5. Bei `MAN.`-Beats selbst die passende Stelle im Spielfilm suchen — die
Beschreibung im Report sagt dir was du suchst.
Für die visuelle Kontrolle ist zusätzlich **`CUTTER_REPORT.html`** relevant:
er enthält die frame-locked Compare-Clips. Der alte `match_report.html` ist
nicht mehr Teil des Workflows.
Alles andere unten ist Hintergrund für den Tool-Verantwortlichen.
---
@@ -48,7 +52,7 @@ Alles andere unten ist Hintergrund für den Tool-Verantwortlichen.
| **1** | Schneller Vibe-Check: für jeden Beat die Top-K ähnlichsten Szenen aus dem Spielfilm vorauswählen (Histogramm + pHash). |
| **2** | Optional: Vision-LLM beschreibt unsichere Szenen mit 3-Frame-Samples; die Beschreibungen liegen gecached vor. |
| **3** | Frame-genaue Verfeinerung pro Beat (OpenCV-Templatematching, Bewegungsphasen-Vergleich). |
| **4** | Phasen-Reparatur: bei segmentierten Beats wird die Bewegungsphase im Source mit der sichtbaren Trailerphase abgeglichen. |
| **4** | Phasen-Reparatur: bei segmentierten Beats wird die Bewegungsphase lokal um den gefundenen Inpoint saliency- und motion-gewichtet mit der sichtbaren Trailerphase abgeglichen. |
| **5** | Recovery: Beats ohne Treffer werden via Vision-Phasensuche in den Top-K Szenen nochmal probiert. |
| **6** | Export als FCPXML 1.10 oder CMX-3600-EDL plus `CUTTER_REPORT.md`. |
@@ -56,6 +60,10 @@ Alles andere unten ist Hintergrund für den Tool-Verantwortlichen.
Vergleich ausgeblendet, damit Title-Cards, Logos und Letterbox die Treffer
nicht verfälschen.
**Cutter-Report-Caching:** Vorhandene Compare-Clips werden wiederverwendet.
Bei gezielten Rematches wird nur der betroffene Beat neu gerendert, damit der
Report schnell aktuell bleibt und keine unnötigen Videoartefakte neu entstehen.
**Wichtig:** Auch wenn Vision aktiviert ist — der finale Match bleibt
CV-verifiziert. Das LLM liefert nur zusätzliche Suchanker.
@@ -159,7 +167,7 @@ wenn sich das zugrundeliegende Match geändert hat.
| Source-Clip zeigt richtige Szene, aber falsche Bewegungsphase | `python cli.py rematch --beat N --refine` — schiebt den Inpoint frame-genau aus dem Bildinhalt. |
| Score zu niedrig, andere Szene wäre richtig | `python cli.py match --beat N --vision` — vollständiger Re-Match nur für diesen Beat mit Vision-Phasenprüfung. |
| Match offensichtlich falsche Szene | `python cli.py rematch --beat N --threshold 0.50` — Schwelle absenken, neuer globaler Scan nur für diesen Beat. |
| Beat ist Schwarzbild / Logo / Titel und sollte gar nicht matchen | nichts tun, der Status `MAN.` im `CUTTER_REPORT.md` ist korrekt. |
| Beat ist Schwarzbild / Logo / Titel und sollte gar nicht matchen | nichts tun, der Status `GFX` im `CUTTER_REPORT.md` ist korrekt. |
### Algorithmische Details
+671 -43
View File
@@ -104,10 +104,6 @@ def _auto_commit_push_reports(project_root: "Path") -> None: # type: ignore[nam
report_globs = [
"CUTTER_REPORT.html",
"CUTTER_REPORT.md",
"output/report/match_report.html",
"output/report/beat_*_compare.mp4",
"output/report/beat_*_src.mp4",
"output/report/beat_*_ref.mp4",
"output/cutter_clips/beat_*_compare.mp4",
"output/cutter_clips/beat_*_source.mp4",
"output/cutter_clips/beat_*_source_seg*.mp4",
@@ -135,7 +131,7 @@ def _auto_commit_push_reports(project_root: "Path") -> None: # type: ignore[nam
log.warning("Auto-commit/push failed (non-fatal): %s", exc)
def _regenerate_cutter_report(cfg: "AppConfig") -> None: # type: ignore[name-defined]
def _regenerate_cutter_report(cfg: "AppConfig", force_beats: set[int] | None = None) -> None: # type: ignore[name-defined]
"""Re-render CUTTER_REPORT.{md,html} with Frame-Locked Compare clips.
Called from every match-style command after the cache is written so all
@@ -145,16 +141,23 @@ def _regenerate_cutter_report(cfg: "AppConfig") -> None: # type: ignore[name-de
"""
project_root = cfg.paths.cache_dir.parent
try:
import os
from scripts.generate_cutter_report import render_report
md, html = render_report(project_root, with_stills=True, with_clips=True)
old_force = os.environ.get("CUTTER_REPORT_FORCE_BEATS")
try:
if force_beats:
os.environ["CUTTER_REPORT_FORCE_BEATS"] = ",".join(str(b) for b in sorted(force_beats))
md, html = render_report(project_root, with_stills=True, with_clips=True)
finally:
if force_beats:
if old_force is None:
os.environ.pop("CUTTER_REPORT_FORCE_BEATS", None)
else:
os.environ["CUTTER_REPORT_FORCE_BEATS"] = old_force
(project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8")
(project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8")
legacy_report_path = project_root / "output" / "report" / "match_report.html"
legacy_report_path.parent.mkdir(parents=True, exist_ok=True)
legacy_report_path.write_text(html, encoding="utf-8")
logging.getLogger(__name__).info("Cutter report regenerated (md + html + compare clips + legacy match_report.html)")
logging.getLogger(__name__).info("Cutter report regenerated (md + html + compare clips)")
except Exception as exc:
logging.getLogger(__name__).warning("Cutter report regen failed: %s", exc)
@@ -278,9 +281,57 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
for result in results:
beat = beats_by_id.get(result.beat_id)
if getattr(result, "segments", ()):
segment_duration = sum(max(0.0, float(s.duration_s)) for s in result.segments)
segment_threshold = cfg.cv.deep_scan.multi_shot_segment_threshold
current_islands = _reference_scoreable_segments(beat, cfg) if beat is not None else []
repaired_segments = []
source_segments = list(result.segments)
if beat is not None and len(source_segments) == 1 and len(current_islands) == 1:
island_start_s, island_end_s = current_islands[0]
island_duration_s = max(0.0, island_end_s - island_start_s)
segment = source_segments[0]
if (
abs(float(segment.trailer_offset_s) - island_start_s) > 0.04
or abs(float(segment.duration_s) - island_duration_s) > 0.08
):
from dataclasses import replace as _replace
source_segments[0] = _replace(
segment,
trailer_offset_s=island_start_s,
duration_s=island_duration_s,
out_point_s=float(segment.in_point_s) + island_duration_s,
)
for segment in source_segments:
if float(segment.match_score) < segment_threshold:
scene = _scene_by_id_light(scenes, segment.scene_id)
if beat is not None and scene is not None:
segment_beat = replace(
beat,
start_s=beat.start_s + float(segment.trailer_offset_s),
end_s=beat.start_s + float(segment.trailer_offset_s) + float(segment.duration_s),
)
probe = _phase_probe_segment_in_scene(
segment_beat,
scene,
float(segment.in_point_s),
cfg,
)
if probe is not None:
in_point_s, _phase_score = probe
segment = replace(
segment,
in_point_s=in_point_s,
out_point_s=in_point_s + float(segment.duration_s),
match_score=max(float(segment.match_score), float(_phase_score)),
is_confirmed=float(_phase_score) >= cfg.cv.deep_scan.match_threshold,
)
repaired_segments.append(segment)
valid_segments = tuple(repaired_segments)
if not valid_segments:
continue
segment_duration = sum(max(0.0, float(s.duration_s)) for s in valid_segments)
weighted_score = (
sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in result.segments)
sum(max(0.0, float(s.duration_s)) * float(s.match_score) for s in valid_segments)
/ segment_duration
if segment_duration > 0 else result.match_score
)
@@ -295,7 +346,15 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
coverage = segment_duration / coverage_target
if coverage < cfg.cv.deep_scan.min_duration_coverage:
continue
normalized.append(replace(result, match_score=weighted_score))
first_segment = valid_segments[0]
normalized.append(replace(
result,
scene_id=first_segment.scene_id,
in_point_s=first_segment.in_point_s,
out_point_s=first_segment.out_point_s,
match_score=weighted_score,
segments=valid_segments,
))
continue
if result.match_score < cfg.cv.deep_scan.provisional_match_threshold:
@@ -325,6 +384,7 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
fps = _scene_fps_light(scene, cfg)
adjusted_in_s = result.in_point_s
phase_changed = False
scene_changed = int(scene["scene_id"]) != result.scene_id
starts_before_scene = result.in_point_s < float(scene["start_s"])
if scene_changed or starts_before_scene or result.duration_s <= 0.12:
@@ -333,6 +393,25 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
fps = _scene_fps_light(scene, cfg)
should_phase_probe = (
scene_changed
or starts_before_scene
or not result.is_confirmed
or result.match_score < cfg.cv.deep_scan.match_threshold
)
phase_score = result.match_score
if should_phase_probe:
probe = _phase_probe_segment_in_scene(beat, scene, adjusted_in_s, cfg)
if probe is not None:
probed_in_s, probed_score = probe
max_shift_s = max(0.12, min(0.75, beat.duration_s * 0.35))
if abs(probed_in_s - adjusted_in_s) <= max_shift_s:
adjusted_in_s = probed_in_s
phase_changed = True
phase_score = max(float(result.match_score), float(probed_score))
scene = _scene_for_time_light(scenes, adjusted_in_s, cfg) or scene
fps = _scene_fps_light(scene, cfg)
matchable_duration_s = beat.duration_s
try:
from src.cv.global_scan import estimate_matchable_reference_duration
@@ -355,6 +434,7 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
if (
scene_changed
or starts_before_scene
or phase_changed
or result.duration_s <= 0.12
or result.out_point_s > adjusted_in_s + max_duration_s + (1.0 / fps)
):
@@ -364,6 +444,8 @@ def _normalize_cached_results(beats: list, results: list, cfg) -> list:
in_point_s=adjusted_in_s,
out_point_s=adjusted_in_s + max_duration_s,
in_point_frame=int(adjusted_in_s * fps),
match_score=phase_score,
is_confirmed=phase_score >= cfg.cv.deep_scan.match_threshold,
)
coverage = (
@@ -554,7 +636,7 @@ def _reference_scoreable_segments(beat, cfg) -> list[tuple[float, float]]:
t = 0.0
while t <= beat.duration_s:
frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
scoreable = frame is not None and _is_scoreable_reference_frame(frame, cfg)
scoreable = frame is not None and is_visible(frame)
if scoreable:
if start is None:
start = t
@@ -832,7 +914,7 @@ def _merge_best_results(existing: list, candidates: list, cfg) -> list:
def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list:
"""Try a vision-led search for beats that ended up without a match.
"""Try a vision-led search for beats that ended up weak or unmatched.
For each unmatched beat that has scoreable visual content (i.e. not pure
fade/title-card material), this pass:
@@ -849,7 +931,7 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
Confirmed and provisional matches both stay subject to the same thresholds
used elsewhere; this only adds matches that pass the same quality gates.
"""
if not cfg.vision.enabled or not beats:
if not beats:
return results
from dataclasses import replace
@@ -860,17 +942,28 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
from src.llm.vision_cache import find_action_window_in_scene, validate_match_window_with_vision
logger = logging.getLogger(__name__)
matched_ids = {r.beat_id for r in results}
unmatched = [b for b in beats if b.beat_id not in matched_ids]
if not unmatched:
results_by_id = {r.beat_id: r for r in results}
recovery_targets = [
b for b in beats
if (
b.beat_id not in results_by_id
or (
not results_by_id[b.beat_id].is_confirmed
and results_by_id[b.beat_id].match_score < cfg.cv.deep_scan.match_threshold
)
)
]
if not recovery_targets:
return results
scenes = build_scene_index(cfg)
if not scenes:
return results
new_results = list(results)
for beat in unmatched:
target_ids = {b.beat_id for b in recovery_targets}
new_results = [r for r in results if r.beat_id not in target_ids]
replaced_results = {r.beat_id: r for r in results if r.beat_id in target_ids}
for beat in recovery_targets:
try:
islands = _reference_scoreable_segments(beat, cfg)
except Exception:
@@ -907,6 +1000,79 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
scenes_by_id = {s.scene_id: s for s in scenes}
best = None # (score, scene, in_s, dur_s, reason)
try:
from src.llm.vision_cache import (
_load_cache,
_semantic_action_groups,
_semantic_match_score,
_STRONG_ACTION_GROUPS,
)
cache = _load_cache(cfg)
items = cache.get("items", {})
beat_desc = ""
if isinstance(items, dict):
for item in items.values():
if (
isinstance(item, dict)
and item.get("kind") == "beat"
and item.get("item_id") == beat.beat_id
):
beat_desc = str(item.get("description", ""))
break
beat_actions = _semantic_action_groups(beat_desc) & _STRONG_ACTION_GROUPS if beat_desc else set()
identity_vocab = {
"woman", "women", "man", "men", "girl", "boy", "child",
"blonde", "hair", "face", "mouth", "eyes", "profile",
"close-up", "closeup",
}
beat_identity = {term for term in identity_vocab if term in beat_desc.lower()}
distinctive_identity = {
term for term in ("woman", "women", "blonde", "mouth", "face")
if term in beat_desc.lower()
}
if beat_actions and isinstance(items, dict):
for item in items.values():
if not isinstance(item, dict) or item.get("kind") != "action_window":
continue
scene = scenes_by_id.get(item.get("item_id"))
desc = str(item.get("description", ""))
source_actions = _semantic_action_groups(desc)
if scene is None or not beat_actions <= source_actions:
continue
source_text = desc.lower()
positive_source_text = source_text.split('"negatives"', 1)[0]
identity_overlap = {term for term in beat_identity if term in source_text}
if len(beat_identity) >= 2 and len(identity_overlap) < 2:
continue
if distinctive_identity and not any(term in positive_source_text for term in distinctive_identity):
continue
if "mouth" in beat_desc.lower() and "mouth" not in positive_source_text:
continue
if "dark interior" in beat_desc.lower() and (
"interior" not in positive_source_text or "dark" not in positive_source_text
):
continue
score, reason = _semantic_match_score(beat_desc, desc)
if score < max(0.60, cfg.cv.deep_scan.provisional_match_threshold):
continue
try:
in_s = float(item.get("start_s"))
out_s = float(item.get("end_s"))
except (TypeError, ValueError):
continue
duration_s = max(0.32, min(anchor_beat.duration_s, out_s - in_s))
candidate = (
min(0.99, score),
scene,
in_s,
duration_s,
f"cached vision action; {reason}",
)
if best is None or candidate[0] > best[0]:
best = candidate
except Exception as exc:
logger.debug("Beat %d: cached vision fallback failed (%s)", beat.beat_id, exc)
seen = set()
for hit in hits[: cfg.cv.deep_scan.scene_seed_top_k]:
scene = scenes_by_id.get(hit.scene_id)
@@ -933,7 +1099,10 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
)
except Exception as exc:
logger.debug("Beat %d: align failed for scene %d (%s)", beat.beat_id, scene.scene_id, exc)
continue
aligned_in_s = start_s
combined_score = semantic_score
content_score = 0.0
motion_score = 0.0
aligned_in_s = max(scene.start_s, min(aligned_in_s, max(scene.start_s, scene.end_s - anchor_beat.duration_s)))
try:
@@ -963,6 +1132,8 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
combined_score,
min(0.99, semantic_score * 0.65 + motion_score * 0.18 + content_score * 0.09 + usable_score * 0.08),
)
if semantic_score >= max(0.60, cfg.cv.deep_scan.provisional_match_threshold):
final_score = max(final_score, semantic_score)
if final_score < cfg.cv.deep_scan.provisional_match_threshold:
continue
candidate = (final_score, scene, aligned_in_s, usable_duration_s, f"recovery; {reason}; {verify_reason}")
@@ -970,6 +1141,9 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
best = candidate
if best is None:
previous = replaced_results.get(beat.beat_id)
if previous is not None:
new_results.append(previous)
continue
score, scene, aligned_in_s, usable_duration_s, repair_reason = best
logger.info(
@@ -996,6 +1170,97 @@ def _recover_unmatched_beats_via_vision(results: list, beats: list, cfg) -> list
return sorted(new_results, key=lambda r: r.beat_id)
def _recover_short_lowlight_vibe_matches(results: list, beats: list, cfg) -> list:
    """Turn clear-margin vibe hits for short unmatched beats into provisional matches.

    Only beats that have no entry in ``results`` and last at most 2.25 s are
    considered. For each of them the top vibe-check scene is accepted when

    * at least two vibe hits exist, the best one scores >= 0.74 and leads the
      runner-up by >= 0.03,
    * the scene is at least ``max(0.5, beat.duration_s)`` long, and
    * a 0.12 s stepped content-alignment scan over the scene yields a best
      score >= 0.15.

    Accepted hits are appended as unconfirmed ``MatchResult`` objects whose
    score is clamped between the provisional match threshold and 0.64.

    Args:
        results: Existing match results; returned unchanged (same object)
            when no beat qualifies as a target.
        beats: Trailer beats; ``beat_id`` and ``duration_s`` are read here.
        cfg: Application config (``paths.source_movie``, ``cv.vibe_check``,
            ``cv.deep_scan`` and ``export.edl_frame_rate`` are read).

    Returns:
        ``results`` itself if there is nothing to recover, otherwise a new
        list with the recovered matches added, sorted by ``beat_id``.
    """
    from src.core.models import MatchResult, Scene
    from src.cv.global_scan import _content_alignment_score, _content_alignment_templates
    from src.cv.vibe_check import run_vibe_check
    from src.cv.frame_extractor import open_video

    already_matched = {res.beat_id for res in results}
    short_unmatched = []
    for candidate_beat in beats:
        if candidate_beat.beat_id in already_matched:
            continue
        if candidate_beat.duration_s <= 2.25:
            short_unmatched.append(candidate_beat)
    if not short_unmatched:
        return results

    def _hex_or_none(raw: dict, key: str):
        # Histograms are stored hex-encoded in the scene cache; absent/empty -> None.
        return bytes.fromhex(raw[key]) if raw.get(key) else None

    scene_objects = []
    for raw in _load_scene_cache_light(cfg):
        scene_objects.append(
            Scene(
                scene_id=int(raw["scene_id"]),
                source_path=cfg.paths.source_movie,
                start_s=float(raw["start_s"]),
                end_s=float(raw["end_s"]),
                start_frame=int(raw["start_frame"]),
                end_frame=int(raw["end_frame"]),
                luma_hist=_hex_or_none(raw, "luma_hist"),
                sat_hist=_hex_or_none(raw, "sat_hist"),
                phash=raw.get("phash"),
            )
        )
    scene_lookup = {}
    for scene_obj in scene_objects:
        scene_lookup[scene_obj.scene_id] = scene_obj

    def _best_content_hit(cap, scene, beat, templates):
        # Sweep the scene in 0.12 s steps; the first position with the highest
        # content-alignment score wins (strict '>' keeps the earliest on ties).
        last_start_s = max(scene.start_s, scene.end_s - beat.duration_s)
        top_hit = None
        cursor_s = scene.start_s
        while cursor_s <= last_start_s:
            alignment = _content_alignment_score(cap, cursor_s, templates, cfg)
            if top_hit is None or alignment > top_hit[0]:
                top_hit = (alignment, cursor_s)
            cursor_s = round(cursor_s + 0.12, 6)
        return top_hit

    combined = list(results)
    with open_video(cfg.paths.source_movie) as cap:
        for beat in short_unmatched:
            templates = _content_alignment_templates(beat, cfg)
            if not templates:
                continue

            vibe_hits = run_vibe_check(
                beat,
                scene_objects,
                top_k=6,
                hist_method=cfg.cv.vibe_check.hist_compare_method,
                phash_max_distance=64,
            )
            if len(vibe_hits) < 2:
                continue

            leader = vibe_hits[0]
            runner_up = vibe_hits[1]
            if leader.combined_score < 0.74:
                continue
            if leader.combined_score - runner_up.combined_score < 0.03:
                continue

            scene = scene_lookup.get(leader.scene_id)
            if scene is None:
                continue
            if scene.duration_s < max(0.5, beat.duration_s):
                continue

            top_hit = _best_content_hit(cap, scene, beat, templates)
            if top_hit is None or top_hit[0] < 0.15:
                continue

            content_score, in_point_s = top_hit
            blended = leader.combined_score * 0.55 + content_score * 0.45
            # Never below the provisional threshold, never above 0.64.
            final_score = max(
                cfg.cv.deep_scan.provisional_match_threshold,
                min(0.64, blended),
            )
            combined.append(
                MatchResult(
                    beat_id=beat.beat_id,
                    scene_id=scene.scene_id,
                    source_path=scene.source_path,
                    in_point_s=in_point_s,
                    out_point_s=in_point_s + beat.duration_s,
                    in_point_frame=int(in_point_s * cfg.export.edl_frame_rate),
                    match_score=final_score,
                    match_location=(0, 0),
                    is_confirmed=False,
                    segments=tuple(),
                )
            )

    return sorted(combined, key=lambda res: res.beat_id)
def _filter_semantically_invalid_vision_matches(results: list, beats: list, cfg) -> list:
"""Drop vision-enabled matches whose final action phase contradicts the beat."""
if not cfg.vision.enabled or not results:
@@ -1371,6 +1636,41 @@ def _attach_visual_segments(results: list, beats: list, cfg) -> list:
if not segment_matches:
continue
seg = segment_matches[0]
if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold:
repaired = _local_same_scene_segment_match(
segment_beat,
beat,
start_s,
cached + expanded,
cfg,
)
if (
repaired is None
or repaired.match_score
< max(
cfg.cv.deep_scan.multi_shot_segment_threshold,
seg.match_score + cfg.cv.deep_scan.duration_tie_break_score_delta,
)
):
scenes = _load_scene_cache_light(cfg)
scene = _scene_by_id_light(scenes, seg.scene_id)
probe = (
_phase_probe_segment_in_scene(segment_beat, scene, seg.in_point_s, cfg)
if scene is not None else None
)
if probe is None:
continue
in_point_s, _phase_score = probe
from dataclasses import replace as _replace
seg = _replace(
seg,
in_point_s=in_point_s,
out_point_s=in_point_s + seg.duration_s,
match_score=max(seg.match_score, _phase_score),
is_confirmed=_phase_score >= cfg.cv.deep_scan.match_threshold,
)
else:
seg = repaired
seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
segments.append(
MatchSegment(
@@ -1471,21 +1771,12 @@ def _match_unmatched_visual_segments(
start_s=beat.start_s + start_s,
end_s=beat.start_s + end_s,
)
if island_idx == 0:
# First island of an unmatched multi-shot beat: search globally
# without a continuity bias from the previous beat. Continuity
# assumes the shot follows the previous beat in the source, but
# the lead shot of a multi-shot beat is often an insert cut from
# a completely different scene. A wrong seed with score 0.92
# would push the real match out of the refinement candidate pool.
continuity = {}
else:
continuity = _continuity_seed_in_points(
beat.beat_id,
[b if b.beat_id != beat.beat_id else segment_beat for b in beats],
cached + expanded,
cfg,
)
continuity = _continuity_seed_in_points(
beat.beat_id,
[b if b.beat_id != beat.beat_id else segment_beat for b in beats],
cached + expanded,
cfg,
)
segment_matches = []
if beat.beat_id not in skip_global_segment_scan_for:
segment_matches = _run_segment_match(segment_beat, continuity, cfg, allow_fullscan=True)
@@ -1501,7 +1792,10 @@ def _match_unmatched_visual_segments(
if recovered:
rec = recovered[0]
seg_dur = min(max(0.0, end_s - start_s), max(0.0, rec.duration_s))
if seg_dur > 0:
if (
seg_dur > 0
and rec.match_score >= cfg.cv.deep_scan.multi_shot_segment_threshold
):
segments.append(MatchSegment(
trailer_offset_s=start_s,
duration_s=seg_dur,
@@ -1523,6 +1817,8 @@ def _match_unmatched_visual_segments(
segments.append(local_segment)
continue
seg = segment_matches[0]
if seg.match_score < cfg.cv.deep_scan.multi_shot_segment_threshold:
continue
seg_dur = min(max(0.0, end_s - start_s), max(0.0, seg.duration_s))
segments.append(
MatchSegment(
@@ -1594,7 +1890,13 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
cfg.cv.deep_scan.provisional_content_threshold * 0.70,
cfg.cv.deep_scan.provisional_match_threshold,
)
step_s = max(1.0 / cfg.export.edl_frame_rate, 0.04)
# Coarse repair scan over already plausible neighbouring scenes. A frame-step
# sweep across long dialogue scenes is slow and can overfit static layouts.
step_s = max(
cfg.vision.local_scan_step_s,
cfg.cv.deep_scan.content_align_sample_step_s,
0.25,
)
best: tuple[float, float, int] | None = None
with open_video(cfg.paths.source_movie) as cap:
for scene_id in scene_ids:
@@ -1603,12 +1905,14 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
continue
start_s = max(0.0, float(scene["start_s"]) - 0.25)
end_s = max(start_s, float(scene["end_s"]) - max(0.04, segment_beat.duration_s) + 0.25)
max_points = max(4, min(48, int(cfg.vision.local_scan_max_points_per_scene)))
scene_step_s = max(step_s, (end_s - start_s) / max_points)
t = start_s
while t <= end_s:
score = _content_alignment_score(cap, t, templates, cfg)
if best is None or score > best[0]:
best = (score, t, int(scene_id))
t = round(t + step_s, 6)
t = round(t + scene_step_s, 6)
if best is None or best[0] < min_score:
return None
@@ -1626,6 +1930,186 @@ def _local_same_scene_segment_match(segment_beat, beat, segment_offset_s: float,
)
def _phase_probe_segment_in_scene(segment_beat, scene: dict, original_in_s: float, cfg):
    """Retune a weak multi-shot segment inside its own scene using saliency-weighted frames.

    Reference frames are sampled from the trailer at fixed offsets within the
    segment, a saliency mask is built from the regions that change across those
    frames, and a window around ``original_in_s`` (clamped to the scene) is
    scanned in the source movie for the in-point whose frames best reproduce
    the reference sequence.

    Args:
        segment_beat: Trailer segment; ``start_s`` and ``duration_s`` (seconds)
            are read.
        scene: Scene record with ``"start_s"`` and ``"end_s"`` keys.
        original_in_s: Current source in-point in seconds; centre of the scan
            and tie-breaker for near-equal candidates.
        cfg: Application config; ``paths.reference_trailer`` and
            ``paths.source_movie`` are opened with OpenCV.

    Returns:
        ``(in_point_s, phase_score)`` for the chosen candidate, or ``None``
        when fewer than four usable reference frames exist or no source
        position could be scored against at least four of them.
    """
    import cv2
    import numpy as np

    offsets = [0.0, 0.16, 0.32, 0.48, 0.64, 0.80, 0.96, 1.12]
    size = (160, 90)  # (width, height) as expected by cv2.resize

    def prepared_gray(frame):
        if frame is None:
            return None
        h, w = frame.shape[:2]
        frame = frame.copy()
        # Timecode overlays and letterbox edges are trailer/source-specific and
        # should not pull the phase toward the wrong moment.
        frame[: int(h * 0.16), : int(w * 0.32)] = 0
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.resize(gray, size)
        return cv2.equalizeHist(gray).astype("float32") / 255.0

    def edge(gray):
        return cv2.Canny((gray * 255).astype("uint8"), 45, 130).astype("float32") / 255.0

    def pair_score(ref_gray, src_gray, mask):
        if ref_gray is None or src_gray is None:
            return None
        pixel = 1.0 - float((np.abs(ref_gray - src_gray) * mask).sum())
        edge_score = 1.0 - float((np.abs(edge(ref_gray) - edge(src_gray)) * mask).sum())
        return 0.65 * pixel + 0.35 * edge_score

    def frame_at(cap, t_s):
        cap.set(cv2.CAP_PROP_POS_MSEC, t_s * 1000.0)
        ok, frame = cap.read()
        return frame if ok else None

    ref_candidates = []
    fallback_items = []
    trailer_cap = cv2.VideoCapture(str(cfg.paths.reference_trailer))
    try:
        for offset in offsets:
            if offset > segment_beat.duration_s + 0.04:
                continue
            frame = frame_at(trailer_cap, segment_beat.start_s + offset)
            ref = prepared_gray(frame)
            if ref is None:
                continue
            fallback_items.append((offset, ref))
            raw_gray = cv2.resize(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), size)
            h, w = raw_gray.shape[:2]
            raw_gray[: int(h * 0.16), : int(w * 0.32)] = 0
            roi = raw_gray[int(h * 0.12) : int(h * 0.90), :]
            mean_luma = float(roi.mean() / 255.0)
            p90_luma = float(np.percentile(roi, 90) / 255.0)
            contrast = float(roi.std() / 255.0)
            ref_candidates.append((offset, ref, mean_luma, p90_luma, contrast))
    finally:
        # Always free the decoder handle, including on the early returns below.
        trailer_cap.release()

    transition_start = False
    ref_items = []
    if ref_candidates:
        max_mean = max(item[2] for item in ref_candidates)
        max_p90 = max(item[3] for item in ref_candidates)
        # A first frame noticeably darker than the brightest one marks a
        # fade-led segment; this changes the tie-breaking rule at the end.
        transition_start = (
            ref_candidates[0][2] < max_mean * 0.90
            or ref_candidates[0][3] < max_p90 * 0.90
        )
        ref_items = [
            (offset, ref)
            for offset, ref, mean_luma, p90_luma, contrast in ref_candidates
            if (
                mean_luma >= max(0.16, max_mean * 0.82)
                and p90_luma >= max(0.28, max_p90 * 0.86)
                and contrast >= 0.035
            )
        ]
    if len(ref_items) < 4:
        # Too few bright/contrasty frames: fall back to every decodable frame.
        ref_items = fallback_items
    if len(ref_items) < 4:
        return None

    ref_offsets = [item[0] for item in ref_items]
    refs = [item[1] for item in ref_items]
    # Offsets become relative to the first kept reference frame; the shift is
    # undone when converting the chosen time back into an in-point.
    align_offset = ref_offsets[0]
    ref_offsets = [offset - align_offset for offset in ref_offsets]
    ref_stack = np.stack(refs, axis=0)
    edge_stack = np.stack([edge(ref) for ref in refs], axis=0)
    # Static window/room edges are useful for finding the scene, but toxic for
    # phase retuning inside a repeated dialogue shot. Bias the mask toward
    # areas that actually change across the reference segment.
    saliency = ref_stack.std(axis=0) * 3.0 + edge_stack.std(axis=0) * 0.75 + edge_stack.mean(axis=0) * 0.15
    saliency[:, : int(size[0] * 0.12)] *= 0.15
    saliency[: int(size[1] * 0.16), : int(size[0] * 0.32)] = 0.0
    threshold = np.quantile(saliency, 0.66)
    mask = (saliency >= threshold).astype("float32")
    mask /= mask.sum() + 1e-6

    scene_start = float(scene["start_s"])
    scene_end = float(scene["end_s"])
    center_t = max(scene_start, min(scene_end, original_in_s + align_offset))
    retune_radius_s = max(4.0, min(12.0, segment_beat.duration_s * 2.5))
    scan_start = max(scene_start, center_t - retune_radius_s)
    scene_scan_end = min(scene_end, center_t + retune_radius_s)
    scan_end = max(scan_start, scene_scan_end - max(0.04, segment_beat.duration_s - align_offset))
    max_points = 400
    step_s = max(0.04, (scan_end - scan_start) / max_points)

    times: list[float] = []
    source_frames: list = []
    source_cap = cv2.VideoCapture(str(cfg.paths.source_movie))
    try:
        source_fps = source_cap.get(cv2.CAP_PROP_FPS) or _scene_fps_light(scene, cfg)
        stride = max(1, int(round(step_s * source_fps)))
        start_frame = max(0, int(round(scan_start * source_fps)))
        end_frame = max(start_frame, int(round(scene_scan_end * source_fps)))
        frame_idx = start_frame
        while frame_idx <= end_frame:
            source_cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ok, frame = source_cap.read()
            if not ok:
                break
            times.append(frame_idx / source_fps)
            source_frames.append(prepared_gray(frame))
            frame_idx += stride
    finally:
        source_cap.release()

    # Frames were sampled every `stride` source frames, so the spacing between
    # consecutive entries of `source_frames` is stride / fps, which differs
    # from `step_s` whenever step_s * fps is not an integer. Indexing with
    # `step_s` would drift further from the intended frame the later the
    # candidate lies in the scan window.
    sample_interval_s = stride / source_fps

    base_time = times[0] if times else scan_start
    candidates: list[tuple[float, float, float]] = []
    for t in times:
        if t > scan_end:
            break
        vals = []
        src_for_offsets = []
        for offset, ref in zip(ref_offsets, refs):
            j = int(round((t + offset - base_time) / sample_interval_s))
            if 0 <= j < len(source_frames):
                src = source_frames[j]
                score = pair_score(ref, src, mask)
            else:
                src = None
                score = None
            if score is not None:
                vals.append(score)
            # Kept index-aligned with `refs` (None where no source frame exists)
            # so the motion comparison below pairs matching offsets.
            src_for_offsets.append(src)
        if len(vals) >= 4:
            avg_score = sum(vals) / len(vals)
            early_count = min(2, len(vals))
            tail_count = min(2, len(vals))
            early_score = sum(vals[:early_count]) / early_count
            tail_score = sum(vals[-tail_count:]) / tail_count
            motion_vals = []
            for idx in range(1, min(len(refs), len(src_for_offsets))):
                if src_for_offsets[idx - 1] is None or src_for_offsets[idx] is None:
                    continue
                ref_motion = refs[idx] - refs[idx - 1]
                src_motion = src_for_offsets[idx] - src_for_offsets[idx - 1]
                motion_vals.append(1.0 - float((np.abs(ref_motion - src_motion) * mask).sum()))
            motion_score = sum(motion_vals) / len(motion_vals) if motion_vals else avg_score
            # Phase retuning must reject "same shot, wrong moment" matches.
            # A plain average can hide a bad onset inside slow dialogue shots;
            # keep the low-water mark, onset, and frame-to-frame motion influential.
            phase_score = (
                0.26 * avg_score
                + 0.24 * min(vals)
                + 0.24 * early_score
                + 0.08 * tail_score
                + 0.18 * motion_score
            )
            candidates.append((phase_score, min(vals), t))

    if not candidates:
        return None
    candidates.sort(reverse=True)
    best_score = candidates[0][0]
    tie_window = 0.006 if transition_start else 0.002
    near_tie = [c for c in candidates if c[0] >= best_score - tie_window]
    if transition_start:
        # Fade-led segment: among near-ties prefer the best worst-frame score.
        chosen = max(near_tie, key=lambda c: (c[1], c[0]))
    else:
        # Otherwise stay as close as possible to the existing in-point.
        chosen = min(near_tie, key=lambda c: abs((c[2] - align_offset) - original_in_s))
    return max(scene_start, chosen[2] - align_offset), chosen[0]
def cmd_match(args: argparse.Namespace, cfg) -> list:
from src.pipeline.matcher import run_matching
from dataclasses import replace
@@ -1699,6 +2183,7 @@ def cmd_match(args: argparse.Namespace, cfg) -> list:
results = _attach_visual_segments(results, beats, cfg)
results = _filter_semantically_invalid_vision_matches(results, beats, cfg)
results = _recover_unmatched_beats_via_vision(results, beats, cfg)
results = _recover_short_lowlight_vibe_matches(results, beats, cfg)
# A targeted one-beat match must NEVER delete or modify any other beat's
# cache entry. We deliberately re-load the raw cache from disk here so
@@ -1725,7 +2210,8 @@ def cmd_match(args: argparse.Namespace, cfg) -> list:
results_to_save = results
_save_results(results_to_save, cfg)
_regenerate_cutter_report(cfg)
force_report_beats = {int(args.beat)} if getattr(args, "beat", None) is not None else None
_regenerate_cutter_report(cfg, force_beats=force_report_beats)
print(f"\n{len(results)} / {len(beats)} beats matched.")
for r in results:
@@ -1941,6 +2427,141 @@ def cmd_run(args: argparse.Namespace, cfg) -> None:
cmd_export(args, cfg)
def cmd_preview(args: argparse.Namespace, cfg) -> None:
    """Assemble a rough preview video from cached source matches, with original audio.

    Reads the cached match results (and, when present, ``trailer_beats.json``
    from ``cfg.paths.cache_dir``), renders one clip per beat into
    ``<output_dir>/preview_clips`` with ffmpeg and concatenates them, in
    ``beat_id`` order, into ``<output_dir>/preview.mp4``.

    Per beat:
      * no usable source file -> a black/silent filler clip of the beat length;
      * two or more segments  -> each segment is extracted and the parts are
        concatenated into the beat clip;
      * otherwise             -> a single extraction starting at the record's
        in-point, using the trailer beat duration when it is known and longer
        than 0.04 s, else the source duration.

    Args:
        args: Parsed CLI namespace (not read by this command).
        cfg: Application config; ``cfg.paths.cache_dir`` and
            ``cfg.paths.output_dir`` are used.

    Returns:
        None. Problems are logged; a failing ffmpeg call never raises.
    """
    import subprocess

    log = logging.getLogger(__name__)
    results_path = _results_cache_path(cfg)
    if not results_path.exists():
        log.error("No match_results.json — run 'match' first.")
        return

    data = sorted(
        json.loads(results_path.read_text(encoding="utf-8")),
        key=lambda r: r["beat_id"],
    )

    beats_path = cfg.paths.cache_dir / "trailer_beats.json"
    beats_by_id: dict = {}
    if beats_path.exists():
        for b in json.loads(beats_path.read_text(encoding="utf-8")):
            beats_by_id[int(b["beat_id"])] = b

    clip_width = 1280
    fps = 25
    out_dir = cfg.paths.output_dir / "preview_clips"
    out_dir.mkdir(parents=True, exist_ok=True)
    preview_out = cfg.paths.output_dir / "preview.mp4"

    def _run(cmd: list, timeout: int = 120) -> bool:
        """Run *cmd*; return True only when it exits with status 0."""
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            # One slow clip must not abort the whole preview build.
            log.warning("ffmpeg timed out after %ss", timeout)
            return False
        except OSError as exc:
            # Typically: the ffmpeg executable is not on PATH.
            log.error("Could not start ffmpeg: %s", exc)
            return False
        if r.returncode != 0:
            log.debug("ffmpeg stderr: %s", r.stderr[-600:])
        return r.returncode == 0

    def extract_with_audio(src: Path, start_s: float, duration_s: float, out: Path) -> bool:
        """Re-encode ``duration_s`` seconds of *src* starting at ``start_s`` into *out*."""
        # Fast input seek to (at most) 2 s before the target, then an accurate
        # output-side seek for the remainder.
        preroll = 2.0 if start_s >= 2.0 else 0.0
        input_seek = max(0.0, start_s - preroll)
        accurate_seek = start_s - input_seek
        return _run([
            "ffmpeg", "-y", "-loglevel", "error",
            "-ss", f"{input_seek:.3f}", "-i", str(src),
            "-ss", f"{accurate_seek:.3f}", "-t", f"{max(0.04, duration_s):.3f}",
            "-map", "0:v:0", "-map", "0:a:0",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
            "-vf", f"fps={fps},scale={clip_width}:-2,setsar=1,setpts=PTS-STARTPTS",
            "-c:a", "aac", "-ar", "48000", "-ac", "2",
            "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(out),
        ])

    def black_silence(duration_s: float, out: Path) -> bool:
        """Render a black, silent filler clip of at least 0.5 s into *out*."""
        # NOTE(review): the filler is always {clip_width}x720 while extracted
        # clips keep the source aspect ratio (scale=W:-2). If the source is
        # not 16:9 the stream-copy concat below may see mixed frame sizes —
        # confirm against real material.
        return _run([
            "ffmpeg", "-y", "-loglevel", "error",
            "-f", "lavfi", "-i", f"color=black:s={clip_width}x720:r={fps}",
            "-f", "lavfi", "-i", "anullsrc=r=48000:cl=stereo",
            "-t", f"{max(0.5, duration_s):.3f}",
            "-c:v", "libx264", "-preset", "veryfast", "-crf", "23",
            "-c:a", "aac", "-pix_fmt", "yuv420p", "-movflags", "+faststart", str(out),
        ])

    def concat_clips(parts: list[Path], out: Path) -> bool:
        """Stream-copy *parts* into *out* via ffmpeg's concat demuxer."""
        lst = out.with_suffix(".txt")

        def _entry(p: Path) -> str:
            # Inside a single-quoted concat entry a literal quote must be
            # written as '\'' or the list file cannot be parsed.
            escaped = p.resolve().as_posix().replace("'", "'\\''")
            return f"file '{escaped}'"

        lst.write_text("\n".join(_entry(p) for p in parts), encoding="utf-8")
        try:
            return _run([
                "ffmpeg", "-y", "-loglevel", "error",
                "-f", "concat", "-safe", "0", "-i", str(lst),
                "-c", "copy", str(out),
            ], timeout=300)
        finally:
            lst.unlink(missing_ok=True)

    beat_clips: list[Path] = []
    for rec in data:
        bid = int(rec["beat_id"])
        segs = rec.get("segments", [])
        src = Path(rec["source_path"]) if rec.get("source_path") else None
        clip_out = out_dir / f"beat_{bid:02d}.mp4"

        if src is None or not src.exists():
            beat = beats_by_id.get(bid, {})
            dur = max(0.5, float(beat.get("end_s", 1)) - float(beat.get("start_s", 0)))
            log.info("Beat %02d: NO MATCH — black/silence %.2fs", bid, dur)
            if black_silence(dur, clip_out):
                beat_clips.append(clip_out)
            else:
                log.warning("Beat %02d: black/silence clip failed", bid)
            continue

        if len(segs) >= 2:
            parts: list[Path] = []
            for idx, seg in enumerate(segs):
                in_s = float(seg["in_point_s"])
                dur = max(0.04, float(seg["out_point_s"]) - in_s)
                seg_src = Path(seg["source_path"]) if seg.get("source_path") else src
                part = out_dir / f"beat_{bid:02d}_seg{idx:02d}.mp4"
                log.info(
                    "Beat %02d seg%d: scene=%s %.2fs→%.2fs",
                    bid, idx, seg.get("scene_id"), in_s, in_s + dur,
                )
                if extract_with_audio(seg_src, in_s, dur, part):
                    parts.append(part)
            if not parts:
                log.warning("Beat %02d: no segments extracted", bid)
                continue
            if len(parts) == 1:
                # replace() overwrites a clip left by an earlier run;
                # rename() raises on Windows when the target exists.
                parts[0].replace(clip_out)
                beat_clips.append(clip_out)
            elif concat_clips(parts, clip_out):
                beat_clips.append(clip_out)
            else:
                log.warning("Beat %02d: segment concat failed", bid)
            # Temporary per-segment files are no longer needed either way.
            for p in parts:
                p.unlink(missing_ok=True)
        else:
            in_s = float(rec["in_point_s"])
            beat = beats_by_id.get(bid, {})
            beat_dur = float(beat["end_s"]) - float(beat["start_s"]) if beat else 0.0
            source_dur = float(rec["out_point_s"]) - in_s
            # Prefer the trailer beat length so the preview keeps trailer timing.
            dur = max(0.04, beat_dur if beat_dur > 0.04 else source_dur)
            log.info(
                "Beat %02d: scene=%s %.2fs+%.2fs (trailer=%.2fs src=%.2fs)",
                bid, rec.get("scene_id"), in_s, dur, beat_dur, source_dur,
            )
            if extract_with_audio(src, in_s, dur, clip_out):
                beat_clips.append(clip_out)
            else:
                log.warning("Beat %02d: extraction failed", bid)

    if not beat_clips:
        log.error("No clips extracted — aborting.")
        return

    log.info("Concatenating %d beat clips → %s", len(beat_clips), preview_out)
    if concat_clips(beat_clips, preview_out):
        size_mb = preview_out.stat().st_size / 1_048_576
        log.info("Preview ready: %s (%.1f MB)", preview_out, size_mb)
        print(f"\n Preview → {preview_out} ({size_mb:.1f} MB)")
    else:
        log.error("Final concat failed — per-beat clips are in %s", out_dir)
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
@@ -2011,6 +2632,12 @@ def _build_parser() -> argparse.ArgumentParser:
p_run.add_argument("--beat", type=int,
help="Run match/report/export for only one cached beat")
# preview
sub.add_parser(
"preview",
help="Build output/preview.mp4 from cached matches — source clips with audio in beat order",
)
return parser
@@ -2035,6 +2662,7 @@ def main() -> None:
"report": cmd_report,
"export": cmd_export,
"run": cmd_run,
"preview": cmd_preview,
}
handler = dispatch[args.command]
+6 -3
View File
@@ -8,7 +8,7 @@
[project]
name = "AI Trailer Generator v2"
version = "2.0.0"
log_level = "INFO" # DEBUG | INFO | WARNING | ERROR
log_level = "DEBUG" # DEBUG | INFO | WARNING | ERROR
# -----------------------------------------------------------------------------
# [paths] — External video sources (read-only access)
@@ -86,7 +86,10 @@ span_score_weight = 0.15
coarse_score_weight = 0.10
duration_score_weight = 0.20
duration_tie_break_score_delta = 0.03
min_duration_coverage = 0.65
min_duration_coverage = 0.55
# Every visible sub-shot in a multi-shot beat must pass this stricter gate.
# A weak segment is left unmatched instead of being hidden by a strong neighbor.
multi_shot_segment_threshold = 0.50
continuity_seed_offsets_s = [-1.0, 0.0, 0.5, 1.0, 1.5, 2.0, 3.0]
scene_seed_top_k = 30
scene_seed_points_per_scene = 6
@@ -183,7 +186,7 @@ local_scan_step_s = 0.12
local_scan_max_points_per_scene = 180
local_scan_top_candidates = 36
local_scan_tie_break_score_delta = 0.08
multi_shot_cut_corr_threshold = 0.20
multi_shot_cut_corr_threshold = 0.55
multi_shot_boundary_tolerance_s = 0.20
fullscan_fallback = false
content_threshold = 0.22
+85 -2
View File
@@ -132,8 +132,33 @@ bereits auf die sichtbare Aktionsphase ausgerichtet.
Der Segment-Offset zählt nur über vorherige scorebare Bildinseln, nicht über
schwarze oder blendige Lücken. Nach dem Retiming wird die nutzbare Source-
Dauer erneut geschätzt; läuft die Source am Ende in eine sichtbar andere
Aktionsphase, wird der Clip gekürzt und der Rest bleibt Placeholder/Fade
statt einen falschen Bewegungsmoment zu zeigen.
Aktionsphase, wird der Treffer im Cutter-Report klar als phasenkritisch
markiert. Schwarz/Placeholder wird nur für wirklich ungematchte Trailer-
Bereiche oder Fades verwendet, nicht um sichtbare Kandidatenbewegung im Review
zu verstecken.
Diese Span-Schätzung ist strenger als der grobe Suchscore: Ein fast stehender
Anfang darf einen Match nicht retten, wenn spätere Frames sichtbar in eine
andere Gestik, Körperposition oder eintretende Figur driften. Stabile
Score-Plateaus dürfen nur verlängern, wenn sie noch nah genug am Anfangsniveau
liegen; sonst bleibt der Treffer vorläufig und muss neu gesucht oder visuell
geprüft werden. Der Review-Clip zeigt den Kandidaten weiterhin sichtbar, damit
Phasenfehler nicht durch Schwarz verdeckt werden.
Für Multi-Shot-Beats gilt zusätzlich eine Segment-Schwelle pro sichtbarer
Einstellung. Ein gutes erstes Segment darf kein zweites Segment mit schwachem
Score mitziehen. Segmente unter `multi_shot_segment_threshold` werden nicht als
stabile Wahrheit behandelt, sondern innerhalb derselben plausiblen Source-Scene
nachjustiert. Die Nachjustierung nutzt eine saliency-gewichtete Mehrframe-Prüfung:
Timecodes und statische Randbereiche werden entwertet, kontrastreiche und über
mehrere Trailerframes unterscheidbare Bildbereiche zählen stärker. Dadurch kann
eine schwache zweite Einstellung phasengenauer repariert werden, ohne den Fehler
durch Schwarzbild zu verdecken oder einen Beat manuell zu kuratieren.
Der Cutter-Report verwendet Clip-Caching. Bereits vorhandene Compare-Clips werden
wiederverwendet; bei gezielten Rematches wird nur der betroffene Beat neu gerendert
(`CUTTER_REPORT_FORCE_BEATS`). So bleibt der Report aktuell, ohne alle Beats jedes
Mal neu zu kodieren.
## Vision-Seeds vs. Vollscan
@@ -165,6 +190,56 @@ eine kurze Geste erst korrekt erkannt und anschließend in eine spätere
ähnliche Körperhaltung verschoben wird. Wenn mehrere Vision-Kandidaten in
derselben Source-Szene ähnlich gut scoren und die Beat-Dauer abdecken,
bevorzugt der Matcher die frühere Phase.
Die Vision-Recovery läuft nicht nur für komplett fehlende Beats, sondern auch
für schwache unbestätigte Treffer. Gerade Low-Light-Beats dürfen nicht an einem
falschen dunklen CV-Treffer hängen bleiben, wenn der Cache semantisch eine
bessere Handlungsphase kennt.
Bei langen Source-Szenen prüft die Action-Window-Suche immer den Szenenanfang
und mehrere frühe Fenster, bevor sie gleichmäßig über die ganze Szene sampelt.
Damit gehen kurze Trailer-Aktionen am Anfang einer langen Szene nicht unter,
wenn der Rest der Szene aus Credits, Schwarzbild oder ruhigen Folgeframes
besteht.
Wenn ein Action-Window die starke Beat-Aktion explizit enthält, darf es eine
etwas niedrigere Textähnlichkeit haben; die Handlung zählt dann stärker als
Nebenwörter zu Licht, Bildausschnitt oder Stimmung.
Bereits gecachte Action-Windows einer Szene bleiben gültige Kandidaten, auch
wenn sich das aktuelle Sampling-Raster ändert. So verliert der Matcher keine
teuren Vision-Hinweise und muss dieselben Fenster nicht erneut beschreiben.
Wenn neue Vision-Calls deaktiviert sind, darf die Recovery vorhandene Cache-
Beschreibungen trotzdem lesen; das erzeugt keine API-Kosten und verhindert,
dass alte schwache CV-Treffer stehen bleiben.
Schlägt die CV-Feinjustierung bei einem semantisch klaren Low-Light-Fenster
fehl, bleibt das Action-Window als provisorischer Treffer erhalten. CV darf
einen dunklen Treffer verfeinern, aber nicht einen eindeutigen Cache-Hinweis
komplett verwerfen.
Zusätzlich kann Recovery vorhandene gecachte Action-Windows direkt über alle
Szenen ranken. Dieser schnelle Pfad vermeidet einen teuren Vollscan, wenn der
Cache bereits eine starke Aktion wie Hand-am-Mund, Kuss oder Blickwechsel
enthält.
Eindeutige Begriffe aus der Beat-Beschreibung wirken als harte Filter für
Vision-Fenster: `mouth` muss im Kandidaten wiederkehren, `dark interior` darf
nicht auf Outdoor-Material fallen, und markante Personenmerkmale wie `blonde`
bleiben bindend.
Der zusätzliche Hi-Res-Phasenrefine bleibt lokal um den bereits validierten
Inpoint und übernimmt nur klare Verbesserungen. Er darf keine ganze lange
Dialogszene nach ähnlichen Layouts durchsuchen, weil sonst dieselbe Location
mit anderer Gestik als falsche Phase gewinnen kann und die Laufzeit explodiert.
Die lokale Retune-Wertung nutzt deshalb nicht nur den mittleren Frame-Score,
sondern auch den schlechtesten Einzelvergleich, die ersten sichtbaren Frames
und die Frame-zu-Frame-Bewegung. Dadurch gewinnt nicht mehr ein späteres
Standbild derselben Einstellung, nur weil Fenster, Gesichter und Licht fast
identisch aussehen.
Unsichere Einzeltreffer ohne Segmentliste laufen ebenfalls durch diese lokale
Phasen-Probe. Das repariert alte Cache-Einträge, deren Szene korrekt ist, deren
Inpoint aber einige Frames in der Bewegung daneben liegt. Die Probe bleibt auf
kleine lokale Shifts begrenzt und wird nicht für jeden bestätigten Treffer
erzwungen, damit Report-Refreshes nicht zum Vollscan werden.
Report-Clips werden zusätzlich an den bekannten Source-Szenenstart plus eine
sehr kurze Ein-Frame-Guard-Zone geklemmt, damit ein knapp vor oder direkt auf
der Schnittkante liegender Inpoint nicht mit Frames der vorherigen Einstellung
beginnt. Die Guard-Zone bleibt bewusst klein, weil eine längere Korrektur die
sichtbare Bewegungsphase innerhalb derselben Einstellung verschieben würde.
## Multi-Shot-Beats
@@ -175,6 +250,13 @@ nur wenn die relative Source-Grenze zeitlich zu einem erkannten Trailer-
Umschnitt passt. So kann ein Beat aus Frage/Antwort-Shots vollständig erfasst
werden, ohne Szenen willkürlich zusammenzukleben.
## Titel- und Grafikbeats
Dunkle Trailerkarten mit deutlich isoliertem Text werden im Cutter-Report als
`GFX` markiert, wenn es keinen Source-Treffer gibt. Diese Beats sind keine
fehlgeschlagenen Matches: Der Cutter soll die Trailer-Grafik beziehungsweise
eine NLE-Titelkarte übernehmen und nicht im Spielfilm nach einem Bild suchen.
## Reranking-Pipeline
Vor dem teuren Frame-Refine wird der gesamte Kandidatenpool mit einer
@@ -296,3 +378,4 @@ bzw. letzten scorebaren Frame derselben Einstellung passen.
Treffer unter `provisional_content_threshold` werden nicht mehr gespeichert
oder aus alten Cache-Ergebnissen übernommen.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.9 KiB

After

Width:  |  Height:  |  Size: 9.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.0 KiB

After

Width:  |  Height:  |  Size: 8.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 12 KiB

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.8 KiB

After

Width:  |  Height:  |  Size: 6.4 KiB

File diff suppressed because one or more lines are too long
+121 -18
View File
@@ -9,7 +9,7 @@ Renders two editor-facing reports:
scene and segment info, score warnings, and rematch hints.
This report is the single source of truth for the video editor and is
designed to eventually replace the legacy match_report.html.
the only report that should be opened for review.
Usage (from project root):
python scripts/generate_cutter_report.py # stills + compare clips
@@ -22,6 +22,7 @@ from __future__ import annotations
import argparse
import base64
import json
import os
import re
import subprocess
import sys
@@ -29,6 +30,8 @@ from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from PIL import Image, ImageStat
# ---------------------------------------------------------------------------
# Frame-rate / timecode helpers
# ---------------------------------------------------------------------------
@@ -120,6 +123,7 @@ STILL_WIDTH = 480
STILL_QUALITY = 4
CLIP_WIDTH = 480
CLIP_MAX_DURATION_S = 30.0
SCENE_START_GUARD_S = 0.04
# Each half of the side-by-side compare strip
COMPARE_HALF_W = 480
COMPARE_H = 270 # 16:9
@@ -133,6 +137,19 @@ def _run(cmd: list[str], timeout: int = 120) -> bool:
return False
def _forced_beats() -> set[int]:
    """Return the beat ids listed in ``CUTTER_REPORT_FORCE_BEATS``.

    The environment variable is split on commas, semicolons and whitespace.
    Tokens that ``int()`` cannot parse are ignored. An unset or empty
    variable yields an empty set.
    """
    spec = os.environ.get("CUTTER_REPORT_FORCE_BEATS", "")
    beat_ids: set[int] = set()
    # filter(None, ...) drops the empty strings re.split produces at the edges.
    for token in filter(None, re.split(r"[,;\s]+", spec)):
        try:
            beat_id = int(token)
        except ValueError:
            continue
        beat_ids.add(beat_id)
    return beat_ids
def extract_still(video_path: Path, t_s: float, out: Path) -> bool:
"""Always render fresh."""
if not video_path.exists():
@@ -373,15 +390,20 @@ class BeatRow:
trailer_clip: Path | None = None
source_clip: Path | None = None
compare_clip: Path | None = None
is_graphic: bool = False
@property
def status(self) -> str:
if self.is_graphic:
return "GFX"
if not self.matched:
return "MAN."
return "OK" if self.confirmed else "?"
@property
def status_de(self) -> str:
if self.is_graphic:
return "Titel/Grafik"
if not self.matched:
return "Kein Treffer"
return "Bestätigt" if self.confirmed else "Vorläufig"
@@ -401,6 +423,7 @@ def collect_rows(
source_path: Path,
with_stills: bool,
with_clips: bool,
scenes_by_id: dict[int, dict] | None = None,
) -> list[BeatRow]:
stills_dir = project_root / "output" / "cutter_stills"
clips_dir = project_root / "output" / "cutter_clips"
@@ -408,6 +431,27 @@ def collect_rows(
stills_dir.mkdir(parents=True, exist_ok=True)
if with_clips:
clips_dir.mkdir(parents=True, exist_ok=True)
force_beats = _forced_beats()
def is_dark_title_card(path: Path | None) -> bool:
    """Return True when the still at *path* is mostly dark with a small bright area.

    The image is reduced to a 160x90 greyscale thumbnail. It qualifies when
    the mean luma is at most 55, at least one pixel reaches 90, between
    0.4 % and 18 % of the pixels are >= 92, and at most 35 % lie in the
    30..91 mid range. A missing or unreadable file yields False.
    """
    if path is None or not path.exists():
        return False
    try:
        thumb = Image.open(path).convert("L").resize((160, 90))
    except Exception:
        return False

    mean_luma = float(ImageStat.Stat(thumb).mean[0])
    _, brightest = thumb.getextrema()
    if mean_luma > 55.0 or brightest < 90:
        return False

    # For an "L" image histogram() returns 256 per-level pixel counts, so
    # slicing it gives the same totals as counting pixels one by one.
    levels = thumb.histogram()
    pixel_count = max(1, sum(levels))
    bright_ratio = sum(levels[92:]) / pixel_count
    mid_ratio = sum(levels[30:92]) / pixel_count
    return 0.004 <= bright_ratio <= 0.18 and mid_ratio <= 0.35
rows: list[BeatRow] = []
for beat in beats:
@@ -420,6 +464,31 @@ def collect_rows(
if rec is not None:
segs = rec.get("segments") or []
num_segs = len(segs)
if scenes_by_id:
rec_scene = scenes_by_id.get(int(rec.get("scene_id", -1)))
if rec_scene and float(rec["in_point_s"]) < float(rec_scene["start_s"]) + SCENE_START_GUARD_S:
guarded_start = min(
float(rec_scene["end_s"]) - 0.04,
float(rec_scene["start_s"]) + SCENE_START_GUARD_S,
)
shift = guarded_start - float(rec["in_point_s"])
rec = dict(rec)
rec["in_point_s"] = guarded_start
rec["out_point_s"] = max(float(rec["in_point_s"]) + 0.04, float(rec["out_point_s"]) + shift)
fixed_segs = []
for seg in segs:
fixed = dict(seg)
seg_scene = scenes_by_id.get(int(fixed.get("scene_id", -1)))
if seg_scene and float(fixed["in_point_s"]) < float(seg_scene["start_s"]) + SCENE_START_GUARD_S:
guarded_start = min(
float(seg_scene["end_s"]) - 0.04,
float(seg_scene["start_s"]) + SCENE_START_GUARD_S,
)
shift = guarded_start - float(fixed["in_point_s"])
fixed["in_point_s"] = guarded_start
fixed["out_point_s"] = max(float(fixed["in_point_s"]) + 0.04, float(fixed["out_point_s"]) + shift)
fixed_segs.append(fixed)
segs = fixed_segs
trailer_still = source_still = None
trailer_clip = source_clip = compare_clip = None
@@ -427,13 +496,17 @@ def collect_rows(
if with_stills:
t_still = beat_still_time(beat["start_s"], beat["end_s"])
tjpg = stills_dir / f"beat_{bid:02d}_trailer.jpg"
if extract_still(trailer_path, t_still, tjpg):
if tjpg.exists() and bid not in force_beats:
trailer_still = tjpg
elif extract_still(trailer_path, t_still, tjpg):
trailer_still = tjpg
if rec is not None:
src_dur = max(0.04, rec["out_point_s"] - rec["in_point_s"])
s_still = rec["in_point_s"] + min(0.4, src_dur * 0.3)
sjpg = stills_dir / f"beat_{bid:02d}_source.jpg"
if extract_still(source_path, s_still, sjpg):
if sjpg.exists() and bid not in force_beats:
source_still = sjpg
elif extract_still(source_path, s_still, sjpg):
source_still = sjpg
if with_clips:
@@ -441,12 +514,16 @@ def collect_rows(
# Trailer clip (cutter-side, simple)
tmp4 = clips_dir / f"beat_{bid:02d}_trailer.mp4"
if extract_clip(trailer_path, beat["start_s"], beat_dur, tmp4):
if tmp4.exists() and bid not in force_beats:
trailer_clip = tmp4
elif extract_clip(trailer_path, beat["start_s"], beat_dur, tmp4):
trailer_clip = tmp4
if rec is not None:
smp4 = clips_dir / f"beat_{bid:02d}_source.mp4"
if num_segs >= 2:
if smp4.exists() and bid not in force_beats:
source_clip = smp4
elif num_segs >= 2:
seg_specs = [
(float(s["in_point_s"]),
max(0.04, float(s["out_point_s"]) - float(s["in_point_s"])))
@@ -463,13 +540,28 @@ def collect_rows(
# Frame-locked compare video
cmp4 = clips_dir / f"beat_{bid:02d}_compare.mp4"
if build_compare_clip(
compare_segs = segs
if not compare_segs:
seg_dur = max(0.04, min(beat_dur, rec["out_point_s"] - rec["in_point_s"]))
compare_segs = [{
"trailer_offset_s": 0.0,
"duration_s": seg_dur,
"scene_id": rec.get("scene_id"),
"in_point_s": rec["in_point_s"],
"out_point_s": rec["in_point_s"] + seg_dur,
"match_score": rec.get("match_score", 0.0),
"is_confirmed": rec.get("is_confirmed", False),
}]
if cmp4.exists() and bid not in force_beats:
compare_clip = cmp4
elif build_compare_clip(
trailer_path, beat["start_s"], beat_dur,
source_path, segs if num_segs >= 1 else [],
source_path, compare_segs,
cmp4,
):
compare_clip = cmp4
is_graphic = (rec is None and is_dark_title_card(trailer_still))
rows.append(BeatRow(
bid=bid,
trailer_in_s=beat["start_s"], trailer_out_s=beat["end_s"],
@@ -489,6 +581,7 @@ def collect_rows(
trailer_clip=trailer_clip,
source_clip=source_clip,
compare_clip=compare_clip,
is_graphic=is_graphic,
))
return rows
@@ -505,6 +598,7 @@ def render_markdown(
) -> str:
matched = sum(1 for r in rows if r.matched)
confirmed = sum(1 for r in rows if r.confirmed)
graphic = sum(1 for r in rows if r.is_graphic)
out: list[str] = []
out.append("# Cutter-Report — manuelles Nachschneiden")
@@ -523,11 +617,12 @@ def render_markdown(
out.append("|--------|-----------|")
out.append("| `OK` | Bestätigt durch CV-Analyse — übernehmen |")
out.append("| `?` | Vorläufig — korrekte Szene, Phase im NLE prüfen |")
out.append("| `GFX` | Titel-/Grafikkarte — nicht aus dem Spielfilm matchen |")
out.append("| `MAN.` | Kein automatischer Treffer — manuell setzen |")
out.append("")
out.append(
f"**{len(rows)}** Beats gesamt · **{matched}** automatisch (**{confirmed}** bestätigt)"
f" · **{len(rows) - matched}** manuell."
f" · **{graphic}** Grafik/Titel · **{len(rows) - matched - graphic}** manuell."
)
out.append("")
@@ -580,10 +675,14 @@ def render_markdown(
f" (scene {seg.get('scene_id', '?')})"
)
else:
out.append("- **Source** : — (manuell setzen)")
if r.is_graphic:
out.append("- **Source** : — (Titel-/Grafikkarte, nicht aus Source matchen)")
else:
out.append("- **Source** : — (manuell setzen)")
if r.score > 0 and r.score < 0.65:
out.append(f"- ⚠ Score {r.score:.3f} unter 0.65 — visuell prüfen")
out.append(f"- **Rematch**: `python cli.py rematch --beat {r.bid}`")
if not r.is_graphic:
out.append(f"- **Rematch**: `python cli.py rematch --beat {r.bid}`")
if r.phase:
out.append(f"- **Phase**: {r.phase}")
if r.composition:
@@ -602,7 +701,7 @@ def render_markdown(
out.append("| Trailer | Source |")
out.append("|:---:|:---:|")
t_cell = f"![Trailer {r.bid}]({t_uri})" if t_uri else "_(kein Still)_"
s_cell = f"![Source {r.bid}]({s_uri})" if s_uri else "_(MAN.)_"
s_cell = f"![Source {r.bid}]({s_uri})" if s_uri else f"_({r.status})_"
out.append(f"| {t_cell} | {s_cell} |")
out.append("")
@@ -708,6 +807,7 @@ table.ov tr:hover { background: rgba(255, 255, 255, 0.05); }
.badge.ok { background: var(--ok-bg); color: var(--ok); border: 1px solid rgba(74, 222, 128, 0.2); }
.badge.q { background: var(--q-bg); color: var(--q); border: 1px solid rgba(251, 191, 36, 0.2); }
.badge.man { background: var(--man-bg); color: var(--man); border: 1px solid rgba(248, 113, 113, 0.2); }
.badge.gfx { background: rgba(96, 165, 250, 0.12); color: #93c5fd; border: 1px solid rgba(147, 197, 253, 0.24); }
/* Beat cards */
.beats-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(600px, 1fr)); gap: 32px; }
@@ -820,6 +920,10 @@ def render_html(
'<tr><td><span class="badge q">?</span></td>'
'<td>Vorläufig — Phase und Aktion im NLE visuell prüfen</td></tr>'
)
parts.append(
'<tr><td><span class="badge gfx">GFX</span></td>'
'<td>Titel-/Grafikkarte — als Trailer-Grafik übernehmen, nicht im Spielfilm suchen</td></tr>'
)
parts.append(
'<tr><td><span class="badge man">MAN.</span></td>'
'<td>Kein Treffer — manuell suchen oder Schwarzbild einfügen</td></tr>'
@@ -847,7 +951,7 @@ def render_html(
str(s.get("scene_id", "?")) for s in r.segments
))
scene = "+".join(all_scenes)
bcls = {"OK": "ok", "?": "q", "MAN.": "man"}[r.status]
bcls = {"OK": "ok", "?": "q", "GFX": "gfx", "MAN.": "man"}[r.status]
parts.append(
f'<tr>'
f'<td class="num"><a href="#beat-{r.bid:02d}">{r.bid:02d}</a></td>'
@@ -868,7 +972,7 @@ def render_html(
ti = smpte(r.trailer_in_s, trailer_fps)
to = smpte(r.trailer_out_s, trailer_fps)
dur = r.trailer_out_s - r.trailer_in_s
bcls = {"OK": "ok", "?": "q", "MAN.": "man"}[r.status]
bcls = {"OK": "ok", "?": "q", "GFX": "gfx", "MAN.": "man"}[r.status]
parts.append(f'<div class="beat" id="beat-{r.bid:02d}">')
@@ -1007,12 +1111,15 @@ def render_report(
cache = project_root / ".cache"
results = {r["beat_id"]: r for r in json.loads((cache / "match_results.json").read_text())}
beats = json.loads((cache / "trailer_beats.json").read_text())
scene_path = cache / "scene_index.json"
scenes = json.loads(scene_path.read_text()) if scene_path.exists() else []
scenes_by_id = {int(s["scene_id"]): s for s in scenes}
vis_path = cache / "vision_descriptions.json"
vis_items = json.loads(vis_path.read_text())["items"] if vis_path.exists() else {}
rows = collect_rows(
project_root, beats, results, vis_items,
trailer_path, source_path, with_stills, with_clips,
trailer_path, source_path, with_stills, with_clips, scenes_by_id,
)
now = datetime.now()
@@ -1040,13 +1147,9 @@ def main() -> int:
)
(project_root / "CUTTER_REPORT.md").write_text(md, encoding="utf-8")
(project_root / "CUTTER_REPORT.html").write_text(html, encoding="utf-8")
legacy_path = project_root / "output" / "report" / "match_report.html"
legacy_path.parent.mkdir(parents=True, exist_ok=True)
legacy_path.write_text(html, encoding="utf-8")
print(f"Wrote {project_root / 'CUTTER_REPORT.md'}")
print(f"Wrote {project_root / 'CUTTER_REPORT.html'}")
print(f"Wrote {legacy_path}")
return 0
+33 -5
View File
@@ -7,6 +7,34 @@
$ErrorActionPreference = "Stop"
$VENV_DIR = ".venv"
function Invoke-CapturedProcess {
    <#
    .SYNOPSIS
    Runs an external program without a shell and returns its captured output.

    .DESCRIPTION
    Starts FilePath with the given arguments, captures stdout and stderr, waits
    for the process to exit and returns the trimmed text "stdout + newline +
    stderr". Throws when the exit code is not 0; the message contains the exit
    code, the command line and the captured output.

    .PARAMETER FilePath
    Executable to start.

    .PARAMETER Arguments
    Arguments passed to the executable, one array element per argument.

    .OUTPUTS
    System.String
    #>
    param(
        [Parameter(Mandatory = $true)][string]$FilePath,
        [Parameter(Mandatory = $false)][string[]]$Arguments = @()
    )

    $psi = [System.Diagnostics.ProcessStartInfo]::new()
    $psi.FileName = $FilePath
    if ($null -ne $psi.PSObject.Properties['ArgumentList']) {
        foreach ($arg in $Arguments) {
            [void]$psi.ArgumentList.Add($arg)
        }
    } else {
        # Windows PowerShell 5.1 runs on .NET Framework, which has no
        # ArgumentList property. Build a quoted command line instead:
        # backslashes directly before a quote (or at the end of a quoted
        # argument) are doubled and embedded quotes are escaped.
        $quoted = foreach ($arg in $Arguments) {
            if ($arg -eq '' -or $arg -match '[\s"]') {
                '"' + (($arg -replace '(\\*)"', '$1$1\"') -replace '(\\+)$', '$1$1') + '"'
            } else {
                $arg
            }
        }
        $psi.Arguments = $quoted -join ' '
    }
    $psi.UseShellExecute = $false
    $psi.RedirectStandardOutput = $true
    $psi.RedirectStandardError = $true
    $psi.CreateNoWindow = $true

    $process = [System.Diagnostics.Process]::Start($psi)
    try {
        # Drain stderr asynchronously while stdout is read. Reading both
        # redirected streams one after the other can deadlock once the child
        # fills the pipe that is not being read yet.
        $stderrTask = $process.StandardError.ReadToEndAsync()
        $stdout = $process.StandardOutput.ReadToEnd()
        $process.WaitForExit()
        $stderr = $stderrTask.GetAwaiter().GetResult()
        $exitCode = $process.ExitCode
    } finally {
        $process.Dispose()
    }

    $combined = (($stdout + "`n" + $stderr).Trim())
    if ($exitCode -ne 0) {
        throw "Command failed ($exitCode): $FilePath $($Arguments -join ' ')`n$combined"
    }
    return $combined
}
function Resolve-ProjectPython {
$cmd = Get-Command python -ErrorAction SilentlyContinue
if ($cmd) {
@@ -35,7 +63,7 @@ Write-Host ""
# ---- 1. Check Python version ------------------------------------------------
$PROJECT_PYTHON = Resolve-ProjectPython
$pythonVersion = & $PROJECT_PYTHON --version 2>&1
$pythonVersion = Invoke-CapturedProcess $PROJECT_PYTHON @("--version")
Write-Host "Python: $pythonVersion"
if ($pythonVersion -notmatch "3\.(1[1-9]|[2-9]\d)") {
Write-Error "Python 3.11+ required. Found: $pythonVersion"
@@ -48,8 +76,8 @@ if (Test-Path $VENV_DIR) {
$venvOk = $false
if (Test-Path $existingVenvPython) {
try {
$existingVersion = & $existingVenvPython --version 2>&1
$venvOk = $LASTEXITCODE -eq 0 -and $existingVersion -match "3\.(1[1-9]|[2-9]\d)"
$existingVersion = Invoke-CapturedProcess $existingVenvPython @("--version")
$venvOk = $existingVersion -match "3\.(1[1-9]|[2-9]\d)"
} catch {
$venvOk = $false
}
@@ -60,12 +88,12 @@ if (Test-Path $VENV_DIR) {
} else {
Write-Host "Existing virtual environment is not usable. Recreating '$VENV_DIR' ..." -ForegroundColor Yellow
Remove-Item -LiteralPath $VENV_DIR -Recurse -Force
& $PROJECT_PYTHON -m venv $VENV_DIR
Invoke-CapturedProcess $PROJECT_PYTHON @("-m", "venv", $VENV_DIR) | Out-Null
Write-Host "Done." -ForegroundColor Green
}
} else {
Write-Host "Creating virtual environment in '$VENV_DIR' ..." -ForegroundColor Green
& $PROJECT_PYTHON -m venv $VENV_DIR
Invoke-CapturedProcess $PROJECT_PYTHON @("-m", "venv", $VENV_DIR) | Out-Null
Write-Host "Done." -ForegroundColor Green
}
+2
View File
@@ -64,6 +64,7 @@ class DeepScanConfig:
duration_score_weight: float
duration_tie_break_score_delta: float
min_duration_coverage: float
multi_shot_segment_threshold: float
continuity_seed_offsets_s: tuple[float, ...]
scene_seed_top_k: int
scene_seed_points_per_scene: int
@@ -267,6 +268,7 @@ def load_config(
duration_score_weight=float(cv_raw["deep_scan"].get("duration_score_weight", 0.20)),
duration_tie_break_score_delta=float(cv_raw["deep_scan"].get("duration_tie_break_score_delta", 0.03)),
min_duration_coverage=float(cv_raw["deep_scan"].get("min_duration_coverage", 0.65)),
multi_shot_segment_threshold=float(cv_raw["deep_scan"].get("multi_shot_segment_threshold", 0.50)),
continuity_seed_offsets_s=tuple(
float(v) for v in cv_raw["deep_scan"].get(
"continuity_seed_offsets_s",
+384 -17
View File
@@ -198,6 +198,183 @@ def _fixed_content_features(frame: np.ndarray, cfg: AppConfig) -> tuple[np.ndarr
)
def _hires_phase_feature(frame: np.ndarray) -> np.ndarray:
    """Build the 320x160 equalised luma image used for intra-scene phase matching.

    The standard pipeline features (160x80) lose the subtle pixel differences
    between talking-head phases (mouth open vs. closed); this larger feature
    keeps more spatial detail for telling such phases apart.

    Steps: trim dark borders, blank the upper-left corner, drop the top and
    bottom 5 % of rows, convert BGR to grey, equalise the histogram and
    resize to 320x160 with area interpolation.
    """
    work = _trim_dark_borders(frame).copy()
    height, width = work.shape[:2]

    # Masters often carry burned-in timecode in the upper-left corner. It
    # changes on every frame and could dominate fine phase matching, so that
    # area (16 % of the height, 28 % of the width) is zeroed first.
    work[: int(height * 0.16), : int(width * 0.28)] = 0

    top, bottom = int(height * 0.05), int(height * 0.95)
    luma = cv2.equalizeHist(cv2.cvtColor(work[top:bottom, :], cv2.COLOR_BGR2GRAY))
    return cv2.resize(luma, (320, 160), interpolation=cv2.INTER_AREA)
def _hires_spatial_hist(frame_feature: np.ndarray) -> np.ndarray:
    """Concatenate per-cell luma histograms over an 8x8 grid.

    Each grid cell contributes a 24-bin histogram over [0, 256), normalised
    by its sum (plus 1e-6). Cells are visited row by row, so the result has
    8 * 8 * 24 entries. Rows/columns left over by the integer cell size are
    not covered.
    """
    rows, cols = frame_feature.shape[:2]
    cells_per_axis = 8
    step_y = rows // cells_per_axis
    step_x = cols // cells_per_axis

    def _cell_hist(cell_row: int, cell_col: int) -> np.ndarray:
        tile = frame_feature[
            cell_row * step_y:(cell_row + 1) * step_y,
            cell_col * step_x:(cell_col + 1) * step_x,
        ]
        counts = cv2.calcHist([tile], [0], None, [24], [0, 256]).astype(np.float32).flatten()
        return counts / (float(np.sum(counts)) + 1e-6)

    return np.concatenate([
        _cell_hist(cell_row, cell_col)
        for cell_row in range(cells_per_axis)
        for cell_col in range(cells_per_axis)
    ])
def _hires_phase_score(
    ref_feature: np.ndarray,
    ref_spatial: np.ndarray,
    src_frame: np.ndarray,
) -> float:
    """Score how well a source frame matches a hi-res reference template.

    The result is a weighted blend of three signals:
      * 0.25 × normalised cross-correlation over the whole feature image,
      * 0.45 × normalised cross-correlation over the central window
        (±20 % of each dimension around the centre, i.e. the middle 40 %),
        which is where a talking head's face is expected to be,
      * 0.30 × histogram intersection of the 8×8 spatial histograms.

    Args:
        ref_feature: Reference feature from ``_hires_phase_feature``.
        ref_spatial: Reference histogram from ``_hires_spatial_hist``.
        src_frame: Raw source frame; its feature is extracted here.

    Returns:
        The blended similarity score.
    """
    candidate = _hires_phase_feature(src_frame)

    def ncc(image: np.ndarray, template: np.ndarray) -> float:
        # Image and template have the same size, so the response map is 1×1.
        return float(cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)[0][0])

    whole_frame_ncc = ncc(candidate, ref_feature)

    rows, cols = ref_feature.shape[:2]
    mid_row, mid_col = rows // 2, cols // 2
    half_rows, half_cols = int(rows * 0.20), int(cols * 0.20)
    row_window = slice(mid_row - half_rows, mid_row + half_rows)
    col_window = slice(mid_col - half_cols, mid_col + half_cols)
    centre_ncc = ncc(candidate[row_window, col_window], ref_feature[row_window, col_window])

    layout_similarity = _hist_intersection(ref_spatial, _hires_spatial_hist(candidate))

    return whole_frame_ncc * 0.25 + centre_ncc * 0.45 + layout_similarity * 0.30
def _hires_phase_refine(
    beat: TrailerBeat,
    in_point_s: float,
    scene_start_s: float,
    scene_end_s: float,
    cfg: AppConfig,
) -> float:
    """Re-scan a local window around ``in_point_s`` at high resolution to fix phase.

    Applied as a final refinement after the standard pipeline has picked the
    scene. It targets the case where low-res features cannot tell apart
    different phases of the same shot (e.g. mouth open vs. closed in a
    talking-head close-up).

    Only a window of 0.8–2.0 s on either side of ``in_point_s`` is scanned,
    clamped to ``[scene_start_s, scene_end_s]``. A full-scene scan could jump
    to a different phase of the same shot (same room, same actor, different
    gesture) and would be needlessly expensive.

    Args:
        beat: Trailer beat whose frames provide the reference templates.
        in_point_s: In-point already chosen by the standard pipeline.
        scene_start_s: Start of the source scene containing the in-point.
        scene_end_s: End of the source scene containing the in-point.
        cfg: Application configuration.

    Returns:
        The refined in-point, or ``in_point_s`` unchanged when no usable
        reference template exists or no candidate beats the original
        position's score by at least 0.025.
    """
    # One frame at the export rate, but never finer than 0.04 s. The
    # ``or 24.0`` fallback is applied to BOTH the template sampling and the
    # source scan; previously only the scan was guarded, so an unset/zero
    # frame rate raised before the guarded line was ever reached.
    frame_step_s = max(1.0 / (cfg.export.edl_frame_rate or 24.0), 0.04)

    # Build hi-res templates from only the stable, bright reference frames
    # before any fade begins. Fading frames have dropping brightness that
    # would penalise correct source positions where those offsets map to
    # bright content in the source.
    matchable_s = estimate_matchable_reference_duration(beat, cfg, sample_step_s=0.04)
    ref_templates: list[tuple[float, np.ndarray, np.ndarray, float]] = []
    t = 0.0
    while t <= matchable_s:
        frame = grab_frame_at_path(beat.trailer_path, beat.start_s + t)
        if frame is not None and _is_scoreable_reference_frame(frame, cfg):
            mean_l, _, contrast = _reference_visibility_stats(frame, cfg)
            # Only use clearly visible frames (skip dimming fade frames).
            if mean_l >= 50.0 and contrast >= 40.0:
                feat = _hires_phase_feature(frame)
                ref_templates.append((t, feat, _hires_spatial_hist(feat), mean_l))
        t = round(t + frame_step_s, 6)
    if not ref_templates:
        return in_point_s

    # For very short matchable durations (fast fades / cross-dissolves) keep
    # only the brightest template. When the beat fades quickly the later
    # templates are dim and penalise every bright source candidate equally,
    # destroying phase discrimination. A single bright anchor gives maximum
    # selectivity. ``max`` returns the earliest template among equal lumas,
    # matching a stable descending sort.
    if matchable_s < 1.0 and len(ref_templates) > 1:
        ref_templates = [max(ref_templates, key=lambda item: item[3])]
        logger.debug(
            'Beat %d: hi-res using single brightest template at offset %.3fs (luma %.1f)',
            beat.beat_id, ref_templates[0][0], ref_templates[0][3],
        )

    # The luma field is only needed for template selection above.
    scan_templates = [(off, feat, sp) for off, feat, sp, _ in ref_templates]
    max_ref_offset = max(off for off, _, _ in scan_templates)

    local_window_s = max(0.8, min(2.0, cfg.cv.deep_scan.content_align_window_seconds * 3.0))
    scan_start_s = max(scene_start_s, in_point_s - local_window_s)
    scan_end_s = min(scene_end_s, in_point_s + local_window_s)

    def blend(scores: list[float]) -> float:
        # Mean rewards overall agreement; the minimum term punishes any single
        # template that matches badly.
        return (sum(scores) / len(scores)) * 0.7 + min(scores) * 0.3

    with open_video(cfg.paths.source_movie) as cap:
        # Baseline: score of the in-point the standard pipeline settled on.
        # Unreadable frames are skipped rather than invalidating the baseline.
        baseline_scores: list[float] = []
        for off, ref_feat, ref_spatial in scan_templates:
            src_frame = grab_frame_at(cap, in_point_s + off)
            if src_frame is not None:
                baseline_scores.append(_hires_phase_score(ref_feat, ref_spatial, src_frame))
        original_score = blend(baseline_scores) if baseline_scores else -1.0

        # Scan the local neighbourhood. A candidate only counts when every
        # template offset yields a readable source frame.
        best_t = in_point_s
        best_score = original_score
        t = scan_start_s
        while t + max_ref_offset <= scan_end_s:
            scores: list[float] = []
            for off, ref_feat, ref_spatial in scan_templates:
                src_frame = grab_frame_at(cap, t + off)
                if src_frame is None:
                    break
                scores.append(_hires_phase_score(ref_feat, ref_spatial, src_frame))
            else:
                combined = blend(scores)
                if combined > best_score:
                    best_score = combined
                    best_t = t
            t = round(t + frame_step_s, 6)

    # Require a clear margin over the original position before moving.
    if best_score < original_score + 0.025:
        return in_point_s

    if best_t != in_point_s:
        logger.info(
            'Beat %d: hi-res phase refine moved in-point %.3fs -> %.3fs '
            '(delta=%.3fs, score=%.4f)',
            beat.beat_id, in_point_s, best_t,
            best_t - in_point_s, best_score,
        )
    return best_t
def _fixed_content_pair_score(
ref_features: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
source_frame: np.ndarray,
@@ -388,12 +565,36 @@ def _rerank_candidates_by_content(
reranked: list[tuple[float, float, float]] = []
with open_video(cfg.paths.source_movie) as cap:
for coarse_score, t_sec in candidates:
content_score = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
# If the candidate lands just before a scene boundary, also evaluate
# the start of the next scene. A coarse-scan offset can place the
# in-point a few frames into the preceding (wrong) scene, causing
# the content and coverage scores to be artificially low even though
# the next scene is the correct visual match.
eval_t = t_sec
if scenes is not None:
cur_scene = _find_scene_for_time(scenes, t_sec, cfg)
if cur_scene is not None:
remaining = float(cur_scene.end_s) - t_sec
next_idx = next(
(i + 1 for i, s in enumerate(scenes) if s.scene_id == cur_scene.scene_id),
None,
)
if (
remaining < cfg.cv.deep_scan.scene_boundary_epsilon_s * 4
and next_idx is not None
and next_idx < len(scenes)
):
next_scene_start = float(scenes[next_idx].start_s)
alt_content = _fixed_content_sequence_score(cap, next_scene_start, templates, cfg)
cur_content = _fixed_content_sequence_score(cap, t_sec, templates, cfg)
if alt_content > cur_content:
eval_t = next_scene_start
content_score = _fixed_content_sequence_score(cap, eval_t, templates, cfg)
coverage_score = 1.0
if scenes is not None and matchable_duration_s and matchable_duration_s > 0:
usable_s = _contiguous_scene_coverage_duration(
beat,
t_sec,
eval_t,
scenes,
matchable_duration_s,
cfg,
@@ -404,7 +605,7 @@ def _rerank_candidates_by_content(
+ coarse_score * 0.18
+ coverage_score * 0.20
)
reranked.append((rank_score, coarse_score, t_sec))
reranked.append((rank_score, coarse_score, eval_t))
return sorted(reranked, key=lambda item: item[0], reverse=True)
@@ -772,6 +973,8 @@ def _content_alignment_score(
in_point_s: float,
templates: list[tuple[float, np.ndarray]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
if not templates:
return -1.0
@@ -782,7 +985,13 @@ def _content_alignment_score(
early_scores: list[float] = []
for offset_s, template in templates:
frame = grab_frame_at(cap, in_point_s + offset_s)
t0 = in_point_s + offset_s
if frame_cache is not None and fps is not None:
idx = int(round(t0 * fps))
frame = frame_cache.get(idx)
else:
frame = grab_frame_at(cap, t0)
if frame is None:
return -1.0
@@ -840,6 +1049,20 @@ def align_in_point_by_content(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _ in templates)
max_offset = max(off for off, _ in templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
@@ -852,7 +1075,7 @@ def align_in_point_by_content(
active_templates = []
else:
active_templates = templates
score = _content_alignment_score(cap, t, active_templates, cfg) if active_templates else -1.0
score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -868,11 +1091,23 @@ def _motion_phase_score(
in_point_s: float,
motion_templates: list[tuple[float, float, np.ndarray, tuple[int, ...]]],
cfg: AppConfig,
fps: float | None = None,
frame_cache: dict[int, np.ndarray] | None = None,
) -> float:
scores: list[float] = []
for offset_s, step_s, ref_delta, template_shape in motion_templates:
f0 = grab_frame_at(cap, in_point_s + offset_s)
f1 = grab_frame_at(cap, in_point_s + offset_s + step_s)
t0 = in_point_s + offset_s
t1 = in_point_s + offset_s + step_s
if frame_cache is not None and fps is not None:
idx0 = int(round(t0 * fps))
idx1 = int(round(t1 * fps))
f0 = frame_cache.get(idx0)
f1 = frame_cache.get(idx1)
else:
f0 = grab_frame_at(cap, t0)
f1 = grab_frame_at(cap, t1)
if f0 is None or f1 is None:
return -1.0
src0 = _fixed_feature(f0, template_shape, cfg)
@@ -913,11 +1148,25 @@ def align_in_point_by_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_offset = min(off for off, _, _, _ in motion_templates)
max_offset = max(off + step for off, step, _, _ in motion_templates)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
t = start_s
while t <= end_s:
score = _motion_phase_score(cap, t, motion_templates, cfg)
score = _motion_phase_score(cap, t, motion_templates, cfg, fps=fps, frame_cache=frame_cache)
if score > best_score + tie_delta:
best_score = score
best_in = t
@@ -933,6 +1182,7 @@ def align_in_point_by_content_and_motion(
estimated_in_point_s: float,
cfg: AppConfig,
search_window_s: float | None = None,
scene_end_s: float | None = None,
) -> tuple[float, float, float, float]:
"""
Align a candidate using still-frame content and motion phase together.
@@ -959,23 +1209,57 @@ def align_in_point_by_content_and_motion(
end_s = estimated_in_point_s + window_s
tie_delta = cfg.cv.deep_scan.start_tie_break_score_delta
min_t_offset = min(off for off, _ in templates) if templates else 0.0
max_t_offset = max(off for off, _ in templates) if templates else 0.0
min_m_offset = min(off for off, _, _, _ in motion_templates) if motion_templates else 0.0
max_m_offset = max(off + step for off, step, _, _ in motion_templates) if motion_templates else 0.0
min_offset = min(min_t_offset, min_m_offset)
max_offset = max(max_t_offset, max_m_offset)
req_start_s = max(0.0, start_s + min_offset - frame_step_s)
req_end_s = end_s + max_offset + frame_step_s
frame_cache = {}
t_req = req_start_s
while t_req <= req_end_s:
idx = int(round(t_req * fps))
frame = grab_frame_at(cap, t_req)
if frame is not None:
frame_cache[idx] = frame
t_req = round(t_req + frame_step_s, 6)
best_in = estimated_in_point_s
best_score = -1.0
best_content = -1.0
best_motion = -1.0
t = start_s
while t <= end_s:
content_score = _content_alignment_score(cap, t, templates, cfg)
if scene_end_s is not None:
avail_s = scene_end_s - t
if avail_s > 0:
active_templates = [(off, tpl) for off, tpl in templates if off <= avail_s]
active_motion = [(off, step, delta, shape) for off, step, delta, shape in motion_templates if off + step <= avail_s]
else:
active_templates = []
active_motion = []
else:
active_templates = templates
active_motion = motion_templates
content_score = _content_alignment_score(cap, t, active_templates, cfg, fps=fps, frame_cache=frame_cache) if active_templates else -1.0
motion_score = (
_motion_phase_score(cap, t, motion_templates, cfg)
if len(motion_templates) >= 2
_motion_phase_score(cap, t, active_motion, cfg, fps=fps, frame_cache=frame_cache)
if len(active_motion) >= 2
else content_score
)
if content_score < 0 or motion_score < 0:
t = round(t + frame_step_s, 6)
continue
raw_score = content_score * 0.64 + motion_score * 0.36
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.05)
# The previous anchor_penalty of 0.05 per second was stronger than the
# actual variance in raw_score, preventing phase correction. We reduce it
# so that it only acts as a tie-breaker.
anchor_penalty = min(0.18, abs(t - estimated_in_point_s) * 0.005)
score = raw_score - anchor_penalty
if score > best_score + tie_delta:
best_score = score
@@ -1027,6 +1311,18 @@ def estimate_usable_source_duration(
frame = grab_frame_at(cap, in_point_s + offset_s)
if frame is None:
break
# If the template is scoreable (has content) but the source frame is dark,
# this is a bad match. We should not let dark source frames
# provide high correlation to dark templates.
# templates are already pre-processed into feature images (grayscale/edges),
# so we can't use _is_scoreable_reference_frame on them directly.
# Instead, we rely on the fact that _prepare_beat_templates already
# filtered out non-scoreable frames.
if _is_dark_reference_frame(frame, cfg):
scores.append((offset_s, 0.0))
continue
scores.append((offset_s, _match_score(frame, template, cfg)))
if not scores:
@@ -1034,16 +1330,21 @@ def estimate_usable_source_duration(
warmup_scores = [score for offset, score in scores if offset <= min(1.0, beat.duration_s * 0.35)]
baseline = max(warmup_scores) if warmup_scores else max(score for _, score in scores)
min_score = max(0.34, baseline * 0.48)
# Keep the usable span tied to the same action phase, not just the same room
# or actors. A loose cutoff hides "same scene, wrong moment" drift in long
# dialogue shots where the background remains highly correlated.
min_score = max(0.42, baseline * 0.62)
last_good = 0.0
bad_run = 0
bad_run_start_offset: float | None = None
good_scores: list[float] = []
for offset_s, score in scores:
if score >= min_score:
last_good = offset_s
bad_run = 0
bad_run_start_offset = None
good_scores.append(score)
continue
@@ -1051,7 +1352,34 @@ def estimate_usable_source_duration(
continue
bad_run += 1
if bad_run_start_offset is None:
bad_run_start_offset = offset_s
if bad_run >= 3:
# Before killing the span, check whether the remaining scores form a
# stable plateau. This handles scenes where a grading/exposure
# difference between trailer and source causes a gradual score drop
# rather than a hard cut. A genuine cut produces chaotic scores;
# a grading mismatch produces a flat, low-but-consistent plateau.
# Conditions: low variance (std < 0.025), tail mean clearly above pure-black
# (mean > max(0.30, baseline * 0.58)), and a meaningful warmup baseline (>= 0.42).
tail_scores = [s for o, s in scores if o >= bad_run_start_offset]
if (
len(tail_scores) >= 3
and float(np.std(tail_scores)) < 0.025
and float(np.mean(tail_scores)) > max(0.30, baseline * 0.58)
and baseline >= 0.42
):
logger.debug(
'Beat %d: stable plateau detected at offset %.3fs '
'(tail mean=%.3f std=%.3f) — extending span to full duration.',
beat.beat_id, bad_run_start_offset,
float(np.mean(tail_scores)), float(np.std(tail_scores)),
)
last_good = scores[-1][0]
good_scores.extend(tail_scores)
break
logger.debug('Beat %d: Match died at offset %.3fs. Score %.3f < min_score %.3f. Bad run count: %d',
beat.beat_id, offset_s, score, min_score, bad_run)
break
tail_safety_s = max(0.0, cfg.cv.deep_scan.trim_tail_frames / source_fps)
@@ -1113,7 +1441,10 @@ def refine_in_point_with_sequence(
Returns:
(best_in_point_s, sequence_score)
"""
return align_in_point_by_content(beat, estimated_in_point_s, cfg, search_window_s, scene_end_s)
best_in, best_score, _, _ = align_in_point_by_content_and_motion(
beat, estimated_in_point_s, cfg, search_window_s, scene_end_s
)
return best_in, best_score
def _find_scene_for_time(scenes: Sequence | None, t_sec: float, cfg: AppConfig):
@@ -1451,7 +1782,7 @@ def run_global_scan(
max_source_duration_s=duration_s if rough_scene_end_s is not None else None,
)
content_score = original_content_score
content_in_s, align_content_score = align_in_point_by_content(
content_in_s, _, align_content_score, _ = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1495,7 +1826,7 @@ def run_global_scan(
cfg,
)
motion_in_s, align_motion_score = align_in_point_by_motion(
motion_in_s, _, _, align_motion_score = align_in_point_by_content_and_motion(
b,
adjusted_in_s,
cfg,
@@ -1504,6 +1835,7 @@ def run_global_scan(
if local_align_window_s is not None
else min(1.0, cfg.cv.deep_scan.content_align_window_seconds)
),
scene_end_s=rough_scene_end_s,
)
if align_motion_score >= original_motion_score + 0.015:
@@ -1561,7 +1893,12 @@ def run_global_scan(
)
if len(motion_templates) >= 2:
motion_score_clamped = max(0.0, min(1.0, motion_score))
final_score = final_score * 0.82 + motion_score_clamped * 0.18
blended = final_score * 0.82 + motion_score_clamped * 0.18
# Do not let motion blending drag the score below the
# content-validated level. A weak motion score often just
# means the shot contains a camera pan or slow zoom; it
# should not veto an otherwise well-supported content match.
final_score = max(blended, final_score - 0.015)
if is_weighted_seed_candidate:
vision_provisional_score = (
content_score * 0.45
@@ -1741,6 +2078,36 @@ def run_global_scan(
best_result.match_score,
)
# Final hi-res phase refinement: re-scan a local window around the chosen
# in-point (clamped to the source scene) at higher resolution to correct
# phase mismatches that the standard 160×80 features cannot resolve
# (e.g. talking-head close-ups).
final_in_s = best_result.in_point_s
final_scene = _find_scene_for_time(scenes, final_in_s, cfg)
if final_scene is not None:
refined_phase_in_s = _hires_phase_refine(
b,
final_in_s,
float(final_scene.start_s),
float(final_scene.end_s),
cfg,
)
if refined_phase_in_s != final_in_s:
final_in_s = refined_phase_in_s
# Recompute out-point preserving the duration
final_out_s = final_in_s + best_result.duration_s
if final_scene is not None:
final_out_s = min(final_out_s, float(final_scene.end_s))
best_result = MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,
source_path=cfg.paths.source_movie,
in_point_s=final_in_s,
out_point_s=final_out_s,
in_point_frame=int(final_in_s * source_fps),
match_score=best_result.match_score,
is_confirmed=is_confirmed,
)
results.append(MatchResult(
beat_id=b.beat_id,
scene_id=best_result.scene_id,
+118 -28
View File
@@ -434,12 +434,20 @@ def _scene_window_ranges(scene: Scene, beat: TrailerBeat, max_windows: int) -> l
usable_start = scene.start_s
usable_end = max(scene.start_s, scene.end_s - window_s)
if max_windows == 1:
starts = [usable_start + (usable_end - usable_start) * 0.5]
else:
step = (usable_end - usable_start) / max(1, max_windows - 1)
starts = [usable_start + step * idx for idx in range(max_windows)]
return [(start_s, min(scene.end_s, start_s + window_s)) for start_s in starts]
starts = [usable_start]
early_step = max(0.5, window_s * 0.75)
for idx in range(1, min(max_windows, 4)):
starts.append(min(usable_end, usable_start + early_step * idx))
remaining = max_windows - len(starts)
if remaining > 0:
if remaining == 1:
starts.append(usable_start + (usable_end - usable_start) * 0.5)
else:
step = (usable_end - usable_start) / max(1, remaining - 1)
starts.extend(usable_start + step * idx for idx in range(remaining))
deduped = sorted({round(max(usable_start, min(usable_end, s)), 3) for s in starts})
return [(start_s, min(scene.end_s, start_s + window_s)) for start_s in deduped[:max_windows]]
def _cached_scene_descriptions(
@@ -749,11 +757,11 @@ def find_action_window_in_scene(
inside that scene. It stays automatic and cached: windows are described
evenly across the scene until the per-run vision budget is consumed.
"""
if not cfg.vision.enabled or scene.duration_s <= 0:
if scene.duration_s <= 0:
return None
cache = _load_cache(cfg)
budget = [max(0, cfg.vision.max_new_descriptions_per_run)]
budget = [max(0, cfg.vision.max_new_descriptions_per_run) if cfg.vision.enabled else 0]
beat_desc = _describe_sample(
kind="beat",
item_id=beat.beat_id,
@@ -772,37 +780,37 @@ def find_action_window_in_scene(
if not beat_actions:
return None
max_windows = max(
cfg.vision.seed_points_per_scene,
cfg.vision.max_new_descriptions_per_run,
)
best: tuple[float, float, float, str] | None = None
for start_s, end_s in _scene_window_ranges(scene, beat, max_windows):
desc = _describe_sample(
kind="action_window",
item_id=scene.scene_id,
label=f"source scene {scene.scene_id} action window {start_s:.2f}-{end_s:.2f}",
video_path=scene.source_path,
start_s=start_s,
end_s=end_s,
cfg=cfg,
cache=cache,
budget=budget,
)
def consider_candidate(start_s: float, end_s: float, desc: str) -> None:
nonlocal best
if not desc:
continue
return
beat_text = beat_desc.lower()
source_text = desc.lower()
positive_source_text = source_text.split('"negatives"', 1)[0]
if "mouth" in beat_text and "mouth" not in positive_source_text:
return
if "dark interior" in beat_text and (
"interior" not in positive_source_text or "dark" not in positive_source_text
):
return
if "blonde" in beat_text and "blonde" not in positive_source_text:
return
score, reason = _semantic_match_score(beat_desc, desc)
source_actions = _semantic_action_groups(desc)
missing_actions = _missing_action_groups(beat_actions, source_actions)
if missing_actions:
continue
return
threshold = max(0.38, cfg.vision.similarity_threshold + 0.18)
if beat_actions and beat_actions <= source_actions:
threshold = min(threshold, max(0.52, cfg.vision.similarity_threshold + 0.05))
if score < threshold:
continue
return
phase_adjustment, phase_reason = _action_phase_adjustment(beat_desc, desc)
adjusted_score = max(0.0, min(1.0, score + phase_adjustment))
if adjusted_score < threshold:
continue
return
candidate = (
start_s,
end_s,
@@ -814,5 +822,87 @@ def find_action_window_in_scene(
):
best = candidate
max_windows = max(
cfg.vision.seed_points_per_scene,
cfg.vision.max_new_descriptions_per_run,
)
ranges = _scene_window_ranges(scene, beat, max_windows)
cached_desc_by_range: dict[tuple[float, float], str] = {}
cached_items = cache.get("items", {})
if isinstance(cached_items, dict):
for item in cached_items.values():
if not isinstance(item, dict) or item.get("kind") != "action_window":
continue
if item.get("item_id") != scene.scene_id:
continue
try:
start_s = float(item.get("start_s"))
end_s = float(item.get("end_s"))
except (TypeError, ValueError):
continue
if scene.start_s <= start_s < scene.end_s and end_s > start_s:
key = (round(start_s, 3), round(min(scene.end_s, end_s), 3))
ranges.append(key)
description = item.get("description", "")
if isinstance(description, str) and description.strip():
cached_desc_by_range[key] = description
consider_candidate(key[0], key[1], description)
ranges = sorted({(round(start_s, 3), round(end_s, 3)) for start_s, end_s in ranges})
for start_s, end_s in ranges:
desc = cached_desc_by_range.get((round(start_s, 3), round(end_s, 3)))
if desc is None:
desc = _describe_sample(
kind="action_window",
item_id=scene.scene_id,
label=f"source scene {scene.scene_id} action window {start_s:.2f}-{end_s:.2f}",
video_path=scene.source_path,
start_s=start_s,
end_s=end_s,
cfg=cfg,
cache=cache,
budget=budget,
)
if not desc:
continue
consider_candidate(start_s, end_s, desc)
_save_cache(cfg, cache)
if best is None and isinstance(cached_items, dict):
for item in cached_items.values():
if not isinstance(item, dict) or item.get("kind") != "action_window":
continue
if item.get("item_id") != scene.scene_id:
continue
desc = item.get("description", "")
if not isinstance(desc, str) or not desc.strip():
continue
beat_text = beat_desc.lower()
source_text = desc.lower()
positive_source_text = source_text.split('"negatives"', 1)[0]
if "mouth" in beat_text and "mouth" not in positive_source_text:
continue
if "dark interior" in beat_text and (
"interior" not in positive_source_text or "dark" not in positive_source_text
):
continue
if "blonde" in beat_text and "blonde" not in positive_source_text:
continue
source_actions = _semantic_action_groups(desc)
if not beat_actions or not beat_actions <= source_actions:
continue
score, reason = _semantic_match_score(beat_desc, desc)
if score < max(0.38, cfg.vision.similarity_threshold + 0.05):
continue
try:
start_s = float(item.get("start_s"))
end_s = float(item.get("end_s"))
except (TypeError, ValueError):
continue
return (
start_s,
min(scene.end_s, end_s),
min(0.99, score),
f"{reason} phase=cached_action_window raw={score:.3f}",
)
return best
-1
View File
@@ -1 +0,0 @@
# tests package
-144
View File
@@ -1,144 +0,0 @@
"""
tests/test_config.py Smoke tests for config loading and model integrity.
Run with: pytest tests/test_config.py -v
"""
from pathlib import Path
import pytest
from src.core.config import load_config, AppConfig
from src.core.models import (
Scene, TrailerBeat, MatchResult, VibeHit,
EditClip, EditTimeline, BeatType, DialogueLine,
)
CONFIG_PATH = Path(__file__).parents[1] / "config.toml"
# ---------------------------------------------------------------------------
# Config loader
# ---------------------------------------------------------------------------
class TestConfigLoader:
    """Smoke checks for ``load_config`` run against ``CONFIG_PATH``."""

    def test_loads_without_error(self) -> None:
        """Loading the config file yields an ``AppConfig``."""
        assert isinstance(load_config(CONFIG_PATH), AppConfig)

    def test_project_meta(self) -> None:
        """Version is pinned and the log level is one of the accepted names."""
        loaded = load_config(CONFIG_PATH)
        allowed_levels = ("DEBUG", "INFO", "WARNING", "ERROR")
        assert loaded.version == "2.0.0"
        assert loaded.log_level in allowed_levels

    def test_cv_thresholds_in_range(self) -> None:
        """Deep-scan threshold lies strictly in (0, 1); coarse step is positive."""
        deep_scan = load_config(CONFIG_PATH).cv.deep_scan
        assert 0.0 < deep_scan.match_threshold < 1.0
        assert deep_scan.coarse_step_seconds > 0

    def test_vibe_check_crop_fractions(self) -> None:
        """Each crop fraction is in (0, 1) and together they leave some image."""
        vibe_check = load_config(CONFIG_PATH).cv.vibe_check
        assert 0.0 < vibe_check.crop_top_fraction < 1.0
        assert 0.0 < vibe_check.crop_bottom_fraction < 1.0
        assert vibe_check.crop_top_fraction + vibe_check.crop_bottom_fraction < 1.0

    def test_missing_config_raises(self, tmp_path: Path) -> None:
        """A path that does not exist raises ``FileNotFoundError``."""
        missing_file = tmp_path / "nonexistent.toml"
        with pytest.raises(FileNotFoundError):
            load_config(missing_file)

    def test_paths_are_path_objects(self) -> None:
        """Configured media paths are ``Path`` instances."""
        paths = load_config(CONFIG_PATH).paths
        assert isinstance(paths.source_movie, Path)
        assert isinstance(paths.reference_trailer, Path)
# ---------------------------------------------------------------------------
# Data models — construction & properties
# ---------------------------------------------------------------------------
class TestSceneModel:
    """Construction and derived-property checks for ``Scene``."""

    def test_duration(self) -> None:
        """``duration_s`` and ``midpoint_s`` follow from the start/end times."""
        scene = Scene(
            scene_id=0,
            source_path=Path("dummy.mp4"),
            start_s=10.0,
            end_s=25.5,
            start_frame=240,
            end_frame=612,
        )
        assert scene.duration_s == pytest.approx(15.5)
        assert scene.midpoint_s == pytest.approx(17.75)

    def test_immutable(self) -> None:
        """Assigning to a field after construction must raise."""
        scene = Scene(
            scene_id=0,
            source_path=Path("x.mp4"),
            start_s=0.0,
            end_s=1.0,
            start_frame=0,
            end_frame=24,
        )
        # Presumably a FrozenInstanceError; the check only requires *some* exception.
        with pytest.raises(Exception):
            scene.scene_id = 99  # type: ignore[misc]
class TestTrailerBeatModel:
    """Default-value checks for ``TrailerBeat``."""

    def test_beat_type_default(self) -> None:
        """A beat built without ``beat_type`` reports ``BeatType.UNKNOWN``."""
        beat = TrailerBeat(
            beat_id=0,
            trailer_path=Path("trailer.mp4"),
            start_s=0.0,
            end_s=3.0,
            start_frame=0,
            end_frame=72,
        )
        assert beat.beat_type == BeatType.UNKNOWN
class TestMatchResultModel:
    """Derived-property and ``repr`` checks for ``MatchResult``."""

    def test_duration_computed(self) -> None:
        """``duration_s`` is the out-point minus the in-point."""
        result = MatchResult(
            beat_id=0,
            scene_id=3,
            source_path=Path("movie.mp4"),
            in_point_s=120.0,
            out_point_s=123.5,
            in_point_frame=2880,
            match_score=0.87,
        )
        assert result.duration_s == pytest.approx(3.5)

    def test_repr_contains_key_info(self) -> None:
        """The ``repr`` mentions both the beat id and the scene id."""
        result = MatchResult(
            beat_id=1,
            scene_id=7,
            source_path=Path("movie.mp4"),
            in_point_s=60.0,
            out_point_s=63.0,
            in_point_frame=1440,
            match_score=0.91,
        )
        text = repr(result)
        assert "beat=1" in text
        assert "scene=7" in text
class TestEditTimeline:
    """Aggregate-property checks for ``EditTimeline``."""

    def _make_clip(self, idx: int, t_start: float, t_end: float) -> EditClip:
        """Build a clip occupying ``[t_start, t_end]`` on the timeline.

        The matched source range starts at 0 and has the same length as the
        timeline range.
        """
        trailer_beat = TrailerBeat(
            beat_id=idx,
            trailer_path=Path("t.mp4"),
            start_s=t_start,
            end_s=t_end,
            start_frame=0,
            end_frame=1,
        )
        source_match = MatchResult(
            beat_id=idx,
            scene_id=0,
            source_path=Path("m.mp4"),
            in_point_s=0.0,
            out_point_s=t_end - t_start,
            in_point_frame=0,
            match_score=0.9,
        )
        return EditClip(
            clip_index=idx,
            beat=trailer_beat,
            match=source_match,
            timeline_start_s=t_start,
            timeline_end_s=t_end,
        )

    def test_total_duration(self) -> None:
        """Two back-to-back clips give a 9 s timeline with two clips."""
        first = self._make_clip(0, 0.0, 5.0)
        second = self._make_clip(1, 5.0, 9.0)
        timeline = EditTimeline(title="Test Trailer", frame_rate=23.976, clips=(first, second))
        assert timeline.total_duration_s == pytest.approx(9.0)
        assert timeline.clip_count == 2

    def test_empty_timeline(self) -> None:
        """A timeline without clips has zero duration."""
        timeline = EditTimeline(title="Empty", frame_rate=24.0, clips=())
        assert timeline.total_duration_s == 0.0
-140
View File
@@ -1,140 +0,0 @@
"""
tests/test_deep_scan.py Unit tests for frame_extractor and deep_scan
Uses synthetic in-memory videos (cv2.VideoWriter temp file) so no real
video files are required. Tests cover the pure logic, not hardware decoding.
"""
from __future__ import annotations
import tempfile
from pathlib import Path
import cv2
import numpy as np
import pytest
from src.cv.frame_extractor import (
get_video_info,
grab_frame_at,
iter_frames_stepped,
open_video,
)
from src.cv.fingerprinting import text_safe_crop
# ---------------------------------------------------------------------------
# Helpers: build a tiny synthetic video on disk
# ---------------------------------------------------------------------------
FPS = 24
WIDTH = 320
HEIGHT = 240
SECS = 3
def _make_synthetic_video(path: Path, color_bgr: tuple[int, int, int] = (0, 128, 255)) -> Path:
    """Write a ``SECS``-second single-colour video to *path* and return *path*.

    The clip is ``WIDTH``×``HEIGHT`` at ``FPS`` frames per second, encoded
    with the ``mp4v`` fourcc.

    Args:
        path: Destination file.
        color_bgr: Fill colour for every frame, in BGR order.

    Returns:
        The same *path*, for convenient use in fixtures.

    Raises:
        RuntimeError: If OpenCV could not open a writer for *path* (for
            example because the codec is unavailable). Without this check the
            helper would silently produce an unusable file and the failure
            would surface later in an unrelated assertion.
    """
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(str(path), fourcc, float(FPS), (WIDTH, HEIGHT))
    try:
        if not writer.isOpened():
            raise RuntimeError(f"cv2.VideoWriter could not open {path} with the mp4v codec")
        frame = np.full((HEIGHT, WIDTH, 3), color_bgr, dtype=np.uint8)
        for _ in range(FPS * SECS):
            writer.write(frame)
    finally:
        # Always release so the file handle is closed even if a write fails.
        writer.release()
    return path
@pytest.fixture
def synthetic_video(tmp_path: Path) -> Path:
    """Provide a freshly written synthetic clip inside pytest's ``tmp_path``."""
    target = tmp_path / "test.mp4"
    return _make_synthetic_video(target)
# ---------------------------------------------------------------------------
# open_video
# ---------------------------------------------------------------------------
class TestOpenVideo:
    """Checks for the ``open_video`` context manager."""

    def test_opens_valid_file(self, synthetic_video: Path) -> None:
        """An existing clip yields an opened capture object."""
        with open_video(synthetic_video) as capture:
            assert capture.isOpened()

    def test_raises_on_missing_file(self, tmp_path: Path) -> None:
        """A non-existent path raises ``FileNotFoundError``."""
        missing_clip = tmp_path / "ghost.mp4"
        with pytest.raises(FileNotFoundError):
            with open_video(missing_clip):
                pass
# ---------------------------------------------------------------------------
# get_video_info
# ---------------------------------------------------------------------------
class TestGetVideoInfo:
    """Checks that ``get_video_info`` reports the synthetic clip's properties."""

    def test_returns_correct_fps(self, synthetic_video: Path) -> None:
        """Reported frame rate is within 5 % of ``FPS``."""
        reported_fps = get_video_info(synthetic_video)["fps"]
        assert reported_fps == pytest.approx(FPS, rel=0.05)

    def test_duration_approx(self, synthetic_video: Path) -> None:
        """Reported duration is within 10 % of ``SECS``."""
        reported_duration = get_video_info(synthetic_video)["duration_s"]
        assert reported_duration == pytest.approx(SECS, rel=0.1)

    def test_resolution(self, synthetic_video: Path) -> None:
        """Reported width and height match the written frame size exactly."""
        details = get_video_info(synthetic_video)
        assert details["width"] == WIDTH
        assert details["height"] == HEIGHT
# ---------------------------------------------------------------------------
# grab_frame_at
# ---------------------------------------------------------------------------
class TestGrabFrameAt:
    """Checks for ``grab_frame_at`` on the synthetic clip."""

    def test_returns_ndarray(self, synthetic_video: Path) -> None:
        """A timestamp inside the clip yields a full-size BGR array."""
        with open_video(synthetic_video) as capture:
            grabbed = grab_frame_at(capture, 1.0)
        assert grabbed is not None
        assert isinstance(grabbed, np.ndarray)
        assert grabbed.shape == (HEIGHT, WIDTH, 3)

    def test_returns_none_past_end(self, synthetic_video: Path) -> None:
        """A timestamp far past the end must not raise."""
        with open_video(synthetic_video) as capture:
            grabbed = grab_frame_at(capture, 9999.0)
        # Depending on the codec this is either None or a repeated last
        # frame; the only requirement is that no exception is raised.
        assert grabbed is None or isinstance(grabbed, np.ndarray)
# ---------------------------------------------------------------------------
# iter_frames_stepped
# ---------------------------------------------------------------------------
class TestIterFramesStepped:
    """Checks for the ``iter_frames_stepped`` generator."""

    def test_yields_correct_count(self, synthetic_video: Path) -> None:
        """Stepping 0.0 -> 1.0 s in 0.5 s increments yields three items."""
        with open_video(synthetic_video) as capture:
            sampled = list(iter_frames_stepped(capture, 0.0, 1.0, 0.5))
        # Expected timestamps: 0.0, 0.5, 1.0
        assert len(sampled) == 3

    def test_timestamps_increasing(self, synthetic_video: Path) -> None:
        """Timestamps (first element of each yielded pair) come out in sorted order."""
        with open_video(synthetic_video) as capture:
            sampled = list(iter_frames_stepped(capture, 0.0, 2.0, 0.5))
        stamps = [stamp for stamp, _ in sampled]
        assert stamps == sorted(stamps)

    def test_invalid_step_raises(self, synthetic_video: Path) -> None:
        """A step of 0.0 raises ValueError whose message mentions ``step_s``."""
        with open_video(synthetic_video) as capture, pytest.raises(
            ValueError, match="step_s"
        ):
            list(iter_frames_stepped(capture, 0.0, 1.0, 0.0))
# ---------------------------------------------------------------------------
# text_safe_crop integration (sanity: cropped height consistent)
# ---------------------------------------------------------------------------
class TestCropSanity:
    """Sanity check of ``text_safe_crop`` applied to a real decoded frame."""

    def test_crop_reduces_height(self, synthetic_video: Path) -> None:
        """Cropping with 0.15 / 0.30 shrinks the height and keeps the width."""
        with open_video(synthetic_video) as capture:
            original = grab_frame_at(capture, 0.5)
        assert original is not None

        trimmed = text_safe_crop(original, 0.15, 0.30)
        assert trimmed.shape[0] < original.shape[0]
        assert trimmed.shape[1] == original.shape[1]  # width unchanged
-218
View File
@@ -1,218 +0,0 @@
"""
tests/test_export.py - Unit tests for timecode conversion and export writers.
Tests use synthetic EditTimeline objects (no real video files needed).
"""
from __future__ import annotations
from pathlib import Path
import pytest
from src.export.timecode import (
seconds_to_fcpxml,
seconds_to_smpte,
fcpxml_frame_duration,
fcpxml_format_name,
seconds_to_frame_count,
)
# ---------------------------------------------------------------------------
# Timecode helpers
# ---------------------------------------------------------------------------
class TestSecondsToFcpxml:
    """Checks the rational-time strings returned by ``seconds_to_fcpxml``."""

    def test_zero(self) -> None:
        """Zero seconds is rendered as the bare string "0s"."""
        assert seconds_to_fcpxml(0.0, 24.0) == "0s"

    def test_one_second_at_24fps(self) -> None:
        """1.0 s at 24 fps is 24 frames, i.e. 24/24 s, expected reduced to "1/1s"."""
        assert seconds_to_fcpxml(1.0, 24.0) == "1/1s"

    def test_one_second_at_23976(self) -> None:
        """At 23.976 fps only the shape of the string is checked, not its value."""
        rendered = seconds_to_fcpxml(1.0, 23.976)
        # The exact fraction is not pinned here; the test only requires a
        # "<num>/<den>s"-looking result.
        assert rendered.endswith("s")
        assert "/" in rendered

    def test_ten_seconds_at_25fps(self) -> None:
        """10 s at 25 fps is 250 frames, i.e. 250/25 s, expected reduced to "10/1s"."""
        assert seconds_to_fcpxml(10.0, 25.0) == "10/1s"

    def test_rational_is_reduced(self) -> None:
        """Numerator and denominator share no common factor (never "24/24s")."""
        from math import gcd

        fraction = seconds_to_fcpxml(1.0, 24.0).rstrip("s")
        numerator, denominator = (int(part) for part in fraction.split("/"))
        assert gcd(numerator, denominator) == 1
class TestSecondsToSmpte:
    """Checks the "HH:MM:SS:FF" strings returned by ``seconds_to_smpte``."""

    def test_zero(self) -> None:
        """Zero seconds maps to the all-zero timecode."""
        assert seconds_to_smpte(0.0, 24.0) == "00:00:00:00"

    def test_one_minute(self) -> None:
        """60 s at 25 fps rolls over into the minutes field."""
        assert seconds_to_smpte(60.0, 25.0) == "00:01:00:00"

    def test_one_hour(self) -> None:
        """3600 s at 24 fps rolls over into the hours field."""
        assert seconds_to_smpte(3600.0, 24.0) == "01:00:00:00"

    def test_frames_overflow(self) -> None:
        """26 frames at 25 fps is one second plus one frame."""
        timecode = seconds_to_smpte(26 / 25, 25.0)
        assert timecode == "00:00:01:01"

    def test_format_length(self) -> None:
        """The result always consists of four two-character fields."""
        fields = seconds_to_smpte(123.456, 23.976).split(":")
        assert [len(field) for field in fields] == [2, 2, 2, 2]
class TestFcpxmlHelpers:
    """Checks for the FCPXML frame-duration, format-name and frame-count helpers."""

    def test_frame_duration_24fps(self) -> None:
        """At 24 fps one frame lasts "1/24s"."""
        assert fcpxml_frame_duration(24.0) == "1/24s"

    def test_frame_duration_23976(self) -> None:
        """At 23.976 fps one frame lasts "1001/24000s"."""
        assert fcpxml_frame_duration(23.976) == "1001/24000s"

    def test_format_name_1080p_2398(self) -> None:
        """The 1920x1080 @ 23.976 format name mentions both "1080" and "2398"."""
        format_name = fcpxml_format_name(23.976, 1920, 1080)
        for fragment in ("1080", "2398"):
            assert fragment in format_name

    def test_frame_count_roundtrip(self) -> None:
        """10 s at 25 fps converts to exactly 250 frames."""
        assert seconds_to_frame_count(10.0, 25.0) == 250
# ---------------------------------------------------------------------------
# EDL writer (string output)
# ---------------------------------------------------------------------------
class TestEdlWriter:
    """Checks on the text of the EDL file produced by ``write_edl``."""

    def _make_timeline(self) -> "src.core.models.EditTimeline":  # type: ignore
        """Build a one-clip ``EditTimeline`` titled "TestTrailer" at 25 fps."""
        from src.core.models import (
            BeatType,
            EditClip,
            EditTimeline,
            MatchResult,
            TrailerBeat,
        )

        hook_beat = TrailerBeat(
            beat_id=0,
            trailer_path=Path("trailer.mp4"),
            start_s=0.0,
            end_s=5.0,
            start_frame=0,
            end_frame=120,
            beat_type=BeatType.HOOK,
        )
        source_match = MatchResult(
            beat_id=0,
            scene_id=3,
            source_path=Path("movie.mp4"),
            in_point_s=30.0,
            out_point_s=35.0,
            in_point_frame=720,
            match_score=0.88,
        )
        only_clip = EditClip(
            clip_index=0,
            beat=hook_beat,
            match=source_match,
            timeline_start_s=0.0,
            timeline_end_s=5.0,
        )
        return EditTimeline(title="TestTrailer", frame_rate=25.0, clips=(only_clip,))

    def _export_text(self, tmp_path: Path) -> str:
        """Write the sample timeline to ``tmp_path / "test.edl"`` and return its text."""
        # Project imports stay local, as in the individual tests.
        from src.core.config import load_config
        from src.export.edl_writer import write_edl

        config = load_config()
        timeline = self._make_timeline()
        written = write_edl(timeline, config, output_path=tmp_path / "test.edl")
        return written.read_text(encoding="utf-8")

    def test_edl_contains_title(self, tmp_path: Path) -> None:
        """The exported file carries a TITLE line with the timeline title."""
        assert "TITLE: TestTrailer" in self._export_text(tmp_path)

    def test_edl_has_event_line(self, tmp_path: Path) -> None:
        """The exported file mentions event number 001 and reel name AX."""
        edl_text = self._export_text(tmp_path)
        assert "001" in edl_text  # event number
        assert "AX" in edl_text  # reel name
# ---------------------------------------------------------------------------
# FCPXML writer (XML structure)
# ---------------------------------------------------------------------------
class TestFcpxmlWriter:
    """Checks on the XML structure of the file produced by ``write_fcpxml``."""

    def _make_timeline(self) -> "src.core.models.EditTimeline":  # type: ignore
        """Build a one-clip ``EditTimeline`` titled "TestTrailer" at 25 fps."""
        from src.core.models import (
            BeatType,
            EditClip,
            EditTimeline,
            MatchResult,
            TrailerBeat,
        )

        hook_beat = TrailerBeat(
            beat_id=0,
            trailer_path=Path("trailer.mp4"),
            start_s=0.0,
            end_s=5.0,
            start_frame=0,
            end_frame=120,
            beat_type=BeatType.HOOK,
        )
        source_match = MatchResult(
            beat_id=0,
            scene_id=3,
            source_path=Path("B:/Proxy/movie.mp4"),
            in_point_s=30.0,
            out_point_s=35.0,
            in_point_frame=720,
            match_score=0.88,
        )
        only_clip = EditClip(
            clip_index=0,
            beat=hook_beat,
            match=source_match,
            timeline_start_s=0.0,
            timeline_end_s=5.0,
        )
        return EditTimeline(title="TestTrailer", frame_rate=25.0, clips=(only_clip,))

    def _export_root(self, tmp_path: Path):
        """Write the sample timeline to ``tmp_path / "test.fcpxml"`` and parse it.

        Lines starting with ``<!DOCTYPE`` are dropped before the text is handed
        to ElementTree; the parsed root element is returned.
        """
        # Imports stay local, as in the individual tests.
        from xml.etree import ElementTree as ET
        from src.core.config import load_config
        from src.export.fcpxml_writer import write_fcpxml

        config = load_config()
        timeline = self._make_timeline()
        written = write_fcpxml(timeline, config, output_path=tmp_path / "test.fcpxml")
        kept_lines = [
            line
            for line in written.read_text(encoding="utf-8").splitlines()
            if not line.strip().startswith("<!DOCTYPE")
        ]
        return ET.fromstring("\n".join(kept_lines))

    def test_fcpxml_is_valid_xml(self, tmp_path: Path) -> None:
        """The output parses as XML and its root element is ``fcpxml``."""
        root = self._export_root(tmp_path)
        # Drop a "{namespace}" prefix, if any, before comparing the tag name.
        local_tag = root.tag.rpartition("}")[2]
        assert local_tag == "fcpxml"

    def test_fcpxml_has_spine(self, tmp_path: Path) -> None:
        """The document contains a namespaced ``spine`` with exactly one child."""
        root = self._export_root(tmp_path)
        # Map a prefix to the FCPXML namespace so find() can resolve the tag.
        namespaces = {"fcp": "http://www.apple.com/dt/FCPXML/1_10"}
        spine = root.find(".//fcp:spine", namespaces)
        assert spine is not None
        assert len(list(spine)) == 1
-112
View File
@@ -1,112 +0,0 @@
"""
tests/test_fingerprinting.py - Unit tests for src/cv/fingerprinting.py.
Tests run WITHOUT requiring real video files.
"""
from __future__ import annotations
import numpy as np
import pytest
from src.cv.fingerprinting import (
text_safe_crop,
extract_hs_histograms,
compare_histograms,
hist_to_bytes,
bytes_to_hist,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def solid_blue_frame() -> np.ndarray:
    """Uniform blue test image: 256 rows x 256 columns, 3 uint8 channels in BGR order."""
    # BGR blue: full first channel, zero elsewhere.
    return np.full((256, 256, 3), (255, 0, 0), dtype=np.uint8)
@pytest.fixture
def solid_red_frame() -> np.ndarray:
    """Uniform red test image: 256 rows x 256 columns, 3 uint8 channels in BGR order."""
    # BGR red: full last channel, zero elsewhere.
    return np.full((256, 256, 3), (0, 0, 255), dtype=np.uint8)
# ---------------------------------------------------------------------------
# text_safe_crop
# ---------------------------------------------------------------------------
class TestTextSafeCrop:
    """Checks the output size and argument validation of ``text_safe_crop``."""

    def test_removes_correct_rows(self, solid_blue_frame: np.ndarray) -> None:
        """Cropping 15 % top / 30 % bottom leaves the expected number of rows."""
        trimmed = text_safe_crop(solid_blue_frame, crop_top=0.15, crop_bottom=0.30)
        full_height = solid_blue_frame.shape[0]  # 256
        first_kept_row = int(full_height * 0.15)
        end_row = int(full_height * (1.0 - 0.30))
        assert trimmed.shape[0] == end_row - first_kept_row

    def test_zero_crop_returns_same_size(self, solid_blue_frame: np.ndarray) -> None:
        """With both fractions at zero the shape is unchanged."""
        untouched = text_safe_crop(solid_blue_frame, crop_top=0.0, crop_bottom=0.0)
        assert untouched.shape == solid_blue_frame.shape

    def test_invalid_top_raises(self, solid_blue_frame: np.ndarray) -> None:
        """``crop_top=1.0`` is rejected with a ValueError naming ``crop_top``."""
        with pytest.raises(ValueError, match="crop_top"):
            text_safe_crop(solid_blue_frame, crop_top=1.0, crop_bottom=0.0)

    def test_invalid_bottom_raises(self, solid_blue_frame: np.ndarray) -> None:
        """A negative ``crop_bottom`` is rejected with a ValueError naming it."""
        with pytest.raises(ValueError, match="crop_bottom"):
            text_safe_crop(solid_blue_frame, crop_top=0.0, crop_bottom=-0.1)

    def test_overlapping_crops_raise(self, solid_blue_frame: np.ndarray) -> None:
        """Fractions of 0.6 and 0.5 together are rejected with a ValueError."""
        with pytest.raises(ValueError, match="must be < 1.0"):
            text_safe_crop(solid_blue_frame, crop_top=0.6, crop_bottom=0.5)
# ---------------------------------------------------------------------------
# Histograms
# ---------------------------------------------------------------------------
class TestHistograms:
    """Checks for ``extract_hs_histograms`` and ``compare_histograms``.

    ``extract_hs_histograms`` returns two arrays; the tests below establish
    that the first has ``bins_hue`` entries and the second ``bins_sat``
    entries, so the locals are named accordingly.
    """

    def test_output_shape(self, solid_blue_frame: np.ndarray) -> None:
        """Each histogram is 1-D with the requested number of bins."""
        hue_hist, sat_hist = extract_hs_histograms(
            solid_blue_frame, bins_hue=50, bins_sat=60
        )
        assert hue_hist.shape == (50,)
        assert sat_hist.shape == (60,)

    def test_normalised(self, solid_blue_frame: np.ndarray) -> None:
        """Both histograms have an L2 norm of approximately 1.0."""
        # numpy comes from the module-level import; no local re-import needed.
        hue_hist, sat_hist = extract_hs_histograms(
            solid_blue_frame, bins_hue=50, bins_sat=60
        )
        assert np.linalg.norm(hue_hist) == pytest.approx(1.0, abs=1e-5)
        assert np.linalg.norm(sat_hist) == pytest.approx(1.0, abs=1e-5)

    def test_same_frame_correl_is_one(self, solid_blue_frame: np.ndarray) -> None:
        """A histogram correlated with itself scores approximately 1.0."""
        import cv2

        hue_hist, _ = extract_hs_histograms(
            solid_blue_frame, bins_hue=50, bins_sat=60
        )
        score = compare_histograms(hue_hist, hue_hist, method=cv2.HISTCMP_CORREL)
        assert score == pytest.approx(1.0, abs=1e-5)

    def test_different_frames_correl_lower(
        self,
        solid_blue_frame: np.ndarray,
        solid_red_frame: np.ndarray,
    ) -> None:
        """Histograms of the blue and red frames correlate below 1.0."""
        import cv2

        # Positional bin arguments are kept exactly as the original call passed them.
        blue_hist, _ = extract_hs_histograms(solid_blue_frame, 50, 60)
        red_hist, _ = extract_hs_histograms(solid_red_frame, 50, 60)
        score = compare_histograms(blue_hist, red_hist, method=cv2.HISTCMP_CORREL)
        assert score < 1.0
# ---------------------------------------------------------------------------
# Serialisation round-trip
# ---------------------------------------------------------------------------
class TestSerialisation:
    """Round-trip check for ``hist_to_bytes`` / ``bytes_to_hist``."""

    def test_round_trip(self, solid_blue_frame: np.ndarray) -> None:
        """Serialising a histogram and restoring it gives back an almost-equal array."""
        original_hist, _ = extract_hs_histograms(solid_blue_frame, 50, 60)
        payload = hist_to_bytes(original_hist)
        np.testing.assert_array_almost_equal(original_hist, bytes_to_hist(payload))