Camera access fixes (#22987)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions

* only send monitoring notifications to users with camera access

* check access to similarity search event id camera

* require admin role for storage usage endpoint

* check camera access for jsmpeg and birdseye cameras

* tests

* formatting
This commit is contained in:
Josh Hawkins 2026-04-23 13:27:49 -05:00 committed by GitHub
parent 1a6d04fde7
commit 77831304a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 219 additions and 8 deletions

View File

@ -754,6 +754,15 @@ def events_search(
status_code=404,
)
if search_event.camera not in allowed_cameras:
return JSONResponse(
content={
"success": False,
"message": "Event not found",
},
status_code=404,
)
thumb_result = context.search_thumbnail(search_event)
thumb_ids = {result[0]: result[1] for result in thumb_result}
search_results = {

View File

@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.recordings])
@router.get("/recordings/storage", dependencies=[Depends(allow_any_authenticated())])
@router.get("/recordings/storage", dependencies=[Depends(require_role(["admin"]))])
def get_recordings_storage_usage(request: Request):
recording_stats = request.app.stats_emitter.get_latest_stats()["service"][
"storage"

View File

@ -549,6 +549,14 @@ class WebPushClient(Communicator):
logger.debug(f"Sending camera monitoring push notification for {camera_name}")
for user in self.web_pushers:
if not self._user_has_camera_access(user, camera):
logger.debug(
"Skipping notification for user %s - no access to camera %s",
user,
camera,
)
continue
self.send_push_notification(
user=user,
payload=payload,

View File

@ -19,6 +19,7 @@ import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import BirdseyeModeEnum, FfmpegConfig, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE, INSTALL_DIR, UPDATE_BIRDSEYE_LAYOUT
from frigate.output.ws_auth import ws_has_camera_access
from frigate.util.image import (
SharedMemoryFrameManager,
copy_yuv_to_position,
@ -236,12 +237,14 @@ class BroadcastThread(threading.Thread):
converter: FFMpegConverter,
websocket_server: Any,
stop_event: MpEvent,
config: FrigateConfig,
):
super().__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
self.config = config
def run(self) -> None:
while not self.stop_event.is_set():
@ -256,6 +259,7 @@ class BroadcastThread(threading.Thread):
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
and ws_has_camera_access(ws, self.camera, self.config)
):
try:
ws.send(buf, binary=True)
@ -806,7 +810,11 @@ class Birdseye:
config.birdseye.restream,
)
self.broadcaster = BroadcastThread(
"birdseye", self.converter, websocket_server, stop_event
"birdseye",
self.converter,
websocket_server,
stop_event,
config,
)
self.birdseye_manager = BirdsEyeFrameManager(self.config, stop_event)
self.frame_manager = SharedMemoryFrameManager()

View File

@ -7,7 +7,8 @@ import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig
from frigate.config import CameraConfig, FfmpegConfig, FrigateConfig
from frigate.output.ws_auth import ws_has_camera_access
logger = logging.getLogger(__name__)
@ -102,12 +103,14 @@ class BroadcastThread(threading.Thread):
converter: FFMpegConverter,
websocket_server: Any,
stop_event: MpEvent,
config: FrigateConfig,
):
super().__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
self.config = config
def run(self) -> None:
while not self.stop_event.is_set():
@ -122,6 +125,7 @@ class BroadcastThread(threading.Thread):
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
and ws_has_camera_access(ws, self.camera, self.config)
):
try:
ws.send(buf, binary=True)
@ -135,7 +139,11 @@ class BroadcastThread(threading.Thread):
class JsmpegCamera:
def __init__(
self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any
self,
config: CameraConfig,
frigate_config: FrigateConfig,
stop_event: MpEvent,
websocket_server: Any,
) -> None:
self.config = config
self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
@ -154,7 +162,11 @@ class JsmpegCamera:
config.live.quality,
)
self.broadcaster = BroadcastThread(
config.name or "", self.converter, websocket_server, stop_event
config.name or "",
self.converter,
websocket_server,
stop_event,
frigate_config,
)
self.converter.start()

