diff --git a/frigate/api/auth.py b/frigate/api/auth.py index 00fc470b9..740e75998 100644 --- a/frigate/api/auth.py +++ b/frigate/api/auth.py @@ -11,7 +11,7 @@ import secrets import time from datetime import datetime from pathlib import Path -from typing import List +from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Request, Response from fastapi.responses import JSONResponse, RedirectResponse @@ -623,3 +623,43 @@ async def update_role( User.set_by_id(username, {User.role: body.role}) return JSONResponse(content={"success": True}) + + +async def require_camera_access( + camera: Optional[str] = None, + request: Request = None, +): + """Dependency to enforce camera access based on user role.""" + if camera is None: + return # For lists, filter later + + current_user = await get_current_user(request) + if isinstance(current_user, JSONResponse): + return current_user + + role = current_user["role"] + all_camera_names = set(request.app.frigate_config.cameras.keys()) + roles_dict = request.app.frigate_config.auth.roles + allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names) + + # Admin or full access bypasses + if role == "admin" or not roles_dict.get(role): + return + + if camera not in allowed_cameras: + raise HTTPException( + status_code=403, + detail=f"Access denied to camera '{camera}'. Allowed: {allowed_cameras}", + ) + + +async def get_allowed_cameras_for_filter(request: Request): + """Dependency to get allowed_cameras for filtering lists.""" + current_user = await get_current_user(request) + if isinstance(current_user, JSONResponse): + return [] # Unauthorized: no cameras + + role = current_user["role"] + all_camera_names = set(request.app.frigate_config.cameras.keys()) + roles_dict = request.app.frigate_config.auth.roles + return User.get_allowed_cameras(role, roles_dict, all_camera_names)