mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-06-26 14:21:53 +03:00
* filter outbound ws broadcasts by per-recipient camera access * fan out config updates to comms * tests * mypy * allow viewers to use jobstate * update agent instructions * remove vitest
566 lines
19 KiB
Python
566 lines
19 KiB
Python
"""Websocket communicator."""
|
|
|
|
import errno
|
|
import json
|
|
import logging
|
|
import threading
|
|
from typing import Any, Callable
|
|
from wsgiref.simple_server import make_server
|
|
|
|
from ws4py.server.wsgirefserver import (
|
|
WebSocketWSGIHandler,
|
|
WebSocketWSGIRequestHandler,
|
|
WSGIServer,
|
|
)
|
|
from ws4py.server.wsgiutils import WebSocketWSGIApplication
|
|
from ws4py.websocket import WebSocket as WebSocket_
|
|
|
|
from frigate.comms.base_communicator import Communicator
|
|
from frigate.config import FrigateConfig
|
|
from frigate.const import (
|
|
CLEAR_ONGOING_REVIEW_SEGMENTS,
|
|
EXPIRE_AUDIO_ACTIVITY,
|
|
INSERT_MANY_RECORDINGS,
|
|
INSERT_PREVIEW,
|
|
NOTIFICATION_TEST,
|
|
REQUEST_REGION_GRID,
|
|
UPDATE_AUDIO_ACTIVITY,
|
|
UPDATE_AUDIO_TRANSCRIPTION_STATE,
|
|
UPDATE_BIRDSEYE_LAYOUT,
|
|
UPDATE_CAMERA_ACTIVITY,
|
|
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
|
|
UPDATE_EVENT_DESCRIPTION,
|
|
UPDATE_MODEL_STATE,
|
|
UPDATE_REVIEW_DESCRIPTION,
|
|
UPSERT_REVIEW_SEGMENT,
|
|
)
|
|
from frigate.models import User
|
|
from frigate.output.ws_auth import ws_has_camera_access
|
|
|
|
logger = logging.getLogger(__name__)

