auth api backend changes

- use os.open to create jwt secret with restrictive permissions (0o600: read/write for owner only)
- add backend validation for password strength
- add iat claim to jwt so the server can determine when a token was issued and reject any jwts issued before a user's password_changed_at timestamp, ensuring old tokens are invalidated after a password change
- set logout route to public to avoid 401 when logging out
- issue new jwt for users who change their own password so they stay logged in
This commit is contained in:
Josh Hawkins 2025-12-07 19:30:31 -06:00
parent 29fb4b9c07
commit c7322cde5e

View File

@ -8,7 +8,6 @@ import logging
import os import os
import re import re
import secrets import secrets
import stat
import time import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@ -312,8 +311,9 @@ def get_jwt_secret() -> str:
) )
jwt_secret = secrets.token_hex(64) jwt_secret = secrets.token_hex(64)
try: try:
# Use os.open to create file with restrictive permissions (0o600: read/write for owner only) fd = os.open(
fd = os.open(jwt_secret_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600) jwt_secret_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600
)
with os.fdopen(fd, "w") as f: with os.fdopen(fd, "w") as f:
f.write(str(jwt_secret)) f.write(str(jwt_secret))
except Exception: except Exception:
@ -359,9 +359,35 @@ def verify_password(password, password_hash):
return secrets.compare_digest(password_hash, compare_hash) return secrets.compare_digest(password_hash, compare_hash)
def validate_password_strength(password: str) -> tuple[bool, Optional[str]]:
"""
Validate password strength.
Returns a tuple of (is_valid, error_message).
"""
if not password:
return False, "Password cannot be empty"
if len(password) < 8:
return False, "Password must be at least 8 characters long"
if not any(c.isupper() for c in password):
return False, "Password must contain at least one uppercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain at least one digit"
if not any(c in '!@#$%^&*(),.?":{}|<>' for c in password):
return False, "Password must contain at least one special character"
return True, None
def create_encoded_jwt(user, role, expiration, secret): def create_encoded_jwt(user, role, expiration, secret):
return jwt.encode( return jwt.encode(
{"alg": "HS256"}, {"sub": user, "role": role, "exp": expiration}, secret {"alg": "HS256"},
{"sub": user, "role": role, "exp": expiration, "iat": int(time.time())},
secret,
) )
@ -621,14 +647,27 @@ def auth(request: Request):
logger.debug("jwt token expired") logger.debug("jwt token expired")
return fail_response return fail_response
# if the jwt cookie is expiring soon # Check if password has been changed after token was issued
elif jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time:
logger.debug("jwt token expiring soon, refreshing cookie")
# ensure the user hasn't been deleted
try: try:
User.get_by_id(user) user_obj = User.get_by_id(user)
except DoesNotExist: # Only invalidate token if password was changed after token was issued
if user_obj.password_changed_at is not None:
token_iat = int(token.claims.get("iat", 0))
password_changed_timestamp = int(
user_obj.password_changed_at.timestamp()
)
if token_iat < password_changed_timestamp:
logger.debug(
"jwt token issued before password change, invalidating token"
)
return fail_response return fail_response
except DoesNotExist:
logger.debug("user not found")
return fail_response
# if the jwt cookie is expiring soon
if jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time:
logger.debug("jwt token expiring soon, refreshing cookie")
new_expiration = current_time + JWT_SESSION_LENGTH new_expiration = current_time + JWT_SESSION_LENGTH
new_encoded_jwt = create_encoded_jwt( new_encoded_jwt = create_encoded_jwt(
user, role, new_expiration, request.app.jwt_token user, role, new_expiration, request.app.jwt_token
@ -663,7 +702,7 @@ def profile(request: Request):
) )
@router.get("/logout", dependencies=[Depends(allow_any_authenticated())]) @router.get("/logout", dependencies=[Depends(allow_public())])
def logout(request: Request): def logout(request: Request):
auth_config: AuthConfig = request.app.frigate_config.auth auth_config: AuthConfig = request.app.frigate_config.auth
response = RedirectResponse("/login", status_code=303) response = RedirectResponse("/login", status_code=303)
@ -785,8 +824,60 @@ async def update_password(
HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations
try:
user = User.get_by_id(username)
except DoesNotExist:
return JSONResponse(content={"message": "User not found"}, status_code=404)
# Require old_password when:
# 1. Non-admin user is changing another user's password (admin only action)
# 2. Any user is changing their own password
is_changing_own_password = current_username == username
is_non_admin = current_role != "admin"
if is_changing_own_password or is_non_admin:
if not body.old_password:
return JSONResponse(
content={"message": "Current password is required"},
status_code=400,
)
if not verify_password(body.old_password, user.password_hash):
return JSONResponse(
content={"message": "Current password is incorrect"},
status_code=401,
)
# Validate new password strength
is_valid, error_message = validate_password_strength(body.password)
if not is_valid:
return JSONResponse(
content={"message": error_message},
status_code=400,
)
password_hash = hash_password(body.password, iterations=HASH_ITERATIONS) password_hash = hash_password(body.password, iterations=HASH_ITERATIONS)
User.set_by_id(username, {User.password_hash: password_hash}) User.update(
{
User.password_hash: password_hash,
User.password_changed_at: datetime.now(),
}
).where(User.username == username).execute()
# If user changed their own password, issue a new JWT to keep them logged in
if current_username == username:
JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name
JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure
JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length
expiration = int(time.time()) + JWT_SESSION_LENGTH
encoded_jwt = create_encoded_jwt(
username, current_role, expiration, request.app.jwt_token
)
response = JSONResponse(content={"success": True})
set_jwt_cookie(
response, JWT_COOKIE_NAME, encoded_jwt, expiration, JWT_COOKIE_SECURE
)
return response
return JSONResponse(content={"success": True}) return JSONResponse(content={"success": True})