This commit is contained in:
ryzendigo 2026-06-08 02:24:10 +02:00 committed by GitHub
commit 2cff1905a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 100 additions and 1 deletions

View File

@ -1,3 +1,4 @@
import io
import unittest
import cv2
@ -13,6 +14,7 @@ from frigate.util.object import (
get_region_from_grid,
reduce_detections,
)
from frigate.video.ffmpeg import _read_frame_into
def draw_box(frame, box, color=(255, 0, 0), thickness=2):
@ -352,3 +354,57 @@ class TestRegionGrid(unittest.TestCase):
region = get_region_from_grid(frame_shape, box, 320, region_grid)
assert region[2] - region[0] > 320
class _ChunkedPipe(io.RawIOBase):
"""Stub stdout that returns the queued chunks one readinto at a time."""
def __init__(self, chunks):
self._chunks = [bytes(c) for c in chunks]
def readable(self) -> bool:
return True
def readinto(self, b) -> int:
if not self._chunks:
return 0
chunk = self._chunks.pop(0)
n = min(len(b), len(chunk))
b[:n] = chunk[:n]
if n < len(chunk):
self._chunks.insert(0, chunk[n:])
return n
class TestReadFrameInto(unittest.TestCase):
"""Cover the partial-read path of the capture loop helper."""
def test_full_frame_in_one_call(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03\x04\x05\x06\x07\x08"]), buf)
assert n == 8
assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08"
def test_short_reads_are_reassembled(self):
buf = bytearray(8)
pipe = _ChunkedPipe([b"\x01\x02\x03", b"\x04\x05", b"\x06\x07\x08"])
n = _read_frame_into(pipe, buf)
assert n == 8
assert bytes(buf) == b"\x01\x02\x03\x04\x05\x06\x07\x08"
def test_eof_before_full_buffer_returns_partial_count(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([b"\x01\x02\x03"]), buf)
assert n == 3
def test_immediate_eof_returns_zero(self):
buf = bytearray(8)
n = _read_frame_into(_ChunkedPipe([]), buf)
assert n == 0
def test_writes_into_existing_memoryview(self):
backing = bytearray(8)
view = memoryview(backing)
n = _read_frame_into(_ChunkedPipe([b"abcdefgh"]), view)
assert n == 8
assert bytes(backing) == b"abcdefgh"

View File

@ -52,6 +52,32 @@ def _get_record_segment_time(config: CameraConfig) -> int:
return DEFAULT_RECORD_SEGMENT_TIME
def _read_frame_into(stream, buffer) -> int:
"""Read len(buffer) bytes from stream directly into buffer.
Returns the number of bytes actually read. A return value less than
len(buffer) indicates the stream reached EOF (or the pipe was closed)
before the buffer could be filled the contents of buffer past the
returned offset must be treated as undefined.
BufferedReader.read(n) is documented to return "up to" n bytes, and a
subprocess pipe will hand back a short read if the producer closes
mid-frame even on a blocking pipe. Reading directly into the
pre-allocated frame buffer also avoids the extra Python bytes
allocation and copy that ``buffer[:] = stream.read(n)`` performs on
every frame.
"""
target = len(buffer)
view = memoryview(buffer)
pos = 0
while pos < target:
n = stream.readinto(view[pos:])
if not n:
return pos
pos += n
return pos
def capture_frames(
ffmpeg_process: sp.Popen[Any],
config: CameraConfig,
@ -92,7 +118,7 @@ def capture_frames(
frame_name = f"{config.name}_frame{frame_index}"
frame_buffer = frame_manager.write(frame_name)
try:
frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
bytes_read = _read_frame_into(ffmpeg_process.stdout, frame_buffer)
except Exception:
# shutdown has been initiated
if stop_event.is_set():
@ -110,6 +136,23 @@ def capture_frames(
continue
if bytes_read < frame_size:
# ffmpeg's stdout pipe was closed (or returned EOF) before
# the full frame could be received. The buffer holds a
# partial frame, so don't pass it to detection: a
# half-frame in shared memory will produce garbage
# detections, and continuing to read would resume in the
# middle of the next frame, putting capture out of sync
# with frame boundaries until the next watchdog restart.
if stop_event.is_set():
break
logger.error(
f"{config.name}: ffmpeg returned an incomplete frame "
f"({bytes_read}/{frame_size} bytes); exiting capture thread..."
)
break
frame_rate.update()
# don't lock the queue to check, just try since it should rarely be full