Refactored queues to use faster_fifo instead of mp.Queue

This commit is contained in:
Sergey Krashevich 2023-06-28 05:50:18 +03:00
parent c25367221e
commit 22ce05be7e
No known key found for this signature in database
GPG Key ID: 625171324E7D3856
6 changed files with 67 additions and 15 deletions

View File

@ -10,8 +10,10 @@ from multiprocessing.synchronize import Event as MpEvent
from types import FrameType from types import FrameType
from typing import Optional from typing import Optional
import faster_fifo as ff
import psutil import psutil
from faster_fifo import Queue from faster_fifo import Queue
from frigate.util import LimitedQueue as LQueue
from peewee_migrate import Router from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
@ -56,11 +58,11 @@ logger = logging.getLogger(__name__)
class FrigateApp: class FrigateApp:
def __init__(self) -> None: def __init__(self) -> None:
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = ff.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {} self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = ff.Queue()
self.plus_api = PlusApi() self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {} self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.feature_metrics: dict[str, FeatureMetricsTypes] = {} self.feature_metrics: dict[str, FeatureMetricsTypes] = {}
@ -156,7 +158,7 @@ class FrigateApp:
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards
"frame_queue": mp.Queue(maxsize=2), "frame_queue": LQueue(maxsize=2),
"capture_process": None, "capture_process": None,
"process": None, "process": None,
} }
@ -188,22 +190,22 @@ class FrigateApp:
def init_queues(self) -> None: def init_queues(self) -> None:
# Queues for clip processing # Queues for clip processing
self.event_queue: Queue = mp.Queue() self.event_queue: Queue = ff.Queue()
self.event_processed_queue: Queue = mp.Queue() self.event_processed_queue: Queue = ff.Queue()
self.video_output_queue: Queue = mp.Queue( self.video_output_queue: Queue = ff.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue( self.detected_frames_queue: Queue = ff.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for recordings info # Queue for recordings info
self.recordings_info_queue: Queue = mp.Queue() self.recordings_info_queue: Queue = ff.Queue()
# Queue for timeline events # Queue for timeline events
self.timeline_queue: Queue = mp.Queue() self.timeline_queue: Queue = ff.Queue()
def init_database(self) -> None: def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None: def vacuum_db(db: SqliteExtDatabase) -> None:

View File

@ -7,6 +7,7 @@ import signal
import threading import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import faster_fifo as ff
import numpy as np import numpy as np
from setproctitle import setproctitle from setproctitle import setproctitle
@ -72,7 +73,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector( def run_detector(
name: str, name: str,
detection_queue: mp.Queue, detection_queue: ff.Queue,
out_events: dict[str, mp.Event], out_events: dict[str, mp.Event],
avg_speed, avg_speed,
start, start,

View File

@ -15,6 +15,7 @@ from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from typing import Any, Tuple from typing import Any, Tuple
import faster_fifo as ff
import psutil import psutil
from frigate.config import FrigateConfig, RetainModeEnum from frigate.config import FrigateConfig, RetainModeEnum
@ -30,7 +31,7 @@ class RecordingMaintainer(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
recordings_info_queue: mp.Queue, recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes], process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent, stop_event: MpEvent,
): ):

View File

@ -7,6 +7,7 @@ import threading
from types import FrameType from types import FrameType
from typing import Optional from typing import Optional
import faster_fifo as ff
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle from setproctitle import setproctitle
@ -22,7 +23,7 @@ logger = logging.getLogger(__name__)
def manage_recordings( def manage_recordings(
config: FrigateConfig, config: FrigateConfig,
recordings_info_queue: mp.Queue, recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes], process_info: dict[str, FeatureMetricsTypes],
) -> None: ) -> None:
stop_event = mp.Event() stop_event = mp.Event()

View File

@ -14,7 +14,9 @@ from collections import Counter
from collections.abc import Mapping from collections.abc import Mapping
from multiprocessing import shared_memory from multiprocessing import shared_memory
from typing import Any, AnyStr, Optional, Tuple from typing import Any, AnyStr, Optional, Tuple
from faster_fifo import Queue as FFQueue
from queue import Full, Empty
import time
import cv2 import cv2
import numpy as np import numpy as np
import psutil import psutil
@ -1218,3 +1220,47 @@ def get_video_properties(url, get_duration=False):
result["height"] = round(height) result["height"] = round(height)
return result return result
class LimitedQueue:
def __init__(self, maxsize=0, max_size_bytes=None, loads=None, dumps=None):
self.maxsize = maxsize
self.queue = FFQueue(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.size = 0
def put(self, item, block=True, timeout=None):
if self.maxsize > 0 and self.size >= self.maxsize:
if block:
start_time = time.time()
while self.size >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.queue.put(item)
self.size += 1
def get(self, block=True, timeout=None):
if self.size <= 0:
if not block:
raise Empty
start_time = time.time()
while self.size <= 0:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Empty
time.sleep(min(remaining, 0.1))
item = self.queue.get()
self.size -= 1
return item
def qsize(self):
return self.size
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize

View File

@ -11,6 +11,7 @@ import time
from collections import defaultdict from collections import defaultdict
import cv2 import cv2
import faster_fifo as ff
import numpy as np import numpy as np
from setproctitle import setproctitle from setproctitle import setproctitle
@ -717,7 +718,7 @@ def get_consolidated_object_detections(detected_object_groups):
def process_frames( def process_frames(
camera_name: str, camera_name: str,
frame_queue: mp.Queue, frame_queue: ff.Queue,
frame_shape, frame_shape,
model_config, model_config,
detect_config: DetectConfig, detect_config: DetectConfig,
@ -725,7 +726,7 @@ def process_frames(
motion_detector: MotionDetector, motion_detector: MotionDetector,
object_detector: RemoteObjectDetector, object_detector: RemoteObjectDetector,
object_tracker: ObjectTracker, object_tracker: ObjectTracker,
detected_objects_queue: mp.Queue, detected_objects_queue: ff.Queue,
process_info: dict, process_info: dict,
objects_to_track: list[str], objects_to_track: list[str],
object_filters, object_filters,