Convert auth endpoints to FastAPI

This commit is contained in:
Rui Alves 2024-09-19 08:41:59 +01:00
parent 11186b4e70
commit 655d24a653
5 changed files with 94 additions and 76 deletions

View File

@ -14,13 +14,13 @@ from fastapi import APIRouter, Path, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.params import Depends
from fastapi.responses import JSONResponse
from flask import Blueprint, Flask, jsonify, request
from flask import Flask, jsonify, request
from markupsafe import escape
from peewee import operator
from playhouse.sqliteq import SqliteQueueDatabase
from werkzeug.middleware.proxy_fix import ProxyFix
from frigate.api.auth import AuthBp, get_jwt_secret, limiter
from frigate.api.auth import get_jwt_secret, limiter
from frigate.api.defs.app_body import AppConfigSetBody
from frigate.api.defs.app_query_parameters import AppTimelineHourlyQueryParameters
from frigate.api.defs.tags import Tags
@ -44,9 +44,6 @@ from frigate.version import VERSION
logger = logging.getLogger(__name__)
bp = Blueprint("frigate", __name__)
bp.register_blueprint(AuthBp)
router = APIRouter(tags=[Tags.app])
@ -97,8 +94,6 @@ def create_app(
if frigate_config.auth.failed_login_rate_limit is None:
limiter.enabled = False
app.register_blueprint(bp)
return app

View File

@ -12,25 +12,31 @@ import time
from datetime import datetime
from pathlib import Path
from flask import Blueprint, current_app, jsonify, make_response, redirect, request
from flask_limiter import Limiter
from fastapi import APIRouter, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from joserfc import jwt
from peewee import DoesNotExist
from frigate.api.defs.app_body import (
AppPostLoginBody,
AppPostUsersBody,
AppPutPasswordBody,
)
from frigate.api.defs.tags import Tags
from frigate.config import AuthConfig, ProxyConfig
from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM
from frigate.models import User
logger = logging.getLogger(__name__)
AuthBp = Blueprint("auth", __name__)
router = APIRouter(tags=[Tags.auth])
def get_remote_addr():
def get_remote_addr(request: Request):
route = list(reversed(request.headers.get("x-forwarded-for").split(",")))
logger.debug(f"IP Route: {[r for r in route]}")
trusted_proxies = []
for proxy in current_app.frigate_config.auth.trusted_proxies:
for proxy in request.app.frigate_config.auth.trusted_proxies:
try:
network = ipaddress.ip_network(proxy)
except ValueError:
@ -68,14 +74,15 @@ def get_remote_addr():
return request.remote_addr or "127.0.0.1"
limiter = Limiter(
get_remote_addr,
storage_uri="memory://",
)
# TODO: Rui
# limiter = Limiter(
# get_remote_addr,
# storage_uri="memory://",
# )
def get_rate_limit():
return current_app.frigate_config.auth.failed_login_rate_limit
def get_rate_limit(request: Request):
return request.app.frigate_config.auth.failed_login_rate_limit
def get_jwt_secret() -> str:
@ -132,7 +139,7 @@ def get_jwt_secret() -> str:
return jwt_secret
def hash_password(password, salt=None, iterations=600000):
def hash_password(password: str, salt=None, iterations=600000):
if salt is None:
salt = secrets.token_hex(16)
assert salt and isinstance(salt, str) and "$" not in salt
@ -158,27 +165,31 @@ def create_encoded_jwt(user, expiration, secret):
return jwt.encode({"alg": "HS256"}, {"sub": user, "exp": expiration}, secret)
def set_jwt_cookie(response, cookie_name, encoded_jwt, expiration, secure):
def set_jwt_cookie(response: Response, cookie_name, encoded_jwt, expiration, secure):
# TODO: ideally this would set secure as well, but that requires TLS
response.set_cookie(
cookie_name, encoded_jwt, httponly=True, expires=expiration, secure=secure
key=cookie_name,
value=encoded_jwt,
httponly=True,
expires=expiration,
secure=secure,
)
# Endpoint for use with nginx auth_request
@AuthBp.route("/auth")
def auth():
auth_config: AuthConfig = current_app.frigate_config.auth
proxy_config: ProxyConfig = current_app.frigate_config.proxy
@router.get("/auth")
def auth(request: Request):
auth_config: AuthConfig = request.app.frigate_config.auth
proxy_config: ProxyConfig = request.app.frigate_config.proxy
success_response = make_response({}, 202)
success_response = Response(content={}, status_code=202)
# dont require auth if the request is on the internal port
# this header is set by Frigate's nginx proxy, so it cant be spoofed
if request.headers.get("x-server-port", 0, type=int) == 5000:
return success_response
fail_response = make_response({}, 401)
fail_response = Response(content={}, status_code=401)
# ensure the proxy secret matches if configured
if (
@ -207,10 +218,10 @@ def auth():
# now apply authentication
fail_response.headers["location"] = "/login"
JWT_COOKIE_NAME = current_app.frigate_config.auth.cookie_name
JWT_COOKIE_SECURE = current_app.frigate_config.auth.cookie_secure
JWT_REFRESH = current_app.frigate_config.auth.refresh_time
JWT_SESSION_LENGTH = current_app.frigate_config.auth.session_length
JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
JWT_REFRESH = request.app.frigate_config.auth.refresh_time
JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
jwt_source = None
encoded_token = None
@ -230,7 +241,7 @@ def auth():
return fail_response
try:
token = jwt.decode(encoded_token, current_app.jwt_token)
token = jwt.decode(encoded_token, request.app.jwt_token)
if "sub" not in token.claims:
logger.debug("user not set in jwt token")
return fail_response
@ -266,7 +277,7 @@ def auth():
return fail_response
new_expiration = current_time + JWT_SESSION_LENGTH
new_encoded_jwt = create_encoded_jwt(
user, new_expiration, current_app.jwt_token
user, new_expiration, request.app.jwt_token
)
set_jwt_cookie(
success_response,
@ -283,86 +294,82 @@ def auth():
return fail_response
@AuthBp.route("/profile")
def profile():
username = request.headers.get("remote-user", type=str)
return jsonify({"username": username})
@router.get("/profile")
def profile(request: Request):
username = request.headers.get("remote-user")
return JSONResponse(content={"username": username})
@AuthBp.route("/logout")
def logout():
auth_config: AuthConfig = current_app.frigate_config.auth
response = make_response(redirect("/login", code=303))
@router.get("/logout")
def logout(request: Request):
auth_config: AuthConfig = request.app.frigate_config.auth
response = RedirectResponse("/login", status_code=303)
response.delete_cookie(auth_config.cookie_name)
return response
@AuthBp.route("/login", methods=["POST"])
@limiter.limit(get_rate_limit, deduct_when=lambda response: response.status_code == 400)
def login():
JWT_COOKIE_NAME = current_app.frigate_config.auth.cookie_name
JWT_COOKIE_SECURE = current_app.frigate_config.auth.cookie_secure
JWT_SESSION_LENGTH = current_app.frigate_config.auth.session_length
content = request.get_json()
user = content["user"]
password = content["password"]
@router.post("/login")
# TODO: Rui Implement limiter for FastAPI
# @limiter.limit(get_rate_limit, deduct_when=lambda response: response.status_code == 400)
def login(request: Request, body: AppPostLoginBody):
JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
user = body.user
password = body.password
try:
db_user: User = User.get_by_id(user)
except DoesNotExist:
return make_response({"message": "Login failed"}, 400)
return JSONResponse(content={"message": "Login failed"}, status_code=400)
password_hash = db_user.password_hash
if verify_password(password, password_hash):
expiration = int(time.time()) + JWT_SESSION_LENGTH
encoded_jwt = create_encoded_jwt(user, expiration, current_app.jwt_token)
response = make_response({}, 200)
encoded_jwt = create_encoded_jwt(user, expiration, request.app.jwt_token)
response = Response({}, 200)
set_jwt_cookie(
response, JWT_COOKIE_NAME, encoded_jwt, expiration, JWT_COOKIE_SECURE
)
return response
return make_response({"message": "Login failed"}, 400)
return JSONResponse(content={"message": "Login failed"}, status_code=400)
@AuthBp.route("/users")
@router.get("/users")
def get_users():
exports = User.select(User.username).order_by(User.username).dicts().iterator()
return jsonify([e for e in exports])
return JSONResponse([e for e in exports])
@AuthBp.route("/users", methods=["POST"])
def create_user():
HASH_ITERATIONS = current_app.frigate_config.auth.hash_iterations
@router.post("/users")
def create_user(request: Request, body: AppPostUsersBody):
HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
request_data = request.get_json()
if not re.match("^[A-Za-z0-9._]+$", body.username):
JSONResponse(content={"message": "Invalid username"}, status_code=400)
if not re.match("^[A-Za-z0-9._]+$", request_data.get("username", "")):
make_response({"message": "Invalid username"}, 400)
password_hash = hash_password(request_data["password"], iterations=HASH_ITERATIONS)
password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
User.insert(
{
User.username: request_data["username"],
User.username: body.username,
User.password_hash: password_hash,
}
).execute()
return jsonify({"username": request_data["username"]})
return JSONResponse(content={"username": body.username})
@AuthBp.route("/users/<username>", methods=["DELETE"])
@router.delete("/users/{username}")
def delete_user(username: str):
User.delete_by_id(username)
return jsonify({"success": True})
return JSONResponse(content={"success": True})
@AuthBp.route("/users/<username>/password", methods=["PUT"])
def update_password(username: str):
HASH_ITERATIONS = current_app.frigate_config.auth.hash_iterations
@router.put("/users/{username}/password")
def update_password(request: Request, username: str, body: AppPutPasswordBody):
HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
request_data = request.get_json()
password_hash = hash_password(request_data["password"], iterations=HASH_ITERATIONS)
password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
User.set_by_id(
username,
@ -370,4 +377,4 @@ def update_password(username: str):
User.password_hash: password_hash,
},
)
return jsonify({"success": True})
return JSONResponse(content={"success": True})

View File

@ -3,3 +3,15 @@ from pydantic import BaseModel
class AppConfigSetBody(BaseModel):
requires_restart: int = 1
class AppPutPasswordBody(BaseModel):
password: str
class AppPostUsersBody(BaseModel):
username: str
password: str
class AppPostLoginBody(BaseModel):
user: str
password: str

View File

@ -10,3 +10,4 @@ class Tags(Enum):
review = "Review"
export = "Export"
events = "Events"
auth = "Auth"

View File

@ -4,7 +4,8 @@ from typing import Optional
from fastapi import FastAPI
from frigate.api import app as main_app
from frigate.api import event, export, media, notification, preview, review
from frigate.api import auth, event, export, media, notification, preview, review
from frigate.api.auth import get_jwt_secret
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.plus import PlusApi
@ -38,6 +39,7 @@ def create_fastapi_app(
app.include_router(review.router)
app.include_router(export.router)
app.include_router(event.router)
app.include_router(auth.router)
# App Properties
app.frigate_config = frigate_config
app.embeddings = embeddings
@ -48,5 +50,6 @@ def create_fastapi_app(
app.plus_api = plus_api
app.stats_emitter = stats_emitter
app.external_processor = external_processor
app.jwt_token = get_jwt_secret() if frigate_config.auth.enabled else None
return app