limit access to admin-only websocket topics for viewer users (#22710)

This commit is contained in:
Josh Hawkins 2026-03-31 08:51:55 -05:00 committed by GitHub
parent 01392e03ac
commit 0371b60c71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 265 additions and 3 deletions

View File

@ -17,9 +17,90 @@ from ws4py.websocket import WebSocket as WebSocket_
from frigate.comms.base_communicator import Communicator
from frigate.config import FrigateConfig
from frigate.const import (
CLEAR_ONGOING_REVIEW_SEGMENTS,
EXPIRE_AUDIO_ACTIVITY,
INSERT_MANY_RECORDINGS,
INSERT_PREVIEW,
NOTIFICATION_TEST,
REQUEST_REGION_GRID,
UPDATE_AUDIO_ACTIVITY,
UPDATE_AUDIO_TRANSCRIPTION_STATE,
UPDATE_BIRDSEYE_LAYOUT,
UPDATE_CAMERA_ACTIVITY,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_EVENT_DESCRIPTION,
UPDATE_MODEL_STATE,
UPDATE_REVIEW_DESCRIPTION,
UPSERT_REVIEW_SEGMENT,
)
logger = logging.getLogger(__name__)
# Internal IPC topics — NEVER allowed from WebSocket, regardless of role
_WS_BLOCKED_TOPICS = frozenset(
{
INSERT_MANY_RECORDINGS,
INSERT_PREVIEW,
REQUEST_REGION_GRID,
UPSERT_REVIEW_SEGMENT,
CLEAR_ONGOING_REVIEW_SEGMENTS,
UPDATE_CAMERA_ACTIVITY,
UPDATE_AUDIO_ACTIVITY,
EXPIRE_AUDIO_ACTIVITY,
UPDATE_EVENT_DESCRIPTION,
UPDATE_REVIEW_DESCRIPTION,
UPDATE_MODEL_STATE,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_BIRDSEYE_LAYOUT,
UPDATE_AUDIO_TRANSCRIPTION_STATE,
NOTIFICATION_TEST,
}
)
# Read-only topics any authenticated user (including viewer) can send
_WS_VIEWER_TOPICS = frozenset(
{
"onConnect",
"modelState",
"audioTranscriptionState",
"birdseyeLayout",
"embeddingsReindexProgress",
}
)
def _check_ws_authorization(
topic: str,
role_header: str | None,
separator: str,
) -> bool:
"""Check if a WebSocket message is authorized.
Args:
topic: The message topic.
role_header: The HTTP_REMOTE_ROLE header value, or None.
separator: The role separator character from proxy config.
Returns:
True if authorized, False if blocked.
"""
# Block IPC-only topics unconditionally
if topic in _WS_BLOCKED_TOPICS:
return False
# No role header: default to viewer (fail-closed)
if role_header is None:
return topic in _WS_VIEWER_TOPICS
# Check if any role is admin
roles = [r.strip() for r in role_header.split(separator)]
if "admin" in roles:
return True
# Non-admin: only viewer topics allowed
return topic in _WS_VIEWER_TOPICS
class WebSocket(WebSocket_): # type: ignore[misc]
def unhandled_error(self, error: Any) -> None:
@ -49,6 +130,7 @@ class WebSocketClient(Communicator):
class _WebSocketHandler(WebSocket):
receiver = self._dispatcher
role_separator = self.config.proxy.separator or ","
def received_message(self, message: WebSocket.received_message) -> None: # type: ignore[name-defined]
try:
@ -63,11 +145,25 @@ class WebSocketClient(Communicator):
)
return
logger.debug(
f"Publishing mqtt message from websockets at {json_message['topic']}."
topic = json_message["topic"]
# Authorization check (skip when environ is None — direct internal connection)
role_header = (
self.environ.get("HTTP_REMOTE_ROLE") if self.environ else None
)
if self.environ is not None and not _check_ws_authorization(
topic, role_header, self.role_separator
):
logger.warning(
"Blocked unauthorized WebSocket message: topic=%s, role=%s",
topic,
role_header,
)
return
logger.debug(f"Publishing mqtt message from websockets at {topic}.")
self.receiver(
json_message["topic"],
topic,
json_message["payload"],
)

View File

@ -0,0 +1,166 @@
"""Tests for WebSocket authorization checks."""
import unittest
from frigate.comms.ws import _check_ws_authorization
from frigate.const import INSERT_MANY_RECORDINGS, UPDATE_CAMERA_ACTIVITY
class TestCheckWsAuthorization(unittest.TestCase):
"""Tests for the _check_ws_authorization pure function."""
DEFAULT_SEPARATOR = ","
# --- IPC topic blocking (unconditional, regardless of role) ---
def test_ipc_topic_blocked_for_admin(self):
self.assertFalse(
_check_ws_authorization(
INSERT_MANY_RECORDINGS, "admin", self.DEFAULT_SEPARATOR
)
)
def test_ipc_topic_blocked_for_viewer(self):
self.assertFalse(
_check_ws_authorization(
UPDATE_CAMERA_ACTIVITY, "viewer", self.DEFAULT_SEPARATOR
)
)
def test_ipc_topic_blocked_when_no_role(self):
self.assertFalse(
_check_ws_authorization(
INSERT_MANY_RECORDINGS, None, self.DEFAULT_SEPARATOR
)
)
# --- Viewer allowed topics ---
def test_viewer_can_send_on_connect(self):
self.assertTrue(
_check_ws_authorization("onConnect", "viewer", self.DEFAULT_SEPARATOR)
)
def test_viewer_can_send_model_state(self):
self.assertTrue(
_check_ws_authorization("modelState", "viewer", self.DEFAULT_SEPARATOR)
)
def test_viewer_can_send_audio_transcription_state(self):
self.assertTrue(
_check_ws_authorization(
"audioTranscriptionState", "viewer", self.DEFAULT_SEPARATOR
)
)
def test_viewer_can_send_birdseye_layout(self):
self.assertTrue(
_check_ws_authorization("birdseyeLayout", "viewer", self.DEFAULT_SEPARATOR)
)
def test_viewer_can_send_embeddings_reindex_progress(self):
self.assertTrue(
_check_ws_authorization(
"embeddingsReindexProgress", "viewer", self.DEFAULT_SEPARATOR
)
)
# --- Viewer blocked from admin topics ---
def test_viewer_blocked_from_restart(self):
self.assertFalse(
_check_ws_authorization("restart", "viewer", self.DEFAULT_SEPARATOR)
)
def test_viewer_blocked_from_camera_detect_set(self):
self.assertFalse(
_check_ws_authorization(
"front_door/detect/set", "viewer", self.DEFAULT_SEPARATOR
)
)
def test_viewer_blocked_from_camera_ptz(self):
self.assertFalse(
_check_ws_authorization("front_door/ptz", "viewer", self.DEFAULT_SEPARATOR)
)
def test_viewer_blocked_from_global_notifications_set(self):
self.assertFalse(
_check_ws_authorization(
"notifications/set", "viewer", self.DEFAULT_SEPARATOR
)
)
def test_viewer_blocked_from_camera_notifications_suspend(self):
self.assertFalse(
_check_ws_authorization(
"front_door/notifications/suspend", "viewer", self.DEFAULT_SEPARATOR
)
)
def test_viewer_blocked_from_arbitrary_unknown_topic(self):
self.assertFalse(
_check_ws_authorization(
"some_random_topic", "viewer", self.DEFAULT_SEPARATOR
)
)
# --- Admin access ---
def test_admin_can_send_restart(self):
self.assertTrue(
_check_ws_authorization("restart", "admin", self.DEFAULT_SEPARATOR)
)
def test_admin_can_send_camera_detect_set(self):
self.assertTrue(
_check_ws_authorization(
"front_door/detect/set", "admin", self.DEFAULT_SEPARATOR
)
)
def test_admin_can_send_camera_ptz(self):
self.assertTrue(
_check_ws_authorization("front_door/ptz", "admin", self.DEFAULT_SEPARATOR)
)
# --- Comma-separated roles ---
def test_comma_separated_admin_viewer_grants_admin(self):
self.assertTrue(
_check_ws_authorization("restart", "admin,viewer", self.DEFAULT_SEPARATOR)
)
def test_comma_separated_viewer_admin_grants_admin(self):
self.assertTrue(
_check_ws_authorization("restart", "viewer,admin", self.DEFAULT_SEPARATOR)
)
def test_comma_separated_with_spaces(self):
self.assertTrue(
_check_ws_authorization("restart", "viewer, admin", self.DEFAULT_SEPARATOR)
)
# --- Custom separator ---
def test_pipe_separator(self):
self.assertTrue(_check_ws_authorization("restart", "viewer|admin", "|"))
def test_pipe_separator_no_admin(self):
self.assertFalse(_check_ws_authorization("restart", "viewer|editor", "|"))
# --- No role header (fail-closed) ---
def test_no_role_header_blocks_admin_topics(self):
self.assertFalse(
_check_ws_authorization("restart", None, self.DEFAULT_SEPARATOR)
)
def test_no_role_header_allows_viewer_topics(self):
self.assertTrue(
_check_ws_authorization("onConnect", None, self.DEFAULT_SEPARATOR)
)
if __name__ == "__main__":
unittest.main()