Detections

This commit is contained in:
Nicolas Mowen 2024-02-16 11:45:40 -07:00
parent 2050804d85
commit d3643741b5
7 changed files with 155 additions and 100 deletions

View File

@ -18,6 +18,7 @@ from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.detections_updater import DetectionProxy
from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient
@ -202,16 +203,6 @@ class FrigateApp:
maxsize=sum(camera.enabled for camera in self.config.cameras.values()) * 2
)
# Queue for object recordings info
self.object_recordings_info_queue: Queue = mp.Queue()
# Queue for audio recordings info if enabled
self.audio_recordings_info_queue: Optional[Queue] = (
mp.Queue()
if len([c for c in self.config.cameras.values() if c.audio.enabled]) > 0
else None
)
# Queue for timeline events
self.timeline_queue: Queue = mp.Queue()
@ -287,11 +278,7 @@ class FrigateApp:
recording_process = mp.Process(
target=manage_recordings,
name="recording_manager",
args=(
self.config,
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
),
args=(self.config,),
)
recording_process.daemon = True
self.recording_process = recording_process
@ -329,6 +316,7 @@ class FrigateApp:
def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator()
self.inter_config_updater = ConfigPublisher()
self.inter_detection_proxy = DetectionProxy()
def init_web_server(self) -> None:
self.flask_app = create_app(
@ -417,7 +405,6 @@ class FrigateApp:
self.event_queue,
self.event_processed_queue,
self.video_output_queue,
self.object_recordings_info_queue,
self.ptz_autotracker_thread,
self.stop_event,
)
@ -500,7 +487,6 @@ class FrigateApp:
name="audio_capture",
args=(
self.config,
self.audio_recordings_info_queue,
self.camera_metrics,
),
)
@ -687,6 +673,7 @@ class FrigateApp:
# Stop Communicators
self.inter_process_communicator.stop()
self.inter_config_updater.stop()
self.inter_detection_proxy.stop()
self.dispatcher.stop()
self.detected_frames_processor.join()
@ -708,8 +695,6 @@ class FrigateApp:
self.event_processed_queue,
self.video_output_queue,
self.detected_frames_queue,
self.object_recordings_info_queue,
self.audio_recordings_info_queue,
self.log_queue,
]:
if queue is not None:

View File

@ -0,0 +1,109 @@
"""Facilitates communication between processes."""
import os
import threading
from enum import Enum
from typing import Optional
import zmq
from frigate.const import (
PORT_INTER_PROCESS_DETECTION_SUB,
PORT_INTER_PROCESS_DETECTION_PUB,
)
class DetectionTypeEnum(str, Enum):
all = ""
video = "video"
audio = "audio"
class DetectionProxyRunner(threading.Thread):
def __init__(self, context: zmq.Context[zmq.Socket]) -> None:
threading.Thread.__init__(self)
self.name = "detection_proxy"
self.context = context
def run(self) -> None:
"""Run the proxy."""
PUB_PORT = (
os.environ.get("INTER_PROCESS_DETECTION_PUB_PORT")
or PORT_INTER_PROCESS_DETECTION_PUB
)
SUB_PORT = (
os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT")
or PORT_INTER_PROCESS_DETECTION_SUB
)
incoming = self.context.socket(zmq.XSUB)
incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}")
outgoing = self.context.socket(zmq.XPUB)
outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}")
zmq.proxy(
incoming, outgoing
) # blocking, will unblock when context is destroyed
incoming.close()
outgoing.close()
class DetectionProxy:
"""Proxies video and audio detections."""
def __init__(self) -> None:
self.context = zmq.Context()
self.runner = DetectionProxyRunner(self.context)
self.runner.start()
def stop(self) -> None:
self.context.destroy() # destroying the context will stop the proxy
class DetectionPublisher:
"""Simplifies receiving video and audio detections."""
def __init__(self, topic: DetectionTypeEnum) -> None:
port = (
os.environ.get("INTER_PROCESS_DETECTIONS_PORT")
or PORT_INTER_PROCESS_DETECTION_PUB
)
self.topic = topic
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.connect(f"tcp://127.0.0.1:{port}")
def send_data(self, payload: any) -> None:
"""Publish detection."""
self.socket.send_string(self.topic.value, flags=zmq.SNDMORE)
self.socket.send_pyobj(payload)
def stop(self) -> None:
self.socket.close()
self.context.destroy()
class DetectionSubscriber:
"""Simplifies receiving video and audio detections."""
def __init__(self, topic: DetectionTypeEnum) -> None:
port = (
os.environ.get("INTER_PROCESS_DETECTIONS_PORT")
or PORT_INTER_PROCESS_DETECTION_SUB
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value)
self.socket.connect(f"tcp://127.0.0.1:{port}")
def get_data(self) -> Optional[tuple[str, any]]:
"""Returns detections or None if no update."""
try:
topic = DetectionTypeEnum[self.socket.recv_string(flags=zmq.NOBLOCK)]
return (topic, self.socket.recv_pyobj())
except zmq.ZMQError:
return (None, None)
def stop(self) -> None:
self.socket.close()
self.context.destroy()

