TP-link tapo C210 PTZ support

DO NOT MERGE!
This commit is contained in:
Daniël van den Berg 2023-10-01 03:52:21 +02:00 committed by GitHub
parent 9a4f970337
commit c00f19bd33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -4,6 +4,13 @@ import logging
import site
from enum import Enum
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "pytapo"])
from pytapo import Tapo
import threading
import time
import numpy
from onvif import ONVIFCamera, ONVIFError
@ -13,6 +20,8 @@ from frigate.util.builtin import find_by_key
logger = logging.getLogger(__name__)
mover_thread = None
stop_mover_thread = False
class OnvifCommandEnum(str, Enum):
"""Holds all possible move commands"""
@ -57,9 +66,13 @@ class OnvifController:
"active": False,
"features": [],
"presets": {},
"pytapo": Tapo(cam.onvif.host, "admin", "my-tp-link-cloud-password") #either admin/tp-link-cloud-password (used in app to set up device), or actual device username/password as configured in the app.
}
print("pytapo:",self.cams[cam_name]["pytapo"].getBasicInfo())
except ONVIFError as e:
logger.error(f"Onvif connection to {cam.name} failed: {e}")
except Exception as e:
logger.error(f"Unknown error connecting to {cam.name}: {e}")
def _init_onvif(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
@ -98,6 +111,19 @@ class OnvifController:
),
None,
)
# status request for autotracking and filling ptz-parameters
status_request = ptz.create_type("GetStatus")
status_request.ProfileToken = profile.token
self.cams[camera_name]["status_request"] = status_request
try:
status = ptz.GetStatus(status_request)
except Exception as e:
logger.warning(
f"Unable to get status from camera: {camera_name}: {e}"
)
status = None
logger.debug(f"Onvif status config for {camera_name}: {status}")
# autoracking relative panning/tilting needs a relative zoom value set to 0
# if camera supports relative movement
@ -122,9 +148,7 @@ class OnvifController:
move_request = ptz.create_type("RelativeMove")
move_request.ProfileToken = profile.token
if move_request.Translation is None and fov_space_id is not None:
move_request.Translation = ptz.GetStatus(
{"ProfileToken": profile.token}
).Position
move_request.Translation = status.Position
move_request.Translation.PanTilt.space = ptz_config["Spaces"][
"RelativePanTiltTranslationSpace"
][fov_space_id]["URI"]
@ -146,7 +170,7 @@ class OnvifController:
)
if move_request.Speed is None:
move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position
move_request.Speed = status.Position if status else None
self.cams[camera_name]["relative_move_request"] = move_request
# setup absolute moving request for autotracking zooming
@ -154,13 +178,6 @@ class OnvifController:
move_request.ProfileToken = profile.token
self.cams[camera_name]["absolute_move_request"] = move_request
# status request for autotracking
status_request = ptz.create_type("GetStatus")
status_request.ProfileToken = profile.token
self.cams[camera_name]["status_request"] = status_request
status = ptz.GetStatus(status_request)
logger.debug(f"Onvif status config for {camera_name}: {status}")
# setup existing presets
try:
presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
@ -215,18 +232,29 @@ class OnvifController:
return True
def _stop(self, camera_name: str) -> None:
logger.info(f"Stopping {camera_name}")
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
onvif.get_service("ptz").Stop(
{
"ProfileToken": move_request.ProfileToken,
"PanTilt": True,
"Zoom": True,
}
)
# onvif.get_service("ptz").Stop(
# {
# "ProfileToken": move_request.ProfileToken,
# "PanTilt": True,
# "Zoom": True,
# }
# )
self.cams[camera_name]["active"] = False
# Stop the mover thread
# global mover_thread, stop_mover_thread
# if mover_thread is not None:
# stop_mover_thread = True
# mover_thread.join()
# mover_thread = None
pytapo = self.cams[camera_name]["pytapo"]
pytapo.performRequest({"method": "do", "motor": {"stop":"null"}})
def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
logger.info(f"Moving {camera_name} {command}")
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, stopping..."
@ -234,6 +262,54 @@ class OnvifController:
self._stop(camera_name)
self.cams[camera_name]["active"] = True
# Start the mover thread.
def mover_thread_func():
global stop_mover_thread
# call the pytapo library once per second. Functions are: moveMotorClockWise, moveMotorCounterClockWise, moveMotorVertical (for up), moveMotorHorizontal (for down).
try:
while not stop_mover_thread:
pytapo = self.cams[camera_name]["pytapo"]
if command == OnvifCommandEnum.move_left:
res = pytapo.moveMotorCounterClockWise()
elif command == OnvifCommandEnum.move_right:
res = pytapo.moveMotorClockWise()
elif command == OnvifCommandEnum.move_up:
res = pytapo.moveMotorVertical()
elif command == OnvifCommandEnum.move_down:
res = pytapo.moveMotorHorizontal()
logger.info(f"Pytapo move direction: {command} result: {res}")
time.sleep(1)
pytapo.performRequest({"method": "do", "motor": {"stop":"null"}})
except Exception as e:
# Probably at max pan/tilt
logger.info(f"Pytapo move direction: {command} result: {e}")
logger.info(f"Pytapo mover thread for {camera_name} stopped.")
logger.info(f"Starting mover thread for {camera_name} in direction {command}")
pytapo = self.cams[camera_name]["pytapo"]
try:
if command == OnvifCommandEnum.move_left:
res = pytapo.moveMotorCounterClockWise()
elif command == OnvifCommandEnum.move_right:
res = pytapo.moveMotorClockWise()
elif command == OnvifCommandEnum.move_up:
res = pytapo.moveMotorVertical()
elif command == OnvifCommandEnum.move_down:
res = pytapo.moveMotorHorizontal()
logger.info(f"Pytapo move direction: {command} result: {res}")
except Exception as e:
# Probably at max pan/tilt
logger.info(f"Pytapo move direction: {command} result: {e}")
return
global mover_thread, stop_mover_thread
mover_thread = threading.Thread(target=mover_thread_func)
stop_mover_thread = False
mover_thread.start()
return
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"]
@ -256,7 +332,7 @@ class OnvifController:
}
}
onvif.get_service("ptz").ContinuousMove(move_request)
reply = onvif.get_service("ptz").ContinuousMove(move_request)
def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
if "pt-r-fov" not in self.cams[camera_name]["features"]:
@ -486,7 +562,10 @@ class OnvifController:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
status_request = self.cams[camera_name]["status_request"]
status = onvif.get_service("ptz").GetStatus(status_request)
try:
status = onvif.get_service("ptz").GetStatus(status_request)
except Exception as e:
pass # We're unsupported, that'll be reported in the next check.
# there doesn't seem to be an onvif standard with this optional parameter
# some cameras can report MoveStatus with or without PanTilt or Zoom attributes