Fix queues

This commit is contained in:
Nick Mowen 2023-07-07 13:37:41 -06:00
parent 00b9a490bb
commit ebfe669ac2
3 changed files with 13 additions and 9 deletions

View File

@ -26,6 +26,7 @@ from frigate.const import (
CLIPS_DIR,
CONFIG_DIR,
DEFAULT_DB_PATH,
DEFAULT_QUEUE_SIZE,
EXPORT_DIR,
MODEL_CACHE_DIR,
RECORD_DIR,
@ -190,8 +191,8 @@ class FrigateApp:
def init_queues(self) -> None:
# Queues for clip processing
self.event_queue: Queue = ff.Queue()
self.event_processed_queue: Queue = ff.Queue()
self.event_queue: Queue = ff.Queue(DEFAULT_QUEUE_SIZE)
self.event_processed_queue: Queue = ff.Queue(DEFAULT_QUEUE_SIZE)
self.video_output_queue: Queue = LQueue(
maxsize=len(self.config.cameras.keys()) * 2
)
@ -202,10 +203,10 @@ class FrigateApp:
)
# Queue for recordings info
self.recordings_info_queue: Queue = ff.Queue()
self.recordings_info_queue: Queue = ff.Queue(DEFAULT_QUEUE_SIZE)
# Queue for timeline events
self.timeline_queue: Queue = ff.Queue()
self.timeline_queue: Queue = ff.Queue(DEFAULT_QUEUE_SIZE)
def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:

View File

@ -46,3 +46,7 @@ DRIVER_INTEL_iHD = "iHD"
MAX_SEGMENT_DURATION = 600
MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times
# Queue Values
DEFAULT_QUEUE_SIZE = 2000 * 1000 # 2MB

View File

@ -17,11 +17,10 @@ from typing import Any, Tuple
import numpy as np
import pytz
import yaml
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from ruamel.yaml import YAML
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
from frigate.const import DEFAULT_QUEUE_SIZE, REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
logger = logging.getLogger(__name__)
@ -69,7 +68,7 @@ class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
max_size_bytes=DEFAULT_QUEUE_SIZE,
loads=None,
dumps=None,
):
@ -80,7 +79,7 @@ class LimitedQueue(FFQueue):
) # Add a counter for the number of items in the queue
self.lock = multiprocessing.Lock() # Add a lock for thread-safety
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
def put(self, x, block=True, timeout=None):
with self.lock: # Ensure thread-safety
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
@ -95,7 +94,7 @@ class LimitedQueue(FFQueue):
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
def get(self, block=True, timeout=None):
item = super().get(block=block, timeout=timeout)
with self.lock: # Ensure thread-safety
if self.size.value <= 0 and not block: