From 7800565be42a4c557ce01e772b49fb4c212cf00a Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sun, 8 Mar 2026 12:58:29 -0500 Subject: [PATCH] simplify enrichment config updates to use single subscriber with topic-based routing --- frigate/config/enrichment_updater.py | 49 ------------------- frigate/data_processing/post/api.py | 9 ++-- frigate/data_processing/post/license_plate.py | 9 ++-- frigate/data_processing/real_time/api.py | 9 ++-- frigate/data_processing/real_time/bird.py | 10 ++++ frigate/data_processing/real_time/face.py | 14 ++++-- .../real_time/license_plate.py | 14 ++++-- frigate/embeddings/maintainer.py | 40 +++------------ 8 files changed, 53 insertions(+), 101 deletions(-) delete mode 100644 frigate/config/enrichment_updater.py diff --git a/frigate/config/enrichment_updater.py b/frigate/config/enrichment_updater.py deleted file mode 100644 index f570fd49f..000000000 --- a/frigate/config/enrichment_updater.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Convenience classes for updating enrichment configurations dynamically.""" - -from enum import Enum -from typing import Any - -from frigate.comms.config_updater import ConfigSubscriber - - -class EnrichmentConfigEnum(str, Enum): - """Supported enrichment config update topics.""" - - classification = "config/classification" - classification_custom = "config/classification/custom/" - face_recognition = "config/face_recognition" - lpr = "config/lpr" - - -class EnrichmentConfigSubscriber: - """Subscribes to all enrichment config updates under ``config/``.""" - - def __init__(self) -> None: - self.subscriber = ConfigSubscriber("config/") - - def check_for_update( - self, - ) -> tuple[EnrichmentConfigEnum, str, Any] | tuple[None, None, None]: - """Check for an enrichment config update. - - Returns: - A tuple of (update_type, topic, payload) or (None, None, None) - if no update is available. For custom classification topics the - full topic string is returned so the model name can be extracted. - """ - topic, payload = self.subscriber.check_for_update() - - if topic is None: - return (None, None, None) - - for member in EnrichmentConfigEnum: - if topic == member.value or ( - member == EnrichmentConfigEnum.classification_custom - and topic.startswith(member.value) - ): - return (member, topic, payload) - - return (None, None, None) - - def stop(self) -> None: - self.subscriber.stop() diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py index 1631839fb..2c1359d96 100644 --- a/frigate/data_processing/post/api.py +++ b/frigate/data_processing/post/api.py @@ -5,7 +5,6 @@ from abc import ABC, abstractmethod from typing import Any from frigate.config import FrigateConfig -from frigate.config.enrichment_updater import EnrichmentConfigEnum from ..types import DataProcessorMetrics, DataProcessorModelRunner, PostProcessDataEnum @@ -52,15 +51,15 @@ class PostProcessorApi(ABC): """ pass - def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None: + def update_config(self, topic: str, payload: Any) -> None: """Handle a config change notification. Called for every config update published under ``config/``. - Processors should override this to check the update type and act - only on changes relevant to them. Default is a no-op. + Processors should override this to check the topic and act only + on changes relevant to them. Default is a no-op. Args: - update_type: The enrichment config type that changed. + topic: The config topic that changed. payload: The updated configuration object. """ pass diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 024d15ee4..6f5149b9f 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -12,7 +12,6 @@ from frigate.comms.embeddings_updater import EmbeddingsRequestEnum from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.enrichment_updater import EnrichmentConfigEnum from frigate.data_processing.common.license_plate.mixin import ( WRITE_DEBUG_IMAGES, LicensePlateProcessingMixin, @@ -48,13 +47,15 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): self.sub_label_publisher = sub_label_publisher super().__init__(config, metrics, model_runner) - def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None: + CONFIG_UPDATE_TOPIC = "config/lpr" + + def update_config(self, topic: str, payload: Any) -> None: """Update LPR config at runtime.""" - if update_type != EnrichmentConfigEnum.lpr: + if topic != self.CONFIG_UPDATE_TOPIC: return self.lpr_config = payload - logger.debug("LPR config updated dynamically") + logger.debug("LPR post-processor config updated dynamically") def process_data( self, data: dict[str, Any], data_type: PostProcessDataEnum diff --git a/frigate/data_processing/real_time/api.py b/frigate/data_processing/real_time/api.py index dfb5ba82a..31127220f 100644 --- a/frigate/data_processing/real_time/api.py +++ b/frigate/data_processing/real_time/api.py @@ -7,7 +7,6 @@ from typing import Any import numpy as np from frigate.config import FrigateConfig -from frigate.config.enrichment_updater import EnrichmentConfigEnum from ..types import DataProcessorMetrics @@ -63,15 +62,15 @@ class RealTimeProcessorApi(ABC): """ pass - def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None: + def update_config(self, topic: str, payload: Any) -> None: """Handle a config change notification. Called for every config update published under ``config/``. - Processors should override this to check the update type and act - only on changes relevant to them. Default is a no-op. + Processors should override this to check the topic and act only + on changes relevant to them. Default is a no-op. Args: - update_type: The enrichment config type that changed. + topic: The config topic that changed. payload: The updated configuration object. """ pass diff --git a/frigate/data_processing/real_time/bird.py b/frigate/data_processing/real_time/bird.py index 520440005..38ff1a950 100644 --- a/frigate/data_processing/real_time/bird.py +++ b/frigate/data_processing/real_time/bird.py @@ -169,6 +169,16 @@ class BirdRealTimeProcessor(RealTimeProcessorApi): ) self.detected_birds[obj_data["id"]] = score + CONFIG_UPDATE_TOPIC = "config/classification" + + def update_config(self, topic: str, payload: Any) -> None: + """Update bird classification config at runtime.""" + if topic != self.CONFIG_UPDATE_TOPIC: + return + + self.config.classification = payload + logger.debug("Bird classification config updated dynamically") + def handle_request(self, topic, request_data): return None diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py index a3c390dfb..d886a86e5 100644 --- a/frigate/data_processing/real_time/face.py +++ b/frigate/data_processing/real_time/face.py @@ -19,7 +19,6 @@ from frigate.comms.event_metadata_updater import ( ) from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.enrichment_updater import EnrichmentConfigEnum from frigate.const import FACE_DIR, MODEL_CACHE_DIR from frigate.data_processing.common.face.model import ( ArcFaceRecognizer, @@ -96,12 +95,21 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.recognizer.build() - def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None: + CONFIG_UPDATE_TOPIC = "config/face_recognition" + + def update_config(self, topic: str, payload: Any) -> None: """Update face recognition config at runtime.""" - if update_type != EnrichmentConfigEnum.face_recognition: + if topic != self.CONFIG_UPDATE_TOPIC: return + previous_min_area = self.config.face_recognition.min_area + self.config.face_recognition = payload self.face_config = payload + + for camera_config in self.config.cameras.values(): + if camera_config.face_recognition.min_area == previous_min_area: + camera_config.face_recognition.min_area = payload.min_area + logger.debug("Face recognition config updated dynamically") def __download_models(self, path: str) -> None: diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index b76e93cc8..298989c82 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -8,7 +8,6 @@ import numpy as np from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.enrichment_updater import EnrichmentConfigEnum from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, ) @@ -41,12 +40,21 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess self.camera_current_cars: dict[str, list[str]] = {} super().__init__(config, metrics) - def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None: + CONFIG_UPDATE_TOPIC = "config/lpr" + + def update_config(self, topic: str, payload: Any) -> None: """Update LPR config at runtime.""" - if update_type != EnrichmentConfigEnum.lpr: + if topic != self.CONFIG_UPDATE_TOPIC: return + previous_min_area = self.config.lpr.min_area + self.config.lpr = payload self.lpr_config = payload + + for camera_config in self.config.cameras.values(): + if camera_config.lpr.min_area == previous_min_area: + camera_config.lpr.min_area = payload.min_area + logger.debug("LPR config updated dynamically") def process_frame( diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 232640b68..5fdbf26e0 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -9,6 +9,7 @@ from typing import Any from peewee import DoesNotExist +from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.embeddings_updater import ( EmbeddingsRequestEnum, @@ -32,10 +33,6 @@ from frigate.config.camera.updater import ( CameraConfigUpdateEnum, CameraConfigUpdateSubscriber, ) -from frigate.config.enrichment_updater import ( - EnrichmentConfigEnum, - EnrichmentConfigSubscriber, -) from frigate.data_processing.common.license_plate.model import ( LicensePlateModelRunner, ) @@ -99,7 +96,7 @@ class EmbeddingMaintainer(threading.Thread): CameraConfigUpdateEnum.semantic_search, ], ) - self.enrichment_config_subscriber = EnrichmentConfigSubscriber() + self.enrichment_config_subscriber = ConfigSubscriber("config/") # Configure Frigate DB db = SqliteVecQueueDatabase( @@ -297,43 +294,22 @@ class EmbeddingMaintainer(threading.Thread): def _check_enrichment_config_updates(self) -> None: """Check for enrichment config updates and delegate to processors.""" - update_type, topic, payload = ( - self.enrichment_config_subscriber.check_for_update() - ) + topic, payload = self.enrichment_config_subscriber.check_for_update() - if update_type is None: + if topic is None: return # Custom classification add/remove requires managing the processor list - if update_type == EnrichmentConfigEnum.classification_custom: + if topic.startswith("config/classification/custom/"): self._handle_custom_classification_update(topic, payload) return - # Apply global config updates before notifying processors - if update_type == EnrichmentConfigEnum.classification: - self.config.classification = payload - logger.debug("Applied dynamic bird classification config update") - elif update_type == EnrichmentConfigEnum.face_recognition: - previous_min_area = self.config.face_recognition.min_area - self.config.face_recognition = payload - - for camera_config in self.config.cameras.values(): - if camera_config.face_recognition.min_area == previous_min_area: - camera_config.face_recognition.min_area = payload.min_area - elif update_type == EnrichmentConfigEnum.lpr: - previous_min_area = self.config.lpr.min_area - self.config.lpr = payload - - for camera_config in self.config.cameras.values(): - if camera_config.lpr.min_area == previous_min_area: - camera_config.lpr.min_area = payload.min_area - - # Broadcast to all processors — each decides if the update is relevant + # Broadcast to all processors — each decides if the topic is relevant for processor in self.realtime_processors: - processor.update_config(update_type, payload) + processor.update_config(topic, payload) for processor in self.post_processors: - processor.update_config(update_type, payload) + processor.update_config(topic, payload) def _handle_custom_classification_update( self, topic: str, model_config: Any