From 306725a8d8e2d71bb0c76b126de299e329dcafab Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Sat, 7 Mar 2026 16:55:14 -0600 Subject: [PATCH] update maintainer --- frigate/embeddings/maintainer.py | 218 ++++++++++++++----------------- 1 file changed, 95 insertions(+), 123 deletions(-) diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index b85f231c0..232640b68 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -9,7 +9,6 @@ from typing import Any from peewee import DoesNotExist -from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.embeddings_updater import ( EmbeddingsRequestEnum, @@ -33,6 +32,10 @@ from frigate.config.camera.updater import ( CameraConfigUpdateEnum, CameraConfigUpdateSubscriber, ) +from frigate.config.enrichment_updater import ( + EnrichmentConfigEnum, + EnrichmentConfigSubscriber, +) from frigate.data_processing.common.license_plate.model import ( LicensePlateModelRunner, ) @@ -96,16 +99,7 @@ class EmbeddingMaintainer(threading.Thread): CameraConfigUpdateEnum.semantic_search, ], ) - self.classification_config_subscriber = ConfigSubscriber( - "config/classification/custom/" - ) - self.bird_classification_config_subscriber = ConfigSubscriber( - "config/classification", exact=True - ) - self.face_recognition_config_subscriber = ConfigSubscriber( - "config/face_recognition", exact=True - ) - self.lpr_config_subscriber = ConfigSubscriber("config/lpr", exact=True) + self.enrichment_config_subscriber = EnrichmentConfigSubscriber() # Configure Frigate DB db = SqliteVecQueueDatabase( @@ -279,10 +273,7 @@ class EmbeddingMaintainer(threading.Thread): """Maintain a SQLite-vec database for semantic search.""" while not self.stop_event.is_set(): self.config_updater.check_for_updates() - self._check_classification_config_updates() - self._check_bird_classification_config_updates() - self._check_face_recognition_config_updates() - self._check_lpr_config_updates() + self._check_enrichment_config_updates() self._process_requests() self._process_updates() self._process_recordings_updates() @@ -293,10 +284,7 @@ class EmbeddingMaintainer(threading.Thread): self._process_event_metadata() self.config_updater.stop() - self.classification_config_subscriber.stop() - self.bird_classification_config_subscriber.stop() - self.face_recognition_config_subscriber.stop() - self.lpr_config_subscriber.stop() + self.enrichment_config_subscriber.stop() self.event_subscriber.stop() self.event_end_subscriber.stop() self.recordings_subscriber.stop() @@ -307,124 +295,108 @@ class EmbeddingMaintainer(threading.Thread): self.requestor.stop() logger.info("Exiting embeddings maintenance...") - def _check_classification_config_updates(self) -> None: - """Check for classification config updates and add/remove processors.""" - topic, model_config = self.classification_config_subscriber.check_for_update() + def _check_enrichment_config_updates(self) -> None: + """Check for enrichment config updates and delegate to processors.""" + update_type, topic, payload = ( + self.enrichment_config_subscriber.check_for_update() + ) - if topic: - model_name = topic.split("/")[-1] + if update_type is None: + return - if model_config is None: - self.realtime_processors = [ - processor - for processor in self.realtime_processors - if not ( - isinstance( - processor, - ( - CustomStateClassificationProcessor, - CustomObjectClassificationProcessor, - ), - ) - and processor.model_config.name == model_name - ) - ] + # Custom classification add/remove requires managing the processor list + if update_type == EnrichmentConfigEnum.classification_custom: + self._handle_custom_classification_update(topic, payload) + return - logger.info( - f"Successfully removed classification processor for model: {model_name}" - ) - else: - self.config.classification.custom[model_name] = model_config + # Apply global config updates before notifying processors + if update_type == EnrichmentConfigEnum.classification: + self.config.classification = payload + logger.debug("Applied dynamic bird classification config update") + elif update_type == EnrichmentConfigEnum.face_recognition: + previous_min_area = self.config.face_recognition.min_area + self.config.face_recognition = payload - # Check if processor already exists - for processor in self.realtime_processors: - if isinstance( + for camera_config in self.config.cameras.values(): + if camera_config.face_recognition.min_area == previous_min_area: + camera_config.face_recognition.min_area = payload.min_area + elif update_type == EnrichmentConfigEnum.lpr: + previous_min_area = self.config.lpr.min_area + self.config.lpr = payload + + for camera_config in self.config.cameras.values(): + if camera_config.lpr.min_area == previous_min_area: + camera_config.lpr.min_area = payload.min_area + + # Broadcast to all processors — each decides if the update is relevant + for processor in self.realtime_processors: + processor.update_config(update_type, payload) + + for processor in self.post_processors: + processor.update_config(update_type, payload) + + def _handle_custom_classification_update( + self, topic: str, model_config: Any + ) -> None: + """Handle add/remove of custom classification processors.""" + model_name = topic.split("/")[-1] + + if model_config is None: + self.realtime_processors = [ + processor + for processor in self.realtime_processors + if not ( + isinstance( processor, ( CustomStateClassificationProcessor, CustomObjectClassificationProcessor, ), - ): - if processor.model_config.name == model_name: - logger.debug( - f"Classification processor for model {model_name} already exists, skipping" - ) - return - - if model_config.state_config is not None: - processor = CustomStateClassificationProcessor( - self.config, model_config, self.requestor, self.metrics ) - else: - processor = CustomObjectClassificationProcessor( - self.config, - model_config, - self.event_metadata_publisher, - self.requestor, - self.metrics, - ) - - self.realtime_processors.append(processor) - logger.info( - f"Added classification processor for model: {model_name} (type: {type(processor).__name__})" + and processor.model_config.name == model_name ) + ] - def _check_bird_classification_config_updates(self) -> None: - """Check for bird classification config updates.""" - topic, classification_config = ( - self.bird_classification_config_subscriber.check_for_update() + logger.info( + f"Successfully removed classification processor for model: {model_name}" + ) + return + + self.config.classification.custom[model_name] = model_config + + # Check if processor already exists + for processor in self.realtime_processors: + if isinstance( + processor, + ( + CustomStateClassificationProcessor, + CustomObjectClassificationProcessor, + ), + ): + if processor.model_config.name == model_name: + logger.debug( + f"Classification processor for model {model_name} already exists, skipping" + ) + return + + if model_config.state_config is not None: + processor = CustomStateClassificationProcessor( + self.config, model_config, self.requestor, self.metrics + ) + else: + processor = CustomObjectClassificationProcessor( + self.config, + model_config, + self.event_metadata_publisher, + self.requestor, + self.metrics, + ) + + self.realtime_processors.append(processor) + logger.info( + f"Added classification processor for model: {model_name} (type: {type(processor).__name__})" ) - if topic is None: - return - - self.config.classification = classification_config - logger.debug("Applied dynamic bird classification config update") - - def _check_face_recognition_config_updates(self) -> None: - """Check for face recognition config updates.""" - topic, face_config = self.face_recognition_config_subscriber.check_for_update() - - if topic is None: - return - - previous_min_area = self.config.face_recognition.min_area - self.config.face_recognition = face_config - - for camera_config in self.config.cameras.values(): - if camera_config.face_recognition.min_area == previous_min_area: - camera_config.face_recognition.min_area = face_config.min_area - - for processor in self.realtime_processors: - if isinstance(processor, FaceRealTimeProcessor): - processor.update_config(face_config) - - logger.debug("Applied dynamic face recognition config update") - - def _check_lpr_config_updates(self) -> None: - """Check for LPR config updates.""" - topic, lpr_config = self.lpr_config_subscriber.check_for_update() - - if topic is None: - return - - previous_min_area = self.config.lpr.min_area - self.config.lpr = lpr_config - - for camera_config in self.config.cameras.values(): - if camera_config.lpr.min_area == previous_min_area: - camera_config.lpr.min_area = lpr_config.min_area - - for processor in self.realtime_processors: - if isinstance(processor, LicensePlateRealTimeProcessor): - processor.update_config(lpr_config) - - for processor in self.post_processors: - if isinstance(processor, LicensePlatePostProcessor): - processor.update_config(lpr_config) - - logger.debug("Applied dynamic LPR config update") - def _process_requests(self) -> None: """Process embeddings requests"""