# Topics reserved for internal IPC. A WebSocket client is never allowed to
# publish to any of them, whatever role it presents.
_WS_BLOCKED_TOPICS = frozenset(
    (
        CLEAR_ONGOING_REVIEW_SEGMENTS,
        EXPIRE_AUDIO_ACTIVITY,
        INSERT_MANY_RECORDINGS,
        INSERT_PREVIEW,
        NOTIFICATION_TEST,
        REQUEST_REGION_GRID,
        UPDATE_AUDIO_ACTIVITY,
        UPDATE_AUDIO_TRANSCRIPTION_STATE,
        UPDATE_BIRDSEYE_LAYOUT,
        UPDATE_CAMERA_ACTIVITY,
        UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
        UPDATE_EVENT_DESCRIPTION,
        UPDATE_MODEL_STATE,
        UPDATE_REVIEW_DESCRIPTION,
        UPSERT_REVIEW_SEGMENT,
    )
)
|
|
|
|
# Read-only topics any authenticated user (including viewer) can send
|
|
_WS_VIEWER_TOPICS = frozenset(
|
|
{
|
|
"onConnect",
|
|
"modelState",
|
|
"audioTranscriptionState",
|
|
"birdseyeLayout",
|
|
"embeddingsReindexProgress",
|
|
"jobState",
|
|
}
|
|
)
|
|
|
|
|
|
def _check_ws_authorization(
|
|
topic: str,
|
|
role_header: str | None,
|
|
separator: str,
|
|
) -> bool:
|
|
"""Check if a WebSocket message is authorized.
|
|
|
|
Args:
|
|
topic: The message topic.
|
|
role_header: The HTTP_REMOTE_ROLE header value, or None.
|
|
separator: The role separator character from proxy config.
|
|
|
|
Returns:
|
|
True if authorized, False if blocked.
|
|
"""
|
|
# Block IPC-only topics unconditionally
|
|
if topic in _WS_BLOCKED_TOPICS:
|
|
return False
|
|
|
|
# No role header: default to viewer (fail-closed)
|
|
if role_header is None:
|
|
return topic in _WS_VIEWER_TOPICS
|
|
|
|
# Check if any role is admin
|
|
roles = [r.strip() for r in role_header.split(separator)]
|
|
if "admin" in roles:
|
|
return True
|
|
|
|
# Non-admin: only viewer topics allowed
|
|
return topic in _WS_VIEWER_TOPICS
|
|
|
|
|
|
# ---- Outbound filtering ---------------------------------------------------
|
|
#
|
|
# Every WebSocket broadcast is classified into one of a small set of scopes,
|
|
# then materialized per recipient. Connections with restricted roles only see
|
|
# data for cameras they are authorized to access; admin and full-access roles
|
|
# behave as today.
|
|
|
|
# Topics that are safe to broadcast to every authenticated client.
|
|
_WS_GLOBAL_OUTBOUND_TOPICS = frozenset(
|
|
{
|
|
"model_state",
|
|
"embeddings_reindex_progress",
|
|
"audio_transcription_state",
|
|
"profile/state",
|
|
"notifications/state",
|
|
"notification_test",
|
|
}
|
|
)
|
|
|
|
# Topics that restricted roles must never receive. Birdseye composites span
|
|
# all cameras, so the existing JSMPEG policy already restricts birdseye access
|
|
# to unrestricted roles; the layout broadcast follows the same rule.
|
|
_WS_UNRESTRICTED_ONLY_TOPICS = frozenset(
|
|
{
|
|
"birdseye_layout",
|
|
}
|
|
)
|
|
|
|
# Topics whose payload (parsed as JSON) names a single owning camera at the
|
|
# given key path. Used to scope events, reviews, triggers, etc.
|
|
_WS_PAYLOAD_CAMERA_TOPICS: dict[str, tuple[str, ...]] = {
|
|
"events": ("after", "camera"),
|
|
"reviews": ("after", "camera"),
|
|
"tracked_object_update": ("camera",),
|
|
"triggers": ("camera",),
|
|
"camera_monitoring": ("camera",),
|
|
}
|
|
|
|
# Topics whose payload is a dict keyed by camera name; filter keys per
|
|
# recipient.
|
|
_WS_RESHAPE_BY_CAMERA_KEY_TOPICS = frozenset(
|
|
{
|
|
"camera_activity",
|
|
"audio_detections",
|
|
}
|
|
)
|
|
|
|
# Topics whose payload is a dict keyed by job_type, where each entry may
|
|
# contain a "camera" or "source_camera" field, or a nested ``results.jobs``
|
|
# list of per-camera sub-jobs (export broadcasts).
|
|
_WS_RESHAPE_JOB_STATE_TOPICS = frozenset(
|
|
{
|
|
"job_state",
|
|
}
|
|
)
|
|
|
|
# Topics whose payload mixes global aggregates with a ``cameras`` sub-dict
|
|
# keyed by camera name. Aggregates and detector data stay; per-camera entries
|
|
# are filtered.
|
|
_WS_RESHAPE_STATS_TOPICS = frozenset(
|
|
{
|
|
"stats",
|
|
}
|
|
)
|
|
|
|
|
|
def _collect_zone_names(config: FrigateConfig) -> set[str]:
    """Gather every zone name configured on any camera into a single set.

    A camera without a ``zones`` attribute, or with an empty one, contributes
    nothing.
    """
    return {
        zone_name
        for camera_config in config.cameras.values()
        for zone_name in (getattr(camera_config, "zones", None) or {}).keys()
    }
|
|
|
|
|
|
def _parse_json_payload(payload: Any) -> Any:
|
|
"""Return payload parsed as JSON if it is a string, else as-is."""
|
|
if isinstance(payload, str):
|
|
try:
|
|
return json.loads(payload)
|
|
except (ValueError, TypeError):
|
|
return None
|
|
return payload
|
|
|
|
|
|
def _scope_job_entry_to_allowed(entry: Any, allowed: set[str]) -> dict[str, Any] | None:
|
|
"""Filter a single job_state entry to the recipient's allowed cameras.
|
|
|
|
Returns the (possibly reshaped) entry, or None to drop it. Four shapes
|
|
are handled:
|
|
|
|
* Top-level ``camera`` or ``source_camera`` (motion_search, vlm_watch,
|
|
export sub-job dicts): drop the entry if not allowed.
|
|
* Nested ``results.jobs`` list of per-camera sub-jobs (the aggregated
|
|
export broadcast): filter the list; drop the entry if nothing remains.
|
|
* Nested ``results.camera`` or ``results.source_camera`` (debug_replay,
|
|
which puts replay-specific fields inside ``results``): drop the entry
|
|
if not allowed.
|
|
* No camera anywhere (e.g. ``media_sync``): treat as global and keep.
|
|
"""
|
|
if not isinstance(entry, dict):
|
|
return None
|
|
|
|
cam = entry.get("camera") or entry.get("source_camera")
|
|
|
|
if cam is None:
|
|
results = entry.get("results")
|
|
if isinstance(results, dict):
|
|
sub_jobs = results.get("jobs")
|
|
if isinstance(sub_jobs, list):
|
|
filtered_jobs = [
|
|
j
|
|
for j in sub_jobs
|
|
if isinstance(j, dict)
|
|
and (j.get("camera") or j.get("source_camera")) in allowed
|
|
]
|
|
if not filtered_jobs:
|
|
return None
|
|
reshaped = dict(entry)
|
|
reshaped["results"] = dict(results)
|
|
reshaped["results"]["jobs"] = filtered_jobs
|
|
return reshaped
|
|
|
|
cam = results.get("camera") or results.get("source_camera")
|
|
|
|
if cam is not None:
|
|
return entry if cam in allowed else None
|
|
|
|
return entry
|
|
|
|
|
|
def _extract_payload_camera(payload: Any, path: tuple[str, ...]) -> str | None:
    """Follow ``path`` key by key through a (possibly JSON-encoded) payload.

    Returns the value found at the end of the path when it is a string.
    Returns None when any step along the way is not a dict or the final
    value is not a string.
    """
    node = _parse_json_payload(payload)
    for step in path:
        if not isinstance(node, dict):
            return None
        node = node.get(step)

    if isinstance(node, str):
        return node
    return None
