diff --git a/frigate/data_processing/post/api.py b/frigate/data_processing/post/api.py index c341bd8ef..2c1359d96 100644 --- a/frigate/data_processing/post/api.py +++ b/frigate/data_processing/post/api.py @@ -50,3 +50,16 @@ class PostProcessorApi(ABC): None if request was not handled, otherwise return response. """ pass + + def update_config(self, topic: str, payload: Any) -> None: + """Handle a config change notification. + + Called for every config update published under ``config/``. + Processors should override this to check the topic and act only + on changes relevant to them. Default is a no-op. + + Args: + topic: The config topic that changed. + payload: The updated configuration object. + """ + pass diff --git a/frigate/data_processing/post/license_plate.py b/frigate/data_processing/post/license_plate.py index 9f8d72975..6f5149b9f 100644 --- a/frigate/data_processing/post/license_plate.py +++ b/frigate/data_processing/post/license_plate.py @@ -12,7 +12,6 @@ from frigate.comms.embeddings_updater import EmbeddingsRequestEnum from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.classification import LicensePlateRecognitionConfig from frigate.data_processing.common.license_plate.mixin import ( WRITE_DEBUG_IMAGES, LicensePlateProcessingMixin, @@ -48,10 +47,15 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi): self.sub_label_publisher = sub_label_publisher super().__init__(config, metrics, model_runner) - def update_config(self, lpr_config: LicensePlateRecognitionConfig) -> None: + CONFIG_UPDATE_TOPIC = "config/lpr" + + def update_config(self, topic: str, payload: Any) -> None: """Update LPR config at runtime.""" - self.lpr_config = lpr_config - logger.debug("LPR config updated dynamically") + if topic != self.CONFIG_UPDATE_TOPIC: + return + + self.lpr_config = payload + logger.debug("LPR post-processor config updated dynamically") def process_data( self, data: dict[str, Any], data_type: PostProcessDataEnum diff --git a/frigate/data_processing/real_time/api.py b/frigate/data_processing/real_time/api.py index 0fa0f9952..31127220f 100644 --- a/frigate/data_processing/real_time/api.py +++ b/frigate/data_processing/real_time/api.py @@ -61,3 +61,16 @@ class RealTimeProcessorApi(ABC): None. """ pass + + def update_config(self, topic: str, payload: Any) -> None: + """Handle a config change notification. + + Called for every config update published under ``config/``. + Processors should override this to check the topic and act only + on changes relevant to them. Default is a no-op. + + Args: + topic: The config topic that changed. + payload: The updated configuration object. + """ + pass diff --git a/frigate/data_processing/real_time/bird.py b/frigate/data_processing/real_time/bird.py index 520440005..38ff1a950 100644 --- a/frigate/data_processing/real_time/bird.py +++ b/frigate/data_processing/real_time/bird.py @@ -169,6 +169,16 @@ class BirdRealTimeProcessor(RealTimeProcessorApi): ) self.detected_birds[obj_data["id"]] = score + CONFIG_UPDATE_TOPIC = "config/classification" + + def update_config(self, topic: str, payload: Any) -> None: + """Update bird classification config at runtime.""" + if topic != self.CONFIG_UPDATE_TOPIC: + return + + self.config.classification = payload + logger.debug("Bird classification config updated dynamically") + def handle_request(self, topic, request_data): return None diff --git a/frigate/data_processing/real_time/face.py b/frigate/data_processing/real_time/face.py index 408a0456e..d886a86e5 100644 --- a/frigate/data_processing/real_time/face.py +++ b/frigate/data_processing/real_time/face.py @@ -19,7 +19,6 @@ from frigate.comms.event_metadata_updater import ( ) from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.classification import FaceRecognitionConfig from frigate.const import FACE_DIR, MODEL_CACHE_DIR from frigate.data_processing.common.face.model import ( ArcFaceRecognizer, @@ -96,9 +95,21 @@ class FaceRealTimeProcessor(RealTimeProcessorApi): self.recognizer.build() - def update_config(self, face_config: FaceRecognitionConfig) -> None: + CONFIG_UPDATE_TOPIC = "config/face_recognition" + + def update_config(self, topic: str, payload: Any) -> None: """Update face recognition config at runtime.""" - self.face_config = face_config + if topic != self.CONFIG_UPDATE_TOPIC: + return + + previous_min_area = self.config.face_recognition.min_area + self.config.face_recognition = payload + self.face_config = payload + + for camera_config in self.config.cameras.values(): + if camera_config.face_recognition.min_area == previous_min_area: + camera_config.face_recognition.min_area = payload.min_area + logger.debug("Face recognition config updated dynamically") def __download_models(self, path: str) -> None: diff --git a/frigate/data_processing/real_time/license_plate.py b/frigate/data_processing/real_time/license_plate.py index 5c098369f..298989c82 100644 --- a/frigate/data_processing/real_time/license_plate.py +++ b/frigate/data_processing/real_time/license_plate.py @@ -8,7 +8,6 @@ import numpy as np from frigate.comms.event_metadata_updater import EventMetadataPublisher from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig -from frigate.config.classification import LicensePlateRecognitionConfig from frigate.data_processing.common.license_plate.mixin import ( LicensePlateProcessingMixin, ) @@ -41,9 +40,21 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess self.camera_current_cars: dict[str, list[str]] = {} super().__init__(config, metrics) - def update_config(self, lpr_config: LicensePlateRecognitionConfig) -> None: + CONFIG_UPDATE_TOPIC = "config/lpr" + + def update_config(self, topic: str, payload: Any) -> None: """Update LPR config at runtime.""" - self.lpr_config = lpr_config + if topic != self.CONFIG_UPDATE_TOPIC: + return + + previous_min_area = self.config.lpr.min_area + self.config.lpr = payload + self.lpr_config = payload + + for camera_config in self.config.cameras.values(): + if camera_config.lpr.min_area == previous_min_area: + camera_config.lpr.min_area = payload.min_area + logger.debug("LPR config updated dynamically") def process_frame( diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 533f79e50..2c61c5fe9 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -96,16 +96,7 @@ class EmbeddingMaintainer(threading.Thread): CameraConfigUpdateEnum.semantic_search, ], ) - self.classification_config_subscriber = ConfigSubscriber( - "config/classification/custom/" - ) - self.bird_classification_config_subscriber = ConfigSubscriber( - "config/classification", exact=True - ) - self.face_recognition_config_subscriber = ConfigSubscriber( - "config/face_recognition", exact=True - ) - self.lpr_config_subscriber = ConfigSubscriber("config/lpr", exact=True) + self.enrichment_config_subscriber = ConfigSubscriber("config/") # Configure Frigate DB db = SqliteVecQueueDatabase( @@ -280,10 +271,7 @@ class EmbeddingMaintainer(threading.Thread): """Maintain a SQLite-vec database for semantic search.""" while not self.stop_event.is_set(): self.config_updater.check_for_updates() - self._check_classification_config_updates() - self._check_bird_classification_config_updates() - self._check_face_recognition_config_updates() - self._check_lpr_config_updates() + self._check_enrichment_config_updates() self._process_requests() self._process_updates() self._process_recordings_updates() @@ -294,10 +282,7 @@ class EmbeddingMaintainer(threading.Thread): self._process_event_metadata() self.config_updater.stop() - self.classification_config_subscriber.stop() - self.bird_classification_config_subscriber.stop() - self.face_recognition_config_subscriber.stop() - self.lpr_config_subscriber.stop() + self.enrichment_config_subscriber.stop() self.event_subscriber.stop() self.event_end_subscriber.stop() self.recordings_subscriber.stop() @@ -308,124 +293,87 @@ class EmbeddingMaintainer(threading.Thread): self.requestor.stop() logger.info("Exiting embeddings maintenance...") - def _check_classification_config_updates(self) -> None: - """Check for classification config updates and add/remove processors.""" - topic, model_config = self.classification_config_subscriber.check_for_update() + def _check_enrichment_config_updates(self) -> None: + """Check for enrichment config updates and delegate to processors.""" + topic, payload = self.enrichment_config_subscriber.check_for_update() - if topic: - model_name = topic.split("/")[-1] + if topic is None: + return - if model_config is None: - self.realtime_processors = [ - processor - for processor in self.realtime_processors - if not ( - isinstance( - processor, - ( - CustomStateClassificationProcessor, - CustomObjectClassificationProcessor, - ), - ) - and processor.model_config.name == model_name - ) - ] + # Custom classification add/remove requires managing the processor list + if topic.startswith("config/classification/custom/"): + self._handle_custom_classification_update(topic, payload) + return - logger.info( - f"Successfully removed classification processor for model: {model_name}" - ) - else: - self.config.classification.custom[model_name] = model_config + # Broadcast to all processors — each decides if the topic is relevant + for processor in self.realtime_processors: + processor.update_config(topic, payload) - # Check if processor already exists - for processor in self.realtime_processors: - if isinstance( + for processor in self.post_processors: + processor.update_config(topic, payload) + + def _handle_custom_classification_update( + self, topic: str, model_config: Any + ) -> None: + """Handle add/remove of custom classification processors.""" + model_name = topic.split("/")[-1] + + if model_config is None: + self.realtime_processors = [ + processor + for processor in self.realtime_processors + if not ( + isinstance( processor, ( CustomStateClassificationProcessor, CustomObjectClassificationProcessor, ), - ): - if processor.model_config.name == model_name: - logger.debug( - f"Classification processor for model {model_name} already exists, skipping" - ) - return - - if model_config.state_config is not None: - processor = CustomStateClassificationProcessor( - self.config, model_config, self.requestor, self.metrics ) - else: - processor = CustomObjectClassificationProcessor( - self.config, - model_config, - self.event_metadata_publisher, - self.requestor, - self.metrics, - ) - - self.realtime_processors.append(processor) - logger.info( - f"Added classification processor for model: {model_name} (type: {type(processor).__name__})" + and processor.model_config.name == model_name ) + ] - def _check_bird_classification_config_updates(self) -> None: - """Check for bird classification config updates.""" - topic, classification_config = ( - self.bird_classification_config_subscriber.check_for_update() + logger.info( + f"Successfully removed classification processor for model: {model_name}" + ) + return + + self.config.classification.custom[model_name] = model_config + + # Check if processor already exists + for processor in self.realtime_processors: + if isinstance( + processor, + ( + CustomStateClassificationProcessor, + CustomObjectClassificationProcessor, + ), + ): + if processor.model_config.name == model_name: + logger.debug( + f"Classification processor for model {model_name} already exists, skipping" + ) + return + + if model_config.state_config is not None: + processor = CustomStateClassificationProcessor( + self.config, model_config, self.requestor, self.metrics + ) + else: + processor = CustomObjectClassificationProcessor( + self.config, + model_config, + self.event_metadata_publisher, + self.requestor, + self.metrics, + ) + + self.realtime_processors.append(processor) + logger.info( + f"Added classification processor for model: {model_name} (type: {type(processor).__name__})" ) - if topic is None: - return - - self.config.classification = classification_config - logger.debug("Applied dynamic bird classification config update") - - def _check_face_recognition_config_updates(self) -> None: - """Check for face recognition config updates.""" - topic, face_config = self.face_recognition_config_subscriber.check_for_update() - - if topic is None: - return - - previous_min_area = self.config.face_recognition.min_area - self.config.face_recognition = face_config - - for camera_config in self.config.cameras.values(): - if camera_config.face_recognition.min_area == previous_min_area: - camera_config.face_recognition.min_area = face_config.min_area - - for processor in self.realtime_processors: - if isinstance(processor, FaceRealTimeProcessor): - processor.update_config(face_config) - - logger.debug("Applied dynamic face recognition config update") - - def _check_lpr_config_updates(self) -> None: - """Check for LPR config updates.""" - topic, lpr_config = self.lpr_config_subscriber.check_for_update() - - if topic is None: - return - - previous_min_area = self.config.lpr.min_area - self.config.lpr = lpr_config - - for camera_config in self.config.cameras.values(): - if camera_config.lpr.min_area == previous_min_area: - camera_config.lpr.min_area = lpr_config.min_area - - for processor in self.realtime_processors: - if isinstance(processor, LicensePlateRealTimeProcessor): - processor.update_config(lpr_config) - - for processor in self.post_processors: - if isinstance(processor, LicensePlatePostProcessor): - processor.update_config(lpr_config) - - logger.debug("Applied dynamic LPR config update") - def _process_requests(self) -> None: """Process embeddings requests"""