cleanup dead code and tests

This commit is contained in:
Josh Hawkins 2026-06-01 10:34:05 -05:00
parent 28b0320a0f
commit 3ecf924bc9
4 changed files with 58 additions and 757 deletions

View File

@ -27,7 +27,6 @@ from frigate.jobs.motion_search_batch import (
stream_time_to_absolute, stream_time_to_absolute,
) )
from frigate.jobs.motion_search_decode import ( from frigate.jobs.motion_search_decode import (
_ffprobe_path,
iter_vod_frames, iter_vod_frames,
keyframe_sampling_eligible, keyframe_sampling_eligible,
probe_video_dimensions, probe_video_dimensions,
@ -361,6 +360,7 @@ class MotionSearchRunner(threading.Thread):
# Resolved once per job in _execute_search # Resolved once per job in _execute_search
self.ffmpeg_path: str = "ffmpeg" self.ffmpeg_path: str = "ffmpeg"
self.ffprobe_path: str = "ffprobe"
self.decode_args: list[str] = [] self.decode_args: list[str] = []
# Keyframe sampling decision, decided once per job from the first run's # Keyframe sampling decision, decided once per job from the first run's
# GOP. The fallback cadence is a fixed rate (see FALLBACK_SAMPLE_FPS). # GOP. The fallback cadence is a fixed rate (see FALLBACK_SAMPLE_FPS).
@ -451,6 +451,7 @@ class MotionSearchRunner(threading.Thread):
raise ValueError(f"Camera {camera_name} detect dimensions not configured") raise ValueError(f"Camera {camera_name} detect dimensions not configured")
self.ffmpeg_path = camera_config.ffmpeg.ffmpeg_path self.ffmpeg_path = camera_config.ffmpeg.ffmpeg_path
self.ffprobe_path = camera_config.ffmpeg.ffprobe_path
# Create polygon mask # Create polygon mask
polygon_mask = create_polygon_mask( polygon_mask = create_polygon_mask(
@ -561,7 +562,7 @@ class MotionSearchRunner(threading.Thread):
# what we decode, so crop/scale/mask are computed against it. # what we decode, so crop/scale/mask are computed against it.
self.internal_port = resolve_internal_port(self.config) self.internal_port = resolve_internal_port(self.config)
self.decode_args = resolve_motion_decode_args(camera_config) self.decode_args = resolve_motion_decode_args(camera_config)
ffprobe_path = _ffprobe_path(self.ffmpeg_path) ffprobe_path = self.ffprobe_path
runs = coalesce_runs(filtered_recordings, MAX_RUN_SECONDS, RUN_GAP_EPSILON) runs = coalesce_runs(filtered_recordings, MAX_RUN_SECONDS, RUN_GAP_EPSILON)
if not runs: if not runs:
@ -651,7 +652,7 @@ class MotionSearchRunner(threading.Thread):
time_map = build_segment_time_map(run) time_map = build_segment_time_map(run)
if self.use_keyframe: if self.use_keyframe:
kf_pts = probe_vod_keyframe_pts(_ffprobe_path(self.ffmpeg_path), vod_url) kf_pts = probe_vod_keyframe_pts(self.ffprobe_path, vod_url)
frames = list( frames = list(
iter_vod_frames( iter_vod_frames(
self.ffmpeg_path, self.ffmpeg_path,

View File

@ -1,15 +1,13 @@
"""Hardware-accelerated ffmpeg decode for motion search. """Hardware-accelerated ffmpeg decode for motion search.
Replaces the per-frame ``cv2.VideoCapture`` decode loop with an ffmpeg Decodes a recording run's VOD/HLS stream with an ffmpeg subprocess, optionally
subprocess that decodes a recording segment, selects only the sampled frame selecting only keyframes, and streams raw frames over a pipe for the motion
indices in the filter graph, and streams raw ``bgr24`` frames over a pipe. math. Output is the requested ``pix_fmt`` (gray or ``bgr24``) with optional
Output stays ``bgr24`` (not the gray plane) so the existing motion math is crop/scale applied in the filter graph so downstream pixels are unchanged.
byte-identical; no scaling is applied so pixels are unchanged.
""" """
import json import json
import logging import logging
import os
import subprocess as sp import subprocess as sp
import tempfile import tempfile
from collections.abc import Callable, Generator from collections.abc import Callable, Generator
@ -18,116 +16,61 @@ from typing import IO
import numpy as np import numpy as np
from frigate.config import CameraConfig from frigate.config import CameraConfig
from frigate.const import FFMPEG_HWACCEL_NVIDIA, FFMPEG_HWACCEL_VAAPI
from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_decode from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_decode
from frigate.util.services import auto_detect_hwaccel from frigate.util.services import auto_detect_hwaccel
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Only accelerators whose decoded surfaces download cleanly to nv12 are used for # Output-format surfaces that download cleanly to nv12 via the fixed
# motion search. The decode path appends a fixed ``hwdownload,format=nv12`` step, # ``hwdownload,format=nv12`` step the decode path appends. Other surfaces
# which is correct for these presets but not for surfaces like drm_prime (rkmpp), # (drm_prime from rkmpp, vulkan, amf) need a different download step, so motion
# vulkan, or amf. Anything outside this set decodes in software so results stay # search decodes them in software to keep results byte-identical rather than risk
# byte-identical instead of risking a wrong-but-valid-sized frame that the # a wrong-but-valid-sized frame the zero-frame fallback gate would not catch.
# zero-frame fallback gate would not catch. _NV12_OUTPUT_FORMATS = frozenset({"vaapi", "cuda", "qsv"})
_NV12_HWACCEL_PRESETS = frozenset(
{
FFMPEG_HWACCEL_VAAPI,
FFMPEG_HWACCEL_NVIDIA,
"preset-intel-qsv-h264",
"preset-intel-qsv-h265",
}
)
def build_motion_decode_command( def _hwaccel_output_format(decode_args: list[str]) -> str | None:
ffmpeg_path: str, """Return the ``-hwaccel_output_format`` value in ffmpeg args, or None."""
recording_path: str, try:
start_frame: int, idx = decode_args.index("-hwaccel_output_format")
end_frame: int, except ValueError:
frame_step: int, return None
decode_args: list[str], return decode_args[idx + 1] if idx + 1 < len(decode_args) else None
*,
crop: tuple[int, int, int, int] | None = None,
scale: tuple[int, int] | None = None,
gray: bool = False,
) -> list[str]:
"""Build the ffmpeg argv for decoding the sampled frames of a segment.
The ``select`` expression is frame-number based so it lands on the exact
absolute frame indices the motion loop processes. Optional ``crop`` (w, h,
x, y), ``scale`` (w, h), and ``gray`` shrink the output for the lossy modes;
with all three at their defaults the command is identical to the full-res
bgr24 path.
"""
select = (
f"select=gte(n\\,{start_frame})"
f"*not(mod(n-{start_frame}\\,{frame_step}))"
f"*lt(n\\,{end_frame})"
)
filters = [select]
# With hwaccel, decoded frames are GPU surfaces; pull them back to system
# memory before any CPU crop/scale and the rawvideo encoder.
if decode_args:
filters.append("hwdownload")
filters.append("format=nv12")
if crop is not None:
cw, ch, cx, cy = crop
filters.append(f"crop={cw}:{ch}:{cx}:{cy}")
if scale is not None:
sw, sh = scale
filters.append(f"scale={sw}:{sh}")
vf = ",".join(filters)
pix_fmt = "gray" if gray else "bgr24"
return [
ffmpeg_path,
"-hide_banner",
"-loglevel",
"error",
*decode_args,
"-i",
recording_path,
"-an",
"-vf",
vf,
"-vsync",
"0",
"-f",
"rawvideo",
"-pix_fmt",
pix_fmt,
"pipe:",
]
def resolve_motion_decode_args(camera_config: CameraConfig) -> list[str]: def resolve_motion_decode_args(camera_config: CameraConfig) -> list[str]:
"""Resolve the ffmpeg hwaccel decode args for a camera's recordings. """Resolve the ffmpeg hwaccel decode args for a camera's recordings.
``auto`` is resolved via ``auto_detect_hwaccel``. Only presets in ``auto`` is resolved via ``auto_detect_hwaccel`` and the preset is expanded
``_NV12_HWACCEL_PRESETS`` are accelerated; any other value (an exotic preset by ``parse_preset_hardware_acceleration_decode`` (the same table the live
or custom args) returns an empty list so the segment decodes in software, pipeline uses). Acceleration is kept only when the decoded surface downloads
preserving byte-identical results. An empty list means software decode. cleanly to nv12 -- decided by reading ``-hwaccel_output_format`` back from the
resolved args rather than a separate preset allowlist that could drift from
``PRESETS_HW_ACCEL_DECODE``. Anything else (custom args, a software-only
preset, or an nv12-incompatible surface) returns an empty list, meaning
software decode, so results stay byte-identical.
""" """
raw = camera_config.ffmpeg.hwaccel_args raw = camera_config.ffmpeg.hwaccel_args
preset = auto_detect_hwaccel() if raw == "auto" else raw preset = auto_detect_hwaccel() if raw == "auto" else raw
# Custom args (a list) or any non-allowlisted preset decode in software. # Custom args (a list) decode in software so results stay byte-identical.
if not isinstance(preset, str) or preset not in _NV12_HWACCEL_PRESETS: if not isinstance(preset, str):
return [] return []
return ( decode_args = parse_preset_hardware_acceleration_decode(
parse_preset_hardware_acceleration_decode( preset,
preset, camera_config.detect.fps,
camera_config.detect.fps, camera_config.detect.width or 0,
camera_config.detect.width or 0, camera_config.detect.height or 0,
camera_config.detect.height or 0, camera_config.ffmpeg.gpu,
camera_config.ffmpeg.gpu,
)
or []
) )
if not decode_args:
return []
if _hwaccel_output_format(decode_args) not in _NV12_OUTPUT_FORMATS:
return []
return decode_args
def _read_exact(stream: IO[bytes], size: int) -> bytes | None: def _read_exact(stream: IO[bytes], size: int) -> bytes | None:
@ -164,218 +107,9 @@ def _terminate(proc: sp.Popen[bytes]) -> None:
proc.wait() proc.wait()
def _expected_frame_count(start_frame: int, end_frame: int, frame_step: int) -> int:
"""Number of sampled frames the select filter should emit for a segment."""
if end_frame <= start_frame:
return 0
return (end_frame - start_frame + frame_step - 1) // frame_step
def _decode_segment(
cmd: list[str],
recording_path: str,
frame_size: int,
frame_width: int,
frame_height: int,
start_frame: int,
end_frame: int,
frame_step: int,
should_stop: Callable[[], bool],
*,
channels: int = 3,
) -> Generator[tuple[int, np.ndarray], None, int]:
"""Run one ffmpeg decode and yield (absolute_frame_index, bgr_frame).
Returns the number of frames produced so the caller can decide whether a
fallback decode is needed. ffmpeg stderr is captured to a temp file (not a
pipe, to avoid a full-pipe stall while we read stdout) and surfaced if the
decode ends before all expected frames were produced.
"""
stopped = False
# SpooledTemporaryFile keeps the (small, -loglevel error) stderr in memory.
stderr_file = tempfile.SpooledTemporaryFile(max_size=65536)
proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=stderr_file)
assert proc.stdout is not None # stdout=PIPE always provides a stream
count = 0
frame_idx = start_frame
try:
while frame_idx < end_frame:
if should_stop():
stopped = True
break
buf = _read_exact(proc.stdout, frame_size)
if buf is None:
break
# frombuffer returns a read-only view; the consumer copies it via
# cv2.cvtColor before any mutation, so this is safe.
if channels == 1:
frame = np.frombuffer(buf, dtype=np.uint8).reshape(
(frame_height, frame_width)
)
else:
frame = np.frombuffer(buf, dtype=np.uint8).reshape(
(frame_height, frame_width, channels)
)
count += 1
yield frame_idx, frame
frame_idx += frame_step
finally:
_terminate(proc)
expected = _expected_frame_count(start_frame, end_frame, frame_step)
if not stopped and not should_stop() and count < expected:
# The decode ended early (e.g. the process died mid-stream). Frames
# already yielded cannot be retracted, so this segment's results may
# be incomplete -- surface it rather than fail silently.
stderr_file.seek(0)
err = stderr_file.read().decode("utf-8", "replace").strip()
err_tail = err.splitlines()[-1] if err else "no ffmpeg error output"
logger.warning(
"Motion decode of %s ended early: produced %d of %d expected frames (%s)",
recording_path,
count,
expected,
err_tail,
)
stderr_file.close()
return count
def iter_segment_frames(
ffmpeg_path: str,
recording_path: str,
start_frame: int,
end_frame: int,
frame_step: int,
frame_width: int,
frame_height: int,
decode_args: list[str],
should_stop: Callable[[], bool],
*,
channels: int = 3,
crop: tuple[int, int, int, int] | None = None,
scale: tuple[int, int] | None = None,
gray: bool = False,
) -> Generator[tuple[int, np.ndarray], None, None]:
"""Yield sampled (absolute_frame_index, bgr_frame) tuples for a segment.
Tries hardware-accelerated decode when ``decode_args`` is non-empty. If that
produces no frames (e.g. the preset is unsupported on this host), falls back
once to software decode. Optional ``crop``/``scale``/``gray`` (with a
matching ``channels`` count) shrink the output for the lossy modes; the
defaults yield the full-res bgr24 path unchanged.
"""
frame_size = frame_width * frame_height * channels
if decode_args:
hw_cmd = build_motion_decode_command(
ffmpeg_path,
recording_path,
start_frame,
end_frame,
frame_step,
decode_args,
crop=crop,
scale=scale,
gray=gray,
)
produced = yield from _decode_segment(
hw_cmd,
recording_path,
frame_size,
frame_width,
frame_height,
start_frame,
end_frame,
frame_step,
should_stop,
channels=channels,
)
if produced > 0 or should_stop():
return
logger.warning(
"Hardware decode produced no frames for %s, falling back to software decode",
recording_path,
)
sw_cmd = build_motion_decode_command(
ffmpeg_path,
recording_path,
start_frame,
end_frame,
frame_step,
[],
crop=crop,
scale=scale,
gray=gray,
)
yield from _decode_segment(
sw_cmd,
recording_path,
frame_size,
frame_width,
frame_height,
start_frame,
end_frame,
frame_step,
should_stop,
channels=channels,
)
KEYFRAME_MAX_GAP_SECONDS = 2.0 KEYFRAME_MAX_GAP_SECONDS = 2.0
def _ffprobe_path(ffmpeg_path: str) -> str:
"""Derive the ffprobe path that sits next to the configured ffmpeg binary."""
directory = os.path.dirname(ffmpeg_path)
return os.path.join(directory, "ffprobe") if directory else "ffprobe"
def probe_keyframe_pts(ffprobe_path: str, recording_path: str) -> list[float]:
"""Return sorted keyframe presentation timestamps (seconds) for a segment.
Reads video packet flags via ffprobe (no decode). Returns [] on any failure
so callers fall back to every-Nth sampling.
"""
cmd = [
ffprobe_path,
"-v",
"error",
"-select_streams",
"v:0",
"-show_packets",
"-show_entries",
"packet=pts_time,flags",
"-of",
"json",
recording_path,
]
try:
completed = sp.run(cmd, capture_output=True, text=True, timeout=30)
except (OSError, sp.SubprocessError):
logger.warning("ffprobe failed for keyframe probe of %s", recording_path)
return []
if completed.returncode != 0 or not completed.stdout:
return []
try:
packets = json.loads(completed.stdout).get("packets", [])
except json.JSONDecodeError:
return []
pts: list[float] = []
for pkt in packets:
flags = pkt.get("flags", "")
pts_time = pkt.get("pts_time")
if flags.startswith("K") and pts_time is not None:
try:
pts.append(float(pts_time))
except ValueError:
continue
return sorted(pts)
def keyframe_sampling_eligible( def keyframe_sampling_eligible(
keyframe_pts: list[float], max_gap: float = KEYFRAME_MAX_GAP_SECONDS keyframe_pts: list[float], max_gap: float = KEYFRAME_MAX_GAP_SECONDS
) -> bool: ) -> bool:
@ -390,85 +124,6 @@ def keyframe_sampling_eligible(
return max(gaps) <= max_gap return max(gaps) <= max_gap
def build_keyframe_decode_command(
ffmpeg_path: str,
recording_path: str,
decode_args: list[str],
crop: tuple[int, int, int, int] | None,
scale: tuple[int, int] | None,
gray: bool,
) -> list[str]:
"""Build an ffmpeg argv that decodes only keyframes (-skip_frame nokey)."""
filters = []
if decode_args:
filters.append("hwdownload")
filters.append("format=nv12")
if crop is not None:
cw, ch, cx, cy = crop
filters.append(f"crop={cw}:{ch}:{cx}:{cy}")
if scale is not None:
sw, sh = scale
filters.append(f"scale={sw}:{sh}")
pix_fmt = "gray" if gray else "bgr24"
cmd = [
ffmpeg_path,
"-hide_banner",
"-loglevel",
"error",
"-skip_frame",
"nokey",
*decode_args,
"-i",
recording_path,
"-an",
]
if filters:
cmd += ["-vf", ",".join(filters)]
cmd += ["-vsync", "0", "-f", "rawvideo", "-pix_fmt", pix_fmt, "pipe:"]
return cmd
def iter_keyframe_frames(
ffmpeg_path: str,
recording_path: str,
out_width: int,
out_height: int,
channels: int,
decode_args: list[str],
crop: tuple[int, int, int, int] | None,
scale: tuple[int, int] | None,
gray: bool,
should_stop: Callable[[], bool],
) -> Generator[np.ndarray, None, None]:
"""Yield decoded keyframes in order (no frame index; pair with probed PTS)."""
cmd = build_keyframe_decode_command(
ffmpeg_path, recording_path, decode_args, crop, scale, gray
)
frame_size = out_width * out_height * channels
stderr_file = tempfile.SpooledTemporaryFile(max_size=65536)
proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=stderr_file)
assert proc.stdout is not None
try:
while True:
if should_stop():
break
buf = _read_exact(proc.stdout, frame_size)
if buf is None:
break
if channels == 1:
yield np.frombuffer(buf, dtype=np.uint8).reshape(
(out_height, out_width)
)
else:
yield np.frombuffer(buf, dtype=np.uint8).reshape(
(out_height, out_width, channels)
)
finally:
_terminate(proc)
stderr_file.close()
VOD_PROTOCOL_ARGS = ["-protocol_whitelist", "pipe,file,http,tcp"] VOD_PROTOCOL_ARGS = ["-protocol_whitelist", "pipe,file,http,tcp"]

View File

@ -1,24 +1,13 @@
"""Tests for the motion search hardware-accelerated decode helpers.""" """Tests for the motion search hardware-accelerated decode helpers."""
import os
import shutil
import subprocess as sp
import tempfile
import unittest import unittest
from types import SimpleNamespace from types import SimpleNamespace
from unittest import mock from unittest import mock
import cv2
import numpy as np
from frigate.jobs.motion_search_decode import ( from frigate.jobs.motion_search_decode import (
KEYFRAME_MAX_GAP_SECONDS, KEYFRAME_MAX_GAP_SECONDS,
build_motion_decode_command,
build_vod_decode_command, build_vod_decode_command,
iter_keyframe_frames,
iter_segment_frames,
keyframe_sampling_eligible, keyframe_sampling_eligible,
probe_keyframe_pts,
probe_video_dimensions, probe_video_dimensions,
probe_vod_keyframe_pts, probe_vod_keyframe_pts,
resolve_motion_decode_args, resolve_motion_decode_args,
@ -51,242 +40,27 @@ class TestResolveMotionDecodeArgs(unittest.TestCase):
) )
def test_custom_args_fall_back_to_software(self): def test_custom_args_fall_back_to_software(self):
# Arbitrary custom hwaccel args are not allowlisted, so they decode in # Arbitrary custom hwaccel args (a list, not a preset) decode in software
# software to preserve byte-identical results. # to preserve byte-identical results.
self.assertEqual( self.assertEqual(
resolve_motion_decode_args(_fake_camera_config(["-hwaccel", "vulkan"])), resolve_motion_decode_args(_fake_camera_config(["-hwaccel", "vulkan"])),
[], [],
) )
def test_nvidia_codec_preset_is_accelerated(self):
# Codec-specific nvidia presets resolve to the same cuda decode args as
# the bare preset, so eligibility is derived from -hwaccel_output_format
# rather than a hardcoded list that omitted these aliases.
args = resolve_motion_decode_args(_fake_camera_config("preset-nvidia-h264"))
self.assertIn("-hwaccel_output_format", args)
self.assertIn("cuda", args)
class TestBuildMotionDecodeCommand(unittest.TestCase): def test_software_only_preset_falls_back_to_software(self):
def test_software_command_has_no_hwdownload(self): # A preset with no -hwaccel_output_format (decoder-based, no GPU surface)
cmd = build_motion_decode_command( # cannot use the nv12 download step, so it decodes in software.
ffmpeg_path="ffmpeg", self.assertEqual(
recording_path="/tmp/seg.mp4", resolve_motion_decode_args(_fake_camera_config("preset-rpi-64-h264")), []
start_frame=100,
end_frame=400,
frame_step=30,
decode_args=[],
) )
vf = cmd[cmd.index("-vf") + 1]
self.assertEqual(vf, "select=gte(n\\,100)*not(mod(n-100\\,30))*lt(n\\,400)")
self.assertNotIn("hwdownload", vf)
self.assertIn("bgr24", cmd)
self.assertEqual(cmd[-1], "pipe:")
def test_hwaccel_command_appends_hwdownload_and_decode_args(self):
cmd = build_motion_decode_command(
ffmpeg_path="/usr/lib/ffmpeg",
recording_path="/tmp/seg.mp4",
start_frame=0,
end_frame=300,
frame_step=30,
decode_args=["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi"],
)
# decode args must come before -i
self.assertLess(cmd.index("-hwaccel"), cmd.index("-i"))
vf = cmd[cmd.index("-vf") + 1]
self.assertTrue(vf.endswith(",hwdownload,format=nv12"))
self.assertTrue(vf.startswith("select=gte(n\\,0)"))
def test_ffmpeg_path_is_first_token(self):
cmd = build_motion_decode_command(
ffmpeg_path="/usr/lib/ffmpeg",
recording_path="/tmp/seg.mp4",
start_frame=0,
end_frame=30,
frame_step=30,
decode_args=[],
)
self.assertEqual(cmd[0], "/usr/lib/ffmpeg")
def test_crop_scale_gray_filters_and_pix_fmt(self):
cmd = build_motion_decode_command(
ffmpeg_path="ffmpeg",
recording_path="/tmp/seg.mp4",
start_frame=0,
end_frame=300,
frame_step=30,
decode_args=[],
crop=(640, 480, 100, 50), # w, h, x, y
scale=(320, 240),
gray=True,
)
vf = cmd[cmd.index("-vf") + 1]
self.assertIn("crop=640:480:100:50", vf)
self.assertIn("scale=320:240", vf)
# crop must come before scale
self.assertLess(vf.index("crop="), vf.index("scale="))
self.assertEqual(cmd[cmd.index("-pix_fmt") + 1], "gray")
def test_hwaccel_with_crop_scale_orders_download_first(self):
cmd = build_motion_decode_command(
ffmpeg_path="ffmpeg",
recording_path="/tmp/seg.mp4",
start_frame=0,
end_frame=300,
frame_step=30,
decode_args=["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi"],
crop=(640, 480, 0, 0),
scale=(320, 240),
gray=True,
)
vf = cmd[cmd.index("-vf") + 1]
self.assertLess(vf.index("hwdownload"), vf.index("crop="))
self.assertLess(vf.index("crop="), vf.index("scale="))
def _ffmpeg_available() -> bool:
return shutil.which("ffmpeg") is not None
@unittest.skipUnless(_ffmpeg_available(), "ffmpeg not available")
class TestIterSegmentFrames(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tmpdir = tempfile.mkdtemp()
cls.path = os.path.join(cls.tmpdir, "sample.mp4")
# 90 frames @ 30fps (3s) of moving test content, H.264.
sp.run(
[
"ffmpeg",
"-hide_banner",
"-loglevel",
"error",
"-y",
"-f",
"lavfi",
"-i",
"testsrc=size=320x240:rate=30:duration=3",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-g",
"15",
cls.path,
],
check=True,
)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir, ignore_errors=True)
def test_yields_expected_indices(self):
frames = list(
iter_segment_frames(
ffmpeg_path="ffmpeg",
recording_path=self.path,
start_frame=0,
end_frame=90,
frame_step=30,
frame_width=320,
frame_height=240,
decode_args=[],
should_stop=lambda: False,
)
)
indices = [idx for idx, _ in frames]
self.assertEqual(indices, [0, 30, 60])
for _, frame in frames:
self.assertEqual(frame.shape, (240, 320, 3))
def test_frames_match_cv2_decode(self):
# Parity gate: ffmpeg-decoded sampled frames must equal cv2-decoded
# frames at the same absolute indices.
ffmpeg_frames = {
idx: frame
for idx, frame in iter_segment_frames(
ffmpeg_path="ffmpeg",
recording_path=self.path,
start_frame=0,
end_frame=90,
frame_step=30,
frame_width=320,
frame_height=240,
decode_args=[],
should_stop=lambda: False,
)
}
cap = cv2.VideoCapture(self.path)
try:
for idx in (0, 30, 60):
cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
ret, cv2_frame = cap.read()
self.assertTrue(ret)
np.testing.assert_array_equal(ffmpeg_frames[idx], cv2_frame)
finally:
cap.release()
def test_crop_scale_gray_output_shape(self):
# Crop a 160x120 region, scale to 80x60, gray: frames must be (60, 80).
frames = list(
iter_segment_frames(
ffmpeg_path="ffmpeg",
recording_path=self.path,
start_frame=0,
end_frame=90,
frame_step=30,
frame_width=80,
frame_height=60,
decode_args=[],
should_stop=lambda: False,
channels=1,
crop=(160, 120, 0, 0),
scale=(80, 60),
gray=True,
)
)
self.assertEqual([idx for idx, _ in frames], [0, 30, 60])
for _, frame in frames:
self.assertEqual(frame.shape, (60, 80))
self.assertEqual(frame.dtype, np.uint8)
def test_keyframe_decode_yields_only_keyframes(self):
frames = list(
iter_keyframe_frames(
ffmpeg_path="ffmpeg",
recording_path=self.path,
out_width=320,
out_height=240,
channels=3,
decode_args=[],
crop=None,
scale=None,
gray=False,
should_stop=lambda: False,
)
)
# 90 frames @ -g 15 -> 6 keyframes (0,15,30,45,60,75).
self.assertEqual(len(frames), 6)
for f in frames:
self.assertEqual(f.shape, (240, 320, 3))
def test_software_fallback_on_bad_hwaccel(self):
# An invalid hwaccel device should fail fast and fall back to software,
# still producing all expected frames.
frames = list(
iter_segment_frames(
ffmpeg_path="ffmpeg",
recording_path=self.path,
start_frame=0,
end_frame=90,
frame_step=30,
frame_width=320,
frame_height=240,
decode_args=[
"-hwaccel",
"vaapi",
"-hwaccel_device",
"/dev/dri/doesnotexist",
],
should_stop=lambda: False,
)
)
self.assertEqual([idx for idx, _ in frames], [0, 30, 60])
class TestKeyframeEligibility(unittest.TestCase): class TestKeyframeEligibility(unittest.TestCase):
@ -310,29 +84,6 @@ class TestKeyframeEligibility(unittest.TestCase):
self.assertEqual(KEYFRAME_MAX_GAP_SECONDS, 2.0) self.assertEqual(KEYFRAME_MAX_GAP_SECONDS, 2.0)
class TestProbeKeyframePts(unittest.TestCase):
def test_parses_keyframe_packets(self):
sample = (
'{"packets":['
'{"pts_time":"0.000000","flags":"K__"},'
'{"pts_time":"0.033333","flags":"___"},'
'{"pts_time":"0.500000","flags":"K__"}]}'
)
completed = mock.Mock(stdout=sample, returncode=0)
with mock.patch(
"frigate.jobs.motion_search_decode.sp.run", return_value=completed
):
pts = probe_keyframe_pts("ffprobe", "/tmp/seg.mp4")
self.assertEqual(pts, [0.0, 0.5])
def test_returns_empty_on_probe_failure(self):
with mock.patch(
"frigate.jobs.motion_search_decode.sp.run",
side_effect=OSError("boom"),
):
self.assertEqual(probe_keyframe_pts("ffprobe", "/tmp/seg.mp4"), [])
class TestVodDecodeCommand(unittest.TestCase): class TestVodDecodeCommand(unittest.TestCase):
URL = "http://127.0.0.1:5000/vod/cam/start/1/end/2/index.m3u8" URL = "http://127.0.0.1:5000/vod/cam/start/1/end/2/index.m3u8"

View File

@ -1,9 +1,5 @@
"""Tests for motion search spatial (crop/scale/mask) helpers.""" """Tests for motion search spatial (crop/scale/mask) helpers."""
import os
import shutil
import subprocess as sp
import tempfile
import unittest import unittest
import numpy as np import numpy as np
@ -13,12 +9,6 @@ from frigate.jobs.motion_search import (
compute_roi_crop_and_scale, compute_roi_crop_and_scale,
detect_motion_scaled, detect_motion_scaled,
) )
from frigate.jobs.motion_search_decode import (
iter_keyframe_frames,
iter_segment_frames,
keyframe_sampling_eligible,
probe_keyframe_pts,
)
class TestComputeRoiCropAndScale(unittest.TestCase): class TestComputeRoiCropAndScale(unittest.TestCase):
@ -93,101 +83,5 @@ class TestDetectMotionScaled(unittest.TestCase):
self.assertEqual(results, []) self.assertEqual(results, [])
def _ffmpeg_available():
return shutil.which("ffmpeg") is not None
@unittest.skipUnless(_ffmpeg_available(), "ffmpeg not available")
class TestBalancedDecodePath(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tmp = tempfile.mkdtemp()
cls.path = os.path.join(cls.tmp, "motion.mp4")
# testsrc has continuous motion across the whole frame.
sp.run(
[
"ffmpeg",
"-hide_banner",
"-loglevel",
"error",
"-y",
"-f",
"lavfi",
"-i",
"testsrc=size=640x480:rate=30:duration=3",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-g",
"15",
cls.path,
],
check=True,
)
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.tmp, ignore_errors=True)
def test_balanced_decode_then_detect_finds_motion(self):
crop, scaled = compute_roi_crop_and_scale(
[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, scale_target=160
)
mask = build_scaled_roi_mask(
[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, crop, scaled
)
frames = iter_segment_frames(
"ffmpeg",
self.path,
0,
90,
30,
scaled[0],
scaled[1],
[],
lambda: False,
channels=1,
crop=crop,
scale=scaled,
gray=True,
)
results = detect_motion_scaled(
list(frames), mask, threshold=20, min_area=1.0, timestamp_fn=float
)
# testsrc moves every frame, so consecutive sampled frames differ.
self.assertGreater(len(results), 0)
def test_keyframe_path_finds_motion(self):
crop, scaled = compute_roi_crop_and_scale(
[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, scale_target=160
)
mask = build_scaled_roi_mask(
[[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, crop, scaled
)
pts = probe_keyframe_pts("ffprobe", self.path)
self.assertTrue(keyframe_sampling_eligible(pts))
frames = list(
iter_keyframe_frames(
"ffmpeg",
self.path,
scaled[0],
scaled[1],
1,
[],
crop,
scaled,
True,
lambda: False,
)
)
self.assertEqual(len(frames), len(pts)) # count matches probe
indexed = list(enumerate(frames))
results = detect_motion_scaled(
indexed, mask, threshold=20, min_area=1.0, timestamp_fn=lambda i: pts[i]
)
self.assertGreater(len(results), 0)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()