frigate/frigate/ptz/autotrack.py

"""Automatically pan, tilt, and zoom on detected objects via onvif."""
import copy
import logging
import math
import os
import queue
import threading
import time
from functools import partial
from multiprocessing.synchronize import Event as MpEvent

import cv2
import numpy as np
from norfair.camera_motion import (
HomographyTransformationGetter,
MotionEstimator,
TranslationTransformationGetter,
)

from frigate.config import CameraConfig, FrigateConfig, ZoomingModeEnum
from frigate.const import CONFIG_DIR
from frigate.ptz.onvif import OnvifController
from frigate.types import PTZMetricsTypes
from frigate.util.builtin import update_yaml_file
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union

logger = logging.getLogger(__name__)


def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
    # Determine if the PTZ was in motion at the given frame time.
    # For non-PTZ and non-autotracking cameras this always returns False, because
    # ptz_start_time is initialized to 0 on startup and only changes when
    # autotracking movements are made.
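    # Worked example (hypothetical values): with ptz_start_time=100.0 and
    # ptz_stop_time=0.0 (a move still in progress), frame_time=101.0 returns True;
    # with ptz_stop_time=102.0, frame_time=103.0 returns False.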
return (ptz_start_time != 0.0 and frame_time > ptz_start_time) and (
ptz_stop_time == 0.0 or (ptz_start_time <= frame_time <= ptz_stop_time)
)


class PtzMotionEstimator:
def __init__(
self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes]
) -> None:
self.frame_manager = SharedMemoryFrameManager()
self.norfair_motion_estimator = None
self.camera_config = config
self.coord_transformations = None
self.ptz_metrics = ptz_metrics
self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
self.ptz_metrics["ptz_reset"].set()
logger.debug(f"Motion estimator init for cam: {config.name}")

    def motion_estimator(self, detections, frame_time, camera_name):
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self.ptz_metrics["ptz_reset"].is_set():
self.ptz_metrics["ptz_reset"].clear()
            # homography is nice (it handles zooming) but slow; translation is
            # pan/tilt only but fast.
            if (
                self.camera_config.onvif.autotracking.zooming
                != ZoomingModeEnum.disabled
            ):
                logger.debug("Motion estimator reset - homography")
                transformation_type = HomographyTransformationGetter()
            else:
                logger.debug("Motion estimator reset - translation")
                transformation_type = TranslationTransformationGetter()

            self.norfair_motion_estimator = MotionEstimator(
                transformations_getter=transformation_type,
                min_distance=30,
                max_points=900,
            )

            self.coord_transformations = None
if ptz_moving_at_frame_time(
frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
):
logger.debug(
f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}"
)
frame_id = f"{camera_name}{frame_time}"
yuv_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
# mask out detections for better motion estimation
mask = np.ones(frame.shape[:2], frame.dtype)
detection_boxes = [x[2] for x in detections]
for detection in detection_boxes:
x1, y1, x2, y2 = detection
mask[y1:y2, x1:x2] = 0
            # merge the camera config motion mask with the detections; the Norfair
            # function needs a 0/1 mask
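            # (zeros mark regions to ignore: both configured motion mask areas and
            # current detection boxes are excluded from motion estimation)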
mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)
            # the Norfair estimator function needs a color frame so it can convert
            # it right back to gray internally
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)
self.coord_transformations = self.norfair_motion_estimator.update(
frame, mask
)
self.frame_manager.close(frame_id)
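            # the returned coordinate transformation lets the object tracker map
            # detection coordinates between the moved frame and the reference frame,
            # compensating for the camera's own motion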
logger.debug(
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0,0]])}"
)
return self.coord_transformations


class PtzAutoTrackerThread(threading.Thread):
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: dict[str, PTZMetricsTypes],
stop_event: MpEvent,
) -> None:
threading.Thread.__init__(self)
self.name = "ptz_autotracker"
self.ptz_autotracker = PtzAutoTracker(config, onvif, ptz_metrics)
self.stop_event = stop_event
self.config = config

    def run(self):
while not self.stop_event.wait(1):
for camera_name, cam in self.config.cameras.items():
if cam.onvif.autotracking.enabled:
self.ptz_autotracker.camera_maintenance(camera_name)
else:
                    # autotracking disabled dynamically via MQTT
if self.ptz_autotracker.tracked_object.get(camera_name):
self.ptz_autotracker.tracked_object[camera_name] = None
self.ptz_autotracker.tracked_object_previous[camera_name] = None
logger.info("Exiting autotracker...")


class PtzAutoTracker:
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: PTZMetricsTypes,
) -> None:
self.config = config
self.onvif = onvif
self.ptz_metrics = ptz_metrics
self.tracked_object: dict[str, object] = {}
self.tracked_object_previous: dict[str, object] = {}
self.previous_frame_time: dict[str, object] = {}
self.object_types: dict[str, object] = {}
self.required_zones: dict[str, object] = {}
self.move_queues: dict[str, object] = {}
self.move_threads: dict[str, object] = {}
self.autotracker_init: dict[str, object] = {}
self.move_metrics: dict[str, object] = {}
self.calibrating: dict[str, object] = {}
self.intercept: dict[str, object] = {}
self.move_coefficients: dict[str, object] = {}
# if cam is set to autotrack, onvif should be set up
for camera_name, cam in self.config.cameras.items():
self.autotracker_init[camera_name] = False
if cam.onvif.autotracking.enabled:
self._autotracker_setup(cam, camera_name)

    def _autotracker_setup(self, cam, camera_name):
logger.debug(f"Autotracker init for cam: {camera_name}")
self.object_types[camera_name] = cam.onvif.autotracking.track
self.required_zones[camera_name] = cam.onvif.autotracking.required_zones
self.tracked_object[camera_name] = None
self.tracked_object_previous[camera_name] = None
self.calibrating[camera_name] = False
self.move_metrics[camera_name] = []
self.intercept[camera_name] = None
self.move_coefficients[camera_name] = []
self.move_queues[camera_name] = queue.Queue()
if not self.onvif.cams[camera_name]["init"]:
if not self.onvif._init_onvif(camera_name):
logger.warning(f"Unable to initialize onvif for {camera_name}")
cam.onvif.autotracking.enabled = False
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
return
if "pt-r-fov" not in self.onvif.cams[camera_name]["features"]:
cam.onvif.autotracking.enabled = False
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
logger.warning(
f"Disabling autotracking for {camera_name}: FOV relative movement not supported"
)
return
movestatus_supported = self.onvif.get_service_capabilities(camera_name)
if movestatus_supported is None or movestatus_supported.lower() != "true":
cam.onvif.autotracking.enabled = False
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
logger.warning(
f"Disabling autotracking for {camera_name}: ONVIF MoveStatus not supported"
)
return
self.onvif.get_camera_status(camera_name)
# movement thread per camera
        if not self.move_threads or not self.move_threads.get(camera_name):
self.move_threads[camera_name] = threading.Thread(
name=f"move_thread_{camera_name}",
target=partial(self._process_move_queue, camera_name),
)
self.move_threads[camera_name].daemon = True
self.move_threads[camera_name].start()
if cam.onvif.autotracking.movement_weights:
self.intercept[camera_name] = cam.onvif.autotracking.movement_weights[0]
self.move_coefficients[
camera_name
] = cam.onvif.autotracking.movement_weights[1:]
if cam.onvif.autotracking.calibrate_on_startup:
self._calibrate_camera(camera_name)
self.autotracker_init[camera_name] = True

    def write_config(self, camera):
config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")
logger.debug(
f"Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
)
update_yaml_file(
config_file,
["cameras", camera, "onvif", "autotracking", "movement_weights"],
self.config.cameras[camera].onvif.autotracking.movement_weights,
)

    def _calibrate_camera(self, camera):
# move the camera from the preset in steps and measure the time it takes to move that amount
# this will allow us to predict movement times with a simple linear regression
# start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
# TODO: take zooming into account too
num_steps = 30
step_sizes = np.linspace(0, 1, num_steps)
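        # e.g. num_steps=30 produces magnitudes 0, 1/29, 2/29, ..., 1.0 for both
        # pan and tilt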
self.calibrating[camera] = True
logger.info(f"Camera calibration for {camera} in progress")
self.onvif._move_to_preset(
camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
)
self.ptz_metrics[camera]["ptz_reset"].set()
self.ptz_metrics[camera]["ptz_stopped"].clear()
# Wait until the camera finishes moving
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
self.onvif.get_camera_status(camera)
for step in range(num_steps):
pan = step_sizes[step]
tilt = step_sizes[step]
start_time = time.time()
self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
self.onvif.get_camera_status(camera)
stop_time = time.time()
self.move_metrics[camera].append(
{
"pan": pan,
"tilt": tilt,
"start_timestamp": start_time,
"end_timestamp": stop_time,
}
)
self.onvif._move_to_preset(
camera,
self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
)
self.ptz_metrics[camera]["ptz_reset"].set()
self.ptz_metrics[camera]["ptz_stopped"].clear()
# Wait until the camera finishes moving
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
self.onvif.get_camera_status(camera)
self.calibrating[camera] = False
logger.info(f"Calibration for {camera} complete")
# calculate and save new intercept and coefficients
self._calculate_move_coefficients(camera, True)

    def _calculate_move_coefficients(self, camera, calibration=False):
        # recalculate coefficients every 50 new values, keeping at most 500 samples
if calibration or (
len(self.move_metrics[camera]) % 50 == 0
and len(self.move_metrics[camera]) != 0
and len(self.move_metrics[camera]) <= 500
):
X = np.array(
[abs(d["pan"]) + abs(d["tilt"]) for d in self.move_metrics[camera]]
)
y = np.array(
[
d["end_timestamp"] - d["start_timestamp"]
for d in self.move_metrics[camera]
]
)
# simple linear regression with intercept
X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
self.move_coefficients[camera] = np.linalg.lstsq(
X_with_intercept, y, rcond=None
)[0]
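            # lstsq yields [c0, c1]: c0 scales the ones column (the baseline) and
            # c1 scales the combined |pan| + |tilt| magnitude; these feed
            # _predict_movement_time below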
# only assign a new intercept if we're calibrating
if calibration:
self.intercept[camera] = y[0]
            # write the intercept and coefficients back to the config file as a
            # comma-separated string
movement_weights = np.concatenate(
([self.intercept[camera]], self.move_coefficients[camera])
)
self.config.cameras[camera].onvif.autotracking.movement_weights = ", ".join(
map(str, movement_weights)
)
logger.debug(
f"New regression parameters - intercept: {self.intercept[camera]}, coefficients: {self.move_coefficients[camera]}"
)
self.write_config(camera)

    def _predict_movement_time(self, camera, pan, tilt):
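        # Linear model fit in _calculate_move_coefficients: the predicted time is
        # c0 * intercept + c1 * (|pan| + |tilt|), where intercept is the baseline
        # movement time measured during calibration.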
combined_movement = abs(pan) + abs(tilt)
input_data = np.array([self.intercept[camera], combined_movement])
return np.dot(self.move_coefficients[camera], input_data)

    def _process_move_queue(self, camera):
while True:
try:
move_data = self.move_queues[camera].get()
frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them
if ptz_moving_at_frame_time(
frame_time,
self.ptz_metrics[camera]["ptz_start_time"].value,
self.ptz_metrics[camera]["ptz_stop_time"].value,
):
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug(
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
)
continue
else:
if (
self.config.cameras[camera].onvif.autotracking.zooming
== ZoomingModeEnum.relative
):
self.onvif._move_relative(camera, pan, tilt, zoom, 1)
else:
if zoom > 0:
self.onvif._zoom_absolute(camera, zoom, 1)
else:
self.onvif._move_relative(camera, pan, tilt, 0, 1)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
# check if ptz is moving
self.onvif.get_camera_status(camera)
if self.config.cameras[camera].onvif.autotracking.movement_weights:
logger.debug(
f"Predicted movement time: {self._predict_movement_time(camera, pan, tilt)}"
)
logger.debug(
f'Actual movement time: {self.ptz_metrics[camera]["ptz_stop_time"].value-self.ptz_metrics[camera]["ptz_start_time"].value}'
)
# save metrics for better estimate calculations
if (
self.intercept[camera] is not None
and len(self.move_metrics[camera]) < 500
):
logger.debug("Adding new values to move metrics")
self.move_metrics[camera].append(
{
"pan": pan,
"tilt": tilt,
"start_timestamp": self.ptz_metrics[camera][
"ptz_start_time"
].value,
"end_timestamp": self.ptz_metrics[camera][
"ptz_stop_time"
].value,
}
)
# calculate new coefficients if we have enough data
self._calculate_move_coefficients(camera)
except queue.Empty:
continue

    def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
def split_value(value):
clipped = np.clip(value, -1, 1)
return clipped, value - clipped
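        # e.g. split_value(1.5) returns (1.0, 0.5): move 1.0 now and carry the
        # remaining 0.5 over to a follow-up move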
if (
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
and self.move_queues[camera].qsize() == 0
):
# don't make small movements
if abs(pan) < 0.02:
pan = 0
if abs(tilt) < 0.02:
tilt = 0
            # split up any large moves caused by velocity-estimated movements
while pan != 0 or tilt != 0 or zoom != 0:
pan, pan_excess = split_value(pan)
tilt, tilt_excess = split_value(tilt)
zoom, zoom_excess = split_value(zoom)
                logger.debug(
                    f"Enqueue movement for frame time: {frame_time}, pan: {pan}, tilt: {tilt}, zoom: {zoom}"
                )
move_data = (frame_time, pan, tilt, zoom)
self.move_queues[camera].put(move_data)
pan = pan_excess
tilt = tilt_excess
zoom = zoom_excess

    def _should_zoom_in(self, camera, box, area):
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_area = camera_width * camera_height
bb_left, bb_top, bb_right, bb_bottom = box
        # If the bounding box is not within 15% of an edge
        # and the object's area is less than 70% of the frame,
        # then zoom in; otherwise try zooming out.
        # should we make these configurable?
        #
        # TODO: Take into account the area changing when an object is moving out of frame
        edge_threshold = 0.15
        area_threshold = 0.7
# returns True to zoom in, False to zoom out
return (
bb_left > edge_threshold * camera_width
and bb_right < (1 - edge_threshold) * camera_width
and bb_top > edge_threshold * camera_height
and bb_bottom < (1 - edge_threshold) * camera_height
and area < area_threshold * camera_area
)

    def _autotrack_move_ptz(self, camera, obj):
camera_config = self.config.cameras[camera]
        # frame width and height
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_fps = camera_config.detect.fps
centroid_x = obj.obj_data["centroid"][0]
centroid_y = obj.obj_data["centroid"][1]
        # Normalize coordinates: the top right of the FOV is (1, 1), the center is
        # (0, 0), and the bottom left is (-1, -1).
        pan = ((centroid_x / camera_width) - 0.5) * 2
tilt = (0.5 - (centroid_y / camera_height)) * 2
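        # e.g. on a 1280x720 frame, a centroid at (960, 180) gives
        # pan = ((960 / 1280) - 0.5) * 2 = 0.5 and tilt = (0.5 - (180 / 720)) * 2 = 0.5,
        # i.e. halfway toward the top right of the FOV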
if (
camera_config.onvif.autotracking.movement_weights
): # use estimates if we have available coefficients
predicted_movement_time = self._predict_movement_time(camera, pan, tilt)
# Norfair gives us two points for the velocity of an object represented as x1, y1, x2, y2
x1, y1, x2, y2 = obj.obj_data["estimate_velocity"]
average_velocity = (
(x1 + x2) / 2,
(y1 + y2) / 2,
(x1 + x2) / 2,
(y1 + y2) / 2,
)
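            # the average is repeated so that both corners of the box are translated
            # by the same amount when zipped with [x1, y1, x2, y2] below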
# this box could exceed the frame boundaries if velocity is high
# but we'll handle that in _enqueue_move() as two separate moves
predicted_box = [
round(x + camera_fps * predicted_movement_time * v)
for x, v in zip(obj.obj_data["box"], average_velocity)
]
centroid_x = round((predicted_box[0] + predicted_box[2]) / 2)
centroid_y = round((predicted_box[1] + predicted_box[3]) / 2)
# recalculate pan and tilt with new centroid
pan = ((centroid_x / camera_width) - 0.5) * 2
tilt = (0.5 - (centroid_y / camera_height)) * 2
logger.debug(f'Original box: {obj.obj_data["box"]}')
logger.debug(f"Predicted box: {predicted_box}")
logger.debug(f'Velocity: {obj.obj_data["estimate_velocity"]}')
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.relative:
# relative zooming concurrently with pan/tilt
zoom = obj.obj_data["area"] / (camera_width * camera_height)
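            # zoom is the fraction of the frame the object occupies; if we need to
            # zoom out instead, it is flipped below to -(1 - zoom), and a negative
            # relative zoom moves the lens out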
# test if we need to zoom out
if not self._should_zoom_in(
camera,
predicted_box
if camera_config.onvif.autotracking.movement_weights
else obj.obj_data["box"],
obj.obj_data["area"],
):
zoom = -(1 - zoom)
# don't make small movements if area hasn't changed significantly
if (
"area" in obj.previous
and abs(obj.obj_data["area"] - obj.previous["area"])
/ obj.obj_data["area"]
< 0.1
):
zoom = 0
else:
zoom = 0
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, zoom)

    def _autotrack_zoom_only(self, camera, obj):
camera_config = self.config.cameras[camera]
# absolute zooming separately from pan/tilt
if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value
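            # step the absolute zoom level in increments of 0.1, clamped to [0, 1]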
if 0 < zoom_level <= 1:
if self._should_zoom_in(
camera, obj.obj_data["box"], obj.obj_data["area"]
):
zoom = min(1.0, zoom_level + 0.1)
else:
zoom = max(0.0, zoom_level - 0.1)
if zoom != zoom_level:
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)

    def autotrack_object(self, camera, obj):
camera_config = self.config.cameras[camera]
if camera_config.onvif.autotracking.enabled:
if not self.autotracker_init[camera]:
self._autotracker_setup(self.config.cameras[camera], camera)
if self.calibrating[camera]:
logger.debug("Calibrating camera")
return
            # Either this is a brand new object that's on our camera, has our label,
            # entered a required zone, is not a false positive, and is not initially
            # motionless - or it's an object we're already tracking, which implies
            # all of those things are already true.
if (
# new object
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and set(obj.entered_zones) & set(self.required_zones[camera])
and not obj.previous["false_positive"]
and not obj.false_positive
and self.tracked_object_previous[camera] is None
and obj.obj_data["motionless_count"] == 0
):
logger.debug(
f"Autotrack: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
self._autotrack_move_ptz(camera, obj)
return
if (
# already tracking an object
self.tracked_object[camera] is not None
and self.tracked_object_previous[camera] is not None
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
                and obj.obj_data["frame_time"] != self.previous_frame_time[camera]
):
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
                # Don't move the PTZ if the Euclidean distance from the object to the
                # center of the frame is less than 15% of the larger dimension (width
                # or height) of the frame, multiplied by a scaling factor for object
                # size. Lowering this percentage slightly will cause the camera to
                # move more often to keep the object centered; raising it will cause
                # less movement and be more tolerant of objects that aren't quite
                # centered.
                # TODO: there's probably a better way to approach this
distance = math.sqrt(
(obj.obj_data["centroid"][0] - camera_config.detect.width / 2) ** 2
+ (obj.obj_data["centroid"][1] - camera_config.detect.height / 2)
** 2
)
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
max_obj = max(obj_width, obj_height)
max_frame = max(camera_config.detect.width, camera_config.detect.height)
# larger objects should lower the threshold, smaller objects should raise it
scaling_factor = 1 - (max_obj / max_frame)
distance_threshold = 0.15 * (max_frame) * scaling_factor
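                # e.g. on a 1280x720 frame, a 300x600 person gives
                # scaling_factor = 1 - 600/1280 = 0.53125 and
                # distance_threshold = 0.15 * 1280 * 0.53125 = 102 pixels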
iou = intersection_over_union(
self.tracked_object_previous[camera].obj_data["box"],
obj.obj_data["box"],
)
logger.debug(
f"Distance: {distance}, threshold: {distance_threshold}, iou: {iou}"
)
if distance < distance_threshold and iou > 0.2:
logger.debug(
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
# no need to move, but try absolute zooming
self._autotrack_zoom_only(camera, obj)
return
logger.debug(
f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self._autotrack_move_ptz(camera, obj)
# try absolute zooming too
self._autotrack_zoom_only(camera, obj)
return
if (
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
# If it's within bounds, start tracking that object.
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and not obj.previous["false_positive"]
and not obj.false_positive
and self.tracked_object_previous[camera] is not None
):
self.previous_frame_time[camera] = obj.obj_data["frame_time"]
if (
intersection_over_union(
self.tracked_object_previous[camera].obj_data["region"],
obj.obj_data["box"],
)
< 0.2
):
logger.debug(
f"Autotrack: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self._autotrack_move_ptz(camera, obj)
return

    def end_object(self, camera, obj):
if self.config.cameras[camera].onvif.autotracking.enabled:
if (
self.tracked_object[camera] is not None
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
):
logger.debug(
f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
)
self.tracked_object[camera] = None

    def camera_maintenance(self, camera):
# bail and don't check anything if we're calibrating or tracking an object
if self.calibrating[camera] or self.tracked_object[camera] is not None:
return
logger.debug("Running camera maintenance")
# calls get_camera_status to check/update ptz movement
# returns camera to preset after timeout when tracking is over
autotracker_config = self.config.cameras[camera].onvif.autotracking
if not self.autotracker_init[camera]:
self._autotracker_setup(self.config.cameras[camera], camera)
# regularly update camera status
if not self.ptz_metrics[camera]["ptz_stopped"].is_set():
self.onvif.get_camera_status(camera)
# return to preset if tracking is over
if (
self.tracked_object[camera] is None
and self.tracked_object_previous[camera] is not None
and (
# might want to use a different timestamp here?
self.ptz_metrics[camera]["ptz_frame_time"].value
- self.tracked_object_previous[camera].obj_data["frame_time"]
> autotracker_config.timeout
)
and autotracker_config.return_preset
):
# empty move queue
while not self.move_queues[camera].empty():
self.move_queues[camera].get()
# clear tracked object
self.tracked_object[camera] = None
self.tracked_object_previous[camera] = None
self.ptz_metrics[camera]["ptz_stopped"].wait()
logger.debug(
f"Autotrack: Time is {self.ptz_metrics[camera]['ptz_frame_time'].value}, returning to preset: {autotracker_config.return_preset}"
)
self.onvif._move_to_preset(
camera,
autotracker_config.return_preset.lower(),
)
self.ptz_metrics[camera]["ptz_reset"].set()