simplify enrichment config updates to use single subscriber with topic-based routing

This commit is contained in:
Josh Hawkins 2026-03-08 12:58:29 -05:00
parent 7142c17759
commit 7800565be4
8 changed files with 53 additions and 101 deletions

View File

@ -1,49 +0,0 @@
"""Convenience classes for updating enrichment configurations dynamically."""
from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigSubscriber
class EnrichmentConfigEnum(str, Enum):
"""Supported enrichment config update topics."""
classification = "config/classification"
classification_custom = "config/classification/custom/"
face_recognition = "config/face_recognition"
lpr = "config/lpr"
class EnrichmentConfigSubscriber:
"""Subscribes to all enrichment config updates under ``config/``."""
def __init__(self) -> None:
self.subscriber = ConfigSubscriber("config/")
def check_for_update(
self,
) -> tuple[EnrichmentConfigEnum, str, Any] | tuple[None, None, None]:
"""Check for an enrichment config update.
Returns:
A tuple of (update_type, topic, payload) or (None, None, None)
if no update is available. For custom classification topics the
full topic string is returned so the model name can be extracted.
"""
topic, payload = self.subscriber.check_for_update()
if topic is None:
return (None, None, None)
for member in EnrichmentConfigEnum:
if topic == member.value or (
member == EnrichmentConfigEnum.classification_custom
and topic.startswith(member.value)
):
return (member, topic, payload)
return (None, None, None)
def stop(self) -> None:
self.subscriber.stop()

View File

@ -5,7 +5,6 @@ from abc import ABC, abstractmethod
from typing import Any
from frigate.config import FrigateConfig
from frigate.config.enrichment_updater import EnrichmentConfigEnum
from ..types import DataProcessorMetrics, DataProcessorModelRunner, PostProcessDataEnum
@ -52,15 +51,15 @@ class PostProcessorApi(ABC):
"""
pass
def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None:
def update_config(self, topic: str, payload: Any) -> None:
"""Handle a config change notification.
Called for every config update published under ``config/``.
Processors should override this to check the update type and act
only on changes relevant to them. Default is a no-op.
Processors should override this to check the topic and act only
on changes relevant to them. Default is a no-op.
Args:
update_type: The enrichment config type that changed.
topic: The config topic that changed.
payload: The updated configuration object.
"""
pass

View File

@ -12,7 +12,6 @@ from frigate.comms.embeddings_updater import EmbeddingsRequestEnum
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.enrichment_updater import EnrichmentConfigEnum
from frigate.data_processing.common.license_plate.mixin import (
WRITE_DEBUG_IMAGES,
LicensePlateProcessingMixin,
@ -48,13 +47,15 @@ class LicensePlatePostProcessor(LicensePlateProcessingMixin, PostProcessorApi):
self.sub_label_publisher = sub_label_publisher
super().__init__(config, metrics, model_runner)
def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None:
CONFIG_UPDATE_TOPIC = "config/lpr"
def update_config(self, topic: str, payload: Any) -> None:
"""Update LPR config at runtime."""
if update_type != EnrichmentConfigEnum.lpr:
if topic != self.CONFIG_UPDATE_TOPIC:
return
self.lpr_config = payload
logger.debug("LPR config updated dynamically")
logger.debug("LPR post-processor config updated dynamically")
def process_data(
self, data: dict[str, Any], data_type: PostProcessDataEnum

View File

@ -7,7 +7,6 @@ from typing import Any
import numpy as np
from frigate.config import FrigateConfig
from frigate.config.enrichment_updater import EnrichmentConfigEnum
from ..types import DataProcessorMetrics
@ -63,15 +62,15 @@ class RealTimeProcessorApi(ABC):
"""
pass
def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None:
def update_config(self, topic: str, payload: Any) -> None:
"""Handle a config change notification.
Called for every config update published under ``config/``.
Processors should override this to check the update type and act
only on changes relevant to them. Default is a no-op.
Processors should override this to check the topic and act only
on changes relevant to them. Default is a no-op.
Args:
update_type: The enrichment config type that changed.
topic: The config topic that changed.
payload: The updated configuration object.
"""
pass

View File

@ -169,6 +169,16 @@ class BirdRealTimeProcessor(RealTimeProcessorApi):
)
self.detected_birds[obj_data["id"]] = score
CONFIG_UPDATE_TOPIC = "config/classification"
def update_config(self, topic: str, payload: Any) -> None:
"""Update bird classification config at runtime."""
if topic != self.CONFIG_UPDATE_TOPIC:
return
self.config.classification = payload
logger.debug("Bird classification config updated dynamically")
def handle_request(self, topic, request_data):
return None

