diff --git a/frigate/app.py b/frigate/app.py index 26956879e..1b78181ff 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -479,7 +479,7 @@ class FrigateApp: capture_process = util.Process( target=capture_camera, name=f"camera_capture:{name}", - args=(name, config, shm_frame_count, self.camera_metrics[name]), + args=(config, shm_frame_count, self.camera_metrics[name]), ) capture_process.daemon = True self.camera_metrics[name].capture_process = capture_process diff --git a/frigate/config/camera/updater.py b/frigate/config/camera/updater.py index a9d1a9a84..cb64a01f9 100644 --- a/frigate/config/camera/updater.py +++ b/frigate/config/camera/updater.py @@ -92,8 +92,8 @@ class CameraConfigUpdateSubscriber: elif update_type == CameraConfigUpdateEnum.zones: config.zones = updated_config - def check_for_update(self) -> list[str]: - updated_topics: list[str] = [] + def check_for_updates(self) -> dict[str, list[str]]: + updated_topics: dict[str, list[str]] = {} # get all updates available while True: @@ -106,7 +106,11 @@ class CameraConfigUpdateSubscriber: update_type = CameraConfigUpdateEnum[raw_type] if update_type in self.topics: - updated_topics.append(update_type.name) + if update_type.name in updated_topics: + updated_topics[update_type.name].append(camera) + else: + updated_topics[update_type.name] = [camera] + self.__update_config(camera, update_type, update_config) return updated_topics diff --git a/frigate/events/audio.py b/frigate/events/audio.py index f2a217fd3..8a929c8ff 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -12,7 +12,6 @@ import numpy as np import frigate.util as util from frigate.camera import CameraMetrics -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.event_metadata_updater import ( EventMetadataPublisher, @@ -20,6 +19,10 @@ from frigate.comms.event_metadata_updater import ( ) from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, CameraInput, FfmpegConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( AUDIO_DURATION, AUDIO_FORMAT, @@ -138,9 +141,9 @@ class AudioEventMaintainer(threading.Thread): # create communication for audio detections self.requestor = InterProcessRequestor() - self.config_subscriber = ConfigSubscriber(f"config/audio/{camera.name}") - self.enabled_subscriber = ConfigSubscriber( - f"config/enabled/{camera.name}", True + self.config_subscriber = CameraConfigUpdateSubscriber( + {self.config.name: self.config}, + [CameraConfigUpdateEnum.audio, CameraConfigUpdateEnum.enabled], ) self.detection_publisher = DetectionPublisher(DetectionTypeEnum.audio) self.event_metadata_publisher = EventMetadataPublisher() @@ -308,21 +311,12 @@ class AudioEventMaintainer(threading.Thread): self.logger.error(f"Error reading audio data from ffmpeg process: {e}") log_and_restart() - def _update_enabled_state(self) -> bool: - """Fetch the latest config and update enabled state.""" - _, config_data = self.enabled_subscriber.check_for_update() - if config_data: - self.config.enabled = config_data.enabled - return config_data.enabled - - return self.config.enabled - def run(self) -> None: - if self._update_enabled_state(): + if self.config.enabled: self.start_or_restart_ffmpeg() while not self.stop_event.is_set(): - enabled = self._update_enabled_state() + enabled = self.config.enabled if enabled != self.was_enabled: if enabled: self.logger.debug( @@ -344,13 +338,7 @@ class AudioEventMaintainer(threading.Thread): continue # check if there is an updated config - ( - updated_topic, - updated_audio_config, - ) = self.config_subscriber.check_for_update() - - if updated_topic: - self.config.audio = updated_audio_config + self.config_subscriber.check_for_updates() self.read_audio() @@ -359,7 +347,6 @@ class AudioEventMaintainer(threading.Thread): self.logpipe.close() self.requestor.stop() self.config_subscriber.stop() - self.enabled_subscriber.stop() self.detection_publisher.stop() diff --git a/frigate/output/output.py b/frigate/output/output.py index befb663eb..c8a1bd061 100644 --- a/frigate/output/output.py +++ b/frigate/output/output.py @@ -17,10 +17,13 @@ from ws4py.server.wsgirefserver import ( ) from ws4py.server.wsgiutils import WebSocketWSGIApplication -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.ws import WebSocket from frigate.config import FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import CACHE_DIR, CLIPS_DIR from frigate.output.birdseye import Birdseye from frigate.output.camera import JsmpegCamera @@ -99,7 +102,9 @@ def output_frames( websocket_thread = threading.Thread(target=websocket_server.serve_forever) detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video) - config_enabled_subscriber = ConfigSubscriber("config/enabled/") + config_subsriber = CameraConfigUpdateSubscriber( + config.cameras, [CameraConfigUpdateEnum.enabled] + ) jsmpeg_cameras: dict[str, JsmpegCamera] = {} birdseye: Birdseye | None = None @@ -125,18 +130,7 @@ def output_frames( while not stop_event.is_set(): # check if there is an updated config - while True: - ( - updated_enabled_topic, - updated_enabled_config, - ) = config_enabled_subscriber.check_for_update() - - if not updated_enabled_topic: - break - - if updated_enabled_config: - camera_name = updated_enabled_topic.rpartition("/")[-1] - config.cameras[camera_name].enabled = updated_enabled_config.enabled + config_subsriber.check_for_updates() (topic, data) = detection_subscriber.check_for_update(timeout=1) now = datetime.datetime.now().timestamp() @@ -240,7 +234,7 @@ def output_frames( if birdseye is not None: birdseye.stop() - config_enabled_subscriber.stop() + config_subsriber.stop() websocket_server.manager.close_all() websocket_server.manager.stop() websocket_server.manager.join() diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index b144b6e52..d3049c057 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -15,10 +15,13 @@ from typing import Any, Optional import cv2 import numpy as np -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, FrigateConfig +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CLEAR_ONGOING_REVIEW_SEGMENTS, CLIPS_DIR, @@ -150,9 +153,14 @@ class ReviewSegmentMaintainer(threading.Thread): # create communication for review segments self.requestor = InterProcessRequestor() - self.record_config_subscriber = ConfigSubscriber("config/record/") - self.review_config_subscriber = ConfigSubscriber("config/review/") - self.enabled_config_subscriber = ConfigSubscriber("config/enabled/") + self.config_subscriber = CameraConfigUpdateSubscriber( + config.cameras, + [ + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.record, + CameraConfigUpdateEnum.review, + ], + ) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) # manual events @@ -458,50 +466,15 @@ class ReviewSegmentMaintainer(threading.Thread): def run(self) -> None: while not self.stop_event.is_set(): # check if there is an updated config - while True: - ( - updated_record_topic, - updated_record_config, - ) = self.record_config_subscriber.check_for_update() + updated_topics = self.config_subscriber.check_for_updates() - ( - updated_review_topic, - updated_review_config, - ) = self.review_config_subscriber.check_for_update() + if "record" in updated_topics: + for camera in updated_topics["record"]: + self.end_segment(camera) - ( - updated_enabled_topic, - updated_enabled_config, - ) = self.enabled_config_subscriber.check_for_update() - - if ( - not updated_record_topic - and not updated_review_topic - and not updated_enabled_topic - ): - break - - if updated_record_topic: - camera_name = updated_record_topic.rpartition("/")[-1] - self.config.cameras[camera_name].record = updated_record_config - - # immediately end segment - if not updated_record_config.enabled: - self.end_segment(camera_name) - - if updated_review_topic: - camera_name = updated_review_topic.rpartition("/")[-1] - self.config.cameras[camera_name].review = updated_review_config - - if updated_enabled_config: - camera_name = updated_enabled_topic.rpartition("/")[-1] - self.config.cameras[ - camera_name - ].enabled = updated_enabled_config.enabled - - # immediately end segment as we may not get another update - if not updated_enabled_config.enabled: - self.end_segment(camera_name) + if "enabled" in updated_topics: + for camera in updated_topics["enabled"]: + self.end_segment(camera) (topic, data) = self.detection_subscriber.check_for_update(timeout=1) diff --git a/frigate/track/object_processing.py b/frigate/track/object_processing.py index 2eb55e883..67bc7170e 100644 --- a/frigate/track/object_processing.py +++ b/frigate/track/object_processing.py @@ -14,7 +14,6 @@ import numpy as np from peewee import DoesNotExist from frigate.camera.state import CameraState -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.dispatcher import Dispatcher from frigate.comms.event_metadata_updater import ( @@ -29,6 +28,10 @@ from frigate.config import ( RecordConfig, SnapshotsConfig, ) +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import FAST_QUEUE_TIMEOUT, UPDATE_CAMERA_ACTIVITY from frigate.events.types import EventStateEnum, EventTypeEnum from frigate.models import Event, Timeline @@ -63,7 +66,9 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} self.ptz_autotracker_thread = ptz_autotracker_thread - self.config_enabled_subscriber = ConfigSubscriber("config/enabled/") + self.config_subscriber = CameraConfigUpdateSubscriber( + self.config.cameras, [CameraConfigUpdateEnum.enabled] + ) self.requestor = InterProcessRequestor() self.detection_publisher = DetectionPublisher(DetectionTypeEnum.all) @@ -576,24 +581,14 @@ class TrackedObjectProcessor(threading.Thread): def run(self): while not self.stop_event.is_set(): # check for config updates - while True: - ( - updated_enabled_topic, - updated_enabled_config, - ) = self.config_enabled_subscriber.check_for_update() + updated_topics = self.config_subscriber.check_for_updates() - if not updated_enabled_topic: - break - - camera_name = updated_enabled_topic.rpartition("/")[-1] - self.config.cameras[ - camera_name - ].enabled = updated_enabled_config.enabled - - if self.camera_states[camera_name].prev_enabled is None: - self.camera_states[ - camera_name - ].prev_enabled = updated_enabled_config.enabled + if "enabled" in updated_topics: + for camera in updated_topics["enabled"]: + if self.camera_states[camera].prev_enabled is None: + self.camera_states[camera].prev_enabled = self.config.cameras[ + camera + ].enabled # manage camera disabled state for camera, config in self.config.cameras.items(): diff --git a/frigate/video.py b/frigate/video.py index 754c9b764..2efabfd93 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -15,10 +15,13 @@ import cv2 from setproctitle import setproctitle from frigate.camera import CameraMetrics, PTZMetrics -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.inter_process import InterProcessRequestor from frigate.config import CameraConfig, DetectConfig, ModelConfig from frigate.config.camera.camera import CameraTypeEnum +from frigate.config.camera.updater import ( + CameraConfigUpdateEnum, + CameraConfigUpdateSubscriber, +) from frigate.const import ( CACHE_DIR, CACHE_SEGMENT_FORMAT, @@ -112,15 +115,13 @@ def capture_frames( frame_rate.start() skipped_eps = EventsPerSecond() skipped_eps.start() - config_subscriber = ConfigSubscriber(f"config/enabled/{config.name}", True) + config_subscriber = CameraConfigUpdateSubscriber( + {config.name: config}, [CameraConfigUpdateEnum.enabled] + ) def get_enabled_state(): """Fetch the latest enabled state from ZMQ.""" - _, config_data = config_subscriber.check_for_update() - - if config_data: - config.enabled = config_data.enabled - + config_subscriber.check_for_updates() return config.enabled while not stop_event.is_set(): @@ -167,7 +168,6 @@ def capture_frames( class CameraWatchdog(threading.Thread): def __init__( self, - camera_name, config: CameraConfig, shm_frame_count: int, frame_queue: Queue, @@ -177,13 +177,12 @@ class CameraWatchdog(threading.Thread): stop_event, ): threading.Thread.__init__(self) - self.logger = logging.getLogger(f"watchdog.{camera_name}") - self.camera_name = camera_name + self.logger = logging.getLogger(f"watchdog.{config.name}") self.config = config self.shm_frame_count = shm_frame_count self.capture_thread = None self.ffmpeg_detect_process = None - self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect") + self.logpipe = LogPipe(f"ffmpeg.{self.config.name}.detect") self.ffmpeg_other_processes: list[dict[str, Any]] = [] self.camera_fps = camera_fps self.skipped_fps = skipped_fps @@ -196,16 +195,14 @@ class CameraWatchdog(threading.Thread): self.stop_event = stop_event self.sleeptime = self.config.ffmpeg.retry_interval - self.config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True) + self.config_subscriber = CameraConfigUpdateSubscriber( + {config.name: config}, [CameraConfigUpdateEnum.enabled] + ) self.was_enabled = self.config.enabled def _update_enabled_state(self) -> bool: """Fetch the latest config and update enabled state.""" - _, config_data = self.config_subscriber.check_for_update() - if config_data: - self.config.enabled = config_data.enabled - return config_data.enabled - + self.config_subscriber.check_for_updates() return self.config.enabled def run(self): @@ -217,10 +214,10 @@ class CameraWatchdog(threading.Thread): enabled = self._update_enabled_state() if enabled != self.was_enabled: if enabled: - self.logger.debug(f"Enabling camera {self.camera_name}") + self.logger.debug(f"Enabling camera {self.config.name}") self.start_all_ffmpeg() else: - self.logger.debug(f"Disabling camera {self.camera_name}") + self.logger.debug(f"Disabling camera {self.config.name}") self.stop_all_ffmpeg() self.was_enabled = enabled continue @@ -233,7 +230,7 @@ class CameraWatchdog(threading.Thread): if not self.capture_thread.is_alive(): self.camera_fps.value = 0 self.logger.error( - f"Ffmpeg process crashed unexpectedly for {self.camera_name}." + f"Ffmpeg process crashed unexpectedly for {self.config.name}." ) self.logger.error( "The following ffmpeg logs include the last 100 lines prior to exit." @@ -243,7 +240,7 @@ class CameraWatchdog(threading.Thread): elif now - self.capture_thread.current_frame.value > 20: self.camera_fps.value = 0 self.logger.info( - f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..." + f"No frames received from {self.config.name} in 20 seconds. Exiting ffmpeg..." ) self.ffmpeg_detect_process.terminate() try: @@ -260,7 +257,7 @@ class CameraWatchdog(threading.Thread): self.fps_overflow_count = 0 self.camera_fps.value = 0 self.logger.info( - f"{self.camera_name} exceeded fps limit. Exiting ffmpeg..." + f"{self.config.name} exceeded fps limit. Exiting ffmpeg..." ) self.ffmpeg_detect_process.terminate() try: @@ -289,7 +286,7 @@ class CameraWatchdog(threading.Thread): latest_segment_time + datetime.timedelta(seconds=120) ): self.logger.error( - f"No new recording segments were created for {self.camera_name} in the last 120s. restarting the ffmpeg record process..." + f"No new recording segments were created for {self.config.name} in the last 120s. restarting the ffmpeg record process..." ) p["process"] = start_or_restart_ffmpeg( p["cmd"], @@ -336,13 +333,13 @@ class CameraWatchdog(threading.Thread): def start_all_ffmpeg(self): """Start all ffmpeg processes (detection and others).""" - logger.debug(f"Starting all ffmpeg processes for {self.camera_name}") + logger.debug(f"Starting all ffmpeg processes for {self.config.name}") self.start_ffmpeg_detect() for c in self.config.ffmpeg_cmds: if "detect" in c["roles"]: continue logpipe = LogPipe( - f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}" + f"ffmpeg.{self.config.name}.{'_'.join(sorted(c['roles']))}" ) self.ffmpeg_other_processes.append( { @@ -355,12 +352,12 @@ class CameraWatchdog(threading.Thread): def stop_all_ffmpeg(self): """Stop all ffmpeg processes (detection and others).""" - logger.debug(f"Stopping all ffmpeg processes for {self.camera_name}") + logger.debug(f"Stopping all ffmpeg processes for {self.config.name}") if self.capture_thread is not None and self.capture_thread.is_alive(): self.capture_thread.join(timeout=5) if self.capture_thread.is_alive(): self.logger.warning( - f"Capture thread for {self.camera_name} did not stop gracefully." + f"Capture thread for {self.config.name} did not stop gracefully." ) if self.ffmpeg_detect_process is not None: stop_ffmpeg(self.ffmpeg_detect_process, self.logger) @@ -387,7 +384,7 @@ class CameraWatchdog(threading.Thread): newest_segment_time = latest_segment for file in cache_files: - if self.camera_name in file: + if self.config.name in file: basename = os.path.splitext(file)[0] _, date = basename.rsplit("@", maxsplit=1) segment_time = datetime.datetime.strptime( @@ -444,7 +441,7 @@ class CameraCapture(threading.Thread): def capture_camera( - name, config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics + config: CameraConfig, shm_frame_count: int, camera_metrics: CameraMetrics ): stop_event = mp.Event() @@ -454,11 +451,10 @@ def capture_camera( signal.signal(signal.SIGTERM, receiveSignal) signal.signal(signal.SIGINT, receiveSignal) - threading.current_thread().name = f"capture:{name}" - setproctitle(f"frigate.capture:{name}") + threading.current_thread().name = f"capture:{config.name}" + setproctitle(f"frigate.capture:{config.name}") camera_watchdog = CameraWatchdog( - name, config, shm_frame_count, camera_metrics.frame_queue, @@ -526,7 +522,6 @@ def track_camera( frame_shape, model_config, config, - config.detect, frame_manager, motion_detector, object_detector, @@ -593,7 +588,6 @@ def process_frames( frame_shape: tuple[int, int], model_config: ModelConfig, camera_config: CameraConfig, - detect_config: DetectConfig, frame_manager: FrameManager, motion_detector: MotionDetector, object_detector: RemoteObjectDetector, @@ -608,8 +602,14 @@ def process_frames( exit_on_empty: bool = False, ): next_region_update = get_tomorrow_at_time(2) - detect_config_subscriber = ConfigSubscriber(f"config/detect/{camera_name}", True) - enabled_config_subscriber = ConfigSubscriber(f"config/enabled/{camera_name}", True) + config_subscriber = CameraConfigUpdateSubscriber( + {camera_name: camera_config}, + [ + CameraConfigUpdateEnum.detect, + CameraConfigUpdateEnum.enabled, + CameraConfigUpdateEnum.motion, + ], + ) fps_tracker = EventsPerSecond() fps_tracker.start() @@ -644,11 +644,11 @@ def process_frames( ] while not stop_event.is_set(): - _, updated_enabled_config = enabled_config_subscriber.check_for_update() + updated_configs = config_subscriber.check_for_updates() - if updated_enabled_config: + if "enabled" in updated_configs: prev_enabled = camera_enabled - camera_enabled = updated_enabled_config.enabled + camera_enabled = camera_config.enabled if ( not camera_enabled @@ -676,12 +676,6 @@ def process_frames( time.sleep(0.1) continue - # check for updated detect config - _, updated_detect_config = detect_config_subscriber.check_for_update() - - if updated_detect_config: - detect_config = updated_detect_config - if ( datetime.datetime.now().astimezone(datetime.timezone.utc) > next_region_update @@ -716,14 +710,14 @@ def process_frames( consolidated_detections = [] # if detection is disabled - if not detect_config.enabled: + if not camera_config.detect.enabled: object_tracker.match_and_update(frame_name, frame_time, []) else: # get stationary object ids # check every Nth frame for stationary objects # disappeared objects are not stationary # also check for overlapping motion boxes - if stationary_frame_counter == detect_config.stationary.interval: + if stationary_frame_counter == camera_config.detect.stationary.interval: stationary_frame_counter = 0 stationary_object_ids = [] else: @@ -732,7 +726,8 @@ def process_frames( obj["id"] for obj in object_tracker.tracked_objects.values() # if it has exceeded the stationary threshold - if obj["motionless_count"] >= detect_config.stationary.threshold + if obj["motionless_count"] + >= camera_config.detect.stationary.threshold # and it hasn't disappeared and object_tracker.disappeared[obj["id"]] == 0 # and it doesn't overlap with any current motion boxes when not calibrating @@ -747,7 +742,8 @@ def process_frames( ( # use existing object box for stationary objects obj["estimate"] - if obj["motionless_count"] < detect_config.stationary.threshold + if obj["motionless_count"] + < camera_config.detect.stationary.threshold else obj["box"] ) for obj in object_tracker.tracked_objects.values() @@ -821,7 +817,7 @@ def process_frames( for region in regions: detections.extend( detect( - detect_config, + camera_config.detect, object_detector, frame, model_config, @@ -968,5 +964,4 @@ def process_frames( motion_detector.stop() requestor.stop() - detect_config_subscriber.stop() - enabled_config_subscriber.stop() + config_subscriber.stop()