mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-08 20:25:26 +03:00
Add config pub / sub pattern
This commit is contained in:
parent
617c728a88
commit
a3fb1325b2
79
frigate/comms/config_updater.py
Normal file
79
frigate/comms/config_updater.py
Normal file
@ -0,0 +1,79 @@
|
||||
"""Facilitates communication between processes."""
|
||||
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import threading
|
||||
from multiprocessing.synchronize import Event as MpEvent
|
||||
from typing import Callable, Optional
|
||||
|
||||
import zmq
|
||||
|
||||
from frigate.comms.dispatcher import Communicator
|
||||
from frigate.const import PORT_INTER_PROCESS_CONFIG
|
||||
|
||||
|
||||
class ConfigPublisher(Communicator):
|
||||
"""Publishes config changes to different processes."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
INTER_PROCESS_COMM_PORT = (
|
||||
os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
|
||||
)
|
||||
self.context = zmq.Context()
|
||||
self.socket = self.context.socket(zmq.PUB)
|
||||
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}")
|
||||
self.stop_event: MpEvent = mp.Event()
|
||||
|
||||
def publish(self, topic: str, payload: str, retain: bool) -> None:
|
||||
"""There is no communication back to the processes."""
|
||||
self.socket.send_string(topic, flags=zmq.SNDMORE)
|
||||
self.socket.send_pyobj(payload)
|
||||
|
||||
def subscribe(self, receiver: Callable) -> None:
|
||||
self._dispatcher = receiver
|
||||
self.reader_thread = threading.Thread(target=self.read)
|
||||
self.reader_thread.start()
|
||||
|
||||
def read(self) -> None:
|
||||
while not self.stop_event.wait(0.5):
|
||||
while True: # load all messages that are queued
|
||||
try:
|
||||
(topic, value) = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
|
||||
|
||||
response = self._dispatcher(topic, value)
|
||||
|
||||
if response is not None:
|
||||
self.socket.send_pyobj(response)
|
||||
else:
|
||||
self.socket.send_pyobj([])
|
||||
except zmq.ZMQError:
|
||||
break
|
||||
|
||||
def stop(self) -> None:
|
||||
self.stop_event.set()
|
||||
self.reader_thread.join()
|
||||
self.socket.close()
|
||||
self.context.destroy()
|
||||
|
||||
|
||||
class ConfigSubscriber:
|
||||
"""Simplifies receiving an updated config."""
|
||||
|
||||
def __init__(self, topic: str) -> None:
|
||||
port = os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
|
||||
self.context = zmq.Context()
|
||||
self.socket = self.context.socket(zmq.SUB)
|
||||
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
|
||||
self.socket.connect(f"tcp://127.0.0.1:{port}")
|
||||
|
||||
def check_for_update(self) -> Optional[any]:
|
||||
"""Sends data and then waits for reply."""
|
||||
try:
|
||||
self.socket.recv_string(flags=zmq.NOBLOCK) # receive the topic string
|
||||
return self.socket.recv_pyobj()
|
||||
except zmq.ZMQError:
|
||||
return None
|
||||
|
||||
def stop(self) -> None:
|
||||
self.socket.close()
|
||||
self.context.destroy()
|
||||
@ -60,6 +60,7 @@ DRIVER_INTEL_iHD = "iHD"
|
||||
# Ports
|
||||
|
||||
PORT_INTER_PROCESS_COMM = 4892
|
||||
PORT_INTER_PROCESS_CONFIG = 4893
|
||||
|
||||
# Record Values
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user