diff --git a/frigate/api/auth.py b/frigate/api/auth.py index dd35219eb..26fb5dd54 100644 --- a/frigate/api/auth.py +++ b/frigate/api/auth.py @@ -8,7 +8,6 @@ import logging import os import re import secrets -import stat import time from datetime import datetime from pathlib import Path @@ -312,8 +311,9 @@ def get_jwt_secret() -> str: ) jwt_secret = secrets.token_hex(64) try: - # Use os.open to create file with restrictive permissions (0o600: read/write for owner only) - fd = os.open(jwt_secret_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600) + fd = os.open( + jwt_secret_file, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600 + ) with os.fdopen(fd, "w") as f: f.write(str(jwt_secret)) except Exception: @@ -359,9 +359,35 @@ def verify_password(password, password_hash): return secrets.compare_digest(password_hash, compare_hash) +def validate_password_strength(password: str) -> tuple[bool, Optional[str]]: + """ + Validate password strength. + + Returns a tuple of (is_valid, error_message). + """ + if not password: + return False, "Password cannot be empty" + + if len(password) < 8: + return False, "Password must be at least 8 characters long" + + if not any(c.isupper() for c in password): + return False, "Password must contain at least one uppercase letter" + + if not any(c.isdigit() for c in password): + return False, "Password must contain at least one digit" + + if not any(c in '!@#$%^&*(),.?":{}|<>' for c in password): + return False, "Password must contain at least one special character" + + return True, None + + def create_encoded_jwt(user, role, expiration, secret): return jwt.encode( - {"alg": "HS256"}, {"sub": user, "role": role, "exp": expiration}, secret + {"alg": "HS256"}, + {"sub": user, "role": role, "exp": expiration, "iat": int(time.time())}, + secret, ) @@ -621,14 +647,27 @@ def auth(request: Request): logger.debug("jwt token expired") return fail_response + # Check if password has been changed after token was issued + try: + user_obj = User.get_by_id(user) + # Only invalidate token if password was changed after token was issued + if user_obj.password_changed_at is not None: + token_iat = int(token.claims.get("iat", 0)) + password_changed_timestamp = int( + user_obj.password_changed_at.timestamp() + ) + if token_iat < password_changed_timestamp: + logger.debug( + "jwt token issued before password change, invalidating token" + ) + return fail_response + except DoesNotExist: + logger.debug("user not found") + return fail_response + # if the jwt cookie is expiring soon - elif jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time: + if jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time: logger.debug("jwt token expiring soon, refreshing cookie") - # ensure the user hasn't been deleted - try: - User.get_by_id(user) - except DoesNotExist: - return fail_response new_expiration = current_time + JWT_SESSION_LENGTH new_encoded_jwt = create_encoded_jwt( user, role, new_expiration, request.app.jwt_token @@ -663,7 +702,7 @@ def profile(request: Request): ) -@router.get("/logout", dependencies=[Depends(allow_any_authenticated())]) +@router.get("/logout", dependencies=[Depends(allow_public())]) def logout(request: Request): auth_config: AuthConfig = request.app.frigate_config.auth response = RedirectResponse("/login", status_code=303) @@ -785,8 +824,60 @@ async def update_password( HASH_ITERATIONS = request.app.frigate_config.auth.hash_iterations + try: + user = User.get_by_id(username) + except DoesNotExist: + return JSONResponse(content={"message": "User not found"}, status_code=404) + + # Require old_password when: + # 1. Non-admin user is changing another user's password (admin only action) + # 2. Any user is changing their own password + is_changing_own_password = current_username == username + is_non_admin = current_role != "admin" + + if is_changing_own_password or is_non_admin: + if not body.old_password: + return JSONResponse( + content={"message": "Current password is required"}, + status_code=400, + ) + if not verify_password(body.old_password, user.password_hash): + return JSONResponse( + content={"message": "Current password is incorrect"}, + status_code=401, + ) + + # Validate new password strength + is_valid, error_message = validate_password_strength(body.password) + if not is_valid: + return JSONResponse( + content={"message": error_message}, + status_code=400, + ) + password_hash = hash_password(body.password, iterations=HASH_ITERATIONS) - User.set_by_id(username, {User.password_hash: password_hash}) + User.update( + { + User.password_hash: password_hash, + User.password_changed_at: datetime.now(), + } + ).where(User.username == username).execute() + + # If user changed their own password, issue a new JWT to keep them logged in + if current_username == username: + JWT_COOKIE_NAME = request.app.frigate_config.auth.cookie_name + JWT_COOKIE_SECURE = request.app.frigate_config.auth.cookie_secure + JWT_SESSION_LENGTH = request.app.frigate_config.auth.session_length + + expiration = int(time.time()) + JWT_SESSION_LENGTH + encoded_jwt = create_encoded_jwt( + username, current_role, expiration, request.app.jwt_token + ) + response = JSONResponse(content={"success": True}) + set_jwt_cookie( + response, JWT_COOKIE_NAME, encoded_jwt, expiration, JWT_COOKIE_SECURE + ) + return response return JSONResponse(content={"success": True})