enforce config roles in auth api endpoints

This commit is contained in:
Josh Hawkins 2025-09-09 10:44:03 -05:00
parent c9578a7c02
commit 862cb7bd8b

View File

@ -33,7 +33,6 @@ from frigate.models import User
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.auth])
VALID_ROLES = ["admin", "viewer"]
class RateLimiter:
@ -204,6 +203,7 @@ async def get_current_user(request: Request):
def require_role(required_roles: List[str]):
async def role_checker(request: Request):
proxy_config: ProxyConfig = request.app.frigate_config.proxy
config_roles = list(request.app.frigate_config.auth.roles.keys())
# Get role from header (could be comma-separated)
role_header = request.headers.get("remote-role")
@ -217,12 +217,12 @@ def require_role(required_roles: List[str]):
if not roles:
raise HTTPException(status_code=403, detail="Role not provided")
# enforce VALID_ROLES
valid_roles = [r for r in roles if r in VALID_ROLES]
# enforce config roles
valid_roles = [r for r in roles if r in config_roles]
if not valid_roles:
raise HTTPException(
status_code=403,
detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}",
detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}. Available: {', '.join(config_roles)}",
)
if not any(role in required_roles for role in valid_roles):
@ -238,7 +238,9 @@ def require_role(required_roles: List[str]):
return role_checker
def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str:
def resolve_role(
headers: dict, proxy_config: ProxyConfig, config_roles: set[str]
) -> str:
"""
Determine the effective role for a request based on proxy headers and configuration.
@ -247,31 +249,40 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str:
- If a role_map is configured, treat the header as group claims
(split by proxy_config.separator) and map to roles.
- If no role_map is configured, treat the header as role names directly.
2. If no valid role is found, return proxy_config.default_role.
2. If no valid role is found, return proxy_config.default_role if it's valid in config_roles, else 'viewer'.
Args:
headers (dict): Incoming request headers (case-insensitive).
proxy_config (ProxyConfig): Proxy configuration.
config_roles (set[str]): Set of valid roles from config.
Returns:
str: Resolved role (always one of VALID_ROLES).
str: Resolved role (one of config_roles or validated default).
"""
role = proxy_config.default_role
default_role = proxy_config.default_role
role_header = proxy_config.header_map.role
# Validate default_role against config; fallback to 'viewer' if invalid
validated_default = default_role if default_role in config_roles else "viewer"
if not config_roles:
validated_default = "viewer" # Edge case: no roles defined
if not role_header:
logger.debug(
"No role header configured in proxy_config.header_map. Returning default role '%s'.",
role,
"No role header configured in proxy_config.header_map. Returning validated default role '%s'.",
validated_default,
)
return role
return validated_default
raw_value = headers.get(role_header, "")
logger.debug("Raw role header value from '%s': %r", role_header, raw_value)
if not raw_value:
logger.debug("Role header missing or empty. Returning default role '%s'.", role)
return role
logger.debug(
"Role header missing or empty. Returning validated default role '%s'.",
validated_default,
)
return validated_default
# role_map configured, treat header as group claims
if proxy_config.header_map.role_map:
@ -288,16 +299,18 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str:
logger.debug("Matched roles from role_map: %s", matched_roles)
if matched_roles:
resolved = next((r for r in VALID_ROLES if r in matched_roles), role)
resolved = next(
(r for r in config_roles if r in matched_roles), validated_default
)
logger.debug("Resolved role (with role_map) to '%s'.", resolved)
return resolved
logger.debug(
"No role_map match for groups '%s'. Using default role '%s'.",
"No role_map match for groups '%s'. Using validated default role '%s'.",
raw_value,
proxy_config.default_role,
validated_default,
)
return role
return validated_default
# no role_map, treat as role names directly
roles_from_header = [
@ -306,14 +319,14 @@ def resolve_role(headers: dict, proxy_config: ProxyConfig) -> str:
logger.debug("Parsed roles directly from header: %s", roles_from_header)
resolved = next(
(r for r in VALID_ROLES if r in roles_from_header),
proxy_config.default_role,
(r for r in config_roles if r in roles_from_header),
validated_default,
)
if resolved == proxy_config.default_role and roles_from_header:
if resolved == validated_default and roles_from_header:
logger.debug(
"Provided proxy role header values '%s' did not contain a valid role. Using default role '%s'.",
"Provided proxy role header values '%s' did not contain a valid role. Using validated default role '%s'.",
raw_value,
proxy_config.default_role,
validated_default,
)
else:
logger.debug("Resolved role (direct header) to '%s'.", resolved)
@ -358,7 +371,8 @@ def auth(request: Request):
)
# parse header and resolve a valid role
role = resolve_role(request.headers, proxy_config)
config_roles_set = set(auth_config.roles.keys())
role = resolve_role(request.headers, proxy_config, config_roles_set)
success_response.headers["remote-role"] = role
return success_response
@ -452,7 +466,13 @@ def profile(request: Request):
username = request.headers.get("remote-user", "anonymous")
role = request.headers.get("remote-role", "viewer")
return JSONResponse(content={"username": username, "role": role})
all_camera_names = set(request.app.frigate_config.cameras.keys())
roles_dict = request.app.frigate_config.auth.roles
allowed_cameras = User.get_allowed_cameras(role, roles_dict, all_camera_names)
return JSONResponse(
content={"username": username, "role": role, "allowed_cameras": allowed_cameras}
)
@router.get("/logout")
@ -483,8 +503,12 @@ def login(request: Request, body: AppPostLoginBody):
password_hash = db_user.password_hash
if verify_password(password, password_hash):
role = getattr(db_user, "role", "viewer")
if role not in VALID_ROLES:
role = "viewer" # Enforce valid roles
config_roles_set = set(request.app.frigate_config.auth.roles.keys())
if role not in config_roles_set:
logger.warning(
f"User {db_user.username} has an invalid role {role}, falling back to 'viewer'."
)
role = "viewer"
expiration = int(time.time()) + JWT_SESSION_LENGTH
encoded_jwt = create_encoded_jwt(user, role, expiration, request.app.jwt_token)
response = Response("", 200)
@ -509,11 +533,17 @@ def create_user(
body: AppPostUsersBody,
):
HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
config_roles = list(request.app.frigate_config.auth.roles.keys())
if not re.match("^[A-Za-z0-9._]+$", body.username):
return JSONResponse(content={"message": "Invalid username"}, status_code=400)
role = body.role if body.role in VALID_ROLES else "viewer"
if body.role not in config_roles:
return JSONResponse(
content={"message": f"Role must be one of: {', '.join(config_roles)}"},
status_code=400,
)
role = body.role or "viewer"
password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
User.insert(
{
@ -584,9 +614,11 @@ async def update_role(
return JSONResponse(
content={"message": "Cannot modify admin user's role"}, status_code=403
)
if body.role not in VALID_ROLES:
config_roles = list(request.app.frigate_config.auth.roles.keys())
if body.role not in config_roles:
return JSONResponse(
content={"message": "Role must be 'admin' or 'viewer'"}, status_code=400
content={"message": f"Role must be one of: {', '.join(config_roles)}"},
status_code=400,
)
User.set_by_id(username, {User.role: body.role})