frigate/frigate/comms/inter_process.py
Josh Hawkins 95956a690b
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Debug replay (#22212)
* debug replay implementation

* fix masks after dev rebase

* fix squash merge issues

* fix

* fix

* fix

* no need to write debug replay camera to config

* camera and filter button and dropdown

* add filters

* add ability to edit motion and object config for debug replay

* add debug draw overlay to debug replay

* add guard to prevent crash when camera is no longer in camera_states

* fix overflow due to radix absolutely positioned elements

* increase number of messages

* ensure deep_merge replaces existing list values when override is true

* add back button

* add debug replay to explore and review menus

* clean up

* clean up

* update instructions to prevent exposing exception info

* fix typing

* refactor output logic

* refactor with helper function

* move init to function for consistency
2026-03-04 10:07:34 -06:00

87 lines
2.7 KiB
Python

"""Facilitates communication between processes."""
import logging
import multiprocessing as mp
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Callable
import zmq
from frigate.comms.base_communicator import Communicator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ZMQ IPC endpoint shared by both ends of the channel: InterProcessCommunicator
# binds a REP socket to it and InterProcessRequestor connects a REQ socket to it.
SOCKET_REP_REQ = "ipc:///tmp/cache/comms"
class InterProcessCommunicator(Communicator):
    """Reply side of the inter-process request/reply channel.

    Binds a ZMQ REP socket to ``SOCKET_REP_REQ``. Once a receiver has been
    registered with :meth:`subscribe`, a background thread reads each JSON
    request, hands ``topic`` and ``value`` to the receiver, and sends the
    receiver's return value back as the reply.
    """

    def __init__(self) -> None:
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        self.socket.bind(SOCKET_REP_REQ)
        self.stop_event: MpEvent = mp.Event()

    def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
        """There is no communication back to the processes."""
        pass

    def subscribe(self, receiver: Callable) -> None:
        """Register ``receiver`` and start the background reader thread.

        ``receiver`` is called as ``receiver(topic, value)`` for every
        well-formed request; its return value is sent back as the reply.
        """
        self._dispatcher = receiver
        self.reader_thread = threading.Thread(target=self.read)
        self.reader_thread.start()

    def read(self) -> None:
        """Serve requests until ``stop_event`` is set.

        The socket is polled with a one second timeout, so when the channel
        is idle the stop event is re-checked about once per second.
        """
        while not self.stop_event.is_set():
            while True:  # load all messages that are queued
                has_message, _, _ = zmq.select([self.socket], [], [], 1)

                if not has_message:
                    break

                try:
                    response = self._handle_request()

                    # A REP socket must answer every request it receives, so
                    # an empty list is sent when there is nothing to return.
                    if response is not None:
                        self.socket.send_json(response)
                    else:
                        self.socket.send_json([])
                except zmq.ZMQError:
                    break

    def _handle_request(self) -> Any:
        """Receive one request and return the receiver's response for it.

        Returns ``None`` (after logging a warning) when the request is not
        valid JSON or is not a two-item ``[topic, value]`` list, so that the
        caller can still send a reply instead of the reader thread dying.
        ``zmq.ZMQError`` from the socket is left for the caller to handle.
        """
        try:
            raw = self.socket.recv_json(flags=zmq.NOBLOCK)
        except ValueError:
            # The message was consumed but could not be decoded as JSON.
            logger.warning("Received a ZMQ message that could not be decoded as JSON")
            return None

        if not isinstance(raw, list):
            logger.warning(
                "Received unexpected data type in ZMQ recv_json: %s", type(raw)
            )
            return None

        if len(raw) != 2:
            logger.warning(
                "Received malformed ZMQ request: expected [topic, value], got %d items",
                len(raw),
            )
            return None

        topic, value = raw
        return self._dispatcher(topic, value)

    def stop(self) -> None:
        """Stop the reader thread (if one was started) and release ZMQ resources."""
        self.stop_event.set()

        # subscribe() may never have been called, in which case there is no
        # thread to join but the socket and context still need closing.
        reader_thread = getattr(self, "reader_thread", None)
        if reader_thread is not None:
            reader_thread.join()

        self.socket.close(linger=0)
        self.context.destroy(linger=0)
class InterProcessRequestor:
    """Request side of the channel: sends a topic/data pair and returns the reply."""

    def __init__(self) -> None:
        ctx = zmq.Context()
        self.context = ctx
        self.socket = ctx.socket(zmq.REQ)
        self.socket.connect(SOCKET_REP_REQ)

    def send_data(self, topic: str, data: Any) -> Any:
        """Send ``(topic, data)`` as JSON and return the decoded JSON reply.

        Returns an empty string if sending or receiving raises ``zmq.ZMQError``.
        """
        request = (topic, data)

        try:
            self.socket.send_json(request)
            reply = self.socket.recv_json()
        except zmq.ZMQError:
            return ""

        return reply

    def stop(self) -> None:
        """Close the socket and destroy the context without lingering."""
        self.socket.close(linger=0)
        self.context.destroy(linger=0)