Add config pub / sub pattern

This commit is contained in:
Nicolas Mowen 2024-02-14 06:54:27 -07:00
parent 617c728a88
commit ce2dddcd28
2 changed files with 61 additions and 0 deletions

View File

@ -0,0 +1,60 @@
"""Facilitates communication between processes."""
import multiprocessing as mp
import os
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional
import zmq
from frigate.comms.dispatcher import Communicator
from frigate.const import PORT_INTER_PROCESS_CONFIG
class ConfigPublisher(Communicator):
"""Publishes config changes to different processes."""
def __init__(self) -> None:
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}")
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
"""There is no communication back to the processes."""
self.socket.send_string(topic, flags=zmq.SNDMORE)
self.socket.send_pyobj(payload)
def subscribe(self, receiver: Callable) -> None:
pass # this class does not subscribe
def stop(self) -> None:
self.stop_event.set()
self.socket.close()
self.context.destroy()
class ConfigSubscriber:
"""Simplifies receiving an updated config."""
def __init__(self, topic: str) -> None:
port = os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
self.socket.connect(f"tcp://127.0.0.1:{port}")
def check_for_update(self) -> Optional[any]:
"""Sends data and then waits for reply."""
try:
self.socket.recv_string(flags=zmq.NOBLOCK) # receive the topic string
return self.socket.recv_pyobj()
except zmq.ZMQError:
return None
def stop(self) -> None:
self.socket.close()
self.context.destroy()

View File

@ -60,6 +60,7 @@ DRIVER_INTEL_iHD = "iHD"
# Ports # Ports
PORT_INTER_PROCESS_COMM = 4892 PORT_INTER_PROCESS_COMM = 4892
PORT_INTER_PROCESS_CONFIG = 4893
# Record Values # Record Values