Use ipc instead of tcp

This commit is contained in:
Nicolas Mowen 2024-02-19 06:14:33 -07:00
parent 71184c0ef4
commit 5bec8999bf
5 changed files with 14 additions and 50 deletions

View File

@ -1,25 +1,21 @@
"""Facilitates communication between processes."""
import multiprocessing as mp
import os
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
import zmq
from frigate.const import PORT_INTER_PROCESS_CONFIG
SOCKET_PUB_SUB = "ipc:///tmp/cache/config"
class ConfigPublisher:
"""Publishes config changes to different processes."""
def __init__(self) -> None:
INTER_PROCESS_CONFIG_PORT = (
os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_CONFIG_PORT}")
self.socket.bind(SOCKET_PUB_SUB)
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: any) -> None:
@ -37,11 +33,10 @@ class ConfigSubscriber:
"""Simplifies receiving an updated config."""
def __init__(self, topic: str) -> None:
port = os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
self.socket.connect(f"tcp://127.0.0.1:{port}")
self.socket.connect(SOCKET_PUB_SUB)
def check_for_update(self) -> Optional[tuple[str, any]]:
"""Returns updated config or None if no update."""

View File

@ -1,18 +1,14 @@
"""Facilitates communication between processes."""
import os
import threading
from enum import Enum
from typing import Optional
import zmq
from frigate.const import (
PORT_INTER_PROCESS_DETECTION_PUB,
PORT_INTER_PROCESS_DETECTION_SUB,
)
SOCKET_CONTROL = "inproc://control.detections_updater"
SOCKET_PUB = "ipc:///tmp/cache/detect_pub"
SOCKET_SUB = "ipc:///tmp/cache/detect_sun"
class DetectionTypeEnum(str, Enum):
@ -29,21 +25,13 @@ class DetectionProxyRunner(threading.Thread):
def run(self) -> None:
"""Run the proxy."""
PUB_PORT = (
os.environ.get("INTER_PROCESS_DETECTION_PUB_PORT")
or PORT_INTER_PROCESS_DETECTION_PUB
)
SUB_PORT = (
os.environ.get("INTER_PROCESS_DETECTION_SUB_PORT")
or PORT_INTER_PROCESS_DETECTION_SUB
)
control = self.context.socket(zmq.SUB)
control.connect(SOCKET_CONTROL)
control.setsockopt_string(zmq.SUBSCRIBE, "")
incoming = self.context.socket(zmq.XSUB)
incoming.bind(f"tcp://127.0.0.1:{PUB_PORT}")
incoming.bind(SOCKET_PUB)
outgoing = self.context.socket(zmq.XPUB)
outgoing.bind(f"tcp://127.0.0.1:{SUB_PORT}")
outgoing.bind(SOCKET_SUB)
zmq.proxy_steerable(
incoming, outgoing, None, control
@ -72,14 +60,10 @@ class DetectionPublisher:
"""Simplifies receiving video and audio detections."""
def __init__(self, topic: DetectionTypeEnum) -> None:
port = (
os.environ.get("INTER_PROCESS_DETECTIONS_PORT")
or PORT_INTER_PROCESS_DETECTION_PUB
)
self.topic = topic
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.connect(f"tcp://127.0.0.1:{port}")
self.socket.connect(SOCKET_PUB)
def send_data(self, payload: any) -> None:
"""Publish detection."""
@ -95,14 +79,10 @@ class DetectionSubscriber:
"""Simplifies receiving video and audio detections."""
def __init__(self, topic: DetectionTypeEnum) -> None:
port = (
os.environ.get("INTER_PROCESS_DETECTIONS_PORT")
or PORT_INTER_PROCESS_DETECTION_SUB
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic.value)
self.socket.connect(f"tcp://127.0.0.1:{port}")
self.socket.connect(SOCKET_SUB)
def get_data(self, timeout: float = None) -> Optional[tuple[str, any]]:
"""Returns detections or None if no update."""

View File

@ -1,7 +1,6 @@
"""Facilitates communication between processes."""
import multiprocessing as mp
import os
import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable
@ -9,17 +8,15 @@ from typing import Callable
import zmq
from frigate.comms.dispatcher import Communicator
from frigate.const import PORT_INTER_PROCESS_COMM
SOCKET_REP_REQ = "ipc:///tmp/cache/comms"
class InterProcessCommunicator(Communicator):
def __init__(self) -> None:
INTER_PROCESS_COMM_PORT = (
os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
)
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REP)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}")
self.socket.bind(SOCKET_REP_REQ)
self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None:
@ -62,10 +59,9 @@ class InterProcessRequestor:
"""Simplifies sending data to InterProcessCommunicator and getting a reply."""
def __init__(self) -> None:
port = os.environ.get("INTER_PROCESS_COMM_PORT") or PORT_INTER_PROCESS_COMM
self.context = zmq.Context()
self.socket = self.context.socket(zmq.REQ)
self.socket.connect(f"tcp://127.0.0.1:{port}")
self.socket.connect(SOCKET_REP_REQ)
def send_data(self, topic: str, data: any) -> any:
"""Sends data and then waits for reply."""

View File

@ -1049,7 +1049,7 @@ def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None:
"""Verify that required_zones are specified when autotracking is enabled."""
if camera_config.detect.enabled and not camera_config.motion.enabled:
raise ValueError(
f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection ."
f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection."
)

View File

@ -57,13 +57,6 @@ DRIVER_AMD = "radeonsi"
DRIVER_INTEL_i965 = "i965"
DRIVER_INTEL_iHD = "iHD"
# Ports
PORT_INTER_PROCESS_COMM = 4892
PORT_INTER_PROCESS_CONFIG = 4893
PORT_INTER_PROCESS_DETECTION_PUB = 4894
PORT_INTER_PROCESS_DETECTION_SUB = 4895
# Record Values
CACHE_SEGMENT_FORMAT = "%Y%m%d%H%M%S%z"