From b131db2914b7a12801794b5d49327a05a4eb13f8 Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Tue, 19 May 2026 14:02:35 -0500 Subject: [PATCH] filter outbound ws broadcasts by per-recipient camera access --- frigate/comms/ws.py | 360 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 354 insertions(+), 6 deletions(-) diff --git a/frigate/comms/ws.py b/frigate/comms/ws.py index 2f16ab7141..12de545fdc 100644 --- a/frigate/comms/ws.py +++ b/frigate/comms/ws.py @@ -34,6 +34,8 @@ from frigate.const import ( UPDATE_REVIEW_DESCRIPTION, UPSERT_REVIEW_SEGMENT, ) +from frigate.models import User +from frigate.output.ws_auth import ws_has_camera_access logger = logging.getLogger(__name__) @@ -102,6 +104,320 @@ def _check_ws_authorization( return topic in _WS_VIEWER_TOPICS +# ---- Outbound filtering --------------------------------------------------- +# +# Every WebSocket broadcast is classified into one of a small set of scopes, +# then materialized per recipient. Connections with restricted roles only see +# data for cameras they are authorized to access; admin and full-access roles +# behave as today. + +# Topics that are safe to broadcast to every authenticated client. +_WS_GLOBAL_OUTBOUND_TOPICS = frozenset( + { + "model_state", + "embeddings_reindex_progress", + "audio_transcription_state", + "profile/state", + "notifications/state", + "notification_test", + } +) + +# Topics that restricted roles must never receive. Birdseye composites span +# all cameras, so the existing JSMPEG policy already restricts birdseye access +# to unrestricted roles; the layout broadcast follows the same rule. +_WS_UNRESTRICTED_ONLY_TOPICS = frozenset( + { + "birdseye_layout", + } +) + +# Topics whose payload (parsed as JSON) names a single owning camera at the +# given key path. Used to scope events, reviews, triggers, etc. +_WS_PAYLOAD_CAMERA_TOPICS: dict[str, tuple[str, ...]] = { + "events": ("after", "camera"), + "reviews": ("after", "camera"), + "tracked_object_update": ("camera",), + "triggers": ("camera",), + "camera_monitoring": ("camera",), +} + +# Topics whose payload is a dict keyed by camera name; filter keys per +# recipient. +_WS_RESHAPE_BY_CAMERA_KEY_TOPICS = frozenset( + { + "camera_activity", + "audio_detections", + } +) + +# Topics whose payload is a dict keyed by job_type, where each entry may +# contain a "camera" or "source_camera" field, or a nested ``results.jobs`` +# list of per-camera sub-jobs (export broadcasts). +_WS_RESHAPE_JOB_STATE_TOPICS = frozenset( + { + "job_state", + } +) + +# Topics whose payload mixes global aggregates with a ``cameras`` sub-dict +# keyed by camera name. Aggregates and detector data stay; per-camera entries +# are filtered. +_WS_RESHAPE_STATS_TOPICS = frozenset( + { + "stats", + } +) + + +def _collect_zone_names(config: FrigateConfig) -> set[str]: + """Return the set of all zone names defined across cameras.""" + names: set[str] = set() + for camera in config.cameras.values(): + zones = getattr(camera, "zones", None) or {} + names.update(zones.keys()) + return names + + +def _parse_json_payload(payload: Any) -> Any: + """Return payload parsed as JSON if it is a string, else as-is.""" + if isinstance(payload, str): + try: + return json.loads(payload) + except (ValueError, TypeError): + return None + return payload + + +def _scope_job_entry_to_allowed(entry: Any, allowed: set[str]) -> dict[str, Any] | None: + """Filter a single job_state entry to the recipient's allowed cameras. + + Returns the (possibly reshaped) entry, or None to drop it. Four shapes + are handled: + + * Top-level ``camera`` or ``source_camera`` (motion_search, vlm_watch, + export sub-job dicts): drop the entry if not allowed. + * Nested ``results.jobs`` list of per-camera sub-jobs (the aggregated + export broadcast): filter the list; drop the entry if nothing remains. + * Nested ``results.camera`` or ``results.source_camera`` (debug_replay, + which puts replay-specific fields inside ``results``): drop the entry + if not allowed. + * No camera anywhere (e.g. ``media_sync``): treat as global and keep. + """ + if not isinstance(entry, dict): + return None + + cam = entry.get("camera") or entry.get("source_camera") + + if cam is None: + results = entry.get("results") + if isinstance(results, dict): + sub_jobs = results.get("jobs") + if isinstance(sub_jobs, list): + filtered_jobs = [ + j + for j in sub_jobs + if isinstance(j, dict) + and (j.get("camera") or j.get("source_camera")) in allowed + ] + if not filtered_jobs: + return None + reshaped = dict(entry) + reshaped["results"] = dict(results) + reshaped["results"]["jobs"] = filtered_jobs + return reshaped + + cam = results.get("camera") or results.get("source_camera") + + if cam is not None: + return entry if cam in allowed else None + + return entry + + +def _extract_payload_camera(payload: Any, path: tuple[str, ...]) -> str | None: + """Walk the dotted path through a (possibly JSON-encoded) payload.""" + cur = _parse_json_payload(payload) + for key in path: + if not isinstance(cur, dict): + return None + cur = cur.get(key) + return cur if isinstance(cur, str) else None + + +def _classify_outbound( + topic: str, all_cameras: set[str], all_zones: set[str] +) -> tuple[str, Any]: + """Classify an outbound topic into (kind, extra). + + kind values: + - "global" : send to every authenticated client + - "drop" : send to nobody (fail-closed for unknowns) + - "unrestricted_only" : send only to admin/full-access roles + - "camera" : extra is the owning camera name + - "payload_camera" : extra is the JSON key path to the camera name + - "reshape_by_camera_key" + - "reshape_job_state" + - "reshape_stats" + """ + if topic in _WS_GLOBAL_OUTBOUND_TOPICS: + return ("global", None) + if topic in _WS_UNRESTRICTED_ONLY_TOPICS: + return ("unrestricted_only", None) + if topic in _WS_RESHAPE_BY_CAMERA_KEY_TOPICS: + return ("reshape_by_camera_key", None) + if topic in _WS_RESHAPE_JOB_STATE_TOPICS: + return ("reshape_job_state", None) + if topic in _WS_RESHAPE_STATS_TOPICS: + return ("reshape_stats", None) + if topic in _WS_PAYLOAD_CAMERA_TOPICS: + return ("payload_camera", _WS_PAYLOAD_CAMERA_TOPICS[topic]) + + # Topic-prefix based: first segment names the owning camera or zone. + first = topic.split("/", 1)[0] + if first in all_cameras: + return ("camera", first) + if first in all_zones: + # Zone aggregates span cameras; restricted users see nothing here. + return ("unrestricted_only", None) + + return ("drop", None) + + +def _ws_role_header(ws: Any) -> str | None: + """Return the HTTP_REMOTE_ROLE header value, if any.""" + environ = getattr(ws, "environ", None) + if not environ: + return None + return environ.get("HTTP_REMOTE_ROLE") + + +def _ws_valid_roles(ws: Any, config: FrigateConfig) -> list[str]: + """Return the list of recognized roles for this connection.""" + header = _ws_role_header(ws) + if not header: + return [] + roles = [r.strip() for r in header.split(config.proxy.separator) if r.strip()] + return [r for r in roles if r in config.auth.roles] + + +def _ws_is_unrestricted(ws: Any, config: FrigateConfig) -> bool: + """True when the connection has unrestricted camera access. + + Mirrors the policy in ``frigate.output.ws_auth``: admin or any role with + an empty allow-list grants full access. + """ + roles = _ws_valid_roles(ws, config) + if not roles: + return False + roles_dict = config.auth.roles + return any(r == "admin" or not roles_dict.get(r) for r in roles) + + +def _ws_allowed_cameras(ws: Any, config: FrigateConfig) -> set[str]: + """Return the union of cameras this connection may access across its roles.""" + roles = _ws_valid_roles(ws, config) + if not roles: + return set() + all_cameras = set(config.cameras.keys()) + allowed: set[str] = set() + for role in roles: + if role == "admin" or not config.auth.roles.get(role): + return all_cameras + allowed.update(User.get_allowed_cameras(role, config.auth.roles, all_cameras)) + return allowed + + +def _wrap_envelope(topic: str, inner_payload: Any) -> str: + """Re-serialize a (topic, payload) message after payload reshaping. + + Frigate's wire format keeps payloads as JSON-encoded strings inside the + outer envelope, mirroring what producers send today. + """ + return json.dumps({"topic": topic, "payload": json.dumps(inner_payload)}) + + +def _materialize_for_ws( + ws: Any, + topic: str, + full_message: str, + scope: tuple[str, Any], + parsed_payload: Any, + config: FrigateConfig, +) -> str | None: + """Return the JSON string to deliver to ``ws``, or None to skip it.""" + kind, extra = scope + has_role = _ws_role_header(ws) is not None + + if kind == "drop": + return None + + if kind == "global": + # Globals still require an authenticated connection. Missing role + # falls back to viewer semantics (matching the inbound rule). + return full_message + + # Beyond globals, an authenticated role header is required (fail-closed). + if not has_role: + return None + + if kind == "unrestricted_only": + return full_message if _ws_is_unrestricted(ws, config) else None + + if kind == "camera": + return full_message if ws_has_camera_access(ws, extra, config) else None + + if kind == "payload_camera": + camera = _extract_payload_camera(parsed_payload, extra) + if camera is None: + return None + return full_message if ws_has_camera_access(ws, camera, config) else None + + if kind == "reshape_by_camera_key": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + filtered = {cam: data for cam, data in parsed_payload.items() if cam in allowed} + if not filtered: + return None + return _wrap_envelope(topic, filtered) + + if kind == "reshape_job_state": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + filtered: dict[str, Any] = {} + for job_type, job_payload in parsed_payload.items(): + scoped = _scope_job_entry_to_allowed(job_payload, allowed) + if scoped is not None: + filtered[job_type] = scoped + if not filtered: + return None + return _wrap_envelope(topic, filtered) + + if kind == "reshape_stats": + if _ws_is_unrestricted(ws, config): + return full_message + if not isinstance(parsed_payload, dict): + return None + allowed = _ws_allowed_cameras(ws, config) + cameras_block = parsed_payload.get("cameras") + if isinstance(cameras_block, dict): + filtered_cameras = { + name: data for name, data in cameras_block.items() if name in allowed + } + reshaped = dict(parsed_payload) + reshaped["cameras"] = filtered_cameras + return _wrap_envelope(topic, reshaped) + return full_message + + return None + + class WebSocket(WebSocket_): # type: ignore[misc] def unhandled_error(self, error: Any) -> None: """ @@ -183,6 +499,10 @@ class WebSocketClient(Communicator): self.websocket_thread.start() def publish(self, topic: str, payload: Any, _: bool = False) -> None: + if self.websocket_server is None: + logger.debug("Skipping message, websocket not connected yet") + return + try: ws_message = json.dumps( { @@ -195,14 +515,42 @@ class WebSocketClient(Communicator): logger.debug(f"payload for {topic} wasn't text. Skipping...") return - if self.websocket_server is None: - logger.debug("Skipping message, websocket not connected yet") + all_cameras = set(self.config.cameras.keys()) + all_zones = _collect_zone_names(self.config) + scope = _classify_outbound(topic, all_cameras, all_zones) + + if scope[0] == "drop": return - try: - self.websocket_server.manager.broadcast(ws_message) - except ConnectionResetError: - pass + # Pre-parse payload once for topics that need to read its contents. + parsed_payload: Any = None + if scope[0] in ( + "payload_camera", + "reshape_by_camera_key", + "reshape_job_state", + "reshape_stats", + ): + parsed_payload = _parse_json_payload(payload) + if parsed_payload is None: + # malformed payload — fail closed + return + + manager = self.websocket_server.manager + with manager.lock: + websockets = list(manager.websockets.values()) + + for ws in websockets: + if getattr(ws, "terminated", False): + continue + message = _materialize_for_ws( + ws, topic, ws_message, scope, parsed_payload, self.config + ) + if message is None: + continue + try: + ws.send(message) + except (ConnectionResetError, BrokenPipeError, ValueError): + pass def stop(self) -> None: if self.websocket_server is not None: