frigate/frigate/api/notification.py
Josh Hawkins eb3d0ddfb6 notification fixes
the pubkey was not being returned if notifications was not enabled at the global level
2025-08-07 13:55:51 -06:00

72 lines
2.3 KiB
Python

"""Notification apis."""
import logging
import os
from typing import Any
from cryptography.hazmat.primitives import serialization
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from peewee import DoesNotExist
from py_vapid import Vapid01, utils
from frigate.api.defs.tags import Tags
from frigate.const import CONFIG_DIR
from frigate.models import User
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.notifications])
@router.get("/notifications/pubkey")
def get_vapid_pub_key(request: Request):
config = request.app.frigate_config
notifications_enabled = config.notifications.enabled
camera_notifications_enabled = [
c for c in config.cameras.values() if c.enabled and c.notifications.enabled
]
if not (notifications_enabled or camera_notifications_enabled):
return JSONResponse(
content=({"success": False, "message": "Notifications are not enabled."}),
status_code=400,
)
key = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))
raw_pub = key.public_key.public_bytes(
serialization.Encoding.X962, serialization.PublicFormat.UncompressedPoint
)
return JSONResponse(content=utils.b64urlencode(raw_pub), status_code=200)
@router.post("/notifications/register")
def register_notifications(request: Request, body: dict = None):
if request.app.frigate_config.auth.enabled:
# FIXME: For FastAPI the remote-user is not being populated
username = request.headers.get("remote-user") or "admin"
else:
username = "admin"
json: dict[str, Any] = body or {}
sub = json.get("sub")
if not sub:
return JSONResponse(
content={"success": False, "message": "Subscription must be provided."},
status_code=400,
)
try:
User.update(notification_tokens=User.notification_tokens.append(sub)).where(
User.username == username
).execute()
return JSONResponse(
content=({"success": True, "message": "Successfully saved token."}),
status_code=200,
)
except DoesNotExist:
return JSONResponse(
content=({"success": False, "message": "Could not find user."}),
status_code=404,
)