View File

@ -61,6 +61,8 @@ DRIVER_INTEL_iHD = "iHD"
PORT_INTER_PROCESS_COMM = 4892
PORT_INTER_PROCESS_CONFIG = 4893
PORT_INTER_PROCESS_DETECTION_PUB = 4894
PORT_INTER_PROCESS_DETECTION_SUB = 4895
# Record Values

View File

@ -14,6 +14,7 @@ import requests
from setproctitle import setproctitle
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, CameraInput, FfmpegConfig, FrigateConfig
from frigate.const import (
@ -68,7 +69,6 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
def listen_to_audio(
config: FrigateConfig,
recordings_info_queue: mp.Queue,
camera_metrics: dict[str, CameraMetricsTypes],
) -> None:
stop_event = mp.Event()
@ -95,7 +95,6 @@ def listen_to_audio(
if camera.enabled and camera.audio.enabled_in_config:
audio = AudioEventMaintainer(
camera,
recordings_info_queue,
camera_metrics,
stop_event,
)
@ -167,14 +166,12 @@ class AudioEventMaintainer(threading.Thread):
def __init__(
self,
camera: CameraConfig,
recordings_info_queue: mp.Queue,
camera_metrics: dict[str, CameraMetricsTypes],
stop_event: mp.Event,
) -> None:
threading.Thread.__init__(self)
self.name = f"{camera.name}_audio_event_processor"
self.config = camera
self.recordings_info_queue = recordings_info_queue
self.camera_metrics = camera_metrics
self.detections: dict[dict[str, any]] = {}
self.stop_event = stop_event
@ -189,6 +186,7 @@ class AudioEventMaintainer(threading.Thread):
# create communication for audio detections
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}")
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio)
def detect_audio(self, audio) -> None:
if not self.config.audio.enabled:
@ -219,8 +217,8 @@ class AudioEventMaintainer(threading.Thread):
self.handle_detection(label, score)
audio_detections.append(label)
# add audio info to recordings queue
self.recordings_info_queue.put(
# send audio detection data
self.detection_publisher.send_data(
(
self.config.name,
datetime.datetime.now().timestamp(),

View File

@ -12,6 +12,7 @@ from typing import Callable
import cv2
import numpy as np
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher
from frigate.config import (
CameraConfig,
@ -520,7 +521,9 @@ class CameraState:
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
][
"max_target_box"
]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
@ -814,7 +817,6 @@ class TrackedObjectProcessor(threading.Thread):
event_queue,
event_processed_queue,
video_output_queue,
recordings_info_queue,
ptz_autotracker_thread,
stop_event,
):
@ -826,12 +828,12 @@ class TrackedObjectProcessor(threading.Thread):
self.event_queue = event_queue
self.event_processed_queue = event_processed_queue
self.video_output_queue = video_output_queue
self.recordings_info_queue = recordings_info_queue
self.stop_event = stop_event
self.camera_states: dict[str, CameraState] = {}
self.frame_manager = SharedMemoryFrameManager()
self.last_motion_detected: dict[str, float] = {}
self.ptz_autotracker_thread = ptz_autotracker_thread
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
def start(camera, obj: TrackedObject, current_frame_time):
self.event_queue.put(
@ -1126,8 +1128,8 @@ class TrackedObjectProcessor(threading.Thread):
)
)
# send info on this frame to the recordings maintainer
self.recordings_info_queue.put(
# publish info on this frame
self.detection_publisher.send_data(
(
camera,
frame_time,

View File

@ -5,7 +5,6 @@ import datetime
import logging
import multiprocessing as mp
import os
import queue
import random
import string
import threading
@ -18,6 +17,7 @@ import numpy as np
import psutil
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import (
@ -57,13 +57,7 @@ class SegmentInfo:
class RecordingMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: Optional[mp.Queue],
stop_event: MpEvent,
):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "recording_maintainer"
self.config = config
@ -71,9 +65,8 @@ class RecordingMaintainer(threading.Thread):
# create communication for retained recordings
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue
self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list)
self.audio_recordings_info: dict[str, list] = defaultdict(list)
@ -436,7 +429,6 @@ class RecordingMaintainer(threading.Thread):
return None
def run(self) -> None:
camera_count = sum(camera.enabled for camera in self.config.cameras.values())
# Check for new files every 5 seconds
wait_time = 0.0
while not self.stop_event.wait(wait_time):
@ -459,19 +451,19 @@ class RecordingMaintainer(threading.Thread):
stale_frame_count_threshold = 10
# empty the object recordings info queue
while True:
try:
(topic, data) = self.detection_subscriber.get_data()
if not topic:
break
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = self.object_recordings_info_queue.get(
True, timeout=QUEUE_READ_TIMEOUT
)
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
) = data
if self.config.cameras[camera].record.enabled:
self.object_recordings_info[camera].append(
@ -482,56 +474,29 @@ class RecordingMaintainer(threading.Thread):
regions,
)
)
except queue.Empty:
q_size = self.object_recordings_info_queue.qsize()
if q_size > camera_count:
logger.debug(
f"object_recordings_info loop queue not empty ({q_size})."
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
if self.config.cameras[camera].record.enabled:
self.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
audio_detections,
)
)
break
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if stale_frame_count > 0:
logger.debug(f"Found {stale_frame_count} old frames.")
# empty the audio recordings info queue if audio is enabled
if self.audio_recordings_info_queue:
stale_frame_count = 0
while True:
try:
(
camera,
frame_time,
dBFS,
audio_detections,
) = self.audio_recordings_info_queue.get(
True, timeout=QUEUE_READ_TIMEOUT
)
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if self.config.cameras[camera].record.enabled:
self.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
audio_detections,
)
)
except queue.Empty:
q_size = self.audio_recordings_info_queue.qsize()
if q_size > camera_count:
logger.debug(
f"object_recordings_info loop audio queue not empty ({q_size})."
)
break
if stale_frame_count > 0:
logger.error(
f"Found {stale_frame_count} old audio frames, segments from recordings may be missing"
)
try:
asyncio.run(self.move_files())
except Exception as e:

View File

@ -18,11 +18,7 @@ from frigate.util.services import listen
logger = logging.getLogger(__name__)
def manage_recordings(
config: FrigateConfig,
object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: mp.Queue,
) -> None:
def manage_recordings(config: FrigateConfig) -> None:
stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
@ -49,8 +45,6 @@ def manage_recordings(
maintainer = RecordingMaintainer(
config,
object_recordings_info_queue,
audio_recordings_info_queue,
stop_event,
)
maintainer.start()