Refactor ONVIF camera control for synchronous execution

Refactor ONVIF camera control to use synchronous methods instead of asyncio. Update camera initialization and command handling.
This commit is contained in:
omarmeleis 2026-02-06 13:18:44 +02:00 committed by GitHub
parent 4131252a3b
commit cd33e751da
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,17 +1,16 @@
"""Configure and control camera via onvif.""" """Configure and control camera via onvif."""
import asyncio
import logging import logging
import threading
import time
from enum import Enum from enum import Enum
from importlib.util import find_spec from importlib.util import find_spec
from pathlib import Path from pathlib import Path
from typing import Any
import numpy import numpy
from onvif import ONVIFCamera, ONVIFError, ONVIFService import requests
import time
from onvif import ONVIFCamera, ONVIFError
from zeep.exceptions import Fault, TransportError from zeep.exceptions import Fault, TransportError
from zeep.transports import Transport
from frigate.camera import PTZMetrics from frigate.camera import PTZMetrics
from frigate.config import FrigateConfig, ZoomingModeEnum from frigate.config import FrigateConfig, ZoomingModeEnum
@ -41,117 +40,69 @@ class OnvifController:
def __init__( def __init__(
self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics] self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics]
) -> None: ) -> None:
self.cams: dict[str, dict] = {} self.cams: dict[str, ONVIFCamera] = {}
self.failed_cams: dict[str, dict] = {}
self.max_retries = 5
self.reset_timeout = 900 # 15 minutes
self.config = config self.config = config
self.ptz_metrics = ptz_metrics self.ptz_metrics = ptz_metrics
self.status_locks: dict[str, asyncio.Lock] = {}
# Create a dedicated event loop and run it in a separate thread
self.loop = asyncio.new_event_loop()
self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True)
self.loop_thread.start()
self.camera_configs = {}
for cam_name, cam in config.cameras.items(): for cam_name, cam in config.cameras.items():
if not cam.enabled: if not cam.enabled:
continue continue
if cam.onvif.host: if cam.onvif.host:
self.camera_configs[cam_name] = cam try:
self.status_locks[cam_name] = asyncio.Lock() session = requests.Session()
session.verify = not cam.onvif.tls_insecure
transport = Transport(
timeout=10, operation_timeout=10, session=session
)
self.cams[cam_name] = {
"onvif": ONVIFCamera(
cam.onvif.host,
cam.onvif.port,
cam.onvif.user,
cam.onvif.password,
wsdl_dir=str(
Path(find_spec("onvif").origin).parent / "wsdl"
).replace("dist-packages/onvif", "site-packages"),
adjust_time=cam.onvif.ignore_time_mismatch,
transport=transport,
),
"init": False,
"active": False,
"features": [],
"presets": {},
}
except ONVIFError as e:
logger.error(f"Onvif connection to {cam.name} failed: {e}")
asyncio.run_coroutine_threadsafe(self._init_cameras(), self.loop) def _init_onvif(self, camera_name: str) -> bool:
def _run_event_loop(self) -> None:
"""Run the event loop in a separate thread."""
asyncio.set_event_loop(self.loop)
try:
self.loop.run_forever()
except Exception as e:
logger.error(f"Onvif event loop terminated unexpectedly: {e}")
async def _init_cameras(self) -> None:
"""Initialize all configured cameras."""
for cam_name in self.camera_configs:
await self._init_single_camera(cam_name)
async def _init_single_camera(self, cam_name: str) -> bool:
"""Initialize a single camera by name.
Args:
cam_name: The name of the camera to initialize
Returns:
bool: True if initialization succeeded, False otherwise
"""
if cam_name not in self.camera_configs:
logger.error(f"No configuration found for camera {cam_name}")
return False
cam = self.camera_configs[cam_name]
try:
self.cams[cam_name] = {
"onvif": ONVIFCamera(
cam.onvif.host,
cam.onvif.port,
cam.onvif.user,
cam.onvif.password,
wsdl_dir=str(Path(find_spec("onvif").origin).parent / "wsdl"),
adjust_time=cam.onvif.ignore_time_mismatch,
encrypt=not cam.onvif.tls_insecure,
),
"init": False,
"active": False,
"features": [],
"presets": {},
}
return True
except (Fault, ONVIFError, TransportError, Exception) as e:
logger.error(f"Failed to create ONVIF camera instance for {cam_name}: {e}")
# track initial failures
self.failed_cams[cam_name] = {
"retry_attempts": 0,
"last_error": str(e),
"last_attempt": time.time(),
}
return False
async def _init_onvif(self, camera_name: str) -> bool:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
try:
await onvif.update_xaddrs()
except Exception as e:
logger.error(f"Onvif connection failed for {camera_name}: {e}")
return False
# create init services # create init services
media: ONVIFService = await onvif.create_media_service() media = onvif.create_media_service()
logger.debug(f"Onvif media xaddr for {camera_name}: {media.xaddr}") logger.debug(f"Onvif media xaddr for {camera_name}: {media.xaddr}")
try: try:
# this will fire an exception if camera is not a ptz # this will fire an exception if camera is not a ptz
capabilities = onvif.get_definition("ptz") capabilities = onvif.get_definition("ptz")
logger.debug(f"Onvif capabilities for {camera_name}: {capabilities}") logger.debug(f"Onvif capabilities for {camera_name}: {capabilities}")
except (Fault, ONVIFError, TransportError, Exception) as e: except (ONVIFError, Fault, TransportError) as e:
logger.error( logger.error(
f"Unable to get Onvif capabilities for camera: {camera_name}: {e}" f"Unable to get Onvif capabilities for camera: {camera_name}: {e}"
) )
return False return False
try: try:
profiles = await media.GetProfiles() profiles = media.GetProfiles()
logger.debug(f"Onvif profiles for {camera_name}: {profiles}") logger.debug(f"Onvif profiles for {camera_name}: {profiles}")
except (Fault, ONVIFError, TransportError, Exception) as e: except (ONVIFError, Fault, TransportError) as e:
logger.error( logger.error(
f"Unable to get Onvif media profiles for camera: {camera_name}: {e}" f"Unable to get Onvif media profiles for camera: {camera_name}: {e}"
) )
return False return False
profile = None profile = None
for _, onvif_profile in enumerate(profiles): for key, onvif_profile in enumerate(profiles):
if ( if (
onvif_profile.VideoEncoderConfiguration onvif_profile.VideoEncoderConfiguration
and onvif_profile.PTZConfiguration and onvif_profile.PTZConfiguration
@ -185,8 +136,7 @@ class OnvifController:
) )
return False return False
ptz: ONVIFService = await onvif.create_ptz_service() ptz = onvif.create_ptz_service()
self.cams[camera_name]["ptz"] = ptz
# setup continuous moving request # setup continuous moving request
move_request = ptz.create_type("ContinuousMove") move_request = ptz.create_type("ContinuousMove")
@ -200,7 +150,7 @@ class OnvifController:
): ):
request = ptz.create_type("GetConfigurationOptions") request = ptz.create_type("GetConfigurationOptions")
request.ConfigurationToken = profile.PTZConfiguration.token request.ConfigurationToken = profile.PTZConfiguration.token
ptz_config = await ptz.GetConfigurationOptions(request) ptz_config = ptz.GetConfigurationOptions(request)
logger.debug(f"Onvif config for {camera_name}: {ptz_config}") logger.debug(f"Onvif config for {camera_name}: {ptz_config}")
service_capabilities_request = ptz.create_type("GetServiceCapabilities") service_capabilities_request = ptz.create_type("GetServiceCapabilities")
@ -224,7 +174,7 @@ class OnvifController:
status_request.ProfileToken = profile.token status_request.ProfileToken = profile.token
self.cams[camera_name]["status_request"] = status_request self.cams[camera_name]["status_request"] = status_request
try: try:
status = await ptz.GetStatus(status_request) status = ptz.GetStatus(status_request)
logger.debug(f"Onvif status config for {camera_name}: {status}") logger.debug(f"Onvif status config for {camera_name}: {status}")
except Exception as e: except Exception as e:
logger.warning(f"Unable to get status from camera: {camera_name}: {e}") logger.warning(f"Unable to get status from camera: {camera_name}: {e}")
@ -268,25 +218,19 @@ class OnvifController:
"RelativeZoomTranslationSpace" "RelativeZoomTranslationSpace"
][zoom_space_id]["URI"] ][zoom_space_id]["URI"]
else: else:
if ( if "Zoom" in move_request["Translation"]:
move_request["Translation"] is not None
and "Zoom" in move_request["Translation"]
):
del move_request["Translation"]["Zoom"] del move_request["Translation"]["Zoom"]
if ( if "Zoom" in move_request["Speed"]:
move_request["Speed"] is not None
and "Zoom" in move_request["Speed"]
):
del move_request["Speed"]["Zoom"] del move_request["Speed"]["Zoom"]
logger.debug( logger.debug(
f"{camera_name}: Relative move request after deleting zoom: {move_request}" f"{camera_name}: Relative move request after deleting zoom: {move_request}"
) )
except Exception as e: except Exception:
self.config.cameras[ self.config.cameras[
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}" f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported"
) )
if move_request.Speed is None: if move_request.Speed is None:
@ -303,8 +247,8 @@ class OnvifController:
# setup existing presets # setup existing presets
try: try:
presets: list[dict] = await ptz.GetPresets({"ProfileToken": profile.token}) presets: list[dict] = ptz.GetPresets({"ProfileToken": profile.token})
except (Fault, ONVIFError, TransportError, Exception) as e: except ONVIFError as e:
logger.warning(f"Unable to get presets from camera: {camera_name}: {e}") logger.warning(f"Unable to get presets from camera: {camera_name}: {e}")
presets = [] presets = []
@ -336,7 +280,7 @@ class OnvifController:
self.cams[camera_name]["relative_zoom_range"] = ( self.cams[camera_name]["relative_zoom_range"] = (
ptz_config.Spaces.RelativeZoomTranslationSpace[0] ptz_config.Spaces.RelativeZoomTranslationSpace[0]
) )
except Exception as e: except Exception:
if ( if (
self.config.cameras[camera_name].onvif.autotracking.zooming self.config.cameras[camera_name].onvif.autotracking.zooming
== ZoomingModeEnum.relative == ZoomingModeEnum.relative
@ -345,7 +289,7 @@ class OnvifController:
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported. Exception: {e}" f"Disabling autotracking zooming for {camera_name}: Relative zoom not supported"
) )
if configs.DefaultAbsoluteZoomPositionSpace: if configs.DefaultAbsoluteZoomPositionSpace:
@ -360,13 +304,13 @@ class OnvifController:
ptz_config.Spaces.AbsoluteZoomPositionSpace[0] ptz_config.Spaces.AbsoluteZoomPositionSpace[0]
) )
self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits
except Exception as e: except Exception:
if self.config.cameras[camera_name].onvif.autotracking.zooming: if self.config.cameras[camera_name].onvif.autotracking.zooming:
self.config.cameras[ self.config.cameras[
camera_name camera_name
].onvif.autotracking.zooming = ZoomingModeEnum.disabled ].onvif.autotracking.zooming = ZoomingModeEnum.disabled
logger.warning( logger.warning(
f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported. Exception: {e}" f"Disabling autotracking zooming for {camera_name}: Absolute zoom not supported"
) )
# set relative pan/tilt space for autotracker # set relative pan/tilt space for autotracker
@ -382,12 +326,17 @@ class OnvifController:
) )
self.cams[camera_name]["features"] = supported_features self.cams[camera_name]["features"] = supported_features
self.cams[camera_name]["last_pan_tilt_pos"] = (0.0, 0.0)
self.cams[camera_name]["last_zoom_pos"] = 0.0
self.cams[camera_name]["last_pos_change_time"] = 0.0
self.cams[camera_name]["init"] = True self.cams[camera_name]["init"] = True
return True return True
async def _stop(self, camera_name: str) -> None: def _stop(self, camera_name: str) -> None:
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
await self.cams[camera_name]["ptz"].Stop( onvif.get_service("ptz").Stop(
{ {
"ProfileToken": move_request.ProfileToken, "ProfileToken": move_request.ProfileToken,
"PanTilt": True, "PanTilt": True,
@ -396,18 +345,19 @@ class OnvifController:
) )
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
async def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: def _move(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]: if self.cams[camera_name]["active"]:
logger.warning( logger.warning(
f"{camera_name} is already performing an action, stopping..." f"{camera_name} is already performing an action, stopping..."
) )
await self._stop(camera_name) self._stop(camera_name)
if "pt" not in self.cams[camera_name]["features"]: if "pt" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.") logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.")
return return
self.cams[camera_name]["active"] = True self.cams[camera_name]["active"] = True
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
if command == OnvifCommandEnum.move_left: if command == OnvifCommandEnum.move_left:
@ -430,11 +380,11 @@ class OnvifController:
} }
try: try:
await self.cams[camera_name]["ptz"].ContinuousMove(move_request) onvif.get_service("ptz").ContinuousMove(move_request)
except (Fault, ONVIFError, TransportError, Exception) as e: except ONVIFError as e:
logger.warning(f"Onvif sending move request to {camera_name} failed: {e}") logger.warning(f"Onvif sending move request to {camera_name} failed: {e}")
async def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
if "pt-r-fov" not in self.cams[camera_name]["features"]: if "pt-r-fov" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
return return
@ -458,25 +408,26 @@ class OnvifController:
camera_name camera_name
].frame_time.value ].frame_time.value
self.ptz_metrics[camera_name].stop_time.value = 0 self.ptz_metrics[camera_name].stop_time.value = 0
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["relative_move_request"] move_request = self.cams[camera_name]["relative_move_request"]
# function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera. # function takes in -1 to 1 for pan and tilt, interpolate to the values of the camera.
# The onvif spec says this can report as +INF and -INF, so this may need to be modified # The onvif spec says this can report as +INF and -INF, so this may need to be modified
pan = numpy.interp( pan = numpy.interp(
pan, pan,
[-1, 1],
[ [
self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"], self.cams[camera_name]["relative_fov_range"]["XRange"]["Min"],
self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"], self.cams[camera_name]["relative_fov_range"]["XRange"]["Max"],
], ],
[-1, 1],
) )
tilt = numpy.interp( tilt = numpy.interp(
tilt, tilt,
[-1, 1],
[ [
self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"], self.cams[camera_name]["relative_fov_range"]["YRange"]["Min"],
self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"], self.cams[camera_name]["relative_fov_range"]["YRange"]["Max"],
], ],
[-1, 1],
) )
move_request.Speed = { move_request.Speed = {
@ -503,7 +454,7 @@ class OnvifController:
} }
move_request.Translation.Zoom.x = zoom move_request.Translation.Zoom.x = zoom
await self.cams[camera_name]["ptz"].RelativeMove(move_request) onvif.get_service("ptz").RelativeMove(move_request)
# reset after the move request # reset after the move request
move_request.Translation.PanTilt.x = 0 move_request.Translation.PanTilt.x = 0
@ -518,18 +469,19 @@ class OnvifController:
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
async def _move_to_preset(self, camera_name: str, preset: str) -> None: def _move_to_preset(self, camera_name: str, preset: str) -> None:
if preset not in self.cams[camera_name]["presets"]: if preset not in self.cams[camera_name]["presets"]:
logger.error(f"{preset} is not a valid preset for {camera_name}") logger.error(f"{preset} is not a valid preset for {camera_name}")
return return
self.cams[camera_name]["active"] = True self.cams[camera_name]["active"] = True
self.ptz_metrics[camera_name].motor_stopped.clear()
self.ptz_metrics[camera_name].start_time.value = 0 self.ptz_metrics[camera_name].start_time.value = 0
self.ptz_metrics[camera_name].stop_time.value = 0 self.ptz_metrics[camera_name].stop_time.value = 0
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
preset_token = self.cams[camera_name]["presets"][preset] preset_token = self.cams[camera_name]["presets"][preset]
onvif.get_service("ptz").GotoPreset(
await self.cams[camera_name]["ptz"].GotoPreset(
{ {
"ProfileToken": move_request.ProfileToken, "ProfileToken": move_request.ProfileToken,
"PresetToken": preset_token, "PresetToken": preset_token,
@ -538,18 +490,19 @@ class OnvifController:
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
async def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None:
if self.cams[camera_name]["active"]: if self.cams[camera_name]["active"]:
logger.warning( logger.warning(
f"{camera_name} is already performing an action, stopping..." f"{camera_name} is already performing an action, stopping..."
) )
await self._stop(camera_name) self._stop(camera_name)
if "zoom" not in self.cams[camera_name]["features"]: if "zoom" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF zooming.") logger.error(f"{camera_name} does not support ONVIF zooming.")
return return
self.cams[camera_name]["active"] = True self.cams[camera_name]["active"] = True
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["move_request"] move_request = self.cams[camera_name]["move_request"]
if command == OnvifCommandEnum.zoom_in: if command == OnvifCommandEnum.zoom_in:
@ -557,9 +510,9 @@ class OnvifController:
elif command == OnvifCommandEnum.zoom_out: elif command == OnvifCommandEnum.zoom_out:
move_request.Velocity = {"Zoom": {"x": -0.5}} move_request.Velocity = {"Zoom": {"x": -0.5}}
await self.cams[camera_name]["ptz"].ContinuousMove(move_request) onvif.get_service("ptz").ContinuousMove(move_request)
async def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: def _zoom_absolute(self, camera_name: str, zoom, speed) -> None:
if "zoom-a" not in self.cams[camera_name]["features"]: if "zoom-a" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.") logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.")
return return
@ -581,16 +534,17 @@ class OnvifController:
camera_name camera_name
].frame_time.value ].frame_time.value
self.ptz_metrics[camera_name].stop_time.value = 0 self.ptz_metrics[camera_name].stop_time.value = 0
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["absolute_move_request"] move_request = self.cams[camera_name]["absolute_move_request"]
# function takes in 0 to 1 for zoom, interpolate to the values of the camera. # function takes in 0 to 1 for zoom, interpolate to the values of the camera.
zoom = numpy.interp( zoom = numpy.interp(
zoom, zoom,
[0, 1],
[ [
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"], self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"], self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
], ],
[0, 1],
) )
move_request.Speed = {"Zoom": speed} move_request.Speed = {"Zoom": speed}
@ -598,20 +552,19 @@ class OnvifController:
logger.debug(f"{camera_name}: Absolute zoom: {zoom}") logger.debug(f"{camera_name}: Absolute zoom: {zoom}")
await self.cams[camera_name]["ptz"].AbsoluteMove(move_request) onvif.get_service("ptz").AbsoluteMove(move_request)
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
async def handle_command_async( def handle_command(
self, camera_name: str, command: OnvifCommandEnum, param: str = "" self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None: ) -> None:
"""Handle ONVIF commands asynchronously"""
if camera_name not in self.cams.keys(): if camera_name not in self.cams.keys():
logger.error(f"ONVIF is not configured for {camera_name}") logger.error(f"Onvif is not setup for {camera_name}")
return return
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
if not await self._init_onvif(camera_name): if not self._init_onvif(camera_name):
return return
try: try:
@ -619,140 +572,52 @@ class OnvifController:
# already init # already init
return return
elif command == OnvifCommandEnum.stop: elif command == OnvifCommandEnum.stop:
await self._stop(camera_name) self._stop(camera_name)
elif command == OnvifCommandEnum.preset: elif command == OnvifCommandEnum.preset:
await self._move_to_preset(camera_name, param) self._move_to_preset(camera_name, param)
elif command == OnvifCommandEnum.move_relative: elif command == OnvifCommandEnum.move_relative:
_, pan, tilt = param.split("_") _, pan, tilt = param.split("_")
await self._move_relative(camera_name, float(pan), float(tilt), 0, 1) self._move_relative(camera_name, float(pan), float(tilt), 0, 1)
elif ( elif (
command == OnvifCommandEnum.zoom_in command == OnvifCommandEnum.zoom_in
or command == OnvifCommandEnum.zoom_out or command == OnvifCommandEnum.zoom_out
): ):
await self._zoom(camera_name, command) self._zoom(camera_name, command)
else: else:
await self._move(camera_name, command) self._move(camera_name, command)
except (Fault, ONVIFError, TransportError, Exception) as e: except ONVIFError as e:
logger.error(f"Unable to handle onvif command: {e}") logger.error(f"Unable to handle onvif command: {e}")
def handle_command( def get_camera_info(self, camera_name: str) -> dict[str, any]:
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None:
"""
Handle ONVIF commands by scheduling them in the event loop.
This is the synchronous interface that schedules async work.
"""
future = asyncio.run_coroutine_threadsafe(
self.handle_command_async(camera_name, command, param), self.loop
)
try:
# Wait with a timeout to prevent blocking indefinitely
future.result(timeout=10)
except asyncio.TimeoutError:
logger.error(f"Command {command} timed out for camera {camera_name}")
except Exception as e:
logger.error(
f"Error executing command {command} for camera {camera_name}: {e}"
)
async def get_camera_info(self, camera_name: str) -> dict[str, Any]:
"""
Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured
but not initialized.
Returns camera details including features and presets if available.
"""
if not self.config.cameras[camera_name].enabled:
logger.debug(
f"Camera {camera_name} disabled, won't try to initialize ONVIF"
)
return {}
if camera_name not in self.cams.keys() and (
camera_name not in self.config.cameras
or not self.config.cameras[camera_name].onvif.host
):
logger.debug(f"ONVIF is not configured for {camera_name}")
return {}
if camera_name in self.cams.keys() and self.cams[camera_name]["init"]:
return {
"name": camera_name,
"features": self.cams[camera_name]["features"],
"presets": list(self.cams[camera_name]["presets"].keys()),
}
if camera_name not in self.cams.keys() and camera_name in self.config.cameras:
success = await self._init_single_camera(camera_name)
if not success:
return {}
# Reset retry count after timeout
attempts = self.failed_cams.get(camera_name, {}).get("retry_attempts", 0)
last_attempt = self.failed_cams.get(camera_name, {}).get("last_attempt", 0)
if last_attempt and (time.time() - last_attempt) > self.reset_timeout:
logger.debug(f"Resetting retry count for {camera_name} after timeout")
attempts = 0
self.failed_cams[camera_name]["retry_attempts"] = 0
# Attempt initialization/reconnection
if attempts < self.max_retries:
logger.info(
f"Attempting ONVIF initialization for {camera_name} (retry {attempts + 1}/{self.max_retries})"
)
try:
if await self._init_onvif(camera_name):
if camera_name in self.failed_cams:
del self.failed_cams[camera_name]
return {
"name": camera_name,
"features": self.cams[camera_name]["features"],
"presets": list(self.cams[camera_name]["presets"].keys()),
}
else:
logger.warning(f"ONVIF initialization failed for {camera_name}")
except Exception as e:
logger.error(
f"Error during ONVIF initialization for {camera_name}: {e}"
)
if camera_name not in self.failed_cams:
self.failed_cams[camera_name] = {"retry_attempts": 0}
self.failed_cams[camera_name].update(
{
"retry_attempts": attempts + 1,
"last_error": str(e),
"last_attempt": time.time(),
}
)
if attempts >= self.max_retries:
remaining_time = max(
0, int((self.reset_timeout - (time.time() - last_attempt)) / 60)
)
logger.error(
f"Too many ONVIF initialization attempts for {camera_name}, retry in {remaining_time} minute{'s' if remaining_time != 1 else ''}"
)
logger.debug(f"Could not initialize ONVIF for {camera_name}")
return {}
async def get_service_capabilities(self, camera_name: str) -> None:
if camera_name not in self.cams.keys(): if camera_name not in self.cams.keys():
logger.error(f"ONVIF is not configured for {camera_name}") logger.debug(f"Onvif is not setup for {camera_name}")
return {} return {}
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
await self._init_onvif(camera_name) self._init_onvif(camera_name)
return {
"name": camera_name,
"features": self.cams[camera_name]["features"],
"presets": list(self.cams[camera_name]["presets"].keys()),
}
def get_service_capabilities(self, camera_name: str) -> None:
if camera_name not in self.cams.keys():
logger.error(f"Onvif is not setup for {camera_name}")
return {}
if not self.cams[camera_name]["init"]:
self._init_onvif(camera_name)
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
service_capabilities_request = self.cams[camera_name][ service_capabilities_request = self.cams[camera_name][
"service_capabilities_request" "service_capabilities_request"
] ]
try: try:
service_capabilities = await self.cams[camera_name][ service_capabilities = onvif.get_service("ptz").GetServiceCapabilities(
"ptz" service_capabilities_request
].GetServiceCapabilities(service_capabilities_request) )
logger.debug( logger.debug(
f"Onvif service capabilities for {camera_name}: {service_capabilities}" f"Onvif service capabilities for {camera_name}: {service_capabilities}"
@ -760,132 +625,113 @@ class OnvifController:
# MoveStatus is required for autotracking - should return "true" if supported # MoveStatus is required for autotracking - should return "true" if supported
return find_by_key(vars(service_capabilities), "MoveStatus") return find_by_key(vars(service_capabilities), "MoveStatus")
except Exception as e: except Exception:
logger.warning( logger.warning(
f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config. Exception: {e}" f"Camera {camera_name} does not support the ONVIF GetServiceCapabilities method. Autotracking will not function correctly and must be disabled in your config."
) )
return False return False
async def get_camera_status(self, camera_name: str) -> None: def get_camera_status(self, camera_name: str) -> None:
async with self.status_locks[camera_name]: if camera_name not in self.cams.keys():
if camera_name not in self.cams.keys(): logger.error(f"Onvif is not setup for {camera_name}")
logger.error(f"ONVIF is not configured for {camera_name}") return {}
return
if not self.cams[camera_name]["init"]: if not self.cams[camera_name]["init"]:
if not await self._init_onvif(camera_name): self._init_onvif(camera_name)
return
status_request = self.cams[camera_name]["status_request"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
try: status_request = self.cams[camera_name]["status_request"]
status = await self.cams[camera_name]["ptz"].GetStatus(status_request) try:
except Exception: status = onvif.get_service("ptz").GetStatus(status_request)
pass # We're unsupported, that'll be reported in the next check. except Exception:
pass # We're unsupported, that'll be reported in the next check.
try: try:
pan_tilt_status = getattr(status.MoveStatus, "PanTilt", None) pan_tilt_status = getattr(status.MoveStatus, "PanTilt", None)
zoom_status = getattr(status.MoveStatus, "Zoom", None) zoom_status = getattr(status.MoveStatus, "Zoom", None)
# if it's not an attribute, see if MoveStatus even exists in the status result # if it's not an attribute, see if MoveStatus even exists in the status result
if pan_tilt_status is None: if pan_tilt_status is None:
pan_tilt_status = getattr(status, "MoveStatus", None) pan_tilt_status = getattr(status, "MoveStatus", None)
# we're unsupported # we're unsupported
if pan_tilt_status is None or pan_tilt_status not in [ if pan_tilt_status is None or pan_tilt_status not in [
"IDLE", "IDLE",
"MOVING", "MOVING",
]: ]:
raise Exception raise Exception
except Exception: except Exception:
logger.warning( logger.warning(
f"Camera {camera_name} does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config." f"Camera {camera_name} does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config."
)
return
logger.debug(
f"{camera_name}: Pan/tilt status: {pan_tilt_status}, Zoom status: {zoom_status}"
) )
return
if pan_tilt_status == "IDLE" and ( position_changed = status.Position.PanTilt.x != self.cams[camera_name]["last_pan_tilt_pos"][0] or status.Position.PanTilt.y != self.cams[camera_name]["last_pan_tilt_pos"][1]
zoom_status is None or zoom_status == "IDLE" zoom_changed = self.cams[camera_name]["last_zoom_pos"] != status.Position.Zoom.x
):
self.cams[camera_name]["active"] = False
if not self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.set()
logger.debug( self.cams[camera_name]["last_pan_tilt_pos"] = (status.Position.PanTilt.x, status.Position.PanTilt.y)
f"{camera_name}: PTZ stop time: {self.ptz_metrics[camera_name].frame_time.value}" self.cams[camera_name]["last_zoom_pos"] = status.Position.Zoom.x
) if position_changed or zoom_changed:
self.cams[camera_name]["last_pos_change_time"] = time.time()
time_since_last_pos_change = time.time() - self.cams[camera_name]["last_pos_change_time"]
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[ if time_since_last_pos_change > 0.1 or ((pan_tilt_status == "IDLE") and (zoom_status is None or zoom_status == "IDLE")):
camera_name self.cams[camera_name]["active"] = False
].frame_time.value if not self.ptz_metrics[camera_name].motor_stopped.is_set():
else: self.ptz_metrics[camera_name].motor_stopped.set()
self.cams[camera_name]["active"] = True
if self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.clear()
logger.debug(
f"{camera_name}: PTZ start time: {self.ptz_metrics[camera_name].frame_time.value}"
)
self.ptz_metrics[camera_name].start_time.value = self.ptz_metrics[
camera_name
].frame_time.value
self.ptz_metrics[camera_name].stop_time.value = 0
if (
self.config.cameras[camera_name].onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
# store absolute zoom level as 0 to 1 interpolated from the values of the camera
self.ptz_metrics[camera_name].zoom_level.value = numpy.interp(
round(status.Position.Zoom.x, 2),
[
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
],
[0, 1],
)
logger.debug( logger.debug(
f"{camera_name}: Camera zoom level: {self.ptz_metrics[camera_name].zoom_level.value}" f"{camera_name}: PTZ stop time: {self.ptz_metrics[camera_name].frame_time.value}"
) )
# some hikvision cams won't update MoveStatus, so warn if it hasn't changed
if (
not self.ptz_metrics[camera_name].motor_stopped.is_set()
and not self.ptz_metrics[camera_name].reset.is_set()
and self.ptz_metrics[camera_name].start_time.value != 0
and self.ptz_metrics[camera_name].frame_time.value
> (self.ptz_metrics[camera_name].start_time.value + 10)
and self.ptz_metrics[camera_name].stop_time.value == 0
):
logger.debug(
f"Start time: {self.ptz_metrics[camera_name].start_time.value}, Stop time: {self.ptz_metrics[camera_name].stop_time.value}, Frame time: {self.ptz_metrics[camera_name].frame_time.value}"
)
# set the stop time so we don't come back into this again and spam the logs
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[ self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name camera_name
].frame_time.value ].frame_time.value
logger.warning( else:
f"Camera {camera_name} is still in ONVIF 'MOVING' status." self.cams[camera_name]["active"] = True
if self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.clear()
logger.debug(
f"{camera_name}: PTZ start time: {self.ptz_metrics[camera_name].frame_time.value}"
) )
def close(self) -> None: self.ptz_metrics[camera_name].start_time.value = self.ptz_metrics[
"""Gracefully shut down the ONVIF controller.""" camera_name
if not hasattr(self, "loop") or self.loop.is_closed(): ].frame_time.value
logger.debug("ONVIF controller already closed") self.ptz_metrics[camera_name].stop_time.value = 0
return
logger.info("Exiting ONVIF controller...") if (
self.config.cameras[camera_name].onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
# store absolute zoom level as 0 to 1 interpolated from the values of the camera
self.ptz_metrics[camera_name].zoom_level.value = numpy.interp(
round(status.Position.Zoom.x, 2),
[
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
],
[0, 1],
)
logger.debug(
f"{camera_name}: Camera zoom level: {self.ptz_metrics[camera_name].zoom_level.value}"
)
def stop_and_cleanup(): # some hikvision cams won't update MoveStatus, so warn if it hasn't changed
try: if (
self.loop.stop() not self.ptz_metrics[camera_name].motor_stopped.is_set()
except Exception as e: and not self.ptz_metrics[camera_name].reset.is_set()
logger.error(f"Error during loop cleanup: {e}") and self.ptz_metrics[camera_name].start_time.value != 0
and self.ptz_metrics[camera_name].frame_time.value
# Schedule stop and cleanup in the loop thread > (self.ptz_metrics[camera_name].start_time.value + 10)
self.loop.call_soon_threadsafe(stop_and_cleanup) and self.ptz_metrics[camera_name].stop_time.value == 0
):
self.loop_thread.join() logger.debug(
f"Start time: {self.ptz_metrics[camera_name].start_time.value}, Stop time: {self.ptz_metrics[camera_name].stop_time.value}, Frame time: {self.ptz_metrics[camera_name].frame_time.value}"
)
# set the stop time so we don't come back into this again and spam the logs
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name
].frame_time.value
logger.warning(f"Camera {camera_name} is still in ONVIF 'MOVING' status.")