diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 0af9c249f1..d87dbb239e 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -162,7 +162,6 @@ When reviewing code, do NOT comment on: - **Linting**: ESLint (see `web/.eslintrc.cjs`) - **Formatting**: Prettier with Tailwind CSS plugin - **Type Safety**: TypeScript strict mode enabled -- **Testing**: Vitest for unit tests ### Component Patterns @@ -233,6 +232,9 @@ ruff format frigate/ # Run linter ruff check frigate/ + +# Type check +python3 -u -m mypy --config-file frigate/mypy.ini frigate ``` ### Frontend (from web/ directory) @@ -252,6 +254,38 @@ npm run lint:fix # Format code npm run prettier:write + +# E2E: first-time setup +npm install +npx playwright install chromium + +# E2E: build the app and run all tests +npm run e2e:build && npm run e2e + +# E2E: interactive UI for debugging +npm run e2e:ui + +# E2E: run a specific spec +npx playwright test --config e2e/playwright.config.ts e2e/specs/live.spec.ts + +# E2E: filter by name, or run only desktop/mobile +npx playwright test --config e2e/playwright.config.ts --grep="severity tab" +npx playwright test --config e2e/playwright.config.ts --project=desktop + +# E2E: regenerate mock data after backend model changes (from repo root) +PYTHONPATH=. python3 web/e2e/fixtures/mock-data/generate-mock-data.py + +# Regenerate config translations from Pydantic models — outputs to +# web/public/locales/en/config/{global,cameras}.json. NEVER edit those +# JSON files by hand; change the Pydantic field title/description and +# re-run this script. (from repo root) +python3 generate_config_translations.py + +# Extract i18n keys from source into the locale files after adding +# new t() calls. Use the :ci variant to verify the locale files are +# in sync with source (fails if extraction would change anything). +npm run i18n:extract +npm run i18n:extract:ci ``` ### Docker Development @@ -371,6 +405,10 @@ except ValueError: ) ``` +## WebSocket Broadcasts + +Outbound WebSocket broadcasts go through a per-recipient classifier in `frigate/comms/ws.py` that enforces camera-level access. **The classifier is fail-closed: any topic it doesn't recognize is dropped for every client.** New outbound topics must be classified there or they'll silently disappear. + ## Project-Specific Conventions ### Configuration Files diff --git a/frigate/api/app.py b/frigate/api/app.py index 9ff24ed7e8..4fac58a715 100644 --- a/frigate/api/app.py +++ b/frigate/api/app.py @@ -774,6 +774,8 @@ def config_set(request: Request, body: AppConfigSetBody): if request.app.dispatcher is not None: request.app.dispatcher.config = config + for comm in request.app.dispatcher.comms: + comm.config = config if body.update_topic: if body.update_topic.startswith("config/cameras/"): diff --git a/frigate/comms/ws.py b/frigate/comms/ws.py index 2f16ab7141..5b555999e3 100644 --- a/frigate/comms/ws.py +++ b/frigate/comms/ws.py @@ -34,6 +34,8 @@ from frigate.const import ( UPDATE_REVIEW_DESCRIPTION, UPSERT_REVIEW_SEGMENT, ) +from frigate.models import User +from frigate.output.ws_auth import ws_has_camera_access logger = logging.getLogger(__name__) @@ -66,6 +68,7 @@ _WS_VIEWER_TOPICS = frozenset( "audioTranscriptionState", "birdseyeLayout", "embeddingsReindexProgress", + "jobState", } ) @@ -102,6 +105,321 @@ def _check_ws_authorization( return topic in _WS_VIEWER_TOPICS +# ---- Outbound filtering --------------------------------------------------- +# +# Every WebSocket broadcast is classified into one of a small set of scopes, +# then materialized per recipient. Connections with restricted roles only see +# data for cameras they are authorized to access; admin and full-access roles +# behave as today. + +# Topics that are safe to broadcast to every authenticated client. +_WS_GLOBAL_OUTBOUND_TOPICS = frozenset( + { + "model_state", + "embeddings_reindex_progress", + "audio_transcription_state", + "profile/state", + "notifications/state", + "notification_test", + } +) + +# Topics that restricted roles must never receive. Birdseye composites span +# all cameras, so the existing JSMPEG policy already restricts birdseye access +# to unrestricted roles; the layout broadcast follows the same rule. +_WS_UNRESTRICTED_ONLY_TOPICS = frozenset( + { + "birdseye_layout", + } +) + +# Topics whose payload (parsed as JSON) names a single owning camera at the +# given key path. Used to scope events, reviews, triggers, etc. +_WS_PAYLOAD_CAMERA_TOPICS: dict[str, tuple[str, ...]] = { + "events": ("after", "camera"), + "reviews": ("after", "camera"), + "tracked_object_update": ("camera",), + "triggers": ("camera",), + "camera_monitoring": ("camera",), +} + +# Topics whose payload is a dict keyed by camera name; filter keys per +# recipient. +_WS_RESHAPE_BY_CAMERA_KEY_TOPICS = frozenset( + { + "camera_activity", + "audio_detections", + } +) + +# Topics whose payload is a dict keyed by job_type, where each entry may +# contain a "camera" or "source_camera" field, or a nested ``results.jobs`` +# list of per-camera sub-jobs (export broadcasts). +_WS_RESHAPE_JOB_STATE_TOPICS = frozenset( + { + "job_state", + } +) + +# Topics whose payload mixes global aggregates with a ``cameras`` sub-dict +# keyed by camera name. Aggregates and detector data stay; per-camera entries +# are filtered. +_WS_RESHAPE_STATS_TOPICS = frozenset( + { + "stats", + } +) + + +def _collect_zone_names(config: FrigateConfig) -> set[str]: + """Return the set of all zone names defined across cameras.""" + names: set[str] = set() + for camera in config.cameras.values(): + zones = getattr(camera, "zones", None) or {} + names.update(zones.keys()) + return names + + +def _parse_json_payload(payload: Any) -> Any: + """Return payload parsed as JSON if it is a string, else as-is.""" + if isinstance(payload, str): + try: + return json.loads(payload) + except (ValueError, TypeError): + return None + return payload + + +def _scope_job_entry_to_allowed(entry: Any, allowed: set[str]) -> dict[str, Any] | None: + """Filter a single job_state entry to the recipient's allowed cameras. + + Returns the (possibly reshaped) entry, or None to drop it. Four shapes + are handled: + + * Top-level ``camera`` or ``source_camera`` (motion_search, vlm_watch, + export sub-job dicts): drop the entry if not allowed. + * Nested ``results.jobs`` list of per-camera sub-jobs (the aggregated + export broadcast): filter the list; drop the entry if nothing remains. + * Nested ``results.camera`` or ``results.source_camera`` (debug_replay, + which puts replay-specific fields inside ``results``): drop the entry + if not allowed. + * No camera anywhere (e.g. ``media_sync``): treat as global and keep. + """ + if not isinstance(entry, dict): + return None + + cam = entry.get("camera") or entry.get("source_camera") + + if cam is None: + results = entry.get("results") + if isinstance(results, dict): + sub_jobs = results.get("jobs") + if isinstance(sub_jobs, list): + filtered_jobs = [ + j + for j in sub_jobs + if isinstance(j, dict) + and (j.get("camera") or j.get("source_camera")) in allowed + ] + if not filtered_jobs: + return None + reshaped = dict(entry) + reshaped["results"] = dict(results) + reshaped["results"]["jobs"] = filtered_jobs + return reshaped + + cam = results.get("camera") or results.get("source_camera") + + if cam is not None: + return entry if cam in allowed else None + + return entry + + +def _extract_payload_camera(payload: Any, path: tuple[str, ...]) -> str | None: + """Walk the dotted path through a (possibly JSON-encoded) payload.""" + cur = _parse_json_payload(payload) + for key in path: + if not isinstance(cur, dict): + return None + cur = cur.get(key) + return cur if isinstance(cur, str) else None + + +def _classify_outbound( + topic: str, all_cameras: set[str], all_zones: set[str] +) -> tuple[str, Any]: + """Classify an outbound topic into (kind, extra). + + kind values: + - "global" : send to every authenticated client + - "drop" : send to nobody (fail-closed for unknowns) + - "unrestricted_only" : send only to admin/full-access roles + - "camera" : extra is the owning camera name + - "payload_camera" : extra is the JSON key path to the camera name + - "reshape_by_camera_key" + - "reshape_job_state" + - "reshape_stats" + """ + if topic in _WS_GLOBAL_OUTBOUND_TOPICS: + return ("global", None) + if topic in _WS_UNRESTRICTED_ONLY_TOPICS: + return ("unrestricted_only", None) + if topic in _WS_RESHAPE_BY_CAMERA_KEY_TOPICS: + return ("reshape_by_camera_key", None) + if topic in _WS_RESHAPE_JOB_STATE_TOPICS: + return ("reshape_job_state", None) + if topic in _WS_RESHAPE_STATS_TOPICS: + return ("reshape_stats", None) + if topic in _WS_PAYLOAD_CAMERA_TOPICS: + return ("payload_camera", _WS_PAYLOAD_CAMERA_TOPICS[topic]) + + # Topic-prefix based: first segment names the owning camera or zone. + first = topic.split("/", 1)[0] + if first in all_cameras: + return ("camera", first) + if first in all_zones: + # Zone aggregates span cameras; restricted users see nothing here. + return ("unrestricted_only", None) + + return ("drop", None) + + +def _ws_role_header(ws: Any) -> str | None: + """Return the HTTP_REMOTE_ROLE header value, if any.""" + environ = getattr(ws, "environ", None) + if not environ: + return None + value = environ.get("HTTP_REMOTE_ROLE") + return value if isinstance(value, str) else None + + +def _ws_valid_roles(ws: Any, config: FrigateConfig) -> list[str]: + """Return the list of recognized roles for this connection.""" + header = _ws_role_header(ws) + if not header: + return [] + roles = [r.strip() for r in header.split(config.proxy.separator) if r.strip()] + return [r for r in roles if r in config.auth.roles] + + +def _ws_is_unrestricted(ws: Any, config: FrigateConfig) -> bool: + """True when the connection has unrestricted camera access. + + Mirrors the policy in ``frigate.output.ws_auth``: admin or any role with + an empty allow-list grants full access. + """ + roles = _ws_valid_roles(ws, config) + if not roles: + return False + roles_dict = config.auth.roles + return any(r == "admin" or not roles_dict.get(r) for r in roles) + + +def _ws_allowed_cameras(ws: Any, config: FrigateConfig) -> set[str]: + """Return the union of cameras this connection may access across its roles.""" + roles = _ws_valid_roles(ws, config) + if not roles: + return set() + all_cameras = set(config.cameras.keys()) + allowed: set[str] = set() + for role in roles: + if role == "admin" or not config.auth.roles.get(role): + return all_cameras + allowed.update(User.get_allowed_cameras(role, config.auth.roles, all_cameras)) + return allowed + + +def _wrap_envelope(topic: str, inner_payload: Any) -> str: + """Re-serialize a (topic, payload) message after payload reshaping. + + Frigate's wire format keeps payloads as JSON-encoded strings inside the + outer envelope, mirroring what producers send today. + """ + return json.dumps({"topic": topic, "payload": json.dumps(inner_payload)}) + + +def _materialize_for_ws( + ws: Any, + topic: str, + full_message: str, + scope: tuple[str, Any], + parsed_payload: Any, + config: FrigateConfig, +) -> str | None: + """Return the JSON string to deliver to ``ws``, or None to skip it.""" + kind, extra = scope + has_role = _ws_role_header(ws) is not None + + if kind == "drop": + return None + + if kind == "global": + # Globals still require an authenticated connection. Missing role + # falls back to viewer semantics (matching the inbound rule). + return full_message + + # Beyond globals, an authenticated role header is required (fail-closed). + if not has_role: + return None + + if kind == "unrestricted_only": + return full_message if _ws_is_unrestricted(ws, config) else None + + if kind == "camera": + return full_message if ws_has_camera_access(ws, extra, config) else None + + if kind == "payload_camera": + camera = _extract_payload_camera(parsed_payload, extra) + if camera is None: + return None + return full_message if ws_has_camera_access(ws, camera, config) else None + + if kind == "reshape_by_camera_key": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + filtered = {cam: data for cam, data in parsed_payload.items() if cam in allowed} + if not filtered: + return None + return _wrap_envelope(topic, filtered) + + if kind == "reshape_job_state": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + filtered_jobs: dict[str, Any] = {} + for job_type, job_payload in parsed_payload.items(): + scoped = _scope_job_entry_to_allowed(job_payload, allowed) + if scoped is not None: + filtered_jobs[job_type] = scoped + if not filtered_jobs: + return None + return _wrap_envelope(topic, filtered_jobs) + + if kind == "reshape_stats": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + cameras_block = parsed_payload.get("cameras") + if isinstance(cameras_block, dict): + filtered_cameras = { + name: data for name, data in cameras_block.items() if name in allowed + } + reshaped = dict(parsed_payload) + reshaped["cameras"] = filtered_cameras + return _wrap_envelope(topic, reshaped) + return full_message + + return None + + class WebSocket(WebSocket_): # type: ignore[misc] def unhandled_error(self, error: Any) -> None: """ @@ -183,6 +501,10 @@ class WebSocketClient(Communicator): self.websocket_thread.start() def publish(self, topic: str, payload: Any, _: bool = False) -> None: + if self.websocket_server is None: + logger.debug("Skipping message, websocket not connected yet") + return + try: ws_message = json.dumps( { @@ -195,14 +517,42 @@ class WebSocketClient(Communicator): logger.debug(f"payload for {topic} wasn't text. Skipping...") return - if self.websocket_server is None: - logger.debug("Skipping message, websocket not connected yet") + all_cameras = set(self.config.cameras.keys()) + all_zones = _collect_zone_names(self.config) + scope = _classify_outbound(topic, all_cameras, all_zones) + + if scope[0] == "drop": return - try: - self.websocket_server.manager.broadcast(ws_message) - except ConnectionResetError: - pass + # Pre-parse payload once for topics that need to read its contents. + parsed_payload: Any = None + if scope[0] in ( + "payload_camera", + "reshape_by_camera_key", + "reshape_job_state", + "reshape_stats", + ): + parsed_payload = _parse_json_payload(payload) + if parsed_payload is None: + # malformed payload — fail closed + return + + manager = self.websocket_server.manager + with manager.lock: + websockets = list(manager.websockets.values()) + + for ws in websockets: + if getattr(ws, "terminated", False): + continue + message = _materialize_for_ws( + ws, topic, ws_message, scope, parsed_payload, self.config + ) + if message is None: + continue + try: + ws.send(message) + except (ConnectionResetError, BrokenPipeError, ValueError): + pass def stop(self) -> None: if self.websocket_server is not None: diff --git a/frigate/test/test_ws_outbound_filter.py b/frigate/test/test_ws_outbound_filter.py new file mode 100644 index 0000000000..ab1489da54 --- /dev/null +++ b/frigate/test/test_ws_outbound_filter.py @@ -0,0 +1,806 @@ +"""Tests for outbound WebSocket broadcast filtering.""" + +import json +import threading +import unittest +from types import SimpleNamespace +from typing import Any + +from frigate.comms.ws import ( + WebSocketClient, + _classify_outbound, + _collect_zone_names, + _extract_payload_camera, + _materialize_for_ws, + _ws_allowed_cameras, + _ws_is_unrestricted, +) +from frigate.config import FrigateConfig + + +def _build_config( + *, + extra_roles: dict[str, list[str]] | None = None, + extra_cameras: dict[str, dict[str, Any]] | None = None, + extra_zones: dict[str, dict[str, dict[str, Any]]] | None = None, +) -> FrigateConfig: + """Construct a FrigateConfig used by the outbound filter tests. + + The default fixture has three cameras: front_door, back_door, garage. + Restricted role "house_only" sees front_door + back_door but not garage. + """ + cameras: dict[str, dict[str, Any]] = { + "front_door": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.1:554/v", "roles": ["detect"]}], + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + "back_door": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.2:554/v", "roles": ["detect"]}], + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + "garage": { + "ffmpeg": { + "inputs": [{"path": "rtsp://10.0.0.3:554/v", "roles": ["detect"]}], + }, + "detect": {"height": 1080, "width": 1920, "fps": 5}, + }, + } + if extra_cameras: + cameras.update(extra_cameras) + if extra_zones: + for cam_name, zones in extra_zones.items(): + cameras[cam_name]["zones"] = zones + + roles = {"house_only": ["front_door", "back_door"]} + if extra_roles: + roles.update(extra_roles) + + return FrigateConfig( + mqtt={"host": "mqtt"}, + auth={"roles": roles}, + cameras=cameras, + ) + + +def _ws(role: str | None) -> Any: + """Build a fake ws4py-style websocket exposing ``environ``.""" + environ = {} if role is None else {"HTTP_REMOTE_ROLE": role} + return SimpleNamespace(environ=environ, terminated=False, sent=[]) + + +class TestClassifyOutbound(unittest.TestCase): + """The pure classifier — bucket every topic into a scope.""" + + def setUp(self): + self.config = _build_config( + extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}} + ) + self.all_cameras = set(self.config.cameras.keys()) + self.all_zones = _collect_zone_names(self.config) + + def _classify(self, topic: str) -> tuple[str, Any]: + return _classify_outbound(topic, self.all_cameras, self.all_zones) + + # --- Global allowlist --- + + def test_model_state_is_global(self): + self.assertEqual(self._classify("model_state"), ("global", None)) + + def test_profile_state_is_global(self): + self.assertEqual(self._classify("profile/state"), ("global", None)) + + def test_bare_notifications_state_is_global(self): + """The 2-segment ``notifications/state`` is global; the 3-segment + ``/notifications/state`` is camera-scoped (see below).""" + self.assertEqual(self._classify("notifications/state"), ("global", None)) + + def test_notification_test_is_global(self): + self.assertEqual(self._classify("notification_test"), ("global", None)) + + # --- Unrestricted-only --- + + def test_birdseye_layout_is_unrestricted_only(self): + self.assertEqual(self._classify("birdseye_layout"), ("unrestricted_only", None)) + + # --- Camera-prefixed --- + + def test_camera_state_topic_resolves_to_camera(self): + self.assertEqual( + self._classify("front_door/detect/state"), ("camera", "front_door") + ) + + def test_camera_motion_topic_resolves_to_camera(self): + self.assertEqual(self._classify("back_door/motion"), ("camera", "back_door")) + + def test_camera_per_notification_topic_resolves_to_camera(self): + self.assertEqual( + self._classify("front_door/notifications/state"), + ("camera", "front_door"), + ) + + def test_camera_label_counter_resolves_to_camera(self): + self.assertEqual(self._classify("front_door/person"), ("camera", "front_door")) + + def test_camera_object_mask_state_resolves_to_camera(self): + self.assertEqual( + self._classify("front_door/object_mask/zone_1/state"), + ("camera", "front_door"), + ) + + # --- Zone-prefixed --- + + def test_zone_aggregate_topic_is_unrestricted_only(self): + self.assertEqual(self._classify("driveway/person"), ("unrestricted_only", None)) + + def test_zone_all_topic_is_unrestricted_only(self): + self.assertEqual(self._classify("driveway/all"), ("unrestricted_only", None)) + + # --- Payload-camera --- + + def test_events_topic_marks_payload_camera_path(self): + self.assertEqual( + self._classify("events"), ("payload_camera", ("after", "camera")) + ) + + def test_reviews_topic_marks_payload_camera_path(self): + self.assertEqual( + self._classify("reviews"), ("payload_camera", ("after", "camera")) + ) + + def test_triggers_topic_marks_payload_camera_path(self): + self.assertEqual(self._classify("triggers"), ("payload_camera", ("camera",))) + + def test_tracked_object_update_marks_payload_camera_path(self): + self.assertEqual( + self._classify("tracked_object_update"), ("payload_camera", ("camera",)) + ) + + # --- Reshape --- + + def test_camera_activity_is_reshape_by_camera_key(self): + self.assertEqual( + self._classify("camera_activity"), ("reshape_by_camera_key", None) + ) + + def test_audio_detections_is_reshape_by_camera_key(self): + self.assertEqual( + self._classify("audio_detections"), ("reshape_by_camera_key", None) + ) + + def test_job_state_is_reshape_job_state(self): + self.assertEqual(self._classify("job_state"), ("reshape_job_state", None)) + + def test_stats_is_reshape_stats(self): + self.assertEqual(self._classify("stats"), ("reshape_stats", None)) + + # --- Fail-closed --- + + def test_unknown_topic_is_dropped(self): + self.assertEqual(self._classify("some_random_topic"), ("drop", None)) + + def test_unknown_camera_prefix_is_dropped(self): + self.assertEqual(self._classify("ghost_camera/detect/state"), ("drop", None)) + + +class TestCollectZoneNames(unittest.TestCase): + def test_zones_from_all_cameras(self): + config = _build_config( + extra_zones={ + "front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}, + "back_door": {"yard": {"coordinates": "0,0,1,0,1,1,0,1"}}, + } + ) + self.assertEqual(_collect_zone_names(config), {"driveway", "yard"}) + + def test_no_zones_returns_empty(self): + self.assertEqual(_collect_zone_names(_build_config()), set()) + + +class TestExtractPayloadCamera(unittest.TestCase): + def test_extract_from_dict_path(self): + payload = {"after": {"camera": "front_door"}} + self.assertEqual( + _extract_payload_camera(payload, ("after", "camera")), "front_door" + ) + + def test_extract_from_json_string(self): + payload = json.dumps({"after": {"camera": "front_door"}}) + self.assertEqual( + _extract_payload_camera(payload, ("after", "camera")), "front_door" + ) + + def test_extract_single_segment_path(self): + self.assertEqual( + _extract_payload_camera({"camera": "garage"}, ("camera",)), "garage" + ) + + def test_missing_key_returns_none(self): + self.assertIsNone(_extract_payload_camera({}, ("after", "camera"))) + + def test_malformed_json_returns_none(self): + self.assertIsNone(_extract_payload_camera("not-json", ("camera",))) + + def test_non_string_camera_returns_none(self): + self.assertIsNone(_extract_payload_camera({"camera": 42}, ("camera",))) + + +class TestWsRoleHelpers(unittest.TestCase): + def setUp(self): + self.config = _build_config() + + def test_admin_is_unrestricted(self): + self.assertTrue(_ws_is_unrestricted(_ws("admin"), self.config)) + + def test_viewer_is_unrestricted(self): + self.assertTrue(_ws_is_unrestricted(_ws("viewer"), self.config)) + + def test_restricted_role_is_not_unrestricted(self): + self.assertFalse(_ws_is_unrestricted(_ws("house_only"), self.config)) + + def test_missing_role_is_not_unrestricted(self): + self.assertFalse(_ws_is_unrestricted(_ws(None), self.config)) + + def test_unknown_role_is_not_unrestricted(self): + self.assertFalse(_ws_is_unrestricted(_ws("ghost"), self.config)) + + def test_admin_allowed_cameras_is_all(self): + self.assertEqual( + _ws_allowed_cameras(_ws("admin"), self.config), + {"front_door", "back_door", "garage"}, + ) + + def test_restricted_role_allowed_cameras_is_subset(self): + self.assertEqual( + _ws_allowed_cameras(_ws("house_only"), self.config), + {"front_door", "back_door"}, + ) + + def test_missing_role_allowed_cameras_is_empty(self): + self.assertEqual(_ws_allowed_cameras(_ws(None), self.config), set()) + + def test_multi_role_union_grants_widest(self): + self.assertEqual( + _ws_allowed_cameras(_ws("house_only,admin"), self.config), + {"front_door", "back_door", "garage"}, + ) + + +class TestMaterializeForWs(unittest.TestCase): + def setUp(self): + self.config = _build_config( + extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}} + ) + self.all_cameras = set(self.config.cameras.keys()) + self.all_zones = _collect_zone_names(self.config) + + def _materialize(self, ws: Any, topic: str, payload: Any) -> str | None: + scope = _classify_outbound(topic, self.all_cameras, self.all_zones) + from frigate.comms.ws import _parse_json_payload + + parsed = ( + _parse_json_payload(payload) + if scope[0] + in ( + "payload_camera", + "reshape_by_camera_key", + "reshape_job_state", + "reshape_stats", + ) + else None + ) + full = json.dumps({"topic": topic, "payload": payload}) + return _materialize_for_ws(ws, topic, full, scope, parsed, self.config) + + # --- Globals: every authenticated client sees them --- + + def test_globals_reach_admin(self): + self.assertIsNotNone(self._materialize(_ws("admin"), "model_state", "{}")) + + def test_globals_reach_restricted(self): + self.assertIsNotNone(self._materialize(_ws("house_only"), "model_state", "{}")) + + def test_globals_reach_no_role(self): + """A missing role header still gets globals (matches viewer-default + for inbound).""" + self.assertIsNotNone(self._materialize(_ws(None), "model_state", "{}")) + + # --- Unknown topic dropped for everyone --- + + def test_unknown_topic_dropped_for_admin(self): + self.assertIsNone(self._materialize(_ws("admin"), "rogue_topic", "{}")) + + # --- Non-global topics require a role (fail-closed) --- + + def test_no_role_blocked_from_camera_topic(self): + self.assertIsNone(self._materialize(_ws(None), "front_door/detect/state", "ON")) + + def test_no_role_blocked_from_events(self): + payload = json.dumps({"after": {"camera": "front_door"}}) + self.assertIsNone(self._materialize(_ws(None), "events", payload)) + + # --- Camera-prefixed --- + + def test_restricted_role_sees_allowed_camera(self): + self.assertIsNotNone( + self._materialize(_ws("house_only"), "front_door/detect/state", "ON") + ) + + def test_restricted_role_blocked_from_unallowed_camera(self): + self.assertIsNone( + self._materialize(_ws("house_only"), "garage/detect/state", "ON") + ) + + def test_admin_sees_all_camera_topics(self): + self.assertIsNotNone( + self._materialize(_ws("admin"), "garage/detect/state", "ON") + ) + + # --- Unrestricted-only (zones, birdseye_layout) --- + + def test_zone_aggregate_blocked_for_restricted(self): + self.assertIsNone(self._materialize(_ws("house_only"), "driveway/person", 3)) + + def test_zone_aggregate_visible_to_admin(self): + self.assertIsNotNone(self._materialize(_ws("admin"), "driveway/person", 3)) + + def test_birdseye_layout_blocked_for_restricted(self): + payload = json.dumps( + {"front_door": {"x": 0, "y": 0, "width": 100, "height": 100}} + ) + self.assertIsNone( + self._materialize(_ws("house_only"), "birdseye_layout", payload) + ) + + def test_birdseye_layout_visible_to_admin(self): + payload = json.dumps( + {"front_door": {"x": 0, "y": 0, "width": 100, "height": 100}} + ) + self.assertIsNotNone( + self._materialize(_ws("admin"), "birdseye_layout", payload) + ) + + # --- Payload-camera --- + + def test_events_filtered_by_payload_camera(self): + payload = json.dumps({"after": {"camera": "garage"}}) + self.assertIsNone(self._materialize(_ws("house_only"), "events", payload)) + + payload = json.dumps({"after": {"camera": "front_door"}}) + self.assertIsNotNone(self._materialize(_ws("house_only"), "events", payload)) + + def test_events_with_missing_camera_dropped(self): + payload = json.dumps({"after": {}}) + self.assertIsNone(self._materialize(_ws("house_only"), "events", payload)) + + def test_triggers_filtered_by_payload_camera(self): + payload = json.dumps({"name": "t1", "camera": "garage"}) + self.assertIsNone(self._materialize(_ws("house_only"), "triggers", payload)) + + # --- Reshape: dict keyed by camera --- + + def test_camera_activity_filtered_to_allowed_keys(self): + payload = json.dumps( + { + "front_door": {"objects": 1}, + "back_door": {"objects": 0}, + "garage": {"objects": 2}, + } + ) + message = self._materialize(_ws("house_only"), "camera_activity", payload) + self.assertIsNotNone(message) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertEqual(set(inner.keys()), {"front_door", "back_door"}) + self.assertNotIn("garage", inner) + + def test_camera_activity_unchanged_for_admin(self): + payload = json.dumps({"front_door": {}, "back_door": {}, "garage": {}}) + message = self._materialize(_ws("admin"), "camera_activity", payload) + envelope = json.loads(message) # type: ignore[arg-type] + self.assertEqual(envelope["payload"], payload) + + def test_camera_activity_with_no_allowed_returns_none(self): + payload = json.dumps({"garage": {"objects": 2}}) + self.assertIsNone( + self._materialize(_ws("house_only"), "camera_activity", payload) + ) + + def test_audio_detections_filtered_to_allowed_keys(self): + payload = json.dumps({"front_door": {"bark": {}}, "garage": {"speech": {}}}) + message = self._materialize(_ws("house_only"), "audio_detections", payload) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertEqual(set(inner.keys()), {"front_door"}) + + # --- Reshape: job_state --- + + def test_job_state_admin_sees_full_payload(self): + payload = json.dumps( + { + "motion_search": {"job_type": "motion_search", "camera": "garage"}, + "media_sync": {"job_type": "media_sync"}, + } + ) + message = self._materialize(_ws("admin"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + self.assertEqual(envelope["payload"], payload) + + def test_job_state_restricted_keeps_allowed_camera_jobs(self): + """Top-level camera field on a job entry: drop if not allowed.""" + payload = json.dumps( + { + "motion_search": {"job_type": "motion_search", "camera": "front_door"}, + "vlm_watch": {"job_type": "vlm_watch", "camera": "garage"}, + } + ) + message = self._materialize(_ws("house_only"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertIn("motion_search", inner) + self.assertNotIn("vlm_watch", inner) + + def test_job_state_export_results_jobs_filtered_per_recipient(self): + """The aggregated export broadcast nests per-camera sub-jobs under + ``results.jobs``. Restricted users must only see allowed entries.""" + payload = json.dumps( + { + "export": { + "job_type": "export", + "status": "running", + "results": { + "jobs": [ + {"job_type": "export", "camera": "front_door", "id": "a"}, + {"job_type": "export", "camera": "garage", "id": "b"}, + {"job_type": "export", "camera": "back_door", "id": "c"}, + ] + }, + } + } + ) + message = self._materialize(_ws("house_only"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertIn("export", inner) + kept_cameras = [j["camera"] for j in inner["export"]["results"]["jobs"]] + self.assertEqual(kept_cameras, ["front_door", "back_door"]) + # Sibling fields like ``status`` must survive reshaping. + self.assertEqual(inner["export"]["status"], "running") + + def test_job_state_export_entry_dropped_when_no_jobs_allowed(self): + payload = json.dumps( + { + "export": { + "job_type": "export", + "status": "running", + "results": { + "jobs": [ + {"job_type": "export", "camera": "garage", "id": "b"}, + ] + }, + } + } + ) + self.assertIsNone(self._materialize(_ws("house_only"), "job_state", payload)) + + # --- Reshape: stats --- + + def _stats_payload(self) -> str: + return json.dumps( + { + "cameras": { + "front_door": {"camera_fps": 5.0, "pid": 1234}, + "back_door": {"camera_fps": 5.0, "pid": 1235}, + "garage": {"camera_fps": 5.0, "pid": 1236}, + }, + "detectors": {"cpu": {"detection_start": 0.0, "inference_speed": 10}}, + "service": {"uptime": 12345, "version": "0.16.0"}, + "camera_fps": 15.0, + "detection_fps": 6.0, + } + ) + + def test_stats_admin_sees_full_payload(self): + message = self._materialize(_ws("admin"), "stats", self._stats_payload()) + envelope = json.loads(message) # type: ignore[arg-type] + self.assertEqual(envelope["payload"], self._stats_payload()) + + def test_stats_restricted_filters_camera_keys_but_keeps_aggregates(self): + message = self._materialize(_ws("house_only"), "stats", self._stats_payload()) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertEqual(set(inner["cameras"].keys()), {"front_door", "back_door"}) + self.assertNotIn("garage", inner["cameras"]) + # Aggregates, detectors, and service block must survive. + self.assertEqual(inner["camera_fps"], 15.0) + self.assertEqual(inner["detection_fps"], 6.0) + self.assertIn("detectors", inner) + self.assertIn("service", inner) + + def test_stats_restricted_with_no_allowed_cameras_still_sends_aggregates(self): + """A restricted role whose allow-list contains only nonexistent cameras + still gets the global aggregates and service block.""" + config = _build_config(extra_roles={"empty_role": ["nonexistent"]}) + from frigate.comms.ws import _parse_json_payload + + payload = self._stats_payload() + all_cameras = set(config.cameras.keys()) + scope = _classify_outbound("stats", all_cameras, _collect_zone_names(config)) + full = json.dumps({"topic": "stats", "payload": payload}) + message = _materialize_for_ws( + _ws("empty_role"), + "stats", + full, + scope, + _parse_json_payload(payload), + config, + ) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertEqual(inner["cameras"], {}) + self.assertEqual(inner["camera_fps"], 15.0) + self.assertIn("service", inner) + + def test_stats_without_cameras_key_passes_through(self): + """A malformed stats payload missing the cameras sub-dict shouldn't + break delivery for restricted users — fall back to the full message.""" + payload = json.dumps({"detectors": {}, "service": {}, "detection_fps": 0.0}) + message = self._materialize(_ws("house_only"), "stats", payload) + envelope = json.loads(message) # type: ignore[arg-type] + self.assertEqual(envelope["payload"], payload) + + def test_job_state_export_entry_unchanged_for_admin(self): + payload = json.dumps( + { + "export": { + "job_type": "export", + "status": "running", + "results": { + "jobs": [ + {"job_type": "export", "camera": "garage", "id": "b"}, + ] + }, + } + } + ) + message = self._materialize(_ws("admin"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + self.assertEqual(envelope["payload"], payload) + + def test_job_state_restricted_keeps_global_jobs(self): + """media_sync has no camera field; restricted users still see it.""" + payload = json.dumps( + {"media_sync": {"job_type": "media_sync", "status": "running"}} + ) + message = self._materialize(_ws("house_only"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertIn("media_sync", inner) + + def test_job_state_debug_replay_nested_source_camera_filtered(self): + """debug_replay puts ``source_camera`` inside ``results`` (see + jobs/debug_replay.py:to_dict). Restricted users must not receive + entries whose nested source camera is unauthorized.""" + payload = json.dumps( + { + "debug_replay": { + "id": "bd6dc99d-a7d", + "job_type": "debug_replay", + "status": "running", + "start_time": 1.0, + "end_time": None, + "error_message": None, + "results": { + "current_step": "preparing_clip", + "progress_percent": 0.0, + "source_camera": "garage", + "replay_camera_name": "_replay_garage", + "start_ts": 0.0, + "end_ts": 1.0, + }, + } + } + ) + self.assertIsNone(self._materialize(_ws("house_only"), "job_state", payload)) + + def test_job_state_debug_replay_nested_source_camera_allowed(self): + payload = json.dumps( + { + "debug_replay": { + "id": "bd6dc99d-a7d", + "job_type": "debug_replay", + "status": "running", + "results": { + "source_camera": "front_door", + "replay_camera_name": "_replay_front_door", + }, + } + } + ) + message = self._materialize(_ws("house_only"), "job_state", payload) + envelope = json.loads(message) # type: ignore[arg-type] + inner = json.loads(envelope["payload"]) + self.assertIn("debug_replay", inner) + self.assertEqual( + inner["debug_replay"]["results"]["source_camera"], "front_door" + ) + + +class _FakeManager: + """Minimal ws4py manager: holds clients and exposes a lock.""" + + def __init__(self, clients: list[Any]) -> None: + self.lock = threading.Lock() + self.websockets = {id(c): c for c in clients} + + +class _FakeServer: + def __init__(self, manager: _FakeManager) -> None: + self.manager = manager + + +class _CapturingWs(SimpleNamespace): + """Fake ws4py client that records what was sent.""" + + def __init__(self, role: str | None) -> None: + environ = {} if role is None else {"HTTP_REMOTE_ROLE": role} + super().__init__(environ=environ, terminated=False) + self.sent: list[str] = [] + + def send(self, message: str) -> None: # noqa: D401 - matches ws4py API + self.sent.append(message) + + +class TestPublishEndToEnd(unittest.TestCase): + """Drive WebSocketClient.publish() against fake clients with different roles.""" + + def setUp(self): + self.config = _build_config( + extra_zones={"front_door": {"driveway": {"coordinates": "0,0,1,0,1,1,0,1"}}} + ) + self.admin = _CapturingWs("admin") + self.restricted = _CapturingWs("house_only") + self.anon = _CapturingWs(None) + self.client = WebSocketClient(self.config) + self.client.websocket_server = _FakeServer( + _FakeManager([self.admin, self.restricted, self.anon]) + ) + + def _payloads(self, ws: _CapturingWs) -> list[Any]: + return [json.loads(m)["payload"] for m in ws.sent] + + def test_global_topic_reaches_everyone(self): + self.client.publish("model_state", "{}") + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 1) + self.assertEqual(len(self.anon.sent), 1) + + def test_camera_topic_filters_restricted_recipient(self): + self.client.publish("garage/detect/state", "ON") + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 0) + self.assertEqual(len(self.anon.sent), 0) + + def test_camera_topic_allows_restricted_recipient_for_allowed_camera(self): + self.client.publish("front_door/detect/state", "ON") + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 1) + self.assertEqual(len(self.anon.sent), 0) + + def test_events_payload_filtered(self): + self.client.publish("events", json.dumps({"after": {"camera": "garage"}})) + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 0) + + def test_camera_activity_reshaped_per_recipient(self): + self.client.publish( + "camera_activity", + json.dumps( + { + "front_door": {"objects": 1}, + "back_door": {"objects": 0}, + "garage": {"objects": 2}, + } + ), + ) + self.assertEqual(len(self.admin.sent), 1) + admin_inner = json.loads(self._payloads(self.admin)[0]) + self.assertEqual(set(admin_inner.keys()), {"front_door", "back_door", "garage"}) + + self.assertEqual(len(self.restricted.sent), 1) + restricted_inner = json.loads(self._payloads(self.restricted)[0]) + self.assertEqual(set(restricted_inner.keys()), {"front_door", "back_door"}) + + self.assertEqual(len(self.anon.sent), 0) + + def test_birdseye_layout_blocked_for_restricted_and_anon(self): + self.client.publish( + "birdseye_layout", + json.dumps({"front_door": {"x": 0, "y": 0, "width": 1, "height": 1}}), + ) + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 0) + self.assertEqual(len(self.anon.sent), 0) + + def test_zone_aggregate_blocked_for_restricted(self): + self.client.publish("driveway/person", 2) + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 0) + + def test_stats_reshaped_per_recipient(self): + self.client.publish( + "stats", + json.dumps( + { + "cameras": { + "front_door": {"camera_fps": 5.0}, + "garage": {"camera_fps": 5.0}, + }, + "service": {"uptime": 1}, + "camera_fps": 10.0, + } + ), + ) + self.assertEqual(len(self.admin.sent), 1) + admin_inner = json.loads(self._payloads(self.admin)[0]) + self.assertEqual(set(admin_inner["cameras"].keys()), {"front_door", "garage"}) + + self.assertEqual(len(self.restricted.sent), 1) + restricted_inner = json.loads(self._payloads(self.restricted)[0]) + self.assertEqual(set(restricted_inner["cameras"].keys()), {"front_door"}) + self.assertEqual(restricted_inner["camera_fps"], 10.0) + self.assertIn("service", restricted_inner) + + # Stats requires a role; anonymous gets nothing. + self.assertEqual(len(self.anon.sent), 0) + + def test_export_job_state_filters_results_jobs_per_recipient(self): + self.client.publish( + "job_state", + json.dumps( + { + "export": { + "job_type": "export", + "status": "running", + "results": { + "jobs": [ + {"camera": "front_door", "id": "a"}, + {"camera": "garage", "id": "b"}, + ] + }, + } + } + ), + ) + self.assertEqual(len(self.admin.sent), 1) + admin_inner = json.loads(self._payloads(self.admin)[0]) + self.assertEqual( + [j["camera"] for j in admin_inner["export"]["results"]["jobs"]], + ["front_door", "garage"], + ) + + self.assertEqual(len(self.restricted.sent), 1) + restricted_inner = json.loads(self._payloads(self.restricted)[0]) + self.assertEqual( + [j["camera"] for j in restricted_inner["export"]["results"]["jobs"]], + ["front_door"], + ) + + def test_unknown_topic_dropped_for_everyone(self): + self.client.publish("some_rogue_topic", "data") + self.assertEqual(self.admin.sent, []) + self.assertEqual(self.restricted.sent, []) + self.assertEqual(self.anon.sent, []) + + def test_terminated_client_is_skipped(self): + self.restricted.terminated = True + self.client.publish("front_door/detect/state", "ON") + self.assertEqual(len(self.admin.sent), 1) + self.assertEqual(len(self.restricted.sent), 0) + + +if __name__ == "__main__": + unittest.main()