frigate/frigate/detectors/plugins/zmq_ipc.py
Josh Hawkins e7250f24cb
Full UI configuration (#22151)
* use react-jsonschema-form for UI config

* don't use properties wrapper when generating config i18n json

* configure for full i18n support

* section fields

* add descriptions to all fields for i18n

* motion i18n

* fix nullable fields

* sanitize internal fields

* add switches widgets and use friendly names

* fix nullable schema entries

* ensure update_topic is added to api calls

this needs further backend implementation to work correctly

* add global sections, camera config overrides, and reset button

* i18n

* add reset logic to global config view

* tweaks

* fix sections and live validation

* fix validation for schema objects that can be null

* generic and custom per-field validation

* improve generic error validation messages

* remove show advanced fields switch

* tweaks

* use shadcn theme

* fix array field template

* i18n tweaks

* remove collapsible around root section

* deep merge schema for advanced fields

* add array field item template and fix ffmpeg section

* add missing i18n keys

* tweaks

* comment out api call for testing

* add config groups as a separate i18n namespace

* add descriptions to all pydantic fields

* make titles more concise

* new titles as i18n

* update i18n config generation script to use json schema

* tweaks

* tweaks

* rebase

* clean up

* form tweaks

* add wildcards and fix object filter fields

* add field template for additionalproperties schema objects

* improve typing

* add section description from schema and clarify global vs camera level descriptions

* separate and consolidate global and camera i18n namespaces

* clean up now obsolete namespaces

* tweaks

* refactor sections and overrides

* add ability to render components before and after fields

* fix titles

* chore(sections): remove legacy single-section components replaced by template

* refactor configs to use individual files with a template

* fix review description

* apply hidden fields after ui schema

* move util

* remove unused i18n

* clean up error messages

* fix fast refresh

* add custom validation and use it for ffmpeg input roles

* update nav tree

* remove unused

* re-add override and modified indicators

* mark pending changes and add confirmation dialog for resets

* fix red unsaved dot

* tweaks

* add docs links, readonly keys, and restart required per field

* add special case and comments for global motion section

* add section form special cases

* combine review sections

* tweaks

* add audio labels endpoint

* add audio label switches and input to filter list

* fix type

* remove key from config when resetting to default/global

* don't show description for new key/val fields

* tweaks

* spacing tweaks

* add activity indicator and scrollbar tweaks

* add docs to filter fields

* wording changes

* fix global ffmpeg section

* add review classification zones to review form

* add backend endpoint and frontend widget for ffmpeg presets and manual args

* improve wording

* hide descriptions for additional properties arrays

* add warning log about incorrectly nested model config

* spacing and language tweaks

* fix i18n keys

* networking section docs and description

* small wording tweaks

* add layout grid field

* refactor with shared utilities

* field order

* add individual detectors to schema

add detector titles and descriptions (docstrings in pydantic are used for descriptions) and add i18n keys to globals

* clean up detectors section and i18n

* don't save model config back to yaml when saving detectors

* add full detectors config to api model dump

works around the way we use detector plugins so we can have the full detector config for the frontend

* add restart button to toast when restart is required

* add ui option to remove inner cards

* fix buttons

* section tweaks

* don't zoom into text on mobile

* make buttons sticky at bottom of sections

* small tweaks

* highlight label of changed fields

* add null to enum list when unwrapping

* refactor to shared utils and add save all button

* add undo all button

* add RJSF to dictionary

* consolidate utils

* preserve form data when changing cameras

* add mono fonts

* add popover to show what fields will be saved

* fix mobile menu not re-rendering with unsaved dots

* tweaks

* fix logger and env vars config section saving

use escaped periods in keys to retain them in the config file (eg "frigate.embeddings")

* add timezone widget

* role map field with validation

* fix validation for model section

* add another hidden field

* add footer message for required restart

* use rjsf for notifications view

* fix config saving

* add replace rules field

* default column layout and add field sizing

* clean up field template

* refactor profile settings to match rjsf forms

* tweaks

* refactor frigate+ view and make tweaks to sections

* show frigate+ model info in detection model settings when using a frigate+ model

* update restartRequired for all fields

* fix restart fields

* tweaks and add ability to enable disabled cameras

more backend changes required

* require restart when enabling camera that is disabled in config

* disable save when form is invalid

* refactor ffmpeg section for readability

* change label

* clean up camera inputs fields

* misc tweaks to ffmpeg section

- add raw paths endpoint to ensure credentials get saved
- restart required tooltip

* maintenance settings tweaks

* don't mutate with lodash

* fix description re-rendering for nullable object fields

* hide reindex field

* update rjsf

* add frigate+ description to settings pane

* disable save all when any section is invalid

* show translated field name in validation error pane

* clean up

* remove unused

* fix genai merge

* fix genai
2026-02-27 08:55:36 -07:00

346 lines
13 KiB
Python

import json
import logging
import os
from typing import Any, List
import numpy as np
import zmq
from pydantic import ConfigDict, Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
logger = logging.getLogger(__name__)
DETECTOR_KEY = "zmq"
class ZmqDetectorConfig(BaseDetectorConfig):
    """ZMQ IPC detector that offloads inference to an external process via a ZeroMQ IPC endpoint."""

    # Pydantic model settings; the title is surfaced in the generated UI config schema.
    model_config = ConfigDict(
        title="ZMQ IPC",
    )

    # Discriminator selecting this detector plugin (always "zmq").
    type: Literal[DETECTOR_KEY]
    # ZeroMQ endpoint the REQ socket connects to (IPC path by default).
    endpoint: str = Field(
        default="ipc:///tmp/cache/zmq_detector",
        title="ZMQ IPC endpoint",
        description="The ZMQ endpoint to connect to.",
    )
    # Send/receive timeout applied to each inference request.
    request_timeout_ms: int = Field(
        default=200,
        title="ZMQ request timeout in milliseconds",
        description="Timeout for ZMQ requests in milliseconds.",
    )
    # Linger period used when closing the socket (0 = drop pending messages).
    linger_ms: int = Field(
        default=0,
        title="ZMQ socket linger in milliseconds",
        description="Socket linger period in milliseconds.",
    )
class ZmqIpcDetector(DetectionApi):
    """
    ZMQ-based detector plugin using a REQ/REP socket over an IPC endpoint.

    Protocol:
    - Request is sent as a multipart message:
        [ header_json_bytes, tensor_bytes ]
      where header is a JSON object containing:
        {
          "shape": List[int],
          "dtype": str,  # numpy dtype string, e.g. "uint8", "float32"
        }
      tensor_bytes are the raw bytes of the numpy array in C-order.
    - Response is expected to be either:
        a) Multipart [ header_json_bytes, tensor_bytes ] with header specifying
           shape [20,6] and dtype "float32"; or
        b) Single frame tensor_bytes of length 20*6*4 bytes (float32).

    On any error or timeout, this detector returns a zero array of shape (20, 6).

    Model Management:
    - On initialization, sends model request to check if model is available
    - If model not available, sends model data via ZMQ
    - Only starts inference after model is ready
    """

    type_key = DETECTOR_KEY

    def __init__(self, detector_config: ZmqDetectorConfig):
        super().__init__(detector_config)
        self._context = zmq.Context()
        self._endpoint = detector_config.endpoint
        self._request_timeout_ms = detector_config.request_timeout_ms
        self._linger_ms = detector_config.linger_ms
        self._socket = None
        self._create_socket()

        # Model management: inference is gated until the remote side
        # confirms the model is available and loaded.
        self._model_ready = False
        self._model_name = self._get_model_name()

        # Initialize model if needed
        self._initialize_model()

        # Preallocate zero result for error paths
        self._zero_result = np.zeros((20, 6), np.float32)

    def _create_socket(self) -> None:
        """Create (or recreate) the REQ socket with timeouts applied.

        REQ sockets enter a broken state after an unanswered request, so
        error paths call this to discard the old socket and reconnect.
        """
        if self._socket is not None:
            try:
                self._socket.close(linger=self._linger_ms)
            except Exception:
                pass
        self._socket = self._context.socket(zmq.REQ)
        # Apply timeouts and linger so calls don't block indefinitely
        self._socket.setsockopt(zmq.RCVTIMEO, self._request_timeout_ms)
        self._socket.setsockopt(zmq.SNDTIMEO, self._request_timeout_ms)
        self._socket.setsockopt(zmq.LINGER, self._linger_ms)
        logger.debug(f"ZMQ detector connecting to {self._endpoint}")
        self._socket.connect(self._endpoint)

    def _get_model_name(self) -> str:
        """Get the model filename from the detector config."""
        model_path = self.detector_config.model.path
        return os.path.basename(model_path)

    def _initialize_model(self) -> None:
        """Initialize the model by checking availability and transferring if needed."""
        try:
            logger.info(f"Initializing model: {self._model_name}")
            # Check if model is available and transfer if needed
            if self._check_and_transfer_model():
                logger.info(f"Model {self._model_name} is ready")
                self._model_ready = True
            else:
                logger.error(f"Failed to initialize model {self._model_name}")
        except Exception as e:
            logger.error(f"Failed to initialize model: {e}")

    def _check_and_transfer_model(self) -> bool:
        """Check if model is available and transfer if needed in one atomic operation.

        Returns True only once the remote side reports the model both
        available and loaded (transferring it first when necessary).
        """
        try:
            # Send model availability request
            header = {"model_request": True, "model_name": self._model_name}
            header_bytes = json.dumps(header).encode("utf-8")
            self._socket.send_multipart([header_bytes])

            # Temporarily increase timeout for model operations, which can
            # take far longer than a normal inference round trip.
            original_timeout = self._socket.getsockopt(zmq.RCVTIMEO)
            self._socket.setsockopt(zmq.RCVTIMEO, 30000)
            try:
                response_frames = self._socket.recv_multipart()
            finally:
                self._socket.setsockopt(zmq.RCVTIMEO, original_timeout)

            # A single JSON frame is the expected model-management reply.
            if len(response_frames) == 1:
                try:
                    response = json.loads(response_frames[0].decode("utf-8"))
                    model_available = response.get("model_available", False)
                    model_loaded = response.get("model_loaded", False)

                    if model_available and model_loaded:
                        return True
                    elif model_available and not model_loaded:
                        logger.error("Model exists but failed to load")
                        return False
                    else:
                        # Model not present remotely: push our copy over.
                        return self._send_model_data()
                except json.JSONDecodeError:
                    logger.warning(
                        "Received non-JSON response for model availability check"
                    )
                    return False
            else:
                logger.warning(
                    "Received unexpected response format for model availability check"
                )
                return False
        except Exception as e:
            logger.error(f"Failed to check and transfer model: {e}")
            return False

    def _check_model_availability(self) -> bool:
        """Check if the model is available on the detector."""
        try:
            # Send model availability request
            header = {"model_request": True, "model_name": self._model_name}
            header_bytes = json.dumps(header).encode("utf-8")
            self._socket.send_multipart([header_bytes])

            # Receive response
            response_frames = self._socket.recv_multipart()

            # Check if this is a JSON response (model management)
            if len(response_frames) == 1:
                try:
                    response = json.loads(response_frames[0].decode("utf-8"))
                    model_available = response.get("model_available", False)
                    model_loaded = response.get("model_loaded", False)
                    logger.debug(
                        f"Model availability check: available={model_available}, loaded={model_loaded}"
                    )
                    return model_available and model_loaded
                except json.JSONDecodeError:
                    logger.warning(
                        "Received non-JSON response for model availability check"
                    )
                    return False
            else:
                logger.warning(
                    "Received unexpected response format for model availability check"
                )
                return False
        except Exception as e:
            logger.error(f"Failed to check model availability: {e}")
            return False

    def _send_model_data(self) -> bool:
        """Send model data to the detector.

        Reads the model file from disk and ships it in a single multipart
        message; returns True only if the remote side saved AND loaded it.
        """
        try:
            model_path = self.detector_config.model.path
            if not os.path.exists(model_path):
                logger.error(f"Model file not found: {model_path}")
                return False

            logger.info(f"Transferring model to detector: {self._model_name}")
            with open(model_path, "rb") as f:
                model_data = f.read()

            header = {"model_data": True, "model_name": self._model_name}
            header_bytes = json.dumps(header).encode("utf-8")
            self._socket.send_multipart([header_bytes, model_data])

            # Temporarily increase timeout for model loading (can take several seconds)
            original_timeout = self._socket.getsockopt(zmq.RCVTIMEO)
            self._socket.setsockopt(zmq.RCVTIMEO, 30000)
            try:
                # Receive response
                response_frames = self._socket.recv_multipart()
            finally:
                # Restore original timeout
                self._socket.setsockopt(zmq.RCVTIMEO, original_timeout)

            # Check if this is a JSON response (model management)
            if len(response_frames) == 1:
                try:
                    response = json.loads(response_frames[0].decode("utf-8"))
                    model_saved = response.get("model_saved", False)
                    model_loaded = response.get("model_loaded", False)
                    if model_saved and model_loaded:
                        logger.info(
                            f"Model {self._model_name} transferred and loaded successfully"
                        )
                    else:
                        logger.error(
                            f"Model transfer failed: saved={model_saved}, loaded={model_loaded}"
                        )
                    return model_saved and model_loaded
                except json.JSONDecodeError:
                    logger.warning("Received non-JSON response for model data transfer")
                    return False
            else:
                logger.warning(
                    "Received unexpected response format for model data transfer"
                )
                return False
        except Exception as e:
            logger.error(f"Failed to send model data: {e}")
            return False

    def _build_header(self, tensor_input: np.ndarray) -> bytes:
        """Build the JSON request header describing the tensor payload."""
        header: dict[str, Any] = {
            "shape": list(tensor_input.shape),
            "dtype": str(tensor_input.dtype.name),
            "model_type": str(self.detector_config.model.model_type.name),
        }
        return json.dumps(header).encode("utf-8")

    def _decode_response(self, frames: List[bytes]) -> np.ndarray:
        """Decode a reply into a detection array, or zeros on any error.

        Accepts either a single raw float32 frame of exactly 20*6 values,
        or a [header, tensor] pair where the header gives shape/dtype.
        """
        try:
            if len(frames) == 1:
                # Single-frame raw float32 (20x6)
                buf = frames[0]
                if len(buf) != 20 * 6 * 4:
                    logger.warning(
                        f"ZMQ detector received unexpected payload size: {len(buf)}"
                    )
                    return self._zero_result
                return np.frombuffer(buf, dtype=np.float32).reshape((20, 6))
            if len(frames) >= 2:
                header = json.loads(frames[0].decode("utf-8"))
                shape = tuple(header.get("shape", []))
                dtype = np.dtype(header.get("dtype", "float32"))
                return np.frombuffer(frames[1], dtype=dtype).reshape(shape)
            logger.warning("ZMQ detector received empty reply")
            return self._zero_result
        except Exception as exc:  # noqa: BLE001
            logger.error(f"ZMQ detector failed to decode response: {exc}")
            return self._zero_result

    def detect_raw(self, tensor_input: np.ndarray) -> np.ndarray:
        """Run one inference round trip; returns zeros on timeout or error."""
        if not self._model_ready:
            logger.warning("Model not ready, returning zero detections")
            return self._zero_result

        try:
            header_bytes = self._build_header(tensor_input)
            # tobytes() already produces a C-order copy; no extra wrapping needed.
            payload_bytes = tensor_input.tobytes(order="C")

            # Send request
            self._socket.send_multipart([header_bytes, payload_bytes])

            # Receive reply
            reply_frames = self._socket.recv_multipart()
            detections = self._decode_response(reply_frames)
            # Ensure output shape and dtype are exactly as expected
            return detections
        except zmq.Again:
            # Timeout: a REQ socket is now stuck mid-transaction, so rebuild
            # it and re-verify the model before the next request.
            logger.debug("ZMQ detector request timed out; resetting socket")
            try:
                self._create_socket()
                self._initialize_model()
            except Exception:
                pass
            return self._zero_result
        except zmq.ZMQError as exc:
            logger.error(f"ZMQ detector ZMQError: {exc}; resetting socket")
            try:
                self._create_socket()
                self._initialize_model()
            except Exception:
                pass
            return self._zero_result
        except Exception as exc:  # noqa: BLE001
            logger.error(f"ZMQ detector unexpected error: {exc}")
            return self._zero_result

    def __del__(self) -> None:  # pragma: no cover - best-effort cleanup
        # Guard with getattr so cleanup is safe even if __init__ failed
        # partway through; use the cached linger value rather than reaching
        # back into the config object, and terminate the context so it is
        # not leaked (linger is bounded, so term() will not block forever).
        try:
            socket = getattr(self, "_socket", None)
            if socket is not None:
                socket.close(linger=getattr(self, "_linger_ms", 0))
            context = getattr(self, "_context", None)
            if context is not None:
                context.term()
        except Exception:
            pass