add role map support

This commit is contained in:
Josh Hawkins 2025-08-25 17:21:48 -05:00
parent 13cef51133
commit 431ba8166e

View File

@ -217,15 +217,23 @@ def require_role(required_roles: List[str]):
if not roles:
raise HTTPException(status_code=403, detail="Role not provided")
# Check if any role matches required_roles
if not any(role in required_roles for role in roles):
# enforce VALID_ROLES
valid_roles = [r for r in roles if r in VALID_ROLES]
if not valid_roles:
raise HTTPException(
status_code=403,
detail=f"Role {', '.join(roles)} not authorized. Required: {', '.join(required_roles)}",
detail=f"No valid roles found in {roles}. Required: {', '.join(required_roles)}",
)
# Return the first matching role
return next((role for role in roles if role in required_roles), roles[0])
if not any(role in required_roles for role in valid_roles):
raise HTTPException(
status_code=403,
detail=f"Role {', '.join(valid_roles)} not authorized. Required: {', '.join(required_roles)}",
)
return next(
(role for role in valid_roles if role in required_roles), valid_roles[0]
)
return role_checker
@ -266,22 +274,38 @@ def auth(request: Request):
else "anonymous"
)
# start with default_role
role = proxy_config.default_role
# first try: explicit role header
role_header = proxy_config.header_map.role
role = (
request.headers.get(role_header, default=proxy_config.default_role)
if role_header
else proxy_config.default_role
)
# if comma-separated with "admin", use "admin",
# if comma-separated with "viewer", use "viewer",
# else use default role
roles = [r.strip() for r in role.split(proxy_config.separator)] if role else []
success_response.headers["remote-role"] = next(
(r for r in VALID_ROLES if r in roles), proxy_config.default_role
)
if role_header:
raw_value = request.headers.get(role_header, "")
if proxy_config.header_map.role_map and raw_value:
# treat as group claim
groups = [
g.strip()
for g in raw_value.replace(" ", ",").split(",")
if g.strip()
]
for (
candidate_role,
required_groups,
) in proxy_config.header_map.role_map.items():
if any(group in groups for group in required_groups):
role = candidate_role
break
elif raw_value:
normalized_role = raw_value.strip().lower()
if normalized_role in VALID_ROLES:
role = normalized_role
else:
logger.warning(
f"Provided proxy role header contains invalid value '{raw_value}'. Using default role '{proxy_config.default_role}'."
)
role = proxy_config.default_role
success_response.headers["remote-role"] = role
return success_response
# now apply authentication