From 9824cb86ba455c902f4c33f1a63f4e75c7fdb85e Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Sun, 21 Jul 2024 12:06:51 -0600 Subject: [PATCH] Handle cleaning up expired notification registrations --- frigate/comms/webpush.py | 103 +++++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 25 deletions(-) diff --git a/frigate/comms/webpush.py b/frigate/comms/webpush.py index 389283a2a..b09615a44 100644 --- a/frigate/comms/webpush.py +++ b/frigate/comms/webpush.py @@ -24,7 +24,8 @@ class WebPushClient(Communicator): # type: ignore[misc] self.config = config self.claim_headers: dict[str, dict[str, str]] = {} self.refresh = 0 - self.web_pushers: list[WebPusher] = [] + self.web_pushers: dict[str, list[WebPusher]] = {} + self.expired_subs: dict[str, list[str]] = {} if not self.config.notifications.email: logger.warning("Email must be provided for push notifications to be sent.") @@ -32,10 +33,13 @@ class WebPushClient(Communicator): # type: ignore[misc] # Pull keys from PEM or generate if they do not exist self.vapid = Vapid01.from_file(os.path.join(CONFIG_DIR, "notifications.pem")) - users: list[User] = User.select(User.notification_tokens).dicts().iterator() + users: list[User] = ( + User.select(User.username, User.notification_tokens).dicts().iterator() + ) for user in users: + self.web_pushers[user["username"]] = [] for sub in user["notification_tokens"]: - self.web_pushers.append(WebPusher(sub)) + self.web_pushers[user["username"]].append(WebPusher(sub)) def subscribe(self, receiver: Callable) -> None: """Wrapper for allowing dispatcher to subscribe.""" @@ -51,9 +55,10 @@ class WebPushClient(Communicator): # type: ignore[misc] endpoints: set[str] = set() # get a unique set of push endpoints - for push in self.web_pushers: - endpoint: str = push.subscription_info["endpoint"] - endpoints.add(endpoint[0 : endpoint.index("/", 10)]) + for pushers in self.web_pushers.values(): + for push in pushers: + endpoint: str = push.subscription_info["endpoint"] + endpoints.add(endpoint[0 : endpoint.index("/", 10)]) # create new claim for endpoint in endpoints: @@ -64,6 +69,36 @@ class WebPushClient(Communicator): # type: ignore[misc] } self.claim_headers[endpoint] = self.vapid.sign(claim) + def cleanup_registrations(self) -> None: + # delete any expired subs + if len(self.expired_subs) > 0: + for user, expired in self.expired_subs.items(): + user_subs = [] + + # get all subscriptions, removing ones that are expired + stored_user: User = User.get_by_id(user) + for token in stored_user.notification_tokens: + if token["endpoint"] in expired: + continue + + user_subs.append(token) + + # overwrite the database and reset web pushers + User.update(notification_tokens=user_subs).where( + User.username == user + ).execute() + + self.web_pushers[user] = [] + + for sub in user_subs: + self.web_pushers[user].append(WebPusher(sub)) + + logger.info( + f"Cleaned up {len(expired)} notification subscriptions for {user}" + ) + + self.expired_subs = {} + def publish(self, topic: str, payload: Any, retain: bool = False) -> None: """Wrapper for publishing when client is in valid state.""" if topic == "reviews": @@ -108,27 +143,45 @@ class WebPushClient(Communicator): # type: ignore[misc] # if event is ongoing open to live view otherwise open to recordings view direct_url = f"/review?id={reviewId}" if state == "end" else f"/#{camera}" - for pusher in self.web_pushers: - endpoint = pusher.subscription_info["endpoint"] + for user, pushers in self.web_pushers.items(): + for pusher in pushers: + endpoint = pusher.subscription_info["endpoint"] - # set headers for notification behavior - headers = self.claim_headers[endpoint[0 : endpoint.index("/", 10)]].copy() - headers["urgency"] = "high" + # set headers for notification behavior + headers = self.claim_headers[ + endpoint[0 : endpoint.index("/", 10)] + ].copy() + headers["urgency"] = "high" - # send message - pusher.send( - headers=headers, - ttl=3600, - data=json.dumps( - { - "title": title, - "message": message, - "direct_url": direct_url, - "image": image, - "id": reviewId, - } - ), - ) + # send message + resp = pusher.send( + headers=headers, + ttl=3600, + data=json.dumps( + { + "title": title, + "message": message, + "direct_url": direct_url, + "image": image, + "id": reviewId, + } + ), + ) + + if resp.status_code == 201: + pass + elif resp.status_code == 410: + if not self.expired_subs.get(user): + self.expired_subs[user] = [] + + self.expired_subs[user].append(pusher.subscription_info["endpoint"]) + # the subscription no longer exists and should be removed + else: + logger.warning( + f"Failed to send notification to {user} :: {resp.headers}" + ) + + self.cleanup_registrations() def stop(self) -> None: pass