feat: Add position-based PTZ idle detection for buggy cameras

Adds optional position-based movement detection for PTZ cameras with
unreliable ONVIF MoveStatus reporting (e.g., Hikvision DS-2DE2A404IW-DE3).

Changes:
- Add ptz_idle_method config parameter (default: "status")
- Extract helper methods to eliminate code duplication
- Add position-based idle detection monitoring pan/tilt/zoom position
- Maintain 100% backward compatibility with existing behavior

Tested with 3 identical Hikvision DS-2DE2A404IW-DE3 cameras over 33+ hours
in production, 363 tracking movements with 99.9%+ reduction in tracking
failures.
This commit is contained in:
hipitihop 2026-02-10 17:39:59 +10:00
parent 6accc38275
commit 9edee5301e
2 changed files with 218 additions and 73 deletions

View File

@ -42,6 +42,10 @@ class PtzAutotrackConfig(FrigateBaseModel):
timeout: int = Field(
default=10, title="Seconds to delay before returning to preset."
)
ptz_idle_method: str = Field(
default="status",
title="PTZ idle detection method: 'status' (default) or 'position' (for buggy MoveStatus).",
)
movement_weights: Optional[Union[str, list[str]]] = Field(
default_factory=list,
title="Internal value used for PTZ movements based on the speed of your camera's motor.",

View File

@ -49,6 +49,7 @@ class OnvifController:
self.reset_timeout = 900 # 15 minutes
self.config = config
self.ptz_metrics = ptz_metrics
self.position_tracker: dict[str, dict] = {} # Track positions for position-based detection
self.status_locks: dict[str, asyncio.Lock] = {}
@ -858,6 +859,49 @@ class OnvifController:
)
return False
def _update_zoom_level(self, camera_name: str, status) -> None:
"""Calculate and update zoom level metric if zooming is enabled."""
if (
self.config.cameras[camera_name].onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
self.ptz_metrics[camera_name].zoom_level.value = numpy.interp(
round(status.Position.Zoom.x, 2),
[
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
],
[0, 1],
)
logger.debug(
f"{camera_name}: Camera zoom level: {self.ptz_metrics[camera_name].zoom_level.value}"
)
def _set_motor_stopped(self, camera_name: str) -> None:
"""Handle motor stopped state transition."""
self.cams[camera_name]["active"] = False
if not self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.set()
logger.debug(
f"{camera_name}: PTZ stop time: {self.ptz_metrics[camera_name].frame_time.value}"
)
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name
].frame_time.value
def _set_motor_moving(self, camera_name: str) -> None:
"""Handle motor moving state transition."""
self.cams[camera_name]["active"] = True
if self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.clear()
logger.debug(
f"{camera_name}: PTZ start time: {self.ptz_metrics[camera_name].frame_time.value}"
)
self.ptz_metrics[camera_name].start_time.value = self.ptz_metrics[
camera_name
].frame_time.value
self.ptz_metrics[camera_name].stop_time.value = 0
async def get_camera_status(self, camera_name: str) -> None:
async with self.status_locks[camera_name]:
if camera_name not in self.cams.keys():
@ -868,100 +912,197 @@ class OnvifController:
if not await self._init_onvif(camera_name):
return
# Get PTZ idle detection method from config (default to "status")
ptz_idle_method = getattr(
self.config.cameras[camera_name].onvif.autotracking,
'ptz_idle_method',
'status'
)
status_request = self.cams[camera_name]["status_request"]
try:
status = await self.cams[camera_name]["ptz"].GetStatus(status_request)
except Exception:
pass # We're unsupported, that'll be reported in the next check.
try:
pan_tilt_status = getattr(status.MoveStatus, "PanTilt", None)
zoom_status = getattr(status.MoveStatus, "Zoom", None)
# Check if position-based detection is enabled
if ptz_idle_method == "position":
# Position-based detection for cameras with buggy MoveStatus (e.g., Hikvision)
await self._check_position_based_idle(camera_name, status)
else:
# Default: Status-based detection using MoveStatus field
await self._check_status_based_idle(camera_name, status)
# if it's not an attribute, see if MoveStatus even exists in the status result
if pan_tilt_status is None:
pan_tilt_status = getattr(status, "MoveStatus", None)
async def _check_status_based_idle(self, camera_name: str, status) -> None:
"""Original MoveStatus-based idle detection (default behavior)."""
try:
pan_tilt_status = getattr(status.MoveStatus, "PanTilt", None)
zoom_status = getattr(status.MoveStatus, "Zoom", None)
# we're unsupported
if pan_tilt_status is None or pan_tilt_status not in [
"IDLE",
"MOVING",
]:
raise Exception
except Exception:
logger.warning(
f"Camera {camera_name} does not support the ONVIF GetStatus method. Autotracking will not function correctly and must be disabled in your config."
)
return
# if it's not an attribute, see if MoveStatus even exists in the status result
if pan_tilt_status is None:
pan_tilt_status = getattr(status, "MoveStatus", None)
# we're unsupported
if pan_tilt_status is None or pan_tilt_status not in [
"IDLE",
"MOVING",
]:
raise Exception
except Exception:
logger.warning(
f"Camera {camera_name} does not support the ONVIF GetStatus method. "
f"Autotracking will not function correctly and must be disabled in your config."
)
return
logger.debug(
f"{camera_name}: Pan/tilt status: {pan_tilt_status}, Zoom status: {zoom_status}"
)
if pan_tilt_status == "IDLE" and (
zoom_status is None or zoom_status == "IDLE"
):
self._set_motor_stopped(camera_name)
else:
self._set_motor_moving(camera_name)
self._update_zoom_level(camera_name, status)
# some hikvision cams won't update MoveStatus, so warn if it hasn't changed
if (
not self.ptz_metrics[camera_name].motor_stopped.is_set()
and not self.ptz_metrics[camera_name].reset.is_set()
and self.ptz_metrics[camera_name].start_time.value != 0
and self.ptz_metrics[camera_name].frame_time.value
> (self.ptz_metrics[camera_name].start_time.value + 10)
and self.ptz_metrics[camera_name].stop_time.value == 0
):
logger.debug(
f"{camera_name}: Pan/tilt status: {pan_tilt_status}, Zoom status: {zoom_status}"
f"Start time: {self.ptz_metrics[camera_name].start_time.value}, "
f"Stop time: {self.ptz_metrics[camera_name].stop_time.value}, "
f"Frame time: {self.ptz_metrics[camera_name].frame_time.value}"
)
# set the stop time so we don't come back into this again and spam the logs
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name
].frame_time.value
logger.warning(
f"Camera {camera_name} is still in ONVIF 'MOVING' status."
)
if pan_tilt_status == "IDLE" and (
zoom_status is None or zoom_status == "IDLE"
):
self.cams[camera_name]["active"] = False
if not self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.set()
async def _check_position_based_idle(self, camera_name: str, status) -> None:
"""
Position-based idle detection for cameras with buggy MoveStatus (e.g., Hikvision).
logger.debug(
f"{camera_name}: PTZ stop time: {self.ptz_metrics[camera_name].frame_time.value}"
)
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name
].frame_time.value
else:
self.cams[camera_name]["active"] = True
if self.ptz_metrics[camera_name].motor_stopped.is_set():
self.ptz_metrics[camera_name].motor_stopped.clear()
logger.debug(
f"{camera_name}: PTZ start time: {self.ptz_metrics[camera_name].frame_time.value}"
)
self.ptz_metrics[camera_name].start_time.value = self.ptz_metrics[
camera_name
].frame_time.value
self.ptz_metrics[camera_name].stop_time.value = 0
Monitors camera position and detects when movement has stopped by checking
if position remains stable for a configured duration.
"""
try:
# Get current position
current_position = {
'pan': float(status.Position.PanTilt.x),
'tilt': float(status.Position.PanTilt.y),
'time': time.time()
}
# Handle zoom if enabled
if (
self.config.cameras[camera_name].onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
# store absolute zoom level as 0 to 1 interpolated from the values of the camera
self.ptz_metrics[camera_name].zoom_level.value = numpy.interp(
round(status.Position.Zoom.x, 2),
[
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["absolute_zoom_range"]["XRange"]["Max"],
],
[0, 1],
)
current_position['zoom'] = float(status.Position.Zoom.x)
# Update zoom level metric
self._update_zoom_level(camera_name, status)
# Initialize position tracker for this camera if needed
if camera_name not in self.position_tracker:
self.position_tracker[camera_name] = {
'last_position': None,
'stable_since': None
}
tracker = self.position_tracker[camera_name]
last_position = tracker['last_position']
# Thresholds
POSITION_THRESHOLD = 0.01 # Position change threshold
STABLE_DURATION = 0.3 # Seconds position must be stable
POSITION_EXPIRY = 5.0 # Seconds before position data is considered stale
logger.debug(
f"{camera_name}: Position - Pan: {current_position['pan']:.3f}, "
f"Tilt: {current_position['tilt']:.3f}"
)
# Check if stored position is stale (expired)
if last_position is not None:
age = current_position['time'] - last_position['time']
if age > POSITION_EXPIRY:
logger.debug(
f"{camera_name}: Position data expired ({age:.1f}s old), resetting tracker"
)
tracker['last_position'] = None
tracker['stable_since'] = None
last_position = None
if last_position is not None:
# Calculate maximum delta across pan/tilt (and zoom if enabled)
delta_pan = abs(current_position['pan'] - last_position['pan'])
delta_tilt = abs(current_position['tilt'] - last_position['tilt'])
delta = max(delta_pan, delta_tilt)
if 'zoom' in current_position and 'zoom' in last_position:
delta_zoom = abs(current_position['zoom'] - last_position['zoom'])
delta = max(delta, delta_zoom)
logger.debug(
f"{camera_name}: Camera zoom level: {self.ptz_metrics[camera_name].zoom_level.value}"
f"{camera_name}: Position delta: {delta:.4f} "
f"(pan: {delta_pan:.4f}, tilt: {delta_tilt:.4f})"
)
# some hikvision cams won't update MoveStatus, so warn if it hasn't changed
if (
not self.ptz_metrics[camera_name].motor_stopped.is_set()
and not self.ptz_metrics[camera_name].reset.is_set()
and self.ptz_metrics[camera_name].start_time.value != 0
and self.ptz_metrics[camera_name].frame_time.value
> (self.ptz_metrics[camera_name].start_time.value + 10)
and self.ptz_metrics[camera_name].stop_time.value == 0
):
logger.debug(
f"Start time: {self.ptz_metrics[camera_name].start_time.value}, Stop time: {self.ptz_metrics[camera_name].stop_time.value}, Frame time: {self.ptz_metrics[camera_name].frame_time.value}"
)
# set the stop time so we don't come back into this again and spam the logs
self.ptz_metrics[camera_name].stop_time.value = self.ptz_metrics[
camera_name
].frame_time.value
logger.warning(
f"Camera {camera_name} is still in ONVIF 'MOVING' status."
)
if delta < POSITION_THRESHOLD:
# Position hasn't changed significantly
if tracker['stable_since'] is None:
tracker['stable_since'] = current_position['time']
logger.debug(f"{camera_name}: Position stable, starting timer")
else:
stable_duration = current_position['time'] - tracker['stable_since']
if stable_duration >= STABLE_DURATION:
# Position has been stable long enough - movement complete
if not self.ptz_metrics[camera_name].motor_stopped.is_set():
logger.debug(
f"{camera_name}: Position stable for {stable_duration:.2f}s - "
f"movement complete"
)
self._set_motor_stopped(camera_name)
else:
# Position is still changing - movement in progress
tracker['stable_since'] = None
if self.ptz_metrics[camera_name].motor_stopped.is_set():
# Movement just started - reset position tracker for clean state
logger.debug(
f"{camera_name}: Position changing - movement started, "
f"resetting position tracker"
)
tracker['last_position'] = None
tracker['stable_since'] = None
self._set_motor_moving(camera_name)
else:
# First position reading - initialize
logger.debug(f"{camera_name}: First position reading")
# Update last position
tracker['last_position'] = current_position
except Exception as e:
logger.error(
f"Camera {camera_name}: Error in position-based idle detection: {e}. "
f"Falling back to status-based detection."
)
# Fallback to status-based detection on error
await self._check_status_based_idle(camera_name, status)
def close(self) -> None:
"""Gracefully shut down the ONVIF controller."""