|
|
|
|
|
|
def _classify_outbound(
    topic: str, all_cameras: set[str], all_zones: set[str]
) -> tuple[str, Any]:
    """Map an outbound topic to a ``(kind, extra)`` delivery scope.

    kind values:
      - "global"            : send to every authenticated client
      - "drop"              : send to nobody (fail-closed for unknowns)
      - "unrestricted_only" : send only to admin/full-access roles
      - "camera"            : extra is the owning camera name
      - "payload_camera"    : extra is the JSON key path to the camera name
      - "reshape_by_camera_key"
      - "reshape_job_state"
      - "reshape_stats"

    ``extra`` is None for every kind except "camera" and "payload_camera".
    """
    # Fixed topic tables, checked in this order.
    fixed_scopes = (
        (_WS_GLOBAL_OUTBOUND_TOPICS, "global"),
        (_WS_UNRESTRICTED_ONLY_TOPICS, "unrestricted_only"),
        (_WS_RESHAPE_BY_CAMERA_KEY_TOPICS, "reshape_by_camera_key"),
        (_WS_RESHAPE_JOB_STATE_TOPICS, "reshape_job_state"),
        (_WS_RESHAPE_STATS_TOPICS, "reshape_stats"),
    )
    for topics, kind in fixed_scopes:
        if topic in topics:
            return (kind, None)

    if topic in _WS_PAYLOAD_CAMERA_TOPICS:
        return ("payload_camera", _WS_PAYLOAD_CAMERA_TOPICS[topic])

    # Otherwise the segment before the first "/" names the owning camera
    # or zone.
    prefix = topic.partition("/")[0]
    if prefix in all_cameras:
        return ("camera", prefix)
    if prefix in all_zones:
        # Zone aggregates span cameras; restricted users see nothing here.
        return ("unrestricted_only", None)

    return ("drop", None)
|
|
|
|
|
|
def _ws_role_header(ws: Any) -> str | None:
|
|
"""Return the HTTP_REMOTE_ROLE header value, if any."""
|
|
environ = getattr(ws, "environ", None)
|
|
if not environ:
|
|
return None
|
|
value = environ.get("HTTP_REMOTE_ROLE")
|
|
return value if isinstance(value, str) else None
|
|
|
|
|
|
def _ws_valid_roles(ws: Any, config: FrigateConfig) -> list[str]:
    """Return the list of recognized roles for this connection.

    The HTTP_REMOTE_ROLE header is split on the proxy separator, each part
    is stripped, and only names present in ``config.auth.roles`` are kept.
    A missing or empty header yields an empty list.
    """
    header = _ws_role_header(ws)
    if not header:
        return []

    # Use the same "," fallback as the inbound handler's ``role_separator``.
    # Splitting on an empty string raises ValueError, and splitting on None
    # would silently split on whitespace instead.
    separator = config.proxy.separator or ","
    known_roles = config.auth.roles
    return [
        role
        for role in (part.strip() for part in header.split(separator))
        if role and role in known_roles
    ]
|
|
|
|
|
|
def _ws_is_unrestricted(ws: Any, config: FrigateConfig) -> bool:
    """Tell whether the connection may see every camera.

    Full access is granted by the ``admin`` role, or by any recognized role
    whose entry in ``config.auth.roles`` is empty. A connection with no
    recognized role is restricted. Per the original note this is meant to
    mirror the policy in ``frigate.output.ws_auth``.
    """
    for role in _ws_valid_roles(ws, config):
        if role == "admin" or not config.auth.roles.get(role):
            return True
    return False