View File

@ -32,6 +32,7 @@ from frigate.const import (
from frigate.output.birdseye import Birdseye
from frigate.output.camera import JsmpegCamera
from frigate.output.preview import PreviewRecorder
from frigate.output.ws_auth import ws_has_camera_access
from frigate.util.image import SharedMemoryFrameManager, get_blank_yuv_frame
from frigate.util.process import FrigateProcess
@ -102,7 +103,7 @@ class OutputProcess(FrigateProcess):
) -> None:
camera_config = self.config.cameras[camera]
jsmpeg_cameras[camera] = JsmpegCamera(
camera_config, self.stop_event, websocket_server
camera_config, self.config, self.stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(camera_config)
preview_write_times[camera] = 0
@ -262,6 +263,7 @@ class OutputProcess(FrigateProcess):
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera)
and ws_has_camera_access(ws, camera, self.config)
for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
@ -275,6 +277,7 @@ class OutputProcess(FrigateProcess):
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
and ws_has_camera_access(ws, "birdseye", self.config)
for ws in websocket_server.manager
)
)

43
frigate/output/ws_auth.py Normal file
View File

@ -0,0 +1,43 @@
"""Authorization helpers for JSMPEG websocket clients."""
from typing import Any
from frigate.config import FrigateConfig
from frigate.models import User
def _get_valid_ws_roles(ws: Any, config: FrigateConfig) -> list[str]:
role_header = ws.environ.get("HTTP_REMOTE_ROLE", "")
roles = [
role.strip()
for role in role_header.split(config.proxy.separator)
if role.strip()
]
return [role for role in roles if role in config.auth.roles]
def ws_has_camera_access(ws: Any, camera_name: str, config: FrigateConfig) -> bool:
"""Return True when a websocket client is authorized for the camera path."""
roles = _get_valid_ws_roles(ws, config)
if not roles:
return False
roles_dict = config.auth.roles
# Birdseye is a composite stream, so only users with unrestricted access
# should receive it.
if camera_name == "birdseye":
return any(role == "admin" or not roles_dict.get(role) for role in roles)
all_camera_names = set(config.cameras.keys())
for role in roles:
if role == "admin" or not roles_dict.get(role):
return True
allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names)
if camera_name in allowed_cameras:
return True
return False

View File

@ -23,6 +23,26 @@ class TestHttpApp(BaseTestHttp):
response_json = response.json()
assert response_json == self.test_stats
def test_recordings_storage_requires_admin(self):
stats = Mock(spec=StatsEmitter)
stats.get_latest_stats.return_value = self.test_stats
app = super().create_app(stats)
app.storage_maintainer = Mock()
app.storage_maintainer.calculate_camera_usages.return_value = {
"front_door": {"usage": 2.0},
}
with AuthTestClient(app) as client:
response = client.get(
"/recordings/storage",
headers={"remote-user": "viewer", "remote-role": "viewer"},
)
assert response.status_code == 403
response = client.get("/recordings/storage")
assert response.status_code == 200
assert response.json()["front_door"]["usage_percent"] == 25.0
def test_config_set_in_memory_replaces_objects_track_list(self):
self.minimal_config["cameras"]["front_door"]["objects"] = {
"track": ["person", "car"],

View File

@ -219,6 +219,25 @@ class TestHttpApp(BaseTestHttp):
assert len(events) == 1
assert events[0]["id"] == event_id
def test_similarity_search_hides_unauthorized_anchor_event(self):
mock_embeddings = Mock()
self.app.frigate_config.semantic_search.enabled = True
self.app.embeddings = mock_embeddings
with AuthTestClient(self.app) as client:
super().insert_mock_event("hidden.anchor", camera="back_door")
response = client.get(
"/events/search",
params={
"search_type": "similarity",
"event_id": "hidden.anchor",
},
)
assert response.status_code == 404
assert response.json()["message"] == "Event not found"
mock_embeddings.search_thumbnail.assert_not_called()
def test_get_good_event(self):
id = "123456.random"

View File

@ -145,9 +145,12 @@ class TestExecuteFindSimilarObjects(unittest.TestCase):
embeddings=embeddings,
frigate_config=SimpleNamespace(
semantic_search=SimpleNamespace(enabled=semantic_enabled),
cameras={"driveway": object()},
auth=SimpleNamespace(roles={"admin": [], "viewer": ["driveway"]}),
proxy=SimpleNamespace(separator=","),
),
)
return SimpleNamespace(app=app)
return SimpleNamespace(app=app, headers={})
def test_semantic_search_disabled_returns_error(self):
req = self._make_request(semantic_enabled=False)
@ -180,7 +183,7 @@ class TestExecuteFindSimilarObjects(unittest.TestCase):
_execute_find_similar_objects(
req,
{"event_id": "anchor", "cameras": ["nonexistent_cam"]},
allowed_cameras=["nonexistent_cam"],
allowed_cameras=["driveway"],
)
)
self.assertEqual(result["results"], [])

