use asyncio for autotracking move queues

This commit is contained in:
Josh Hawkins 2025-05-06 17:27:04 -05:00
parent 20647ce891
commit 9c97b89aed

View File

@ -3,11 +3,9 @@
import asyncio import asyncio
import copy import copy
import logging import logging
import queue
import threading import threading
import time import time
from collections import deque from collections import deque
from functools import partial
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
import cv2 import cv2
@ -251,8 +249,8 @@ class PtzAutoTracker:
self.intercept[camera] = None self.intercept[camera] = None
self.move_coefficients[camera] = [] self.move_coefficients[camera] = []
self.move_queues[camera] = queue.Queue() self.move_queues[camera] = asyncio.Queue()
self.move_queue_locks[camera] = threading.Lock() self.move_queue_locks[camera] = asyncio.Lock()
# handle onvif constructor failing due to no connection # handle onvif constructor failing due to no connection
if camera not in self.onvif.cams: if camera not in self.onvif.cams:
@ -298,13 +296,10 @@ class PtzAutoTracker:
if self.onvif.cams[camera]["init"]: if self.onvif.cams[camera]["init"]:
await self.onvif.get_camera_status(camera) await self.onvif.get_camera_status(camera)
# movement thread per camera # movement queue with asyncio on OnvifController loop
self.move_threads[camera] = threading.Thread( asyncio.run_coroutine_threadsafe(
name=f"ptz_move_thread_{camera}", self._process_move_queue(camera), self.onvif.loop
target=partial(self._process_move_queue, camera),
) )
self.move_threads[camera].daemon = True
self.move_threads[camera].start()
if camera_config.onvif.autotracking.movement_weights: if camera_config.onvif.autotracking.movement_weights:
if len(camera_config.onvif.autotracking.movement_weights) == 6: if len(camera_config.onvif.autotracking.movement_weights) == 6:
@ -718,18 +713,17 @@ class PtzAutoTracker:
centroid_distance < self.tracked_object_metrics[camera]["distance"] centroid_distance < self.tracked_object_metrics[camera]["distance"]
) )
def _process_move_queue(self, camera): async def _process_move_queue(self, camera):
camera_config = self.config.cameras[camera] move_queue = self.move_queues[camera]
camera_config.frame_shape[1]
camera_config.frame_shape[0]
while not self.stop_event.is_set(): while not self.stop_event.is_set():
try: try:
move_data = self.move_queues[camera].get(True, 0.1) # Asynchronously wait for move data with a timeout
except queue.Empty: move_data = await asyncio.wait_for(move_queue.get(), timeout=0.1)
except asyncio.TimeoutError:
continue continue
with self.move_queue_locks[camera]: async with self.move_queue_locks[camera]:
frame_time, pan, tilt, zoom = move_data frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them # if we're receiving move requests during a PTZ move, ignore them
@ -738,8 +732,6 @@ class PtzAutoTracker:
self.ptz_metrics[camera].start_time.value, self.ptz_metrics[camera].start_time.value,
self.ptz_metrics[camera].stop_time.value, self.ptz_metrics[camera].stop_time.value,
): ):
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug( logger.debug(
f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}" f"{camera}: Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
) )
@ -750,44 +742,24 @@ class PtzAutoTracker:
self.config.cameras[camera].onvif.autotracking.zooming self.config.cameras[camera].onvif.autotracking.zooming
== ZoomingModeEnum.relative == ZoomingModeEnum.relative
): ):
future = asyncio.run_coroutine_threadsafe( await self.onvif._move_relative(camera, pan, tilt, zoom, 1)
self.onvif._move_relative(camera, pan, tilt, zoom, 1),
self.onvif.loop,
)
future.result()
else: else:
if pan != 0 or tilt != 0: if pan != 0 or tilt != 0:
future = asyncio.run_coroutine_threadsafe( await self.onvif._move_relative(camera, pan, tilt, 0, 1)
self.onvif._move_relative(camera, pan, tilt, 0, 1),
self.onvif.loop,
)
future.result()
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
future = asyncio.run_coroutine_threadsafe( await self.onvif.get_camera_status(camera)
self.onvif.get_camera_status(camera),
self.onvif.loop,
)
future.result()
if ( if (
zoom > 0 zoom > 0
and self.ptz_metrics[camera].zoom_level.value != zoom and self.ptz_metrics[camera].zoom_level.value != zoom
): ):
future = asyncio.run_coroutine_threadsafe( await self.onvif._zoom_absolute(camera, zoom, 1)
self.onvif._zoom_absolute(camera, zoom, 1),
self.onvif.loop,
)
future.result()
# Wait until the camera finishes moving # Wait until the camera finishes moving
while not self.ptz_metrics[camera].motor_stopped.is_set(): while not self.ptz_metrics[camera].motor_stopped.is_set():
future = asyncio.run_coroutine_threadsafe( await self.onvif.get_camera_status(camera)
self.onvif.get_camera_status(camera), self.onvif.loop
)
future.result()
if self.config.cameras[camera].onvif.autotracking.movement_weights: if self.config.cameras[camera].onvif.autotracking.movement_weights:
logger.debug( logger.debug(
@ -824,6 +796,10 @@ class PtzAutoTracker:
# calculate new coefficients if we have enough data # calculate new coefficients if we have enough data
self._calculate_move_coefficients(camera) self._calculate_move_coefficients(camera)
# Clean up the queue on exit
while not move_queue.empty():
await move_queue.get()
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom): def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
def split_value(value, suppress_diff=True): def split_value(value, suppress_diff=True):
clipped = np.clip(value, -1, 1) clipped = np.clip(value, -1, 1)
@ -852,7 +828,9 @@ class PtzAutoTracker:
f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}" f"{camera}: Enqueue movement for frame time: {frame_time} pan: {pan}, tilt: {tilt}, zoom: {zoom}"
) )
move_data = (frame_time, pan, tilt, zoom) move_data = (frame_time, pan, tilt, zoom)
self.move_queues[camera].put(move_data) self.onvif.loop.call_soon_threadsafe(
self.move_queues[camera].put_nowait, move_data
)
# reset values to not split up large movements # reset values to not split up large movements
pan = 0 pan = 0
@ -1483,10 +1461,6 @@ class PtzAutoTracker:
self.tracked_object[camera] = None self.tracked_object[camera] = None
self.tracked_object_history[camera].clear() self.tracked_object_history[camera].clear()
# empty move queue
while not self.move_queues[camera].empty():
self.move_queues[camera].get()
self.ptz_metrics[camera].motor_stopped.wait() self.ptz_metrics[camera].motor_stopped.wait()
logger.debug( logger.debug(
f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}" f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}"