|
|
|
|
|
|
def _ws_allowed_cameras(ws: Any, config: FrigateConfig) -> set[str]:
    """Compute the cameras this connection may access, unioned over its roles.

    No recognized role gives the empty set. ``admin``, or a role with an
    empty entry in ``config.auth.roles``, gives every configured camera.
    """
    recognized = _ws_valid_roles(ws, config)
    if not recognized:
        return set()

    every_camera = set(config.cameras.keys())
    granted: set[str] = set()
    for role in recognized:
        unrestricted = role == "admin" or not config.auth.roles.get(role)
        if unrestricted:
            return every_camera
        role_cameras = User.get_allowed_cameras(role, config.auth.roles, every_camera)
        granted.update(role_cameras)
    return granted
|
|
|
|
|
|
def _wrap_envelope(topic: str, inner_payload: Any) -> str:
|
|
"""Re-serialize a (topic, payload) message after payload reshaping.
|
|
|
|
Frigate's wire format keeps payloads as JSON-encoded strings inside the
|
|
outer envelope, mirroring what producers send today.
|
|
"""
|
|
return json.dumps({"topic": topic, "payload": json.dumps(inner_payload)})
|
|
|
|
|
|
def _materialize_for_ws(
    ws: Any,
    topic: str,
    full_message: str,
    scope: tuple[str, Any],
    parsed_payload: Any,
    config: FrigateConfig,
) -> str | None:
    """Build the JSON string ``ws`` should receive, or None to skip it.

    Args:
        ws: The recipient connection.
        topic: Topic of the broadcast; used when re-wrapping a reshaped payload.
        full_message: The unfiltered, already serialized envelope.
        scope: ``(kind, extra)`` pair describing how the topic is scoped.
        parsed_payload: The payload parsed from JSON, for kinds that inspect it.
        config: Current Frigate configuration.
    """
    kind, extra = scope

    if kind == "drop":
        return None

    if kind == "global":
        # Delivered unchanged; the role header is not consulted for this scope.
        return full_message

    # Every remaining scope fails closed for a connection without a role header.
    if _ws_role_header(ws) is None:
        return None

    if kind == "unrestricted_only":
        if _ws_is_unrestricted(ws, config):
            return full_message
        return None

    if kind == "camera":
        if ws_has_camera_access(ws, extra, config):
            return full_message
        return None

    if kind == "payload_camera":
        owning_camera = _extract_payload_camera(parsed_payload, extra)
        if owning_camera is not None and ws_has_camera_access(
            ws, owning_camera, config
        ):
            return full_message
        return None

    if kind not in ("reshape_by_camera_key", "reshape_job_state", "reshape_stats"):
        return None

    # The three reshape scopes share one prelude: unrestricted recipients get
    # the original message, and a non-dict payload cannot be filtered.
    if _ws_is_unrestricted(ws, config):
        return full_message
    if not isinstance(parsed_payload, dict):
        return None
    permitted = _ws_allowed_cameras(ws, config)

    if kind == "reshape_by_camera_key":
        visible = {
            cam: data for cam, data in parsed_payload.items() if cam in permitted
        }
        return _wrap_envelope(topic, visible) if visible else None

    if kind == "reshape_job_state":
        kept_jobs: dict[str, Any] = {}
        for job_type, job_payload in parsed_payload.items():
            scoped_entry = _scope_job_entry_to_allowed(job_payload, permitted)
            if scoped_entry is not None:
                kept_jobs[job_type] = scoped_entry
        return _wrap_envelope(topic, kept_jobs) if kept_jobs else None

    # kind == "reshape_stats": only the ``cameras`` sub-dict is filtered.
    cameras_block = parsed_payload.get("cameras")
    if not isinstance(cameras_block, dict):
        return full_message
    trimmed = dict(parsed_payload)
    trimmed["cameras"] = {
        name: data for name, data in cameras_block.items() if name in permitted
    }
    return _wrap_envelope(topic, trimmed)
|
|
|
|
|
|
class WebSocket(WebSocket_):  # type: ignore[misc]
    def unhandled_error(self, error: Any) -> None:
        """
        Swallow connection resets from the peer quietly; log any other
        error through the "ws4py" logger.
        """
        connection_reset = (
            hasattr(error, "errno") and error.errno == errno.ECONNRESET
        )
        if not connection_reset:
            logging.getLogger("ws4py").exception("Failed to receive data")