View File

@ -0,0 +1,57 @@
"""Tests for JSMPEG websocket authorization."""
import unittest
from types import SimpleNamespace
from frigate.config import FrigateConfig
from frigate.output.ws_auth import ws_has_camera_access
class TestWsHasCameraAccess(unittest.TestCase):
def setUp(self):
self.config = FrigateConfig(
mqtt={"host": "mqtt"},
auth={"roles": {"limited_user": ["front_door"]}},
cameras={
"front_door": {
"ffmpeg": {
"inputs": [
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
]
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
"back_door": {
"ffmpeg": {
"inputs": [
{"path": "rtsp://10.0.0.2:554/video", "roles": ["detect"]}
]
},
"detect": {"height": 1080, "width": 1920, "fps": 5},
},
},
)
def _make_ws(self, role: str):
return SimpleNamespace(environ={"HTTP_REMOTE_ROLE": role})
def test_restricted_role_only_gets_allowed_camera(self):
ws = self._make_ws("limited_user")
self.assertTrue(ws_has_camera_access(ws, "front_door", self.config))
self.assertFalse(ws_has_camera_access(ws, "back_door", self.config))
def test_unrestricted_role_can_access_any_camera(self):
ws = self._make_ws("viewer")
self.assertTrue(ws_has_camera_access(ws, "front_door", self.config))
self.assertTrue(ws_has_camera_access(ws, "back_door", self.config))
def test_birdseye_requires_unrestricted_access(self):
self.assertTrue(
ws_has_camera_access(self._make_ws("admin"), "birdseye", self.config)
)
self.assertTrue(
ws_has_camera_access(self._make_ws("viewer"), "birdseye", self.config)
)
self.assertFalse(
ws_has_camera_access(self._make_ws("limited_user"), "birdseye", self.config)
)

View File

@ -0,0 +1,29 @@
"""Tests for camera monitoring notification authorization."""
import unittest
from types import SimpleNamespace
from unittest.mock import MagicMock
from frigate.comms.webpush import WebPushClient
class TestCameraMonitoringNotifications(unittest.TestCase):
def test_send_camera_monitoring_filters_by_camera_access(self):
client = WebPushClient.__new__(WebPushClient)
client.config = SimpleNamespace(
cameras={"front_door": SimpleNamespace(friendly_name=None)}
)
client.web_pushers = {"allowed": [], "denied": []}
client.user_cameras = {"allowed": {"front_door"}, "denied": set()}
client.check_registrations = MagicMock()
client.cleanup_registrations = MagicMock()
client.send_push_notification = MagicMock()
client.send_camera_monitoring(
{"camera": "front_door", "message": "Monitoring condition met"}
)
self.assertEqual(client.send_push_notification.call_count, 1)
self.assertEqual(
client.send_push_notification.call_args.kwargs["user"], "allowed"
)