Filter push notifications by user role camera access (#22385)

* filter push notifications by user camera access with cached role resolution

* simplify
This commit is contained in:
Josh Hawkins 2026-03-11 09:26:09 -05:00 committed by GitHub
parent 59fc8449ed
commit 104e623923
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 62 additions and 11 deletions

View File

@ -837,6 +837,7 @@ def create_user(
User.notification_tokens: [],
}
).execute()
request.app.config_publisher.publisher.publish("config/auth", None)
return JSONResponse(content={"username": body.username})
@ -854,6 +855,7 @@ def delete_user(request: Request, username: str):
)
User.delete_by_id(username)
request.app.config_publisher.publisher.publish("config/auth", None)
return JSONResponse(content={"success": True})
@ -973,6 +975,7 @@ async def update_role(
)
User.set_by_id(username, {User.role: body.role})
request.app.config_publisher.publisher.publish("config/auth", None)
return JSONResponse(content={"success": True})

View File

@ -17,6 +17,7 @@ from titlecase import titlecase
from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigSubscriber
from frigate.config import FrigateConfig
from frigate.config.auth import AuthConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
@ -58,6 +59,7 @@ class WebPushClient(Communicator):
for c in self.config.cameras.values()
}
self.last_notification_time: float = 0
self.user_cameras: dict[str, set[str]] = {}
self.notification_queue: queue.Queue[PushNotification] = queue.Queue()
self.notification_thread = threading.Thread(
target=self._process_notifications, daemon=True
@ -78,13 +80,12 @@ class WebPushClient(Communicator):
for sub in user["notification_tokens"]:
self.web_pushers[user["username"]].append(WebPusher(sub))
# notification config updater
self.global_config_subscriber = ConfigSubscriber(
"config/notifications", exact=True
)
# notification and auth config updater
self.global_config_subscriber = ConfigSubscriber("config/")
self.config_subscriber = CameraConfigUpdateSubscriber(
self.config, self.config.cameras, [CameraConfigUpdateEnum.notifications]
)
self._refresh_user_cameras()
def subscribe(self, receiver: Callable) -> None:
"""Wrapper for allowing dispatcher to subscribe."""
@ -164,13 +165,19 @@ class WebPushClient(Communicator):
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state."""
# check for updated notification config
_, updated_notification_config = (
self.global_config_subscriber.check_for_update()
)
if updated_notification_config:
self.config.notifications = updated_notification_config
# check for updated global config (notifications, auth)
while True:
config_topic, config_payload = (
self.global_config_subscriber.check_for_update()
)
if config_topic is None:
break
if config_topic == "config/notifications" and config_payload:
self.config.notifications = config_payload
elif config_topic == "config/auth":
if isinstance(config_payload, AuthConfig):
self.config.auth = config_payload
self._refresh_user_cameras()
updates = self.config_subscriber.check_for_updates()
@ -291,6 +298,31 @@ class WebPushClient(Communicator):
except Exception as e:
logger.error(f"Error processing notification: {str(e)}")
def _refresh_user_cameras(self) -> None:
"""Rebuild the user-to-cameras access cache from the database."""
all_camera_names = set(self.config.cameras.keys())
roles_dict = self.config.auth.roles
updated: dict[str, set[str]] = {}
for user in User.select(User.username, User.role).dicts().iterator():
allowed = User.get_allowed_cameras(
user["role"], roles_dict, all_camera_names
)
updated[user["username"]] = set(allowed)
logger.debug(
"User %s has access to cameras: %s",
user["username"],
", ".join(allowed),
)
self.user_cameras = updated
def _user_has_camera_access(self, username: str, camera: str) -> bool:
"""Check if a user has access to a specific camera based on cached roles."""
allowed = self.user_cameras.get(username)
if allowed is None:
logger.debug(f"No camera access information found for user {username}")
return False
return camera in allowed
def _within_cooldown(self, camera: str) -> bool:
now = datetime.datetime.now().timestamp()
if now - self.last_notification_time < self.config.notifications.cooldown:
@ -418,6 +450,14 @@ class WebPushClient(Communicator):
logger.debug(f"Sending push notification for {camera}, review ID {reviewId}")
for user in self.web_pushers:
if not self._user_has_camera_access(user, camera):
logger.debug(
"Skipping notification for user %s - no access to camera %s",
user,
camera,
)
continue
self.send_push_notification(
user=user,
payload=payload,
@ -465,6 +505,14 @@ class WebPushClient(Communicator):
)
for user in self.web_pushers:
if not self._user_has_camera_access(user, camera):
logger.debug(
"Skipping notification for user %s - no access to camera %s",
user,
camera,
)
continue
self.send_push_notification(
user=user,
payload=payload,