Refactor enrichment confg updater (#22325)

* enrichment updater and enum

* update_config stubs

* config updaters in enrichments

* update maintainer

* formatting

* simplify enrichment config updates to use single subscriber with topic-based routing
This commit is contained in:
Josh Hawkins 2026-03-08 15:14:18 -05:00 committed by GitHub
parent df27e04c0f
commit b2c7840c29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 142 additions and 132 deletions

View File

@ -50,3 +50,16 @@ class PostProcessorApi(ABC):
None if request was not handled, otherwise return response.
"""
pass
def update_config(self, topic: str, payload: Any) -> None:
"""Handle a config change notification.
Called for every config update published under ``config/``.
Processors should override this to check the topic and act only
on changes relevant to them. Default is a no-op.
Args:
topic: The config topic that changed.
payload: The updated configuration object.
"""
pass

View File

@ -12,7 +12,6 @@ from frigate.comms.embeddings_updater import EmbeddingsRequestEnum
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import LicensePlateRecognitionConfig
from frigate.data_processing.common.license_plate.mixin import (
WRITE_DEBUG_IMAGES,
LicensePlateProcessingMixin,
@ -48,10 +47,15 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
self.sub_label_publisher = sub_label_publisher
super().__init__(config, metrics, model_runner)
def update_config(self, lpr_config: LicensePlateRecognitionConfig) -> None:
CONFIG_UPDATE_TOPIC = "config/lpr"
def update_config(self, topic: str, payload: Any) -> None:
"""Update LPR config at runtime."""
self.lpr_config = lpr_config
logger.debug("LPR config updated dynamically")
if topic != self.CONFIG_UPDATE_TOPIC:
return
self.lpr_config = payload
logger.debug("LPR post-processor config updated dynamically")
def process_data(
self, data: dict[str, Any], data_type: PostProcessDataEnum

View File

@ -61,3 +61,16 @@ class RealTimeProcessorApi(ABC):
None.
"""
pass
def update_config(self, topic: str, payload: Any) -> None:
"""Handle a config change notification.
Called for every config update published under ``config/``.
Processors should override this to check the topic and act only
on changes relevant to them. Default is a no-op.
Args:
topic: The config topic that changed.
payload: The updated configuration object.
"""
pass

View File

@ -169,6 +169,16 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
)
self.detected_birds[obj_data["id"]] = score
CONFIG_UPDATE_TOPIC = "config/classification"
def update_config(self, topic: str, payload: Any) -> None:
"""Update bird classification config at runtime."""
if topic != self.CONFIG_UPDATE_TOPIC:
return
self.config.classification = payload
logger.debug("Bird classification config updated dynamically")
def handle_request(self, topic, request_data):
return None

View File

