Filter outbound websocket broadcasts by per-recipient camera access (#23256)

* filter outbound ws broadcasts by per-recipient camera access

* fan out config updates to comms

* tests

* mypy

* allow viewers to use jobstate

* update agent instructions

* remove vitest
This commit is contained in:
Josh Hawkins 2026-05-19 14:51:16 -05:00 committed by GitHub
parent b0b00fe1d0
commit 7881bea60f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 1203 additions and 7 deletions

View File

@ -162,7 +162,6 @@ When reviewing code, do NOT comment on:
- **Linting**: ESLint (see `web/.eslintrc.cjs`)
- **Formatting**: Prettier with Tailwind CSS plugin
- **Type Safety**: TypeScript strict mode enabled
- **Testing**: Vitest for unit tests
### Component Patterns
@ -233,6 +232,9 @@ ruff format frigate/
# Run linter
ruff check frigate/
# Type check
python3 -u -m mypy --config-file frigate/mypy.ini frigate
```
### Frontend (from web/ directory)
@ -252,6 +254,38 @@ npm run lint:fix
# Format code
npm run prettier:write
# E2E: first-time setup
npm install
npx playwright install chromium
# E2E: build the app and run all tests
npm run e2e:build && npm run e2e
# E2E: interactive UI for debugging
npm run e2e:ui
# E2E: run a specific spec
npx playwright test --config e2e/playwright.config.ts e2e/specs/live.spec.ts
# E2E: filter by name, or run only desktop/mobile
npx playwright test --config e2e/playwright.config.ts --grep="severity tab"
npx playwright test --config e2e/playwright.config.ts --project=desktop
# E2E: regenerate mock data after backend model changes (from repo root)
PYTHONPATH=. python3 web/e2e/fixtures/mock-data/generate-mock-data.py
# Regenerate config translations from Pydantic models — outputs to
# web/public/locales/en/config/{global,cameras}.json. NEVER edit those
# JSON files by hand; change the Pydantic field title/description and
# re-run this script. (from repo root)
python3 generate_config_translations.py
# Extract i18n keys from source into the locale files after adding
# new t() calls. Use the :ci variant to verify the locale files are
# in sync with source (fails if extraction would change anything).
npm run i18n:extract
npm run i18n:extract:ci
```
### Docker Development
@ -371,6 +405,10 @@ except ValueError:
)
```
## WebSocket Broadcasts
Outbound WebSocket broadcasts go through a per-recipient classifier in `frigate/comms/ws.py` that enforces camera-level access. **The classifier is fail-closed: any topic it doesn't recognize is dropped for every client.** New outbound topics must be classified there or they'll silently disappear.
## Project-Specific Conventions
### Configuration Files

View File

@ -774,6 +774,8 @@ def config_set(request: Request, body: AppConfigSetBody):
if request.app.dispatcher is not None:
request.app.dispatcher.config = config
for comm in request.app.dispatcher.comms:
comm.config = config
if body.update_topic:
if body.update_topic.startswith("config/cameras/"):

View File

@ -34,6 +34,8 @@ from frigate.const import (
UPDATE_REVIEW_DESCRIPTION,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import User
from frigate.output.ws_auth import ws_has_camera_access
logger = logging.getLogger(__name__)
@ -66,6 +68,7 @@ _WS_VIEWER_TOPICS = frozenset(
"audioTranscriptionState",
"birdseyeLayout",
"embeddingsReindexProgress",
"jobState",
}
)
@ -102,6 +105,321 @@ def _check_ws_authorization(
return topic in _WS_VIEWER_TOPICS
# ---- Outbound filtering ---------------------------------------------------
#
# Every WebSocket broadcast is classified into one of a small set of scopes,
# then materialized per recipient. Connections with restricted roles only see
# data for cameras they are authorized to access; admin and full-access roles
# behave as today.
# Topics that are safe to broadcast to every authenticated client.
_WS_GLOBAL_OUTBOUND_TOPICS = frozenset(
{
"model_state",
"embeddings_reindex_progress",
"audio_transcription_state",
"profile/state",
"notifications/state",
"notification_test",
}
)
# Topics that restricted roles must never receive. Birdseye composites span
# all cameras, so the existing JSMPEG policy already restricts birdseye access
# to unrestricted roles; the layout broadcast follows the same rule.
_WS_UNRESTRICTED_ONLY_TOPICS = frozenset(
{
"birdseye_layout",
}
)
# Topics whose payload (parsed as JSON) names a single owning camera at the
# given key path. Used to scope events, reviews, triggers, etc.
_WS_PAYLOAD_CAMERA_TOPICS: dict[str, tuple[str, ...]] = {
"events": ("after", "camera"),
"reviews": ("after", "camera"),
"tracked_object_update": ("camera",),
"triggers": ("camera",),
"camera_monitoring": ("camera",),
}
# Topics whose payload is a dict keyed by camera name; filter keys per
# recipient.
_WS_RESHAPE_BY_CAMERA_KEY_TOPICS = frozenset(
{
"camera_activity",
"audio_detections",
}
)
# Topics whose payload is a dict keyed by job_type, where each entry may
# contain a "camera" or "source_camera" field, or a nested ``results.jobs``
# list of per-camera sub-jobs (export broadcasts).
_WS_RESHAPE_JOB_STATE_TOPICS = frozenset(
{
"job_state",
}
)
# Topics whose payload mixes global aggregates with a ``cameras`` sub-dict
# keyed by camera name. Aggregates and detector data stay; per-camera entries
# are filtered.
_WS_RESHAPE_STATS_TOPICS = frozenset(
{
"stats",
}
)
def _collect_zone_names(config: FrigateConfig) -> set[str]:
"""Return the set of all zone names defined across cameras."""
names: set[str] = set()
for camera in config.cameras.values():
zones = getattr(camera, "zones", None) or {}
names.update(zones.keys())
return names
def _parse_json_payload(payload: Any) -> Any:
"""Return payload parsed as JSON if it is a string, else as-is."""
if isinstance(payload, str):
try:
return json.loads(payload)
except (ValueError, TypeError):
return None
return payload
def _scope_job_entry_to_allowed(entry: Any, allowed: set[str]) -> dict[str, Any] | None:
"""Filter a single job_state entry to the recipient's allowed cameras.
Returns the (possibly reshaped) entry, or None to drop it. Four shapes
are handled:
* Top-level ``camera`` or ``source_camera`` (motion_search, vlm_watch,
export sub-job dicts): drop the entry if not allowed.
* Nested ``results.jobs`` list of per-camera sub-jobs (the aggregated
export broadcast): filter the list; drop the entry if nothing remains.
* Nested ``results.camera`` or ``results.source_camera`` (debug_replay,
which puts replay-specific fields inside ``results``): drop the entry
if not allowed.
* No camera anywhere (e.g. ``media_sync``): treat as global and keep.
"""
if not isinstance(entry, dict):
return None
cam = entry.get("camera") or entry.get("source_camera")
if cam is None:
results = entry.get("results")
if isinstance(results, dict):
sub_jobs = results.get("jobs")
if isinstance(sub_jobs, list):
filtered_jobs = [
j
for j in sub_jobs
if isinstance(j, dict)
and (j.get("camera") or j.get("source_camera")) in allowed
]
if not filtered_jobs:
return None
reshaped = dict(entry)
reshaped["results"] = dict(results)
reshaped["results"]["jobs"] = filtered_jobs
return reshaped
cam = results.get("camera") or results.get("source_camera")
if cam is not None:
return entry if cam in allowed else None
return entry
def _extract_payload_camera(payload: Any, path: tuple[str, ...]) -> str | None:
"""Walk the dotted path through a (possibly JSON-encoded) payload."""
cur = _parse_json_payload(payload)
for key in path:
if not isinstance(cur, dict):
return None
cur = cur.get(key)
return cur if isinstance(cur, str) else None
def _classify_outbound(
topic: str, all_cameras: set[str], all_zones: set[str]
) -> tuple[str, Any]:
"""Classify an outbound topic into (kind, extra).
kind values:
- "global" : send to every authenticated client
- "drop" : send to nobody (fail-closed for unknowns)
- "unrestricted_only" : send only to admin/full-access roles
- "camera" : extra is the owning camera name
- "payload_camera" : extra is the JSON key path to the camera name
- "reshape_by_camera_key"
- "reshape_job_state"
- "reshape_stats"
"""
if topic in _WS_GLOBAL_OUTBOUND_TOPICS:
return ("global", None)
if topic in _WS_UNRESTRICTED_ONLY_TOPICS:
return ("unrestricted_only", None)
if topic in _WS_RESHAPE_BY_CAMERA_KEY_TOPICS:
return ("reshape_by_camera_key", None)
if topic in _WS_RESHAPE_JOB_STATE_TOPICS:
return ("reshape_job_state", None)
if topic in _WS_RESHAPE_STATS_TOPICS:
return ("reshape_stats", None)
if topic in _WS_PAYLOAD_CAMERA_TOPICS:
return ("payload_camera", _WS_PAYLOAD_CAMERA_TOPICS[topic])
# Topic-prefix based: first segment names the owning camera or zone.
first = topic.split("/", 1)[0]
if first in all_cameras:
return ("camera", first)
if first in all_zones:
# Zone aggregates span cameras; restricted users see nothing here.
return ("unrestricted_only", None)
return ("drop", None)
def _ws_role_header(ws: Any) -> str | None:
"""Return the HTTP_REMOTE_ROLE header value, if any."""
environ = getattr(ws, "environ", None)
if not environ:
return None
value = environ.get("HTTP_REMOTE_ROLE")
return value if isinstance(value, str) else None
def _ws_valid_roles(ws: Any, config: FrigateConfig) -> list[str]:
"""Return the list of recognized roles for this connection."""
header = _ws_role_header(ws)
if not header:
return []
roles = [r.strip() for r in header.split(config.proxy.separator) if r.strip()]
return [r for r in roles if r in config.auth.roles]
def _ws_is_unrestricted(ws: Any, config: FrigateConfig) -> bool:
"""True when the connection has unrestricted camera access.
Mirrors the policy in ``frigate.output.ws_auth``: admin or any role with
an empty allow-list grants full access.
"""
roles = _ws_valid_roles(ws, config)
if not roles:
return False
roles_dict = config.auth.roles
return any(r == "admin" or not roles_dict.get(r) for r in roles)
def _ws_allowed_cameras(ws: Any, config: FrigateConfig) -> set[str]:
"""Return the union of cameras this connection may access across its roles."""
roles = _ws_valid_roles(ws, config)
if not roles:
return set()
all_cameras = set(config.cameras.keys())
allowed: set[str] = set()
for role in roles:
if role == "admin" or not config.auth.roles.get(role):
return all_cameras
allowed.update(User.get_allowed_cameras(role, config.auth.roles, all_cameras))
return allowed
def _wrap_envelope(topic: str, inner_payload: Any) -> str:
"""Re-serialize a (topic, payload) message after payload reshaping.
Frigate's wire format keeps payloads as JSON-encoded strings inside the
outer envelope, mirroring what producers send today.
"""
return json.dumps({"topic": topic, "payload": json.dumps(inner_payload)})
def _materialize_for_ws(
ws: Any,
topic: str,
full_message: str,
scope: tuple[str, Any],
parsed_payload: Any,
config: FrigateConfig,
) -> str | None:
"""Return the JSON string to deliver to ``ws``, or None to skip it."""
kind, extra = scope
has_role = _ws_role_header(ws) is not None
if kind == "drop":
return None
if kind == "global":
# Globals still require an authenticated connection. Missing role
# falls back to viewer semantics (matching the inbound rule).
return full_message
# Beyond globals, an authenticated role header is required (fail-closed).
if not has_role:
return None
if kind == "unrestricted_only":
return full_message if _ws_is_unrestricted(ws, config) else None
if kind == "camera":
return full_message if ws_has_camera_access(ws, extra, config) else None
if kind == "payload_camera":
camera = _extract_payload_camera(parsed_payload, extra)
if camera is None:
return None
return full_message if ws_has_camera_access(ws, camera, config) else None
if kind == "reshape_by_camera_key":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
filtered = {cam: data for cam, data in parsed_payload.items() if cam in allowed}
if not filtered:
return None
return _wrap_envelope(topic, filtered)
if kind == "reshape_job_state":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
filtered_jobs: dict[str, Any] = {}
for job_type, job_payload in parsed_payload.items():
scoped = _scope_job_entry_to_allowed(job_payload, allowed)
if scoped is not None:
filtered_jobs[job_type] = scoped
if not filtered_jobs:
return None
return _wrap_envelope(topic, filtered_jobs)
if kind == "reshape_stats":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
cameras_block = parsed_payload.get("cameras")
if isinstance(cameras_block, dict):
filtered_cameras = {
name: data for name, data in cameras_block.items() if name in allowed
}
reshaped = dict(parsed_payload)
reshaped["cameras"] = filtered_cameras
return _wrap_envelope(topic, reshaped)
return full_message
return None
class WebSocket(WebSocket_): # type: ignore[misc]
def unhandled_error(self, error: Any) -> None:
"""
@ -183,6 +501,10 @@ class WebSocketClient(Communicator):
self.websocket_thread.start()
def publish(self, topic: str, payload: Any, _: bool = False) -> None:
if self.websocket_server is None:
logger.debug("Skipping message, websocket not connected yet")
return
try:
ws_message = json.dumps(
{
@ -195,13 +517,41 @@ class WebSocketClient(Communicator):
logger.debug(f"payload for {topic} wasn't text. Skipping...")
return
if self.websocket_server is None:
logger.debug("Skipping message, websocket not connected yet")
all_cameras = set(self.config.cameras.keys())
all_zones = _collect_zone_names(self.config)
scope = _classify_outbound(topic, all_cameras, all_zones)
if scope[0] == "drop":
return
# Pre-parse payload once for topics that need to read its contents.
parsed_payload: Any = None
if scope[0] in (
"payload_camera",
"reshape_by_camera_key",
"reshape_job_state",
"reshape_stats",
):
parsed_payload = _parse_json_payload(payload)
if parsed_payload is None:
# malformed payload — fail closed
return
manager = self.websocket_server.manager
with manager.lock:
websockets = list(manager.websockets.values())
for ws in websockets:
if getattr(ws, "terminated", False):
continue
message = _materialize_for_ws(
ws, topic, ws_message, scope, parsed_payload, self.config
)
if message is None:
continue
try:
self.websocket_server.manager.broadcast(ws_message)
except ConnectionResetError:
ws.send(message)
except (ConnectionResetError, BrokenPipeError, ValueError):
pass
def stop(self) -> None:

View File

@ -0,0 +1,806 @@
"""Tests for outbound WebSocket broadcast filtering."""
import json
import threading
import unittest
from types import SimpleNamespace
from typing import Any
from frigate.comms.ws import (
WebSocketClient,
_classify_outbound,
_collect_zone_names,
_extract_payload_camera,
_materialize_for_ws,
_ws_allowed_cameras,
_ws_is_unrestricted,
)
from frigate.config import FrigateConfig
def _build_config(
*,
extra_roles: dict[str, list[str]] | None = None,
extra_cameras: dict[str, dict[str, Any]] | None = None,
extra_zones: dict[str, dict[str, dict[str, Any]]] | None = None,
) -> FrigateConfig:
"""Construct a FrigateConfig used by the outbound filter tests.
The default fixture has three cameras: front_door, back_door, garage.
Restricted role "house_only" sees front_door + back_door but not garage.
"""
cameras: dict[str, dict[str, Any]] = {
"front_door": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.1:554/v", "roles": ["detect"]}],
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
"back_door": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.2:554/v", "roles": ["detect"]}],
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
"garage": {
"ffmpeg": {
"inputs": [{"path": "rtsp://10.0.0.3:554/v", "roles": ["detect"]}],
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
}
if extra_cameras:
cameras.update(extra_cameras)
if extra_zones:
for cam_name, zones in extra_zones.items():
cameras[cam_name]["zones"] = zones
roles = {"house_only": ["front_door", "back_door"]}
if extra_roles:
roles.update(extra_roles)
return FrigateConfig(
mqtt={"host": "mqtt"},
auth={"roles": roles},
cameras=cameras,
)
def _ws(role: str | None) -> Any:
"""Build a fake ws4py-style websocket exposing ``environ``."""
environ = {} if role is None else {"HTTP_REMOTE_ROLE": role}
return SimpleNamespace(environ=environ, terminated=False, sent=[])
class TestClassifyOutbound(unittest.TestCase):
"""The pure classifier — bucket every topic into a scope."""
def setUp(self):
self.config = _build_config(
extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}}
)
self.all_cameras = set(self.config.cameras.keys())
self.all_zones = _collect_zone_names(self.config)
def _classify(self, topic: str) -> tuple[str, Any]:
return _classify_outbound(topic, self.all_cameras, self.all_zones)
# --- Global allowlist ---
def test_model_state_is_global(self):
self.assertEqual(self._classify("model_state"), ("global", None))
def test_profile_state_is_global(self):
self.assertEqual(self._classify("profile/state"), ("global", None))
def test_bare_notifications_state_is_global(self):
"""The 2-segment ``notifications/state`` is global; the 3-segment
``<camera>/notifications/state`` is camera-scoped (see below)."""
self.assertEqual(self._classify("notifications/state"), ("global", None))
def test_notification_test_is_global(self):
self.assertEqual(self._classify("notification_test"), ("global", None))
# --- Unrestricted-only ---
def test_birdseye_layout_is_unrestricted_only(self):
self.assertEqual(self._classify("birdseye_layout"), ("unrestricted_only", None))
# --- Camera-prefixed ---
def test_camera_state_topic_resolves_to_camera(self):
self.assertEqual(
self._classify("front_door/detect/state"), ("camera", "front_door")
)
def test_camera_motion_topic_resolves_to_camera(self):
self.assertEqual(self._classify("back_door/motion"), ("camera", "back_door"))
def test_camera_per_notification_topic_resolves_to_camera(self):
self.assertEqual(
self._classify("front_door/notifications/state"),
("camera", "front_door"),
)
def test_camera_label_counter_resolves_to_camera(self):
self.assertEqual(self._classify("front_door/person"), ("camera", "front_door"))
def test_camera_object_mask_state_resolves_to_camera(self):
self.assertEqual(
self._classify("front_door/object_mask/zone_1/state"),
("camera", "front_door"),
)
# --- Zone-prefixed ---
def test_zone_aggregate_topic_is_unrestricted_only(self):
self.assertEqual(self._classify("driveway/person"), ("unrestricted_only", None))
def test_zone_all_topic_is_unrestricted_only(self):
self.assertEqual(self._classify("driveway/all"), ("unrestricted_only", None))
# --- Payload-camera ---
def test_events_topic_marks_payload_camera_path(self):
self.assertEqual(
self._classify("events"), ("payload_camera", ("after", "camera"))
)
def test_reviews_topic_marks_payload_camera_path(self):
self.assertEqual(
self._classify("reviews"), ("payload_camera", ("after", "camera"))
)
def test_triggers_topic_marks_payload_camera_path(self):
self.assertEqual(self._classify("triggers"), ("payload_camera", ("camera",)))
def test_tracked_object_update_marks_payload_camera_path(self):
self.assertEqual(
self._classify("tracked_object_update"), ("payload_camera", ("camera",))
)
# --- Reshape ---
def test_camera_activity_is_reshape_by_camera_key(self):
self.assertEqual(
self._classify("camera_activity"), ("reshape_by_camera_key", None)
)
def test_audio_detections_is_reshape_by_camera_key(self):
self.assertEqual(
self._classify("audio_detections"), ("reshape_by_camera_key", None)
)
def test_job_state_is_reshape_job_state(self):
self.assertEqual(self._classify("job_state"), ("reshape_job_state", None))
def test_stats_is_reshape_stats(self):
self.assertEqual(self._classify("stats"), ("reshape_stats", None))
# --- Fail-closed ---
def test_unknown_topic_is_dropped(self):
self.assertEqual(self._classify("some_random_topic"), ("drop", None))
def test_unknown_camera_prefix_is_dropped(self):
self.assertEqual(self._classify("ghost_camera/detect/state"), ("drop", None))
class TestCollectZoneNames(unittest.TestCase):
def test_zones_from_all_cameras(self):
config = _build_config(
extra_zones={
"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}},
"back_door": {"yard": {"coordinates": "0,0,1,0,1,1,0,1"}},
}
)
self.assertEqual(_collect_zone_names(config), {"driveway", "yard"})
def test_no_zones_returns_empty(self):
self.assertEqual(_collect_zone_names(_build_config()), set())
class TestExtractPayloadCamera(unittest.TestCase):
def test_extract_from_dict_path(self):
payload = {"after": {"camera": "front_door"}}
self.assertEqual(
_extract_payload_camera(payload, ("after", "camera")), "front_door"
)
def test_extract_from_json_string(self):
payload = json.dumps({"after": {"camera": "front_door"}})
self.assertEqual(
_extract_payload_camera(payload, ("after", "camera")), "front_door"
)
def test_extract_single_segment_path(self):
self.assertEqual(
_extract_payload_camera({"camera": "garage"}, ("camera",)), "garage"
)
def test_missing_key_returns_none(self):
self.assertIsNone(_extract_payload_camera({}, ("after", "camera")))
def test_malformed_json_returns_none(self):
self.assertIsNone(_extract_payload_camera("not-json", ("camera",)))
def test_non_string_camera_returns_none(self):
self.assertIsNone(_extract_payload_camera({"camera": 42}, ("camera",)))
class TestWsRoleHelpers(unittest.TestCase):
def setUp(self):
self.config = _build_config()
def test_admin_is_unrestricted(self):
self.assertTrue(_ws_is_unrestricted(_ws("admin"), self.config))
def test_viewer_is_unrestricted(self):
self.assertTrue(_ws_is_unrestricted(_ws("viewer"), self.config))
def test_restricted_role_is_not_unrestricted(self):
self.assertFalse(_ws_is_unrestricted(_ws("house_only"), self.config))
def test_missing_role_is_not_unrestricted(self):
self.assertFalse(_ws_is_unrestricted(_ws(None), self.config))
def test_unknown_role_is_not_unrestricted(self):
self.assertFalse(_ws_is_unrestricted(_ws("ghost"), self.config))
def test_admin_allowed_cameras_is_all(self):
self.assertEqual(
_ws_allowed_cameras(_ws("admin"), self.config),
{"front_door", "back_door", "garage"},
)
def test_restricted_role_allowed_cameras_is_subset(self):
self.assertEqual(
_ws_allowed_cameras(_ws("house_only"), self.config),
{"front_door", "back_door"},
)
def test_missing_role_allowed_cameras_is_empty(self):
self.assertEqual(_ws_allowed_cameras(_ws(None), self.config), set())
def test_multi_role_union_grants_widest(self):
self.assertEqual(
_ws_allowed_cameras(_ws("house_only,admin"), self.config),
{"front_door", "back_door", "garage"},
)
class TestMaterializeForWs(unittest.TestCase):
def setUp(self):
self.config = _build_config(
extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}}
)
self.all_cameras = set(self.config.cameras.keys())
self.all_zones = _collect_zone_names(self.config)
def _materialize(self, ws: Any, topic: str, payload: Any) -> str | None:
scope = _classify_outbound(topic, self.all_cameras, self.all_zones)
from frigate.comms.ws import _parse_json_payload
parsed = (
_parse_json_payload(payload)
if scope[0]
in (
"payload_camera",
"reshape_by_camera_key",
"reshape_job_state",
"reshape_stats",
)
else None
)
full = json.dumps({"topic": topic, "payload": payload})
return _materialize_for_ws(ws, topic, full, scope, parsed, self.config)
# --- Globals: every authenticated client sees them ---
def test_globals_reach_admin(self):
self.assertIsNotNone(self._materialize(_ws("admin"), "model_state", "{}"))
def test_globals_reach_restricted(self):
self.assertIsNotNone(self._materialize(_ws("house_only"), "model_state", "{}"))
def test_globals_reach_no_role(self):
"""A missing role header still gets globals (matches viewer-default
for inbound)."""
self.assertIsNotNone(self._materialize(_ws(None), "model_state", "{}"))
# --- Unknown topic dropped for everyone ---
def test_unknown_topic_dropped_for_admin(self):
self.assertIsNone(self._materialize(_ws("admin"), "rogue_topic", "{}"))
# --- Non-global topics require a role (fail-closed) ---
def test_no_role_blocked_from_camera_topic(self):
self.assertIsNone(self._materialize(_ws(None), "front_door/detect/state", "ON"))
def test_no_role_blocked_from_events(self):
payload = json.dumps({"after": {"camera": "front_door"}})
self.assertIsNone(self._materialize(_ws(None), "events", payload))
# --- Camera-prefixed ---
def test_restricted_role_sees_allowed_camera(self):
self.assertIsNotNone(
self._materialize(_ws("house_only"), "front_door/detect/state", "ON")
)
def test_restricted_role_blocked_from_unallowed_camera(self):
self.assertIsNone(
self._materialize(_ws("house_only"), "garage/detect/state", "ON")
)
def test_admin_sees_all_camera_topics(self):
self.assertIsNotNone(
self._materialize(_ws("admin"), "garage/detect/state", "ON")
)
# --- Unrestricted-only (zones, birdseye_layout) ---
def test_zone_aggregate_blocked_for_restricted(self):
self.assertIsNone(self._materialize(_ws("house_only"), "driveway/person", 3))
def test_zone_aggregate_visible_to_admin(self):
self.assertIsNotNone(self._materialize(_ws("admin"), "driveway/person", 3))
def test_birdseye_layout_blocked_for_restricted(self):
payload = json.dumps(
{"front_door": {"x": 0, "y": 0, "width": 100, "height": 100}}
)
self.assertIsNone(
self._materialize(_ws("house_only"), "birdseye_layout", payload)
)
def test_birdseye_layout_visible_to_admin(self):
payload = json.dumps(
{"front_door": {"x": 0, "y": 0, "width": 100, "height": 100}}
)
self.assertIsNotNone(
self._materialize(_ws("admin"), "birdseye_layout", payload)
)
# --- Payload-camera ---
def test_events_filtered_by_payload_camera(self):
payload = json.dumps({"after": {"camera": "garage"}})
self.assertIsNone(self._materialize(_ws("house_only"), "events", payload))
payload = json.dumps({"after": {"camera": "front_door"}})
self.assertIsNotNone(self._materialize(_ws("house_only"), "events", payload))
def test_events_with_missing_camera_dropped(self):
payload = json.dumps({"after": {}})
self.assertIsNone(self._materialize(_ws("house_only"), "events", payload))
def test_triggers_filtered_by_payload_camera(self):
payload = json.dumps({"name": "t1", "camera": "garage"})
self.assertIsNone(self._materialize(_ws("house_only"), "triggers", payload))
# --- Reshape: dict keyed by camera ---
def test_camera_activity_filtered_to_allowed_keys(self):
payload = json.dumps(
{
"front_door": {"objects": 1},
"back_door": {"objects": 0},
"garage": {"objects": 2},
}
)
message = self._materialize(_ws("house_only"), "camera_activity", payload)
self.assertIsNotNone(message)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertEqual(set(inner.keys()), {"front_door", "back_door"})
self.assertNotIn("garage", inner)
def test_camera_activity_unchanged_for_admin(self):
payload = json.dumps({"front_door": {}, "back_door": {}, "garage": {}})
message = self._materialize(_ws("admin"), "camera_activity", payload)
envelope = json.loads(message) # type: ignore[arg-type]
self.assertEqual(envelope["payload"], payload)
def test_camera_activity_with_no_allowed_returns_none(self):
payload = json.dumps({"garage": {"objects": 2}})
self.assertIsNone(
self._materialize(_ws("house_only"), "camera_activity", payload)
)
def test_audio_detections_filtered_to_allowed_keys(self):
payload = json.dumps({"front_door": {"bark": {}}, "garage": {"speech": {}}})
message = self._materialize(_ws("house_only"), "audio_detections", payload)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertEqual(set(inner.keys()), {"front_door"})
# --- Reshape: job_state ---
def test_job_state_admin_sees_full_payload(self):
payload = json.dumps(
{
"motion_search": {"job_type": "motion_search", "camera": "garage"},
"media_sync": {"job_type": "media_sync"},
}
)
message = self._materialize(_ws("admin"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
self.assertEqual(envelope["payload"], payload)
def test_job_state_restricted_keeps_allowed_camera_jobs(self):
"""Top-level camera field on a job entry: drop if not allowed."""
payload = json.dumps(
{
"motion_search": {"job_type": "motion_search", "camera": "front_door"},
"vlm_watch": {"job_type": "vlm_watch", "camera": "garage"},
}
)
message = self._materialize(_ws("house_only"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertIn("motion_search", inner)
self.assertNotIn("vlm_watch", inner)
def test_job_state_export_results_jobs_filtered_per_recipient(self):
"""The aggregated export broadcast nests per-camera sub-jobs under
``results.jobs``. Restricted users must only see allowed entries."""
payload = json.dumps(
{
"export": {
"job_type": "export",
"status": "running",
"results": {
"jobs": [
{"job_type": "export", "camera": "front_door", "id": "a"},
{"job_type": "export", "camera": "garage", "id": "b"},
{"job_type": "export", "camera": "back_door", "id": "c"},
]
},
}
}
)
message = self._materialize(_ws("house_only"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertIn("export", inner)
kept_cameras = [j["camera"] for j in inner["export"]["results"]["jobs"]]
self.assertEqual(kept_cameras, ["front_door", "back_door"])
# Sibling fields like ``status`` must survive reshaping.
self.assertEqual(inner["export"]["status"], "running")
def test_job_state_export_entry_dropped_when_no_jobs_allowed(self):
payload = json.dumps(
{
"export": {
"job_type": "export",
"status": "running",
"results": {
"jobs": [
{"job_type": "export", "camera": "garage", "id": "b"},
]
},
}
}
)
self.assertIsNone(self._materialize(_ws("house_only"), "job_state", payload))
# --- Reshape: stats ---
def _stats_payload(self) -> str:
return json.dumps(
{
"cameras": {
"front_door": {"camera_fps": 5.0, "pid": 1234},
"back_door": {"camera_fps": 5.0, "pid": 1235},
"garage": {"camera_fps": 5.0, "pid": 1236},
},
"detectors": {"cpu": {"detection_start": 0.0, "inference_speed": 10}},
"service": {"uptime": 12345, "version": "0.16.0"},
"camera_fps": 15.0,
"detection_fps": 6.0,
}
)
def test_stats_admin_sees_full_payload(self):
message = self._materialize(_ws("admin"), "stats", self._stats_payload())
envelope = json.loads(message) # type: ignore[arg-type]
self.assertEqual(envelope["payload"], self._stats_payload())
def test_stats_restricted_filters_camera_keys_but_keeps_aggregates(self):
message = self._materialize(_ws("house_only"), "stats", self._stats_payload())
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertEqual(set(inner["cameras"].keys()), {"front_door", "back_door"})
self.assertNotIn("garage", inner["cameras"])
# Aggregates, detectors, and service block must survive.
self.assertEqual(inner["camera_fps"], 15.0)
self.assertEqual(inner["detection_fps"], 6.0)
self.assertIn("detectors", inner)
self.assertIn("service", inner)
def test_stats_restricted_with_no_allowed_cameras_still_sends_aggregates(self):
"""A restricted role whose allow-list contains only nonexistent cameras
still gets the global aggregates and service block."""
config = _build_config(extra_roles={"empty_role": ["nonexistent"]})
from frigate.comms.ws import _parse_json_payload
payload = self._stats_payload()
all_cameras = set(config.cameras.keys())
scope = _classify_outbound("stats", all_cameras, _collect_zone_names(config))
full = json.dumps({"topic": "stats", "payload": payload})
message = _materialize_for_ws(
_ws("empty_role"),
"stats",
full,
scope,
_parse_json_payload(payload),
config,
)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertEqual(inner["cameras"], {})
self.assertEqual(inner["camera_fps"], 15.0)
self.assertIn("service", inner)
def test_stats_without_cameras_key_passes_through(self):
"""A malformed stats payload missing the cameras sub-dict shouldn't
break delivery for restricted users fall back to the full message."""
payload = json.dumps({"detectors": {}, "service": {}, "detection_fps": 0.0})
message = self._materialize(_ws("house_only"), "stats", payload)
envelope = json.loads(message) # type: ignore[arg-type]
self.assertEqual(envelope["payload"], payload)
def test_job_state_export_entry_unchanged_for_admin(self):
payload = json.dumps(
{
"export": {
"job_type": "export",
"status": "running",
"results": {
"jobs": [
{"job_type": "export", "camera": "garage", "id": "b"},
]
},
}
}
)
message = self._materialize(_ws("admin"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
self.assertEqual(envelope["payload"], payload)
def test_job_state_restricted_keeps_global_jobs(self):
"""media_sync has no camera field; restricted users still see it."""
payload = json.dumps(
{"media_sync": {"job_type": "media_sync", "status": "running"}}
)
message = self._materialize(_ws("house_only"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertIn("media_sync", inner)
def test_job_state_debug_replay_nested_source_camera_filtered(self):
"""debug_replay puts ``source_camera`` inside ``results`` (see
jobs/debug_replay.py:to_dict). Restricted users must not receive
entries whose nested source camera is unauthorized."""
payload = json.dumps(
{
"debug_replay": {
"id": "bd6dc99d-a7d",
"job_type": "debug_replay",
"status": "running",
"start_time": 1.0,
"end_time": None,
"error_message": None,
"results": {
"current_step": "preparing_clip",
"progress_percent": 0.0,
"source_camera": "garage",
"replay_camera_name": "_replay_garage",
"start_ts": 0.0,
"end_ts": 1.0,
},
}
}
)
self.assertIsNone(self._materialize(_ws("house_only"), "job_state", payload))
def test_job_state_debug_replay_nested_source_camera_allowed(self):
payload = json.dumps(
{
"debug_replay": {
"id": "bd6dc99d-a7d",
"job_type": "debug_replay",
"status": "running",
"results": {
"source_camera": "front_door",
"replay_camera_name": "_replay_front_door",
},
}
}
)
message = self._materialize(_ws("house_only"), "job_state", payload)
envelope = json.loads(message) # type: ignore[arg-type]
inner = json.loads(envelope["payload"])
self.assertIn("debug_replay", inner)
self.assertEqual(
inner["debug_replay"]["results"]["source_camera"], "front_door"
)
class _FakeManager:
"""Minimal ws4py manager: holds clients and exposes a lock."""
def __init__(self, clients: list[Any]) -> None:
self.lock = threading.Lock()
self.websockets = {id(c): c for c in clients}
class _FakeServer:
def __init__(self, manager: _FakeManager) -> None:
self.manager = manager
class _CapturingWs(SimpleNamespace):
"""Fake ws4py client that records what was sent."""
def __init__(self, role: str | None) -> None:
environ = {} if role is None else {"HTTP_REMOTE_ROLE": role}
super().__init__(environ=environ, terminated=False)
self.sent: list[str] = []
def send(self, message: str) -> None: # noqa: D401 - matches ws4py API
self.sent.append(message)
class TestPublishEndToEnd(unittest.TestCase):
"""Drive WebSocketClient.publish() against fake clients with different roles."""
def setUp(self):
self.config = _build_config(
extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}}
)
self.admin = _CapturingWs("admin")
self.restricted = _CapturingWs("house_only")
self.anon = _CapturingWs(None)
self.client = WebSocketClient(self.config)
self.client.websocket_server = _FakeServer(
_FakeManager([self.admin, self.restricted, self.anon])
)
def _payloads(self, ws: _CapturingWs) -> list[Any]:
return [json.loads(m)["payload"] for m in ws.sent]
def test_global_topic_reaches_everyone(self):
self.client.publish("model_state", "{}")
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 1)
self.assertEqual(len(self.anon.sent), 1)
def test_camera_topic_filters_restricted_recipient(self):
self.client.publish("garage/detect/state", "ON")
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 0)
self.assertEqual(len(self.anon.sent), 0)
def test_camera_topic_allows_restricted_recipient_for_allowed_camera(self):
self.client.publish("front_door/detect/state", "ON")
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 1)
self.assertEqual(len(self.anon.sent), 0)
def test_events_payload_filtered(self):
self.client.publish("events", json.dumps({"after": {"camera": "garage"}}))
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 0)
def test_camera_activity_reshaped_per_recipient(self):
self.client.publish(
"camera_activity",
json.dumps(
{
"front_door": {"objects": 1},
"back_door": {"objects": 0},
"garage": {"objects": 2},
}
),
)
self.assertEqual(len(self.admin.sent), 1)
admin_inner = json.loads(self._payloads(self.admin)[0])
self.assertEqual(set(admin_inner.keys()), {"front_door", "back_door", "garage"})
self.assertEqual(len(self.restricted.sent), 1)
restricted_inner = json.loads(self._payloads(self.restricted)[0])
self.assertEqual(set(restricted_inner.keys()), {"front_door", "back_door"})
self.assertEqual(len(self.anon.sent), 0)
def test_birdseye_layout_blocked_for_restricted_and_anon(self):
self.client.publish(
"birdseye_layout",
json.dumps({"front_door": {"x": 0, "y": 0, "width": 1, "height": 1}}),
)
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 0)
self.assertEqual(len(self.anon.sent), 0)
def test_zone_aggregate_blocked_for_restricted(self):
self.client.publish("driveway/person", 2)
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 0)
def test_stats_reshaped_per_recipient(self):
self.client.publish(
"stats",
json.dumps(
{
"cameras": {
"front_door": {"camera_fps": 5.0},
"garage": {"camera_fps": 5.0},
},
"service": {"uptime": 1},
"camera_fps": 10.0,
}
),
)
self.assertEqual(len(self.admin.sent), 1)
admin_inner = json.loads(self._payloads(self.admin)[0])
self.assertEqual(set(admin_inner["cameras"].keys()), {"front_door", "garage"})
self.assertEqual(len(self.restricted.sent), 1)
restricted_inner = json.loads(self._payloads(self.restricted)[0])
self.assertEqual(set(restricted_inner["cameras"].keys()), {"front_door"})
self.assertEqual(restricted_inner["camera_fps"], 10.0)
self.assertIn("service", restricted_inner)
# Stats requires a role; anonymous gets nothing.
self.assertEqual(len(self.anon.sent), 0)
def test_export_job_state_filters_results_jobs_per_recipient(self):
self.client.publish(
"job_state",
json.dumps(
{
"export": {
"job_type": "export",
"status": "running",
"results": {
"jobs": [
{"camera": "front_door", "id": "a"},
{"camera": "garage", "id": "b"},
]
},
}
}
),
)
self.assertEqual(len(self.admin.sent), 1)
admin_inner = json.loads(self._payloads(self.admin)[0])
self.assertEqual(
[j["camera"] for j in admin_inner["export"]["results"]["jobs"]],
["front_door", "garage"],
)
self.assertEqual(len(self.restricted.sent), 1)
restricted_inner = json.loads(self._payloads(self.restricted)[0])
self.assertEqual(
[j["camera"] for j in restricted_inner["export"]["results"]["jobs"]],
["front_door"],
)
def test_unknown_topic_dropped_for_everyone(self):
self.client.publish("some_rogue_topic", "data")
self.assertEqual(self.admin.sent, [])
self.assertEqual(self.restricted.sent, [])
self.assertEqual(self.anon.sent, [])
def test_terminated_client_is_skipped(self):
self.restricted.terminated = True
self.client.publish("front_door/detect/state", "ON")
self.assertEqual(len(self.admin.sent), 1)
self.assertEqual(len(self.restricted.sent), 0)
if __name__ == "__main__":
unittest.main()