require admin role by default

This commit is contained in:
Josh Hawkins 2025-11-26 07:54:53 -06:00
parent de2144f158
commit 55a1cb0599
2 changed files with 95 additions and 7 deletions

View File

@ -32,10 +32,97 @@ from frigate.models import User
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def require_admin_by_default():
"""
Global admin requirement dependency for all endpoints by default.
This is set as the default dependency on the FastAPI app to ensure all
endpoints require admin access unless explicitly overridden with
allow_public(), allow_any_authenticated(), or allow_viewer().
Port 5000 (internal) always has admin role set by the /auth endpoint,
so this check passes automatically for internal requests.
"""
async def admin_checker(request: Request):
role = request.headers.get("remote-role")
if role == "admin":
return
raise HTTPException(
status_code=403,
detail="Admin role required for this endpoint",
)
return admin_checker
def allow_public():
"""
Override dependency to allow unauthenticated access to an endpoint.
Use this for endpoints that should be publicly accessible without
authentication, such as login page, health checks, or pre-auth info.
Example:
@router.get("/public-endpoint", dependencies=[Depends(allow_public())])
"""
async def public_checker(request: Request):
return # Always allow
return public_checker
def allow_any_authenticated():
"""
Override dependency to allow any authenticated user (bypass admin requirement).
The user must have valid remote-user and remote-role headers, but any role
is accepted. Port 5000 requests have "admin" role so they also pass.
Example:
@router.get("/authenticated-endpoint", dependencies=[Depends(allow_any_authenticated())])
"""
async def auth_checker(request: Request):
if not request.headers.get("remote-user"):
raise HTTPException(
status_code=401,
detail="Authentication required",
)
return
return auth_checker
def allow_viewer():
"""
Override dependency to allow viewer or higher roles (admin, custom roles with cameras).
This is useful for read-only endpoints that should allow non-admin authenticated users.
Example:
@router.get("/viewer-endpoint", dependencies=[Depends(allow_viewer())])
"""
async def viewer_checker(request: Request):
role = request.headers.get("remote-role")
if not role:
raise HTTPException(
status_code=401,
detail="Authentication required",
)
# Any role is allowed (admin, viewer, or custom roles)
return
return viewer_checker
router = APIRouter(tags=[Tags.auth]) router = APIRouter(tags=[Tags.auth])
@router.get("/auth/first_time_login") @router.get("/auth/first_time_login", dependencies=[Depends(allow_public())])
def first_time_login(request: Request): def first_time_login(request: Request):
"""Return whether the admin first-time login help flag is set in config. """Return whether the admin first-time login help flag is set in config.
@ -352,7 +439,7 @@ def resolve_role(
# Endpoints # Endpoints
@router.get("/auth") @router.get("/auth", dependencies=[Depends(allow_public())])
def auth(request: Request): def auth(request: Request):
auth_config: AuthConfig = request.app.frigate_config.auth auth_config: AuthConfig = request.app.frigate_config.auth
proxy_config: ProxyConfig = request.app.frigate_config.proxy proxy_config: ProxyConfig = request.app.frigate_config.proxy
@ -478,7 +565,7 @@ def auth(request: Request):
return fail_response return fail_response
@router.get("/profile") @router.get("/profile", dependencies=[Depends(allow_any_authenticated())])
def profile(request: Request): def profile(request: Request):
username = request.headers.get("remote-user", "anonymous") username = request.headers.get("remote-user", "anonymous")
role = request.headers.get("remote-role", "viewer") role = request.headers.get("remote-role", "viewer")
@ -492,7 +579,7 @@ def profile(request: Request):
) )
@router.get("/logout") @router.get("/logout", dependencies=[Depends(allow_any_authenticated())])
def logout(request: Request): def logout(request: Request):
auth_config: AuthConfig = request.app.frigate_config.auth auth_config: AuthConfig = request.app.frigate_config.auth
response = RedirectResponse("/login", status_code=303) response = RedirectResponse("/login", status_code=303)
@ -503,7 +590,7 @@ def logout(request: Request):
limiter = Limiter(key_func=get_remote_addr) limiter = Limiter(key_func=get_remote_addr)
@router.post("/login") @router.post("/login", dependencies=[Depends(allow_public())])
@limiter.limit(limit_value=rateLimiter.get_limit) @limiter.limit(limit_value=rateLimiter.get_limit)
def login(request: Request, body: AppPostLoginBody): def login(request: Request, body: AppPostLoginBody):
JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name

View File

@ -2,7 +2,7 @@ import logging
import re import re
from typing import Optional from typing import Optional
from fastapi import FastAPI, Request from fastapi import Depends, FastAPI, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from joserfc.jwk import OctKey from joserfc.jwk import OctKey
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
@ -24,7 +24,7 @@ from frigate.api import (
preview, preview,
review, review,
) )
from frigate.api.auth import get_jwt_secret, limiter from frigate.api.auth import get_jwt_secret, limiter, require_admin_by_default
from frigate.comms.event_metadata_updater import ( from frigate.comms.event_metadata_updater import (
EventMetadataPublisher, EventMetadataPublisher,
) )
@ -67,6 +67,7 @@ def create_fastapi_app(
app = FastAPI( app = FastAPI(
debug=False, debug=False,
swagger_ui_parameters={"apisSorter": "alpha", "operationsSorter": "alpha"}, swagger_ui_parameters={"apisSorter": "alpha", "operationsSorter": "alpha"},
dependencies=[Depends(require_admin_by_default())],
) )
# update the request_address with the x-forwarded-for header from nginx # update the request_address with the x-forwarded-for header from nginx