Add enabling/disabling camera capture via MQTT

Adds new MQTT topic,

  frigate/<camera>/capture/state

to dynamically enable or disable capturing from a camera.

The ffmpeg process is terminated when the capture state is
OFF and is resumed when the capture state is ON.
This commit is contained in:
Chris OBryan 2023-06-08 12:53:34 -05:00
parent d3949eebfa
commit ccf5250679
8 changed files with 106 additions and 51 deletions

View File

@ -112,6 +112,14 @@ Topic to turn recordings for a camera on and off. Expected values are `ON` and `
Topic with current state of recordings for a camera. Published values are `ON` and `OFF`. Topic with current state of recordings for a camera. Published values are `ON` and `OFF`.
### `frigate/<camera_name>/capture/set`
Topic to turn capture for a camera on and off. Expected values are `ON` and `OFF`.
### `frigate/<camera_name>/capture/state`
Topic with current state of capture for a camera. Published values are `ON` and `OFF`.
### `frigate/<camera_name>/snapshots/set` ### `frigate/<camera_name>/snapshots/set`
Topic to turn snapshots for a camera on and off. Expected values are `ON` and `OFF`. Topic to turn snapshots for a camera on and off. Expected values are `ON` and `OFF`.

View File

@ -109,6 +109,9 @@ class FrigateApp:
"camera_fps": mp.Value("d", 0.0), "camera_fps": mp.Value("d", 0.0),
"skipped_fps": mp.Value("d", 0.0), "skipped_fps": mp.Value("d", 0.0),
"process_fps": mp.Value("d", 0.0), "process_fps": mp.Value("d", 0.0),
"capture_enabled": mp.Value(
"i", self.config.cameras[camera_name].capture_enabled
),
"detection_enabled": mp.Value( "detection_enabled": mp.Value(
"i", self.config.cameras[camera_name].detect.enabled "i", self.config.cameras[camera_name].detect.enabled
), ),

View File

@ -1,6 +1,8 @@
"""Handle communication between Frigate and other applications.""" """Handle communication between Frigate and other applications."""
import logging import logging
import os
import signal
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Callable from typing import Any, Callable
@ -52,6 +54,7 @@ class Dispatcher:
comm.subscribe(self._receive) comm.subscribe(self._receive)
self._camera_settings_handlers: dict[str, Callable] = { self._camera_settings_handlers: dict[str, Callable] = {
"capture": self._on_capture_command,
"detect": self._on_detect_command, "detect": self._on_detect_command,
"improve_contrast": self._on_motion_improve_contrast_command, "improve_contrast": self._on_motion_improve_contrast_command,
"motion": self._on_motion_command, "motion": self._on_motion_command,
@ -92,6 +95,19 @@ class Dispatcher:
for comm in self.comms: for comm in self.comms:
comm.stop() comm.stop()
def _on_capture_command(self, camera_name: str, payload: str) -> None:
"""Callback for detect topic."""
logger.info(f"Received capture command [{payload}] for [{camera_name}]")
capture_proc = self.camera_metrics[camera_name]['capture_process']
# Send SIGUSR1 to resume, SIGUSR2 to pause
new_state = True if payload == "ON" else False if payload == "OFF" else None
if new_state is not None:
sig = signal.SIGUSR1 if new_state else signal.SIGUSR2
self.config.cameras[camera_name].capture_enabled = new_state
os.kill( capture_proc.pid, sig )
self.publish(f"{camera_name}/capture/state", payload, retain=True)
def _on_detect_command(self, camera_name: str, payload: str) -> None: def _on_detect_command(self, camera_name: str, payload: str) -> None:
"""Callback for detect topic.""" """Callback for detect topic."""
detect_settings = self.config.cameras[camera_name].detect detect_settings = self.config.cameras[camera_name].detect

View File

@ -59,6 +59,11 @@ class MqttClient(Communicator): # type: ignore[misc]
"ON", "ON",
retain=True, retain=True,
) )
self.publish(
f"{camera_name}/capture/state",
"ON",
retain=True,
)
self.publish( self.publish(
f"{camera_name}/improve_contrast/state", f"{camera_name}/improve_contrast/state",
"ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr] "ON" if camera.motion.improve_contrast else "OFF", # type: ignore[union-attr]
@ -141,6 +146,7 @@ class MqttClient(Communicator): # type: ignore[misc]
# register callbacks # register callbacks
callback_types = [ callback_types = [
"capture",
"recordings", "recordings",
"snapshots", "snapshots",
"detect", "detect",

View File

@ -580,6 +580,7 @@ class CameraUiConfig(FrigateBaseModel):
class CameraConfig(FrigateBaseModel): class CameraConfig(FrigateBaseModel):
name: Optional[str] = Field(title="Camera name.", regex=REGEX_CAMERA_NAME) name: Optional[str] = Field(title="Camera name.", regex=REGEX_CAMERA_NAME)
enabled: bool = Field(default=True, title="Enable camera.") enabled: bool = Field(default=True, title="Enable camera.")
capture_enabled: bool = Field(default=True, title="Capture enabled.")
ffmpeg: CameraFfmpegConfig = Field(title="FFmpeg configuration for the camera.") ffmpeg: CameraFfmpegConfig = Field(title="FFmpeg configuration for the camera.")
best_image_timeout: int = Field( best_image_timeout: int = Field(
default=60, default=60,

View File

@ -8,6 +8,7 @@ from frigate.object_detection import ObjectDetectProcess
class CameraMetricsTypes(TypedDict): class CameraMetricsTypes(TypedDict):
camera_fps: Synchronized camera_fps: Synchronized
capture_enabled: Synchronized
capture_process: Optional[Process] capture_process: Optional[Process]
detection_enabled: Synchronized detection_enabled: Synchronized
detection_fps: Synchronized detection_fps: Synchronized

View File

@ -636,6 +636,21 @@ def restart_frigate():
else: else:
os.kill(os.getpid(), signal.SIGTERM) os.kill(os.getpid(), signal.SIGTERM)
def terminate_process(proc: sp.Popen, logger_inst: logging.Logger = None, timeout: float = 30.0):
logger_inst = logger_inst or logger
if proc is not None and proc.poll() == None:
proc_name: str = proc.args[0]
logger_inst.info(f"Terminating the existing {proc_name} process...")
proc.terminate()
try:
logger_inst.info(f"Waiting for {proc_name} to exit gracefully...")
proc.communicate(timeout=timeout)
except sp.TimeoutExpired:
logger_inst.info(f"{proc_name}didn't exit. Force killing...")
proc.kill()
proc.communicate()
class EventsPerSecond: class EventsPerSecond:
def __init__(self, max_events=1000): def __init__(self, max_events=1000):

View File

@ -31,6 +31,7 @@ from frigate.util import (
intersection, intersection,
intersection_over_union, intersection_over_union,
listen, listen,
terminate_process,
yuv_region_2_bgr, yuv_region_2_bgr,
yuv_region_2_rgb, yuv_region_2_rgb,
yuv_region_2_yuv, yuv_region_2_yuv,
@ -112,25 +113,10 @@ def create_tensor_input(frame, model_config, region):
# Expand dimensions since the model expects images to have shape: [1, height, width, 3] # Expand dimensions since the model expects images to have shape: [1, height, width, 3]
return np.expand_dims(cropped_frame, axis=0) return np.expand_dims(cropped_frame, axis=0)
def stop_ffmpeg(ffmpeg_process, logger):
logger.info("Terminating the existing ffmpeg process...")
ffmpeg_process.terminate()
try:
logger.info("Waiting for ffmpeg to exit gracefully...")
ffmpeg_process.communicate(timeout=30)
except sp.TimeoutExpired:
logger.info("FFmpeg didnt exit. Force killing...")
ffmpeg_process.kill()
ffmpeg_process.communicate()
ffmpeg_process = None
def start_or_restart_ffmpeg( def start_or_restart_ffmpeg(
ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None
): ):
if ffmpeg_process is not None: terminate_process( ffmpeg_process, logger_inst=logger )
stop_ffmpeg(ffmpeg_process, logger)
if frame_size is None: if frame_size is None:
process = sp.Popen( process = sp.Popen(
@ -268,27 +254,12 @@ class CameraWatchdog(threading.Thread):
self.logger.info( self.logger.info(
f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..."
) )
self.ffmpeg_detect_process.terminate() terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger)
try: self.ffmpeg_detect_process = None
self.logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_detect_process.communicate(timeout=30)
except sp.TimeoutExpired:
self.logger.info("FFmpeg did not exit. Force killing...")
self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate()
elif self.camera_fps.value >= (self.config.detect.fps + 10): elif self.camera_fps.value >= (self.config.detect.fps + 10):
self.camera_fps.value = 0 self.camera_fps.value = 0
self.logger.info( terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger)
f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." self.ffmpeg_detect_process = None
)
self.ffmpeg_detect_process.terminate()
try:
self.logger.info("Waiting for ffmpeg to exit gracefully...")
self.ffmpeg_detect_process.communicate(timeout=30)
except sp.TimeoutExpired:
self.logger.info("FFmpeg did not exit. Force killing...")
self.ffmpeg_detect_process.kill()
self.ffmpeg_detect_process.communicate()
for p in self.ffmpeg_other_processes: for p in self.ffmpeg_other_processes:
poll = p["process"].poll() poll = p["process"].poll()
@ -324,9 +295,11 @@ class CameraWatchdog(threading.Thread):
p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"] p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
) )
stop_ffmpeg(self.ffmpeg_detect_process, self.logger) terminate_process(self.ffmpeg_detect_process, logger_inst=self.logger)
self.ffmpeg_detect_process = None
for p in self.ffmpeg_other_processes: for p in self.ffmpeg_other_processes:
stop_ffmpeg(p["process"], self.logger) terminate_process(p["process"], logger_inst=self.logger)
p["process"] = None
p["logpipe"].close() p["logpipe"].close()
self.logpipe.close() self.logpipe.close()
@ -405,28 +378,60 @@ class CameraCapture(threading.Thread):
def capture_camera(name, config: CameraConfig, process_info): def capture_camera(name, config: CameraConfig, process_info):
stop_sigs = [signal.SIGTERM, signal.SIGINT]
pause_sigs = [signal.SIGUSR2]
resume_sigs = [signal.SIGUSR1]
stop_event = mp.Event() stop_event = mp.Event()
sig_queue: mp.Queue[signal.Signals] = mp.Queue()
def receiveSignal(signalNumber, frame): def receiveSignal(signalNumber, frame):
stop_event.set() logger.info(f"{name} Received signal {signalNumber}")
sig_queue.put( signalNumber )
if signalNumber in (stop_sigs + pause_sigs):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal) signal.signal(signal.SIGINT, receiveSignal)
signal.signal(signal.SIGUSR1, receiveSignal)
signal.signal(signal.SIGUSR2, receiveSignal)
threading.current_thread().name = f"capture:{name}"
setproctitle(f"frigate.capture:{name}") setproctitle(f"frigate.capture:{name}")
frame_queue = process_info["frame_queue"] def run_capture():
camera_watchdog = CameraWatchdog( frame_queue = process_info["frame_queue"]
name, prev_sig = None
config, logger.info(f"{name}: capture starting")
frame_queue, while prev_sig not in stop_sigs:
process_info["camera_fps"], camera_watchdog = CameraWatchdog(
process_info["ffmpeg_pid"], name,
stop_event, config,
) frame_queue,
camera_watchdog.start() process_info["camera_fps"],
camera_watchdog.join() process_info["ffmpeg_pid"],
stop_event,
)
camera_watchdog.start()
camera_watchdog.join()
stop_event.clear()
if sig_queue.empty():
logger.warning(f"{name}: capture stopped without signal")
else:
logger.info(f"{name}: capture stopped")
# Go through the queued signals, aborting on any STOP or
# otherwise using the last state when the queue is emptied
while not sig_queue.empty() or prev_sig not in resume_sigs:
# Abort on a STOP signal
if( prev_sig := sig_queue.get() ) in stop_sigs:
break
logger.info(f"{name}: capture resuming")
# Run a background thread to prevent deadlock in signal handlers
capture_thread = threading.Thread(target=run_capture)
capture_thread.start()
capture_thread.join()
def track_camera( def track_camera(