diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index 2f623567c..653c101b3 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -1,7 +1,6 @@ """Utilities for builtin types manipulation.""" import copy -import ctypes import datetime import logging import multiprocessing @@ -11,7 +10,7 @@ import time import urllib.parse from collections import Counter from collections.abc import Mapping -from queue import Empty, Full +from queue import Full from typing import Any, Tuple import numpy as np @@ -75,39 +74,40 @@ class LimitedQueue(FFQueue): ): super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) self.maxsize = maxsize - self.size = multiprocessing.RawValue( - ctypes.c_int, 0 - ) # Add a counter for the number of items in the queue self.lock = multiprocessing.Lock() # Add a lock for thread-safety - def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): - with self.lock: # Ensure thread-safety - if self.maxsize > 0 and self.size.value >= self.maxsize: + def put(self, x, block=True, timeout=None): + # ensure only one writer. + with self.lock: + # block/full due to num elems + if self.maxsize > 0 and self.qsize() >= self.maxsize: if block: - start_time = time.time() - while self.size.value >= self.maxsize: - remaining = timeout - (time.time() - start_time) - if remaining <= 0.0: - raise Full - time.sleep(min(remaining, 0.1)) + if timeout is None: + while self.qsize() >= self.maxsize: + time.sleep( + 0.1 + ) # 0.1s, might want to replace this with a signal. + else: + start_time = time.time() + while self.qsize() >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) else: raise Full - self.size.value += 1 - return super().put(x, block=block, timeout=timeout) - - def get(self, block=True, timeout=DEFAULT_TIMEOUT): - item = super().get(block=block, timeout=timeout) - with self.lock: # Ensure thread-safety - if self.size.value <= 0 and not block: - raise Empty - self.size.value -= 1 - return item - - def qsize(self): - return self.size.value - - def empty(self): - return self.qsize() == 0 + # block/full due to underlying circular buffer being full + if block and timeout is None: + # workaround for https://github.com/alex-petrenko/faster-fifo/issues/42 + while True: + try: + return super().put(x, block=block, timeout=DEFAULT_TIMEOUT) + except Full: + logger.warn("Queue was full, retrying in 1s") + time.sleep(1) + return super().put( + x, block=block, timeout=DEFAULT_TIMEOUT if timeout is None else timeout + ) def full(self): return self.qsize() == self.maxsize diff --git a/frigate/video.py b/frigate/video.py index 0d0b3e5c6..7a76b36ea 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1025,21 +1025,17 @@ def process_frames( f"debug/frames/{camera_name}-{'{:.6f}'.format(frame_time)}.jpg", bgr_frame, ) - # add to the queue if not full - if detected_objects_queue.full(): - frame_manager.delete(f"{camera_name}{frame_time}") - continue - else: - fps_tracker.update() - fps.value = fps_tracker.eps() - detected_objects_queue.put( - ( - camera_name, - frame_time, - detections, - motion_boxes, - regions, - ) + # add to the queue, blocking in case the queue is full. + fps_tracker.update() + fps.value = fps_tracker.eps() + detected_objects_queue.put( + ( + camera_name, + frame_time, + detections, + motion_boxes, + regions, ) - detection_fps.value = object_detector.fps.eps() - frame_manager.close(f"{camera_name}{frame_time}") + ) + detection_fps.value = object_detector.fps.eps() + frame_manager.close(f"{camera_name}{frame_time}")