From 862cb7bd8bbd3e0290d3eba6f68ddda1435da0fe Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Tue, 9 Sep 2025 10:44:03 -0500 Subject: [PATCH] enforce config roles in auth api endpoints --- frigate/api/auth.py | 90 ++++++++++++++++++++++++++++++--------------- 1 file changed, 61 insertions(+), 29 deletions(-) diff --git a/frigate/api/auth.py b/frigate/api/auth.py index f78a460c0..00fc470b9 100644 --- a/frigate/api/auth.py +++ b/frigate/api/auth.py @@ -33,7 +33,6 @@ from frigate.models import User logger = logging.getLogger(__name__) router = APIRouter(tags=[Tags.auth]) -VALID_ROLES = ["admin", "viewer"] class RateLimiter: @@ -204,6 +203,7 @@ async def get_current_user(request: Request): def require_role(required_roles: List[str]): async def role_checker(request: Request): proxy_config: ProxyConfig = request.app.frigate_config.proxy + config_roles = list(request.app.frigate_config.auth.roles.keys()) # Get role from header (could be comma-separated) role_header = request.headers.get("remote-role") @@ -217,12 +217,12 @@ def require_role(required_roles: List[str]): if not roles: raise HTTPException(status_code=403, detail="Role not provided") - # enforce VALID_ROLES - valid_roles = [r for r in roles if r in VALID_ROLES] + # enforce config roles + valid_roles = [r for r in roles if r in config_roles] if not valid_roles: raise HTTPException( status_code=403, - detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}", + detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}. Available: {', '.join(config_roles)}", ) if not any(role in required_roles for role in valid_roles): @@ -238,7 +238,9 @@ def require_role(required_roles: List[str]): return role_checker -def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str: +def resolve_role( + headers: dict, proxy_config: ProxyConfig, config_roles: set[str] +) -> str: """ Determine the effective role for a request based on proxy headers and configuration. @@ -247,31 +249,40 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str: - If a role_map is configured, treat the header as group claims (split by proxy_config.separator) and map to roles. - If no role_map is configured, treat the header as role names directly. - 2. If no valid role is found, return proxy_config.default_role. + 2. If no valid role is found, return proxy_config.default_role if it's valid in config_roles, else 'viewer'. Args: headers (dict): Incoming request headers (case-insensitive). proxy_config (ProxyConfig): Proxy configuration. + config_roles (set[str]): Set of valid roles from config. Returns: - str: Resolved role (always one of VALID_ROLES). + str: Resolved role (one of config_roles or validated default). """ - role = proxy_config.default_role + default_role = proxy_config.default_role role_header = proxy_config.header_map.role + # Validate default_role against config; fallback to 'viewer' if invalid + validated_default = default_role if default_role in config_roles else "viewer" + if not config_roles: + validated_default = "viewer" # Edge case: no roles defined + if not role_header: logger.debug( - "No role header configured in proxy_config.header_map. Returning default role '%s'.", - role, + "No role header configured in proxy_config.header_map. Returning validated default role '%s'.", + validated_default, ) - return role + return validated_default raw_value = headers.get(role_header, "") logger.debug("Raw role header value from '%s': %r", role_header, raw_value) if not raw_value: - logger.debug("Role header missing or empty. Returning default role '%s'.", role) - return role + logger.debug( + "Role header missing or empty. Returning validated default role '%s'.", + validated_default, + ) + return validated_default # role_map configured, treat header as group claims if proxy_config.header_map.role_map: @@ -288,16 +299,18 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str: logger.debug("Matched roles from role_map: %s", matched_roles) if matched_roles: - resolved = next((r for r in VALID_ROLES if r in matched_roles), role) + resolved = next( + (r for r in config_roles if r in matched_roles), validated_default + ) logger.debug("Resolved role (with role_map) to '%s'.", resolved) return resolved logger.debug( - "No role_map match for groups '%s'. Using default role '%s'.", + "No role_map match for groups '%s'. Using validated default role '%s'.", raw_value, - proxy_config.default_role, + validated_default, ) - return role + return validated_default # no role_map, treat as role names directly roles_from_header = [ @@ -306,14 +319,14 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str: logger.debug("Parsed roles directly from header: %s", roles_from_header) resolved = next( - (r for r in VALID_ROLES if r in roles_from_header), - proxy_config.default_role, + (r for r in config_roles if r in roles_from_header), + validated_default, ) - if resolved == proxy_config.default_role and roles_from_header: + if resolved == validated_default and roles_from_header: logger.debug( - "Provided proxy role header values '%s' did not contain a valid role. Using default role '%s'.", + "Provided proxy role header values '%s' did not contain a valid role. Using validated default role '%s'.", raw_value, - proxy_config.default_role, + validated_default, ) else: logger.debug("Resolved role (direct header) to '%s'.", resolved) @@ -358,7 +371,8 @@ def auth(request: Request): ) # parse header and resolve a valid role - role = resolve_role(request.headers, proxy_config) + config_roles_set = set(auth_config.roles.keys()) + role = resolve_role(request.headers, proxy_config, config_roles_set) success_response.headers["remote-role"] = role return success_response @@ -452,7 +466,13 @@ def profile(request: Request): username = request.headers.get("remote-user", "anonymous") role = request.headers.get("remote-role", "viewer") - return JSONResponse(content={"username": username, "role": role}) + all_camera_names = set(request.app.frigate_config.cameras.keys()) + roles_dict = request.app.frigate_config.auth.roles + allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names) + + return JSONResponse( + content={"username": username, "role": role, "allowed_cameras": allowed_cameras} + ) @router.get("/logout") @@ -483,8 +503,12 @@ def login(request: Request, body: AppPostLoginBody): password_hash = db_user.password_hash if verify_password(password, password_hash): role = getattr(db_user, "role", "viewer") - if role not in VALID_ROLES: - role = "viewer" # Enforce valid roles + config_roles_set = set(request.app.frigate_config.auth.roles.keys()) + if role not in config_roles_set: + logger.warning( + f"User {db_user.username} has an invalid role {role}, falling back to 'viewer'." + ) + role = "viewer" expiration = int(time.time()) + JWT_SESSION_LENGTH encoded_jwt = create_encoded_jwt(user, role, expiration, request.app.jwt_token) response = Response("", 200) @@ -509,11 +533,17 @@ def create_user( body: AppPostUsersBody, ): HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations + config_roles = list(request.app.frigate_config.auth.roles.keys()) if not re.match("^[A-Za-z0-9._]+$", body.username): return JSONResponse(content={"message": "Invalid username"}, status_code=400) - role = body.role if body.role in VALID_ROLES else "viewer" + if body.role not in config_roles: + return JSONResponse( + content={"message": f"Role must be one of: {', '.join(config_roles)}"}, + status_code=400, + ) + role = body.role or "viewer" password_hash = hash_password(body.password, iterations=HASH_ITERATIONS) User.insert( { @@ -584,9 +614,11 @@ async def update_role( return JSONResponse( content={"message": "Cannot modify admin user's role"}, status_code=403 ) - if body.role not in VALID_ROLES: + config_roles = list(request.app.frigate_config.auth.roles.keys()) + if body.role not in config_roles: return JSONResponse( - content={"message": "Role must be 'admin' or 'viewer'"}, status_code=400 + content={"message": f"Role must be one of: {', '.join(config_roles)}"}, + status_code=400, ) User.set_by_id(username, {User.role: body.role})