Native ONVIF cell-motion ingest

Adds a per-camera ONVIF subscriber that lets cameras with native
hardware motion detection (e.g. OpenIPC firmware for HiSilicon,
Ingenic and SigmaStar SoCs; many ONVIF Profile-M devices) replace
Frigate's per-frame CPU motion analysis. Two standard ONVIF
transports are consumed in parallel:

- WS-BaseNotification PullPoint for the binary motion state
  (tns1:RuleEngine/CellMotionDetector/Motion IsMotion=true|false,
   with tns1:VideoSource/MotionAlarm State=true|false accepted as
   a fallback for cameras that only publish the legacy topic).
- RTSP analytics metadata stream (application/vnd.onvif.metadata)
  for the per-frame cell grid (tt:MotionInCells, base64 + PackBits
  bit-packed bitmap). Cell layout is discovered once at startup via
  AnalyticsService.GetAnalyticsModules and the camera's CellLayout
  transformation is used to map cells to detect-frame pixel
  rectangles via connected-components.

New config:
  onvif.events.{enabled, subscription_timeout, use_metadata_stream}
  motion.source: internal (default) | onvif

When motion.source: onvif, ImprovedMotionDetector is skipped and
motion_boxes come from the camera. Internal motion remains the
default; the new path is fully opt-in.
This commit is contained in:
Dmitry Ilyin 2026-05-30 19:39:22 +03:00 committed by John
parent bc65713ae4
commit b420efdebd
15 changed files with 1165 additions and 10 deletions

View File

