diff --git a/frigate/app.py b/frigate/app.py index 9dc1244cf..37200ac3d 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,6 +5,9 @@ import os import shutil import signal import sys + +import faster_fifo as ff + from faster_fifo import Queue from typing import Optional from types import FrameType @@ -30,6 +33,7 @@ from frigate.plus import PlusApi from frigate.record import RecordingCleanup, RecordingMaintainer from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer +from frigate.util import LimitedQueue as LQueue from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog @@ -41,11 +45,11 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() - self.detection_queue: Queue = mp.Queue() + self.detection_queue: Queue = ff.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] - self.log_queue: Queue = mp.Queue() + self.log_queue: Queue = ff.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} @@ -103,7 +107,7 @@ class FrigateApp: "detection_frame": mp.Value("d", 0.0), "read_start": mp.Value("d", 0.0), "ffmpeg_pid": mp.Value("i", 0), - "frame_queue": mp.Queue(maxsize=2), + "frame_queue": LQueue(maxsize=2), "capture_process": None, "process": None, } @@ -121,19 +125,19 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() - self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( + self.event_queue: Queue = ff.Queue() + self.event_processed_queue: Queue = ff.Queue() + self.video_output_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = mp.Queue( + self.detected_frames_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = mp.Queue() + self.recordings_info_queue: Queue = ff.Queue() def init_database(self) -> None: # Migrate DB location diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 129fd6b26..32e8582a2 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -7,6 +7,7 @@ import signal import threading from abc import ABC, abstractmethod +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -73,7 +74,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, - detection_queue: mp.Queue, + detection_queue: ff.Queue, out_events: dict[str, mp.Event], avg_speed, start, diff --git a/frigate/util.py b/frigate/util.py index a6fe4b294..b03489479 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,4 +1,5 @@ import copy +import ctypes import datetime import logging import shlex @@ -14,6 +15,7 @@ from abc import ABC, abstractmethod from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory +from queue import Empty, Full from typing import Any, AnyStr, Optional, Tuple import cv2 @@ -21,6 +23,8 @@ import numpy as np import os import psutil import pytz +from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT +from faster_fifo import Queue as FFQueue from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -772,7 +776,6 @@ def get_docker_memlimit_bytes() -> int: # check running a supported cgroups version if get_cgroups_version() == "cgroup2": - memlimit_command = ["cat", "/sys/fs/cgroup/memory.max"] p = sp.run( @@ -817,7 +820,6 @@ def get_cpu_stats() -> dict[str, dict]: for line in lines: stats = list(filter(lambda a: a != "", line.strip().split(" "))) try: - if docker_memlimit > 0: mem_res = int(stats[5]) mem_pct = str( @@ -1067,3 +1069,47 @@ def get_tz_modifiers(tz_name: str) -> Tuple[str, str]: hour_modifier = f"{hours_offset} hour" minute_modifier = f"{minutes_offset} minute" return hour_modifier, minute_modifier + + +class LimitedQueue(FFQueue): + def __init__( + self, + maxsize=0, + max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, + loads=None, + dumps=None, + ): + super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.maxsize = maxsize + self.size = multiprocessing.RawValue( + ctypes.c_int, 0 + ) # Add a counter for the number of items in the queue + + def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 + return super().put(x, block=block, timeout=timeout) + + def get(self, block=True, timeout=DEFAULT_TIMEOUT): + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return super().get(block=block, timeout=timeout) + + def qsize(self): + return self.size + + def empty(self): + return self.qsize() == 0 + + def full(self): + return self.qsize() == self.maxsize diff --git a/frigate/video.py b/frigate/video.py index ca200fd53..f0dfc94a3 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -10,6 +10,7 @@ import threading import time from collections import defaultdict +import faster_fifo as ff import numpy as np import cv2 from setproctitle import setproctitle @@ -162,7 +163,6 @@ def capture_frames( current_frame: mp.Value, stop_event: mp.Event, ): - frame_size = frame_shape[0] * frame_shape[1] frame_rate = EventsPerSecond() frame_rate.start() @@ -577,7 +577,7 @@ def detect( def process_frames( camera_name: str, - frame_queue: mp.Queue, + frame_queue: ff.Queue, frame_shape, model_config, detect_config: DetectConfig, @@ -585,7 +585,7 @@ def process_frames( motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, - detected_objects_queue: mp.Queue, + detected_objects_queue: ff.Queue, process_info: dict, objects_to_track: list[str], object_filters, @@ -594,7 +594,6 @@ def process_frames( stop_event, exit_on_empty: bool = False, ): - fps = process_info["process_fps"] detection_fps = process_info["detection_fps"] current_frame_time = process_info["detection_frame"] @@ -748,7 +747,6 @@ def process_frames( selected_objects = [] for group in detected_object_groups.values(): - # apply non-maxima suppression to suppress weak, overlapping bounding boxes # o[2] is the box of the object: xmin, ymin, xmax, ymax # apply max/min to ensure values do not exceed the known frame size