From 3ecf924bc9dcfac1fdd5c6849fca4396ced723fe Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 1 Jun 2026 10:34:05 -0500 Subject: [PATCH] cleanup dead code and tests --- frigate/jobs/motion_search.py | 7 +- frigate/jobs/motion_search_decode.py | 425 ++------------------- frigate/test/test_motion_search_decode.py | 277 +------------- frigate/test/test_motion_search_spatial.py | 106 ----- 4 files changed, 58 insertions(+), 757 deletions(-) diff --git a/frigate/jobs/motion_search.py b/frigate/jobs/motion_search.py index d474cc6fa2..13fc841e99 100644 --- a/frigate/jobs/motion_search.py +++ b/frigate/jobs/motion_search.py @@ -27,7 +27,6 @@ from frigate.jobs.motion_search_batch import ( stream_time_to_absolute, ) from frigate.jobs.motion_search_decode import ( - _ffprobe_path, iter_vod_frames, keyframe_sampling_eligible, probe_video_dimensions, @@ -361,6 +360,7 @@ class MotionSearchRunner(threading.Thread): # Resolved once per job in _execute_search self.ffmpeg_path: str = "ffmpeg" + self.ffprobe_path: str = "ffprobe" self.decode_args: list[str] = [] # Keyframe sampling decision, decided once per job from the first run's # GOP. The fallback cadence is a fixed rate (see FALLBACK_SAMPLE_FPS). @@ -451,6 +451,7 @@ class MotionSearchRunner(threading.Thread): raise ValueError(f"Camera {camera_name} detect dimensions not configured") self.ffmpeg_path = camera_config.ffmpeg.ffmpeg_path + self.ffprobe_path = camera_config.ffmpeg.ffprobe_path # Create polygon mask polygon_mask = create_polygon_mask( @@ -561,7 +562,7 @@ class MotionSearchRunner(threading.Thread): # what we decode, so crop/scale/mask are computed against it. self.internal_port = resolve_internal_port(self.config) self.decode_args = resolve_motion_decode_args(camera_config) - ffprobe_path = _ffprobe_path(self.ffmpeg_path) + ffprobe_path = self.ffprobe_path runs = coalesce_runs(filtered_recordings, MAX_RUN_SECONDS, RUN_GAP_EPSILON) if not runs: @@ -651,7 +652,7 @@ class MotionSearchRunner(threading.Thread): time_map = build_segment_time_map(run) if self.use_keyframe: - kf_pts = probe_vod_keyframe_pts(_ffprobe_path(self.ffmpeg_path), vod_url) + kf_pts = probe_vod_keyframe_pts(self.ffprobe_path, vod_url) frames = list( iter_vod_frames( self.ffmpeg_path, diff --git a/frigate/jobs/motion_search_decode.py b/frigate/jobs/motion_search_decode.py index 783d1f3330..4b1d518013 100644 --- a/frigate/jobs/motion_search_decode.py +++ b/frigate/jobs/motion_search_decode.py @@ -1,15 +1,13 @@ """Hardware-accelerated ffmpeg decode for motion search. -Replaces the per-frame ``cv2.VideoCapture`` decode loop with an ffmpeg -subprocess that decodes a recording segment, selects only the sampled frame -indices in the filter graph, and streams raw ``bgr24`` frames over a pipe. -Output stays ``bgr24`` (not the gray plane) so the existing motion math is -byte-identical; no scaling is applied so pixels are unchanged. +Decodes a recording run's VOD/HLS stream with an ffmpeg subprocess, optionally +selecting only keyframes, and streams raw frames over a pipe for the motion +math. Output is the requested ``pix_fmt`` (gray or ``bgr24``) with optional +crop/scale applied in the filter graph so downstream pixels are unchanged. """ import json import logging -import os import subprocess as sp import tempfile from collections.abc import Callable, Generator @@ -18,116 +16,61 @@ from typing import IO import numpy as np from frigate.config import CameraConfig -from frigate.const import FFMPEG_HWACCEL_NVIDIA, FFMPEG_HWACCEL_VAAPI from frigate.ffmpeg_presets import parse_preset_hardware_acceleration_decode from frigate.util.services import auto_detect_hwaccel logger = logging.getLogger(__name__) -# Only accelerators whose decoded surfaces download cleanly to nv12 are used for -# motion search. The decode path appends a fixed ``hwdownload,format=nv12`` step, -# which is correct for these presets but not for surfaces like drm_prime (rkmpp), -# vulkan, or amf. Anything outside this set decodes in software so results stay -# byte-identical instead of risking a wrong-but-valid-sized frame that the -# zero-frame fallback gate would not catch. -_NV12_HWACCEL_PRESETS = frozenset( - { - FFMPEG_HWACCEL_VAAPI, - FFMPEG_HWACCEL_NVIDIA, - "preset-intel-qsv-h264", - "preset-intel-qsv-h265", - } -) +# Output-format surfaces that download cleanly to nv12 via the fixed +# ``hwdownload,format=nv12`` step the decode path appends. Other surfaces +# (drm_prime from rkmpp, vulkan, amf) need a different download step, so motion +# search decodes them in software to keep results byte-identical rather than risk +# a wrong-but-valid-sized frame the zero-frame fallback gate would not catch. +_NV12_OUTPUT_FORMATS = frozenset({"vaapi", "cuda", "qsv"}) -def build_motion_decode_command( - ffmpeg_path: str, - recording_path: str, - start_frame: int, - end_frame: int, - frame_step: int, - decode_args: list[str], - *, - crop: tuple[int, int, int, int] | None = None, - scale: tuple[int, int] | None = None, - gray: bool = False, -) -> list[str]: - """Build the ffmpeg argv for decoding the sampled frames of a segment. - - The ``select`` expression is frame-number based so it lands on the exact - absolute frame indices the motion loop processes. Optional ``crop`` (w, h, - x, y), ``scale`` (w, h), and ``gray`` shrink the output for the lossy modes; - with all three at their defaults the command is identical to the full-res - bgr24 path. - """ - select = ( - f"select=gte(n\\,{start_frame})" - f"*not(mod(n-{start_frame}\\,{frame_step}))" - f"*lt(n\\,{end_frame})" - ) - - filters = [select] - # With hwaccel, decoded frames are GPU surfaces; pull them back to system - # memory before any CPU crop/scale and the rawvideo encoder. - if decode_args: - filters.append("hwdownload") - filters.append("format=nv12") - if crop is not None: - cw, ch, cx, cy = crop - filters.append(f"crop={cw}:{ch}:{cx}:{cy}") - if scale is not None: - sw, sh = scale - filters.append(f"scale={sw}:{sh}") - - vf = ",".join(filters) - pix_fmt = "gray" if gray else "bgr24" - - return [ - ffmpeg_path, - "-hide_banner", - "-loglevel", - "error", - *decode_args, - "-i", - recording_path, - "-an", - "-vf", - vf, - "-vsync", - "0", - "-f", - "rawvideo", - "-pix_fmt", - pix_fmt, - "pipe:", - ] +def _hwaccel_output_format(decode_args: list[str]) -> str | None: + """Return the ``-hwaccel_output_format`` value in ffmpeg args, or None.""" + try: + idx = decode_args.index("-hwaccel_output_format") + except ValueError: + return None + return decode_args[idx + 1] if idx + 1 < len(decode_args) else None def resolve_motion_decode_args(camera_config: CameraConfig) -> list[str]: """Resolve the ffmpeg hwaccel decode args for a camera's recordings. - ``auto`` is resolved via ``auto_detect_hwaccel``. Only presets in - ``_NV12_HWACCEL_PRESETS`` are accelerated; any other value (an exotic preset - or custom args) returns an empty list so the segment decodes in software, - preserving byte-identical results. An empty list means software decode. + ``auto`` is resolved via ``auto_detect_hwaccel`` and the preset is expanded + by ``parse_preset_hardware_acceleration_decode`` (the same table the live + pipeline uses). Acceleration is kept only when the decoded surface downloads + cleanly to nv12 -- decided by reading ``-hwaccel_output_format`` back from the + resolved args rather than a separate preset allowlist that could drift from + ``PRESETS_HW_ACCEL_DECODE``. Anything else (custom args, a software-only + preset, or an nv12-incompatible surface) returns an empty list, meaning + software decode, so results stay byte-identical. """ raw = camera_config.ffmpeg.hwaccel_args preset = auto_detect_hwaccel() if raw == "auto" else raw - # Custom args (a list) or any non-allowlisted preset decode in software. - if not isinstance(preset, str) or preset not in _NV12_HWACCEL_PRESETS: + # Custom args (a list) decode in software so results stay byte-identical. + if not isinstance(preset, str): return [] - return ( - parse_preset_hardware_acceleration_decode( - preset, - camera_config.detect.fps, - camera_config.detect.width or 0, - camera_config.detect.height or 0, - camera_config.ffmpeg.gpu, - ) - or [] + decode_args = parse_preset_hardware_acceleration_decode( + preset, + camera_config.detect.fps, + camera_config.detect.width or 0, + camera_config.detect.height or 0, + camera_config.ffmpeg.gpu, ) + if not decode_args: + return [] + + if _hwaccel_output_format(decode_args) not in _NV12_OUTPUT_FORMATS: + return [] + + return decode_args def _read_exact(stream: IO[bytes], size: int) -> bytes | None: @@ -164,218 +107,9 @@ def _terminate(proc: sp.Popen[bytes]) -> None: proc.wait() -def _expected_frame_count(start_frame: int, end_frame: int, frame_step: int) -> int: - """Number of sampled frames the select filter should emit for a segment.""" - if end_frame <= start_frame: - return 0 - return (end_frame - start_frame + frame_step - 1) // frame_step - - -def _decode_segment( - cmd: list[str], - recording_path: str, - frame_size: int, - frame_width: int, - frame_height: int, - start_frame: int, - end_frame: int, - frame_step: int, - should_stop: Callable[[], bool], - *, - channels: int = 3, -) -> Generator[tuple[int, np.ndarray], None, int]: - """Run one ffmpeg decode and yield (absolute_frame_index, bgr_frame). - - Returns the number of frames produced so the caller can decide whether a - fallback decode is needed. ffmpeg stderr is captured to a temp file (not a - pipe, to avoid a full-pipe stall while we read stdout) and surfaced if the - decode ends before all expected frames were produced. - """ - stopped = False - # SpooledTemporaryFile keeps the (small, -loglevel error) stderr in memory. - stderr_file = tempfile.SpooledTemporaryFile(max_size=65536) - proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=stderr_file) - assert proc.stdout is not None # stdout=PIPE always provides a stream - count = 0 - frame_idx = start_frame - try: - while frame_idx < end_frame: - if should_stop(): - stopped = True - break - buf = _read_exact(proc.stdout, frame_size) - if buf is None: - break - # frombuffer returns a read-only view; the consumer copies it via - # cv2.cvtColor before any mutation, so this is safe. - if channels == 1: - frame = np.frombuffer(buf, dtype=np.uint8).reshape( - (frame_height, frame_width) - ) - else: - frame = np.frombuffer(buf, dtype=np.uint8).reshape( - (frame_height, frame_width, channels) - ) - count += 1 - yield frame_idx, frame - frame_idx += frame_step - finally: - _terminate(proc) - expected = _expected_frame_count(start_frame, end_frame, frame_step) - if not stopped and not should_stop() and count < expected: - # The decode ended early (e.g. the process died mid-stream). Frames - # already yielded cannot be retracted, so this segment's results may - # be incomplete -- surface it rather than fail silently. - stderr_file.seek(0) - err = stderr_file.read().decode("utf-8", "replace").strip() - err_tail = err.splitlines()[-1] if err else "no ffmpeg error output" - logger.warning( - "Motion decode of %s ended early: produced %d of %d expected frames (%s)", - recording_path, - count, - expected, - err_tail, - ) - stderr_file.close() - return count - - -def iter_segment_frames( - ffmpeg_path: str, - recording_path: str, - start_frame: int, - end_frame: int, - frame_step: int, - frame_width: int, - frame_height: int, - decode_args: list[str], - should_stop: Callable[[], bool], - *, - channels: int = 3, - crop: tuple[int, int, int, int] | None = None, - scale: tuple[int, int] | None = None, - gray: bool = False, -) -> Generator[tuple[int, np.ndarray], None, None]: - """Yield sampled (absolute_frame_index, bgr_frame) tuples for a segment. - - Tries hardware-accelerated decode when ``decode_args`` is non-empty. If that - produces no frames (e.g. the preset is unsupported on this host), falls back - once to software decode. Optional ``crop``/``scale``/``gray`` (with a - matching ``channels`` count) shrink the output for the lossy modes; the - defaults yield the full-res bgr24 path unchanged. - """ - frame_size = frame_width * frame_height * channels - - if decode_args: - hw_cmd = build_motion_decode_command( - ffmpeg_path, - recording_path, - start_frame, - end_frame, - frame_step, - decode_args, - crop=crop, - scale=scale, - gray=gray, - ) - produced = yield from _decode_segment( - hw_cmd, - recording_path, - frame_size, - frame_width, - frame_height, - start_frame, - end_frame, - frame_step, - should_stop, - channels=channels, - ) - if produced > 0 or should_stop(): - return - logger.warning( - "Hardware decode produced no frames for %s, falling back to software decode", - recording_path, - ) - - sw_cmd = build_motion_decode_command( - ffmpeg_path, - recording_path, - start_frame, - end_frame, - frame_step, - [], - crop=crop, - scale=scale, - gray=gray, - ) - yield from _decode_segment( - sw_cmd, - recording_path, - frame_size, - frame_width, - frame_height, - start_frame, - end_frame, - frame_step, - should_stop, - channels=channels, - ) - - KEYFRAME_MAX_GAP_SECONDS = 2.0 -def _ffprobe_path(ffmpeg_path: str) -> str: - """Derive the ffprobe path that sits next to the configured ffmpeg binary.""" - directory = os.path.dirname(ffmpeg_path) - return os.path.join(directory, "ffprobe") if directory else "ffprobe" - - -def probe_keyframe_pts(ffprobe_path: str, recording_path: str) -> list[float]: - """Return sorted keyframe presentation timestamps (seconds) for a segment. - - Reads video packet flags via ffprobe (no decode). Returns [] on any failure - so callers fall back to every-Nth sampling. - """ - cmd = [ - ffprobe_path, - "-v", - "error", - "-select_streams", - "v:0", - "-show_packets", - "-show_entries", - "packet=pts_time,flags", - "-of", - "json", - recording_path, - ] - try: - completed = sp.run(cmd, capture_output=True, text=True, timeout=30) - except (OSError, sp.SubprocessError): - logger.warning("ffprobe failed for keyframe probe of %s", recording_path) - return [] - - if completed.returncode != 0 or not completed.stdout: - return [] - - try: - packets = json.loads(completed.stdout).get("packets", []) - except json.JSONDecodeError: - return [] - - pts: list[float] = [] - for pkt in packets: - flags = pkt.get("flags", "") - pts_time = pkt.get("pts_time") - if flags.startswith("K") and pts_time is not None: - try: - pts.append(float(pts_time)) - except ValueError: - continue - return sorted(pts) - - def keyframe_sampling_eligible( keyframe_pts: list[float], max_gap: float = KEYFRAME_MAX_GAP_SECONDS ) -> bool: @@ -390,85 +124,6 @@ def keyframe_sampling_eligible( return max(gaps) <= max_gap -def build_keyframe_decode_command( - ffmpeg_path: str, - recording_path: str, - decode_args: list[str], - crop: tuple[int, int, int, int] | None, - scale: tuple[int, int] | None, - gray: bool, -) -> list[str]: - """Build an ffmpeg argv that decodes only keyframes (-skip_frame nokey).""" - filters = [] - if decode_args: - filters.append("hwdownload") - filters.append("format=nv12") - if crop is not None: - cw, ch, cx, cy = crop - filters.append(f"crop={cw}:{ch}:{cx}:{cy}") - if scale is not None: - sw, sh = scale - filters.append(f"scale={sw}:{sh}") - - pix_fmt = "gray" if gray else "bgr24" - cmd = [ - ffmpeg_path, - "-hide_banner", - "-loglevel", - "error", - "-skip_frame", - "nokey", - *decode_args, - "-i", - recording_path, - "-an", - ] - if filters: - cmd += ["-vf", ",".join(filters)] - cmd += ["-vsync", "0", "-f", "rawvideo", "-pix_fmt", pix_fmt, "pipe:"] - return cmd - - -def iter_keyframe_frames( - ffmpeg_path: str, - recording_path: str, - out_width: int, - out_height: int, - channels: int, - decode_args: list[str], - crop: tuple[int, int, int, int] | None, - scale: tuple[int, int] | None, - gray: bool, - should_stop: Callable[[], bool], -) -> Generator[np.ndarray, None, None]: - """Yield decoded keyframes in order (no frame index; pair with probed PTS).""" - cmd = build_keyframe_decode_command( - ffmpeg_path, recording_path, decode_args, crop, scale, gray - ) - frame_size = out_width * out_height * channels - stderr_file = tempfile.SpooledTemporaryFile(max_size=65536) - proc = sp.Popen(cmd, stdout=sp.PIPE, stderr=stderr_file) - assert proc.stdout is not None - try: - while True: - if should_stop(): - break - buf = _read_exact(proc.stdout, frame_size) - if buf is None: - break - if channels == 1: - yield np.frombuffer(buf, dtype=np.uint8).reshape( - (out_height, out_width) - ) - else: - yield np.frombuffer(buf, dtype=np.uint8).reshape( - (out_height, out_width, channels) - ) - finally: - _terminate(proc) - stderr_file.close() - - VOD_PROTOCOL_ARGS = ["-protocol_whitelist", "pipe,file,http,tcp"] diff --git a/frigate/test/test_motion_search_decode.py b/frigate/test/test_motion_search_decode.py index 03a594a8f5..fda0837662 100644 --- a/frigate/test/test_motion_search_decode.py +++ b/frigate/test/test_motion_search_decode.py @@ -1,24 +1,13 @@ """Tests for the motion search hardware-accelerated decode helpers.""" -import os -import shutil -import subprocess as sp -import tempfile import unittest from types import SimpleNamespace from unittest import mock -import cv2 -import numpy as np - from frigate.jobs.motion_search_decode import ( KEYFRAME_MAX_GAP_SECONDS, - build_motion_decode_command, build_vod_decode_command, - iter_keyframe_frames, - iter_segment_frames, keyframe_sampling_eligible, - probe_keyframe_pts, probe_video_dimensions, probe_vod_keyframe_pts, resolve_motion_decode_args, @@ -51,242 +40,27 @@ class TestResolveMotionDecodeArgs(unittest.TestCase): ) def test_custom_args_fall_back_to_software(self): - # Arbitrary custom hwaccel args are not allowlisted, so they decode in - # software to preserve byte-identical results. + # Arbitrary custom hwaccel args (a list, not a preset) decode in software + # to preserve byte-identical results. self.assertEqual( resolve_motion_decode_args(_fake_camera_config(["-hwaccel", "vulkan"])), [], ) + def test_nvidia_codec_preset_is_accelerated(self): + # Codec-specific nvidia presets resolve to the same cuda decode args as + # the bare preset, so eligibility is derived from -hwaccel_output_format + # rather than a hardcoded list that omitted these aliases. + args = resolve_motion_decode_args(_fake_camera_config("preset-nvidia-h264")) + self.assertIn("-hwaccel_output_format", args) + self.assertIn("cuda", args) -class TestBuildMotionDecodeCommand(unittest.TestCase): - def test_software_command_has_no_hwdownload(self): - cmd = build_motion_decode_command( - ffmpeg_path="ffmpeg", - recording_path="/tmp/seg.mp4", - start_frame=100, - end_frame=400, - frame_step=30, - decode_args=[], + def test_software_only_preset_falls_back_to_software(self): + # A preset with no -hwaccel_output_format (decoder-based, no GPU surface) + # cannot use the nv12 download step, so it decodes in software. + self.assertEqual( + resolve_motion_decode_args(_fake_camera_config("preset-rpi-64-h264")), [] ) - vf = cmd[cmd.index("-vf") + 1] - self.assertEqual(vf, "select=gte(n\\,100)*not(mod(n-100\\,30))*lt(n\\,400)") - self.assertNotIn("hwdownload", vf) - self.assertIn("bgr24", cmd) - self.assertEqual(cmd[-1], "pipe:") - - def test_hwaccel_command_appends_hwdownload_and_decode_args(self): - cmd = build_motion_decode_command( - ffmpeg_path="/usr/lib/ffmpeg", - recording_path="/tmp/seg.mp4", - start_frame=0, - end_frame=300, - frame_step=30, - decode_args=["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi"], - ) - # decode args must come before -i - self.assertLess(cmd.index("-hwaccel"), cmd.index("-i")) - vf = cmd[cmd.index("-vf") + 1] - self.assertTrue(vf.endswith(",hwdownload,format=nv12")) - self.assertTrue(vf.startswith("select=gte(n\\,0)")) - - def test_ffmpeg_path_is_first_token(self): - cmd = build_motion_decode_command( - ffmpeg_path="/usr/lib/ffmpeg", - recording_path="/tmp/seg.mp4", - start_frame=0, - end_frame=30, - frame_step=30, - decode_args=[], - ) - self.assertEqual(cmd[0], "/usr/lib/ffmpeg") - - def test_crop_scale_gray_filters_and_pix_fmt(self): - cmd = build_motion_decode_command( - ffmpeg_path="ffmpeg", - recording_path="/tmp/seg.mp4", - start_frame=0, - end_frame=300, - frame_step=30, - decode_args=[], - crop=(640, 480, 100, 50), # w, h, x, y - scale=(320, 240), - gray=True, - ) - vf = cmd[cmd.index("-vf") + 1] - self.assertIn("crop=640:480:100:50", vf) - self.assertIn("scale=320:240", vf) - # crop must come before scale - self.assertLess(vf.index("crop="), vf.index("scale=")) - self.assertEqual(cmd[cmd.index("-pix_fmt") + 1], "gray") - - def test_hwaccel_with_crop_scale_orders_download_first(self): - cmd = build_motion_decode_command( - ffmpeg_path="ffmpeg", - recording_path="/tmp/seg.mp4", - start_frame=0, - end_frame=300, - frame_step=30, - decode_args=["-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi"], - crop=(640, 480, 0, 0), - scale=(320, 240), - gray=True, - ) - vf = cmd[cmd.index("-vf") + 1] - self.assertLess(vf.index("hwdownload"), vf.index("crop=")) - self.assertLess(vf.index("crop="), vf.index("scale=")) - - -def _ffmpeg_available() -> bool: - return shutil.which("ffmpeg") is not None - - -@unittest.skipUnless(_ffmpeg_available(), "ffmpeg not available") -class TestIterSegmentFrames(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.tmpdir = tempfile.mkdtemp() - cls.path = os.path.join(cls.tmpdir, "sample.mp4") - # 90 frames @ 30fps (3s) of moving test content, H.264. - sp.run( - [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-y", - "-f", - "lavfi", - "-i", - "testsrc=size=320x240:rate=30:duration=3", - "-c:v", - "libx264", - "-pix_fmt", - "yuv420p", - "-g", - "15", - cls.path, - ], - check=True, - ) - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmpdir, ignore_errors=True) - - def test_yields_expected_indices(self): - frames = list( - iter_segment_frames( - ffmpeg_path="ffmpeg", - recording_path=self.path, - start_frame=0, - end_frame=90, - frame_step=30, - frame_width=320, - frame_height=240, - decode_args=[], - should_stop=lambda: False, - ) - ) - indices = [idx for idx, _ in frames] - self.assertEqual(indices, [0, 30, 60]) - for _, frame in frames: - self.assertEqual(frame.shape, (240, 320, 3)) - - def test_frames_match_cv2_decode(self): - # Parity gate: ffmpeg-decoded sampled frames must equal cv2-decoded - # frames at the same absolute indices. - ffmpeg_frames = { - idx: frame - for idx, frame in iter_segment_frames( - ffmpeg_path="ffmpeg", - recording_path=self.path, - start_frame=0, - end_frame=90, - frame_step=30, - frame_width=320, - frame_height=240, - decode_args=[], - should_stop=lambda: False, - ) - } - - cap = cv2.VideoCapture(self.path) - try: - for idx in (0, 30, 60): - cap.set(cv2.CAP_PROP_POS_FRAMES, idx) - ret, cv2_frame = cap.read() - self.assertTrue(ret) - np.testing.assert_array_equal(ffmpeg_frames[idx], cv2_frame) - finally: - cap.release() - - def test_crop_scale_gray_output_shape(self): - # Crop a 160x120 region, scale to 80x60, gray: frames must be (60, 80). - frames = list( - iter_segment_frames( - ffmpeg_path="ffmpeg", - recording_path=self.path, - start_frame=0, - end_frame=90, - frame_step=30, - frame_width=80, - frame_height=60, - decode_args=[], - should_stop=lambda: False, - channels=1, - crop=(160, 120, 0, 0), - scale=(80, 60), - gray=True, - ) - ) - self.assertEqual([idx for idx, _ in frames], [0, 30, 60]) - for _, frame in frames: - self.assertEqual(frame.shape, (60, 80)) - self.assertEqual(frame.dtype, np.uint8) - - def test_keyframe_decode_yields_only_keyframes(self): - frames = list( - iter_keyframe_frames( - ffmpeg_path="ffmpeg", - recording_path=self.path, - out_width=320, - out_height=240, - channels=3, - decode_args=[], - crop=None, - scale=None, - gray=False, - should_stop=lambda: False, - ) - ) - # 90 frames @ -g 15 -> 6 keyframes (0,15,30,45,60,75). - self.assertEqual(len(frames), 6) - for f in frames: - self.assertEqual(f.shape, (240, 320, 3)) - - def test_software_fallback_on_bad_hwaccel(self): - # An invalid hwaccel device should fail fast and fall back to software, - # still producing all expected frames. - frames = list( - iter_segment_frames( - ffmpeg_path="ffmpeg", - recording_path=self.path, - start_frame=0, - end_frame=90, - frame_step=30, - frame_width=320, - frame_height=240, - decode_args=[ - "-hwaccel", - "vaapi", - "-hwaccel_device", - "/dev/dri/doesnotexist", - ], - should_stop=lambda: False, - ) - ) - self.assertEqual([idx for idx, _ in frames], [0, 30, 60]) class TestKeyframeEligibility(unittest.TestCase): @@ -310,29 +84,6 @@ class TestKeyframeEligibility(unittest.TestCase): self.assertEqual(KEYFRAME_MAX_GAP_SECONDS, 2.0) -class TestProbeKeyframePts(unittest.TestCase): - def test_parses_keyframe_packets(self): - sample = ( - '{"packets":[' - '{"pts_time":"0.000000","flags":"K__"},' - '{"pts_time":"0.033333","flags":"___"},' - '{"pts_time":"0.500000","flags":"K__"}]}' - ) - completed = mock.Mock(stdout=sample, returncode=0) - with mock.patch( - "frigate.jobs.motion_search_decode.sp.run", return_value=completed - ): - pts = probe_keyframe_pts("ffprobe", "/tmp/seg.mp4") - self.assertEqual(pts, [0.0, 0.5]) - - def test_returns_empty_on_probe_failure(self): - with mock.patch( - "frigate.jobs.motion_search_decode.sp.run", - side_effect=OSError("boom"), - ): - self.assertEqual(probe_keyframe_pts("ffprobe", "/tmp/seg.mp4"), []) - - class TestVodDecodeCommand(unittest.TestCase): URL = "http://127.0.0.1:5000/vod/cam/start/1/end/2/index.m3u8" diff --git a/frigate/test/test_motion_search_spatial.py b/frigate/test/test_motion_search_spatial.py index 88995114d6..b8044df4b0 100644 --- a/frigate/test/test_motion_search_spatial.py +++ b/frigate/test/test_motion_search_spatial.py @@ -1,9 +1,5 @@ """Tests for motion search spatial (crop/scale/mask) helpers.""" -import os -import shutil -import subprocess as sp -import tempfile import unittest import numpy as np @@ -13,12 +9,6 @@ from frigate.jobs.motion_search import ( compute_roi_crop_and_scale, detect_motion_scaled, ) -from frigate.jobs.motion_search_decode import ( - iter_keyframe_frames, - iter_segment_frames, - keyframe_sampling_eligible, - probe_keyframe_pts, -) class TestComputeRoiCropAndScale(unittest.TestCase): @@ -93,101 +83,5 @@ class TestDetectMotionScaled(unittest.TestCase): self.assertEqual(results, []) -def _ffmpeg_available(): - return shutil.which("ffmpeg") is not None - - -@unittest.skipUnless(_ffmpeg_available(), "ffmpeg not available") -class TestBalancedDecodePath(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.tmp = tempfile.mkdtemp() - cls.path = os.path.join(cls.tmp, "motion.mp4") - # testsrc has continuous motion across the whole frame. - sp.run( - [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-y", - "-f", - "lavfi", - "-i", - "testsrc=size=640x480:rate=30:duration=3", - "-c:v", - "libx264", - "-pix_fmt", - "yuv420p", - "-g", - "15", - cls.path, - ], - check=True, - ) - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.tmp, ignore_errors=True) - - def test_balanced_decode_then_detect_finds_motion(self): - crop, scaled = compute_roi_crop_and_scale( - [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, scale_target=160 - ) - mask = build_scaled_roi_mask( - [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, crop, scaled - ) - frames = iter_segment_frames( - "ffmpeg", - self.path, - 0, - 90, - 30, - scaled[0], - scaled[1], - [], - lambda: False, - channels=1, - crop=crop, - scale=scaled, - gray=True, - ) - results = detect_motion_scaled( - list(frames), mask, threshold=20, min_area=1.0, timestamp_fn=float - ) - # testsrc moves every frame, so consecutive sampled frames differ. - self.assertGreater(len(results), 0) - - def test_keyframe_path_finds_motion(self): - crop, scaled = compute_roi_crop_and_scale( - [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, scale_target=160 - ) - mask = build_scaled_roi_mask( - [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], 640, 480, crop, scaled - ) - pts = probe_keyframe_pts("ffprobe", self.path) - self.assertTrue(keyframe_sampling_eligible(pts)) - frames = list( - iter_keyframe_frames( - "ffmpeg", - self.path, - scaled[0], - scaled[1], - 1, - [], - crop, - scaled, - True, - lambda: False, - ) - ) - self.assertEqual(len(frames), len(pts)) # count matches probe - indexed = list(enumerate(frames)) - results = detect_motion_scaled( - indexed, mask, threshold=20, min_area=1.0, timestamp_fn=lambda i: pts[i] - ) - self.assertGreater(len(results), 0) - - if __name__ == "__main__": unittest.main()