@ -198,6 +198,46 @@ When the skip threshold is exceeded, **no motion is reported** for that frame, m
:::
## Using Camera-Side ONVIF Motion Detection
For cameras that publish their own ONVIF cell-motion analytics (e.g. OpenIPC firmware for HiSilicon, Ingenic and SigmaStar SoCs, plus most ONVIF Profile-M devices from Hikvision, Reolink, Foscam, Amcrest, etc.), Frigate can use the camera's hardware motion engine instead of running per-frame analysis on the host CPU. This both removes CPU load from the Frigate machine and gives a more accurate motion signal than encoded-stream analysis can produce.
Frigate consumes the two standard ONVIF transports:
- **PullPoint** event subscription on `tns1:RuleEngine/CellMotionDetector/Motion` carries the binary on/off state (the legacy `tns1:VideoSource/MotionAlarm` payload is also accepted).
- **RTSP analytics metadata stream** (the `application/vnd.onvif.metadata` track on the primary RTSP profile) carries the per-frame cell grid (`tt:MotionInCells`) which Frigate decodes (base64 + PackBits) and maps through the `CellLayout` transformation into Frigate's detect-frame pixel coordinates.
```yaml
cameras:
back_door:
onvif:
host: 10.0.0.10
port: 80
user: root
password: "secret"
events:
# Subscribe to camera-side motion events.
enabled: true
# Seconds before the PullPoint subscription expires (we renew at half this).
subscription_timeout: 60
# Open the RTSP analytics metadata stream for per-cell motion coordinates.
# Disable if your camera only publishes the binary event topic.
use_metadata_stream: true
motion:
# Use the camera's ONVIF events as Frigate's motion signal. The internal
# CPU motion detector is skipped.
source: onvif
detect:
enabled: true
```
When `motion.source: onvif`:
- Frigate's internal `ImprovedMotionDetector` is **not** run on the camera's frames.
- Object detection still runs every detection frame; motion boxes are used for region clustering exactly as with the internal detector.
- If `use_metadata_stream: true` but the camera doesn't advertise the metadata track (or PackBits decoding fails for a frame), Frigate falls back to a full-frame motion box while the binary event signal is active.
- The validator requires `onvif.events.enabled: true` whenever `motion.source: onvif`.
## Reviewing Detected Motion
To review what the detector picked up — or to search past recordings for motion in a specific region — see [Reviewing Motion](review.md#reviewing-motion) on the Review page.

View File

@ -309,7 +309,9 @@ class FrigateApp:
self.detection_proxy = DetectorProxy()
def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
self.onvif_controller = OnvifController(
self.config, self.ptz_metrics, self.camera_metrics
)
def init_dispatcher(self) -> None:
comms: list[Communicator] = []

View File

@ -1,6 +1,6 @@
import multiprocessing as mp
import queue
from multiprocessing.managers import SyncManager, ValueProxy
from multiprocessing.managers import ListProxy, SyncManager, ValueProxy
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
@ -23,6 +23,14 @@ class CameraMetrics:
reconnects_last_hour: ValueProxy[int]
stalls_last_hour: ValueProxy[int]
# External motion published by OnvifController when motion.source=onvif.
# external_motion_active mirrors the PullPoint IsMotion state.
# external_motion_boxes carries the per-frame cell-derived rectangles in
# detect-frame pixel coordinates; empty list means no current spatial
# data (consumer should fall back to a full-frame box when active=1).
external_motion_active: ValueProxy[int]
external_motion_boxes: ListProxy
def __init__(self, manager: SyncManager):
self.camera_fps = manager.Value("d", 0)
self.detection_fps = manager.Value("d", 0)
@ -41,6 +49,9 @@ class CameraMetrics:
self.reconnects_last_hour = manager.Value("i", 0)
self.stalls_last_hour = manager.Value("i", 0)
self.external_motion_active = manager.Value("b", 0)
self.external_motion_boxes = manager.list()
class PTZMetrics:
autotracker_enabled: Synchronized

View File

@ -1,3 +1,4 @@
from enum import Enum
from typing import Any, Optional
from pydantic import Field, field_serializer
@ -5,7 +6,12 @@ from pydantic import Field, field_serializer
from ..base import FrigateBaseModel
from .mask import MotionMaskConfig
__all__ = ["MotionConfig"]
__all__ = ["MotionConfig", "MotionSourceEnum"]
class MotionSourceEnum(str, Enum):
internal = "internal"
onvif = "onvif"
class MotionConfig(FrigateBaseModel):
@ -14,6 +20,11 @@ class MotionConfig(FrigateBaseModel):
title="Enable motion detection",
description="Enable or disable motion detection for all cameras; can be overridden per-camera.",
)
source: MotionSourceEnum = Field(
default=MotionSourceEnum.internal,
title="Motion source",
description="Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled).",
)
threshold: int = Field(
default=30,
title="Motion threshold",

View File

@ -7,7 +7,12 @@ from ..base import FrigateBaseModel
from ..env import EnvString
from .objects import DEFAULT_TRACKED_OBJECTS
__all__ = ["OnvifConfig", "PtzAutotrackConfig", "ZoomingModeEnum"]
__all__ = [
"OnvifConfig",
"OnvifEventsConfig",
"PtzAutotrackConfig",
"ZoomingModeEnum",
]
class ZoomingModeEnum(str, Enum):
@ -91,6 +96,26 @@ class PtzAutotrackConfig(FrigateBaseModel):
return weights
class OnvifEventsConfig(FrigateBaseModel):
enabled: bool = Field(
default=False,
title="Enable ONVIF events",
description="Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal.",
)
subscription_timeout: int = Field(
default=60,
ge=10,
le=600,
title="Subscription timeout",
description="Seconds before the PullPoint subscription expires and is renewed.",
)
use_metadata_stream: bool = Field(
default=True,
title="Use metadata stream",
description="Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track.",
)
class OnvifConfig(FrigateBaseModel):
host: EnvString = Field(
default="",
@ -127,6 +152,11 @@ class OnvifConfig(FrigateBaseModel):
title="Autotracking",
description="Automatically track moving objects and keep them centered in the frame using PTZ camera movements.",
)
events: OnvifEventsConfig = Field(
default_factory=OnvifEventsConfig,
title="ONVIF events",
description="Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
)
ignore_time_mismatch: bool = Field(
default=False,
title="Ignore time mismatch",

View File

@ -47,7 +47,7 @@ from .camera.detect import DetectConfig
from .camera.ffmpeg import FfmpegConfig
from .camera.genai import GenAIConfig, GenAIRoleEnum
from .camera.mask import ObjectMaskConfig
from .camera.motion import MotionConfig
from .camera.motion import MotionConfig, MotionSourceEnum
from .camera.notification import NotificationConfig
from .camera.objects import FilterConfig, ObjectConfig
from .camera.record import RecordConfig
@ -380,10 +380,19 @@ def verify_autotrack_zones(camera_config: CameraConfig) -> ValueError | None:
def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None:
"""Verify that motion detection is not disabled and object detection is enabled."""
if camera_config.detect.enabled and not camera_config.motion.enabled:
motion_via_onvif = camera_config.motion.source == MotionSourceEnum.onvif
if (
camera_config.detect.enabled
and not camera_config.motion.enabled
and not motion_via_onvif
):
raise ValueError(
f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection."
)
if motion_via_onvif and not camera_config.onvif.events.enabled:
raise ValueError(
f"Camera {camera_config.name} has motion.source=onvif but onvif.events.enabled is false; enable ONVIF events to use them as the motion source."
)
def verify_objects_track(

View File

@ -13,17 +13,39 @@ import numpy
from onvif import ONVIFCamera, ONVIFError, ONVIFService
from zeep.exceptions import Fault, TransportError
from frigate.camera import PTZMetrics
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig, ZoomingModeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.ptz.onvif_events import run_pullpoint_subscription
from frigate.ptz.onvif_metadata import run_metadata_stream
from frigate.util.builtin import find_by_key
logger = logging.getLogger(__name__)
def _inject_rtsp_credentials(url: str, user: str | None, password: str | None) -> str:
"""Insert user:password into an rtsp:// URL if not already present.
The ONVIF GetStreamUri response typically returns an rtsp URL without
credentials, but downstream consumers (ffmpeg, RTSP libs) need them in
the URL because the camera challenges Basic/Digest on DESCRIBE.
"""
if not user or not password:
return url
if "@" in url.split("://", 1)[-1].split("/", 1)[0]:
# URL already has user:pass — don't touch it.
return url
if "://" not in url:
return url
scheme, rest = url.split("://", 1)
from urllib.parse import quote
return f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}@{rest}"
class OnvifCommandEnum(str, Enum):
"""Holds all possible move commands"""
@ -45,7 +67,10 @@ class OnvifController:
ptz_metrics: dict[str, PTZMetrics]
def __init__(
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics]
self,
config: FrigateConfig,
ptz_metrics: dict[str, PTZMetrics],
camera_metrics: dict[str, CameraMetrics] | None = None,
) -> None:
self.cams: dict[str, dict] = {}
self.failed_cams: dict[str, dict] = {}
@ -53,6 +78,7 @@ class OnvifController:
self.reset_timeout = 900 # 15 minutes
self.config = config
self.ptz_metrics = ptz_metrics
self.camera_metrics = camera_metrics or {}
self.status_locks: dict[str, asyncio.Lock] = {}
@ -107,7 +133,28 @@ class OnvifController:
async def _close_camera(self, cam_name: str) -> None:
"""Close the ONVIF client session for a camera."""
cam_state = self.cams.get(cam_name)
if cam_state and "onvif" in cam_state:
if not cam_state:
return
# Stop any long-running event-consumption tasks first so they release
# any resources held against the ONVIFCamera session before we close it.
for key in ("pullpoint", "metadata"):
handle = cam_state.get(key)
if not handle:
continue
task, stop_event = handle
try:
stop_event.set()
except Exception:
pass
task.cancel()
try:
await asyncio.wait_for(task, timeout=5.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
except Exception:
logger.debug(f"Error awaiting {key} task for {cam_name}")
cam_state.pop(key, None)
if "onvif" in cam_state:
try:
await cam_state["onvif"].close()
except Exception:
@ -187,6 +234,172 @@ class OnvifController:
logger.error(f"Onvif connection failed for {camera_name}: {e}")
return False
# Events init runs first, independent of PTZ capability. Many ONVIF
# cameras don't expose PTZ and would otherwise be skipped at the
# get_definition("ptz") check below.
await self._init_onvif_events(camera_name)
return await self._init_onvif_ptz(camera_name)
async def _init_onvif_events(self, camera_name: str) -> None:
"""Subscribe to PullPoint motion events and optionally open the
analytics metadata stream. Failure here is non-fatal PTZ init still
proceeds and the camera continues to work without external motion."""
cam_cfg = self.config.cameras[camera_name]
if not cam_cfg.onvif.events.enabled:
return
cm = self.camera_metrics.get(camera_name)
if cm is None:
logger.warning(
f"ONVIF events enabled for {camera_name} but no CameraMetrics "
"available; external motion will not be published"
)
return
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
cell_layout = await self._discover_cell_layout(onvif, camera_name)
self.cams[camera_name]["cell_layout"] = cell_layout
def on_state(active: bool) -> None:
cm.external_motion_active.value = 1 if active else 0
if not active:
# Drop spatial data when motion ends — keep the consumer's
# snapshot consistent with the binary state.
try:
cm.external_motion_boxes[:] = []
except Exception:
pass
pp_stop = asyncio.Event()
pp_task = asyncio.create_task(
run_pullpoint_subscription(
onvif,
camera_name,
cam_cfg.onvif.events.subscription_timeout,
on_state,
pp_stop,
)
)
self.cams[camera_name]["pullpoint"] = (pp_task, pp_stop)
logger.info(f"ONVIF events: PullPoint subscriber started for {camera_name}")
if not cam_cfg.onvif.events.use_metadata_stream or cell_layout is None:
return
rtsp_url = await self._discover_primary_rtsp_url(onvif, camera_name)
if not rtsp_url:
logger.warning(
f"ONVIF events for {camera_name}: no primary RTSP URL "
"available; skipping metadata stream"
)
return
rtsp_url = _inject_rtsp_credentials(
rtsp_url, cam_cfg.onvif.user, cam_cfg.onvif.password
)
detect_size = (cam_cfg.detect.width, cam_cfg.detect.height)
def on_boxes(boxes: list[tuple[int, int, int, int]]) -> None:
try:
cm.external_motion_boxes[:] = boxes
except Exception:
logger.debug(f"Failed to publish boxes for {camera_name}")
md_stop = asyncio.Event()
md_task = asyncio.create_task(
run_metadata_stream(
rtsp_url,
camera_name,
cell_layout,
detect_size,
on_boxes,
md_stop,
)
)
self.cams[camera_name]["metadata"] = (md_task, md_stop)
logger.info(f"ONVIF events: metadata stream consumer started for {camera_name}")
async def _discover_cell_layout(
self, onvif: ONVIFCamera, camera_name: str
) -> tuple[int, int, tuple[float, float], tuple[float, float]] | None:
"""Query AnalyticsService.GetAnalyticsModules and extract the
CellMotionEngine's CellLayout (Columns, Rows, Translate, Scale).
Returns None on failure caller should fall back to a full-frame box."""
try:
analytics = await onvif.create_analytics_service()
modules = await analytics.GetAnalyticsModules(
{"ConfigurationToken": "VA_CFG_000"}
)
except Exception as e:
logger.debug(f"ONVIF analytics service unavailable for {camera_name}: {e}")
return None
try:
for mod in modules or []:
mod_type = getattr(mod, "Type", None) or getattr(mod, "_attr_1", None)
if mod_type and "CellMotionEngine" not in str(mod_type):
continue
element_items = getattr(mod.Parameters, "ElementItem", None) or []
for item in element_items:
if item.Name != "Layout":
continue
raw = item._value_1
if raw is None or not hasattr(raw, "attrib"):
continue
cols = int(raw.attrib.get("Columns", 0))
rows = int(raw.attrib.get("Rows", 0))
if cols <= 0 or rows <= 0:
continue
tx = ty = 0.0
sx = sy = 0.0
for child in raw.iter():
if child.tag.endswith("}Translate"):
tx = float(child.attrib.get("x", 0))
ty = float(child.attrib.get("y", 0))
elif child.tag.endswith("}Scale"):
sx = float(child.attrib.get("x", 0))
sy = float(child.attrib.get("y", 0))
logger.info(
f"ONVIF cell layout for {camera_name}: {cols}x{rows} "
f"translate=({tx},{ty}) scale=({sx},{sy})"
)
return (cols, rows, (tx, ty), (sx, sy))
except Exception as e:
logger.debug(
f"Failed parsing CellMotionEngine layout for {camera_name}: {e}"
)
return None
async def _discover_primary_rtsp_url(
self, onvif: ONVIFCamera, camera_name: str
) -> str | None:
"""Return the RTSP URL for the primary profile. The ONVIF analytics
metadata track is typically bound to the primary media profile only
(sub-streams may omit it)."""
try:
media = await onvif.create_media_service()
profiles = await media.GetProfiles()
if not profiles:
return None
uri = await media.GetStreamUri(
{
"StreamSetup": {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
},
"ProfileToken": profiles[0].token,
}
)
return uri.Uri
except Exception as e:
logger.debug(f"GetStreamUri failed for {camera_name}: {e}")
return None
async def _init_onvif_ptz(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
# create init services
media: ONVIFService = await onvif.create_media_service()
logger.debug(f"Onvif media xaddr for {camera_name}: {media.xaddr}")

138
frigate/ptz/onvif_events.py Normal file
View File

@ -0,0 +1,138 @@
"""ONVIF PullPoint subscriber for camera-side motion events.
Long-running per-camera coroutine that subscribes to the camera's PullPoint
service via `onvif-zeep-async`'s `PullPointManager` (which owns subscription
creation, renewal, and lifecycle), pulls notification messages, parses
IsMotion/State on each round-trip, and invokes a callback on transitions.
Lives on the OnvifController's dedicated asyncio loop (see
`frigate/ptz/onvif.py` for the loop setup).
"""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
from typing import TYPE_CHECKING, Awaitable, Callable
if TYPE_CHECKING:
from onvif import ONVIFCamera
try:
from zeep.exceptions import Fault
except ImportError: # tests can run without zeep installed
class Fault(Exception): # type: ignore[no-redef]
pass
logger = logging.getLogger(__name__)
# Names of the boolean state SimpleItem we accept inside the message Data
# block. Spec calls it IsMotion; the legacy MotionAlarm topic uses State.
_STATE_NAMES = ("IsMotion", "State")
# Bounds on backoff between subscription failures.
_BACKOFF_INITIAL_S = 1.0
_BACKOFF_MAX_S = 60.0
def _parse_motion_state(msg) -> bool | None:
"""Walk a NotificationMessage and return the IsMotion/State value, or
None if not present. The Message body is often an `lxml.etree._Element`
that python-onvif-zeep returns for ##any wildcards — walk via .iter()."""
body = getattr(msg, "Message", None)
if body is None:
return None
raw = getattr(body, "_value_1", body)
if not hasattr(raw, "iter"):
return None
for el in raw.iter():
if not el.tag.endswith("}SimpleItem"):
continue
name = el.attrib.get("Name", "")
if name not in _STATE_NAMES:
continue
val = el.attrib.get("Value", "").strip().lower()
if val in ("true", "1"):
return True
if val in ("false", "0"):
return False
return None
async def run_pullpoint_subscription(
onvif_cam: "ONVIFCamera",
cam_name: str,
timeout_seconds: int,
on_state: Callable[[bool], None] | Callable[[bool], Awaitable[None]],
stop_event: asyncio.Event,
) -> None:
"""Loop until stop_event: create a PullPointManager, pull messages,
dispatch on_state on transitions, reconnect on Fault with exponential
backoff."""
backoff = _BACKOFF_INITIAL_S
last_state: bool | None = None
while not stop_event.is_set():
manager = None
sub_lost = asyncio.Event()
def _subscription_lost() -> None:
sub_lost.set()
try:
manager = await onvif_cam.create_pullpoint_manager(
dt.timedelta(seconds=timeout_seconds),
_subscription_lost,
)
service = manager.get_service()
logger.info(f"ONVIF PullPoint subscribed for {cam_name}")
while not stop_event.is_set() and not sub_lost.is_set():
# Long-poll up to 10s. The subscription manager keeps the
# subscription itself alive in the background — we just pull.
msgs = await service.PullMessages(
{"Timeout": "PT10S", "MessageLimit": 32}
)
for m in msgs.NotificationMessage or []:
state = _parse_motion_state(m)
if state is None or state == last_state:
continue
last_state = state
try:
result = on_state(state)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.exception(f"on_state callback error for {cam_name}")
if sub_lost.is_set():
raise Fault("PullPoint subscription lost")
# Clean exit (stop_event set) — leave the loop.
backoff = _BACKOFF_INITIAL_S
break
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(
f"ONVIF PullPoint subscription error for {cam_name}: {e!r}; "
f"reconnecting in {backoff:.1f}s"
)
finally:
if manager is not None:
try:
await manager.shutdown()
except Exception:
pass
if stop_event.is_set():
return
try:
await asyncio.wait_for(stop_event.wait(), timeout=backoff)
return
except asyncio.TimeoutError:
pass
backoff = min(backoff * 2, _BACKOFF_MAX_S)

View File

@ -0,0 +1,357 @@
"""ONVIF analytics metadata stream consumer.
Per-camera asyncio task that opens an RTSP connection to the camera's
primary profile, extracts the `application/vnd.onvif.metadata` data track
via an ffmpeg subprocess, and converts the per-frame `<tt:MotionInCells>`
bitmap into a list of motion rectangles in Frigate detect-frame pixels.
Why ffmpeg rather than an in-process RTSP client: Frigate already ships
ffmpeg and uses it heavily for video/recording; there is no async RTSP
client in the existing dependency set that handles the `vnd.onvif.metadata`
payload cleanly. The data track is low-bandwidth (~1 packet/sec at idle,
300 bytes XML each), so the subprocess cost is negligible.
Wire format (ONVIF Analytics Service Spec, Annex B "Cell Motion Detection"):
- Each RTP packet payload is one complete <tt:MetadataStream> XML doc.
- Cells attribute = base64(PackBits(bit-packed row-major bitmap)).
- Bits: cols*rows total, MSB-first within bytes, zero-padded.
Cell detect-frame mapping uses the CellLayout transformation discovered
at OnvifController init: Translate(tx, ty) + Scale(sx, sy) maps cell index
(c, r) to normalized ONVIF coords [-1, +1]. We convert that to detect-frame
pixels.
"""
import asyncio
import base64
import logging
from typing import Awaitable, Callable
from xml.etree import ElementTree as ET
import numpy as np
logger = logging.getLogger(__name__)
_TT_NS = "http://www.onvif.org/ver10/schema"
_MIC_TAG = f"{{{_TT_NS}}}MotionInCells"
# ffmpeg's -map 0:d:0 selects the first data track from the input. -c copy
# bypasses any transcode. -f data writes raw packet payloads to stdout.
# -flush_packets 1 disables muxer-side buffering so each metadata frame
# reaches us within ~1 packet of being received from the camera.
_FFMPEG_ARGS_TEMPLATE = (
"-nostdin",
"-loglevel",
"error",
"-rtsp_transport",
"tcp",
"-i",
"{url}",
"-map",
"0:d:0?",
"-c",
"copy",
"-flush_packets",
"1",
"-f",
"data",
"pipe:1",
)
# Each metadata document ends with this closing tag — we split incoming
# stdout on it to recover packet boundaries (no other framing on a `-f data`
# stream).
_DOC_TERMINATOR = b"</tt:MetadataStream>"
_BACKOFF_INITIAL_S = 1.0
_BACKOFF_MAX_S = 60.0
# Stop reading at this many bytes per single document — guards against a
# misbehaving stream filling memory if the terminator never arrives.
_MAX_DOC_BYTES = 64 * 1024
def _packbits_decode(packed: bytes) -> bytes:
"""ISO 12639 / TIFF 6.0 PackBits decoder."""
out = bytearray()
i = 0
n = len(packed)
while i < n:
h = packed[i]
i += 1
if h <= 0x7F:
count = h + 1
out += packed[i : i + count]
i += count
elif h == 0x80:
continue # no-op header
else:
count = 257 - h
if i >= n:
break
out += bytes([packed[i]]) * count
i += 1
return bytes(out)
def _decode_cells(cells_b64: str, cols: int, rows: int) -> np.ndarray | None:
"""Decode the Cells attribute into a 2-D uint8 array shape (rows, cols).
Returns None if the decoded length doesn't match what the layout
expects caller should treat that as "no spatial data this frame"
and fall back to whatever default (e.g. full-frame box)."""
if not cells_b64:
return None
try:
packed = base64.b64decode(cells_b64, validate=False)
except Exception:
return None
raw = _packbits_decode(packed)
needed_bytes = (cols * rows + 7) // 8
if len(raw) < needed_bytes:
return None
bits = np.unpackbits(np.frombuffer(raw[:needed_bytes], dtype=np.uint8))
bits = bits[: cols * rows]
return bits.reshape((rows, cols)).astype(np.uint8)
def _connected_component_bboxes(
cells: np.ndarray,
) -> list[tuple[int, int, int, int]]:
"""4-connectivity flood fill over a small 0/1 grid; returns list of
(c_left, c_top, c_right, c_bottom) inclusive cell-index bounding boxes
for each connected region.
cv2.connectedComponentsWithStats would be faster, but the cell grid is
tiny (typically 22x18 = 396 cells) and avoiding the cv2 import keeps
this module testable without OpenCV installed.
"""
rows, cols = cells.shape
visited = np.zeros_like(cells, dtype=bool)
out: list[tuple[int, int, int, int]] = []
for r0 in range(rows):
for c0 in range(cols):
if not cells[r0, c0] or visited[r0, c0]:
continue
stack = [(r0, c0)]
cmin = cmax = c0
rmin = rmax = r0
while stack:
r, c = stack.pop()
if r < 0 or r >= rows or c < 0 or c >= cols:
continue
if visited[r, c] or not cells[r, c]:
continue
visited[r, c] = True
if r < rmin:
rmin = r
if r > rmax:
rmax = r
if c < cmin:
cmin = c
if c > cmax:
cmax = c
stack.append((r + 1, c))
stack.append((r - 1, c))
stack.append((r, c + 1))
stack.append((r, c - 1))
out.append((cmin, rmin, cmax, rmax))
return out
def _cells_to_boxes(
cells: np.ndarray,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
) -> list[tuple[int, int, int, int]]:
"""Connected-components on the cell grid → list of detect-frame boxes.
cell_layout = (cols, rows, (tx, ty), (sx, sy)) the Translate + Scale
from CellLayout.Transformation. detect_size = (width, height) in
detect-frame pixels.
"""
if cells is None or cells.size == 0 or not cells.any():
return []
cols, rows, (tx, ty), (sx, sy) = cell_layout
det_w, det_h = detect_size
if det_w <= 0 or det_h <= 0:
return []
boxes: list[tuple[int, int, int, int]] = []
# Map cell index → detect-frame pixel via the CellLayout transformation:
# cell (c, r) covers normalized [tx + c*sx, tx + (c+1)*sx] horizontally
# and similarly vertically. Convert normalized [-1, +1] → pixel.
def cell_to_px(
c: int, r: int, *, right_edge: bool, bottom_edge: bool
) -> tuple[int, int]:
cx_idx = c + 1 if right_edge else c
cy_idx = r + 1 if bottom_edge else r
nx = tx + cx_idx * sx
ny = ty + cy_idx * sy
px = int(round((nx + 1.0) * 0.5 * det_w))
py = int(round((ny + 1.0) * 0.5 * det_h))
return px, py
for c_left, c_top, c_right, c_bottom in _connected_component_bboxes(cells):
x1, y1 = cell_to_px(c_left, c_top, right_edge=False, bottom_edge=False)
x2, y2 = cell_to_px(c_right, c_bottom, right_edge=True, bottom_edge=True)
x1 = max(0, min(det_w - 1, x1))
y1 = max(0, min(det_h - 1, y1))
x2 = max(0, min(det_w - 1, x2))
y2 = max(0, min(det_h - 1, y2))
if x2 <= x1 or y2 <= y1:
continue
boxes.append((x1, y1, x2, y2))
return boxes
def _extract_cells_from_doc(doc_bytes: bytes) -> tuple[str | None, int, int]:
"""Parse a <tt:MetadataStream> XML doc, return (cells_b64, cols, rows).
Returns (None, 0, 0) if no MotionInCells element is found."""
try:
root = ET.fromstring(doc_bytes)
except ET.ParseError:
return None, 0, 0
for el in root.iter(_MIC_TAG):
cells_b64 = el.attrib.get("Cells")
try:
cols = int(el.attrib.get("Columns", "0"))
rows = int(el.attrib.get("Rows", "0"))
except ValueError:
return None, 0, 0
return cells_b64, cols, rows
return None, 0, 0
async def run_metadata_stream(
rtsp_url: str,
cam_name: str,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
on_boxes: Callable[[list[tuple[int, int, int, int]]], None]
| Callable[[list[tuple[int, int, int, int]]], Awaitable[None]],
stop_event: asyncio.Event,
) -> None:
"""Loop until stop_event: spawn ffmpeg → read XML docs → decode → on_boxes."""
backoff = _BACKOFF_INITIAL_S
while not stop_event.is_set():
proc = None
try:
args = [a.format(url=rtsp_url) for a in _FFMPEG_ARGS_TEMPLATE]
proc = await asyncio.create_subprocess_exec(
"ffmpeg",
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
logger.info(
f"ONVIF metadata stream: ffmpeg started for {cam_name} pid={proc.pid}"
)
await _consume_ffmpeg(
proc, cam_name, cell_layout, detect_size, on_boxes, stop_event
)
backoff = _BACKOFF_INITIAL_S
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(
f"ONVIF metadata stream error for {cam_name}: {e!r}; "
f"reconnecting in {backoff:.1f}s"
)
finally:
if proc is not None and proc.returncode is None:
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=2.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
if stop_event.is_set():
return
try:
await asyncio.wait_for(stop_event.wait(), timeout=backoff)
return
except asyncio.TimeoutError:
pass
backoff = min(backoff * 2, _BACKOFF_MAX_S)
async def _consume_ffmpeg(
proc: asyncio.subprocess.Process,
cam_name: str,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
on_boxes,
stop_event: asyncio.Event,
) -> None:
"""Read XML docs from ffmpeg stdout and dispatch boxes."""
layout_cols, layout_rows, _, _ = cell_layout
assert proc.stdout is not None
buf = bytearray()
while not stop_event.is_set():
chunk = await proc.stdout.read(4096)
if not chunk:
# ffmpeg exited or stream ended.
stderr_tail = b""
if proc.stderr is not None:
try:
stderr_tail = await asyncio.wait_for(
proc.stderr.read(4096), timeout=0.5
)
except asyncio.TimeoutError:
pass
raise RuntimeError(
f"ffmpeg exited for {cam_name} rc={proc.returncode} "
f"stderr={stderr_tail.decode('utf-8', 'replace').strip()[:200]}"
)
buf.extend(chunk)
if len(buf) > _MAX_DOC_BYTES * 4:
# Drop the head to avoid unbounded growth on a wedged stream.
buf = buf[-_MAX_DOC_BYTES:]
while True:
end = buf.find(_DOC_TERMINATOR)
if end < 0:
break
end += len(_DOC_TERMINATOR)
doc = bytes(buf[:end])
del buf[:end]
cells_b64, cols, rows = _extract_cells_from_doc(doc)
if cells_b64 is None:
continue
# Trust the layout we discovered at init; warn (don't fail) if the
# camera reports a different grid mid-stream.
if cols != layout_cols or rows != layout_rows:
logger.debug(
f"{cam_name}: MotionInCells grid {cols}x{rows} differs "
f"from discovered layout {layout_cols}x{layout_rows}"
)
use_layout = (
cols,
rows,
cell_layout[2],
(2.0 / cols if cols else 0, 2.0 / rows if rows else 0),
)
else:
use_layout = cell_layout
cells = _decode_cells(cells_b64, cols, rows)
if cells is None:
continue
boxes = _cells_to_boxes(cells, use_layout, detect_size)
try:
result = on_boxes(boxes)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.exception(f"on_boxes callback error for {cam_name}")

View File

@ -0,0 +1,64 @@
"""Validator tests for `motion.source=onvif` interactions with
`onvif.events.enabled` and `motion.enabled`. Exercises `verify_motion_and_detect`
directly so we don't need the full FrigateConfig path (which mounts /config)."""
import unittest
from frigate.config.config import verify_motion_and_detect
class _Dummy:
"""Light shim for the nested config attributes the validator reads."""
def __init__(self, **kw):
for k, v in kw.items():
setattr(self, k, v)
def _camera(*, name, detect, motion, onvif):
return _Dummy(name=name, detect=detect, motion=motion, onvif=onvif)
class TestVerifyMotionAndDetect(unittest.TestCase):
def test_internal_motion_with_detect_passes(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=True, source="internal"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
# No exception.
self.assertIsNone(verify_motion_and_detect(cam))
def test_detect_with_motion_disabled_rejected(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="internal"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
with self.assertRaisesRegex(ValueError, "object detection requires motion"):
verify_motion_and_detect(cam)
def test_source_onvif_requires_events_enabled(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="onvif"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
with self.assertRaisesRegex(ValueError, "onvif.events.enabled is false"):
verify_motion_and_detect(cam)
def test_source_onvif_with_events_passes_even_with_motion_disabled(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="onvif"),
onvif=_Dummy(events=_Dummy(enabled=True)),
)
self.assertIsNone(verify_motion_and_detect(cam))
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,152 @@
"""Unit tests for the ONVIF analytics metadata decoder + cell→box mapper."""
import base64
import unittest
import numpy as np
from frigate.ptz.onvif_metadata import (
_cells_to_boxes,
_decode_cells,
_extract_cells_from_doc,
_packbits_decode,
)
class TestPackBits(unittest.TestCase):
def test_spec_example(self):
# ONVIF Analytics Annex B worked example:
# raw = ff ff ff f0 f0 f0
# packed (PackBits) = fe ff fe f0
packed = bytes.fromhex("feff fef0".replace(" ", ""))
self.assertEqual(
_packbits_decode(packed),
bytes.fromhex("ffff fff0 f0f0".replace(" ", "")),
)
def test_idle_frame_from_live_camera(self):
# `zwA=` is a representative idle-frame payload from a 22×18 grid:
# 396 bits → 50 bytes after byte-padding; PackBits compresses 50
# zeros to `cf 00` → base64 `zwA=`.
packed = base64.b64decode("zwA=")
self.assertEqual(packed, bytes.fromhex("cf 00".replace(" ", "")))
raw = _packbits_decode(packed)
self.assertEqual(len(raw), 50)
self.assertEqual(raw, b"\x00" * 50)
def test_literal_run(self):
# Header 0x03 → 4 literal bytes follow.
self.assertEqual(_packbits_decode(b"\x03ABCD"), b"ABCD")
def test_noop_header(self):
# 0x80 is no-op per spec; literal after should still decode normally.
# \x80 → no-op; \x02 → literal of 3 bytes follows; "ABC" copied.
self.assertEqual(_packbits_decode(b"\x80\x02ABC"), b"ABC")
class TestDecodeCells(unittest.TestCase):
def test_idle(self):
cells = _decode_cells("zwA=", 22, 18)
self.assertIsNotNone(cells)
self.assertEqual(cells.shape, (18, 22))
self.assertEqual(int(cells.sum()), 0)
def test_invalid_base64(self):
self.assertIsNone(_decode_cells("not-base64!@", 22, 18))
def test_short_payload_returns_none(self):
# `cf 00` decodes to 50 bytes; ask for 100×100 grid (1250 bytes
# needed) → expect None.
self.assertIsNone(_decode_cells("zwA=", 100, 100))
def test_top_left_active(self):
# Build a 22×18 grid with only cell (0,0) active. Raw bitmap byte 0
# = 0x80 (MSB set), bytes 1..49 = 0x00. PackBits of that 50-byte
# sequence: literal-1-byte (header 0x00) of 0x80, then replicate of
# 49 zeros (header 257-49=208=0xD0, byte 0x00).
packed = bytes([0x00, 0x80, 0xD0, 0x00])
b64 = base64.b64encode(packed).decode()
cells = _decode_cells(b64, 22, 18)
self.assertIsNotNone(cells)
self.assertEqual(int(cells[0, 0]), 1)
self.assertEqual(int(cells.sum()), 1)
class TestCellsToBoxes(unittest.TestCase):
"""Verify the cell-grid → detect-frame pixel mapping using a representative
CellLayout (22x18, Translate(-1,-1), Scale(2/22, 2/18))."""
LAYOUT = (22, 18, (-1.0, -1.0), (2.0 / 22, 2.0 / 18))
DETECT = (1280, 720)
def test_empty(self):
cells = np.zeros((18, 22), dtype=np.uint8)
self.assertEqual(_cells_to_boxes(cells, self.LAYOUT, self.DETECT), [])
def test_top_left_cell(self):
cells = np.zeros((18, 22), dtype=np.uint8)
cells[0, 0] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 1)
x1, y1, x2, y2 = boxes[0]
# Cell (0,0) covers normalized [-1, -1+2/22] × [-1, -1+2/18]
# → detect px [0, 1280/22] × [0, 720/18] = [0, ~58] × [0, 40]
self.assertEqual(x1, 0)
self.assertEqual(y1, 0)
self.assertAlmostEqual(x2, round(1280 / 22), delta=2)
self.assertAlmostEqual(y2, round(720 / 18), delta=2)
def test_bottom_right_cell(self):
cells = np.zeros((18, 22), dtype=np.uint8)
cells[17, 21] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 1)
x1, y1, x2, y2 = boxes[0]
# Bottom-right edge clamps to detect_size - 1.
self.assertEqual(x2, self.DETECT[0] - 1)
self.assertEqual(y2, self.DETECT[1] - 1)
self.assertAlmostEqual(x1, round(21 * 1280 / 22), delta=2)
self.assertAlmostEqual(y1, round(17 * 720 / 18), delta=2)
def test_two_separated_regions(self):
cells = np.zeros((18, 22), dtype=np.uint8)
# Region A: top-left 2×2 block
cells[0:2, 0:2] = 1
# Region B: bottom-right 2×2 block (separated by inactive cells)
cells[15:17, 18:20] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 2)
class TestExtractCellsFromDoc(unittest.TestCase):
def test_typical_frame(self):
doc = (
b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">'
b"<tt:VideoAnalytics>"
b'<tt:Frame UtcTime="2026-05-29T14:12:20Z">'
b"<tt:Extension>"
b'<tt:MotionInCells Columns="22" Rows="18" Cells="zwA="/>'
b"</tt:Extension></tt:Frame></tt:VideoAnalytics></tt:MetadataStream>"
)
cells_b64, cols, rows = _extract_cells_from_doc(doc)
self.assertEqual(cells_b64, "zwA=")
self.assertEqual(cols, 22)
self.assertEqual(rows, 18)
def test_malformed_xml(self):
self.assertEqual(
_extract_cells_from_doc(b"not-xml"),
(None, 0, 0),
)
def test_doc_without_motioncells(self):
doc = (
b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">'
b"<tt:VideoAnalytics><tt:Frame/></tt:VideoAnalytics>"
b"</tt:MetadataStream>"
)
self.assertEqual(_extract_cells_from_doc(doc), (None, 0, 0))
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,72 @@
"""Unit tests for the ONVIF PullPoint motion-state parser."""
import unittest
from xml.etree import ElementTree as ET
from frigate.ptz.onvif_events import _parse_motion_state
class FakeMessage:
"""Mimic the zeep NotificationMessage shape: a Message attribute holding
an object whose `_value_1` is an lxml/etree element."""
class _Body:
def __init__(self, element):
self._value_1 = element
def __init__(self, xml: str):
self.Message = self._Body(ET.fromstring(xml))
_NS = 'xmlns:tt="http://www.onvif.org/ver10/schema"'
def _build_msg(name: str, value: str) -> FakeMessage:
xml = (
f"<tt:Message {_NS}>"
"<tt:Source>"
'<tt:SimpleItem Name="Source" Value="VideoSourceToken"/>'
"</tt:Source>"
"<tt:Data>"
f'<tt:SimpleItem Name="{name}" Value="{value}"/>'
"</tt:Data>"
"</tt:Message>"
)
return FakeMessage(xml)
class TestParseMotionState(unittest.TestCase):
def test_is_motion_true(self):
self.assertTrue(_parse_motion_state(_build_msg("IsMotion", "true")))
def test_is_motion_false(self):
self.assertFalse(_parse_motion_state(_build_msg("IsMotion", "false")))
def test_legacy_state_topic_name(self):
# The legacy tns1:VideoSource/MotionAlarm payload uses "State" instead
# of the spec-compliant "IsMotion"; we accept either.
self.assertTrue(_parse_motion_state(_build_msg("State", "true")))
self.assertFalse(_parse_motion_state(_build_msg("State", "false")))
def test_boolean_aliases(self):
self.assertTrue(_parse_motion_state(_build_msg("IsMotion", "1")))
self.assertFalse(_parse_motion_state(_build_msg("IsMotion", "0")))
def test_no_state_returns_none(self):
# Missing the State/IsMotion SimpleItem.
xml = (
f"<tt:Message {_NS}>"
'<tt:Data><tt:SimpleItem Name="Other" Value="yes"/></tt:Data>'
"</tt:Message>"
)
self.assertIsNone(_parse_motion_state(FakeMessage(xml)))
def test_no_message_returns_none(self):
class Empty:
pass
self.assertIsNone(_parse_motion_state(Empty()))
if __name__ == "__main__":
unittest.main()

View File

@ -14,6 +14,7 @@ from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, LoggerConfig, ModelConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.motion import MotionSourceEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
@ -300,7 +301,22 @@ def process_frames(
continue
# look for motion if enabled
motion_boxes = motion_detector.detect(frame)
if camera_config.motion.source == MotionSourceEnum.onvif:
# Motion is supplied by an external ONVIF cell-motion subscriber
# writing to camera_metrics. Skip the per-frame internal detector.
if camera_metrics.external_motion_active.value:
boxes = list(camera_metrics.external_motion_boxes)
if boxes:
motion_boxes = [tuple(b) for b in boxes]
else:
# Active but no spatial data yet — fall back to full frame
# so downstream region clustering still has something to
# scan.
motion_boxes = [(0, 0, frame_shape[1] - 1, frame_shape[0] - 1)]
else:
motion_boxes = []
else:
motion_boxes = motion_detector.detect(frame)
regions = []
consolidated_detections = []

View File

@ -262,6 +262,10 @@
"label": "Enable motion detection",
"description": "Enable or disable motion detection for this camera."
},
"source": {
"label": "Motion source",
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
},
"threshold": {
"label": "Motion threshold",
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
@ -843,6 +847,22 @@
"description": "Internal field to track whether autotracking was enabled in configuration."
}
},
"events": {
"label": "ONVIF events",
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
"enabled": {
"label": "Enable ONVIF events",
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
},
"subscription_timeout": {
"label": "Subscription timeout",
"description": "Seconds before the PullPoint subscription expires and is renewed."
},
"use_metadata_stream": {
"label": "Use metadata stream",
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
}
},
"ignore_time_mismatch": {
"label": "Ignore time mismatch",
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."

View File

@ -777,6 +777,10 @@
"label": "Enable motion detection",
"description": "Enable or disable motion detection for all cameras; can be overridden per-camera."
},
"source": {
"label": "Motion source",
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
},
"threshold": {
"label": "Motion threshold",
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
@ -1627,6 +1631,22 @@
"description": "Internal field to track whether autotracking was enabled in configuration."
}
},
"events": {
"label": "ONVIF events",
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
"enabled": {
"label": "Enable ONVIF events",
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
},
"subscription_timeout": {
"label": "Subscription timeout",
"description": "Seconds before the PullPoint subscription expires and is renewed."
},
"use_metadata_stream": {
"label": "Use metadata stream",
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
}
},
"ignore_time_mismatch": {
"label": "Ignore time mismatch",
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."