From 22ce05be7e83fdee04e1ba6ffe47d864083cdc7b Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Wed, 28 Jun 2023 05:50:18 +0300 Subject: [PATCH] Refactored queues to use faster_fifo instead of mp.Queue --- frigate/app.py | 20 ++++++++------- frigate/object_detection.py | 3 ++- frigate/record/maintainer.py | 3 ++- frigate/record/record.py | 3 ++- frigate/util.py | 48 +++++++++++++++++++++++++++++++++++- frigate/video.py | 5 ++-- 6 files changed, 67 insertions(+), 15 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index ccfbd4696..a57cd3bb2 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -10,8 +10,10 @@ from multiprocessing.synchronize import Event as MpEvent from types import FrameType from typing import Optional +import faster_fifo as ff import psutil from faster_fifo import Queue +from frigate.util import LimitedQueue as LQueue from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase @@ -56,11 +58,11 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() - self.detection_queue: Queue = mp.Queue() + self.detection_queue: Queue = ff.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] - self.log_queue: Queue = mp.Queue() + self.log_queue: Queue = ff.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} self.feature_metrics: dict[str, FeatureMetricsTypes] = {} @@ -156,7 +158,7 @@ class FrigateApp: "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards - "frame_queue": mp.Queue(maxsize=2), + "frame_queue": LQueue(maxsize=2), "capture_process": None, "process": None, } @@ -188,22 +190,22 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() - self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( + self.event_queue: Queue = ff.Queue() + self.event_processed_queue: Queue = ff.Queue() + self.video_output_queue: Queue = ff.Queue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = mp.Queue( + self.detected_frames_queue: Queue = ff.Queue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = mp.Queue() + self.recordings_info_queue: Queue = ff.Queue() # Queue for timeline events - self.timeline_queue: Queue = mp.Queue() + self.timeline_queue: Queue = ff.Queue() def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 0a2a7059c..cebd7ff41 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -7,6 +7,7 @@ import signal import threading from abc import ABC, abstractmethod +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -72,7 +73,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, - detection_queue: mp.Queue, + detection_queue: ff.Queue, out_events: dict[str, mp.Event], avg_speed, start, diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 8e40fc6e7..ef703c422 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -15,6 +15,7 @@ from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Tuple +import faster_fifo as ff import psutil from frigate.config import FrigateConfig, RetainModeEnum @@ -30,7 +31,7 @@ class RecordingMaintainer(threading.Thread): def __init__( self, config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], stop_event: MpEvent, ): diff --git a/frigate/record/record.py b/frigate/record/record.py index 530adc031..0d22342aa 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -7,6 +7,7 @@ import threading from types import FrameType from typing import Optional +import faster_fifo as ff from playhouse.sqliteq import SqliteQueueDatabase from setproctitle import setproctitle @@ -22,7 +23,7 @@ logger = logging.getLogger(__name__) def manage_recordings( config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], ) -> None: stop_event = mp.Event() diff --git a/frigate/util.py b/frigate/util.py index f535a9572..d5259556f 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -14,7 +14,9 @@ from collections import Counter from collections.abc import Mapping from multiprocessing import shared_memory from typing import Any, AnyStr, Optional, Tuple - +from faster_fifo import Queue as FFQueue +from queue import Full, Empty +import time import cv2 import numpy as np import psutil @@ -1218,3 +1220,47 @@ def get_video_properties(url, get_duration=False): result["height"] = round(height) return result + + +class LimitedQueue: + def __init__(self, maxsize=0, max_size_bytes=None, loads=None, dumps=None): + self.maxsize = maxsize + self.queue = FFQueue(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.size = 0 + + def put(self, item, block=True, timeout=None): + if self.maxsize > 0 and self.size >= self.maxsize: + if block: + start_time = time.time() + while self.size >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.queue.put(item) + self.size += 1 + + def get(self, block=True, timeout=None): + if self.size <= 0: + if not block: + raise Empty + start_time = time.time() + while self.size <= 0: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Empty + time.sleep(min(remaining, 0.1)) + item = self.queue.get() + self.size -= 1 + return item + + def qsize(self): + return self.size + + def empty(self): + return self.qsize() == 0 + + def full(self): + return self.qsize() == self.maxsize \ No newline at end of file diff --git a/frigate/video.py b/frigate/video.py index 8980fcde0..f7af72dbf 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import time from collections import defaultdict import cv2 +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -717,7 +718,7 @@ def get_consolidated_object_detections(detected_object_groups): def process_frames( camera_name: str, - frame_queue: mp.Queue, + frame_queue: ff.Queue, frame_shape, model_config, detect_config: DetectConfig, @@ -725,7 +726,7 @@ def process_frames( motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, - detected_objects_queue: mp.Queue, + detected_objects_queue: ff.Queue, process_info: dict, objects_to_track: list[str], object_filters,