Compare commits

...

5 Commits

Author SHA1 Message Date
Dmitry Ilyin
c9d5faa2bc
Merge b420efdebd into bc816926a5 2026-06-14 11:51:44 +12:00
Josh Hawkins
bc816926a5
Replace export ffmpeg argument blocklist with a structural allowlist (#23478)
Some checks are pending
CI / Synaptics Build (push) Blocked by required conditions
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* use allowlist for custom export ffmpeg args

* reject brackets in export filtergraph validation instead of stripping link labels
2026-06-13 16:43:22 -06:00
Josh Hawkins
b79ad9871a
generate the API docs OpenAPI spec from the app with per-endpoint auth requirements (#23476) 2026-06-13 16:04:22 -06:00
Josh Hawkins
8be7a97fa6
guard norfair distance against non-finite and zero-area boxes that could crash autotracking cameras (#23475) 2026-06-13 16:23:30 -05:00
Dmitry Ilyin
b420efdebd Native ONVIF cell-motion ingest
Adds a per-camera ONVIF subscriber that lets cameras with native
hardware motion detection (e.g. OpenIPC firmware for HiSilicon,
Ingenic and SigmaStar SoCs; many ONVIF Profile-M devices) replace
Frigate's per-frame CPU motion analysis. Two standard ONVIF
transports are consumed in parallel:

- WS-BaseNotification PullPoint for the binary motion state
  (tns1:RuleEngine/CellMotionDetector/Motion IsMotion=true|false,
   with tns1:VideoSource/MotionAlarm State=true|false accepted as
   a fallback for cameras that only publish the legacy topic).
- RTSP analytics metadata stream (application/vnd.onvif.metadata)
  for the per-frame cell grid (tt:MotionInCells, base64 + PackBits
  bit-packed bitmap). Cell layout is discovered once at startup via
  AnalyticsService.GetAnalyticsModules and the camera's CellLayout
  transformation is used to map cells to detect-frame pixel
  rectangles via connected-components.

New config:
  onvif.events.{enabled, subscription_timeout, use_metadata_stream}
  motion.source: internal (default) | onvif

When motion.source: onvif, ImprovedMotionDetector is skipped and
motion_boxes come from the camera. Internal motion remains the
default; the new path is fully opt-in.
2026-05-30 19:43:33 +03:00
24 changed files with 4817 additions and 1513 deletions

View File

@ -125,5 +125,7 @@ jobs:
run: devcontainer up --workspace-folder .
- name: Run mypy in devcontainer
run: devcontainer exec --workspace-folder . bash -lc "python3 -u -m mypy --config-file frigate/mypy.ini frigate"
- name: Check API spec is up to date
run: devcontainer exec --workspace-folder . bash -lc "python3 generate_api_auth_spec.py --check"
- name: Run unit tests in devcontainer
run: devcontainer exec --workspace-folder . bash -lc "python3 -u -m unittest"

View File

@ -235,6 +235,14 @@ ruff check frigate/
# Type check
python3 -u -m mypy --config-file frigate/mypy.ini frigate
# Regenerate the OpenAPI spec after adding, changing, or removing an API
# endpoint or its auth dependency — outputs docs/static/frigate-api.yaml,
# annotated with each endpoint's auth requirement (admin / any / camera /
# public). NEVER edit that file by hand. CI runs the --check variant and fails
# if it is out of date. (from repo root)
python3 generate_api_auth_spec.py
python3 generate_api_auth_spec.py --check
```
### Frontend (from web/ directory)
@ -316,6 +324,8 @@ async def get_events(request: Request, limit: int = 100):
# Implementation
```
After adding, changing, or removing an endpoint (or its auth dependency), regenerate the OpenAPI spec with `python3 generate_api_auth_spec.py` so `docs/static/frigate-api.yaml` stays in sync and the endpoint's auth requirement is documented. CI enforces this via the `--check` variant; never edit that file by hand.
### Configuration Access
```python

View File

@ -198,6 +198,46 @@ When the skip threshold is exceeded, **no motion is reported** for that frame, m
:::
## Using Camera-Side ONVIF Motion Detection
For cameras that publish their own ONVIF cell-motion analytics (e.g. OpenIPC firmware for HiSilicon, Ingenic and SigmaStar SoCs, plus most ONVIF Profile-M devices from Hikvision, Reolink, Foscam, Amcrest, etc.), Frigate can use the camera's hardware motion engine instead of running per-frame analysis on the host CPU. This both removes CPU load from the Frigate machine and gives a more accurate motion signal than encoded-stream analysis can produce.
Frigate consumes the two standard ONVIF transports:
- **PullPoint** event subscription on `tns1:RuleEngine/CellMotionDetector/Motion` carries the binary on/off state (the legacy `tns1:VideoSource/MotionAlarm` payload is also accepted).
- **RTSP analytics metadata stream** (the `application/vnd.onvif.metadata` track on the primary RTSP profile) carries the per-frame cell grid (`tt:MotionInCells`) which Frigate decodes (base64 + PackBits) and maps through the `CellLayout` transformation into Frigate's detect-frame pixel coordinates.
```yaml
cameras:
back_door:
onvif:
host: 10.0.0.10
port: 80
user: root
password: "secret"
events:
# Subscribe to camera-side motion events.
enabled: true
# Seconds before the PullPoint subscription expires (we renew at half this).
subscription_timeout: 60
# Open the RTSP analytics metadata stream for per-cell motion coordinates.
# Disable if your camera only publishes the binary event topic.
use_metadata_stream: true
motion:
# Use the camera's ONVIF events as Frigate's motion signal. The internal
# CPU motion detector is skipped.
source: onvif
detect:
enabled: true
```
When `motion.source: onvif`:
- Frigate's internal `ImprovedMotionDetector` is **not** run on the camera's frames.
- Object detection still runs every detection frame; motion boxes are used for region clustering exactly as with the internal detector.
- If `use_metadata_stream: true` but the camera doesn't advertise the metadata track (or PackBits decoding fails for a frame), Frigate falls back to a full-frame motion box while the binary event signal is active.
- The validator requires `onvif.events.enabled: true` whenever `motion.source: onvif`.
## Reviewing Detected Motion
To review what the detector picked up — or to search past recordings for motion in a specific region — see [Reviewing Motion](/usage/review#reviewing-motion) on the Review page.

File diff suppressed because it is too large Load Diff

View File

@ -309,7 +309,9 @@ class FrigateApp:
self.detection_proxy = DetectorProxy()
def init_onvif(self) -> None:
self.onvif_controller = OnvifController(self.config, self.ptz_metrics)
self.onvif_controller = OnvifController(
self.config, self.ptz_metrics, self.camera_metrics
)
def init_dispatcher(self) -> None:
comms: list[Communicator] = []

View File

@ -1,6 +1,6 @@
import multiprocessing as mp
import queue
from multiprocessing.managers import SyncManager, ValueProxy
from multiprocessing.managers import ListProxy, SyncManager, ValueProxy
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
@ -23,6 +23,14 @@ class CameraMetrics:
reconnects_last_hour: ValueProxy[int]
stalls_last_hour: ValueProxy[int]
# External motion published by OnvifController when motion.source=onvif.
# external_motion_active mirrors the PullPoint IsMotion state.
# external_motion_boxes carries the per-frame cell-derived rectangles in
# detect-frame pixel coordinates; empty list means no current spatial
# data (consumer should fall back to a full-frame box when active=1).
external_motion_active: ValueProxy[int]
external_motion_boxes: ListProxy
def __init__(self, manager: SyncManager):
self.camera_fps = manager.Value("d", 0)
self.detection_fps = manager.Value("d", 0)
@ -41,6 +49,9 @@ class CameraMetrics:
self.reconnects_last_hour = manager.Value("i", 0)
self.stalls_last_hour = manager.Value("i", 0)
self.external_motion_active = manager.Value("b", 0)
self.external_motion_boxes = manager.list()
class PTZMetrics:
autotracker_enabled: Synchronized

View File

@ -1,3 +1,4 @@
from enum import Enum
from typing import Any, Optional
from pydantic import Field, field_serializer
@ -5,7 +6,12 @@ from pydantic import Field, field_serializer
from ..base import FrigateBaseModel
from .mask import MotionMaskConfig
__all__ = ["MotionConfig"]
__all__ = ["MotionConfig", "MotionSourceEnum"]
class MotionSourceEnum(str, Enum):
internal = "internal"
onvif = "onvif"
class MotionConfig(FrigateBaseModel):
@ -14,6 +20,11 @@ class MotionConfig(FrigateBaseModel):
title="Enable motion detection",
description="Enable or disable motion detection for all cameras; can be overridden per-camera.",
)
source: MotionSourceEnum = Field(
default=MotionSourceEnum.internal,
title="Motion source",
description="Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled).",
)
threshold: int = Field(
default=30,
title="Motion threshold",

View File

@ -7,7 +7,12 @@ from ..base import FrigateBaseModel
from ..env import EnvString
from .objects import DEFAULT_TRACKED_OBJECTS
__all__ = ["OnvifConfig", "PtzAutotrackConfig", "ZoomingModeEnum"]
__all__ = [
"OnvifConfig",
"OnvifEventsConfig",
"PtzAutotrackConfig",
"ZoomingModeEnum",
]
class ZoomingModeEnum(str, Enum):
@ -91,6 +96,26 @@ class PtzAutotrackConfig(FrigateBaseModel):
return weights
class OnvifEventsConfig(FrigateBaseModel):
enabled: bool = Field(
default=False,
title="Enable ONVIF events",
description="Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal.",
)
subscription_timeout: int = Field(
default=60,
ge=10,
le=600,
title="Subscription timeout",
description="Seconds before the PullPoint subscription expires and is renewed.",
)
use_metadata_stream: bool = Field(
default=True,
title="Use metadata stream",
description="Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track.",
)
class OnvifConfig(FrigateBaseModel):
host: EnvString = Field(
default="",
@ -127,6 +152,11 @@ class OnvifConfig(FrigateBaseModel):
title="Autotracking",
description="Automatically track moving objects and keep them centered in the frame using PTZ camera movements.",
)
events: OnvifEventsConfig = Field(
default_factory=OnvifEventsConfig,
title="ONVIF events",
description="Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
)
ignore_time_mismatch: bool = Field(
default=False,
title="Ignore time mismatch",

View File

@ -47,7 +47,7 @@ from .camera.detect import DetectConfig
from .camera.ffmpeg import FfmpegConfig
from .camera.genai import GenAIConfig, GenAIRoleEnum
from .camera.mask import ObjectMaskConfig
from .camera.motion import MotionConfig
from .camera.motion import MotionConfig, MotionSourceEnum
from .camera.notification import NotificationConfig
from .camera.objects import FilterConfig, ObjectConfig
from .camera.record import RecordConfig
@ -380,10 +380,19 @@ def verify_autotrack_zones(camera_config: CameraConfig) -> ValueError | None:
def verify_motion_and_detect(camera_config: CameraConfig) -> ValueError | None:
"""Verify that motion detection is not disabled and object detection is enabled."""
if camera_config.detect.enabled and not camera_config.motion.enabled:
motion_via_onvif = camera_config.motion.source == MotionSourceEnum.onvif
if (
camera_config.detect.enabled
and not camera_config.motion.enabled
and not motion_via_onvif
):
raise ValueError(
f"Camera {camera_config.name} has motion detection disabled and object detection enabled but object detection requires motion detection."
)
if motion_via_onvif and not camera_config.onvif.events.enabled:
raise ValueError(
f"Camera {camera_config.name} has motion.source=onvif but onvif.events.enabled is false; enable ONVIF events to use them as the motion source."
)
def verify_objects_track(

View File

@ -48,6 +48,22 @@ def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
)
def transform_is_finite(coord_transformations) -> bool:
"""Return True if a norfair coordinate transform contains only finite values.
A near-singular homography (common when the motion estimator can't find
enough stable features during zoom on a low-texture scene) can produce
inf/nan matrix entries. norfair accumulates the homography across frames, so
a single bad transform poisons every subsequent one and propagates nan into
the tracker's distance function, crashing the camera process.
"""
for attr in ("homography_matrix", "inverse_homography_matrix", "movement_vector"):
value = getattr(coord_transformations, attr, None)
if value is not None and not np.all(np.isfinite(value)):
return False
return True
class PtzMotionEstimator:
def __init__(self, config: CameraConfig, ptz_metrics: PTZMetrics) -> None:
self.frame_manager = SharedMemoryFrameManager()
@ -135,6 +151,19 @@ class PtzMotionEstimator:
)
self.coord_transformations = None
# A degenerate homography can yield non-finite transform values that
# norfair would accumulate and feed to the tracker as nan estimates.
# Drop the bad transform and request a reset so the estimator rebuilds
# a fresh reference frame instead of poisoning every following frame.
if self.coord_transformations is not None and not transform_is_finite(
self.coord_transformations
):
logger.warning(
f"Autotracker: motion estimator produced a non-finite transform for {camera} at frame time {frame_time}, resetting"
)
self.coord_transformations = None
self.ptz_metrics.reset.set()
try:
logger.debug(
f"{camera}: Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0, 0]])}"

View File

@ -13,17 +13,39 @@ import numpy
from onvif import ONVIFCamera, ONVIFError, ONVIFService
from zeep.exceptions import Fault, TransportError
from frigate.camera import PTZMetrics
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig, ZoomingModeEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.ptz.onvif_events import run_pullpoint_subscription
from frigate.ptz.onvif_metadata import run_metadata_stream
from frigate.util.builtin import find_by_key
logger = logging.getLogger(__name__)
def _inject_rtsp_credentials(url: str, user: str | None, password: str | None) -> str:
"""Insert user:password into an rtsp:// URL if not already present.
The ONVIF GetStreamUri response typically returns an rtsp URL without
credentials, but downstream consumers (ffmpeg, RTSP libs) need them in
the URL because the camera challenges Basic/Digest on DESCRIBE.
"""
if not user or not password:
return url
if "@" in url.split("://", 1)[-1].split("/", 1)[0]:
# URL already has user:pass — don't touch it.
return url
if "://" not in url:
return url
scheme, rest = url.split("://", 1)
from urllib.parse import quote
return f"{scheme}://{quote(user, safe='')}:{quote(password, safe='')}@{rest}"
class OnvifCommandEnum(str, Enum):
"""Holds all possible move commands"""
@ -45,7 +67,10 @@ class OnvifController:
ptz_metrics: dict[str, PTZMetrics]
def __init__(
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics]
self,
config: FrigateConfig,
ptz_metrics: dict[str, PTZMetrics],
camera_metrics: dict[str, CameraMetrics] | None = None,
) -> None:
self.cams: dict[str, dict] = {}
self.failed_cams: dict[str, dict] = {}
@ -53,6 +78,7 @@ class OnvifController:
self.reset_timeout = 900 # 15 minutes
self.config = config
self.ptz_metrics = ptz_metrics
self.camera_metrics = camera_metrics or {}
self.status_locks: dict[str, asyncio.Lock] = {}
@ -107,7 +133,28 @@ class OnvifController:
async def _close_camera(self, cam_name: str) -> None:
"""Close the ONVIF client session for a camera."""
cam_state = self.cams.get(cam_name)
if cam_state and "onvif" in cam_state:
if not cam_state:
return
# Stop any long-running event-consumption tasks first so they release
# any resources held against the ONVIFCamera session before we close it.
for key in ("pullpoint", "metadata"):
handle = cam_state.get(key)
if not handle:
continue
task, stop_event = handle
try:
stop_event.set()
except Exception:
pass
task.cancel()
try:
await asyncio.wait_for(task, timeout=5.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
except Exception:
logger.debug(f"Error awaiting {key} task for {cam_name}")
cam_state.pop(key, None)
if "onvif" in cam_state:
try:
await cam_state["onvif"].close()
except Exception:
@ -187,6 +234,172 @@ class OnvifController:
logger.error(f"Onvif connection failed for {camera_name}: {e}")
return False
# Events init runs first, independent of PTZ capability. Many ONVIF
# cameras don't expose PTZ and would otherwise be skipped at the
# get_definition("ptz") check below.
await self._init_onvif_events(camera_name)
return await self._init_onvif_ptz(camera_name)
async def _init_onvif_events(self, camera_name: str) -> None:
"""Subscribe to PullPoint motion events and optionally open the
analytics metadata stream. Failure here is non-fatal PTZ init still
proceeds and the camera continues to work without external motion."""
cam_cfg = self.config.cameras[camera_name]
if not cam_cfg.onvif.events.enabled:
return
cm = self.camera_metrics.get(camera_name)
if cm is None:
logger.warning(
f"ONVIF events enabled for {camera_name} but no CameraMetrics "
"available; external motion will not be published"
)
return
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
cell_layout = await self._discover_cell_layout(onvif, camera_name)
self.cams[camera_name]["cell_layout"] = cell_layout
def on_state(active: bool) -> None:
cm.external_motion_active.value = 1 if active else 0
if not active:
# Drop spatial data when motion ends — keep the consumer's
# snapshot consistent with the binary state.
try:
cm.external_motion_boxes[:] = []
except Exception:
pass
pp_stop = asyncio.Event()
pp_task = asyncio.create_task(
run_pullpoint_subscription(
onvif,
camera_name,
cam_cfg.onvif.events.subscription_timeout,
on_state,
pp_stop,
)
)
self.cams[camera_name]["pullpoint"] = (pp_task, pp_stop)
logger.info(f"ONVIF events: PullPoint subscriber started for {camera_name}")
if not cam_cfg.onvif.events.use_metadata_stream or cell_layout is None:
return
rtsp_url = await self._discover_primary_rtsp_url(onvif, camera_name)
if not rtsp_url:
logger.warning(
f"ONVIF events for {camera_name}: no primary RTSP URL "
"available; skipping metadata stream"
)
return
rtsp_url = _inject_rtsp_credentials(
rtsp_url, cam_cfg.onvif.user, cam_cfg.onvif.password
)
detect_size = (cam_cfg.detect.width, cam_cfg.detect.height)
def on_boxes(boxes: list[tuple[int, int, int, int]]) -> None:
try:
cm.external_motion_boxes[:] = boxes
except Exception:
logger.debug(f"Failed to publish boxes for {camera_name}")
md_stop = asyncio.Event()
md_task = asyncio.create_task(
run_metadata_stream(
rtsp_url,
camera_name,
cell_layout,
detect_size,
on_boxes,
md_stop,
)
)
self.cams[camera_name]["metadata"] = (md_task, md_stop)
logger.info(f"ONVIF events: metadata stream consumer started for {camera_name}")
async def _discover_cell_layout(
self, onvif: ONVIFCamera, camera_name: str
) -> tuple[int, int, tuple[float, float], tuple[float, float]] | None:
"""Query AnalyticsService.GetAnalyticsModules and extract the
CellMotionEngine's CellLayout (Columns, Rows, Translate, Scale).
Returns None on failure caller should fall back to a full-frame box."""
try:
analytics = await onvif.create_analytics_service()
modules = await analytics.GetAnalyticsModules(
{"ConfigurationToken": "VA_CFG_000"}
)
except Exception as e:
logger.debug(f"ONVIF analytics service unavailable for {camera_name}: {e}")
return None
try:
for mod in modules or []:
mod_type = getattr(mod, "Type", None) or getattr(mod, "_attr_1", None)
if mod_type and "CellMotionEngine" not in str(mod_type):
continue
element_items = getattr(mod.Parameters, "ElementItem", None) or []
for item in element_items:
if item.Name != "Layout":
continue
raw = item._value_1
if raw is None or not hasattr(raw, "attrib"):
continue
cols = int(raw.attrib.get("Columns", 0))
rows = int(raw.attrib.get("Rows", 0))
if cols <= 0 or rows <= 0:
continue
tx = ty = 0.0
sx = sy = 0.0
for child in raw.iter():
if child.tag.endswith("}Translate"):
tx = float(child.attrib.get("x", 0))
ty = float(child.attrib.get("y", 0))
elif child.tag.endswith("}Scale"):
sx = float(child.attrib.get("x", 0))
sy = float(child.attrib.get("y", 0))
logger.info(
f"ONVIF cell layout for {camera_name}: {cols}x{rows} "
f"translate=({tx},{ty}) scale=({sx},{sy})"
)
return (cols, rows, (tx, ty), (sx, sy))
except Exception as e:
logger.debug(
f"Failed parsing CellMotionEngine layout for {camera_name}: {e}"
)
return None
async def _discover_primary_rtsp_url(
self, onvif: ONVIFCamera, camera_name: str
) -> str | None:
"""Return the RTSP URL for the primary profile. The ONVIF analytics
metadata track is typically bound to the primary media profile only
(sub-streams may omit it)."""
try:
media = await onvif.create_media_service()
profiles = await media.GetProfiles()
if not profiles:
return None
uri = await media.GetStreamUri(
{
"StreamSetup": {
"Stream": "RTP-Unicast",
"Transport": {"Protocol": "RTSP"},
},
"ProfileToken": profiles[0].token,
}
)
return uri.Uri
except Exception as e:
logger.debug(f"GetStreamUri failed for {camera_name}: {e}")
return None
async def _init_onvif_ptz(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
# create init services
media: ONVIFService = await onvif.create_media_service()
logger.debug(f"Onvif media xaddr for {camera_name}: {media.xaddr}")

138
frigate/ptz/onvif_events.py Normal file
View File

@ -0,0 +1,138 @@
"""ONVIF PullPoint subscriber for camera-side motion events.
Long-running per-camera coroutine that subscribes to the camera's PullPoint
service via `onvif-zeep-async`'s `PullPointManager` (which owns subscription
creation, renewal, and lifecycle), pulls notification messages, parses
IsMotion/State on each round-trip, and invokes a callback on transitions.
Lives on the OnvifController's dedicated asyncio loop (see
`frigate/ptz/onvif.py` for the loop setup).
"""
from __future__ import annotations
import asyncio
import datetime as dt
import logging
from typing import TYPE_CHECKING, Awaitable, Callable
if TYPE_CHECKING:
from onvif import ONVIFCamera
try:
from zeep.exceptions import Fault
except ImportError: # tests can run without zeep installed
class Fault(Exception): # type: ignore[no-redef]
pass
logger = logging.getLogger(__name__)
# Names of the boolean state SimpleItem we accept inside the message Data
# block. Spec calls it IsMotion; the legacy MotionAlarm topic uses State.
_STATE_NAMES = ("IsMotion", "State")
# Bounds on backoff between subscription failures.
_BACKOFF_INITIAL_S = 1.0
_BACKOFF_MAX_S = 60.0
def _parse_motion_state(msg) -> bool | None:
"""Walk a NotificationMessage and return the IsMotion/State value, or
None if not present. The Message body is often an `lxml.etree._Element`
that python-onvif-zeep returns for ##any wildcards — walk via .iter()."""
body = getattr(msg, "Message", None)
if body is None:
return None
raw = getattr(body, "_value_1", body)
if not hasattr(raw, "iter"):
return None
for el in raw.iter():
if not el.tag.endswith("}SimpleItem"):
continue
name = el.attrib.get("Name", "")
if name not in _STATE_NAMES:
continue
val = el.attrib.get("Value", "").strip().lower()
if val in ("true", "1"):
return True
if val in ("false", "0"):
return False
return None
async def run_pullpoint_subscription(
onvif_cam: "ONVIFCamera",
cam_name: str,
timeout_seconds: int,
on_state: Callable[[bool], None] | Callable[[bool], Awaitable[None]],
stop_event: asyncio.Event,
) -> None:
"""Loop until stop_event: create a PullPointManager, pull messages,
dispatch on_state on transitions, reconnect on Fault with exponential
backoff."""
backoff = _BACKOFF_INITIAL_S
last_state: bool | None = None
while not stop_event.is_set():
manager = None
sub_lost = asyncio.Event()
def _subscription_lost() -> None:
sub_lost.set()
try:
manager = await onvif_cam.create_pullpoint_manager(
dt.timedelta(seconds=timeout_seconds),
_subscription_lost,
)
service = manager.get_service()
logger.info(f"ONVIF PullPoint subscribed for {cam_name}")
while not stop_event.is_set() and not sub_lost.is_set():
# Long-poll up to 10s. The subscription manager keeps the
# subscription itself alive in the background — we just pull.
msgs = await service.PullMessages(
{"Timeout": "PT10S", "MessageLimit": 32}
)
for m in msgs.NotificationMessage or []:
state = _parse_motion_state(m)
if state is None or state == last_state:
continue
last_state = state
try:
result = on_state(state)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.exception(f"on_state callback error for {cam_name}")
if sub_lost.is_set():
raise Fault("PullPoint subscription lost")
# Clean exit (stop_event set) — leave the loop.
backoff = _BACKOFF_INITIAL_S
break
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(
f"ONVIF PullPoint subscription error for {cam_name}: {e!r}; "
f"reconnecting in {backoff:.1f}s"
)
finally:
if manager is not None:
try:
await manager.shutdown()
except Exception:
pass
if stop_event.is_set():
return
try:
await asyncio.wait_for(stop_event.wait(), timeout=backoff)
return
except asyncio.TimeoutError:
pass
backoff = min(backoff * 2, _BACKOFF_MAX_S)

View File

@ -0,0 +1,357 @@
"""ONVIF analytics metadata stream consumer.
Per-camera asyncio task that opens an RTSP connection to the camera's
primary profile, extracts the `application/vnd.onvif.metadata` data track
via an ffmpeg subprocess, and converts the per-frame `<tt:MotionInCells>`
bitmap into a list of motion rectangles in Frigate detect-frame pixels.
Why ffmpeg rather than an in-process RTSP client: Frigate already ships
ffmpeg and uses it heavily for video/recording; there is no async RTSP
client in the existing dependency set that handles the `vnd.onvif.metadata`
payload cleanly. The data track is low-bandwidth (~1 packet/sec at idle,
300 bytes XML each), so the subprocess cost is negligible.
Wire format (ONVIF Analytics Service Spec, Annex B "Cell Motion Detection"):
- Each RTP packet payload is one complete <tt:MetadataStream> XML doc.
- Cells attribute = base64(PackBits(bit-packed row-major bitmap)).
- Bits: cols*rows total, MSB-first within bytes, zero-padded.
Cell detect-frame mapping uses the CellLayout transformation discovered
at OnvifController init: Translate(tx, ty) + Scale(sx, sy) maps cell index
(c, r) to normalized ONVIF coords [-1, +1]. We convert that to detect-frame
pixels.
"""
import asyncio
import base64
import logging
from typing import Awaitable, Callable
from xml.etree import ElementTree as ET
import numpy as np
logger = logging.getLogger(__name__)
_TT_NS = "http://www.onvif.org/ver10/schema"
_MIC_TAG = f"{{{_TT_NS}}}MotionInCells"
# ffmpeg's -map 0:d:0 selects the first data track from the input. -c copy
# bypasses any transcode. -f data writes raw packet payloads to stdout.
# -flush_packets 1 disables muxer-side buffering so each metadata frame
# reaches us within ~1 packet of being received from the camera.
_FFMPEG_ARGS_TEMPLATE = (
"-nostdin",
"-loglevel",
"error",
"-rtsp_transport",
"tcp",
"-i",
"{url}",
"-map",
"0:d:0?",
"-c",
"copy",
"-flush_packets",
"1",
"-f",
"data",
"pipe:1",
)
# Each metadata document ends with this closing tag — we split incoming
# stdout on it to recover packet boundaries (no other framing on a `-f data`
# stream).
_DOC_TERMINATOR = b"</tt:MetadataStream>"
_BACKOFF_INITIAL_S = 1.0
_BACKOFF_MAX_S = 60.0
# Stop reading at this many bytes per single document — guards against a
# misbehaving stream filling memory if the terminator never arrives.
_MAX_DOC_BYTES = 64 * 1024
def _packbits_decode(packed: bytes) -> bytes:
"""ISO 12639 / TIFF 6.0 PackBits decoder."""
out = bytearray()
i = 0
n = len(packed)
while i < n:
h = packed[i]
i += 1
if h <= 0x7F:
count = h + 1
out += packed[i : i + count]
i += count
elif h == 0x80:
continue # no-op header
else:
count = 257 - h
if i >= n:
break
out += bytes([packed[i]]) * count
i += 1
return bytes(out)
def _decode_cells(cells_b64: str, cols: int, rows: int) -> np.ndarray | None:
"""Decode the Cells attribute into a 2-D uint8 array shape (rows, cols).
Returns None if the decoded length doesn't match what the layout
expects caller should treat that as "no spatial data this frame"
and fall back to whatever default (e.g. full-frame box)."""
if not cells_b64:
return None
try:
packed = base64.b64decode(cells_b64, validate=False)
except Exception:
return None
raw = _packbits_decode(packed)
needed_bytes = (cols * rows + 7) // 8
if len(raw) < needed_bytes:
return None
bits = np.unpackbits(np.frombuffer(raw[:needed_bytes], dtype=np.uint8))
bits = bits[: cols * rows]
return bits.reshape((rows, cols)).astype(np.uint8)
def _connected_component_bboxes(
cells: np.ndarray,
) -> list[tuple[int, int, int, int]]:
"""4-connectivity flood fill over a small 0/1 grid; returns list of
(c_left, c_top, c_right, c_bottom) inclusive cell-index bounding boxes
for each connected region.
cv2.connectedComponentsWithStats would be faster, but the cell grid is
tiny (typically 22x18 = 396 cells) and avoiding the cv2 import keeps
this module testable without OpenCV installed.
"""
rows, cols = cells.shape
visited = np.zeros_like(cells, dtype=bool)
out: list[tuple[int, int, int, int]] = []
for r0 in range(rows):
for c0 in range(cols):
if not cells[r0, c0] or visited[r0, c0]:
continue
stack = [(r0, c0)]
cmin = cmax = c0
rmin = rmax = r0
while stack:
r, c = stack.pop()
if r < 0 or r >= rows or c < 0 or c >= cols:
continue
if visited[r, c] or not cells[r, c]:
continue
visited[r, c] = True
if r < rmin:
rmin = r
if r > rmax:
rmax = r
if c < cmin:
cmin = c
if c > cmax:
cmax = c
stack.append((r + 1, c))
stack.append((r - 1, c))
stack.append((r, c + 1))
stack.append((r, c - 1))
out.append((cmin, rmin, cmax, rmax))
return out
def _cells_to_boxes(
cells: np.ndarray,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
) -> list[tuple[int, int, int, int]]:
"""Connected-components on the cell grid → list of detect-frame boxes.
cell_layout = (cols, rows, (tx, ty), (sx, sy)) the Translate + Scale
from CellLayout.Transformation. detect_size = (width, height) in
detect-frame pixels.
"""
if cells is None or cells.size == 0 or not cells.any():
return []
cols, rows, (tx, ty), (sx, sy) = cell_layout
det_w, det_h = detect_size
if det_w <= 0 or det_h <= 0:
return []
boxes: list[tuple[int, int, int, int]] = []
# Map cell index → detect-frame pixel via the CellLayout transformation:
# cell (c, r) covers normalized [tx + c*sx, tx + (c+1)*sx] horizontally
# and similarly vertically. Convert normalized [-1, +1] → pixel.
def cell_to_px(
c: int, r: int, *, right_edge: bool, bottom_edge: bool
) -> tuple[int, int]:
cx_idx = c + 1 if right_edge else c
cy_idx = r + 1 if bottom_edge else r
nx = tx + cx_idx * sx
ny = ty + cy_idx * sy
px = int(round((nx + 1.0) * 0.5 * det_w))
py = int(round((ny + 1.0) * 0.5 * det_h))
return px, py
for c_left, c_top, c_right, c_bottom in _connected_component_bboxes(cells):
x1, y1 = cell_to_px(c_left, c_top, right_edge=False, bottom_edge=False)
x2, y2 = cell_to_px(c_right, c_bottom, right_edge=True, bottom_edge=True)
x1 = max(0, min(det_w - 1, x1))
y1 = max(0, min(det_h - 1, y1))
x2 = max(0, min(det_w - 1, x2))
y2 = max(0, min(det_h - 1, y2))
if x2 <= x1 or y2 <= y1:
continue
boxes.append((x1, y1, x2, y2))
return boxes
def _extract_cells_from_doc(doc_bytes: bytes) -> tuple[str | None, int, int]:
"""Parse a <tt:MetadataStream> XML doc, return (cells_b64, cols, rows).
Returns (None, 0, 0) if no MotionInCells element is found."""
try:
root = ET.fromstring(doc_bytes)
except ET.ParseError:
return None, 0, 0
for el in root.iter(_MIC_TAG):
cells_b64 = el.attrib.get("Cells")
try:
cols = int(el.attrib.get("Columns", "0"))
rows = int(el.attrib.get("Rows", "0"))
except ValueError:
return None, 0, 0
return cells_b64, cols, rows
return None, 0, 0
async def run_metadata_stream(
rtsp_url: str,
cam_name: str,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
on_boxes: Callable[[list[tuple[int, int, int, int]]], None]
| Callable[[list[tuple[int, int, int, int]]], Awaitable[None]],
stop_event: asyncio.Event,
) -> None:
"""Loop until stop_event: spawn ffmpeg → read XML docs → decode → on_boxes."""
backoff = _BACKOFF_INITIAL_S
while not stop_event.is_set():
proc = None
try:
args = [a.format(url=rtsp_url) for a in _FFMPEG_ARGS_TEMPLATE]
proc = await asyncio.create_subprocess_exec(
"ffmpeg",
*args,
stdin=asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
logger.info(
f"ONVIF metadata stream: ffmpeg started for {cam_name} pid={proc.pid}"
)
await _consume_ffmpeg(
proc, cam_name, cell_layout, detect_size, on_boxes, stop_event
)
backoff = _BACKOFF_INITIAL_S
except asyncio.CancelledError:
raise
except Exception as e:
logger.warning(
f"ONVIF metadata stream error for {cam_name}: {e!r}; "
f"reconnecting in {backoff:.1f}s"
)
finally:
if proc is not None and proc.returncode is None:
proc.terminate()
try:
await asyncio.wait_for(proc.wait(), timeout=2.0)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
if stop_event.is_set():
return
try:
await asyncio.wait_for(stop_event.wait(), timeout=backoff)
return
except asyncio.TimeoutError:
pass
backoff = min(backoff * 2, _BACKOFF_MAX_S)
async def _consume_ffmpeg(
proc: asyncio.subprocess.Process,
cam_name: str,
cell_layout: tuple[int, int, tuple[float, float], tuple[float, float]],
detect_size: tuple[int, int],
on_boxes,
stop_event: asyncio.Event,
) -> None:
"""Read XML docs from ffmpeg stdout and dispatch boxes."""
layout_cols, layout_rows, _, _ = cell_layout
assert proc.stdout is not None
buf = bytearray()
while not stop_event.is_set():
chunk = await proc.stdout.read(4096)
if not chunk:
# ffmpeg exited or stream ended.
stderr_tail = b""
if proc.stderr is not None:
try:
stderr_tail = await asyncio.wait_for(
proc.stderr.read(4096), timeout=0.5
)
except asyncio.TimeoutError:
pass
raise RuntimeError(
f"ffmpeg exited for {cam_name} rc={proc.returncode} "
f"stderr={stderr_tail.decode('utf-8', 'replace').strip()[:200]}"
)
buf.extend(chunk)
if len(buf) > _MAX_DOC_BYTES * 4:
# Drop the head to avoid unbounded growth on a wedged stream.
buf = buf[-_MAX_DOC_BYTES:]
while True:
end = buf.find(_DOC_TERMINATOR)
if end < 0:
break
end += len(_DOC_TERMINATOR)
doc = bytes(buf[:end])
del buf[:end]
cells_b64, cols, rows = _extract_cells_from_doc(doc)
if cells_b64 is None:
continue
# Trust the layout we discovered at init; warn (don't fail) if the
# camera reports a different grid mid-stream.
if cols != layout_cols or rows != layout_rows:
logger.debug(
f"{cam_name}: MotionInCells grid {cols}x{rows} differs "
f"from discovered layout {layout_cols}x{layout_rows}"
)
use_layout = (
cols,
rows,
cell_layout[2],
(2.0 / cols if cols else 0, 2.0 / rows if rows else 0),
)
else:
use_layout = cell_layout
cells = _decode_cells(cells_b64, cols, rows)
if cells is None:
continue
boxes = _cells_to_boxes(cells, use_layout, detect_size)
try:
result = on_boxes(boxes)
if asyncio.iscoroutine(result):
await result
except Exception:
logger.exception(f"on_boxes callback error for {cam_name}")

View File

@ -42,33 +42,118 @@ TIMELAPSE_DATA_INPUT_ARGS = "-an -skip_frame nokey"
# Captures the floating-point factor so we can scale expected duration.
SETPTS_FACTOR_RE = re.compile(r"setpts=([0-9]*\.?[0-9]+)\*PTS")
# ffmpeg flags that can read from or write to arbitrary files
BLOCKED_FFMPEG_ARGS = frozenset(
# Allowlisted flags that take no value.
_VALUELESS_FLAGS = frozenset({"-an", "-sn", "-dn"})
# Allowlisted filter flags. Their value is validated as a filtergraph and may
# only reference filters in _SAFE_FILTERS.
_FILTER_FLAGS = frozenset({"-vf", "-af", "-filter"})
# Allowlisted flags that take exactly one value (encoder / muxer-safe options).
_VALUE_FLAGS = frozenset(
{
"-i",
"-filter_script",
"-filter_complex",
"-lavfi",
"-vf",
"-af",
"-filter",
"-vstats_file",
"-passlogfile",
"-sdp_file",
"-dump_attachment",
"-attach",
"-c",
"-codec",
"-b",
"-crf",
"-qp",
"-q",
"-qscale",
"-preset",
"-tune",
"-profile",
"-level",
"-pix_fmt",
"-r",
"-g",
"-keyint_min",
"-sc_threshold",
"-bf",
"-refs",
"-qmin",
"-qmax",
"-maxrate",
"-minrate",
"-bufsize",
"-movflags",
"-threads",
"-aspect",
"-fps_mode",
"-vsync",
"-skip_frame",
}
)
_ALLOWED_FLAGS = _VALUELESS_FLAGS | _FILTER_FLAGS | _VALUE_FLAGS
# Filters that cannot read files, load plugins, or open network sources.
_SAFE_FILTERS = frozenset(
{
"setpts",
"fps",
"scale",
"format",
"transpose",
"hflip",
"vflip",
"crop",
"pad",
"setsar",
"setdar",
}
)
# Conservative shape for a non-filter flag value. Excludes "/" (paths /
# filtergraph division), whitespace, brackets, and a leading "-" so a value
# can never be a path or swallow a following flag. ":" is permitted for values
# like "16:9".
_SAFE_VALUE_RE = re.compile(r"^[A-Za-z0-9_.:+][A-Za-z0-9_.:+-]*$")
# Substrings inside a filtergraph that indicate a file-reading filter option.
# "movie=" also matches "amovie=" as a substring.
_BLOCKED_FILTER_VALUE_MARKERS = ("movie=", "textfile=", "filename=", "fontfile=")
def _base_flag(token: str) -> str:
"""Return a flag's base name, lowercased and without its stream specifier.
e.g. "-c:v" -> "-c", "-filter:a:0" -> "-filter".
"""
return token.lower().split(":", 1)[0]
def _validate_filtergraph(value: str) -> tuple[bool, str]:
"""Validate a filtergraph value, allowing only filters in _SAFE_FILTERS."""
# None of the safe filters need any of these
if any(token in value for token in ("://", "..", "[", "]")):
return False, "Invalid filter graph in custom ffmpeg arguments"
lowered = value.lower()
if any(marker in lowered for marker in _BLOCKED_FILTER_VALUE_MARKERS):
return False, "File-reading filters are not allowed in custom ffmpeg arguments"
# Filters are separated by "," within a chain and ";" between chains. Safe
# filters never use unescaped "," or ";" in their arguments, so splitting on
# them to recover filter names cannot hide a disallowed filter.
for spec in re.split(r"[;,]", value):
spec = spec.strip()
if not spec:
continue
name = spec.split("=", 1)[0].strip().lower()
if name not in _SAFE_FILTERS:
return False, f"Filter not allowed in custom ffmpeg arguments: {name}"
return True, ""
def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
"""Validate that user-provided ffmpeg args don't allow input/output injection.
"""Validate user-provided custom export ffmpeg args with an allowlist.
Blocks:
- The -i flag and other flags that read/write arbitrary files
- Filter flags (can read files via movie=/amovie= source filters)
- Absolute/relative file paths (potential extra outputs)
- URLs and ffmpeg protocol references (data exfiltration)
Every token must be an allowlisted flag or the value of one; filter values
may only reference safe filters; and no token may become a bare input or
output URL. This structurally prevents arbitrary file read/write, network
exfiltration/SSRF, and resource-exhaustion via the export endpoint.
Admin users skip this validation entirely since they are trusted.
"""
@ -76,26 +161,36 @@ def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
return True, ""
tokens = args.split()
for token in tokens:
# Block flags that could inject inputs or write to arbitrary files
if token.lower() in BLOCKED_FFMPEG_ARGS:
i = 0
while i < len(tokens):
token = tokens[i]
# A bare (non-flag) token here would be parsed by ffmpeg as an input or
# output URL. Only the server sets inputs/outputs, never the user.
if not token.startswith("-"):
return False, f"Unexpected argument in custom ffmpeg arguments: {token}"
base = _base_flag(token)
if base not in _ALLOWED_FLAGS:
return False, f"Forbidden ffmpeg argument: {token}"
# Block tokens that look like file paths (potential output injection)
if (
token.startswith("/")
or token.startswith("./")
or token.startswith("../")
or token.startswith("~")
):
return False, "File paths are not allowed in custom ffmpeg arguments"
if base in _VALUELESS_FLAGS:
i += 1
continue
# Block URLs and ffmpeg protocol references (e.g. http://, tcp://, pipe:, file:)
if "://" in token or token.startswith("pipe:") or token.startswith("file:"):
return (
False,
"Protocol references are not allowed in custom ffmpeg arguments",
)
# Remaining flags consume exactly one value.
if i + 1 >= len(tokens):
return False, f"Missing value for ffmpeg argument: {token}"
value = tokens[i + 1]
if base in _FILTER_FLAGS:
valid, message = _validate_filtergraph(value)
if not valid:
return False, message
elif not _SAFE_VALUE_RE.match(value):
return False, f"Invalid value for {token}: {value}"
i += 2
return True, ""

132
frigate/test/test_export.py Normal file
View File

@ -0,0 +1,132 @@
import unittest
from frigate.record.export import validate_ffmpeg_args
class TestValidateFfmpegArgs(unittest.TestCase):
"""Tests for the non-admin custom export ffmpeg arg validator.
The validator uses a structural allowlist: every token must be an
allowlisted flag or the value of one, filter values are restricted to a
safe set of filters, and no token may become a bare input/output URL.
"""
def assertRejected(self, args: str) -> None:
valid, message = validate_ffmpeg_args(args)
self.assertFalse(valid, f"expected {args!r} to be rejected")
self.assertNotEqual(message, "")
def assertAllowed(self, args: str) -> None:
valid, message = validate_ffmpeg_args(args)
self.assertTrue(valid, f"expected {args!r} to be allowed, got: {message}")
self.assertEqual(message, "")
# --- legitimate use cases must keep working ---------------------------
def test_timelapse_setpts_allowed(self):
# The whole reason -vf cannot simply be blocked: timelapse exports.
self.assertAllowed("-vf setpts=PTS/60 -r 25")
self.assertAllowed("-vf setpts=0.04*PTS -r 30") # server default
self.assertAllowed("-filter:v setpts=PTS/60 -r 25")
def test_default_input_args_allowed(self):
self.assertAllowed("")
self.assertAllowed("-an -skip_frame nokey")
def test_encoding_args_allowed(self):
self.assertAllowed("-c:v libx264 -crf 23 -preset fast")
self.assertAllowed("-c:v copy -c:a copy")
self.assertAllowed("-c:v libx264 -b:v 2M -maxrate 2M -bufsize 4M")
self.assertAllowed("-movflags +faststart")
self.assertAllowed("-pix_fmt yuv420p -r 30 -g 30")
def test_safe_filters_allowed(self):
self.assertAllowed("-vf scale=640:480")
self.assertAllowed("-vf scale=640:480,setpts=0.5*PTS")
self.assertAllowed("-vf format=yuv420p")
self.assertAllowed("-vf transpose=1")
self.assertAllowed("-vf hflip")
self.assertAllowed("-vf fps=15")
self.assertAllowed("-vf setsar=1 -an")
self.assertAllowed("-vf setdar=16/9")
# --- the reported advisory and file-read class ------------------------
def test_reported_advisory_rejected(self):
self.assertRejected(
"-filter:v drawtext=textfile=/etc/passwd:fontcolor=white:fontsize=20"
)
def test_file_reading_filters_rejected(self):
self.assertRejected("-vf movie=/etc/passwd")
self.assertRejected("-vf drawtext=textfile=/etc/passwd")
self.assertRejected("-vf subtitles=/etc/passwd")
# marker embedded as an option of an otherwise-allowed filter name
self.assertRejected("-vf scale=movie=/etc/passwd")
def test_filtergraph_brackets_rejected(self):
# link labels aren't needed for safe filters; rejecting "[" / "]" keeps
# filtergraph validation linear (no ReDoS on attacker input)
self.assertRejected("-vf [in]scale=640:480[out]")
self.assertRejected("-vf " + "[" * 5000)
def test_preset_file_read_rejected(self):
# cwd-anchored traversal slipped past the old startswith() path check
self.assertRejected("-fpre frigate/../../../etc/passwd")
self.assertRejected("-fpre evil.preset")
self.assertRejected("-vpre x")
self.assertRejected("-apre x")
self.assertRejected("-pre x")
def test_slash_option_file_read_rejected(self):
# ffmpeg "-/option file" reads the option value from a file
self.assertRejected("-/filter:v graph.txt")
self.assertRejected("-/filter_complex graph.txt")
# --- network / SSRF class ---------------------------------------------
def test_schemeless_protocol_rejected(self):
self.assertRejected("-f mpegts tcp:10.0.0.5:4444")
self.assertRejected("tcp:10.0.0.5:4444")
self.assertRejected("udp:10.0.0.5:4444")
self.assertRejected("-progress http:attacker.example.com:80/p")
# --- file-write class --------------------------------------------------
def test_tee_write_rejected(self):
self.assertRejected("-c:v libx264 -map 0 -f tee [f=mpegts]/tmp/owned.ts")
self.assertRejected("-f tee [f=mpegts]/etc/frigate/x.ts")
self.assertRejected("tee:/tmp/x")
def test_bare_output_token_rejected(self):
self.assertRejected("evil.mp4")
self.assertRejected("-c copy evil.mp4")
self.assertRejected("x/../escaped.mkv")
def test_file_producing_muxers_rejected(self):
self.assertRejected("-f hls -hls_segment_filename pwn%03d.ts out.m3u8")
self.assertRejected("-f md5 victim.txt")
self.assertRejected("-f segment seg%03d.ts")
def test_write_flags_rejected(self):
self.assertRejected("-progress evil.log")
self.assertRejected("-stats_enc_pre evil.csv")
self.assertRejected("-report")
# --- resource exhaustion / misc ---------------------------------------
def test_dos_input_flags_rejected(self):
self.assertRejected("-stream_loop -1")
self.assertRejected("-readrate 0.001")
def test_disallowed_flags_rejected(self):
self.assertRejected("-map 0")
self.assertRejected("-i /etc/passwd")
self.assertRejected("-attach evil.bin")
self.assertRejected("-dump_attachment evil.bin")
self.assertRejected("/etc/passwd")
self.assertRejected("-metadata comment=x")
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,64 @@
"""Validator tests for `motion.source=onvif` interactions with
`onvif.events.enabled` and `motion.enabled`. Exercises `verify_motion_and_detect`
directly so we don't need the full FrigateConfig path (which mounts /config)."""
import unittest
from frigate.config.config import verify_motion_and_detect
class _Dummy:
"""Light shim for the nested config attributes the validator reads."""
def __init__(self, **kw):
for k, v in kw.items():
setattr(self, k, v)
def _camera(*, name, detect, motion, onvif):
return _Dummy(name=name, detect=detect, motion=motion, onvif=onvif)
class TestVerifyMotionAndDetect(unittest.TestCase):
def test_internal_motion_with_detect_passes(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=True, source="internal"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
# No exception.
self.assertIsNone(verify_motion_and_detect(cam))
def test_detect_with_motion_disabled_rejected(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="internal"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
with self.assertRaisesRegex(ValueError, "object detection requires motion"):
verify_motion_and_detect(cam)
def test_source_onvif_requires_events_enabled(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="onvif"),
onvif=_Dummy(events=_Dummy(enabled=False)),
)
with self.assertRaisesRegex(ValueError, "onvif.events.enabled is false"):
verify_motion_and_detect(cam)
def test_source_onvif_with_events_passes_even_with_motion_disabled(self):
cam = _camera(
name="c",
detect=_Dummy(enabled=True),
motion=_Dummy(enabled=False, source="onvif"),
onvif=_Dummy(events=_Dummy(enabled=True)),
)
self.assertIsNone(verify_motion_and_detect(cam))
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,91 @@
import math
import unittest
import numpy as np
from norfair.camera_motion import (
HomographyTransformation,
TranslationTransformation,
)
from frigate.ptz.autotrack import transform_is_finite
from frigate.track.norfair_tracker import distance
class TestNorfairDistance(unittest.TestCase):
"""Regression tests for the tracker distance guard.
norfair raises a hard ValueError on any nan distance, which kills the camera
process. During autotracking, an ill-conditioned homography can hand the
tracker a non-finite or degenerate estimate box, so distance() must never
return nan for any input.
"""
def setUp(self) -> None:
# boxes are [[x1, y1], [x2, y2]]
self.detection = np.array([[805.0, 402.0], [864.0, 521.0]])
self.estimate = np.array([[800.0, 400.0], [860.0, 520.0]])
def test_finite_boxes_give_finite_distance(self) -> None:
d = distance(self.detection, self.estimate)
self.assertTrue(math.isfinite(d))
def test_inf_estimate_corner_does_not_return_nan(self) -> None:
estimate = np.array([[np.inf, 400.0], [860.0, 520.0]])
d = distance(self.detection, estimate)
self.assertFalse(math.isnan(d))
self.assertEqual(d, float("inf"))
def test_nan_estimate_corner_does_not_return_nan(self) -> None:
# the actual autotracking crash: a positive-only guard would miss this
# because nan <= 0 is False
estimate = np.array([[np.nan, 400.0], [860.0, 520.0]])
d = distance(self.detection, estimate)
self.assertFalse(math.isnan(d))
self.assertEqual(d, float("inf"))
def test_zero_area_estimate_does_not_return_nan(self) -> None:
estimate = np.array([[900.0, 500.0], [900.0, 500.0]])
d = distance(self.detection, estimate)
self.assertFalse(math.isnan(d))
self.assertEqual(d, float("inf"))
def test_zero_area_detection_does_not_return_nan(self) -> None:
detection = np.array([[805.0, 402.0], [805.0, 521.0]])
d = distance(detection, self.estimate)
self.assertFalse(math.isnan(d))
self.assertEqual(d, float("inf"))
def test_inverted_estimate_corners_do_not_return_nan(self) -> None:
# Kalman estimates can occasionally cross corners (x2 < x1)
estimate = np.array([[860.0, 520.0], [800.0, 400.0]])
d = distance(self.detection, estimate)
self.assertFalse(math.isnan(d))
self.assertEqual(d, float("inf"))
class TestTransformIsFinite(unittest.TestCase):
def test_finite_homography_is_finite(self) -> None:
matrix = np.array([[1.0, 0.0, 5.0], [0.0, 1.0, 3.0], [0.0, 0.0, 1.0]])
self.assertTrue(transform_is_finite(HomographyTransformation(matrix)))
def test_finite_translation_is_finite(self) -> None:
self.assertTrue(
transform_is_finite(TranslationTransformation(np.array([12.0, -4.0])))
)
def test_non_finite_homography_is_not_finite(self) -> None:
transform = HomographyTransformation(np.eye(3))
# simulate accumulation overflowing to a non-finite matrix
transform.homography_matrix = np.array(
[[1.0, 0.0, np.inf], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]]
)
self.assertFalse(transform_is_finite(transform))
def test_nan_translation_is_not_finite(self) -> None:
self.assertFalse(
transform_is_finite(TranslationTransformation(np.array([np.nan, 0.0])))
)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,152 @@
"""Unit tests for the ONVIF analytics metadata decoder + cell→box mapper."""
import base64
import unittest
import numpy as np
from frigate.ptz.onvif_metadata import (
_cells_to_boxes,
_decode_cells,
_extract_cells_from_doc,
_packbits_decode,
)
class TestPackBits(unittest.TestCase):
def test_spec_example(self):
# ONVIF Analytics Annex B worked example:
# raw = ff ff ff f0 f0 f0
# packed (PackBits) = fe ff fe f0
packed = bytes.fromhex("feff fef0".replace(" ", ""))
self.assertEqual(
_packbits_decode(packed),
bytes.fromhex("ffff fff0 f0f0".replace(" ", "")),
)
def test_idle_frame_from_live_camera(self):
# `zwA=` is a representative idle-frame payload from a 22×18 grid:
# 396 bits → 50 bytes after byte-padding; PackBits compresses 50
# zeros to `cf 00` → base64 `zwA=`.
packed = base64.b64decode("zwA=")
self.assertEqual(packed, bytes.fromhex("cf 00".replace(" ", "")))
raw = _packbits_decode(packed)
self.assertEqual(len(raw), 50)
self.assertEqual(raw, b"\x00" * 50)
def test_literal_run(self):
# Header 0x03 → 4 literal bytes follow.
self.assertEqual(_packbits_decode(b"\x03ABCD"), b"ABCD")
def test_noop_header(self):
# 0x80 is no-op per spec; literal after should still decode normally.
# \x80 → no-op; \x02 → literal of 3 bytes follows; "ABC" copied.
self.assertEqual(_packbits_decode(b"\x80\x02ABC"), b"ABC")
class TestDecodeCells(unittest.TestCase):
def test_idle(self):
cells = _decode_cells("zwA=", 22, 18)
self.assertIsNotNone(cells)
self.assertEqual(cells.shape, (18, 22))
self.assertEqual(int(cells.sum()), 0)
def test_invalid_base64(self):
self.assertIsNone(_decode_cells("not-base64!@", 22, 18))
def test_short_payload_returns_none(self):
# `cf 00` decodes to 50 bytes; ask for 100×100 grid (1250 bytes
# needed) → expect None.
self.assertIsNone(_decode_cells("zwA=", 100, 100))
def test_top_left_active(self):
# Build a 22×18 grid with only cell (0,0) active. Raw bitmap byte 0
# = 0x80 (MSB set), bytes 1..49 = 0x00. PackBits of that 50-byte
# sequence: literal-1-byte (header 0x00) of 0x80, then replicate of
# 49 zeros (header 257-49=208=0xD0, byte 0x00).
packed = bytes([0x00, 0x80, 0xD0, 0x00])
b64 = base64.b64encode(packed).decode()
cells = _decode_cells(b64, 22, 18)
self.assertIsNotNone(cells)
self.assertEqual(int(cells[0, 0]), 1)
self.assertEqual(int(cells.sum()), 1)
class TestCellsToBoxes(unittest.TestCase):
"""Verify the cell-grid → detect-frame pixel mapping using a representative
CellLayout (22x18, Translate(-1,-1), Scale(2/22, 2/18))."""
LAYOUT = (22, 18, (-1.0, -1.0), (2.0 / 22, 2.0 / 18))
DETECT = (1280, 720)
def test_empty(self):
cells = np.zeros((18, 22), dtype=np.uint8)
self.assertEqual(_cells_to_boxes(cells, self.LAYOUT, self.DETECT), [])
def test_top_left_cell(self):
cells = np.zeros((18, 22), dtype=np.uint8)
cells[0, 0] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 1)
x1, y1, x2, y2 = boxes[0]
# Cell (0,0) covers normalized [-1, -1+2/22] × [-1, -1+2/18]
# → detect px [0, 1280/22] × [0, 720/18] = [0, ~58] × [0, 40]
self.assertEqual(x1, 0)
self.assertEqual(y1, 0)
self.assertAlmostEqual(x2, round(1280 / 22), delta=2)
self.assertAlmostEqual(y2, round(720 / 18), delta=2)
def test_bottom_right_cell(self):
cells = np.zeros((18, 22), dtype=np.uint8)
cells[17, 21] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 1)
x1, y1, x2, y2 = boxes[0]
# Bottom-right edge clamps to detect_size - 1.
self.assertEqual(x2, self.DETECT[0] - 1)
self.assertEqual(y2, self.DETECT[1] - 1)
self.assertAlmostEqual(x1, round(21 * 1280 / 22), delta=2)
self.assertAlmostEqual(y1, round(17 * 720 / 18), delta=2)
def test_two_separated_regions(self):
cells = np.zeros((18, 22), dtype=np.uint8)
# Region A: top-left 2×2 block
cells[0:2, 0:2] = 1
# Region B: bottom-right 2×2 block (separated by inactive cells)
cells[15:17, 18:20] = 1
boxes = _cells_to_boxes(cells, self.LAYOUT, self.DETECT)
self.assertEqual(len(boxes), 2)
class TestExtractCellsFromDoc(unittest.TestCase):
def test_typical_frame(self):
doc = (
b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">'
b"<tt:VideoAnalytics>"
b'<tt:Frame UtcTime="2026-05-29T14:12:20Z">'
b"<tt:Extension>"
b'<tt:MotionInCells Columns="22" Rows="18" Cells="zwA="/>'
b"</tt:Extension></tt:Frame></tt:VideoAnalytics></tt:MetadataStream>"
)
cells_b64, cols, rows = _extract_cells_from_doc(doc)
self.assertEqual(cells_b64, "zwA=")
self.assertEqual(cols, 22)
self.assertEqual(rows, 18)
def test_malformed_xml(self):
self.assertEqual(
_extract_cells_from_doc(b"not-xml"),
(None, 0, 0),
)
def test_doc_without_motioncells(self):
doc = (
b'<tt:MetadataStream xmlns:tt="http://www.onvif.org/ver10/schema">'
b"<tt:VideoAnalytics><tt:Frame/></tt:VideoAnalytics>"
b"</tt:MetadataStream>"
)
self.assertEqual(_extract_cells_from_doc(doc), (None, 0, 0))
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,72 @@
"""Unit tests for the ONVIF PullPoint motion-state parser."""
import unittest
from xml.etree import ElementTree as ET
from frigate.ptz.onvif_events import _parse_motion_state
class FakeMessage:
"""Mimic the zeep NotificationMessage shape: a Message attribute holding
an object whose `_value_1` is an lxml/etree element."""
class _Body:
def __init__(self, element):
self._value_1 = element
def __init__(self, xml: str):
self.Message = self._Body(ET.fromstring(xml))
_NS = 'xmlns:tt="http://www.onvif.org/ver10/schema"'
def _build_msg(name: str, value: str) -> FakeMessage:
xml = (
f"<tt:Message {_NS}>"
"<tt:Source>"
'<tt:SimpleItem Name="Source" Value="VideoSourceToken"/>'
"</tt:Source>"
"<tt:Data>"
f'<tt:SimpleItem Name="{name}" Value="{value}"/>'
"</tt:Data>"
"</tt:Message>"
)
return FakeMessage(xml)
class TestParseMotionState(unittest.TestCase):
def test_is_motion_true(self):
self.assertTrue(_parse_motion_state(_build_msg("IsMotion", "true")))
def test_is_motion_false(self):
self.assertFalse(_parse_motion_state(_build_msg("IsMotion", "false")))
def test_legacy_state_topic_name(self):
# The legacy tns1:VideoSource/MotionAlarm payload uses "State" instead
# of the spec-compliant "IsMotion"; we accept either.
self.assertTrue(_parse_motion_state(_build_msg("State", "true")))
self.assertFalse(_parse_motion_state(_build_msg("State", "false")))
def test_boolean_aliases(self):
self.assertTrue(_parse_motion_state(_build_msg("IsMotion", "1")))
self.assertFalse(_parse_motion_state(_build_msg("IsMotion", "0")))
def test_no_state_returns_none(self):
# Missing the State/IsMotion SimpleItem.
xml = (
f"<tt:Message {_NS}>"
'<tt:Data><tt:SimpleItem Name="Other" Value="yes"/></tt:Data>'
"</tt:Message>"
)
self.assertIsNone(_parse_motion_state(FakeMessage(xml)))
def test_no_message_returns_none(self):
class Empty:
pass
self.assertIsNone(_parse_motion_state(Empty()))
if __name__ == "__main__":
unittest.main()

View File

@ -45,6 +45,17 @@ def distance(detection: np.ndarray, estimate: np.ndarray) -> float:
estimate_dim = np.diff(estimate, axis=0).flatten()
detection_dim = np.diff(detection, axis=0).flatten()
# Guard against degenerate or non-finite boxes
if (
not np.all(np.isfinite(estimate_dim))
or not np.all(np.isfinite(detection_dim))
or estimate_dim[0] <= 0
or estimate_dim[1] <= 0
or detection_dim[0] <= 0
or detection_dim[1] <= 0
):
return float("inf")
# get bottom center positions
detection_position = np.array(
[np.average(detection[:, 0]), np.max(detection[:, 1])]

View File

@ -14,6 +14,7 @@ from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import CameraConfig, DetectConfig, LoggerConfig, ModelConfig
from frigate.config.camera.camera import CameraTypeEnum
from frigate.config.camera.motion import MotionSourceEnum
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
@ -300,7 +301,22 @@ def process_frames(
continue
# look for motion if enabled
motion_boxes = motion_detector.detect(frame)
if camera_config.motion.source == MotionSourceEnum.onvif:
# Motion is supplied by an external ONVIF cell-motion subscriber
# writing to camera_metrics. Skip the per-frame internal detector.
if camera_metrics.external_motion_active.value:
boxes = list(camera_metrics.external_motion_boxes)
if boxes:
motion_boxes = [tuple(b) for b in boxes]
else:
# Active but no spatial data yet — fall back to full frame
# so downstream region clustering still has something to
# scan.
motion_boxes = [(0, 0, frame_shape[1] - 1, frame_shape[0] - 1)]
else:
motion_boxes = []
else:
motion_boxes = motion_detector.detect(frame)
regions = []
consolidated_detections = []

606
generate_api_auth_spec.py Normal file
View File

@ -0,0 +1,606 @@
"""Generate the OpenAPI spec from the app, annotated with auth requirements.
This generator builds the FastAPI application, exports its OpenAPI document via
``app.openapi()``, and enriches every operation with authentication metadata:
* a ``components.securitySchemes`` block,
* a per-operation ``security`` requirement (so the docs render a lock badge),
* an ``x-required-role`` extension for machine readers, and
* a short bold ``Access:`` note prepended to each operation description.
The committed docs/static/frigate-api.yaml is the output of this script. It is
generated rather than hand-maintained so it stays complete and current; the docs
build (docusaurus-plugin-openapi-docs) consumes it as-is.
The access level for an endpoint is determined by BOTH its route-level
dependency (``require_role``/``allow_any_authenticated``/``allow_public``/
``require_camera_access``) AND the global "secure by default" admin dependency,
which is bypassed only for the paths listed in ``require_admin_by_default``.
Those exempt lists are read directly from the function's closure so this script
stays in lockstep with ``frigate/api/auth.py`` instead of duplicating them.
Many handlers enforce per-camera access by calling ``require_camera_access``
inside the handler body rather than as a route dependency, which dependency
introspection cannot see. We recover those from the handler's bytecode (see
``_handler_enforces_camera``) and promote an otherwise "any authenticated"
operation to camera-scoped.
Usage (from the repository root):
python3 generate_api_auth_spec.py # write the spec
python3 generate_api_auth_spec.py --check # CI guard: fail if stale
The process exits non-zero if the generated document fails structural
validation, or (in --check mode) if the committed spec is out of date.
"""
import argparse
import difflib
import inspect
import io
import logging
import sys
from pathlib import Path
from fastapi import FastAPI
from fastapi.routing import APIRoute
from ruamel.yaml import YAML
from ruamel.yaml.scalarstring import LiteralScalarString
from frigate.api import app as main_app
from frigate.api import (
auth,
camera,
chat,
classification,
debug_replay,
event,
export,
media,
motion_search,
notification,
preview,
record,
review,
)
from frigate.api.auth import require_admin_by_default
logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("generate_api_auth_spec")
REPO_ROOT = Path(__file__).resolve().parent
OUTPUT_SPEC = REPO_ROOT / "docs" / "static" / "frigate-api.yaml"
HTTP_METHODS = {"get", "post", "put", "delete", "patch"}
# Banner written at the top of the generated spec.
HEADER = (
"# Generated by generate_api_auth_spec.py — do not edit by hand.\n"
"# Regenerate with: python3 generate_api_auth_spec.py\n"
"# The empty info.title is intentional: a docusaurus-openapi-docs convention\n"
"# that suppresses the generated API introduction page.\n"
)
# Post-processing applied on top of the raw app.openapi() export. These live
# only in the published spec, not in the app, so they are reproduced here.
SPEC_TITLE = ""
SPEC_SERVERS = [
{"url": "https://demo.frigate.video/api"},
{"url": "http://localhost:5001/api"},
]
# Access levels, ordered from least to most privileged. The string values are
# also what we emit as ``x-required-role``.
PUBLIC = "public"
AUTHENTICATED = "any"
CAMERA = "camera"
ADMIN = "admin"
ADMIN_SCHEME = "frigateAdminAuth"
USER_SCHEME = "frigateUserAuth"
SECURITY_SCHEMES = {
ADMIN_SCHEME: {
"type": "apiKey",
"in": "cookie",
"name": "frigate_token",
"description": (
"Authenticated session whose resolved role is 'admin'. The session "
"is established via the JWT cookie issued by POST /login, or via "
"proxy auth headers (remote-user / remote-role) when Frigate runs "
"behind an authenticating reverse proxy."
),
},
USER_SCHEME: {
"type": "apiKey",
"in": "cookie",
"name": "frigate_token",
"description": (
"Any authenticated session (role 'viewer' or higher), established "
"via the JWT cookie issued by POST /login, or via proxy auth "
"headers when Frigate runs behind an authenticating reverse proxy."
),
},
}
# How each access level maps to a rendered note.
ACCESS_NOTES = {
PUBLIC: "**Access:** Public — no authentication required.",
AUTHENTICATED: "**Access:** Any authenticated user.",
CAMERA: "**Access:** Authenticated user with access to the referenced camera.",
ADMIN: "**Access:** Admin role required.",
}
def build_app() -> FastAPI:
"""Build a bare app with every router mounted.
This mirrors the router set wired up in frigate.api.fastapi_app. It omits
the global admin dependency and all runtime state; the OpenAPI route table
and the per-route dependencies are all we need to export and classify.
"""
app = FastAPI()
routers = [
auth.router,
camera.router,
chat.router,
classification.router,
review.router,
main_app.router,
preview.router,
notification.router,
export.router,
event.router,
media.router,
motion_search.router,
record.router,
debug_replay.router,
]
for router in routers:
app.include_router(router)
return app
def read_exempt_rules() -> tuple[set[str], tuple[str, ...]]:
"""Read the admin-exemption lists straight from the auth dependency closure.
Reading them here (rather than copying) keeps this generator in sync with
frigate/api/auth.py automatically.
"""
closure = inspect.getclosurevars(require_admin_by_default()).nonlocals
exempt_paths = set(closure["EXEMPT_PATHS"])
exempt_prefixes = tuple(closure["EXEMPT_PREFIXES"])
return exempt_paths, exempt_prefixes
def _first_segment(path: str) -> str:
return path.split("/", 2)[1] if path.startswith("/") and len(path) > 1 else ""
def _route_markers(route: APIRoute) -> tuple[set[str], list[str] | None]:
"""Return the set of recognized auth markers on a route's dependencies."""
markers: set[str] = set()
admin_roles: list[str] | None = None
for dep in route.dependant.dependencies:
call = dep.call
qualname = getattr(call, "__qualname__", "") or ""
name = getattr(call, "__name__", "") or ""
if "role_checker" in qualname:
markers.add(ADMIN)
try:
roles = inspect.getclosurevars(call).nonlocals.get("required_roles")
if roles:
admin_roles = list(roles)
except (TypeError, ValueError):
pass
elif name in ("require_camera_access", "require_go2rtc_stream_access"):
markers.add(CAMERA)
elif "auth_checker" in qualname:
markers.add(AUTHENTICATED)
elif "public_checker" in qualname:
markers.add(PUBLIC)
return markers, admin_roles
def _handler_enforces_camera(route: APIRoute) -> bool:
"""True if the route handler calls require_camera_access in its body.
Such calls are invisible to dependency introspection. We detect them from
the handler's compiled bytecode: a global name referenced anywhere in the
function appears in ``__code__.co_names``. This catches direct calls (all of
them, currently); a call hidden behind a helper function would be missed.
"""
code = getattr(route.endpoint, "__code__", None)
return bool(code and "require_camera_access" in code.co_names)
def classify_route(
route: APIRoute,
exempt_paths: set[str],
exempt_prefixes: tuple[str, ...],
) -> tuple[str, list[str] | None, str | None]:
"""Resolve the effective access level for a route.
Returns (access_level, roles, flag). ``flag`` is a human-readable note when
the result needed inference or revealed a possible inconsistency.
"""
level, roles, flag = _classify_base(route, exempt_paths, exempt_prefixes)
# In-body require_camera_access enforcement is invisible to dependency
# introspection. When the effective access would otherwise be "any
# authenticated", the handler's per-camera check is the real constraint, so
# promote it to camera-scoped. Admin/public are left alone: for admin the
# role is the binding requirement and the camera check is only defensive.
if level == AUTHENTICATED and _handler_enforces_camera(route):
return CAMERA, None, None
return level, roles, flag
def _classify_base(
route: APIRoute,
exempt_paths: set[str],
exempt_prefixes: tuple[str, ...],
) -> tuple[str, list[str] | None, str | None]:
"""Resolve the access level from route-level dependencies and exempt rules."""
markers, admin_roles = _route_markers(route)
path = route.path
is_camera_path = _first_segment(path) == "{camera_name}"
exempt = path in exempt_paths or path.startswith(exempt_prefixes) or is_camera_path
# Explicit route-level markers win, in order of specificity.
if ADMIN in markers:
return ADMIN, admin_roles or ["admin"], None
if CAMERA in markers:
return CAMERA, None, None
if AUTHENTICATED in markers:
if exempt:
return AUTHENTICATED, None, None
# The route opts in to any-authenticated, but the global admin check is
# not bypassed for this path, so admin is what actually gets enforced.
return (
ADMIN,
["admin"],
(
"route declares allow_any_authenticated but path is not exempt from "
"the global admin check; admin is effectively enforced"
),
)
if PUBLIC in markers:
if exempt:
return PUBLIC, None, None
return (
ADMIN,
["admin"],
(
"route declares allow_public but path is not exempt from the global "
"admin check; admin is effectively enforced"
),
)
# No explicit auth marker: governed purely by the global default.
if not exempt:
return ADMIN, ["admin"], None
# Exempt with no route dependency: the global admin check is bypassed and
# there is no route-level gate, so authorization (if any) happens inside the
# handler. Infer from the path shape and flag for confirmation.
if is_camera_path:
return (
CAMERA,
None,
(
"no route-level dependency; camera-scoped path, authorization "
"assumed to be enforced in the handler"
),
)
return (
AUTHENTICATED,
None,
(
"path is exempt from the global admin check but has no route-level "
"dependency; confirm authorization is enforced in the handler"
),
)
def build_access_map(
app: FastAPI,
exempt_paths: set[str],
exempt_prefixes: tuple[str, ...],
) -> dict[tuple[str, str], dict]:
"""Map (path, lowercase method) -> classification details."""
access_map: dict[tuple[str, str], dict] = {}
for route in app.routes:
if not isinstance(route, APIRoute):
continue
level, roles, flag = classify_route(route, exempt_paths, exempt_prefixes)
for method in route.methods:
if method in ("HEAD", "OPTIONS"):
continue
access_map[(route.path, method.lower())] = {
"level": level,
"roles": roles,
"flag": flag,
"path": route.path,
"method": method,
}
return access_map
def security_for(level: str) -> list:
"""Build the OpenAPI ``security`` value for an access level."""
if level == PUBLIC:
return []
if level == ADMIN:
return [{ADMIN_SCHEME: []}]
# AUTHENTICATED and CAMERA both require any authenticated session; the
# camera-specific scoping is conveyed in the note and x-required-role.
return [{USER_SCHEME: []}]
def required_role_value(level: str, roles: list[str] | None):
if level == ADMIN and roles and roles != ["admin"]:
return roles
return level
def annotate_description(operation: dict, note: str) -> None:
existing = operation.get("description")
if not existing:
operation["description"] = note
return
operation["description"] = LiteralScalarString(
f"{note}\n\n{str(existing).rstrip()}"
)
def base_document(raw: dict) -> dict:
"""Apply the docs pipeline post-processing with a stable top-level order."""
info = dict(raw.get("info", {}))
info["title"] = SPEC_TITLE
return {
"openapi": raw["openapi"],
"info": info,
"servers": [dict(server) for server in SPEC_SERVERS],
"paths": raw["paths"],
"components": raw.get("components", {}),
}
def enrich(spec: dict, access_map: dict) -> tuple[dict, list, list]:
"""Add security schemes and per-operation auth metadata in place."""
components = spec.setdefault("components", {})
components["securitySchemes"] = dict(SECURITY_SCHEMES)
counts: dict[str, int] = {}
flagged: list[dict] = []
unmatched: list[tuple[str, str]] = []
for path, path_item in spec["paths"].items():
for method, operation in path_item.items():
if method.lower() not in HTTP_METHODS:
continue
details = access_map.get((path, method.lower()))
if details is None:
unmatched.append((method.upper(), path))
continue
level = details["level"]
counts[level] = counts.get(level, 0) + 1
operation["security"] = security_for(level)
operation["x-required-role"] = required_role_value(level, details["roles"])
annotate_description(operation, ACCESS_NOTES[level])
if details["flag"]:
flagged.append(details)
return counts, flagged, unmatched
# Numeric defaults at or above this magnitude are treated as live Unix
# timestamps baked into the schema at import time (e.g. the /{camera_name}
# /recordings after/before params default to datetime.now()). They make the
# export non-deterministic and document a meaningless frozen epoch, so they are
# stripped. The proper fix is to default those route params to None and resolve
# "now" inside the handler.
VOLATILE_DEFAULT_THRESHOLD = 1_000_000_000
def strip_volatile_defaults(node, trail: str = "") -> list[tuple[str, float]]:
"""Remove epoch-like numeric ``default`` values so the export is stable.
Returns the (location, value) pairs that were removed, for reporting.
"""
removed: list[tuple[str, float]] = []
if isinstance(node, dict):
default = node.get("default")
if (
isinstance(default, (int, float))
and not isinstance(default, bool)
and default >= VOLATILE_DEFAULT_THRESHOLD
):
removed.append((trail, default))
del node["default"]
for key, value in node.items():
removed.extend(strip_volatile_defaults(value, f"{trail}/{key}"))
elif isinstance(node, list):
for index, value in enumerate(node):
removed.extend(strip_volatile_defaults(value, f"{trail}[{index}]"))
return removed
def to_block_scalars(node):
"""Recursively render multi-line strings as literal block scalars.
Produces readable, deterministic YAML (``|-`` blocks) instead of long
double-quoted lines with escaped newlines.
"""
if isinstance(node, dict):
return {key: to_block_scalars(value) for key, value in node.items()}
if isinstance(node, list):
return [to_block_scalars(value) for value in node]
if isinstance(node, str) and "\n" in node:
return LiteralScalarString(node)
return node
def _iter_refs(node):
if isinstance(node, dict):
for key, value in node.items():
if key == "$ref" and isinstance(value, str):
yield value
else:
yield from _iter_refs(value)
elif isinstance(node, list):
for value in node:
yield from _iter_refs(value)
def validate(spec: dict) -> list[str]:
"""Structural sanity checks on the generated document."""
problems: list[str] = []
schemas = set(spec.get("components", {}).get("schemas", {}))
defined_schemes = set(spec.get("components", {}).get("securitySchemes", {}))
for ref in _iter_refs(spec):
if ref.startswith("#/components/schemas/"):
name = ref.rsplit("/", 1)[-1]
if name not in schemas:
problems.append(f"dangling $ref: {ref}")
for path, path_item in spec.get("paths", {}).items():
for method, operation in path_item.items():
if method.lower() not in HTTP_METHODS or not isinstance(operation, dict):
continue
location = f"{method.upper()} {path}"
if "x-required-role" not in operation:
problems.append(f"missing x-required-role: {location}")
if "security" not in operation:
problems.append(f"missing security: {location}")
continue
for requirement in operation["security"]:
for scheme in requirement:
if scheme not in defined_schemes:
problems.append(
f"undefined security scheme {scheme}: {location}"
)
return sorted(set(problems))
def render(spec: dict) -> str:
"""Serialize the spec to the canonical YAML string (with the header)."""
yaml = YAML()
yaml.width = 80
yaml.indent(mapping=2, sequence=4, offset=2)
stream = io.StringIO()
yaml.dump(spec, stream)
return HEADER + stream.getvalue()
def build_spec() -> tuple[dict, dict, list, list, list]:
app = build_app()
exempt_paths, exempt_prefixes = read_exempt_rules()
access_map = build_access_map(app, exempt_paths, exempt_prefixes)
spec = base_document(app.openapi())
normalized = strip_volatile_defaults(spec)
counts, flagged, unmatched = enrich(spec, access_map)
spec = to_block_scalars(spec)
return spec, counts, flagged, unmatched, normalized
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="Generate the annotated OpenAPI spec.")
parser.add_argument(
"--check",
action="store_true",
help="verify the committed spec is up to date without writing; "
"exit non-zero if it would change",
)
args = parser.parse_args(argv)
spec, counts, flagged, unmatched, normalized = build_spec()
problems = validate(spec)
rendered = render(spec)
if args.check:
return _check(rendered, problems)
if problems:
logger.error("Refusing to write — generated spec failed validation:")
for problem in problems:
logger.error(" %s", problem)
return 1
OUTPUT_SPEC.write_text(rendered)
_report(counts, flagged, unmatched, normalized)
logger.info("\nWrote %s", OUTPUT_SPEC.relative_to(REPO_ROOT))
return 0
def _check(rendered: str, problems: list[str]) -> int:
name = OUTPUT_SPEC.relative_to(REPO_ROOT)
if problems:
logger.error("Generated spec failed validation:")
for problem in problems:
logger.error(" %s", problem)
return 1
current = OUTPUT_SPEC.read_text() if OUTPUT_SPEC.exists() else ""
if current == rendered:
logger.info("%s is up to date", name)
return 0
logger.error(
"%s is out of date. Regenerate with: python3 %s",
name,
Path(__file__).name,
)
diff = difflib.unified_diff(
current.splitlines(),
rendered.splitlines(),
fromfile=f"{name} (committed)",
tofile=f"{name} (generated)",
lineterm="",
n=2,
)
for shown, line in enumerate(diff):
if shown >= 60:
logger.error(" ... (diff truncated)")
break
logger.error(" %s", line)
return 1
def _report(counts, flagged, unmatched, normalized) -> None:
logger.info("Access levels applied:")
for level in (PUBLIC, AUTHENTICATED, CAMERA, ADMIN):
logger.info(" %-14s %d", level, counts.get(level, 0))
logger.info(" %-14s %d", "total", sum(counts.values()))
if normalized:
logger.info("\nStripped volatile timestamp defaults (%d):", len(normalized))
for location, value in normalized:
logger.info(" %s = %s", location.lstrip("/"), value)
if flagged:
logger.info("\nFlagged for manual confirmation (%d):", len(flagged))
for item in flagged:
logger.info(" %-6s %s", item["method"], item["path"])
logger.info(" -> %s (%s)", item["level"], item["flag"])
if unmatched:
logger.info(
"\nOperations with no classification (%d) [unexpected]:", len(unmatched)
)
for method, path in unmatched:
logger.info(" %-6s %s", method, path)
if __name__ == "__main__":
sys.exit(main())

View File

@ -262,6 +262,10 @@
"label": "Enable motion detection",
"description": "Enable or disable motion detection for this camera."
},
"source": {
"label": "Motion source",
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
},
"threshold": {
"label": "Motion threshold",
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
@ -843,6 +847,22 @@
"description": "Internal field to track whether autotracking was enabled in configuration."
}
},
"events": {
"label": "ONVIF events",
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
"enabled": {
"label": "Enable ONVIF events",
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
},
"subscription_timeout": {
"label": "Subscription timeout",
"description": "Seconds before the PullPoint subscription expires and is renewed."
},
"use_metadata_stream": {
"label": "Use metadata stream",
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
}
},
"ignore_time_mismatch": {
"label": "Ignore time mismatch",
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."

View File

@ -769,6 +769,10 @@
"label": "Enable motion detection",
"description": "Enable or disable motion detection for all cameras; can be overridden per-camera."
},
"source": {
"label": "Motion source",
"description": "Where motion state comes from: Frigate's internal frame analyser, or the camera's ONVIF cell-motion events (requires onvif.events.enabled)."
},
"threshold": {
"label": "Motion threshold",
"description": "Pixel difference threshold used by the motion detector; higher values reduce sensitivity (range 1-255)."
@ -1623,6 +1627,22 @@
"description": "Internal field to track whether autotracking was enabled in configuration."
}
},
"events": {
"label": "ONVIF events",
"description": "Consume camera-side ONVIF motion notifications instead of Frigate's CPU motion detector.",
"enabled": {
"label": "Enable ONVIF events",
"description": "Subscribe to the camera's ONVIF cell-motion notifications and use them as Frigate's motion signal."
},
"subscription_timeout": {
"label": "Subscription timeout",
"description": "Seconds before the PullPoint subscription expires and is renewed."
},
"use_metadata_stream": {
"label": "Use metadata stream",
"description": "Open the ONVIF analytics RTSP metadata stream to receive per-cell motion coordinates. Falls back to a full-frame box when disabled or when the camera does not advertise the track."
}
},
"ignore_time_mismatch": {
"label": "Ignore time mismatch",
"description": "Ignore time synchronization differences between camera and Frigate server for ONVIF communication."