View File

@ -19,7 +19,6 @@ from frigate.comms.event_metadata_updater import (
)
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.enrichment_updater import EnrichmentConfigEnum
from frigate.const import FACE_DIR, MODEL_CACHE_DIR
from frigate.data_processing.common.face.model import (
ArcFaceRecognizer,
@ -96,12 +95,21 @@ class FaceRealTimeProcessor(RealTimeProcessorApi):
self.recognizer.build()
def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None:
CONFIG_UPDATE_TOPIC = "config/face_recognition"
def update_config(self, topic: str, payload: Any) -> None:
"""Update face recognition config at runtime."""
if update_type != EnrichmentConfigEnum.face_recognition:
if topic != self.CONFIG_UPDATE_TOPIC:
return
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = payload
self.face_config = payload
for camera_config in self.config.cameras.values():
if camera_config.face_recognition.min_area == previous_min_area:
camera_config.face_recognition.min_area = payload.min_area
logger.debug("Face recognition config updated dynamically")
def __download_models(self, path: str) -> None:

View File

@ -8,7 +8,6 @@ import numpy as np
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.config.enrichment_updater import EnrichmentConfigEnum
from frigate.data_processing.common.license_plate.mixin import (
LicensePlateProcessingMixin,
)
@ -41,12 +40,21 @@ class LicensePlateRealTimeProcessor(LicensePlateProcessingMixin, RealTimeProcess
self.camera_current_cars: dict[str, list[str]] = {}
super().__init__(config, metrics)
def update_config(self, update_type: EnrichmentConfigEnum, payload: Any) -> None:
CONFIG_UPDATE_TOPIC = "config/lpr"
def update_config(self, topic: str, payload: Any) -> None:
"""Update LPR config at runtime."""
if update_type != EnrichmentConfigEnum.lpr:
if topic != self.CONFIG_UPDATE_TOPIC:
return
previous_min_area = self.config.lpr.min_area
self.config.lpr = payload
self.lpr_config = payload
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = payload.min_area
logger.debug("LPR config updated dynamically")
def process_frame(

View File

@ -9,6 +9,7 @@ from typing import Any
from peewee import DoesNotExist
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import (
EmbeddingsRequestEnum,
@ -32,10 +33,6 @@ from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.config.enrichment_updater import (
EnrichmentConfigEnum,
EnrichmentConfigSubscriber,
)
from frigate.data_processing.common.license_plate.model import (
LicensePlateModelRunner,
)
@ -99,7 +96,7 @@ class EmbeddingMaintainer(threading.Thread):
CameraConfigUpdateEnum.semantic_search,
],
)
self.enrichment_config_subscriber = EnrichmentConfigSubscriber()
self.enrichment_config_subscriber = ConfigSubscriber("config/")
# Configure Frigate DB
db = SqliteVecQueueDatabase(
@ -297,43 +294,22 @@ class EmbeddingMaintainer(threading.Thread):
def _check_enrichment_config_updates(self) -> None:
"""Check for enrichment config updates and delegate to processors."""
update_type, topic, payload = (
self.enrichment_config_subscriber.check_for_update()
)
topic, payload = self.enrichment_config_subscriber.check_for_update()
if update_type is None:
if topic is None:
return
# Custom classification add/remove requires managing the processor list
if update_type == EnrichmentConfigEnum.classification_custom:
if topic.startswith("config/classification/custom/"):
self._handle_custom_classification_update(topic, payload)
return
# Apply global config updates before notifying processors
if update_type == EnrichmentConfigEnum.classification:
self.config.classification = payload
logger.debug("Applied dynamic bird classification config update")
elif update_type == EnrichmentConfigEnum.face_recognition:
previous_min_area = self.config.face_recognition.min_area
self.config.face_recognition = payload
for camera_config in self.config.cameras.values():
if camera_config.face_recognition.min_area == previous_min_area:
camera_config.face_recognition.min_area = payload.min_area
elif update_type == EnrichmentConfigEnum.lpr:
previous_min_area = self.config.lpr.min_area
self.config.lpr = payload
for camera_config in self.config.cameras.values():
if camera_config.lpr.min_area == previous_min_area:
camera_config.lpr.min_area = payload.min_area
# Broadcast to all processors — each decides if the update is relevant
# Broadcast to all processors — each decides if the topic is relevant
for processor in self.realtime_processors:
processor.update_config(update_type, payload)
processor.update_config(topic, payload)
for processor in self.post_processors:
processor.update_config(update_type, payload)
processor.update_config(topic, payload)
def _handle_custom_classification_update(
self, topic: str, model_config: Any