allow non-admin users to send PTZ commands for cameras they have access to (#23578)
Some checks failed
CI / AMD64 Build (push) Has been cancelled
CI / ARM Build (push) Has been cancelled
CI / Jetson Jetpack 6 (push) Has been cancelled
CI / Assemble and push default build (push) Has been cancelled
CI / AMD64 Extra Build (push) Has been cancelled
CI / ARM Extra Build (push) Has been cancelled
CI / Synaptics Build (push) Has been cancelled

This commit is contained in:
Josh Hawkins 2026-06-27 16:55:39 -05:00 committed by GitHub
parent 933a7f1a3f
commit 3d4dd3ac4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 168 additions and 7 deletions

View File

@ -34,6 +34,7 @@ from frigate.const import (
UPDATE_REVIEW_DESCRIPTION,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import User
logger = logging.getLogger(__name__)
@ -69,11 +70,16 @@ _WS_VIEWER_TOPICS = frozenset(
}
)
# Camera-scoped command topics a camera-authorized (non-admin) user may send.
_WS_CAMERA_COMMAND_TOPICS = frozenset({"ptz"})
def _check_ws_authorization(
topic: str,
role_header: str | None,
separator: str,
roles_config: dict[str, list[str]] | None = None,
camera_names: set[str] | None = None,
) -> bool:
"""Check if a WebSocket message is authorized.
@ -81,6 +87,10 @@ def _check_ws_authorization(
topic: The message topic.
role_header: The HTTP_REMOTE_ROLE header value, or None.
separator: The role separator character from proxy config.
roles_config: The auth.roles mapping (role -> allowed cameras), used to
authorize camera-scoped commands for non-admin users.
camera_names: All configured camera names, used to resolve a role's
allowed cameras.
Returns:
True if authorized, False if blocked.
@ -90,16 +100,33 @@ def _check_ws_authorization(
return False
# No role header: default to viewer (fail-closed)
if role_header is None:
return topic in _WS_VIEWER_TOPICS
roles = [r.strip() for r in role_header.split(separator)] if role_header else []
# Check if any role is admin
roles = [r.strip() for r in role_header.split(separator)]
# Admin can send anything
if "admin" in roles:
return True
# Non-admin: only viewer topics allowed
return topic in _WS_VIEWER_TOPICS
# Read-only topics any authenticated user can send
if topic in _WS_VIEWER_TOPICS:
return True
# Camera-scoped command like "<camera>/ptz": allow when the user's role(s)
# grant access to that camera.
parts = topic.split("/")
if (
roles_config is not None
and len(parts) == 2
and parts[1] in _WS_CAMERA_COMMAND_TOPICS
):
allowed: set[str] = set()
# No role header maps to the default viewer role (e.g. proxy-only setups)
for role in roles or ["viewer"]:
allowed.update(
User.get_allowed_cameras(role, roles_config, camera_names or set())
)
return parts[0] in allowed
return False
class WebSocket(WebSocket_): # type: ignore[misc]
@ -131,6 +158,8 @@ class WebSocketClient(Communicator):
class _WebSocketHandler(WebSocket):
receiver = self._dispatcher
role_separator = self.config.proxy.separator or ","
roles_config = self.config.auth.roles
camera_names = set(self.config.cameras.keys())
def received_message(self, message: WebSocket.received_message) -> None: # type: ignore[name-defined]
try:
@ -152,7 +181,11 @@ class WebSocketClient(Communicator):
self.environ.get("HTTP_REMOTE_ROLE") if self.environ else None
)
if self.environ is not None and not _check_ws_authorization(
topic, role_header, self.role_separator
topic,
role_header,
self.role_separator,
self.roles_config,
self.camera_names,
):
logger.warning(
"Blocked unauthorized WebSocket message: topic=%s, role=%s",

View File

@ -11,6 +11,16 @@ class TestCheckWsAuthorization(unittest.TestCase):
DEFAULT_SEPARATOR = ","
# admin/viewer are reserved and always map to all cameras (empty list);
# custom roles map to a specific set of cameras.
ROLES_CONFIG = {
"admin": [],
"viewer": [],
"yard": ["front_door", "backyard"],
"garage_only": ["garage"],
}
CAMERA_NAMES = {"front_door", "backyard", "garage"}
# --- IPC topic blocking (unconditional, regardless of role) ---
def test_ipc_topic_blocked_for_admin(self):
@ -161,6 +171,124 @@ class TestCheckWsAuthorization(unittest.TestCase):
_check_ws_authorization("onConnect", None, self.DEFAULT_SEPARATOR)
)
# --- Camera-scoped PTZ access (non-admin with camera access) ---
def test_viewer_can_ptz_camera_with_access(self):
# viewer maps to all cameras, so PTZ is allowed
self.assertTrue(
_check_ws_authorization(
"front_door/ptz",
"viewer",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_custom_role_can_ptz_assigned_camera(self):
self.assertTrue(
_check_ws_authorization(
"front_door/ptz",
"yard",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_custom_role_blocked_from_ptz_unassigned_camera(self):
self.assertFalse(
_check_ws_authorization(
"garage/ptz",
"yard",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_multiple_roles_union_camera_access_for_ptz(self):
# "yard" covers front_door/backyard, "garage_only" covers garage
self.assertTrue(
_check_ws_authorization(
"garage/ptz",
"yard,garage_only",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_unknown_role_blocked_from_ptz(self):
self.assertFalse(
_check_ws_authorization(
"front_door/ptz",
"nonexistent",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_no_role_header_treated_as_viewer_for_ptz(self):
# proxy-only / auth-disabled setups default to the viewer role
self.assertTrue(
_check_ws_authorization(
"front_door/ptz",
None,
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_camera_access_does_not_grant_set_commands(self):
# camera access enables PTZ only, not config-changing "set" commands
self.assertFalse(
_check_ws_authorization(
"front_door/detect/set",
"yard",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_ptz_autotracker_stays_admin_only(self):
# ptz_autotracker is a config toggle, not a live-view action
self.assertFalse(
_check_ws_authorization(
"front_door/ptz_autotracker/set",
"viewer",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_admin_can_ptz_any_camera_with_config(self):
self.assertTrue(
_check_ws_authorization(
"garage/ptz",
"admin",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
def test_ipc_topic_still_blocked_with_camera_access(self):
# IPC topics are blocked unconditionally, even with camera access
self.assertFalse(
_check_ws_authorization(
UPDATE_CAMERA_ACTIVITY,
"viewer",
self.DEFAULT_SEPARATOR,
self.ROLES_CONFIG,
self.CAMERA_NAMES,
)
)
if __name__ == "__main__":
unittest.main()