Use standard queue for limited queue

This commit is contained in:
Nick Mowen 2023-07-07 13:53:52 -06:00
parent 3735aee7de
commit 3abe7b6965
2 changed files with 3 additions and 52 deletions

View File

@ -48,7 +48,6 @@ from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util.builtin import LimitedQueue as LQueue
from frigate.version import VERSION from frigate.version import VERSION
from frigate.video import capture_camera, track_camera from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog from frigate.watchdog import FrigateWatchdog
@ -159,7 +158,7 @@ class FrigateApp:
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards
"frame_queue": LQueue(maxsize=2), "frame_queue": mp.Queue(maxsize=2),
"capture_process": None, "capture_process": None,
"process": None, "process": None,
} }
@ -193,12 +192,12 @@ class FrigateApp:
# Queues for clip processing # Queues for clip processing
self.event_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) self.event_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
self.event_processed_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE) self.event_processed_queue: Queue = ff.Queue(DEFAULT_QUEUE_BUFFER_SIZE)
self.video_output_queue: Queue = LQueue( self.video_output_queue: Queue = mp.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )
# Queue for cameras to push tracked objects to # Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = LQueue( self.detected_frames_queue: Queue = mp.Queue(
maxsize=len(self.config.cameras.keys()) * 2 maxsize=len(self.config.cameras.keys()) * 2
) )

View File

@ -68,54 +68,6 @@ class EventsPerSecond:
del self._timestamps[0] del self._timestamps[0]
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_QUEUE_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=None):
with self.lock: # Ensure thread-safety
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=None):
item = super().get(block=block, timeout=timeout)
with self.lock: # Ensure thread-safety
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return item
def qsize(self):
return self.size.value
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict: def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
""" """
:param dct1: First dict to merge :param dct1: First dict to merge