backend for suspended notifications

This commit is contained in:
Josh Hawkins 2024-12-17 11:22:50 -06:00
parent fe5f94aaeb
commit b9f70b2e4a
4 changed files with 121 additions and 23 deletions

View File

@ -301,7 +301,13 @@ class FrigateApp:
if self.config.mqtt.enabled:
comms.append(MqttClient(self.config))
if self.config.notifications.enabled_in_config:
notification_cameras = [
c
for c in self.config.cameras.values()
if c.enabled and c.notifications.enabled_in_config
]
if self.config.notifications.enabled_in_config or notification_cameras:
comms.append(WebPushClient(self.config))
comms.append(WebSocketClient(self.config))

View File

@ -0,0 +1,21 @@
from abc import ABC, abstractmethod
from typing import Any, Callable
class Communicator(ABC):
"""pub/sub model via specific protocol."""
@abstractmethod
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Send data via specific protocol."""
pass
@abstractmethod
def subscribe(self, receiver: Callable) -> None:
"""Pass receiver so communicators can pass commands."""
pass
@abstractmethod
def stop(self) -> None:
"""Stop the communicator."""
pass

View File

@ -3,11 +3,12 @@
import datetime
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional
from frigate.camera import PTZMetrics
from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.webpush import WebPushClient
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import (
CLEAR_ONGOING_REVIEW_SEGMENTS,
@ -30,25 +31,6 @@ from frigate.util.services import restart_frigate
logger = logging.getLogger(__name__)
class Communicator(ABC):
"""pub/sub model via specific protocol."""
@abstractmethod
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Send data via specific protocol."""
pass
@abstractmethod
def subscribe(self, receiver: Callable) -> None:
"""Pass receiver so communicators can pass commands."""
pass
@abstractmethod
def stop(self) -> None:
"""Stop the communicator."""
pass
class Dispatcher:
"""Handle communication between Frigate and communicators."""
@ -90,6 +72,12 @@ class Dispatcher:
for comm in self.comms:
comm.subscribe(self._receive)
self.web_push_client = next(
(comm for comm in communicators if isinstance(comm, WebPushClient)), None
)
if self.web_push_client is None:
logger.warning("WebPushClient not found in communicators")
def _receive(self, topic: str, payload: str) -> Optional[Any]:
"""Handle receiving of payload from communicators."""
@ -182,6 +170,13 @@ class Dispatcher:
"record": self.config.cameras[camera].record.enabled,
"audio": self.config.cameras[camera].audio.enabled,
"notifications": self.config.cameras[camera].notifications.enabled,
"notifications_suspended": int(
self.web_push_client.suspended_cameras.get(
camera, None
).timestamp()
)
if camera in self.web_push_client.suspended_cameras
else 0,
"autotracking": self.config.cameras[
camera
].onvif.autotracking.enabled,
@ -215,7 +210,7 @@ class Dispatcher:
"onConnect": handle_on_connect,
}
if topic.endswith("set") or topic.endswith("ptz"):
if topic.endswith("set") or topic.endswith("ptz") or topic.endswith("suspend"):
try:
parts = topic.split("/")
if len(parts) == 3 and topic.endswith("set"):
@ -230,6 +225,11 @@ class Dispatcher:
# example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP...
camera_name = parts[-2]
handle_camera_command("ptz", camera_name, "", payload)
elif len(parts) == 3 and topic.endswith("suspend"):
# example /cam_name/notifications/suspend payload=duration
camera_name = parts[-3]
command = parts[-2]
self._on_camera_notification_suspend(camera_name, payload)
except IndexError:
logger.error(
f"Received invalid {topic.split('/')[-1]} command: {topic}"
@ -514,12 +514,60 @@ class Dispatcher:
if not notification_settings.enabled:
logger.info(f"Turning on notifications for {camera_name}")
notification_settings.enabled = True
if (
self.web_push_client
and camera_name in self.web_push_client.suspended_cameras
):
del self.web_push_client.suspended_cameras[camera_name]
elif payload == "OFF":
if notification_settings.enabled:
logger.info(f"Turning off notifications for {camera_name}")
notification_settings.enabled = False
if (
self.web_push_client
and camera_name in self.web_push_client.suspended_cameras
):
del self.web_push_client.suspended_cameras[camera_name]
self.config_updater.publish(
"config/notifications", {camera_name: notification_settings}
)
self.publish(f"{camera_name}/notifications/state", payload, retain=True)
self.publish(f"{camera_name}/notifications/suspended", "0", retain=True)
def _on_camera_notification_suspend(self, camera_name: str, payload: str) -> None:
"""Callback for camera level notifications suspend topic."""
try:
duration = int(payload)
except ValueError:
logger.error(f"Invalid suspension duration: {payload}")
return
if self.web_push_client is None:
logger.error("WebPushClient not available for suspension")
return
notification_settings = self.config.cameras[camera_name].notifications
if not notification_settings.enabled:
logger.error(f"Notifications are not enabled for {camera_name}")
return
if duration != 0:
self.web_push_client.suspend_notifications(camera_name, duration)
else:
self.web_push_client.unsuspend_notifications(camera_name)
self.publish(
f"{camera_name}/notifications/suspended",
str(
int(
self.web_push_client.suspended_cameras.get(
camera_name, None
).timestamp()
)
if camera_name in self.web_push_client.suspended_cameras
else 0
),
retain=True,
)

View File

@ -9,8 +9,8 @@ from typing import Any, Callable
from py_vapid import Vapid01
from pywebpush import WebPusher
from frigate.comms.base_communicator import Communicator
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.dispatcher import Communicator
from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR
from frigate.models import User
@ -27,6 +27,7 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.refresh: int = 0
self.web_pushers: dict[str, list[WebPusher]] = {}
self.expired_subs: dict[str, list[str]] = {}
self.suspended_cameras: dict[str, datetime.datetime] = {}
if not self.config.notifications.email:
logger.warning("Email must be provided for push notifications to be sent.")
@ -103,6 +104,25 @@ class WebPushClient(Communicator): # type: ignore[misc]
self.expired_subs = {}
def suspend_notifications(self, camera: str, minutes: int) -> None:
"""Suspend notifications for a specific camera."""
suspend_until = datetime.datetime.now() + datetime.timedelta(minutes=minutes)
self.suspended_cameras[camera] = suspend_until
logger.info(f"Notifications for {camera} suspended until {suspend_until}")
def unsuspend_notifications(self, camera: str) -> None:
"""Unsuspend notifications for a specific camera."""
del self.suspended_cameras[camera]
logger.info(f"Notifications for {camera} unsuspended")
def is_camera_suspended(self, camera: str) -> bool:
if camera in self.suspended_cameras:
if datetime.datetime.now() >= self.suspended_cameras[camera]:
del self.suspended_cameras[camera]
return False
return True
return False
def publish(self, topic: str, payload: Any, retain: bool = False) -> None:
"""Wrapper for publishing when client is in valid state."""
# check for updated notification config
@ -121,6 +141,9 @@ class WebPushClient(Communicator): # type: ignore[misc]
camera = decoded["before"]["camera"]
if not self.config.cameras[camera].notifications.enabled:
return
if self.is_camera_suspended(camera):
logger.debug(f"Notifications for {camera} are currently suspended.")
return
self.send_alert(decoded)
elif topic == "notification_test":
if not self.config.notifications.enabled: