From c00f19bd33ce3bc162825ac7c0b71695b7e090d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20van=20den=20Berg?= Date: Sun, 1 Oct 2023 03:52:21 +0200 Subject: [PATCH] TP-link tapo C210 PTZ support DO NOT MERGE! --- frigate/ptz/onvif.py | 119 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 99 insertions(+), 20 deletions(-) diff --git a/frigate/ptz/onvif.py b/frigate/ptz/onvif.py index b8794b6f1..e558c31dc 100644 --- a/frigate/ptz/onvif.py +++ b/frigate/ptz/onvif.py @@ -4,6 +4,13 @@ import logging import site from enum import Enum +import subprocess +import sys +subprocess.check_call([sys.executable, "-m", "pip", "install", "pytapo"]) +from pytapo import Tapo +import threading +import time + import numpy from onvif import ONVIFCamera, ONVIFError @@ -13,6 +20,8 @@ from frigate.util.builtin import find_by_key logger = logging.getLogger(__name__) +mover_thread = None +stop_mover_thread = False class OnvifCommandEnum(str, Enum): """Holds all possible move commands""" @@ -57,9 +66,13 @@ class OnvifController: "active": False, "features": [], "presets": {}, + "pytapo": Tapo(cam.onvif.host, "admin", "my-tp-link-cloud-password") #either admin/tp-link-cloud-password (used in app to set up device), or actual device username/password as configured in the app. } + print("pytapo:",self.cams[cam_name]["pytapo"].getBasicInfo()) except ONVIFError as e: logger.error(f"Onvif connection to {cam.name} failed: {e}") + except Exception as e: + logger.error(f"Unknown error connecting to {cam.name}: {e}") def _init_onvif(self, camera_name: str) -> bool: onvif: ONVIFCamera = self.cams[camera_name]["onvif"] @@ -98,6 +111,19 @@ class OnvifController: ), None, ) + + # status request for autotracking and filling ptz-parameters + status_request = ptz.create_type("GetStatus") + status_request.ProfileToken = profile.token + self.cams[camera_name]["status_request"] = status_request + try: + status = ptz.GetStatus(status_request) + except Exception as e: + logger.warning( + f"Unable to get status from camera: {camera_name}: {e}" + ) + status = None + logger.debug(f"Onvif status config for {camera_name}: {status}") # autoracking relative panning/tilting needs a relative zoom value set to 0 # if camera supports relative movement @@ -122,9 +148,7 @@ class OnvifController: move_request = ptz.create_type("RelativeMove") move_request.ProfileToken = profile.token if move_request.Translation is None and fov_space_id is not None: - move_request.Translation = ptz.GetStatus( - {"ProfileToken": profile.token} - ).Position + move_request.Translation = status.Position move_request.Translation.PanTilt.space = ptz_config["Spaces"][ "RelativePanTiltTranslationSpace" ][fov_space_id]["URI"] @@ -146,7 +170,7 @@ class OnvifController: ) if move_request.Speed is None: - move_request.Speed = ptz.GetStatus({"ProfileToken": profile.token}).Position + move_request.Speed = status.Position if status else None self.cams[camera_name]["relative_move_request"] = move_request # setup absolute moving request for autotracking zooming @@ -154,13 +178,6 @@ class OnvifController: move_request.ProfileToken = profile.token self.cams[camera_name]["absolute_move_request"] = move_request - # status request for autotracking - status_request = ptz.create_type("GetStatus") - status_request.ProfileToken = profile.token - self.cams[camera_name]["status_request"] = status_request - status = ptz.GetStatus(status_request) - logger.debug(f"Onvif status config for {camera_name}: {status}") - # setup existing presets try: presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token}) @@ -215,18 +232,29 @@ class OnvifController: return True def _stop(self, camera_name: str) -> None: + logger.info(f"Stopping {camera_name}") onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["move_request"] - onvif.get_service("ptz").Stop( - { - "ProfileToken": move_request.ProfileToken, - "PanTilt": True, - "Zoom": True, - } - ) + # onvif.get_service("ptz").Stop( + # { + # "ProfileToken": move_request.ProfileToken, + # "PanTilt": True, + # "Zoom": True, + # } + # ) self.cams[camera_name]["active"] = False + # Stop the mover thread + # global mover_thread, stop_mover_thread + # if mover_thread is not None: + # stop_mover_thread = True + # mover_thread.join() + # mover_thread = None + + pytapo = self.cams[camera_name]["pytapo"] + pytapo.performRequest({"method": "do", "motor": {"stop":"null"}}) def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: + logger.info(f"Moving {camera_name} {command}") if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." @@ -234,6 +262,54 @@ class OnvifController: self._stop(camera_name) self.cams[camera_name]["active"] = True + + # Start the mover thread. + def mover_thread_func(): + global stop_mover_thread + # call the pytapo library once per second. Functions are: moveMotorClockWise, moveMotorCounterClockWise, moveMotorVertical (for up), moveMotorHorizontal (for down). + try: + while not stop_mover_thread: + pytapo = self.cams[camera_name]["pytapo"] + if command == OnvifCommandEnum.move_left: + res = pytapo.moveMotorCounterClockWise() + elif command == OnvifCommandEnum.move_right: + res = pytapo.moveMotorClockWise() + elif command == OnvifCommandEnum.move_up: + res = pytapo.moveMotorVertical() + elif command == OnvifCommandEnum.move_down: + res = pytapo.moveMotorHorizontal() + logger.info(f"Pytapo move direction: {command} result: {res}") + time.sleep(1) + pytapo.performRequest({"method": "do", "motor": {"stop":"null"}}) + except Exception as e: + # Probably at max pan/tilt + logger.info(f"Pytapo move direction: {command} result: {e}") + logger.info(f"Pytapo mover thread for {camera_name} stopped.") + + logger.info(f"Starting mover thread for {camera_name} in direction {command}") + + pytapo = self.cams[camera_name]["pytapo"] + try: + if command == OnvifCommandEnum.move_left: + res = pytapo.moveMotorCounterClockWise() + elif command == OnvifCommandEnum.move_right: + res = pytapo.moveMotorClockWise() + elif command == OnvifCommandEnum.move_up: + res = pytapo.moveMotorVertical() + elif command == OnvifCommandEnum.move_down: + res = pytapo.moveMotorHorizontal() + logger.info(f"Pytapo move direction: {command} result: {res}") + except Exception as e: + # Probably at max pan/tilt + logger.info(f"Pytapo move direction: {command} result: {e}") + return + + global mover_thread, stop_mover_thread + mover_thread = threading.Thread(target=mover_thread_func) + stop_mover_thread = False + mover_thread.start() + return + onvif: ONVIFCamera = self.cams[camera_name]["onvif"] move_request = self.cams[camera_name]["move_request"] @@ -256,7 +332,7 @@ class OnvifController: } } - onvif.get_service("ptz").ContinuousMove(move_request) + reply = onvif.get_service("ptz").ContinuousMove(move_request) def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: if "pt-r-fov" not in self.cams[camera_name]["features"]: @@ -486,7 +562,10 @@ class OnvifController: onvif: ONVIFCamera = self.cams[camera_name]["onvif"] status_request = self.cams[camera_name]["status_request"] - status = onvif.get_service("ptz").GetStatus(status_request) + try: + status = onvif.get_service("ptz").GetStatus(status_request) + except Exception as e: + pass # We're unsupported, that'll be reported in the next check. # there doesn't seem to be an onvif standard with this optional parameter # some cameras can report MoveStatus with or without PanTilt or Zoom attributes