filter outbound ws broadcasts by per-recipient camera access

This commit is contained in:
Josh Hawkins 2026-05-19 14:02:35 -05:00
parent b0b00fe1d0
commit b131db2914

View File

@ -34,6 +34,8 @@ from frigate.const import (
UPDATE_REVIEW_DESCRIPTION,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import User
from frigate.output.ws_auth import ws_has_camera_access
logger = logging.getLogger(__name__)
@ -102,6 +104,320 @@ def _check_ws_authorization(
return topic in _WS_VIEWER_TOPICS
# ---- Outbound filtering ---------------------------------------------------
#
# Every WebSocket broadcast is classified into one of a small set of scopes,
# then materialized per recipient. Connections with restricted roles only see
# data for cameras they are authorized to access; admin and full-access roles
# behave as today.
# Topics that are safe to broadcast to every authenticated client.
_WS_GLOBAL_OUTBOUND_TOPICS = frozenset(
{
"model_state",
"embeddings_reindex_progress",
"audio_transcription_state",
"profile/state",
"notifications/state",
"notification_test",
}
)
# Topics that restricted roles must never receive. Birdseye composites span
# all cameras, so the existing JSMPEG policy already restricts birdseye access
# to unrestricted roles; the layout broadcast follows the same rule.
_WS_UNRESTRICTED_ONLY_TOPICS = frozenset(
{
"birdseye_layout",
}
)
# Topics whose payload (parsed as JSON) names a single owning camera at the
# given key path. Used to scope events, reviews, triggers, etc.
_WS_PAYLOAD_CAMERA_TOPICS: dict[str, tuple[str, ...]] = {
"events": ("after", "camera"),
"reviews": ("after", "camera"),
"tracked_object_update": ("camera",),
"triggers": ("camera",),
"camera_monitoring": ("camera",),
}
# Topics whose payload is a dict keyed by camera name; filter keys per
# recipient.
_WS_RESHAPE_BY_CAMERA_KEY_TOPICS = frozenset(
{
"camera_activity",
"audio_detections",
}
)
# Topics whose payload is a dict keyed by job_type, where each entry may
# contain a "camera" or "source_camera" field, or a nested ``results.jobs``
# list of per-camera sub-jobs (export broadcasts).
_WS_RESHAPE_JOB_STATE_TOPICS = frozenset(
{
"job_state",
}
)
# Topics whose payload mixes global aggregates with a ``cameras`` sub-dict
# keyed by camera name. Aggregates and detector data stay; per-camera entries
# are filtered.
_WS_RESHAPE_STATS_TOPICS = frozenset(
{
"stats",
}
)
def _collect_zone_names(config: FrigateConfig) -> set[str]:
"""Return the set of all zone names defined across cameras."""
names: set[str] = set()
for camera in config.cameras.values():
zones = getattr(camera, "zones", None) or {}
names.update(zones.keys())
return names
def _parse_json_payload(payload: Any) -> Any:
"""Return payload parsed as JSON if it is a string, else as-is."""
if isinstance(payload, str):
try:
return json.loads(payload)
except (ValueError, TypeError):
return None
return payload
def _scope_job_entry_to_allowed(entry: Any, allowed: set[str]) -> dict[str, Any] | None:
"""Filter a single job_state entry to the recipient's allowed cameras.
Returns the (possibly reshaped) entry, or None to drop it. Four shapes
are handled:
* Top-level ``camera`` or ``source_camera`` (motion_search, vlm_watch,
export sub-job dicts): drop the entry if not allowed.
* Nested ``results.jobs`` list of per-camera sub-jobs (the aggregated
export broadcast): filter the list; drop the entry if nothing remains.
* Nested ``results.camera`` or ``results.source_camera`` (debug_replay,
which puts replay-specific fields inside ``results``): drop the entry
if not allowed.
* No camera anywhere (e.g. ``media_sync``): treat as global and keep.
"""
if not isinstance(entry, dict):
return None
cam = entry.get("camera") or entry.get("source_camera")
if cam is None:
results = entry.get("results")
if isinstance(results, dict):
sub_jobs = results.get("jobs")
if isinstance(sub_jobs, list):
filtered_jobs = [
j
for j in sub_jobs
if isinstance(j, dict)
and (j.get("camera") or j.get("source_camera")) in allowed
]
if not filtered_jobs:
return None
reshaped = dict(entry)
reshaped["results"] = dict(results)
reshaped["results"]["jobs"] = filtered_jobs
return reshaped
cam = results.get("camera") or results.get("source_camera")
if cam is not None:
return entry if cam in allowed else None
return entry
def _extract_payload_camera(payload: Any, path: tuple[str, ...]) -> str | None:
"""Walk the dotted path through a (possibly JSON-encoded) payload."""
cur = _parse_json_payload(payload)
for key in path:
if not isinstance(cur, dict):
return None
cur = cur.get(key)
return cur if isinstance(cur, str) else None
def _classify_outbound(
topic: str, all_cameras: set[str], all_zones: set[str]
) -> tuple[str, Any]:
"""Classify an outbound topic into (kind, extra).
kind values:
- "global" : send to every authenticated client
- "drop" : send to nobody (fail-closed for unknowns)
- "unrestricted_only" : send only to admin/full-access roles
- "camera" : extra is the owning camera name
- "payload_camera" : extra is the JSON key path to the camera name
- "reshape_by_camera_key"
- "reshape_job_state"
- "reshape_stats"
"""
if topic in _WS_GLOBAL_OUTBOUND_TOPICS:
return ("global", None)
if topic in _WS_UNRESTRICTED_ONLY_TOPICS:
return ("unrestricted_only", None)
if topic in _WS_RESHAPE_BY_CAMERA_KEY_TOPICS:
return ("reshape_by_camera_key", None)
if topic in _WS_RESHAPE_JOB_STATE_TOPICS:
return ("reshape_job_state", None)
if topic in _WS_RESHAPE_STATS_TOPICS:
return ("reshape_stats", None)
if topic in _WS_PAYLOAD_CAMERA_TOPICS:
return ("payload_camera", _WS_PAYLOAD_CAMERA_TOPICS[topic])
# Topic-prefix based: first segment names the owning camera or zone.
first = topic.split("/", 1)[0]
if first in all_cameras:
return ("camera", first)
if first in all_zones:
# Zone aggregates span cameras; restricted users see nothing here.
return ("unrestricted_only", None)
return ("drop", None)
def _ws_role_header(ws: Any) -> str | None:
"""Return the HTTP_REMOTE_ROLE header value, if any."""
environ = getattr(ws, "environ", None)
if not environ:
return None
return environ.get("HTTP_REMOTE_ROLE")
def _ws_valid_roles(ws: Any, config: FrigateConfig) -> list[str]:
"""Return the list of recognized roles for this connection."""
header = _ws_role_header(ws)
if not header:
return []
roles = [r.strip() for r in header.split(config.proxy.separator) if r.strip()]
return [r for r in roles if r in config.auth.roles]
def _ws_is_unrestricted(ws: Any, config: FrigateConfig) -> bool:
"""True when the connection has unrestricted camera access.
Mirrors the policy in ``frigate.output.ws_auth``: admin or any role with
an empty allow-list grants full access.
"""
roles = _ws_valid_roles(ws, config)
if not roles:
return False
roles_dict = config.auth.roles
return any(r == "admin" or not roles_dict.get(r) for r in roles)
def _ws_allowed_cameras(ws: Any, config: FrigateConfig) -> set[str]:
"""Return the union of cameras this connection may access across its roles."""
roles = _ws_valid_roles(ws, config)
if not roles:
return set()
all_cameras = set(config.cameras.keys())
allowed: set[str] = set()
for role in roles:
if role == "admin" or not config.auth.roles.get(role):
return all_cameras
allowed.update(User.get_allowed_cameras(role, config.auth.roles, all_cameras))
return allowed
def _wrap_envelope(topic: str, inner_payload: Any) -> str:
"""Re-serialize a (topic, payload) message after payload reshaping.
Frigate's wire format keeps payloads as JSON-encoded strings inside the
outer envelope, mirroring what producers send today.
"""
return json.dumps({"topic": topic, "payload": json.dumps(inner_payload)})
def _materialize_for_ws(
ws: Any,
topic: str,
full_message: str,
scope: tuple[str, Any],
parsed_payload: Any,
config: FrigateConfig,
) -> str | None:
"""Return the JSON string to deliver to ``ws``, or None to skip it."""
kind, extra = scope
has_role = _ws_role_header(ws) is not None
if kind == "drop":
return None
if kind == "global":
# Globals still require an authenticated connection. Missing role
# falls back to viewer semantics (matching the inbound rule).
return full_message
# Beyond globals, an authenticated role header is required (fail-closed).
if not has_role:
return None
if kind == "unrestricted_only":
return full_message if _ws_is_unrestricted(ws, config) else None
if kind == "camera":
return full_message if ws_has_camera_access(ws, extra, config) else None
if kind == "payload_camera":
camera = _extract_payload_camera(parsed_payload, extra)
if camera is None:
return None
return full_message if ws_has_camera_access(ws, camera, config) else None
if kind == "reshape_by_camera_key":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
filtered = {cam: data for cam, data in parsed_payload.items() if cam in allowed}
if not filtered:
return None
return _wrap_envelope(topic, filtered)
if kind == "reshape_job_state":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
filtered: dict[str, Any] = {}
for job_type, job_payload in parsed_payload.items():
scoped = _scope_job_entry_to_allowed(job_payload, allowed)
if scoped is not None:
filtered[job_type] = scoped
if not filtered:
return None
return _wrap_envelope(topic, filtered)
if kind == "reshape_stats":
if _ws_is_unrestricted(ws, config):
return full_message
if not isinstance(parsed_payload, dict):
return None
allowed = _ws_allowed_cameras(ws, config)
cameras_block = parsed_payload.get("cameras")
if isinstance(cameras_block, dict):
filtered_cameras = {
name: data for name, data in cameras_block.items() if name in allowed
}
reshaped = dict(parsed_payload)
reshaped["cameras"] = filtered_cameras
return _wrap_envelope(topic, reshaped)
return full_message
return None
class WebSocket(WebSocket_): # type: ignore[misc]
def unhandled_error(self, error: Any) -> None:
"""
@ -183,6 +499,10 @@ class WebSocketClient(Communicator):
self.websocket_thread.start()
def publish(self, topic: str, payload: Any, _: bool = False) -> None:
if self.websocket_server is None:
logger.debug("Skipping message, websocket not connected yet")
return
try:
ws_message = json.dumps(
{
@ -195,14 +515,42 @@ class WebSocketClient(Communicator):
logger.debug(f"payload for {topic} wasn't text. Skipping...")
return
if self.websocket_server is None:
logger.debug("Skipping message, websocket not connected yet")
all_cameras = set(self.config.cameras.keys())
all_zones = _collect_zone_names(self.config)
scope = _classify_outbound(topic, all_cameras, all_zones)
if scope[0] == "drop":
return
try:
self.websocket_server.manager.broadcast(ws_message)
except ConnectionResetError:
pass
# Pre-parse payload once for topics that need to read its contents.
parsed_payload: Any = None
if scope[0] in (
"payload_camera",
"reshape_by_camera_key",
"reshape_job_state",
"reshape_stats",
):
parsed_payload = _parse_json_payload(payload)
if parsed_payload is None:
# malformed payload — fail closed
return
manager = self.websocket_server.manager
with manager.lock:
websockets = list(manager.websockets.values())
for ws in websockets:
if getattr(ws, "terminated", False):
continue
message = _materialize_for_ws(
ws, topic, ws_message, scope, parsed_payload, self.config
)
if message is None:
continue
try:
ws.send(message)
except (ConnectionResetError, BrokenPipeError, ValueError):
pass
def stop(self) -> None:
if self.websocket_server is not None: