move async code to main class

This commit is contained in:
Josh Hawkins 2025-01-03 19:29:22 -06:00
parent a69c82c4f4
commit 6d1e08cc8d
3 changed files with 80 additions and 136 deletions

View File

@ -17,7 +17,6 @@ import frigate.util as util
from frigate.api.auth import hash_password from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app from frigate.api.fastapi_app import create_fastapi_app
from frigate.camera import CameraMetrics, PTZMetrics from frigate.camera import CameraMetrics, PTZMetrics
from frigate.comms.async_webpush_client import AsyncWebPushClient
from frigate.comms.base_communicator import Communicator from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigPublisher from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.dispatcher import Dispatcher from frigate.comms.dispatcher import Dispatcher
@ -27,6 +26,7 @@ from frigate.comms.event_metadata_updater import (
) )
from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient from frigate.comms.mqtt import MqttClient
from frigate.comms.webpush import WebPushClient
from frigate.comms.ws import WebSocketClient from frigate.comms.ws import WebSocketClient
from frigate.comms.zmq_proxy import ZmqProxy from frigate.comms.zmq_proxy import ZmqProxy
from frigate.config.config import FrigateConfig from frigate.config.config import FrigateConfig
@ -309,7 +309,7 @@ class FrigateApp:
] ]
if notification_cameras: if notification_cameras:
comms.append(AsyncWebPushClient(self.config, self.stop_event)) comms.append(WebPushClient(self.config, self.stop_event))
comms.append(WebSocketClient(self.config)) comms.append(WebSocketClient(self.config))
comms.append(self.inter_process_communicator) comms.append(self.inter_process_communicator)

View File

@ -1,108 +0,0 @@
import json
import logging
import queue
import threading
from dataclasses import dataclass
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from webpush import WebPushClient
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
@dataclass
class PushNotification:
user: str
payload: dict[str, Any]
title: str
message: str
direct_url: str = ""
image: str = ""
notification_type: str = "alert"
ttl: int = 0
class AsyncWebPushClient(WebPushClient):
def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
super().__init__(config)
self.notification_queue: queue.Queue[PushNotification] = queue.Queue()
self.notification_thread = threading.Thread(
target=self._process_notifications, daemon=True
)
self.notification_thread.start()
self.stop_event = stop_event
def send_push_notification(
self,
user: str,
payload: dict[str, Any],
title: str,
message: str,
direct_url: str = "",
image: str = "",
notification_type: str = "alert",
ttl: int = 0,
) -> None:
notification = PushNotification(
user=user,
payload=payload,
title=title,
message=message,
direct_url=direct_url,
image=image,
notification_type=notification_type,
ttl=ttl,
)
self.notification_queue.put(notification)
def _process_notifications(self) -> None:
while not self.stop_event.is_set():
try:
notification = self.notification_queue.get(timeout=1.0)
self.check_registrations()
for pusher in self.web_pushers[notification.user]:
endpoint = pusher.subscription_info["endpoint"]
headers = self.claim_headers[
endpoint[: endpoint.index("/", 10)]
].copy()
headers["urgency"] = "high"
resp = pusher.send(
headers=headers,
ttl=notification.ttl,
data=json.dumps(
{
"title": notification.title,
"message": notification.message,
"direct_url": notification.direct_url,
"image": notification.image,
"id": notification.payload.get("after", {}).get(
"id", ""
),
"type": notification.notification_type,
}
),
timeout=10,
)
if resp.status_code in (404, 410):
self.expired_subs.setdefault(notification.user, []).append(
endpoint
)
elif resp.status_code != 201:
logger.warning(
f"Failed to send notification to {notification.user} :: {resp.status_code}"
)
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error processing notification: {str(e)}")
def stop(self) -> None:
self.notification_thread.join()
super().stop()

View File

