use thread and queue for processing notifications with 10s timeout

This commit is contained in:
Josh Hawkins 2025-01-02 14:08:18 -06:00
parent e13d4a9aba
commit 5568151053
2 changed files with 113 additions and 2 deletions

View File

@ -17,6 +17,7 @@ import frigate.util as util
from frigate.api.auth import hash_password from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.async_webpush_client import AsyncWebPushClient
from frigate.comms.base_communicator import Communicator from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigPublisher from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.dispatcher import Dispatcher from frigate.comms.dispatcher import Dispatcher
@ -26,7 +27,6 @@ from frigate.comms.event_metadata_updater import (
) )
from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient from frigate.comms.mqtt import MqttClient
from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy from frigate.comms.zmq_proxy import ZmqProxy
from frigate.config.config import FrigateConfig from frigate.config.config import FrigateConfig
@ -309,7 +309,7 @@ class FrigateApp:
] ]
if notification_cameras: if notification_cameras:
comms.append(WebPushClient(self.config)) comms.append(AsyncWebPushClient(self.config, self.stop_event))
comms.append(WebSocketClient(self.config)) comms.append(WebSocketClient(self.config))
comms.append(self.inter_process_communicator) comms.append(self.inter_process_communicator)

View File

@ -0,0 +1,111 @@
import json
import logging
import queue
import threading
from dataclasses import dataclass
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from webpush import WebPushClient
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
@dataclass
class PushNotification:
user: str
payload: dict[str, Any]
title: str
message: str
direct_url: str = ""
image: str = ""
notification_type: str = "alert"
ttl: int = 0
class AsyncWebPushClient(WebPushClient):
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(config)
self.notification_queue: queue.Queue[PushNotification] = queue.Queue()
self.notification_thread = threading.Thread(
target=self._process_notifications, daemon=True
)
self.notification_thread.start()
self.stop_event = stop_event
def send_push_notification(
self,
user: str,
payload: dict[str, Any],
title: str,
message: str,
direct_url: str = "",
image: str = "",
notification_type: str = "alert",
ttl: int = 0,
) -> None:
notification = PushNotification(
user=user,
payload=payload,
title=title,
message=message,
direct_url=direct_url,
image=image,
notification_type=notification_type,
ttl=ttl,
)
self.notification_queue.put(notification)
def _process_notifications(self) -> None:
while not self.stop_event.is_set():
try:
notification = self.notification_queue.get(timeout=1.0)
self.check_registrations()
for pusher in self.web_pushers[notification.user]:
endpoint = pusher.subscription_info["endpoint"]
headers = self.claim_headers[
endpoint[: endpoint.index("/", 10)]
].copy()
headers["urgency"] = "high"
resp = pusher.send(
headers=headers,
ttl=notification.ttl,
data=json.dumps(
{
"title": notification.title,
"message": notification.message,
"direct_url": notification.direct_url,
"image": notification.image,
"id": notification.payload.get("after", {}).get(
"id", ""
),
"type": notification.notification_type,
}
),
timeout=10,
)
if resp.status_code in (404, 410):
self.expired_subs.setdefault(notification.user, []).append(
endpoint
)
elif resp.status_code != 201:
logger.warning(
f"Failed to send notification to {notification.user} :: {resp.status_code}"
)
self.cleanup_registrations()
self.notification_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error processing notification: {str(e)}")
def stop(self) -> None:
self.notification_thread.join()
super().stop()