frigate/frigate/ptz/autotrack.py
2023-09-04 12:59:44 -05:00

519 lines
21 KiB
Python

"""Automatically pan, tilt, and zoom on detected objects via onvif."""
import copy
import logging
import math
import queue
import threading
import time
from functools import partial
from multiprocessing.synchronize import Event as MpEvent
import cv2
import numpy as np
from norfair.camera_motion import (
HomographyTransformationGetter,
MotionEstimator,
TranslationTransformationGetter,
)
from frigate.config import CameraConfig, FrigateConfig
from frigate.ptz.onvif import OnvifController
from frigate.types import PTZMetricsTypes
from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
logger = logging.getLogger(__name__)
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
# Determine if the PTZ was in motion at the set frame time
# for non ptz/autotracking cameras, this will always return False
# ptz_start_time is initialized to 0 on startup and only changes
# when autotracking movements are made
# the offset "primes" the motion estimator with a few frames before movement
offset = 0.5
return (ptz_start_time != 0.0 and frame_time >= ptz_start_time - offset) and (
ptz_stop_time == 0.0 or (ptz_start_time - offset <= frame_time <= ptz_stop_time)
)
class PtzMotionEstimator:
def __init__(
self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes]
) -> None:
self.frame_manager = SharedMemoryFrameManager()
self.norfair_motion_estimator = None
self.camera_config = config
self.coord_transformations = None
self.ptz_metrics = ptz_metrics
self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
self.ptz_metrics["ptz_reset"].set()
logger.debug(f"Motion estimator init for cam: {config.name}")
def motion_estimator(self, detections, frame_time, camera_name):
# If we've just started up or returned to our preset, reset motion estimator for new tracking session
if self.ptz_metrics["ptz_reset"].is_set():
self.ptz_metrics["ptz_reset"].clear()
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
if self.camera_config.onvif.autotracking.zooming:
logger.debug("Motion estimator reset - homography")
transformation_type = HomographyTransformationGetter()
else:
logger.debug("Motion estimator reset - translation")
transformation_type = TranslationTransformationGetter()
self.norfair_motion_estimator = MotionEstimator(
transformations_getter=transformation_type,
min_distance=30,
max_points=900,
)
self.coord_transformations = None
if ptz_moving_at_frame_time(
frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
):
logger.debug(
f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}"
)
frame_id = f"{camera_name}{frame_time}"
yuv_frame = self.frame_manager.get(
frame_id, self.camera_config.frame_shape_yuv
)
frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)
# mask out detections for better motion estimation
mask = np.ones(frame.shape[:2], frame.dtype)
detection_boxes = [x[2] for x in detections]
for detection in detection_boxes:
x1, y1, x2, y2 = detection
mask[y1:y2, x1:x2] = 0
# merge camera config motion mask with detections. Norfair function needs 0,1 mask
mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)
# Norfair estimator function needs color so it can convert it right back to gray
frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)
self.coord_transformations = self.norfair_motion_estimator.update(
frame, mask
)
self.frame_manager.close(frame_id)
if not self.camera_config.onvif.autotracking.zooming:
# doesn't work with homography
logger.debug(
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
)
return self.coord_transformations
class PtzAutoTrackerThread(threading.Thread):
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: dict[str, PTZMetricsTypes],
stop_event: MpEvent,
) -> None:
threading.Thread.__init__(self)
self.name = "ptz_autotracker"
self.ptz_autotracker = PtzAutoTracker(config, onvif, ptz_metrics)
self.stop_event = stop_event
self.config = config
def run(self):
while not self.stop_event.wait(1):
for camera_name, cam in self.config.cameras.items():
if cam.onvif.autotracking.enabled:
self.ptz_autotracker.camera_maintenance(camera_name)
else:
# disabled dynamically by mqtt
if self.ptz_autotracker.tracked_object.get(camera_name):
self.ptz_autotracker.tracked_object[camera_name] = None
self.ptz_autotracker.tracked_object_previous[camera_name] = None
logger.info("Exiting autotracker...")
class PtzAutoTracker:
def __init__(
self,
config: FrigateConfig,
onvif: OnvifController,
ptz_metrics: PTZMetricsTypes,
) -> None:
self.config = config
self.onvif = onvif
self.ptz_metrics = ptz_metrics
self.tracked_object: dict[str, object] = {}
self.tracked_object_previous: dict[str, object] = {}
self.previous_frame_time = None
self.object_types = {}
self.required_zones = {}
self.move_queues = {}
self.move_threads = {}
self.autotracker_init = {}
# if cam is set to autotrack, onvif should be set up
for camera_name, cam in self.config.cameras.items():
self.autotracker_init[camera_name] = False
if cam.onvif.autotracking.enabled:
self._autotracker_setup(cam, camera_name)
def _autotracker_setup(self, cam, camera_name):
logger.debug(f"Autotracker init for cam: {camera_name}")
self.object_types[camera_name] = cam.onvif.autotracking.track
self.required_zones[camera_name] = cam.onvif.autotracking.required_zones
self.tracked_object[camera_name] = None
self.tracked_object_previous[camera_name] = None
self.move_queues[camera_name] = queue.Queue()
if not self.onvif.cams[camera_name]["init"]:
if not self.onvif._init_onvif(camera_name):
logger.warning(f"Unable to initialize onvif for {camera_name}")
cam.onvif.autotracking.enabled = False
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
return
if "pt-r-fov" not in self.onvif.cams[camera_name]["features"]:
cam.onvif.autotracking.enabled = False
self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
logger.warning(
f"Disabling autotracking for {camera_name}: FOV relative movement not supported"
)
return
self.onvif.get_camera_status(camera_name)
# movement thread per camera
if not self.move_threads or not self.move_threads[camera_name]:
self.move_threads[camera_name] = threading.Thread(
name=f"move_thread_{camera_name}",
target=partial(self._process_move_queue, camera_name),
)
self.move_threads[camera_name].daemon = True
self.move_threads[camera_name].start()
self.autotracker_init[camera_name] = True
def _process_move_queue(self, camera):
while True:
try:
move_data = self.move_queues[camera].get()
frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them
if ptz_moving_at_frame_time(
frame_time,
self.ptz_metrics[camera]["ptz_start_time"].value,
self.ptz_metrics[camera]["ptz_stop_time"].value,
):
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug(
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
)
continue
else:
if (
self.config.cameras[camera].onvif.autotracking.zooming
and self.config.cameras[camera].onvif.autotracking.zoom_relative
):
self.onvif._move_relative(camera, pan, tilt, zoom, 1)
else:
if zoom > 0:
self.onvif._zoom_absolute(camera, zoom, 1)
# on some cameras with cheaper motors it seems like small values can cause jerky movement
# TODO: double check, might not need this
if abs(pan) > 0.02 or abs(tilt) > 0.02:
self.onvif._move_relative(camera, pan, tilt, 0, 1)
else:
logger.debug(
f"Not moving, pan and tilt too small: {pan}, {tilt}"
)
# Wait until the camera finishes moving
while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
# check if ptz is moving
self.onvif.get_camera_status(camera)
except queue.Empty:
continue
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
move_data = (frame_time, pan, tilt, zoom)
if (
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
):
logger.debug(
f"enqueue pan: {pan}, enqueue tilt: {tilt}, enqueue zoom: {zoom}"
)
self.move_queues[camera].put(move_data)
def _should_zoom_in(self, obj, camera):
camera_config = self.config.cameras[camera]
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
camera_area = camera_width * camera_height
bb_left, bb_top, bb_right, bb_bottom = obj.obj_data["box"]
# If bounding box is not within 5% of an edge
# If object area is less than 70% of frame
# Then zoom in, otherwise try zooming out
# should we make these configurable?
edge_threshold = 0.05
area_threshold = 0.7
# returns True to zoom in, False to zoom out
return (
bb_left > edge_threshold * camera_width
and bb_right < (1 - edge_threshold) * camera_width
and bb_top > edge_threshold * camera_height
and bb_bottom < (1 - edge_threshold) * camera_height
and obj.obj_data["area"] < area_threshold * camera_area
)
def _autotrack_move_ptz(self, camera, obj):
camera_config = self.config.cameras[camera]
# # frame width and height
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
# Normalize coordinates. top right of the fov is (1,1), center is (0,0), bottom left is (-1, -1).
pan = ((obj.obj_data["centroid"][0] / camera_width) - 0.5) * 2
tilt = (0.5 - (obj.obj_data["centroid"][1] / camera_height)) * 2
# ideas: check object velocity for camera speed?
if (
camera_config.onvif.autotracking.zooming
and camera_config.onvif.autotracking.zoom_relative
):
zoom = obj.obj_data["area"] / (camera_width * camera_height)
# test if we need to zoom out
if not self._should_zoom_in(obj, camera):
zoom = -(1 - zoom)
self._enqueue_move(
camera,
obj.obj_data["frame_time"],
pan,
tilt,
zoom,
)
else:
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, 0)
def _autotrack_zoom_ptz(self, camera, obj):
camera_config = self.config.cameras[camera]
if (
camera_config.onvif.autotracking.zooming
and not camera_config.onvif.autotracking.zoom_relative
):
# absolute zooming
zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value
if 0 < zoom_level <= 1:
if self._should_zoom_in(obj, camera):
zoom = min(1.0, zoom_level + 0.1)
else:
zoom = max(0.0, zoom_level - 0.1)
if zoom != zoom_level:
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)
def autotrack_object(self, camera, obj):
camera_config = self.config.cameras[camera]
if camera_config.onvif.autotracking.enabled:
if not self.autotracker_init[camera]:
self._autotracker_setup(self.config.cameras[camera], camera)
# either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive,
# and is not initially motionless - or one we're already tracking, which assumes all those things are already true
if (
# new object
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and set(obj.entered_zones) & set(self.required_zones[camera])
and not obj.previous["false_positive"]
and not obj.false_positive
and self.tracked_object_previous[camera] is None
and obj.obj_data["motionless_count"] == 0
):
logger.debug(
f"Autotrack: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self.previous_frame_time = obj.obj_data["frame_time"]
self._autotrack_move_ptz(camera, obj)
return
if (
# already tracking an object
self.tracked_object[camera] is not None
and self.tracked_object_previous[camera] is not None
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
and obj.obj_data["frame_time"] != self.previous_frame_time
):
self.previous_frame_time = obj.obj_data["frame_time"]
# Don't move ptz if Euclidean distance from object to center of frame is
# less than 15% of the of the larger dimension (width or height) of the frame,
# multiplied by a scaling factor for object size.
# Adjusting this percentage slightly lower will effectively cause the camera to move
# more often to keep the object in the center. Raising the percentage will cause less
# movement and will be more flexible with objects not quite being centered.
# TODO: there's probably a better way to approach this
distance = math.sqrt(
(obj.obj_data["centroid"][0] - camera_config.detect.width / 2) ** 2
+ (obj.obj_data["centroid"][1] - camera_config.detect.height / 2)
** 2
)
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
max_obj = max(obj_width, obj_height)
max_frame = max(camera_config.detect.width, camera_config.detect.height)
# larger objects should lower the threshold, smaller objects should raise it
scaling_factor = 1 - (max_obj / max_frame)
distance_threshold = 0.15 * (max_frame) * scaling_factor
iou = intersection_over_union(
self.tracked_object_previous[camera].obj_data["box"],
obj.obj_data["box"],
)
logger.debug(
f"Distance: {distance}, threshold: {distance_threshold}, iou: {iou}"
)
if distance < distance_threshold and iou > 0.2:
logger.debug(
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
# no need to move, but try zooming
self._autotrack_zoom_ptz(camera, obj)
return
logger.debug(
f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self._autotrack_move_ptz(camera, obj)
# try zooming too
self._autotrack_zoom_ptz(camera, obj)
return
if (
# The tracker lost an object, so let's check the previous object's region and compare it with the incoming object
# If it's within bounds, start tracking that object.
# Should we check region (maybe too broad) or expand the previous object's box a bit and check that?
self.tracked_object[camera] is None
and obj.camera == camera
and obj.obj_data["label"] in self.object_types[camera]
and not obj.previous["false_positive"]
and not obj.false_positive
and obj.obj_data["motionless_count"] == 0
and self.tracked_object_previous[camera] is not None
):
self.previous_frame_time = obj.obj_data["frame_time"]
if (
intersection_over_union(
self.tracked_object_previous[camera].obj_data["region"],
obj.obj_data["box"],
)
< 0.2
):
logger.debug(
f"Autotrack: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self._autotrack_move_ptz(camera, obj)
return
def end_object(self, camera, obj):
if self.config.cameras[camera].onvif.autotracking.enabled:
if (
self.tracked_object[camera] is not None
and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
):
logger.debug(
f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
)
self.tracked_object[camera] = None
def camera_maintenance(self, camera):
# calls get_camera_status to check/update ptz movement
# returns camera to preset after timeout when tracking is over
autotracker_config = self.config.cameras[camera].onvif.autotracking
if not self.autotracker_init[camera]:
self._autotracker_setup(self.config.cameras[camera], camera)
# regularly update camera status
if not self.ptz_metrics[camera]["ptz_stopped"].is_set():
self.onvif.get_camera_status(camera)
# return to preset if tracking is over
if (
self.tracked_object[camera] is None
and self.tracked_object_previous[camera] is not None
and (
# might want to use a different timestamp here?
time.time()
- self.tracked_object_previous[camera].obj_data["frame_time"]
> autotracker_config.timeout
)
and autotracker_config.return_preset
):
self.ptz_metrics[camera]["ptz_stopped"].wait()
logger.debug(
f"Autotrack: Time is {time.time()}, returning to preset: {autotracker_config.return_preset}"
)
self.onvif._move_to_preset(
camera,
autotracker_config.return_preset.lower(),
)
self.ptz_metrics[camera]["ptz_reset"].set()
# empty move queue
while not self.move_queues[camera].empty():
self.move_queues[camera].get()
# clear tracked object
self.tracked_object[camera] = None
self.tracked_object_previous[camera] = None