frigate/frigate/comms/config_updater.py
Josh Hawkins 95956a690b
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Debug replay (#22212)
* debug replay implementation

* fix masks after dev rebase

* fix squash merge issues

* fix

* fix

* fix

* no need to write debug replay camera to config

* camera and filter button and dropdown

* add filters

* add ability to edit motion and object config for debug replay

* add debug draw overlay to debug replay

* add guard to prevent crash when camera is no longer in camera_states

* fix overflow due to radix absolutely positioned elements

* increase number of messages

* ensure deep_merge replaces existing list values when override is true

* add back button

* add debug replay to explore and review menus

* clean up

* clean up

* update instructions to prevent exposing exception info

* fix typing

* refactor output logic

* refactor with helper function

* move init to function for consistency
2026-03-04 10:07:34 -06:00

60 lines
1.8 KiB
Python

"""Facilitates communication between processes."""
import multiprocessing as mp
from _pickle import UnpicklingError
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
import zmq
# ZeroMQ IPC endpoint shared by both ends of the config channel:
# ConfigPublisher binds to it and ConfigSubscriber connects to it.
SOCKET_PUB_SUB = "ipc:///tmp/cache/config"
class ConfigPublisher:
    """Broadcasts config changes to other processes over a ZeroMQ PUB socket."""

    def __init__(self) -> None:
        """Create the PUB socket, bind it to the shared endpoint, and set up the stop event."""
        ctx = zmq.Context()
        pub_socket = ctx.socket(zmq.PUB)
        pub_socket.bind(SOCKET_PUB_SUB)

        self.context = ctx
        self.socket = pub_socket
        # Set by stop(); nothing in this class reads it.
        self.stop_event: MpEvent = mp.Event()

    def publish(self, topic: str, payload: Any) -> None:
        """Send ``payload`` under ``topic``; nothing is received back.

        The message goes out as two frames: the topic string first, then the
        payload serialized as a Python object.
        """
        sock = self.socket
        # SNDMORE marks the topic frame as the first part of a multipart message.
        sock.send_string(topic, flags=zmq.SNDMORE)
        sock.send_pyobj(payload)

    def stop(self) -> None:
        """Set the stop event, then close the socket and destroy the context."""
        self.stop_event.set()
        # linger=0 is passed to both teardown calls.
        sock, ctx = self.socket, self.context
        sock.close(linger=0)
        ctx.destroy(linger=0)
class ConfigSubscriber:
    """Receives config updates published on the shared config endpoint."""

    def __init__(self, topic: str, exact: bool = False) -> None:
        """Subscribe to ``topic`` and connect to the shared endpoint.

        Args:
            topic: Topic string registered as the socket's subscription filter.
            exact: When True, ``check_for_update`` only reports messages whose
                topic equals ``topic`` exactly. (ZeroMQ subscription filters
                match by prefix, so the socket alone may deliver longer topics.)
        """
        self.topic = topic
        self.exact = exact

        ctx = zmq.Context()
        sub_socket = ctx.socket(zmq.SUB)
        self.context = ctx
        self.socket = sub_socket

        sub_socket.setsockopt_string(zmq.SUBSCRIBE, topic)
        sub_socket.connect(SOCKET_PUB_SUB)

    def check_for_update(self) -> tuple[str, Any] | tuple[None, None]:
        """Poll once for a pending update without blocking on an empty queue.

        Returns:
            ``(topic, payload)`` for a received message, or ``(None, None)``
            when nothing could be received or decoded, or when ``exact`` is
            set and the received topic differs from the subscribed one (that
            message is consumed and dropped).
        """
        try:
            # Only the topic frame is read with NOBLOCK; the payload frame
            # belongs to the same multipart message.
            received_topic = self.socket.recv_string(flags=zmq.NOBLOCK)
            payload = self.socket.recv_pyobj()
        except (zmq.ZMQError, UnicodeDecodeError, UnpicklingError):
            return (None, None)

        if self.exact and self.topic != received_topic:
            return (None, None)

        return (received_topic, payload)

    def stop(self) -> None:
        """Close the socket and destroy the context, both with ``linger=0``."""
        sock, ctx = self.socket, self.context
        sock.close(linger=0)
        ctx.destroy(linger=0)