Add support for ptz commands via websocket

This commit is contained in:
Nick Mowen 2022-12-14 10:00:38 -07:00
parent 0d16bd0144
commit a7048bccb5
5 changed files with 143 additions and 0 deletions

View File

@ -7,6 +7,7 @@ from typing import Any, Callable
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.ptz import OnvifController, OnvifCommandEnum
from frigate.types import CameraMetricsTypes from frigate.types import CameraMetricsTypes
from frigate.util import restart_frigate from frigate.util import restart_frigate
@ -39,10 +40,12 @@ class Dispatcher:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
onvif: OnvifController,
camera_metrics: dict[str, CameraMetricsTypes], camera_metrics: dict[str, CameraMetricsTypes],
communicators: list[Communicator], communicators: list[Communicator],
) -> None: ) -> None:
self.config = config self.config = config
self.onvif = onvif
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.comms = communicators self.comms = communicators
@ -63,12 +66,21 @@ class Dispatcher:
"""Handle receiving of payload from communicators.""" """Handle receiving of payload from communicators."""
if topic.endswith("set"): if topic.endswith("set"):
try: try:
# example /cam_name/detect/set payload=ON|OFF
camera_name = topic.split("/")[-3] camera_name = topic.split("/")[-3]
command = topic.split("/")[-2] command = topic.split("/")[-2]
self._camera_settings_handlers[command](camera_name, payload) self._camera_settings_handlers[command](camera_name, payload)
except Exception as e: except Exception as e:
logger.error(f"Received invalid set command: {topic}") logger.error(f"Received invalid set command: {topic}")
return return
elif topic.endswith("ptz"):
try:
# example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP...
camera_name = topic.split("/")[-2]
self._on_ptz_command(camera_name, payload)
except Exception as e:
logger.error(f"Received invalid ptz command: {topic}")
return
elif topic == "restart": elif topic == "restart":
restart_frigate() restart_frigate()
@ -204,3 +216,11 @@ class Dispatcher:
snapshots_settings.enabled = False snapshots_settings.enabled = False
self.publish(f"{camera_name}/snapshots/state", payload, retain=True) self.publish(f"{camera_name}/snapshots/state", payload, retain=True)
def _on_ptz_command(self, camera_name: str, payload: str) -> None:
"""Callback for ptz topic."""
try:
command = OnvifCommandEnum[payload.lower()]
self.onvif.handle_command(camera_name, command)
except Exception as e:
return

View File

@ -167,6 +167,11 @@ class MqttClient(Communicator): # type: ignore[misc]
self.on_mqtt_command, self.on_mqtt_command,
) )
if self.config.cameras[name].onvif.host:
self.client.message_callback_add(
f"{self.mqtt_config.topic_prefix}/{name}/ptz/#"
)
self.client.message_callback_add( self.client.message_callback_add(
f"{self.mqtt_config.topic_prefix}/restart", self.on_mqtt_command f"{self.mqtt_config.topic_prefix}/restart", self.on_mqtt_command
) )

View File

@ -125,6 +125,13 @@ class MqttConfig(FrigateBaseModel):
return v return v
class OnvifConfig(FrigateBaseModel):
host: str = Field(default="", title="Onvif Host")
port: int = Field(default=8000, title="Onvif Port")
user: Optional[str] = Field(title="Onvif Username")
password: Optional[str] = Field(title="Onvif Password")
class RetainModeEnum(str, Enum): class RetainModeEnum(str, Enum):
all = "all" all = "all"
motion = "motion" motion = "motion"
@ -607,6 +614,9 @@ class CameraConfig(FrigateBaseModel):
detect: DetectConfig = Field( detect: DetectConfig = Field(
default_factory=DetectConfig, title="Object detection configuration." default_factory=DetectConfig, title="Object detection configuration."
) )
onvif: OnvifConfig = Field(
default_factory=OnvifConfig, title="Camera Onvif Configuration."
)
ui: CameraUiConfig = Field( ui: CameraUiConfig = Field(
default_factory=CameraUiConfig, title="Camera UI Modifications." default_factory=CameraUiConfig, title="Camera UI Modifications."
) )

107
frigate/ptz.py Normal file
View File

@ -0,0 +1,107 @@
"""Configure and control camera via onvif."""
import logging
from enum import Enum
from onvif import ONVIFCamera
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
class OnvifCommandEnum(str, Enum):
"""Holds all possible move commands"""
move_down = "move_down"
move_left = "move_left"
move_right = "move_right"
move_up = "move_up"
stop = "stop"
class OnvifController:
def __init__(self, config: FrigateConfig) -> None:
self.cams: dict[str, ONVIFCamera] = {}
for cam_name, cam in config.cameras.items():
if cam.onvif.host:
self.cams[cam_name] = {
"onvif": ONVIFCamera(
cam.onvif.host,
cam.onvif.port,
cam.onvif.user,
cam.onvif.password,
),
"init": False,
"active": False,
}
def _init_onvif(self, camera_name: str) -> None:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
media = onvif.create_media_service()
profile = media.GetProfiles()[0]
ptz = onvif.create_ptz_service()
request = ptz.create_type("GetConfigurationOptions")
request.ConfigurationToken = profile.PTZConfiguration.token
ptz_config = ptz.GetConfigurationOptions(request)
move_request = ptz.create_type("ContinuousMove")
move_request.ProfileToken = profile.token
if move_request.Velocity is None:
move_request.Velocity = ptz.GetStatus(
{"ProfileToken": profile.token}
).Position
self.cams[camera_name]["move_request"] = move_request
def _stop(self, camera_name: str) -> None:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
onvif.get_service("ptz").Stop(
{
"ProfileToken": move_request.ProfileToken,
"PanTilt": True,
"Zoom": True,
}
)
self.cams[camera_name]["active"] = False
def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, stopping..."
)
self._stop(camera_name)
self.cams[camera_name]["active"] = True
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
if command == OnvifCommandEnum.left:
move_request.Velocity.PanTilt.x = -0.5
move_request.Velocity.PanTilt.y = 0
elif command == OnvifCommandEnum.right:
move_request.Velocity.PanTilt.x = 0.5
move_request.Velocity.PanTilt.y = 0
elif command == OnvifCommandEnum.up:
move_request.Velocity.PanTilt.x = 0
move_request.Velocity.PanTilt.y = 1
else:
move_request.Velocity.PanTilt.x = 0
move_request.Velocity.PanTilt.y = -1
onvif.get_service("ptz").ContinuousMove(move_request)
def handle_command(self, camera_name, command: OnvifCommandEnum) -> None:
if camera_name not in self.cams.keys():
logger.error(f"Onvif is not setup for {camera_name}")
return
if not self.cams[camera_name]["init"]:
self._init_onvif(camera_name)
if command == OnvifCommandEnum.stop:
self._stop(camera_name)
else:
self._move(camera_name, command)

View File

@ -4,6 +4,7 @@ imutils == 0.5.*
matplotlib == 3.6.* matplotlib == 3.6.*
mypy == 0.942 mypy == 0.942
numpy == 1.23.* numpy == 1.23.*
onvif_zeep == 0.2.12
opencv-python-headless == 4.5.5.* opencv-python-headless == 4.5.5.*
paho-mqtt == 1.6.* paho-mqtt == 1.6.*
peewee == 3.15.* peewee == 3.15.*