diff --git a/frigate/comms/ws.py b/frigate/comms/ws.py index 2f16ab7141..9af231da30 100644 --- a/frigate/comms/ws.py +++ b/frigate/comms/ws.py @@ -34,6 +34,7 @@ from frigate.const import ( UPDATE_REVIEW_DESCRIPTION, UPSERT_REVIEW_SEGMENT, ) +from frigate.models import User logger = logging.getLogger(__name__) @@ -69,11 +70,16 @@ _WS_VIEWER_TOPICS = frozenset( } ) +# Camera-scoped command topics a camera-authorized (non-admin) user may send. +_WS_CAMERA_COMMAND_TOPICS = frozenset({"ptz"}) + def _check_ws_authorization( topic: str, role_header: str | None, separator: str, + roles_config: dict[str, list[str]] | None = None, + camera_names: set[str] | None = None, ) -> bool: """Check if a WebSocket message is authorized. @@ -81,6 +87,10 @@ def _check_ws_authorization( topic: The message topic. role_header: The HTTP_REMOTE_ROLE header value, or None. separator: The role separator character from proxy config. + roles_config: The auth.roles mapping (role -> allowed cameras), used to + authorize camera-scoped commands for non-admin users. + camera_names: All configured camera names, used to resolve a role's + allowed cameras. Returns: True if authorized, False if blocked. @@ -90,16 +100,33 @@ def _check_ws_authorization( return False # No role header: default to viewer (fail-closed) - if role_header is None: - return topic in _WS_VIEWER_TOPICS + roles = [r.strip() for r in role_header.split(separator)] if role_header else [] - # Check if any role is admin - roles = [r.strip() for r in role_header.split(separator)] + # Admin can send anything if "admin" in roles: return True - # Non-admin: only viewer topics allowed - return topic in _WS_VIEWER_TOPICS + # Read-only topics any authenticated user can send + if topic in _WS_VIEWER_TOPICS: + return True + + # Camera-scoped command like "/ptz": allow when the user's role(s) + # grant access to that camera. + parts = topic.split("/") + if ( + roles_config is not None + and len(parts) == 2 + and parts[1] in _WS_CAMERA_COMMAND_TOPICS + ): + allowed: set[str] = set() + # No role header maps to the default viewer role (e.g. proxy-only setups) + for role in roles or ["viewer"]: + allowed.update( + User.get_allowed_cameras(role, roles_config, camera_names or set()) + ) + return parts[0] in allowed + + return False class WebSocket(WebSocket_): # type: ignore[misc] @@ -131,6 +158,8 @@ class WebSocketClient(Communicator): class _WebSocketHandler(WebSocket): receiver = self._dispatcher role_separator = self.config.proxy.separator or "," + roles_config = self.config.auth.roles + camera_names = set(self.config.cameras.keys()) def received_message(self, message: WebSocket.received_message) -> None: # type: ignore[name-defined] try: @@ -152,7 +181,11 @@ class WebSocketClient(Communicator): self.environ.get("HTTP_REMOTE_ROLE") if self.environ else None ) if self.environ is not None and not _check_ws_authorization( - topic, role_header, self.role_separator + topic, + role_header, + self.role_separator, + self.roles_config, + self.camera_names, ): logger.warning( "Blocked unauthorized WebSocket message: topic=%s, role=%s", diff --git a/frigate/test/test_ws_auth.py b/frigate/test/test_ws_auth.py index b762f4384c..a9fc6e1320 100644 --- a/frigate/test/test_ws_auth.py +++ b/frigate/test/test_ws_auth.py @@ -11,6 +11,16 @@ class TestCheckWsAuthorization(unittest.TestCase): DEFAULT_SEPARATOR = "," + # admin/viewer are reserved and always map to all cameras (empty list); + # custom roles map to a specific set of cameras. + ROLES_CONFIG = { + "admin": [], + "viewer": [], + "yard": ["front_door", "backyard"], + "garage_only": ["garage"], + } + CAMERA_NAMES = {"front_door", "backyard", "garage"} + # --- IPC topic blocking (unconditional, regardless of role) --- def test_ipc_topic_blocked_for_admin(self): @@ -161,6 +171,124 @@ class TestCheckWsAuthorization(unittest.TestCase): _check_ws_authorization("onConnect", None, self.DEFAULT_SEPARATOR) ) + # --- Camera-scoped PTZ access (non-admin with camera access) --- + + def test_viewer_can_ptz_camera_with_access(self): + # viewer maps to all cameras, so PTZ is allowed + self.assertTrue( + _check_ws_authorization( + "front_door/ptz", + "viewer", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_custom_role_can_ptz_assigned_camera(self): + self.assertTrue( + _check_ws_authorization( + "front_door/ptz", + "yard", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_custom_role_blocked_from_ptz_unassigned_camera(self): + self.assertFalse( + _check_ws_authorization( + "garage/ptz", + "yard", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_multiple_roles_union_camera_access_for_ptz(self): + # "yard" covers front_door/backyard, "garage_only" covers garage + self.assertTrue( + _check_ws_authorization( + "garage/ptz", + "yard,garage_only", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_unknown_role_blocked_from_ptz(self): + self.assertFalse( + _check_ws_authorization( + "front_door/ptz", + "nonexistent", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_no_role_header_treated_as_viewer_for_ptz(self): + # proxy-only / auth-disabled setups default to the viewer role + self.assertTrue( + _check_ws_authorization( + "front_door/ptz", + None, + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_camera_access_does_not_grant_set_commands(self): + # camera access enables PTZ only, not config-changing "set" commands + self.assertFalse( + _check_ws_authorization( + "front_door/detect/set", + "yard", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_ptz_autotracker_stays_admin_only(self): + # ptz_autotracker is a config toggle, not a live-view action + self.assertFalse( + _check_ws_authorization( + "front_door/ptz_autotracker/set", + "viewer", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_admin_can_ptz_any_camera_with_config(self): + self.assertTrue( + _check_ws_authorization( + "garage/ptz", + "admin", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + + def test_ipc_topic_still_blocked_with_camera_access(self): + # IPC topics are blocked unconditionally, even with camera access + self.assertFalse( + _check_ws_authorization( + UPDATE_CAMERA_ACTIVITY, + "viewer", + self.DEFAULT_SEPARATOR, + self.ROLES_CONFIG, + self.CAMERA_NAMES, + ) + ) + if __name__ == "__main__": unittest.main()