From 6b0a68d0dd40e24671b8ab67a67aec9b29844866 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Wed, 28 Jun 2023 05:50:18 +0300 Subject: [PATCH] Refactored queues to use faster_fifo instead of mp.Queue Refactored LimitedQueue to include a counter for the number of items in the queue and updated put and get methods to use the counter Refactor app.py and util.py to use a custom Queue implementation called LQueue instead of the existing Queue Refactor put and get methods in LimitedQueue to handle queue size and blocking behavior more efficiently code format remove code from other branch (merging fuckup) --- frigate/app.py | 20 +++++++++------ frigate/object_detection.py | 3 ++- frigate/util.py | 50 +++++++++++++++++++++++++++++++++++-- frigate/video.py | 8 +++--- 4 files changed, 65 insertions(+), 16 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 9dc1244cf..37200ac3d 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -5,6 +5,9 @@ import os import shutil import signal import sys + +import faster_fifo as ff + from faster_fifo import Queue from typing import Optional from types import FrameType @@ -30,6 +33,7 @@ from frigate.plus import PlusApi from frigate.record import RecordingCleanup, RecordingMaintainer from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer +from frigate.util import LimitedQueue as LQueue from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog @@ -41,11 +45,11 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() - self.detection_queue: Queue = mp.Queue() + self.detection_queue: Queue = ff.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] - self.log_queue: Queue = mp.Queue() + self.log_queue: Queue = ff.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} @@ -103,7 +107,7 @@ class FrigateApp: "detection_frame": mp.Value("d", 0.0), "read_start": mp.Value("d", 0.0), "ffmpeg_pid": mp.Value("i", 0), - "frame_queue": mp.Queue(maxsize=2), + "frame_queue": LQueue(maxsize=2), "capture_process": None, "process": None, } @@ -121,19 +125,19 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() - self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( + self.event_queue: Queue = ff.Queue() + self.event_processed_queue: Queue = ff.Queue() + self.video_output_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = mp.Queue( + self.detected_frames_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = mp.Queue() + self.recordings_info_queue: Queue = ff.Queue() def init_database(self) -> None: # Migrate DB location diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 129fd6b26..32e8582a2 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -7,6 +7,7 @@ import signal import threading from abc import ABC, abstractmethod +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -73,7 +74,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, - detection_queue: mp.Queue, + detection_queue: ff.Queue, out_events: dict[str, mp.Event], avg_speed, start, diff --git a/frigate/util.py b/frigate/util.py index a6fe4b294..b03489479 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -1,4 +1,5 @@ import copy +import ctypes import datetime import logging import shlex @@ -14,6 +15,7 @@ from abc import ABC, abstractmethod from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory +from queue import Empty, Full from typing import Any, AnyStr, Optional, Tuple import cv2 @@ -21,6 +23,8 @@ import numpy as np import os import psutil import pytz +from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT +from faster_fifo import Queue as FFQueue from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -772,7 +776,6 @@ def get_docker_memlimit_bytes() -> int: # check running a supported cgroups version if get_cgroups_version() == "cgroup2": - memlimit_command = ["cat", "/sys/fs/cgroup/memory.max"] p = sp.run( @@ -817,7 +820,6 @@ def get_cpu_stats() -> dict[str, dict]: for line in lines: stats = list(filter(lambda a: a != "", line.strip().split(" "))) try: - if docker_memlimit > 0: mem_res = int(stats[5]) mem_pct = str( @@ -1067,3 +1069,47 @@ def get_tz_modifiers(tz_name: str) -> Tuple[str, str]: hour_modifier = f"{hours_offset} hour" minute_modifier = f"{minutes_offset} minute" return hour_modifier, minute_modifier + + +class LimitedQueue(FFQueue): + def __init__( + self, + maxsize=0, + max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, + loads=None, + dumps=None, + ): + super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.maxsize = maxsize + self.size = multiprocessing.RawValue( + ctypes.c_int, 0 + ) # Add a counter for the number of items in the queue + + def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 + return super().put(x, block=block, timeout=timeout) + + def get(self, block=True, timeout=DEFAULT_TIMEOUT): + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return super().get(block=block, timeout=timeout) + + def qsize(self): + return self.size + + def empty(self): + return self.qsize() == 0 + + def full(self): + return self.qsize() == self.maxsize diff --git a/frigate/video.py b/frigate/video.py index ca200fd53..f0dfc94a3 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -10,6 +10,7 @@ import threading import time from collections import defaultdict +import faster_fifo as ff import numpy as np import cv2 from setproctitle import setproctitle @@ -162,7 +163,6 @@ def capture_frames( current_frame: mp.Value, stop_event: mp.Event, ): - frame_size = frame_shape[0] * frame_shape[1] frame_rate = EventsPerSecond() frame_rate.start() @@ -577,7 +577,7 @@ def detect( def process_frames( camera_name: str, - frame_queue: mp.Queue, + frame_queue: ff.Queue, frame_shape, model_config, detect_config: DetectConfig, @@ -585,7 +585,7 @@ def process_frames( motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, - detected_objects_queue: mp.Queue, + detected_objects_queue: ff.Queue, process_info: dict, objects_to_track: list[str], object_filters, @@ -594,7 +594,6 @@ def process_frames( stop_event, exit_on_empty: bool = False, ): - fps = process_info["process_fps"] detection_fps = process_info["detection_fps"] current_frame_time = process_info["detection_frame"] @@ -748,7 +747,6 @@ def process_frames( selected_objects = [] for group in detected_object_groups.values(): - # apply non-maxima suppression to suppress weak, overlapping bounding boxes # o[2] is the box of the object: xmin, ymin, xmax, ymax # apply max/min to ensure values do not exceed the known frame size