Support multiple endpoints

This commit is contained in:
Nicolas Mowen 2024-07-20 16:50:49 -06:00
parent c92c8256b1
commit f6ba1ab6a8
2 changed files with 48 additions and 26 deletions

View File

@ -7,16 +7,16 @@ title: Notifications
Frigate offers native notifications using the [WebPush Protocol](https://web.dev/articles/push-notifications-web-push-protocol) which uses the [VAPID spec](https://tools.ietf.org/html/draft-thomson-webpush-vapid) to deliver notifications to web apps using encryption.
In order to use notifications the following requirements must be met:
- Frigate must be accessed via a secure https connection
- A supported browser must be used. Currently Chrome, Firefox, and Safari are known to be supported.
- In order for notifications to be usable externally, Frigate must be accessible externally
## Configuration
To configure notifications, go to the Frigate WebUI -> Settings -> Notifications and enable, then fill out the fields and save.
:::note
Currently, notifications are only supported in Chrome and Firefox browsers.
:::
## Registration
Once notifications are enabled, press the `Register for Notifications` button on all devices that you would like to receive notifications on. This will register the background worker. After this Frigate must be restarted and then notifications will begin to be sent.

View File

@ -22,8 +22,8 @@ class WebPushClient(Communicator): # type: ignore[misc]
def __init__(self, config: FrigateConfig) -> None:
self.config = config
self.claim = None
self.claim_headers = None
self.claim_headers: dict[str, dict[str, str]] = {}
self.refresh = 0
self.web_pushers: list[WebPusher] = []
if not self.config.notifications.email:
@ -41,27 +41,39 @@ class WebPushClient(Communicator): # type: ignore[misc]
"""Wrapper for allowing dispatcher to subscribe."""
pass
def check_registrations(self) -> None:
# check for valid claim or create new one
now = datetime.datetime.now().timestamp()
if len(self.claim_headers) == 0 or self.refresh < now:
self.refresh = (
datetime.datetime.now() + datetime.timedelta(hours=1)
).timestamp()
endpoints: set[str] = set()
# get a unique set of push endpoints
for push in self.web_pushers:
endpoint: str = push.subscription_info["endpoint"]
endpoints.add(endpoint[0 : endpoint.index("/", 10)])
# create new claim
for endpoint in endpoints:
claim = {
"sub": f"mailto:{self.config.notifications.email}",
"aud": endpoint,
"exp": self.refresh,
}
self.claim_headers[endpoint] = self.vapid.sign(claim)
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state."""
if topic == "reviews":
self.send_message(json.loads(payload))
self.send_alert(json.loads(payload))
def send_message(self, payload: dict[str, any]) -> None:
def send_alert(self, payload: dict[str, any]) -> None:
if not self.config.notifications.email:
return
# check for valid claim or create new one
now = datetime.datetime.now().timestamp()
if self.claim is None or self.claim["exp"] < now:
# create new claim
self.claim = {
"sub": f"mailto:{self.config.notifications.email}",
"aud": "https://fcm.googleapis.com",
"exp": (
datetime.datetime.now() + datetime.timedelta(hours=1)
).timestamp(),
}
self.claim_headers = self.vapid.sign(self.claim)
self.check_registrations()
# Only notify for alerts
if payload["after"]["severity"] != "alert":
@ -88,15 +100,25 @@ class WebPushClient(Communicator): # type: ignore[misc]
sorted_objects.update(payload["after"]["data"]["sub_labels"])
camera: str = payload["after"]["camera"]
title = f"{', '.join(sorted_objects).replace('_', ' ').title()}{' was' if state == 'end' else ''} detected in {', '.join(payload['after']['data']['zones']).replace('_', ' ').title()}"
message = f"Detected on {payload['after']['camera'].replace('_', ' ').title()}"
direct_url = f"/review?id={reviewId}"
message = f"Detected on {camera.replace('_', ' ').title()}"
image = f'{payload["after"]["thumb_path"].replace("/media/frigate", "")}'
# if event is ongoing open to live view otherwise open to recordings view
direct_url = f"/review?id={reviewId}" if state == "end" else f"/#{camera}"
for pusher in self.web_pushers:
endpoint = pusher.subscription_info["endpoint"]
# set headers for notification behavior
headers = self.claim_headers[endpoint[0 : endpoint.index("/", 10)]].copy()
headers["urgency"] = "high"
# send message
pusher.send(
headers=self.claim_headers,
ttl=0,
headers=headers,
ttl=3600,
data=json.dumps(
{
"title": title,