|
|
|
|
|
|
class WebSocketClient(Communicator):
    """Frigate wrapper for ws client."""

    def __init__(self, config: FrigateConfig) -> None:
        self.config = config
        self.websocket_server: WSGIServer | None = None
        # Created by start(). Kept as None until then so stop() is safe to
        # call on a client that was never started.
        self.websocket_thread: threading.Thread | None = None

    def subscribe(self, receiver: Callable) -> None:
        """Register ``receiver`` for inbound messages and start the server."""
        self._dispatcher = receiver
        self.start()

    def start(self) -> None:
        """Start the websocket client."""

        class _WebSocketHandler(WebSocket):
            # staticmethod keeps a plain-function receiver from being bound
            # as a method (which would pass the handler as its first
            # argument); bound methods are unaffected.
            receiver = staticmethod(self._dispatcher)
            role_separator = self.config.proxy.separator or ","

            def received_message(self, message: WebSocket.received_message) -> None:  # type: ignore[name-defined]
                try:
                    json_message = json.loads(message.data.decode("utf-8"))
                    json_message = {
                        "topic": json_message.get("topic"),
                        "payload": json_message.get("payload"),
                    }
                except Exception:
                    # Decode leniently here: the failure may itself be
                    # invalid UTF-8, and decoding strictly a second time
                    # would raise out of this handler.
                    logger.warning(
                        "Unable to parse websocket message as valid json: %s",
                        message.data.decode("utf-8", errors="replace"),
                    )
                    return

                topic = json_message["topic"]

                # Authorization check (skip when environ is None — direct internal connection)
                role_header = (
                    self.environ.get("HTTP_REMOTE_ROLE") if self.environ else None
                )
                if self.environ is not None and not _check_ws_authorization(
                    topic, role_header, self.role_separator
                ):
                    logger.warning(
                        "Blocked unauthorized WebSocket message: topic=%s, role=%s",
                        topic,
                        role_header,
                    )
                    return

                logger.debug(f"Publishing mqtt message from websockets at {topic}.")
                self.receiver(
                    topic,
                    json_message["payload"],
                )

        # start a websocket server on 5002
        WebSocketWSGIHandler.http_version = "1.1"
        self.websocket_server = make_server(
            "127.0.0.1",
            5002,
            server_class=WSGIServer,
            handler_class=WebSocketWSGIRequestHandler,
            app=WebSocketWSGIApplication(handler_cls=_WebSocketHandler),
        )
        self.websocket_server.initialize_websockets_manager()
        self.websocket_thread = threading.Thread(
            target=self.websocket_server.serve_forever
        )
        self.websocket_thread.start()

    def publish(self, topic: str, payload: Any, _: bool = False) -> None:
        """Broadcast a message to connected clients, filtered per recipient.

        The topic is classified into a delivery scope once; each open
        connection then receives the full message, a reshaped copy, or
        nothing. The third positional parameter is accepted and ignored.
        """
        if self.websocket_server is None:
            logger.debug("Skipping message, websocket not connected yet")
            return

        try:
            ws_message = json.dumps(
                {
                    "topic": topic,
                    "payload": payload,
                }
            )
        except Exception:
            # if the payload can't be decoded don't relay to clients
            logger.debug(f"payload for {topic} wasn't text. Skipping...")
            return

        all_cameras = set(self.config.cameras.keys())
        all_zones = _collect_zone_names(self.config)
        scope = _classify_outbound(topic, all_cameras, all_zones)

        if scope[0] == "drop":
            return

        # Pre-parse payload once for topics that need to read its contents.
        parsed_payload: Any = None
        if scope[0] in (
            "payload_camera",
            "reshape_by_camera_key",
            "reshape_job_state",
            "reshape_stats",
        ):
            parsed_payload = _parse_json_payload(payload)
            if parsed_payload is None:
                # malformed payload — fail closed
                return

        # Snapshot the connections under the manager lock, then send
        # outside it.
        manager = self.websocket_server.manager
        with manager.lock:
            websockets = list(manager.websockets.values())

        for ws in websockets:
            if getattr(ws, "terminated", False):
                continue
            message = _materialize_for_ws(
                ws, topic, ws_message, scope, parsed_payload, self.config
            )
            if message is None:
                continue
            try:
                ws.send(message)
            except (ConnectionResetError, BrokenPipeError, ValueError):
                # Best-effort delivery: a failed send to one client is
                # ignored so the remaining clients still get the message.
                pass

    def stop(self) -> None:
        """Shut the server down and wait for its thread, if it was started."""
        if self.websocket_server is not None:
            self.websocket_server.manager.close_all()
            self.websocket_server.manager.stop()
            self.websocket_server.manager.join()
            self.websocket_server.shutdown()

        # The thread only exists once start() has run.
        if self.websocket_thread is not None:
            self.websocket_thread.join()
        logger.info("Exiting websocket client...")
|