From ccf5250679eb2b2411cd66cdbdb8bc09bcb86530 Mon Sep 17 00:00:00 2001 From: Chris OBryan <13701027+cobryan05@users.noreply.github.com> Date: Thu, 8 Jun 2023 12:53:34 -0500 Subject: [PATCH] Add enabling/disabling camera capture via MQTT Adds new MQTT topic, frigate//capture/state to dynamically enable or disable capturing from a camera. The ffmpeg process is terminated when the capture state is OFF and is resumed when the capture state is ON. --- docs/docs/integrations/mqtt.md | 8 +++ frigate/app.py | 3 + frigate/comms/dispatcher.py | 16 +++++ frigate/comms/mqtt.py | 6 ++ frigate/config.py | 1 + frigate/types.py | 1 + frigate/util.py | 15 +++++ frigate/video.py | 107 +++++++++++++++++---------------- 8 files changed, 106 insertions(+), 51 deletions(-) diff --git a/docs/docs/integrations/mqtt.md b/docs/docs/integrations/mqtt.md index 814656258..70b60f2b5 100644 --- a/docs/docs/integrations/mqtt.md +++ b/docs/docs/integrations/mqtt.md @@ -112,6 +112,14 @@ Topic to turn recordings for a camera on and off. Expected values are `ON` and ` Topic with current state of recordings for a camera. Published values are `ON` and `OFF`. +### `frigate//capture/set` + +Topic to turn capture for a camera on and off. Expected values are `ON` and `OFF`. + +### `frigate//capture/state` + +Topic with current state of capture for a camera. Published values are `ON` and `OFF`. + ### `frigate//snapshots/set` Topic to turn snapshots for a camera on and off. Expected values are `ON` and `OFF`. diff --git a/frigate/app.py b/frigate/app.py index 2db8728b2..69c6550e8 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -109,6 +109,9 @@ class FrigateApp: "camera_fps": mp.Value("d", 0.0), "skipped_fps": mp.Value("d", 0.0), "process_fps": mp.Value("d", 0.0), + "capture_enabled": mp.Value( + "i", self.config.cameras[camera_name].capture_enabled + ), "detection_enabled": mp.Value( "i", self.config.cameras[camera_name].detect.enabled ), diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index b7e9e8858..ba5cd1c08 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -1,6 +1,8 @@ """Handle communication between Frigate and other applications.""" import logging +import os +import signal from abc import ABC, abstractmethod from typing import Any, Callable @@ -52,6 +54,7 @@ class Dispatcher: comm.subscribe(self._receive) self._camera_settings_handlers: dict[str, Callable] = { + "capture": self._on_capture_command, "detect": self._on_detect_command, "improve_contrast": self._on_motion_improve_contrast_command, "motion": self._on_motion_command, @@ -92,6 +95,19 @@ class Dispatcher: for comm in self.comms: comm.stop() + def _on_capture_command(self, camera_name: str, payload: str) -> None: + """Callback for detect topic.""" + logger.info(f"Received capture command [{payload}] for [{camera_name}]") + capture_proc = self.camera_metrics[camera_name]['capture_process'] + + # Send SIGUSR1 to resume, SIGUSR2 to pause + new_state = True if payload == "ON" else False if payload == "OFF" else None + if new_state is not None: + sig = signal.SIGUSR1 if new_state else signal.SIGUSR2 + self.config.cameras[camera_name].capture_enabled = new_state + os.kill( capture_proc.pid, sig ) + self.publish(f"{camera_name}/capture/state", payload, retain=True) + def _on_detect_command(self, camera_name: str, payload: str) -> None: """Callback for detect topic.""" detect_settings = self.config.cameras[camera_name].detect diff --git a/frigate/comms/mqtt.py b/frigate/comms/mqtt.py index 07799f9da..6133c3caa 100644 --- a/frigate/comms/mqtt.py +++ b/frigate/comms/mqtt.py @@ -59,6 +59,11 @@ class MqttClient(Communicator): # type: ignore[misc] "ON", retain=True, ) + self.publish( + f"{camera_name}/capture/state", + "ON", + retain=True, + ) self.publish( f"{camera_name}/improve_contrast/state", "ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr] @@ -141,6 +146,7 @@ class MqttClient(Communicator): # type: ignore[misc] # register callbacks callback_types = [ + "capture", "recordings", "snapshots", "detect", diff --git a/frigate/config.py b/frigate/config.py index 5c2f27b5a..36e25ad05 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -580,6 +580,7 @@ class CameraUiConfig(FrigateBaseModel): class CameraConfig(FrigateBaseModel): name: Optional[str] = Field(title="Camera name.", regex=REGEX_CAMERA_NAME) enabled: bool = Field(default=True, title="Enable camera.") + capture_enabled: bool = Field(default=True, title="Capture enabled.") ffmpeg: CameraFfmpegConfig = Field(title="FFmpeg configuration for the camera.") best_image_timeout: int = Field( default=60, diff --git a/frigate/types.py b/frigate/types.py index 8c3e54654..9a03c8359 100644 --- a/frigate/types.py +++ b/frigate/types.py @@ -8,6 +8,7 @@ from frigate.object_detection import ObjectDetectProcess class CameraMetricsTypes(TypedDict): camera_fps: Synchronized + capture_enabled: Synchronized capture_process: Optional[Process] detection_enabled: Synchronized detection_fps: Synchronized diff --git a/frigate/util.py b/frigate/util.py index e624e877a..32af012e0 100755 --- a/frigate/util.py +++ b/frigate/util.py @@ -636,6 +636,21 @@ def restart_frigate(): else: os.kill(os.getpid(), signal.SIGTERM) +def terminate_process(proc: sp.Popen, logger_inst: logging.Logger = None, timeout: float = 30.0): + logger_inst = logger_inst or logger + if proc is not None and proc.poll() == None: + proc_name: str = proc.args[0] + logger_inst.info(f"Terminating the existing {proc_name} process...") + proc.terminate() + try: + logger_inst.info(f"Waiting for {proc_name} to exit gracefully...") + proc.communicate(timeout=timeout) + except sp.TimeoutExpired: + logger_inst.info(f"{proc_name}didn't exit. Force killing...") + proc.kill() + proc.communicate() + + class EventsPerSecond: def __init__(self, max_events=1000): diff --git a/frigate/video.py b/frigate/video.py index 08808038e..a0ecba514 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -31,6 +31,7 @@ from frigate.util import ( intersection, intersection_over_union, listen, + terminate_process, yuv_region_2_bgr, yuv_region_2_rgb, yuv_region_2_yuv, @@ -112,25 +113,10 @@ def create_tensor_input(frame, model_config, region): # Expand dimensions since the model expects images to have shape: [1, height, width, 3] return np.expand_dims(cropped_frame, axis=0) - -def stop_ffmpeg(ffmpeg_process, logger): - logger.info("Terminating the existing ffmpeg process...") - ffmpeg_process.terminate() - try: - logger.info("Waiting for ffmpeg to exit gracefully...") - ffmpeg_process.communicate(timeout=30) - except sp.TimeoutExpired: - logger.info("FFmpeg didnt exit. Force killing...") - ffmpeg_process.kill() - ffmpeg_process.communicate() - ffmpeg_process = None - - def start_or_restart_ffmpeg( ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None ): - if ffmpeg_process is not None: - stop_ffmpeg(ffmpeg_process, logger) + terminate_process( ffmpeg_process, logger_inst=logger ) if frame_size is None: process = sp.Popen( @@ -268,27 +254,12 @@ class CameraWatchdog(threading.Thread): self.logger.info( f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." ) - self.ffmpeg_detect_process.terminate() - try: - self.logger.info("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_detect_process.communicate(timeout=30) - except sp.TimeoutExpired: - self.logger.info("FFmpeg did not exit. Force killing...") - self.ffmpeg_detect_process.kill() - self.ffmpeg_detect_process.communicate() + terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger) + self.ffmpeg_detect_process = None elif self.camera_fps.value >= (self.config.detect.fps + 10): self.camera_fps.value = 0 - self.logger.info( - f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." - ) - self.ffmpeg_detect_process.terminate() - try: - self.logger.info("Waiting for ffmpeg to exit gracefully...") - self.ffmpeg_detect_process.communicate(timeout=30) - except sp.TimeoutExpired: - self.logger.info("FFmpeg did not exit. Force killing...") - self.ffmpeg_detect_process.kill() - self.ffmpeg_detect_process.communicate() + terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger) + self.ffmpeg_detect_process = None for p in self.ffmpeg_other_processes: poll = p["process"].poll() @@ -324,9 +295,11 @@ class CameraWatchdog(threading.Thread): p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] ) - stop_ffmpeg(self.ffmpeg_detect_process, self.logger) + terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger) + self.ffmpeg_detect_process = None for p in self.ffmpeg_other_processes: - stop_ffmpeg(p["process"], self.logger) + terminate_process(p["process"], logger_inst=self.logger) + p["process"] = None p["logpipe"].close() self.logpipe.close() @@ -405,28 +378,60 @@ class CameraCapture(threading.Thread): def capture_camera(name, config: CameraConfig, process_info): + stop_sigs = [signal.SIGTERM, signal.SIGINT] + pause_sigs = [signal.SIGUSR2] + resume_sigs = [signal.SIGUSR1] stop_event = mp.Event() + sig_queue: mp.Queue[signal.Signals] = mp.Queue() def receiveSignal(signalNumber, frame): - stop_event.set() + logger.info(f"{name} Received signal {signalNumber}") + sig_queue.put( signalNumber ) + if signalNumber in (stop_sigs + pause_sigs): + stop_event.set() signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + signal.signal(signal.SIGUSR1, receiveSignal) + signal.signal(signal.SIGUSR2, receiveSignal) - threading.current_thread().name = f"capture:{name}" setproctitle(f"frigate.capture:{name}") - frame_queue = process_info["frame_queue"] - camera_watchdog = CameraWatchdog( - name, - config, - frame_queue, - process_info["camera_fps"], - process_info["ffmpeg_pid"], - stop_event, - ) - camera_watchdog.start() - camera_watchdog.join() + def run_capture(): + frame_queue = process_info["frame_queue"] + prev_sig = None + logger.info(f"{name}: capture starting") + while prev_sig not in stop_sigs: + camera_watchdog = CameraWatchdog( + name, + config, + frame_queue, + process_info["camera_fps"], + process_info["ffmpeg_pid"], + stop_event, + ) + camera_watchdog.start() + camera_watchdog.join() + stop_event.clear() + + if sig_queue.empty(): + logger.warning(f"{name}: capture stopped without signal") + else: + logger.info(f"{name}: capture stopped") + + # Go through the queued signals, aborting on any STOP or + # otherwise using the last state when the queue is emptied + while not sig_queue.empty() or prev_sig not in resume_sigs: + # Abort on a STOP signal + if( prev_sig := sig_queue.get() ) in stop_sigs: + break + + logger.info(f"{name}: capture resuming") + + # Run a background thread to prevent deadlock in signal handlers + capture_thread = threading.Thread(target=run_capture) + capture_thread.start() + capture_thread.join() def track_camera(