diff --git a/frigate/app.py b/frigate/app.py index 7f51fac1b..30c16ecc8 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -17,6 +17,7 @@ import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.camera import CameraMetrics, PTZMetrics +from frigate.comms.async_webpush_client import AsyncWebPushClient from frigate.comms.base_communicator import Communicator from frigate.comms.config_updater import ConfigPublisher from frigate.comms.dispatcher import Dispatcher @@ -26,7 +27,6 @@ from frigate.comms.event_metadata_updater import ( ) from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.mqtt import MqttClient -from frigate.comms.webpush import WebPushClient from frigate.comms.ws import WebSocketClient from frigate.comms.zmq_proxy import ZmqProxy from frigate.config.config import FrigateConfig @@ -309,7 +309,7 @@ class FrigateApp: ] if notification_cameras: - comms.append(WebPushClient(self.config)) + comms.append(AsyncWebPushClient(self.config, self.stop_event)) comms.append(WebSocketClient(self.config)) comms.append(self.inter_process_communicator) diff --git a/frigate/comms/async_webpush_client.py b/frigate/comms/async_webpush_client.py new file mode 100644 index 000000000..beefba266 --- /dev/null +++ b/frigate/comms/async_webpush_client.py @@ -0,0 +1,111 @@ +import json +import logging +import queue +import threading +from dataclasses import dataclass +from multiprocessing.synchronize import Event as MpEvent +from typing import Any + +from webpush import WebPushClient + +from frigate.config import FrigateConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class PushNotification: + user: str + payload: dict[str, Any] + title: str + message: str + direct_url: str = "" + image: str = "" + notification_type: str = "alert" + ttl: int = 0 + + +class AsyncWebPushClient(WebPushClient): + def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None: + super().__init__(config) + self.notification_queue: queue.Queue[PushNotification] = queue.Queue() + self.notification_thread = threading.Thread( + target=self._process_notifications, daemon=True + ) + self.notification_thread.start() + self.stop_event = stop_event + + def send_push_notification( + self, + user: str, + payload: dict[str, Any], + title: str, + message: str, + direct_url: str = "", + image: str = "", + notification_type: str = "alert", + ttl: int = 0, + ) -> None: + notification = PushNotification( + user=user, + payload=payload, + title=title, + message=message, + direct_url=direct_url, + image=image, + notification_type=notification_type, + ttl=ttl, + ) + self.notification_queue.put(notification) + + def _process_notifications(self) -> None: + while not self.stop_event.is_set(): + try: + notification = self.notification_queue.get(timeout=1.0) + self.check_registrations() + + for pusher in self.web_pushers[notification.user]: + endpoint = pusher.subscription_info["endpoint"] + headers = self.claim_headers[ + endpoint[: endpoint.index("/", 10)] + ].copy() + headers["urgency"] = "high" + + resp = pusher.send( + headers=headers, + ttl=notification.ttl, + data=json.dumps( + { + "title": notification.title, + "message": notification.message, + "direct_url": notification.direct_url, + "image": notification.image, + "id": notification.payload.get("after", {}).get( + "id", "" + ), + "type": notification.notification_type, + } + ), + timeout=10, + ) + + if resp.status_code in (404, 410): + self.expired_subs.setdefault(notification.user, []).append( + endpoint + ) + elif resp.status_code != 201: + logger.warning( + f"Failed to send notification to {notification.user} :: {resp.status_code}" + ) + + self.cleanup_registrations() + self.notification_queue.task_done() + + except queue.Empty: + continue + except Exception as e: + logger.error(f"Error processing notification: {str(e)}") + + def stop(self) -> None: + self.notification_thread.join() + super().stop()