"""Automatically pan, tilt, and zoom on detected objects via onvif.""" import copy import logging import os import queue import threading import time from functools import partial from multiprocessing.synchronize import Event as MpEvent import cv2 import numpy as np from norfair.camera_motion import ( HomographyTransformationGetter, MotionEstimator, TranslationTransformationGetter, ) from frigate.config import CameraConfig, FrigateConfig, ZoomingModeEnum from frigate.const import CONFIG_DIR from frigate.ptz.onvif import OnvifController from frigate.types import PTZMetricsTypes from frigate.util.builtin import update_yaml_file from frigate.util.image import SharedMemoryFrameManager, intersection_over_union logger = logging.getLogger(__name__) def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time): # Determine if the PTZ was in motion at the set frame time # for non ptz/autotracking cameras, this will always return False # ptz_start_time is initialized to 0 on startup and only changes # when autotracking movements are made return (ptz_start_time != 0.0 and frame_time > ptz_start_time) and ( ptz_stop_time == 0.0 or (ptz_start_time <= frame_time <= ptz_stop_time) ) class PtzMotionEstimator: def __init__( self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes] ) -> None: self.frame_manager = SharedMemoryFrameManager() self.norfair_motion_estimator = None self.camera_config = config self.coord_transformations = None self.ptz_metrics = ptz_metrics self.ptz_start_time = self.ptz_metrics["ptz_start_time"] self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"] self.ptz_metrics["ptz_reset"].set() logger.debug(f"Motion estimator init for cam: {config.name}") def motion_estimator(self, detections, frame_time, camera_name): # If we've just started up or returned to our preset, reset motion estimator for new tracking session if self.ptz_metrics["ptz_reset"].is_set(): self.ptz_metrics["ptz_reset"].clear() # homography is nice (zooming) but slow, translation is pan/tilt only but fast. if ( self.camera_config.onvif.autotracking.zooming != ZoomingModeEnum.disabled ): logger.debug("Motion estimator reset - homography") transformation_type = HomographyTransformationGetter() else: logger.debug("Motion estimator reset - translation") transformation_type = TranslationTransformationGetter() self.norfair_motion_estimator = MotionEstimator( transformations_getter=transformation_type, min_distance=30, max_points=900, ) self.coord_transformations = None if ptz_moving_at_frame_time( frame_time, self.ptz_start_time.value, self.ptz_stop_time.value ): logger.debug( f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}" ) frame_id = f"{camera_name}{frame_time}" yuv_frame = self.frame_manager.get( frame_id, self.camera_config.frame_shape_yuv ) frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420) # mask out detections for better motion estimation mask = np.ones(frame.shape[:2], frame.dtype) detection_boxes = [x[2] for x in detections] for detection in detection_boxes: x1, y1, x2, y2 = detection mask[y1:y2, x1:x2] = 0 # merge camera config motion mask with detections. 
class PtzMotionEstimator:
    def __init__(
        self, config: CameraConfig, ptz_metrics: dict[str, PTZMetricsTypes]
    ) -> None:
        self.frame_manager = SharedMemoryFrameManager()
        self.norfair_motion_estimator = None
        self.camera_config = config
        self.coord_transformations = None
        self.ptz_metrics = ptz_metrics
        self.ptz_start_time = self.ptz_metrics["ptz_start_time"]
        self.ptz_stop_time = self.ptz_metrics["ptz_stop_time"]
        self.ptz_metrics["ptz_reset"].set()
        logger.debug(f"Motion estimator init for cam: {config.name}")

    def motion_estimator(self, detections, frame_time, camera_name):
        # If we've just started up or returned to our preset,
        # reset the motion estimator for a new tracking session
        if self.ptz_metrics["ptz_reset"].is_set():
            self.ptz_metrics["ptz_reset"].clear()

            # homography is nice (zooming) but slow, translation is pan/tilt only but fast.
            if (
                self.camera_config.onvif.autotracking.zooming
                != ZoomingModeEnum.disabled
            ):
                logger.debug("Motion estimator reset - homography")
                transformation_type = HomographyTransformationGetter()
            else:
                logger.debug("Motion estimator reset - translation")
                transformation_type = TranslationTransformationGetter()

            self.norfair_motion_estimator = MotionEstimator(
                transformations_getter=transformation_type,
                min_distance=30,
                max_points=900,
            )

            self.coord_transformations = None

        if ptz_moving_at_frame_time(
            frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
        ):
            logger.debug(
                f"Motion estimator running for {camera_name} - frame time: {frame_time}, {self.ptz_start_time.value}, {self.ptz_stop_time.value}"
            )

            frame_id = f"{camera_name}{frame_time}"
            yuv_frame = self.frame_manager.get(
                frame_id, self.camera_config.frame_shape_yuv
            )

            frame = cv2.cvtColor(yuv_frame, cv2.COLOR_YUV2GRAY_I420)

            # mask out detections for better motion estimation
            mask = np.ones(frame.shape[:2], frame.dtype)

            detection_boxes = [x[2] for x in detections]
            for detection in detection_boxes:
                x1, y1, x2, y2 = detection
                mask[y1:y2, x1:x2] = 0

            # merge the camera config motion mask with the detections.
            # The Norfair function needs a 0/1 mask
            mask = np.bitwise_and(mask, self.camera_config.motion.mask).clip(max=1)

            # Norfair estimator function needs color so it can convert it right back to gray
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGRA)

            try:
                self.coord_transformations = self.norfair_motion_estimator.update(
                    frame, mask
                )
                logger.debug(
                    f"Motion estimator transformation: {self.coord_transformations.rel_to_abs([[0, 0]])}"
                )
            except Exception:
                # sometimes opencv can't find enough features in the image to find homography, so catch this error
                logger.warning(
                    f"Autotracker: motion estimator couldn't get transformations for {camera_name} at frame time {frame_time}"
                )
                self.coord_transformations = None

            self.frame_manager.close(frame_id)

        return self.coord_transformations
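
# Illustrative sketch (not called anywhere in this module): how a caller might
# use the norfair CoordinatesTransformation returned by motion_estimator() to
# map a stored absolute point back into the current, post-move frame. The
# `estimator` instance and the (320, 240) point are hypothetical.
#
#   transformations = estimator.motion_estimator(detections, frame_time, "front")
#   if transformations is not None:
#       current_point = transformations.abs_to_rel([[320, 240]])
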
class PtzAutoTrackerThread(threading.Thread):
    def __init__(
        self,
        config: FrigateConfig,
        onvif: OnvifController,
        ptz_metrics: dict[str, PTZMetricsTypes],
        stop_event: MpEvent,
    ) -> None:
        threading.Thread.__init__(self)
        self.name = "ptz_autotracker"
        self.ptz_autotracker = PtzAutoTracker(config, onvif, ptz_metrics)
        self.stop_event = stop_event
        self.config = config

    def run(self):
        while not self.stop_event.wait(1):
            for camera_name, cam in self.config.cameras.items():
                if not cam.enabled:
                    continue

                if cam.onvif.autotracking.enabled:
                    self.ptz_autotracker.camera_maintenance(camera_name)
                else:
                    # disabled dynamically by mqtt
                    if self.ptz_autotracker.tracked_object.get(camera_name):
                        self.ptz_autotracker.tracked_object[camera_name] = None
                        self.ptz_autotracker.tracked_object_previous[camera_name] = None

        logger.info("Exiting autotracker...")


class PtzAutoTracker:
    def __init__(
        self,
        config: FrigateConfig,
        onvif: OnvifController,
        ptz_metrics: dict[str, PTZMetricsTypes],
    ) -> None:
        self.config = config
        self.onvif = onvif
        self.ptz_metrics = ptz_metrics
        self.tracked_object: dict[str, object] = {}
        self.tracked_object_previous: dict[str, object] = {}
        self.previous_frame_time: dict[str, object] = {}
        self.object_types: dict[str, object] = {}
        self.required_zones: dict[str, object] = {}
        self.move_queues: dict[str, object] = {}
        self.move_queue_locks: dict[str, object] = {}
        self.move_threads: dict[str, object] = {}
        self.autotracker_init: dict[str, object] = {}
        self.move_metrics: dict[str, object] = {}
        self.calibrating: dict[str, object] = {}
        self.intercept: dict[str, object] = {}
        self.move_coefficients: dict[str, object] = {}
        self.zoom_factor: dict[str, object] = {}

        # if a cam is set to autotrack, onvif should be set up
        for camera_name, cam in self.config.cameras.items():
            if not cam.enabled:
                continue

            self.autotracker_init[camera_name] = False
            if cam.onvif.autotracking.enabled:
                self._autotracker_setup(cam, camera_name)
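
    # Minimal usage sketch (hypothetical wiring; in Frigate the application
    # normally does this): run the tracker loop on its own thread and hand it
    # detected objects as they arrive.
    #
    #   autotracker_thread = PtzAutoTrackerThread(config, onvif, ptz_metrics, stop_event)
    #   autotracker_thread.start()
    #   ...
    #   autotracker_thread.ptz_autotracker.autotrack_object("front", tracked_obj)
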
    def _autotracker_setup(self, camera_config, camera_name):
        logger.debug(f"Autotracker init for cam: {camera_name}")

        self.object_types[camera_name] = camera_config.onvif.autotracking.track
        self.required_zones[
            camera_name
        ] = camera_config.onvif.autotracking.required_zones
        self.zoom_factor[camera_name] = camera_config.onvif.autotracking.zoom_factor

        self.tracked_object[camera_name] = None
        self.tracked_object_previous[camera_name] = None

        self.calibrating[camera_name] = False
        self.move_metrics[camera_name] = []
        self.intercept[camera_name] = None
        self.move_coefficients[camera_name] = []

        self.move_queues[camera_name] = queue.Queue()
        self.move_queue_locks[camera_name] = threading.Lock()

        if not self.onvif.cams[camera_name]["init"]:
            if not self.onvif._init_onvif(camera_name):
                logger.warning(f"Unable to initialize onvif for {camera_name}")
                camera_config.onvif.autotracking.enabled = False
                self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
                return

            if "pt-r-fov" not in self.onvif.cams[camera_name]["features"]:
                camera_config.onvif.autotracking.enabled = False
                self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
                logger.warning(
                    f"Disabling autotracking for {camera_name}: FOV relative movement not supported"
                )
                return

            movestatus_supported = self.onvif.get_service_capabilities(camera_name)

            if movestatus_supported is None or movestatus_supported.lower() != "true":
                camera_config.onvif.autotracking.enabled = False
                self.ptz_metrics[camera_name]["ptz_autotracker_enabled"].value = False
                logger.warning(
                    f"Disabling autotracking for {camera_name}: ONVIF MoveStatus not supported"
                )
                return

            self.onvif.get_camera_status(camera_name)

        # movement thread per camera
        # use .get() so a second camera doesn't raise a KeyError before its thread exists
        if not self.move_threads.get(camera_name):
            self.move_threads[camera_name] = threading.Thread(
                name=f"move_thread_{camera_name}",
                target=partial(self._process_move_queue, camera_name),
            )
            self.move_threads[camera_name].daemon = True
            self.move_threads[camera_name].start()

        if camera_config.onvif.autotracking.movement_weights:
            self.intercept[
                camera_name
            ] = camera_config.onvif.autotracking.movement_weights[0]
            self.move_coefficients[
                camera_name
            ] = camera_config.onvif.autotracking.movement_weights[1:]

        if camera_config.onvif.autotracking.calibrate_on_startup:
            self._calibrate_camera(camera_name)

        self.autotracker_init[camera_name] = True

    def _write_config(self, camera):
        config_file = os.environ.get("CONFIG_FILE", f"{CONFIG_DIR}/config.yml")

        logger.debug(
            f"Writing new config with autotracker motion coefficients: {self.config.cameras[camera].onvif.autotracking.movement_weights}"
        )

        update_yaml_file(
            config_file,
            ["cameras", camera, "onvif", "autotracking", "movement_weights"],
            self.config.cameras[camera].onvif.autotracking.movement_weights,
        )
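
    # The movement_weights persisted above land in the config file roughly like
    # this (values are hypothetical; the string is "intercept, coef_0, coef_1"
    # as produced by _calculate_move_coefficients below):
    #
    #   cameras:
    #     front:
    #       onvif:
    #         autotracking:
    #           movement_weights: 0.42, 0.38, 1.17
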
    def _calibrate_camera(self, camera):
        # move the camera from the preset in steps and measure the time it takes to move that amount
        # this will allow us to predict movement times with a simple linear regression
        # start with 0 so we can determine a baseline (to be used as the intercept in the regression calc)
        # TODO: take zooming into account too
        num_steps = 30
        step_sizes = np.linspace(0, 1, num_steps)

        self.calibrating[camera] = True

        logger.info(f"Camera calibration for {camera} in progress")

        self.onvif._move_to_preset(
            camera,
            self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
        )
        self.ptz_metrics[camera]["ptz_reset"].set()
        self.ptz_metrics[camera]["ptz_stopped"].clear()

        # Wait until the camera finishes moving
        while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
            self.onvif.get_camera_status(camera)

        for step in range(num_steps):
            pan = step_sizes[step]
            tilt = step_sizes[step]

            start_time = time.time()
            self.onvif._move_relative(camera, pan, tilt, 0, 1)

            # Wait until the camera finishes moving
            while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
                self.onvif.get_camera_status(camera)
            stop_time = time.time()

            self.move_metrics[camera].append(
                {
                    "pan": pan,
                    "tilt": tilt,
                    "start_timestamp": start_time,
                    "end_timestamp": stop_time,
                }
            )

            self.onvif._move_to_preset(
                camera,
                self.config.cameras[camera].onvif.autotracking.return_preset.lower(),
            )
            self.ptz_metrics[camera]["ptz_reset"].set()
            self.ptz_metrics[camera]["ptz_stopped"].clear()

            # Wait until the camera finishes moving
            while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
                self.onvif.get_camera_status(camera)

        self.calibrating[camera] = False

        logger.info(f"Calibration for {camera} complete")

        # calculate and save the new intercept and coefficients
        self._calculate_move_coefficients(camera, True)

    def _calculate_move_coefficients(self, camera, calibration=False):
        # calculate new coefficients every 50 new values; save up to 500
        if calibration or (
            len(self.move_metrics[camera]) % 50 == 0
            and len(self.move_metrics[camera]) != 0
            and len(self.move_metrics[camera]) <= 500
        ):
            X = np.array(
                [abs(d["pan"]) + abs(d["tilt"]) for d in self.move_metrics[camera]]
            )
            y = np.array(
                [
                    d["end_timestamp"] - d["start_timestamp"]
                    for d in self.move_metrics[camera]
                ]
            )

            # simple linear regression with intercept
            X_with_intercept = np.column_stack((np.ones(X.shape[0]), X))
            self.move_coefficients[camera] = np.linalg.lstsq(
                X_with_intercept, y, rcond=None
            )[0]

            # only assign a new intercept if we're calibrating
            if calibration:
                self.intercept[camera] = y[0]

            # write the intercept and coefficients back to the config file as a comma-separated string
            movement_weights = np.concatenate(
                ([self.intercept[camera]], self.move_coefficients[camera])
            )
            self.config.cameras[camera].onvif.autotracking.movement_weights = ", ".join(
                map(str, movement_weights)
            )

            logger.debug(
                f"New regression parameters - intercept: {self.intercept[camera]}, coefficients: {self.move_coefficients[camera]}"
            )

            self._write_config(camera)

    def _predict_movement_time(self, camera, pan, tilt):
        combined_movement = abs(pan) + abs(tilt)
        input_data = np.array([self.intercept[camera], combined_movement])

        return np.dot(self.move_coefficients[camera], input_data)
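
    # Worked example of the regression and prediction above, with hypothetical
    # numbers: if calibration stored intercept = 1.0 and lstsq fit coefficients
    # [0.5, 1.2], then a move of pan=0.4, tilt=-0.2 gives
    # combined_movement = |0.4| + |-0.2| = 0.6, and the predicted duration is
    # dot([0.5, 1.2], [1.0, 0.6]) = 0.5 * 1.0 + 1.2 * 0.6 = 1.22 seconds.
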
    def _process_move_queue(self, camera):
        while True:
            move_data = self.move_queues[camera].get()

            with self.move_queue_locks[camera]:
                frame_time, pan, tilt, zoom = move_data

                # if we're receiving move requests during a PTZ move, ignore them
                if ptz_moving_at_frame_time(
                    frame_time,
                    self.ptz_metrics[camera]["ptz_start_time"].value,
                    self.ptz_metrics[camera]["ptz_stop_time"].value,
                ):
                    # instead of dequeueing, this might be a good place to preemptively move based
                    # on an estimate - for fast moving objects, etc.
                    logger.debug(
                        f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
                    )
                    continue

                else:
                    if (
                        self.config.cameras[camera].onvif.autotracking.zooming
                        == ZoomingModeEnum.relative
                        # this enables us to absolutely zoom if we lost an object
                        and self.tracked_object[camera] is not None
                    ):
                        self.onvif._move_relative(camera, pan, tilt, zoom, 1)
                    else:
                        if pan != 0 or tilt != 0:
                            self.onvif._move_relative(camera, pan, tilt, 0, 1)

                            # Wait until the camera finishes moving
                            while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
                                self.onvif.get_camera_status(camera)

                        if (
                            zoom > 0
                            and self.ptz_metrics[camera]["ptz_zoom_level"].value != zoom
                        ):
                            self.onvif._zoom_absolute(camera, zoom, 1)

                    # Wait until the camera finishes moving
                    while not self.ptz_metrics[camera]["ptz_stopped"].is_set():
                        self.onvif.get_camera_status(camera)

                    if self.config.cameras[camera].onvif.autotracking.movement_weights:
                        logger.debug(
                            f"Predicted movement time: {self._predict_movement_time(camera, pan, tilt)}"
                        )
                        logger.debug(
                            f'Actual movement time: {self.ptz_metrics[camera]["ptz_stop_time"].value - self.ptz_metrics[camera]["ptz_start_time"].value}'
                        )

                    # save metrics for better estimate calculations
                    if (
                        self.intercept[camera] is not None
                        and len(self.move_metrics[camera]) < 500
                        and (pan != 0 or tilt != 0)
                    ):
                        logger.debug("Adding new values to move metrics")
                        self.move_metrics[camera].append(
                            {
                                "pan": pan,
                                "tilt": tilt,
                                "start_timestamp": self.ptz_metrics[camera][
                                    "ptz_start_time"
                                ].value,
                                "end_timestamp": self.ptz_metrics[camera][
                                    "ptz_stop_time"
                                ].value,
                            }
                        )

                        # calculate new coefficients if we have enough data
                        self._calculate_move_coefficients(camera)

    def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
        def split_value(value):
            clipped = np.clip(value, -1, 1)
            return clipped, value - clipped

        if (
            frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
            and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
            and not self.move_queue_locks[camera].locked()
        ):
            # don't make small movements
            if abs(pan) < 0.02:
                pan = 0
            if abs(tilt) < 0.02:
                tilt = 0

            # split up any large moves caused by velocity estimated movements
            while pan != 0 or tilt != 0 or zoom != 0:
                pan, pan_excess = split_value(pan)
                tilt, tilt_excess = split_value(tilt)
                zoom, zoom_excess = split_value(zoom)

                logger.debug(
                    f"Enqueue movement for frame time: {frame_time}, pan: {pan}, tilt: {tilt}, zoom: {zoom}"
                )
                move_data = (frame_time, pan, tilt, zoom)
                self.move_queues[camera].put(move_data)

                pan = pan_excess
                tilt = tilt_excess
                zoom = zoom_excess
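
    # Example of the move splitting above (hypothetical values): a
    # velocity-compensated pan of 1.7 exceeds the [-1, 1] range of a single
    # relative move, so split_value(1.7) returns (1.0, 0.7) and two moves are
    # queued: pan=1.0 first, then pan=0.7. Likewise, -1.3 splits into -1.0
    # followed by -0.3.
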
    def _below_distance_threshold(self, camera, obj):
        # Returns true if the Euclidean distance from the object to the center of the
        # frame is less than 20% of the larger dimension (width or height) of the
        # frame, multiplied by a scaling factor for object size.
        # The threshold is increased if the object is not moving, to prevent small ptz moves.
        # Adjusting this percentage slightly lower will effectively cause the camera to move
        # more often to keep the object in the center. Raising the percentage will cause less
        # movement and will be more flexible with objects not quite being centered.
        # TODO: there's probably a better way to approach this
        camera_config = self.config.cameras[camera]

        stationary = False

        # Norfair gives us two points for the velocity of an object represented as x1, y1, x2, y2
        x1, y1, x2, y2 = obj.obj_data["estimate_velocity"]
        average_velocity = (
            (x1 + x2) / 2,
            (y1 + y2) / 2,
        )

        # get the euclidean distance of the two points - sometimes the estimate is way off
        velocity_distance = np.linalg.norm([x2 - x1, y2 - y1])

        if (
            velocity_distance <= 5
            and abs(average_velocity[0]) <= 1
            and abs(average_velocity[1]) <= 1
        ):
            stationary = True

        centroid_distance = np.linalg.norm(
            [
                obj.obj_data["centroid"][0] - camera_config.detect.width / 2,
                obj.obj_data["centroid"][1] - camera_config.detect.height / 2,
            ]
        )

        obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
        obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]

        max_obj = max(obj_width, obj_height)
        max_frame = (
            camera_config.detect.width
            if max_obj == obj_width
            else camera_config.detect.height
        )

        # larger objects should lower the threshold, smaller objects should raise it
        scaling_factor = 1 - (max_obj / max_frame)

        # increase the distance threshold if the object is stationary
        distance_threshold = (
            0.20 * max_frame * (scaling_factor * 2 if stationary else scaling_factor)
        )

        logger.debug(f"Distance: {centroid_distance}, threshold: {distance_threshold}")

        return centroid_distance < distance_threshold
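
    # Worked example of the threshold above (hypothetical 1280x720 detect
    # resolution): a 200x400 box makes max_obj = 400 (its height), so
    # max_frame = 720, scaling_factor = 1 - 400/720 ≈ 0.44, and the threshold is
    # 0.20 * 720 * 0.44 ≈ 64 px while moving, or ≈ 128 px if stationary. The
    # camera only re-centers once the centroid drifts beyond that distance.
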
    def _should_zoom_in(self, camera, obj, box):
        camera_config = self.config.cameras[camera]
        camera_width = camera_config.frame_shape[1]
        camera_height = camera_config.frame_shape[0]
        camera_fps = camera_config.detect.fps
        camera_area = camera_width * camera_height

        # Norfair gives us two points for the velocity of an object represented as x1, y1, x2, y2
        x1, y1, x2, y2 = obj.obj_data["estimate_velocity"]
        average_velocity = (
            (x1 + x2) / 2,
            (y1 + y2) / 2,
        )

        # get the euclidean distance of the two points - sometimes the estimate is way off
        distance = np.linalg.norm([x2 - x1, y2 - y1])

        bb_left, bb_top, bb_right, bb_bottom = box

        # If the bounding box is not within 15% of an edge, the euclidean distance to
        # center is within the threshold, and the object's area is less than
        # (1 - zoom_factor) of the frame, then zoom in; otherwise try zooming out.
        # Should we make these configurable?
        #
        # TODO: Take into account the area changing when an object is moving out of frame
        edge_threshold = 0.15

        # calculate a velocity threshold based on movement coefficients, if available
        if camera_config.onvif.autotracking.movement_weights:
            predicted_movement_time = self._predict_movement_time(camera, 1, 1)
            velocity_threshold_x = (
                camera_width / predicted_movement_time / camera_fps * 0.7
            )
            velocity_threshold_y = (
                camera_height / predicted_movement_time / camera_fps * 0.7
            )
        else:
            # use a generic velocity threshold
            velocity_threshold_x = camera_width * 0.02
            velocity_threshold_y = camera_height * 0.02

        far_from_edge = (
            bb_left > edge_threshold * camera_width
            and bb_right < (1 - edge_threshold) * camera_width
            and bb_top > edge_threshold * camera_height
            and bb_bottom < (1 - edge_threshold) * camera_height
        )

        # make sure the object is centered in the frame
        below_distance_threshold = self._below_distance_threshold(camera, obj)

        # if we have a big object, let's zoom out
        below_area_threshold = (obj.obj_data["area"] / camera_area) < (
            1 - self.zoom_factor[camera]
        )

        # if we have a fast moving object, let's zoom out.
        # with the generic thresholds, "fast moving" means a velocity of more than
        # 2% of the camera's width or height per frame, so an object with an x
        # velocity above ~25 px/frame on a 1280x720 camera would trigger a zoom out
        below_velocity_threshold = (
            abs(average_velocity[0]) < velocity_threshold_x
            and abs(average_velocity[1]) < velocity_threshold_y
            and distance <= 10
        )

        # debug zooming
        if False:
            logger.debug(
                f"Zoom test: left edge: {bb_left > edge_threshold * camera_width}"
            )
            logger.debug(
                f"Zoom test: right edge: {bb_right < (1 - edge_threshold) * camera_width}"
            )
            logger.debug(
                f"Zoom test: top edge: {bb_top > edge_threshold * camera_height}"
            )
            logger.debug(
                f"Zoom test: bottom edge: {bb_bottom < (1 - edge_threshold) * camera_height}"
            )
            logger.debug(
                f"Zoom test: below distance threshold {(below_distance_threshold or below_area_threshold)}"
            )
            logger.debug(
                f"Zoom test: below area threshold: {below_area_threshold} ratio: {obj.obj_data['area'] / camera_area}, threshold value: {1 - self.zoom_factor[camera]}"
            )
            logger.debug(
                f"Zoom test: below velocity threshold: {below_velocity_threshold} velocity x: {abs(average_velocity[0])}, x threshold: {velocity_threshold_x}, velocity y: {abs(average_velocity[1])}, y threshold: {velocity_threshold_y}"
            )

        # returns True to zoom in, False to zoom out
        # Zoom in conditions
        if (
            far_from_edge
            and (below_distance_threshold or below_area_threshold)
            and below_velocity_threshold
        ):
            return True

        # Zoom out conditions
        if not far_from_edge and below_distance_threshold:
            return False

        return False
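
    # Numeric example of the gating above (hypothetical 1280x720 frame, no
    # movement_weights): far_from_edge requires the box to stay inside
    # [192, 1088] horizontally and [108, 612] vertically (15% margins), and the
    # generic velocity thresholds are 1280 * 0.02 = 25.6 px and
    # 720 * 0.02 = 14.4 px per frame. A centered, slow object smaller than
    # (1 - zoom_factor) of the frame zooms in; anything else falls through to a
    # zoom out.
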
    def _autotrack_move_ptz(self, camera, obj):
        camera_config = self.config.cameras[camera]
        camera_width = camera_config.frame_shape[1]
        camera_height = camera_config.frame_shape[0]
        camera_fps = camera_config.detect.fps
        average_velocity = (0,) * 4
        predicted_box = obj.obj_data["box"]

        centroid_x = obj.obj_data["centroid"][0]
        centroid_y = obj.obj_data["centroid"][1]

        # Normalize coordinates. The top right of the fov is (1,1), center is (0,0), bottom left is (-1,-1).
        pan = ((centroid_x / camera_width) - 0.5) * 2
        tilt = (0.5 - (centroid_y / camera_height)) * 2

        if (
            camera_config.onvif.autotracking.movement_weights
        ):  # use estimates if we have available coefficients
            predicted_movement_time = self._predict_movement_time(camera, pan, tilt)

            # Norfair gives us two points for the velocity of an object represented as x1, y1, x2, y2
            x1, y1, x2, y2 = obj.obj_data["estimate_velocity"]
            # repeat the x and y velocities so they can be zipped against both corners of the box below
            average_velocity = (
                (x1 + x2) / 2,
                (y1 + y2) / 2,
                (x1 + x2) / 2,
                (y1 + y2) / 2,
            )

            # get the euclidean distance of the two points - sometimes the estimate is way off
            distance = np.linalg.norm([x2 - x1, y2 - y1])

            if distance <= 7:
                # this box could exceed the frame boundaries if velocity is high
                # but we'll handle that in _enqueue_move() as two separate moves
                predicted_box = [
                    round(x + camera_fps * predicted_movement_time * v)
                    for x, v in zip(obj.obj_data["box"], average_velocity)
                ]

                centroid_x = round((predicted_box[0] + predicted_box[2]) / 2)
                centroid_y = round((predicted_box[1] + predicted_box[3]) / 2)

                # recalculate pan and tilt with the new centroid
                pan = ((centroid_x / camera_width) - 0.5) * 2
                tilt = (0.5 - (centroid_y / camera_height)) * 2

            logger.debug(f'Original box: {obj.obj_data["box"]}')
            logger.debug(f"Predicted box: {predicted_box}")
            logger.debug(f'Velocity: {obj.obj_data["estimate_velocity"]}')

        zoom = self._get_zoom_amount(camera, obj, predicted_box)

        self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, zoom)

    def _autotrack_move_zoom_only(self, camera, obj):
        camera_config = self.config.cameras[camera]

        if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
            zoom = self._get_zoom_amount(camera, obj, obj.obj_data["box"])
            self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)

    def _get_zoom_amount(self, camera, obj, predicted_box):
        camera_config = self.config.cameras[camera]

        # frame width and height
        camera_width = camera_config.frame_shape[1]
        camera_height = camera_config.frame_shape[0]

        zoom = 0

        # absolute zooming separately from pan/tilt
        if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.absolute:
            zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value

            # don't zoom on the initial move
            if self.tracked_object_previous[camera] is None:
                zoom = zoom_level
            else:
                if 0 < zoom_level <= 1:
                    if self._should_zoom_in(camera, obj, obj.obj_data["box"]):
                        zoom = min(1.0, zoom_level + 0.1)
                    else:
                        zoom = max(0.0, zoom_level - 0.1)

                # don't make small movements to zoom in if the area hasn't changed significantly,
                # but always zoom out if necessary
                if (
                    abs(
                        obj.obj_data["area"]
                        - self.tracked_object_previous[camera].obj_data["area"]
                    )
                    / obj.obj_data["area"]
                    < 0.3
                    and zoom <= zoom_level
                ):
                    zoom = 0

        # relative zooming concurrently with pan/tilt
        if camera_config.onvif.autotracking.zooming == ZoomingModeEnum.relative:
            # don't zoom on the initial move
            if self.tracked_object_previous[camera] is None:
                zoom = 0
            else:
                zoom_level = obj.obj_data["area"] / (camera_width * camera_height)

                if self._should_zoom_in(
                    camera,
                    obj,
                    predicted_box
                    if camera_config.onvif.autotracking.movement_weights
                    else obj.obj_data["box"],
                ):
                    # zoom in
                    zoom = min(1, zoom_level * (1 / self.zoom_factor[camera]) ** 1.2)
                else:
                    # zoom out, proportional to how little of the frame the object fills
                    zoom = -(1 - zoom_level)

        logger.debug(f"Zoom amount: {zoom}")

        return zoom
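
    # Worked example of the pan/tilt normalization in _autotrack_move_ptz above
    # (hypothetical 1280x720 frame): a centroid at (960, 180) sits in the
    # upper-right quadrant, so pan = ((960 / 1280) - 0.5) * 2 = 0.5 and
    # tilt = (0.5 - (180 / 720)) * 2 = 0.5, i.e. half of a full relative move
    # right and up, toward the object.
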
    def autotrack_object(self, camera, obj):
        camera_config = self.config.cameras[camera]

        if camera_config.onvif.autotracking.enabled:
            if not self.autotracker_init[camera]:
                self._autotracker_setup(camera_config, camera)

            if self.calibrating[camera]:
                logger.debug("Calibrating camera")
                return

            # this is a brand new object that's on our camera, has our label, entered the zone,
            # is not a false positive, and is not initially motionless
            if (
                # new object
                self.tracked_object[camera] is None
                and obj.camera == camera
                and obj.obj_data["label"] in self.object_types[camera]
                and set(obj.entered_zones) & set(self.required_zones[camera])
                and not obj.previous["false_positive"]
                and not obj.false_positive
                and self.tracked_object_previous[camera] is None
                and obj.obj_data["motionless_count"] == 0
            ):
                logger.debug(
                    f"Autotrack: New object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
                )
                self.tracked_object[camera] = obj
                self._autotrack_move_ptz(camera, obj)
                self.tracked_object_previous[camera] = copy.deepcopy(obj)
                self.previous_frame_time[camera] = obj.obj_data["frame_time"]

                return

            if (
                # already tracking an object
                self.tracked_object[camera] is not None
                and self.tracked_object_previous[camera] is not None
                and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
                and obj.obj_data["frame_time"] != self.previous_frame_time[camera]
                and not ptz_moving_at_frame_time(
                    obj.obj_data["frame_time"],
                    self.ptz_metrics[camera]["ptz_start_time"].value,
                    self.ptz_metrics[camera]["ptz_stop_time"].value,
                )
            ):
                if (
                    self._below_distance_threshold(camera, obj)
                    and intersection_over_union(
                        self.tracked_object_previous[camera].obj_data["box"],
                        obj.obj_data["box"],
                    )
                    > 0.2
                ):
                    logger.debug(
                        f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
                    )
                    # no need to move, but try zooming
                    self._autotrack_move_zoom_only(camera, obj)
                    return

                logger.debug(
                    f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
                )
                self._autotrack_move_ptz(camera, obj)
                self.tracked_object_previous[camera] = copy.deepcopy(obj)
                self.previous_frame_time[camera] = obj.obj_data["frame_time"]

                return

            # The tracker lost an object, so let's check the previous object's region
            # and compare it with the incoming object. If it's within bounds, start
            # tracking that object.
            # Should we check the region (maybe too broad) or expand the previous
            # object's box a bit and check that?
            if (
                self.tracked_object[camera] is None
                and obj.camera == camera
                and obj.obj_data["label"] in self.object_types[camera]
                and not obj.previous["false_positive"]
                and not obj.false_positive
                and self.tracked_object_previous[camera] is not None
            ):
                if (
                    intersection_over_union(
                        self.tracked_object_previous[camera].obj_data["region"],
                        obj.obj_data["box"],
                    )
                    < 0.2
                ):
                    logger.debug(
                        f"Autotrack: Reacquired object: {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
                    )
                    self.tracked_object[camera] = obj
                    self._autotrack_move_ptz(camera, obj)
                    self.tracked_object_previous[camera] = copy.deepcopy(obj)
                    self.previous_frame_time[camera] = obj.obj_data["frame_time"]

                return

    def end_object(self, camera, obj):
        if self.config.cameras[camera].onvif.autotracking.enabled:
            if (
                self.tracked_object[camera] is not None
                and obj.obj_data["id"] == self.tracked_object[camera].obj_data["id"]
            ):
                logger.debug(
                    f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
                )
                self.tracked_object[camera] = None

    def camera_maintenance(self, camera):
        # bail and don't check anything if we're calibrating or tracking an object
        if self.calibrating[camera] or self.tracked_object[camera] is not None:
            return

        logger.debug("Running camera maintenance")

        # calls get_camera_status to check/update ptz movement
        # returns the camera to the preset after the timeout when tracking is over
        autotracker_config = self.config.cameras[camera].onvif.autotracking

        if not self.autotracker_init[camera]:
            self._autotracker_setup(self.config.cameras[camera], camera)

        # regularly update the camera status
        if not self.ptz_metrics[camera]["ptz_stopped"].is_set():
            self.onvif.get_camera_status(camera)

        # return to preset if tracking is over
        if (
            self.tracked_object[camera] is None
            and self.tracked_object_previous[camera] is not None
            and (
                # might want to use a different timestamp here?
                self.ptz_metrics[camera]["ptz_frame_time"].value
                - self.tracked_object_previous[camera].obj_data["frame_time"]
                >= autotracker_config.timeout
            )
            and autotracker_config.return_preset
        ):
            # empty the move queue
            while not self.move_queues[camera].empty():
                self.move_queues[camera].get()

            # clear the tracked object
            self.tracked_object[camera] = None
            self.tracked_object_previous[camera] = None

            self.ptz_metrics[camera]["ptz_stopped"].wait()
            logger.debug(
                f"Autotrack: Time is {self.ptz_metrics[camera]['ptz_frame_time'].value}, returning to preset: {autotracker_config.return_preset}"
            )
            self.onvif._move_to_preset(
                camera,
                autotracker_config.return_preset.lower(),
            )
            self.ptz_metrics[camera]["ptz_reset"].set()
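
    # Timeline example for the return-to-preset logic above (hypothetical
    # values): with autotracking.timeout = 10 and return_preset = "home", if the
    # last tracked object was seen at frame time 1000.0 and ptz_frame_time
    # reaches 1010.0, the move queue is drained, the previous object is cleared,
    # and the camera is sent back to the "home" preset.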