@ -19,7 +19,6 @@ from frigate.comms.event_metadata_updater import (
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import FaceRecognitionConfig
from frigate.const import FACE_DIR, MODEL_CACHE_DIR
from frigate.data_processing.common.face.model import (
ArcFaceRecognizer,
@ -96,9 +95,21 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.recognizer.build()
def update_config(self, face_config: FaceRecognitionConfig) -> None:
CONFIG_UPDATE_TOPIC = "config/face_recognition"
def update_config(self, topic: str, payload: Any) -> None:
"""Update face recognition config at runtime."""
self.face_config = face_config
if topic != self.CONFIG_UPDATE_TOPIC:
return
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = payload
self.face_config = payload
for camera_config in self.config.cameras.values():
if camera_config.face_recognition.min_area == previous_min_area:
camera_config.face_recognition.min_area = payload.min_area
logger.debug("Face recognition config updated dynamically")
def __download_models(self, path: str) -> None:

View File

@ -8,7 +8,6 @@ import numpy as np
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.classification import LicensePlateRecognitionConfig
from frigate.data_processing.common.license_plate.mixin import (
LicensePlateProcessingMixin,
)
@ -41,9 +40,21 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
self.camera_current_cars: dict[str, list[str]] = {}
super().__init__(config, metrics)
def update_config(self, lpr_config: LicensePlateRecognitionConfig) -> None:
CONFIG_UPDATE_TOPIC = "config/lpr"
def update_config(self, topic: str, payload: Any) -> None:
"""Update LPR config at runtime."""
self.lpr_config = lpr_config
if topic != self.CONFIG_UPDATE_TOPIC:
return
previous_min_area = self.config.lpr.min_area
self.config.lpr = payload
self.lpr_config = payload
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = payload.min_area
logger.debug("LPR config updated dynamically")
def process_frame(

View File

@ -96,16 +96,7 @@ class EmbeddingMaintainer(threading.Thread):
CameraConfigUpdateEnum.semantic_search,
],
)
self.classification_config_subscriber = ConfigSubscriber(
"config/classification/custom/"
)
self.bird_classification_config_subscriber = ConfigSubscriber(
"config/classification", exact=True
)
self.face_recognition_config_subscriber = ConfigSubscriber(
"config/face_recognition", exact=True
)
self.lpr_config_subscriber = ConfigSubscriber("config/lpr", exact=True)
self.enrichment_config_subscriber = ConfigSubscriber("config/")
# Configure Frigate DB
db = SqliteVecQueueDatabase(
@ -280,10 +271,7 @@ class EmbeddingMaintainer(threading.Thread):
"""Maintain a SQLite-vec database for semantic search."""
while not self.stop_event.is_set():
self.config_updater.check_for_updates()
self._check_classification_config_updates()
self._check_bird_classification_config_updates()
self._check_face_recognition_config_updates()
self._check_lpr_config_updates()
self._check_enrichment_config_updates()
self._process_requests()
self._process_updates()
self._process_recordings_updates()
@ -294,10 +282,7 @@ class EmbeddingMaintainer(threading.Thread):
self._process_event_metadata()
self.config_updater.stop()
self.classification_config_subscriber.stop()
self.bird_classification_config_subscriber.stop()
self.face_recognition_config_subscriber.stop()
self.lpr_config_subscriber.stop()
self.enrichment_config_subscriber.stop()
self.event_subscriber.stop()
self.event_end_subscriber.stop()
self.recordings_subscriber.stop()
@ -308,11 +293,29 @@ class EmbeddingMaintainer(threading.Thread):
self.requestor.stop()
logger.info("Exiting embeddings maintenance...")
def _check_classification_config_updates(self) -> None:
"""Check for classification config updates and add/remove processors."""
topic, model_config = self.classification_config_subscriber.check_for_update()
def _check_enrichment_config_updates(self) -> None:
"""Check for enrichment config updates and delegate to processors."""
topic, payload = self.enrichment_config_subscriber.check_for_update()
if topic:
if topic is None:
return
# Custom classification add/remove requires managing the processor list
if topic.startswith("config/classification/custom/"):
self._handle_custom_classification_update(topic, payload)
return
# Broadcast to all processors — each decides if the topic is relevant
for processor in self.realtime_processors:
processor.update_config(topic, payload)
for processor in self.post_processors:
processor.update_config(topic, payload)
def _handle_custom_classification_update(
self, topic: str, model_config: Any
) -> None:
"""Handle add/remove of custom classification processors."""
model_name = topic.split("/")[-1]
if model_config is None:
@ -334,7 +337,8 @@ class EmbeddingMaintainer(threading.Thread):
logger.info(
f"Successfully removed classification processor for model: {model_name}"
)
else:
return
self.config.classification.custom[model_name] = model_config
# Check if processor already exists
@ -370,62 +374,6 @@ class EmbeddingMaintainer(threading.Thread):
f"Added classification processor for model: {model_name} (type: {type(processor).__name__})"
)
def _check_bird_classification_config_updates(self) -> None:
"""Check for bird classification config updates."""
topic, classification_config = (
self.bird_classification_config_subscriber.check_for_update()
)
if topic is None:
return
self.config.classification = classification_config
logger.debug("Applied dynamic bird classification config update")
def _check_face_recognition_config_updates(self) -> None:
"""Check for face recognition config updates."""
topic, face_config = self.face_recognition_config_subscriber.check_for_update()
if topic is None:
return
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = face_config
for camera_config in self.config.cameras.values():
if camera_config.face_recognition.min_area == previous_min_area:
camera_config.face_recognition.min_area = face_config.min_area
for processor in self.realtime_processors:
if isinstance(processor, FaceRealTimeProcessor):
processor.update_config(face_config)
logger.debug("Applied dynamic face recognition config update")
def _check_lpr_config_updates(self) -> None:
"""Check for LPR config updates."""
topic, lpr_config = self.lpr_config_subscriber.check_for_update()
if topic is None:
return
previous_min_area = self.config.lpr.min_area
self.config.lpr = lpr_config
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = lpr_config.min_area
for processor in self.realtime_processors:
if isinstance(processor, LicensePlateRealTimeProcessor):
processor.update_config(lpr_config)
for processor in self.post_processors:
if isinstance(processor, LicensePlatePostProcessor):
processor.update_config(lpr_config)
logger.debug("Applied dynamic LPR config update")
def _process_requests(self) -> None:
"""Process embeddings requests"""