Merge branch 'dev' of github.com:blakeblackshear/frigate into util-cleanup

This commit is contained in:
Nick Mowen 2023-07-06 07:35:38 -06:00
commit 7d61b52e9f
9 changed files with 109 additions and 31 deletions

View File

@ -184,7 +184,7 @@ Topic to send PTZ commands to camera.
| Command | Description |
| ---------------------- | ----------------------------------------------------------------------------------------- |
| `preset-<preset_name>` | send command to move to preset with name `<preset_name>` |
| `preset_<preset_name>` | send command to move to preset with name `<preset_name>` |
| `MOVE_<dir>` | send command to continuously move in `<dir>`, possible values are [UP, DOWN, LEFT, RIGHT] |
| `ZOOM_<dir>` | send command to continuously zoom `<dir>`, possible values are [IN, OUT] |
| `STOP` | send command to stop moving |

View File

@ -10,6 +10,7 @@ from multiprocessing.synchronize import Event as MpEvent
from types import FrameType
from typing import Optional
import faster_fifo as ff
import psutil
from faster_fifo import Queue
from peewee_migrate import Router
@ -46,6 +47,7 @@ from frigate.stats import StatsEmitter, stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes
from frigate.util import LimitedQueue as LQueue
from frigate.version import VERSION
from frigate.video import capture_camera, track_camera
from frigate.watchdog import FrigateWatchdog
@ -56,11 +58,11 @@ logger = logging.getLogger(__name__)
class FrigateApp:
def __init__(self) -> None:
self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue()
self.detection_queue: Queue = ff.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_out_events: dict[str, MpEvent] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.log_queue: Queue = ff.Queue()
self.plus_api = PlusApi()
self.camera_metrics: dict[str, CameraMetricsTypes] = {}
self.feature_metrics: dict[str, FeatureMetricsTypes] = {}
@ -156,7 +158,7 @@ class FrigateApp:
"ffmpeg_pid": mp.Value("i", 0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"frame_queue": mp.Queue(maxsize=2),
"frame_queue": LQueue(maxsize=2),
"capture_process": None,
"process": None,
}
@ -188,22 +190,22 @@ class FrigateApp:
def init_queues(self) -> None:
# Queues for clip processing
self.event_queue: Queue = mp.Queue()
self.event_processed_queue: Queue = mp.Queue()
self.video_output_queue: Queue = mp.Queue(
self.event_queue: Queue = ff.Queue()
self.event_processed_queue: Queue = ff.Queue()
self.video_output_queue: Queue = LQueue(
maxsize=len(self.config.cameras.keys()) * 2
)
# Queue for cameras to push tracked objects to
self.detected_frames_queue: Queue = mp.Queue(
self.detected_frames_queue: Queue = LQueue(
maxsize=len(self.config.cameras.keys()) * 2
)
# Queue for recordings info
self.recordings_info_queue: Queue = mp.Queue()
self.recordings_info_queue: Queue = ff.Queue()
# Queue for timeline events
self.timeline_queue: Queue = mp.Queue()
self.timeline_queue: Queue = ff.Queue()
def init_database(self) -> None:
def vacuum_db(db: SqliteExtDatabase) -> None:

View File

@ -253,7 +253,7 @@ class Dispatcher:
try:
if "preset" in payload.lower():
command = OnvifCommandEnum.preset
param = payload.lower().split("-")[1]
param = payload.lower()[payload.index("_") + 1 :]
else:
command = OnvifCommandEnum[payload.lower()]
param = ""

View File

@ -7,6 +7,7 @@ import signal
import threading
from abc import ABC, abstractmethod
import faster_fifo as ff
import numpy as np
from setproctitle import setproctitle
@ -74,7 +75,7 @@ class LocalObjectDetector(ObjectDetector):
def run_detector(
name: str,
detection_queue: mp.Queue,
detection_queue: ff.Queue,
out_events: dict[str, mp.Event],
avg_speed,
start,

View File

@ -3,7 +3,6 @@
import asyncio
import datetime
import logging
import multiprocessing as mp
import os
import queue
import random
@ -15,6 +14,7 @@ from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Tuple
import faster_fifo as ff
import psutil
from frigate.config import FrigateConfig, RetainModeEnum
@ -31,7 +31,7 @@ class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
recordings_info_queue: mp.Queue,
recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent,
):

View File

@ -7,6 +7,7 @@ import threading
from types import FrameType
from typing import Optional
import faster_fifo as ff
from playhouse.sqliteq import SqliteQueueDatabase
from setproctitle import setproctitle
@ -22,7 +23,7 @@ logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
recordings_info_queue: mp.Queue,
recordings_info_queue: ff.Queue,
process_info: dict[str, FeatureMetricsTypes],
) -> None:
stop_event = mp.Event()

View File

@ -1,10 +1,13 @@
"""Utilities for builtin types manipulation."""
import copy
import ctypes
import datetime
import logging
import multiprocessing
import re
import shlex
import time
import urllib.parse
from collections import Counter
from collections.abc import Mapping
@ -12,6 +15,10 @@ from typing import Any, Tuple
import pytz
import yaml
from queue import Empty, Full
from faster_fifo import DEFAULT_CIRCULAR_BUFFER_SIZE, DEFAULT_TIMEOUT
from faster_fifo import Queue as FFQueue
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
@ -57,6 +64,50 @@ class EventsPerSecond:
del self._timestamps[0]
class LimitedQueue(FFQueue):
def __init__(
self,
maxsize=0,
max_size_bytes=DEFAULT_CIRCULAR_BUFFER_SIZE,
loads=None,
dumps=None,
):
super().__init__(max_size_bytes=max_size_bytes, loads=loads, dumps=dumps)
self.maxsize = maxsize
self.size = multiprocessing.RawValue(
ctypes.c_int, 0
) # Add a counter for the number of items in the queue
def put(self, x, block=True, timeout=DEFAULT_TIMEOUT):
if self.maxsize > 0 and self.size.value >= self.maxsize:
if block:
start_time = time.time()
while self.size.value >= self.maxsize:
remaining = timeout - (time.time() - start_time)
if remaining <= 0.0:
raise Full
time.sleep(min(remaining, 0.1))
else:
raise Full
self.size.value += 1
return super().put(x, block=block, timeout=timeout)
def get(self, block=True, timeout=DEFAULT_TIMEOUT):
if self.size.value <= 0 and not block:
raise Empty
self.size.value -= 1
return super().get(block=block, timeout=timeout)
def qsize(self):
return self.size
def empty(self):
return self.qsize() == 0
def full(self):
return self.qsize() == self.maxsize
def deep_merge(dct1: dict, dct2: dict, override=False, merge_lists=False) -> dict:
"""
:param dct1: First dict to merge

View File

@ -1,13 +1,34 @@
<<<<<<< HEAD:frigate/util/image.py
"""Utilities for creating and manipulating image frames."""
=======
import copy
import ctypes
>>>>>>> 2fae9dcb93f41936ee1907fc0813719c8559fe1b:frigate/util.py
import datetime
import logging
from abc import ABC, abstractmethod
from multiprocessing import shared_memory
<<<<<<< HEAD:frigate/util/image.py
from typing import AnyStr, Optional
import cv2
import numpy as np
=======
from queue import Empty, Full
from typing import Any, AnyStr, Optional, Tuple
import cv2
import numpy as np
import psutil
import py3nvml.py3nvml as nvml
import pytz
import yaml
from frigate.const import REGEX_HTTP_CAMERA_USER_PASS, REGEX_RTSP_CAMERA_USER_PASS
>>>>>>> 2fae9dcb93f41936ee1907fc0813719c8559fe1b:frigate/util.py
logger = logging.getLogger(__name__)

View File

@ -11,6 +11,7 @@ import time
from collections import defaultdict
import cv2
import faster_fifo as ff
import numpy as np
from setproctitle import setproctitle
@ -205,17 +206,16 @@ def capture_frames(
frame_rate.update()
# if the queue is full, skip this frame
if frame_queue.full():
skipped_eps.update()
frame_manager.delete(frame_name)
continue
# don't lock the queue to check, just try since it should rarely be full
try:
# add to the queue
frame_queue.put(current_frame.value, False)
# close the frame
frame_manager.close(frame_name)
# add to the queue
frame_queue.put(current_frame.value)
except queue.Full:
# if the queue is full, skip this frame
skipped_eps.update()
frame_manager.delete(frame_name)
class CameraWatchdog(threading.Thread):
@ -727,7 +727,7 @@ def get_consolidated_object_detections(detected_object_groups):
def process_frames(
camera_name: str,
frame_queue: mp.Queue,
frame_queue: ff.Queue,
frame_shape,
model_config: ModelConfig,
detect_config: DetectConfig,
@ -735,7 +735,7 @@ def process_frames(
motion_detector: MotionDetector,
object_detector: RemoteObjectDetector,
object_tracker: ObjectTracker,
detected_objects_queue: mp.Queue,
detected_objects_queue: ff.Queue,
process_info: dict,
objects_to_track: list[str],
object_filters,
@ -756,13 +756,15 @@ def process_frames(
region_min_size = get_min_region_size(model_config)
while not stop_event.is_set():
if exit_on_empty and frame_queue.empty():
logger.info("Exiting track_objects...")
break
try:
if exit_on_empty:
frame_time = frame_queue.get(False)
else:
frame_time = frame_queue.get(True, 1)
except queue.Empty:
if exit_on_empty:
logger.info("Exiting track_objects...")
break
continue
current_frame_time.value = frame_time