From 0acb4936fe08f8dc19aa8fa224b0b7005e473094 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Tue, 10 Jun 2025 08:50:03 -0600 Subject: [PATCH] Add base class for global config updates --- frigate/config/updater.py | 67 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 frigate/config/updater.py diff --git a/frigate/config/updater.py b/frigate/config/updater.py new file mode 100644 index 000000000..db01d8e22 --- /dev/null +++ b/frigate/config/updater.py @@ -0,0 +1,67 @@ +"""Convenience classes for updating global configurations dynamically.""" + +from dataclasses import dataclass +from enum import Enum +from typing import Any + +from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber + + +class GlobalConfigUpdateEnum(str, Enum): + """Supported global config update types.""" + + add_camera = "add_camera" + remove_camera = "remove_camera" + + +@dataclass +class GlobalConfigUpdateTopic: + update_type: GlobalConfigUpdateEnum + + @property + def topic(self) -> str: + return f"config/{self.update_type.name}" + + +class GlobalConfigUpdatePublisher: + def __init__(self): + self.publisher = ConfigPublisher() + + def publish_update(self, topic: GlobalConfigUpdateTopic, config: Any) -> None: + self.publisher.publish(topic.topic, config) + + def stop(self) -> None: + self.publisher.stop() + + +class GlobalConfigUpdateSubscriber: + def __init__( + self, + topics: list[GlobalConfigUpdateEnum], + ): + self.topics = topics + self.subscriber = ConfigSubscriber( + "config/", + exact=False, + ) + + def check_for_updates(self) -> dict[str, list[str]]: + updated_topics: list[str] = [] + + # get all updates available + while True: + update_topic, update_config = self.subscriber.check_for_update() + + if update_topic is None or update_config is None: + break + + _, raw_type = update_topic.split("/") + update_type = GlobalConfigUpdateEnum[raw_type] + + if update_type in self.topics: + updated_topics.append(update_type.name) + + return updated_topics + + def stop(self) -> None: + self.subscriber.stop()