diff --git a/frigate/comms/config_updater.py b/frigate/comms/config_updater.py new file mode 100644 index 000000000..3aaa54ee3 --- /dev/null +++ b/frigate/comms/config_updater.py @@ -0,0 +1,79 @@ +"""Facilitates communication between processes.""" + +import multiprocessing as mp +import os +import threading +from multiprocessing.synchronize import Event as MpEvent +from typing import Callable, Optional + +import zmq + +from frigate.comms.dispatcher import Communicator +from frigate.const import PORT_INTER_PROCESS_CONFIG + + +class ConfigPublisher(Communicator): + """Publishes config changes to different processes.""" + + def __init__(self) -> None: + INTER_PROCESS_COMM_PORT = ( + os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG + ) + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PUB) + self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}") + self.stop_event: MpEvent = mp.Event() + + def publish(self, topic: str, payload: str, retain: bool) -> None: + """There is no communication back to the processes.""" + self.socket.send_string(topic, flags=zmq.SNDMORE) + self.socket.send_pyobj(payload) + + def subscribe(self, receiver: Callable) -> None: + self._dispatcher = receiver + self.reader_thread = threading.Thread(target=self.read) + self.reader_thread.start() + + def read(self) -> None: + while not self.stop_event.wait(0.5): + while True: # load all messages that are queued + try: + (topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK) + + response = self._dispatcher(topic, value) + + if response is not None: + self.socket.send_pyobj(response) + else: + self.socket.send_pyobj([]) + except zmq.ZMQError: + break + + def stop(self) -> None: + self.stop_event.set() + self.reader_thread.join() + self.socket.close() + self.context.destroy() + + +class ConfigSubscriber: + """Simplifies receiving an updated config.""" + + def __init__(self, topic: str) -> None: + port = os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) + self.socket.connect(f"tcp://127.0.0.1:{port}") + + def check_for_update(self) -> Optional[any]: + """Sends data and then waits for reply.""" + try: + self.socket.recv_string(flags=zmq.NOBLOCK) # receive the topic string + return self.socket.recv_pyobj() + except zmq.ZMQError: + return None + + def stop(self) -> None: + self.socket.close() + self.context.destroy() diff --git a/frigate/const.py b/frigate/const.py index 158996116..7c56bb337 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -60,6 +60,7 @@ DRIVER_INTEL_iHD = "iHD" # Ports PORT_INTER_PROCESS_COMM = 4892 +PORT_INTER_PROCESS_CONFIG = 4893 # Record Values