From 952dc1c3efb155ca93ad7299b569a0ac7745cb94 Mon Sep 17 00:00:00 2001 From: ryzendigo Date: Sat, 9 May 2026 22:13:37 +0800 Subject: [PATCH] fix: handle short reads from ffmpeg stdout in capture loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit io.BufferedReader.read(n) is documented to return "up to" n bytes, and a subprocess pipe will return a short read whenever the producer closes the pipe mid-frame — even on a blocking pipe. The existing assignment frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) then raises ValueError on the slice assignment, and the surrounding ``except`` only breaks out of the capture loop when ``poll()`` reports the process has already exited. If ffmpeg is still alive (e.g. an RTSP source that dropped a chunk and recovered), the loop hits ``continue`` and the next read picks up data from the middle of the next frame, putting capture out of sync with frame boundaries until the watchdog later forces a restart. Read into the pre-allocated buffer in a loop and treat any short read as the end of the stream, so the watchdog can respawn ffmpeg from a clean state rather than continuing at the wrong offset. As a side effect, ``readinto`` writes directly into the shared-memory frame buffer, avoiding the per-frame Python ``bytes`` allocation and copy that the previous slice assignment performed. --- frigate/test/test_video.py | 56 ++++++++++++++++++++++++++++++++++++++ frigate/video/ffmpeg.py | 45 +++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/frigate/test/test_video.py b/frigate/test/test_video.py index 8612990e2e..9662028fac 100644 --- a/frigate/test/test_video.py +++ b/frigate/test/test_video.py @@ -1,3 +1,4 @@ +import io import unittest import cv2 @@ -13,6 +14,7 @@ from frigate.util.object import ( get_region_from_grid, reduce_detections, ) +from frigate.video.ffmpeg import _read_frame_into def draw_box(frame, box, color=(255, 0, 0), thickness=2): @@ -352,3 +354,57 @@ class TestRegionGrid(unittest.TestCase): region = get_region_from_grid(frame_shape, box, 320, region_grid) assert region[2] - region[0] > 320 + + +class _ChunkedPipe(io.RawIOBase): + """Stub stdout that returns the queued chunks one readinto at a time.""" + + def __init__(self, chunks): + self._chunks = [bytes(c) for c in chunks] + + def readable(self) -> bool: + return True + + def readinto(self, b) -> int: + if not self._chunks: + return 0 + chunk = self._chunks.pop(0) + n = min(len(b), len(chunk)) + b[:n] = chunk[:n] + if n < len(chunk): + self._chunks.insert(0, chunk[n:]) + return n + + +class TestReadFrameInto(unittest.TestCase): + """Cover the partial-read path of the capture loop helper.""" + + def test_full_frame_in_one_call(self): + buf = bytearray(8) + n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03\x04\x05\x06\x07\x08"]), buf) + assert n == 8 + assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08" + + def test_short_reads_are_reassembled(self): + buf = bytearray(8) + pipe = _ChunkedPipe([b"\x01\x02\x03", b"\x04\x05", b"\x06\x07\x08"]) + n = _read_frame_into(pipe, buf) + assert n == 8 + assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08" + + def test_eof_before_full_buffer_returns_partial_count(self): + buf = bytearray(8) + n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03"]), buf) + assert n == 3 + + def test_immediate_eof_returns_zero(self): + buf = bytearray(8) + n = _read_frame_into(_ChunkedPipe([]), buf) + assert n == 0 + + def test_writes_into_existing_memoryview(self): + backing = bytearray(8) + view = memoryview(backing) + n = _read_frame_into(_ChunkedPipe([b"abcdefgh"]), view) + assert n == 8 + assert bytes(backing) == b"abcdefgh" diff --git a/frigate/video/ffmpeg.py b/frigate/video/ffmpeg.py index e77c03b5e5..57583a3b00 100644 --- a/frigate/video/ffmpeg.py +++ b/frigate/video/ffmpeg.py @@ -52,6 +52,32 @@ def _get_record_segment_time(config: CameraConfig) -> int: return DEFAULT_RECORD_SEGMENT_TIME +def _read_frame_into(stream, buffer) -> int: + """Read len(buffer) bytes from stream directly into buffer. + + Returns the number of bytes actually read. A return value less than + len(buffer) indicates the stream reached EOF (or the pipe was closed) + before the buffer could be filled — the contents of buffer past the + returned offset must be treated as undefined. + + BufferedReader.read(n) is documented to return "up to" n bytes, and a + subprocess pipe will hand back a short read if the producer closes + mid-frame even on a blocking pipe. Reading directly into the + pre-allocated frame buffer also avoids the extra Python bytes + allocation and copy that ``buffer[:] = stream.read(n)`` performs on + every frame. + """ + target = len(buffer) + view = memoryview(buffer) + pos = 0 + while pos < target: + n = stream.readinto(view[pos:]) + if not n: + return pos + pos += n + return pos + + def capture_frames( ffmpeg_process: sp.Popen[Any], config: CameraConfig, @@ -92,7 +118,7 @@ def capture_frames( frame_name = f"{config.name}_frame{frame_index}" frame_buffer = frame_manager.write(frame_name) try: - frame_buffer[:] = ffmpeg_process.stdout.read(frame_size) + bytes_read = _read_frame_into(ffmpeg_process.stdout, frame_buffer) except Exception: # shutdown has been initiated if stop_event.is_set(): @@ -110,6 +136,23 @@ def capture_frames( continue + if bytes_read < frame_size: + # ffmpeg's stdout pipe was closed (or returned EOF) before + # the full frame could be received. The buffer holds a + # partial frame, so don't pass it to detection: a + # half-frame in shared memory will produce garbage + # detections, and continuing to read would resume in the + # middle of the next frame, putting capture out of sync + # with frame boundaries until the next watchdog restart. + if stop_event.is_set(): + break + + logger.error( + f"{config.name}: ffmpeg returned an incomplete frame " + f"({bytes_read}/{frame_size} bytes); exiting capture thread..." + ) + break + frame_rate.update() # don't lock the queue to check, just try since it should rarely be full