implement JWT actual secret

This commit is contained in:
Blake Blackshear 2024-05-06 06:57:02 -05:00
parent 38f25548b1
commit bf35d16a89
3 changed files with 76 additions and 15 deletions

View File

@ -14,7 +14,7 @@ from markupsafe import escape
from peewee import operator
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.api.auth import AuthBp
from frigate.api.auth import AuthBp, get_jwt_secret
from frigate.api.event import EventBp
from frigate.api.export import ExportBp
from frigate.api.media import MediaBp
@ -85,6 +85,7 @@ def create_app(
app.plus_api = plus_api
app.camera_error_image = None
app.stats_emitter = stats_emitter
app.jwt_token = get_jwt_secret() if frigate_config.auth.enabled else None
app.register_blueprint(bp)

View File

@ -2,21 +2,78 @@
import base64
import hashlib
import json
import logging
import os
import secrets
import time
from datetime import datetime
from pathlib import Path
from flask import Blueprint, current_app, make_response, request
from joserfc import jwt
from frigate.const import CONFIG_DIR, JWT_SECRET_ENV_VAR, PASSWORD_HASH_ALGORITHM
logger = logging.getLogger(__name__)
AuthBp = Blueprint("auth", __name__)
ALGORITHM = "pbkdf2_sha256"
JWT_SECRET = "secret"
def get_jwt_secret() -> str:
jwt_secret = None
# check env var
if JWT_SECRET_ENV_VAR in os.environ:
logger.debug(
f"Using jwt secret from {JWT_SECRET_ENV_VAR} environment variable."
)
jwt_secret = os.environ.get(JWT_SECRET_ENV_VAR)
# check docker secrets
elif (
os.path.isdir("/run/secrets")
and os.access("/run/secrets", os.R_OK)
and JWT_SECRET_ENV_VAR in os.listdir("/run/secrets")
):
logger.debug(f"Using jwt secret from {JWT_SECRET_ENV_VAR} docker secret file.")
jwt_secret = Path(os.path.join("/run/secrets", JWT_SECRET_ENV_VAR)).read_text()
# check for the addon options file
elif os.path.isfile("/data/options.json"):
with open("/data/options.json") as f:
raw_options = f.read()
logger.debug("Using jwt secret from Home Assistant addon options file.")
options = json.loads(raw_options)
jwt_secret = options.get("jwt_secret")
if jwt_secret is None:
jwt_secret_file = os.path.join(CONFIG_DIR, ".jwt_secret")
# check .jwt_secrets file
if not os.path.isfile(jwt_secret_file):
logger.debug(
"No jwt secret found. Generating one and storing in .jwt_secret file in config directory."
)
jwt_secret = secrets.token_hex(64)
try:
with open(jwt_secret_file, "w") as f:
f.write(str(jwt_secret))
except Exception:
logger.warn(
"Unable to write jwt token file to config directory. A new jwt token will be created at each startup."
)
else:
logger.debug("Using jwt secret from .jwt_secret file in config directory.")
with open(jwt_secret_file) as f:
try:
jwt_secret = f.readline()
except Exception:
logger.warn(
"Unable to read jwt token from .jwt_secret file in config directory. A new jwt token will be created at each startup."
)
jwt_secret = secrets.token_hex(64)
if len(jwt_secret) < 64:
logger.warn("JWT Secret is recommended to be 64 characters or more")
return jwt_secret
def hash_password(password, salt=None, iterations=260000):
@ -28,7 +85,7 @@ def hash_password(password, salt=None, iterations=260000):
"sha256", password.encode("utf-8"), salt.encode("utf-8"), iterations
)
b64_hash = base64.b64encode(pw_hash).decode("ascii").strip()
return "{}${}${}${}".format(ALGORITHM, iterations, salt, b64_hash)
return "{}${}${}${}".format(PASSWORD_HASH_ALGORITHM, iterations, salt, b64_hash)
def verify_password(password, password_hash):
@ -36,7 +93,7 @@ def verify_password(password, password_hash):
return False
algorithm, iterations, salt, b64_hash = password_hash.split("$", 3)
iterations = int(iterations)
assert algorithm == ALGORITHM
assert algorithm == PASSWORD_HASH_ALGORITHM
compare_hash = hash_password(password, salt, iterations)
return secrets.compare_digest(password_hash, compare_hash)
@ -50,13 +107,11 @@ def set_jwt_cookie(response, cookie_name, encoded_jwt, expiration):
response.set_cookie(cookie_name, encoded_jwt, httponly=True, expires=expiration)
# TODO:
# - on startup, generate a signing secret for jwt if it doesn't exist and save as ".auth-token" in the config folder
# -
@AuthBp.route("/auth")
def auth():
if not current_app.frigate_config.auth.enabled:
return make_response({}, 202)
JWT_COOKIE_NAME = current_app.frigate_config.auth.cookie_name
JWT_REFRESH = current_app.frigate_config.auth.refresh_time
JWT_SESSION_LENGTH = current_app.frigate_config.auth.session_length
@ -81,7 +136,7 @@ def auth():
try:
response = make_response({}, 202)
token = jwt.decode(encoded_token, JWT_SECRET)
token = jwt.decode(encoded_token, current_app.jwt_token)
if "sub" not in token.claims:
logger.debug("user not set in jwt token")
return make_response({}, 401)
@ -111,7 +166,9 @@ def auth():
elif jwt_source == "cookie" and expiration - JWT_REFRESH <= current_time:
logger.debug("jwt token expiring soon, refreshing cookie")
new_expiration = current_time + JWT_SESSION_LENGTH
new_encoded_jwt = create_encoded_jwt(user, new_expiration, JWT_SECRET)
new_encoded_jwt = create_encoded_jwt(
user, new_expiration, current_app.jwt_token
)
set_jwt_cookie(response, JWT_COOKIE_NAME, new_encoded_jwt, new_expiration)
response.headers["remote-user"] = user
@ -120,8 +177,6 @@ def auth():
logger.error(f"Error parsing jwt: {e}")
return make_response({}, 401)
# grab the jwt token from the authorization header or a cookie
@AuthBp.route("/login", methods=["POST"])
def login():
@ -143,7 +198,7 @@ def login():
make_response({"message": "Login failed"}, 400)
if verify_password(password, password_hash):
expiration = int(time.time()) + JWT_SESSION_LENGTH
encoded_jwt = create_encoded_jwt(user, expiration, JWT_SECRET)
encoded_jwt = create_encoded_jwt(user, expiration, current_app.jwt_token)
response = make_response({}, 200)
set_jwt_cookie(response, JWT_COOKIE_NAME, encoded_jwt, expiration)
return response

View File

@ -91,3 +91,8 @@ AUTOTRACKING_MAX_MOVE_METRICS = 500
AUTOTRACKING_ZOOM_OUT_HYSTERESIS = 1.1
AUTOTRACKING_ZOOM_IN_HYSTERESIS = 0.95
AUTOTRACKING_ZOOM_EDGE_THRESHOLD = 0.05
# Auth
JWT_SECRET_ENV_VAR = "FRIGATE_JWT_SECRET"
PASSWORD_HASH_ALGORITHM = "pbkdf2_sha256"