Refactored queues to use faster_fifo instead of mp.Queue

Refactored LimitedQueue to include a counter for the number of items in the queue and updated put and get methods to use the counter

Refactor app.py and util.py to use a custom Queue implementation called LQueue instead of the existing Queue

Refactor put and get methods in LimitedQueue to handle queue size and blocking behavior more efficiently

Code formatting

Remove code from another branch (merge mix-up)
Sergey Krashevich 2023-06-28 05:50:18 +03:00
parent dbb6d704e3
commit 6b0a68d0dd
4 changed files with 65 additions and 16 deletions
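Background for the change above: multiprocessing.Queue enforces an item-count maxsize, while faster_fifo's Queue is bounded only by the byte size of its circular buffer, so an item limit has to be tracked separately. The sketch below is a minimal illustration of that idea with a shared counter, not the commit's code (the actual LimitedQueue is in the frigate/util.py diff further down); the class name BoundedFFQueue and the 1 MB buffer size are made up for the example.

```python
import ctypes
import multiprocessing
import time
from queue import Full

from faster_fifo import Queue as FFQueue


class BoundedFFQueue(FFQueue):
    """Illustrative wrapper: emulate mp.Queue(maxsize=N) on top of faster_fifo."""

    def __init__(self, maxsize=0, max_size_bytes=1_000_000):
        super().__init__(max_size_bytes=max_size_bytes)
        self.maxsize = maxsize
        # shared across worker processes because it is allocated before fork
        self.size = multiprocessing.RawValue(ctypes.c_int, 0)

    def put(self, item, block=True, timeout=10.0):
        deadline = time.monotonic() + timeout
        # wait until the item-count limit frees up, or give up with Full
        while self.maxsize > 0 and self.size.value >= self.maxsize:
            if not block or time.monotonic() >= deadline:
                raise Full
            time.sleep(0.01)
        self.size.value += 1
        return super().put(item, block=block, timeout=timeout)

    def get(self, block=True, timeout=10.0):
        item = super().get(block=block, timeout=timeout)  # raises queue.Empty on timeout
        self.size.value -= 1
        return item
```

The counter is not updated atomically with the underlying put/get, so it is only an approximate bound used to throttle producers, which is all the maxsize check needs here.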

frigate/app.py

@@ -5,6 +5,9 @@ import os
 import shutil
 import signal
 import sys
+import faster_fifo as ff
 from faster_fifo import Queue
 from typing import Optional
 from types import FrameType
@@ -30,6 +33,7 @@ from frigate.plus import PlusApi
 from frigate.record import RecordingCleanup, RecordingMaintainer
 from frigate.stats import StatsEmitter, stats_init
 from frigate.storage import StorageMaintainer
+from frigate.util import LimitedQueue as LQueue
 from frigate.version import VERSION
 from frigate.video import capture_camera, track_camera
 from frigate.watchdog import FrigateWatchdog
@@ -41,11 +45,11 @@ logger = logging.getLogger(__name__)
 class FrigateApp:
     def __init__(self) -> None:
         self.stop_event: MpEvent = mp.Event()
-        self.detection_queue: Queue = mp.Queue()
+        self.detection_queue: Queue = ff.Queue()
         self.detectors: dict[str, ObjectDetectProcess] = {}
         self.detection_out_events: dict[str, MpEvent] = {}
         self.detection_shms: list[mp.shared_memory.SharedMemory] = []
-        self.log_queue: Queue = mp.Queue()
+        self.log_queue: Queue = ff.Queue()
         self.plus_api = PlusApi()
         self.camera_metrics: dict[str, CameraMetricsTypes] = {}
@@ -103,7 +107,7 @@ class FrigateApp:
                 "detection_frame": mp.Value("d", 0.0),
                 "read_start": mp.Value("d", 0.0),
                 "ffmpeg_pid": mp.Value("i", 0),
-                "frame_queue": mp.Queue(maxsize=2),
+                "frame_queue": LQueue(maxsize=2),
                 "capture_process": None,
                 "process": None,
             }
@@ -121,19 +125,19 @@ class FrigateApp:
     def init_queues(self) -> None:
         # Queues for clip processing
-        self.event_queue: Queue = mp.Queue()
-        self.event_processed_queue: Queue = mp.Queue()
-        self.video_output_queue: Queue = mp.Queue(
+        self.event_queue: Queue = ff.Queue()
+        self.event_processed_queue: Queue = ff.Queue()
+        self.video_output_queue: Queue = LQueue(
             maxsize=len(self.config.cameras.keys()) * 2
         )
         # Queue for cameras to push tracked objects to
-        self.detected_frames_queue: Queue = mp.Queue(
+        self.detected_frames_queue: Queue = LQueue(
             maxsize=len(self.config.cameras.keys()) * 2
         )
         # Queue for recordings info
-        self.recordings_info_queue: Queue = mp.Queue()
+        self.recordings_info_queue: Queue = ff.Queue()

     def init_database(self) -> None:
         # Migrate DB location

frigate/object_detection.py

@@ -7,6 +7,7 @@ import signal
 import threading
 from abc import ABC, abstractmethod
+import faster_fifo as ff
 import numpy as np
 from setproctitle import setproctitle
@@ -73,7 +74,7 @@ class LocalObjectDetector(ObjectDetector):
 def run_detector(
     name: str,
-    detection_queue: mp.Queue,
+    detection_queue: ff.Queue,
     out_events: dict[str, mp.Event],
     avg_speed,
     start,

frigate/util.py

@@ -1,4 +1,5 @@
 import copy
+import ctypes
 import datetime
 import logging
 import shlex
@@ -14,6 +15,7 @@ from abc import ABC, abstractmethod
 from collections import Counter
 from collections.abc import Mapping
 from multiprocessing import shared_memory
+from queue import Empty, Full
 from typing import Any, AnyStr, Optional, Tuple
 import cv2
@@ -21,6 +23,8 @@ import numpy as np
 import os
 import psutil
 import pytz
+from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
+from faster_fifo import Queue as FFQueue
 from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
@@ -772,7 +776,6 @@ def get_docker_memlimit_bytes() -> int:
     # check running a supported cgroups version
     if get_cgroups_version() == "cgroup2":
         memlimit_command = ["cat", "/sys/fs/cgroup/memory.max"]
         p = sp.run(
@@ -817,7 +820,6 @@ def get_cpu_stats() -> dict[str, dict]:
     for line in lines:
         stats = list(filter(lambda a: a != "", line.strip().split(" ")))
         try:
             if docker_memlimit > 0:
                 mem_res = int(stats[5])
                 mem_pct = str(
@@ -1067,3 +1069,47 @@ def get_tz_modifiers(tz_name: str) -> Tuple[str, str]:
     hour_modifier = f"{hours_offset} hour"
     minute_modifier = f"{minutes_offset} minute"
     return hour_modifier, minute_modifier
+
+
+class LimitedQueue(FFQueue):
+    def __init__(
+        self,
+        maxsize=0,
+        max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
+        loads=None,
+        dumps=None,
+    ):
+        super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
+        self.maxsize = maxsize
+        # Shared counter for the number of items currently in the queue
+        self.size = multiprocessing.RawValue(ctypes.c_int, 0)
+
+    def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
+        # Enforce the item-count limit before handing off to faster_fifo,
+        # which only bounds the queue by the byte size of its circular buffer
+        if self.maxsize > 0 and self.size.value >= self.maxsize:
+            if block:
+                start_time = time.time()
+                while self.size.value >= self.maxsize:
+                    remaining = timeout - (time.time() - start_time)
+                    if remaining <= 0.0:
+                        raise Full
+                    time.sleep(min(remaining, 0.1))
+            else:
+                raise Full
+        self.size.value += 1
+        return super().put(x, block=block, timeout=timeout)
+
+    def get(self, block=True, timeout=DEFAULT_TIMEOUT):
+        if self.size.value <= 0 and not block:
+            raise Empty
+        self.size.value -= 1
+        return super().get(block=block, timeout=timeout)
+
+    def qsize(self):
+        # return the counter value, not the RawValue wrapper itself
+        return self.size.value
+
+    def empty(self):
+        return self.qsize() == 0
+
+    def full(self):
+        return self.qsize() == self.maxsize
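For reference, a small usage sketch of the LimitedQueue added above, assuming it is importable from frigate.util as in this diff; the producer/consumer helpers and the maxsize of 100 are illustrative only, not part of the commit.

```python
import multiprocessing as mp
from queue import Empty, Full

from frigate.util import LimitedQueue


def producer(q):
    for i in range(1000):
        try:
            q.put(i, timeout=1.0)  # raises Full if the item limit persists past the timeout
        except Full:
            print("queue full, dropping item", i)


def consumer(q, stop):
    while not stop.is_set():
        try:
            item = q.get(timeout=0.5)  # raises Empty if nothing arrives in time
        except Empty:
            continue
        print("got", item)


if __name__ == "__main__":
    queue = LimitedQueue(maxsize=100)  # cap the number of queued items
    stop = mp.Event()
    c = mp.Process(target=consumer, args=(queue, stop))
    p = mp.Process(target=producer, args=(queue,))
    c.start()
    p.start()
    p.join()
    stop.set()
    c.join()
```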

frigate/video.py

@@ -10,6 +10,7 @@ import threading
 import time
 from collections import defaultdict
+import faster_fifo as ff
 import numpy as np
 import cv2
 from setproctitle import setproctitle
@@ -162,7 +163,6 @@ def capture_frames(
     current_frame: mp.Value,
     stop_event: mp.Event,
 ):
     frame_size = frame_shape[0] * frame_shape[1]
     frame_rate = EventsPerSecond()
     frame_rate.start()
@@ -577,7 +577,7 @@ def detect(
 def process_frames(
     camera_name: str,
-    frame_queue: mp.Queue,
+    frame_queue: ff.Queue,
     frame_shape,
     model_config,
     detect_config: DetectConfig,
@@ -585,7 +585,7 @@ def process_frames(
     motion_detector: MotionDetector,
     object_detector: RemoteObjectDetector,
     object_tracker: ObjectTracker,
-    detected_objects_queue: mp.Queue,
+    detected_objects_queue: ff.Queue,
     process_info: dict,
     objects_to_track: list[str],
     object_filters,
@@ -594,7 +594,6 @@ def process_frames(
     stop_event,
     exit_on_empty: bool = False,
 ):
     fps = process_info["process_fps"]
     detection_fps = process_info["detection_fps"]
     current_frame_time = process_info["detection_frame"]
@@ -748,7 +747,6 @@ def process_frames(
         selected_objects = []
         for group in detected_object_groups.values():
             # apply non-maxima suppression to suppress weak, overlapping bounding boxes
             # o[2] is the box of the object: xmin, ymin, xmax, ymax
             # apply max/min to ensure values do not exceed the known frame size