@ -4,6 +4,10 @@ import datetime
import json import json
import logging import logging
import os import os
import queue
import threading
from dataclasses import dataclass
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Callable from typing import Any, Callable
from py_vapid import Vapid01 from py_vapid import Vapid01
@ -18,10 +22,22 @@ from frigate.models import User
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class PushNotification:
user: str
payload: dict[str, Any]
title: str
message: str
direct_url: str = ""
image: str = ""
notification_type: str = "alert"
ttl: int = 0
class WebPushClient(Communicator): # type: ignore[misc] class WebPushClient(Communicator): # type: ignore[misc]
"""Frigate wrapper for webpush client.""" """Frigate wrapper for webpush client."""
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig, stop_event: MpEvent) -> None:
self.config = config self.config = config
self.claim_headers: dict[str, dict[str, str]] = {} self.claim_headers: dict[str, dict[str, str]] = {}
self.refresh: int = 0 self.refresh: int = 0
@ -30,6 +46,12 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.suspended_cameras: dict[str, int] = { self.suspended_cameras: dict[str, int] = {
c.name: 0 for c in self.config.cameras.values() c.name: 0 for c in self.config.cameras.values()
} }
self.notification_queue: queue.Queue[PushNotification] = queue.Queue()
self.notification_thread = threading.Thread(
target=self._process_notifications, daemon=True
)
self.notification_thread.start()
self.stop_event = stop_event
if not self.config.notifications.email: if not self.config.notifications.email:
logger.warning("Email must be provided for push notifications to be sent.") logger.warning("Email must be provided for push notifications to be sent.")
@ -162,32 +184,62 @@ class WebPushClient(Communicator): # type: ignore[misc]
notification_type: str = "alert", notification_type: str = "alert",
ttl: int = 0, ttl: int = 0,
) -> None: ) -> None:
for pusher in self.web_pushers[user]: notification = PushNotification(
endpoint = pusher.subscription_info["endpoint"] user=user,
headers = self.claim_headers[endpoint[: endpoint.index("/", 10)]].copy() payload=payload,
headers["urgency"] = "high" title=title,
message=message,
direct_url=direct_url,
image=image,
notification_type=notification_type,
ttl=ttl,
)
self.notification_queue.put(notification)
resp = pusher.send( def _process_notifications(self) -> None:
headers=headers, while not self.stop_event.is_set():
ttl=ttl, try:
data=json.dumps( notification = self.notification_queue.get(timeout=1.0)
{ self.check_registrations()
"title": title,
"message": message,
"direct_url": direct_url,
"image": image,
"id": payload.get("after", {}).get("id", ""),
"type": notification_type,
}
),
)
if resp.status_code in (404, 410): for pusher in self.web_pushers[notification.user]:
self.expired_subs.setdefault(user, []).append(endpoint) endpoint = pusher.subscription_info["endpoint"]
elif resp.status_code != 201: headers = self.claim_headers[
logger.warning( endpoint[: endpoint.index("/", 10)]
f"Failed to send notification to {user} :: {resp.headers}" ].copy()
) headers["urgency"] = "high"
resp = pusher.send(
headers=headers,
ttl=notification.ttl,
data=json.dumps(
{
"title": notification.title,
"message": notification.message,
"direct_url": notification.direct_url,
"image": notification.image,
"id": notification.payload.get("after", {}).get(
"id", ""
),
"type": notification.notification_type,
}
),
timeout=10,
)
if resp.status_code in (404, 410):
self.expired_subs.setdefault(notification.user, []).append(
endpoint
)
elif resp.status_code != 201:
logger.warning(
f"Failed to send notification to {notification.user} :: {resp.status_code}"
)
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error processing notification: {str(e)}")
def send_notification_test(self) -> None: def send_notification_test(self) -> None:
if not self.config.notifications.email: if not self.config.notifications.email:
@ -257,4 +309,4 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.cleanup_registrations() self.cleanup_registrations()
def stop(self) -> None: def stop(self) -> None:
pass self.notification_thread.join()