This commit is contained in:
Nicolas Mowen 2025-05-22 10:22:17 -06:00
parent 90a93d1fba
commit fca5722600

View File

@ -30,32 +30,12 @@ class CameraConfigUpdateTopic:
@property @property
def topic(self) -> str: def topic(self) -> str:
return f"config/cameras/{self.camera}/{self.update_type}" return f"config/cameras/{self.camera}/{self.update_type.name}"
def find_base_topic(topics: list[CameraConfigUpdateTopic]) -> str:
"""
Finds the longest common topic of topic strings.
Args:
topics: list of update topics.
Returns:
The longest common topic of the two strings.
"""
strings = [topic.topic for topic in topics]
for i in range(min([len(s) for s in strings])):
letters = [s[i] for s in strings]
if all([letters[0] == letter for letter in letters]):
continue
return strings[0][0:i]
class CameraConfigUpdatePublisher: class CameraConfigUpdatePublisher:
def __init__(self, publisher: ConfigPublisher): def __init__(self):
self.publisher = publisher self.publisher = ConfigPublisher()
def publish_update(self, topic: CameraConfigUpdateTopic, config: Any) -> None: def publish_update(self, topic: CameraConfigUpdateTopic, config: Any) -> None:
self.publisher.publish(topic.topic, config) self.publisher.publish(topic.topic, config)
@ -71,10 +51,16 @@ class CameraConfigUpdateSubscriber:
topics: list[CameraConfigUpdateEnum], topics: list[CameraConfigUpdateEnum],
): ):
self.camera_configs = camera_configs self.camera_configs = camera_configs
self.exact = len(topics) == 1 self.topics = topics
base_topic = "config/cameras"
if len(self.camera_configs) == 1:
base_topic += f"/{list(self.camera_configs.keys())[0]}"
self.subscriber = ConfigSubscriber( self.subscriber = ConfigSubscriber(
topics[0] if self.exact else find_base_topic(topics), base_topic,
exact=self.exact, exact=False,
) )
def __update_config( def __update_config(
@ -106,18 +92,24 @@ class CameraConfigUpdateSubscriber:
elif update_type == CameraConfigUpdateEnum.zones: elif update_type == CameraConfigUpdateEnum.zones:
config.zones = updated_config config.zones = updated_config
def check_for_update(self) -> bool: def check_for_update(self) -> list[str]:
update_topic, update_config = self.subscriber.check_for_update() updated_topics: list[str] = []
if update_topic and update_config: # get all updates available
_, _, camera, update_type = update_topic.split("/") while True:
update_topic, update_config = self.subscriber.check_for_update()
for enum in CameraConfigUpdateEnum.__members__.values(): if update_topic is None or update_config is None:
if update_type == enum.name: break
self.__update_config(camera, enum)
return True
return False _, _, camera, raw_type = update_topic.split("/")
update_type = CameraConfigUpdateEnum[raw_type]
if update_type in self.topics:
updated_topics.append(update_type.name)
self.__update_config(camera, update_type, update_config)
return updated_topics
def stop(self) -> None: def stop(self) -> None:
self.subscriber.stop() self.subscriber.stop()