mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-13 06:35:24 +03:00
Handle cleaning up expired notification registrations
This commit is contained in:
parent
f6ba1ab6a8
commit
9824cb86ba
@ -24,7 +24,8 @@ class WebPushClient(Communicator): # type: ignore[misc]
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.claim_headers: dict[str, dict[str, str]] = {}
|
self.claim_headers: dict[str, dict[str, str]] = {}
|
||||||
self.refresh = 0
|
self.refresh = 0
|
||||||
self.web_pushers: list[WebPusher] = []
|
self.web_pushers: dict[str, list[WebPusher]] = {}
|
||||||
|
self.expired_subs: dict[str, list[str]] = {}
|
||||||
|
|
||||||
if not self.config.notifications.email:
|
if not self.config.notifications.email:
|
||||||
logger.warning("Email must be provided for push notifications to be sent.")
|
logger.warning("Email must be provided for push notifications to be sent.")
|
||||||
@ -32,10 +33,13 @@ class WebPushClient(Communicator): # type: ignore[misc]
|
|||||||
# Pull keys from PEM or generate if they do not exist
|
# Pull keys from PEM or generate if they do not exist
|
||||||
self.vapid = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))
|
self.vapid = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem"))
|
||||||
|
|
||||||
users: list[User] = User.select(User.notification_tokens).dicts().iterator()
|
users: list[User] = (
|
||||||
|
User.select(User.username, User.notification_tokens).dicts().iterator()
|
||||||
|
)
|
||||||
for user in users:
|
for user in users:
|
||||||
|
self.web_pushers[user["username"]] = []
|
||||||
for sub in user["notification_tokens"]:
|
for sub in user["notification_tokens"]:
|
||||||
self.web_pushers.append(WebPusher(sub))
|
self.web_pushers[user["username"]].append(WebPusher(sub))
|
||||||
|
|
||||||
def subscribe(self, receiver: Callable) -> None:
|
def subscribe(self, receiver: Callable) -> None:
|
||||||
"""Wrapper for allowing dispatcher to subscribe."""
|
"""Wrapper for allowing dispatcher to subscribe."""
|
||||||
@ -51,9 +55,10 @@ class WebPushClient(Communicator): # type: ignore[misc]
|
|||||||
endpoints: set[str] = set()
|
endpoints: set[str] = set()
|
||||||
|
|
||||||
# get a unique set of push endpoints
|
# get a unique set of push endpoints
|
||||||
for push in self.web_pushers:
|
for pushers in self.web_pushers.values():
|
||||||
endpoint: str = push.subscription_info["endpoint"]
|
for push in pushers:
|
||||||
endpoints.add(endpoint[0 : endpoint.index("/", 10)])
|
endpoint: str = push.subscription_info["endpoint"]
|
||||||
|
endpoints.add(endpoint[0 : endpoint.index("/", 10)])
|
||||||
|
|
||||||
# create new claim
|
# create new claim
|
||||||
for endpoint in endpoints:
|
for endpoint in endpoints:
|
||||||
@ -64,6 +69,36 @@ class WebPushClient(Communicator): # type: ignore[misc]
|
|||||||
}
|
}
|
||||||
self.claim_headers[endpoint] = self.vapid.sign(claim)
|
self.claim_headers[endpoint] = self.vapid.sign(claim)
|
||||||
|
|
||||||
|
def cleanup_registrations(self) -> None:
|
||||||
|
# delete any expired subs
|
||||||
|
if len(self.expired_subs) > 0:
|
||||||
|
for user, expired in self.expired_subs.items():
|
||||||
|
user_subs = []
|
||||||
|
|
||||||
|
# get all subscriptions, removing ones that are expired
|
||||||
|
stored_user: User = User.get_by_id(user)
|
||||||
|
for token in stored_user.notification_tokens:
|
||||||
|
if token["endpoint"] in expired:
|
||||||
|
continue
|
||||||
|
|
||||||
|
user_subs.append(token)
|
||||||
|
|
||||||
|
# overwrite the database and reset web pushers
|
||||||
|
User.update(notification_tokens=user_subs).where(
|
||||||
|
User.username == user
|
||||||
|
).execute()
|
||||||
|
|
||||||
|
self.web_pushers[user] = []
|
||||||
|
|
||||||
|
for sub in user_subs:
|
||||||
|
self.web_pushers[user].append(WebPusher(sub))
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Cleaned up {len(expired)} notification subscriptions for {user}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.expired_subs = {}
|
||||||
|
|
||||||
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
|
||||||
"""Wrapper for publishing when client is in valid state."""
|
"""Wrapper for publishing when client is in valid state."""
|
||||||
if topic == "reviews":
|
if topic == "reviews":
|
||||||
@ -108,27 +143,45 @@ class WebPushClient(Communicator): # type: ignore[misc]
|
|||||||
# if event is ongoing open to live view otherwise open to recordings view
|
# if event is ongoing open to live view otherwise open to recordings view
|
||||||
direct_url = f"/review?id={reviewId}" if state == "end" else f"/#{camera}"
|
direct_url = f"/review?id={reviewId}" if state == "end" else f"/#{camera}"
|
||||||
|
|
||||||
for pusher in self.web_pushers:
|
for user, pushers in self.web_pushers.items():
|
||||||
endpoint = pusher.subscription_info["endpoint"]
|
for pusher in pushers:
|
||||||
|
endpoint = pusher.subscription_info["endpoint"]
|
||||||
|
|
||||||
# set headers for notification behavior
|
# set headers for notification behavior
|
||||||
headers = self.claim_headers[endpoint[0 : endpoint.index("/", 10)]].copy()
|
headers = self.claim_headers[
|
||||||
headers["urgency"] = "high"
|
endpoint[0 : endpoint.index("/", 10)]
|
||||||
|
].copy()
|
||||||
|
headers["urgency"] = "high"
|
||||||
|
|
||||||
# send message
|
# send message
|
||||||
pusher.send(
|
resp = pusher.send(
|
||||||
headers=headers,
|
headers=headers,
|
||||||
ttl=3600,
|
ttl=3600,
|
||||||
data=json.dumps(
|
data=json.dumps(
|
||||||
{
|
{
|
||||||
"title": title,
|
"title": title,
|
||||||
"message": message,
|
"message": message,
|
||||||
"direct_url": direct_url,
|
"direct_url": direct_url,
|
||||||
"image": image,
|
"image": image,
|
||||||
"id": reviewId,
|
"id": reviewId,
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if resp.status_code == 201:
|
||||||
|
pass
|
||||||
|
elif resp.status_code == 410:
|
||||||
|
if not self.expired_subs.get(user):
|
||||||
|
self.expired_subs[user] = []
|
||||||
|
|
||||||
|
self.expired_subs[user].append(pusher.subscription_info["endpoint"])
|
||||||
|
# the subscription no longer exists and should be removed
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to send notification to {user} :: {resp.headers}"
|
||||||
|
)
|
||||||
|
|
||||||
|
self.cleanup_registrations()
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
pass
|
pass
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user