update maintainer

This commit is contained in:
Josh Hawkins 2026-03-07 16:55:14 -06:00
parent 826735bee2
commit 306725a8d8

View File

@ -9,7 +9,6 @@ from typing import Any
from peewee import DoesNotExist from peewee import DoesNotExist
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import ( from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum, EmbeddingsRequestEnum,
@ -33,6 +32,10 @@ from frigate.config.camera.updater import (
CameraConfigUpdateEnum, CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber, CameraConfigUpdateSubscriber,
) )
from frigate.config.enrichment_updater import (
EnrichmentConfigEnum,
EnrichmentConfigSubscriber,
)
from frigate.data_processing.common.license_plate.model import ( from frigate.data_processing.common.license_plate.model import (
LicensePlateModelRunner, LicensePlateModelRunner,
) )
@ -96,16 +99,7 @@ class EmbeddingMaintainer(threading.Thread):
CameraConfigUpdateEnum.semantic_search, CameraConfigUpdateEnum.semantic_search,
], ],
) )
self.classification_config_subscriber = ConfigSubscriber( self.enrichment_config_subscriber = EnrichmentConfigSubscriber()
"config/classification/custom/"
)
self.bird_classification_config_subscriber = ConfigSubscriber(
"config/classification", exact=True
)
self.face_recognition_config_subscriber = ConfigSubscriber(
"config/face_recognition", exact=True
)
self.lpr_config_subscriber = ConfigSubscriber("config/lpr", exact=True)
# Configure Frigate DB # Configure Frigate DB
db = SqliteVecQueueDatabase( db = SqliteVecQueueDatabase(
@ -279,10 +273,7 @@ class EmbeddingMaintainer(threading.Thread):
"""Maintain a SQLite-vec database for semantic search.""" """Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self.config_updater.check_for_updates() self.config_updater.check_for_updates()
self._check_classification_config_updates() self._check_enrichment_config_updates()
self._check_bird_classification_config_updates()
self._check_face_recognition_config_updates()
self._check_lpr_config_updates()
self._process_requests() self._process_requests()
self._process_updates() self._process_updates()
self._process_recordings_updates() self._process_recordings_updates()
@ -293,10 +284,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_event_metadata() self._process_event_metadata()
self.config_updater.stop() self.config_updater.stop()
self.classification_config_subscriber.stop() self.enrichment_config_subscriber.stop()
self.bird_classification_config_subscriber.stop()
self.face_recognition_config_subscriber.stop()
self.lpr_config_subscriber.stop()
self.event_subscriber.stop() self.event_subscriber.stop()
self.event_end_subscriber.stop() self.event_end_subscriber.stop()
self.recordings_subscriber.stop() self.recordings_subscriber.stop()
@ -307,124 +295,108 @@ class EmbeddingMaintainer(threading.Thread):
self.requestor.stop() self.requestor.stop()
logger.info("Exiting embeddings maintenance...") logger.info("Exiting embeddings maintenance...")
def _check_classification_config_updates(self) -> None: def _check_enrichment_config_updates(self) -> None:
"""Check for classification config updates and add/remove processors.""" """Check for enrichment config updates and delegate to processors."""
topic, model_config = self.classification_config_subscriber.check_for_update() update_type, topic, payload = (
self.enrichment_config_subscriber.check_for_update()
)
if topic: if update_type is None:
model_name = topic.split("/")[-1] return
if model_config is None: # Custom classification add/remove requires managing the processor list
self.realtime_processors = [ if update_type == EnrichmentConfigEnum.classification_custom:
processor self._handle_custom_classification_update(topic, payload)
for processor in self.realtime_processors return
if not (
isinstance(
processor,
(
CustomStateClassificationProcessor,
CustomObjectClassificationProcessor,
),
)
and processor.model_config.name == model_name
)
]
logger.info( # Apply global config updates before notifying processors
f"Successfully removed classification processor for model: {model_name}" if update_type == EnrichmentConfigEnum.classification:
) self.config.classification = payload
else: logger.debug("Applied dynamic bird classification config update")
self.config.classification.custom[model_name] = model_config elif update_type == EnrichmentConfigEnum.face_recognition:
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = payload
# Check if processor already exists for camera_config in self.config.cameras.values():
for processor in self.realtime_processors: if camera_config.face_recognition.min_area == previous_min_area:
if isinstance( camera_config.face_recognition.min_area = payload.min_area
elif update_type == EnrichmentConfigEnum.lpr:
previous_min_area = self.config.lpr.min_area
self.config.lpr = payload
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = payload.min_area
# Broadcast to all processors — each decides if the update is relevant
for processor in self.realtime_processors:
processor.update_config(update_type, payload)
for processor in self.post_processors:
processor.update_config(update_type, payload)
def _handle_custom_classification_update(
self, topic: str, model_config: Any
) -> None:
"""Handle add/remove of custom classification processors."""
model_name = topic.split("/")[-1]
if model_config is None:
self.realtime_processors = [
processor
for processor in self.realtime_processors
if not (
isinstance(
processor, processor,
( (
CustomStateClassificationProcessor, CustomStateClassificationProcessor,
CustomObjectClassificationProcessor, CustomObjectClassificationProcessor,
), ),
):
if processor.model_config.name == model_name:
logger.debug(
f"Classification processor for model {model_name} already exists, skipping"
)
return
if model_config.state_config is not None:
processor = CustomStateClassificationProcessor(
self.config, model_config, self.requestor, self.metrics
) )
else: and processor.model_config.name == model_name
processor = CustomObjectClassificationProcessor(
self.config,
model_config,
self.event_metadata_publisher,
self.requestor,
self.metrics,
)
self.realtime_processors.append(processor)
logger.info(
f"Added classification processor for model: {model_name} (type: {type(processor).__name__})"
) )
]
def _check_bird_classification_config_updates(self) -> None: logger.info(
"""Check for bird classification config updates.""" f"Successfully removed classification processor for model: {model_name}"
topic, classification_config = ( )
self.bird_classification_config_subscriber.check_for_update() return
self.config.classification.custom[model_name] = model_config
# Check if processor already exists
for processor in self.realtime_processors:
if isinstance(
processor,
(
CustomStateClassificationProcessor,
CustomObjectClassificationProcessor,
),
):
if processor.model_config.name == model_name:
logger.debug(
f"Classification processor for model {model_name} already exists, skipping"
)
return
if model_config.state_config is not None:
processor = CustomStateClassificationProcessor(
self.config, model_config, self.requestor, self.metrics
)
else:
processor = CustomObjectClassificationProcessor(
self.config,
model_config,
self.event_metadata_publisher,
self.requestor,
self.metrics,
)
self.realtime_processors.append(processor)
logger.info(
f"Added classification processor for model: {model_name} (type: {type(processor).__name__})"
) )
if topic is None:
return
self.config.classification = classification_config
logger.debug("Applied dynamic bird classification config update")
def _check_face_recognition_config_updates(self) -> None:
"""Check for face recognition config updates."""
topic, face_config = self.face_recognition_config_subscriber.check_for_update()
if topic is None:
return
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = face_config
for camera_config in self.config.cameras.values():
if camera_config.face_recognition.min_area == previous_min_area:
camera_config.face_recognition.min_area = face_config.min_area
for processor in self.realtime_processors:
if isinstance(processor, FaceRealTimeProcessor):
processor.update_config(face_config)
logger.debug("Applied dynamic face recognition config update")
def _check_lpr_config_updates(self) -> None:
"""Check for LPR config updates."""
topic, lpr_config = self.lpr_config_subscriber.check_for_update()
if topic is None:
return
previous_min_area = self.config.lpr.min_area
self.config.lpr = lpr_config
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = lpr_config.min_area
for processor in self.realtime_processors:
if isinstance(processor, LicensePlateRealTimeProcessor):
processor.update_config(lpr_config)
for processor in self.post_processors:
if isinstance(processor, LicensePlatePostProcessor):
processor.update_config(lpr_config)
logger.debug("Applied dynamic LPR config update")
def _process_requests(self) -> None: def _process_requests(self) -> None:
"""Process embeddings requests""" """Process embeddings requests"""