add zmq topic

This commit is contained in:
Josh Hawkins 2025-09-26 16:57:54 -05:00
parent 6ba318b297
commit 250565eff6

View File

@ -2,6 +2,7 @@
import logging
from enum import Enum
from typing import Any
from .zmq_proxy import Publisher, Subscriber
@ -11,6 +12,7 @@ logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum):
all = ""
recordings_available_through = "recordings_available_through"
latest_valid_segment = "latest_valid_segment"
class RecordingsDataPublisher(Publisher[tuple[str, float]]):
@ -18,10 +20,10 @@ class RecordingsDataPublisher(Publisher[tuple[str, float]]):
topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__(topic.value)
def __init__(self) -> None:
super().__init__()
def publish(self, payload: tuple[str, float], sub_topic: str = "") -> None:
def publish(self, payload: Any, sub_topic: str = "") -> None:
super().publish(payload, sub_topic)
@ -32,3 +34,11 @@ class RecordingsDataSubscriber(Subscriber):
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
super().__init__(topic.value)
def _return_object(
self, topic: str, payload: tuple | None
) -> tuple[str, Any] | tuple[None, None]:
if payload is None:
return (None, None)
return (topic, payload)