fix: handle short reads from ffmpeg stdout in capture loop

io.BufferedReader.read(n) is documented to return "up to" n bytes, and
a subprocess pipe will return a short read whenever the producer closes
the pipe mid-frame — even on a blocking pipe. The existing assignment

    frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)

then raises ValueError on the slice assignment, and the surrounding
``except`` only breaks out of the capture loop when ``poll()`` reports
the process has already exited. If ffmpeg is still alive (e.g. an RTSP
source that dropped a chunk and recovered), the loop hits ``continue``
and the next read picks up data from the middle of the next frame,
putting capture out of sync with frame boundaries until the watchdog
later forces a restart.

Read into the pre-allocated buffer in a loop and treat any short read
as the end of the stream, so the watchdog can respawn ffmpeg from a
clean state rather than continuing at the wrong offset. As a side
effect, ``readinto`` writes directly into the shared-memory frame
buffer, avoiding the per-frame Python ``bytes`` allocation and copy
that the previous slice assignment performed.
This commit is contained in:
ryzendigo 2026-05-09 22:13:37 +08:00
parent 4ff7ab96dc
commit 952dc1c3ef
2 changed files with 100 additions and 1 deletions

View File

@ -1,3 +1,4 @@
import io
import unittest
import cv2
@ -13,6 +14,7 @@ from frigate.util.object import (
get_region_from_grid,
reduce_detections,
)
from frigate.video.ffmpeg import _read_frame_into
def draw_box(frame, box, color=(255, 0, 0), thickness=2):
@ -352,3 +354,57 @@ class TestRegionGrid(unittest.TestCase):
region = get_region_from_grid(frame_shape, box, 320, region_grid)
assert region[2] - region[0] > 320
class _ChunkedPipe(io.RawIOBase):
"""Stub stdout that returns the queued chunks one readinto at a time."""
def __init__(self, chunks):
self._chunks = [bytes(c) for c in chunks]
def readable(self) -> bool:
return True
def readinto(self, b) -> int:
if not self._chunks:
return 0
chunk = self._chunks.pop(0)
n = min(len(b), len(chunk))
b[:n] = chunk[:n]
if n < len(chunk):
self._chunks.insert(0, chunk[n:])
return n
class TestReadFrameInto(unittest.TestCase):
"""Cover the partial-read path of the capture loop helper."""
def test_full_frame_in_one_call(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03\x04\x05\x06\x07\x08"]), buf)
assert n == 8
assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08"
def test_short_reads_are_reassembled(self):
buf = bytearray(8)
pipe = _ChunkedPipe([b"\x01\x02\x03", b"\x04\x05", b"\x06\x07\x08"])
n = _read_frame_into(pipe, buf)
assert n == 8
assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08"
def test_eof_before_full_buffer_returns_partial_count(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03"]), buf)
assert n == 3
def test_immediate_eof_returns_zero(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([]), buf)
assert n == 0
def test_writes_into_existing_memoryview(self):
backing = bytearray(8)
view = memoryview(backing)
n = _read_frame_into(_ChunkedPipe([b"abcdefgh"]), view)
assert n == 8
assert bytes(backing) == b"abcdefgh"

View File

@ -52,6 +52,32 @@ def _get_record_segment_time(config: CameraConfig) -> int:
return DEFAULT_RECORD_SEGMENT_TIME
def _read_frame_into(stream, buffer) -> int:
"""Read len(buffer) bytes from stream directly into buffer.
Returns the number of bytes actually read. A return value less than
len(buffer) indicates the stream reached EOF (or the pipe was closed)
before the buffer could be filled the contents of buffer past the
returned offset must be treated as undefined.
BufferedReader.read(n) is documented to return "up to" n bytes, and a
subprocess pipe will hand back a short read if the producer closes
mid-frame even on a blocking pipe. Reading directly into the
pre-allocated frame buffer also avoids the extra Python bytes
allocation and copy that ``buffer[:] = stream.read(n)`` performs on
every frame.
"""
target = len(buffer)
view = memoryview(buffer)
pos = 0
while pos < target:
n = stream.readinto(view[pos:])
if not n:
return pos
pos += n
return pos
def capture_frames(
ffmpeg_process: sp.Popen[Any],
config: CameraConfig,
@ -92,7 +118,7 @@ def capture_frames(
frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
bytes_read = _read_frame_into(ffmpeg_process.stdout, frame_buffer)
except Exception:
# shutdown has been initiated
if stop_event.is_set():
@ -110,6 +136,23 @@ def capture_frames(
continue
if bytes_read < frame_size:
# ffmpeg's stdout pipe was closed (or returned EOF) before
# the full frame could be received. The buffer holds a
# partial frame, so don't pass it to detection: a
# half-frame in shared memory will produce garbage
# detections, and continuing to read would resume in the
# middle of the next frame, putting capture out of sync
# with frame boundaries until the next watchdog restart.
if stop_event.is_set():
break
logger.error(
f"{config.name}: ffmpeg returned an incomplete frame "
f"({bytes_read}/{frame_size} bytes); exiting capture thread..."
)
break
frame_rate.update()
# don't lock the queue to check, just try since it should rarely be full