diff --git a/docs/docs/integrations/mqtt.md b/docs/docs/integrations/mqtt.md index 229285676..378eadb3e 100644 --- a/docs/docs/integrations/mqtt.md +++ b/docs/docs/integrations/mqtt.md @@ -184,7 +184,7 @@ Topic to send PTZ commands to camera. | Command | Description | | ---------------------- | ----------------------------------------------------------------------------------------- | -| `preset-` | send command to move to preset with name `` | +| `preset_` | send command to move to preset with name `` | | `MOVE_` | send command to continuously move in ``, possible values are [UP, DOWN, LEFT, RIGHT] | | `ZOOM_` | send command to continuously zoom ``, possible values are [IN, OUT] | | `STOP` | send command to stop moving | diff --git a/frigate/app.py b/frigate/app.py index 4d4aa5dd4..ed32dee17 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -10,6 +10,7 @@ from multiprocessing.synchronize import Event as MpEvent from types import FrameType from typing import Optional +import faster_fifo as ff import psutil from faster_fifo import Queue from peewee_migrate import Router @@ -46,6 +47,7 @@ from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor from frigate.types import CameraMetricsTypes, FeatureMetricsTypes +from frigate.util import LimitedQueue as LQueue from frigate.version import VERSION from frigate.video import capture_camera, track_camera from frigate.watchdog import FrigateWatchdog @@ -56,11 +58,11 @@ logger = logging.getLogger(__name__) class FrigateApp: def __init__(self) -> None: self.stop_event: MpEvent = mp.Event() - self.detection_queue: Queue = mp.Queue() + self.detection_queue: Queue = ff.Queue() self.detectors: dict[str, ObjectDetectProcess] = {} self.detection_out_events: dict[str, MpEvent] = {} self.detection_shms: list[mp.shared_memory.SharedMemory] = [] - self.log_queue: Queue = mp.Queue() + self.log_queue: Queue = ff.Queue() self.plus_api = PlusApi() self.camera_metrics: dict[str, CameraMetricsTypes] = {} self.feature_metrics: dict[str, FeatureMetricsTypes] = {} @@ -156,7 +158,7 @@ class FrigateApp: "ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards - "frame_queue": mp.Queue(maxsize=2), + "frame_queue": LQueue(maxsize=2), "capture_process": None, "process": None, } @@ -188,22 +190,22 @@ class FrigateApp: def init_queues(self) -> None: # Queues for clip processing - self.event_queue: Queue = mp.Queue() - self.event_processed_queue: Queue = mp.Queue() - self.video_output_queue: Queue = mp.Queue( + self.event_queue: Queue = ff.Queue() + self.event_processed_queue: Queue = ff.Queue() + self.video_output_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for cameras to push tracked objects to - self.detected_frames_queue: Queue = mp.Queue( + self.detected_frames_queue: Queue = LQueue( maxsize=len(self.config.cameras.keys()) * 2 ) # Queue for recordings info - self.recordings_info_queue: Queue = mp.Queue() + self.recordings_info_queue: Queue = ff.Queue() # Queue for timeline events - self.timeline_queue: Queue = mp.Queue() + self.timeline_queue: Queue = ff.Queue() def init_database(self) -> None: def vacuum_db(db: SqliteExtDatabase) -> None: diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 6fdec5ed0..ada7e4cb4 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -253,7 +253,7 @@ class Dispatcher: try: if "preset" in payload.lower(): command = OnvifCommandEnum.preset - param = payload.lower().split("-")[1] + param = payload.lower()[payload.index("_") + 1 :] else: command = OnvifCommandEnum[payload.lower()] param = "" diff --git a/frigate/object_detection.py b/frigate/object_detection.py index fac43f39b..a0c31b755 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -7,6 +7,7 @@ import signal import threading from abc import ABC, abstractmethod +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -74,7 +75,7 @@ class LocalObjectDetector(ObjectDetector): def run_detector( name: str, - detection_queue: mp.Queue, + detection_queue: ff.Queue, out_events: dict[str, mp.Event], avg_speed, start, diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 72766a39d..d21affefa 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -3,7 +3,6 @@ import asyncio import datetime import logging -import multiprocessing as mp import os import queue import random @@ -15,6 +14,7 @@ from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Tuple +import faster_fifo as ff import psutil from frigate.config import FrigateConfig, RetainModeEnum @@ -31,7 +31,7 @@ class RecordingMaintainer(threading.Thread): def __init__( self, config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], stop_event: MpEvent, ): diff --git a/frigate/record/record.py b/frigate/record/record.py index dd03bf656..3e812a809 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -7,6 +7,7 @@ import threading from types import FrameType from typing import Optional +import faster_fifo as ff from playhouse.sqliteq import SqliteQueueDatabase from setproctitle import setproctitle @@ -22,7 +23,7 @@ logger = logging.getLogger(__name__) def manage_recordings( config: FrigateConfig, - recordings_info_queue: mp.Queue, + recordings_info_queue: ff.Queue, process_info: dict[str, FeatureMetricsTypes], ) -> None: stop_event = mp.Event() diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index d6574aed0..20ce74d9e 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -1,10 +1,13 @@ """Utilities for builtin types manipulation.""" import copy +import ctypes import datetime import logging +import multiprocessing import re import shlex +import time import urllib.parse from collections import Counter from collections.abc import Mapping @@ -12,6 +15,10 @@ from typing import Any, Tuple import pytz import yaml +from queue import Empty, Full + +from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT +from faster_fifo import Queue as FFQueue from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS @@ -57,6 +64,50 @@ class EventsPerSecond: del self._timestamps[0] +class LimitedQueue(FFQueue): + def __init__( + self, + maxsize=0, + max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE, + loads=None, + dumps=None, + ): + super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps) + self.maxsize = maxsize + self.size = multiprocessing.RawValue( + ctypes.c_int, 0 + ) # Add a counter for the number of items in the queue + + def put(self, x, block=True, timeout=DEFAULT_TIMEOUT): + if self.maxsize > 0 and self.size.value >= self.maxsize: + if block: + start_time = time.time() + while self.size.value >= self.maxsize: + remaining = timeout - (time.time() - start_time) + if remaining <= 0.0: + raise Full + time.sleep(min(remaining, 0.1)) + else: + raise Full + self.size.value += 1 + return super().put(x, block=block, timeout=timeout) + + def get(self, block=True, timeout=DEFAULT_TIMEOUT): + if self.size.value <= 0 and not block: + raise Empty + self.size.value -= 1 + return super().get(block=block, timeout=timeout) + + def qsize(self): + return self.size + + def empty(self): + return self.qsize() == 0 + + def full(self): + return self.qsize() == self.maxsize + + def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict: """ :param dct1: First dict to merge diff --git a/frigate/util/image.py b/frigate/util/image.py index 4af94500d..2a59e93a4 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -1,13 +1,34 @@ +<<<<<<< HEAD:frigate/util/image.py """Utilities for creating and manipulating image frames.""" +======= +import copy +import ctypes +>>>>>>> 2fae9dcb93f41936ee1907fc0813719c8559fe1b:frigate/util.py import datetime import logging + from abc import ABC, abstractmethod from multiprocessing import shared_memory +<<<<<<< HEAD:frigate/util/image.py from typing import AnyStr, Optional import cv2 import numpy as np +======= +from queue import Empty, Full +from typing import Any, AnyStr, Optional, Tuple + +import cv2 +import numpy as np +import psutil +import py3nvml.py3nvml as nvml +import pytz +import yaml + + +from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS +>>>>>>> 2fae9dcb93f41936ee1907fc0813719c8559fe1b:frigate/util.py logger = logging.getLogger(__name__) diff --git a/frigate/video.py b/frigate/video.py index 06a6ac6a6..0d0b3e5c6 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -11,6 +11,7 @@ import time from collections import defaultdict import cv2 +import faster_fifo as ff import numpy as np from setproctitle import setproctitle @@ -205,17 +206,16 @@ def capture_frames( frame_rate.update() - # if the queue is full, skip this frame - if frame_queue.full(): + # don't lock the queue to check, just try since it should rarely be full + try: + # add to the queue + frame_queue.put(current_frame.value, False) + # close the frame + frame_manager.close(frame_name) + except queue.Full: + # if the queue is full, skip this frame skipped_eps.update() frame_manager.delete(frame_name) - continue - - # close the frame - frame_manager.close(frame_name) - - # add to the queue - frame_queue.put(current_frame.value) class CameraWatchdog(threading.Thread): @@ -727,7 +727,7 @@ def get_consolidated_object_detections(detected_object_groups): def process_frames( camera_name: str, - frame_queue: mp.Queue, + frame_queue: ff.Queue, frame_shape, model_config: ModelConfig, detect_config: DetectConfig, @@ -735,7 +735,7 @@ def process_frames( motion_detector: MotionDetector, object_detector: RemoteObjectDetector, object_tracker: ObjectTracker, - detected_objects_queue: mp.Queue, + detected_objects_queue: ff.Queue, process_info: dict, objects_to_track: list[str], object_filters, @@ -756,13 +756,15 @@ def process_frames( region_min_size = get_min_region_size(model_config) while not stop_event.is_set(): - if exit_on_empty and frame_queue.empty(): - logger.info("Exiting track_objects...") - break - try: - frame_time = frame_queue.get(True, 1) + if exit_on_empty: + frame_time = frame_queue.get(False) + else: + frame_time = frame_queue.get(True, 1) except queue.Empty: + if exit_on_empty: + logger.info("Exiting track_objects...") + break continue current_frame_time.value = frame_time