much improved motion estimation and tracking

This commit is contained in:
Josh Hawkins 2023-07-10 15:27:02 -05:00
parent 5e772c3625
commit ecb820e940
6 changed files with 138 additions and 48 deletions

View File

@ -142,6 +142,12 @@ class FrigateApp:
self.config.cameras[camera_name].onvif.autotracking.enabled, self.config.cameras[camera_name].onvif.autotracking.enabled,
), ),
"ptz_stopped": mp.Event(), "ptz_stopped": mp.Event(),
"ptz_start_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_stop_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"motion_threshold": mp.Value( # type: ignore[typeddict-item] "motion_threshold": mp.Value( # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards

View File

@ -2,6 +2,7 @@
import copy import copy
import logging import logging
import math
import queue import queue
import threading import threading
import time import time
@ -20,24 +21,38 @@ from frigate.util.image import SharedMemoryFrameManager, intersection_over_union
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def ptz_moving_at_frame_time(frame_time, ptz_start_time, ptz_stop_time):
# Determine if the PTZ was in motion at the set frame time
# for non ptz/autotracking cameras, this will always return False
# ptz_start_time is initialized to 0 on startup and only changes
# when autotracking movements are made
# the offset "primes" the motion estimator with a few frames before movement
offset = 0.5
return (ptz_start_time != 0.0 and frame_time >= ptz_start_time - offset) and (
ptz_stop_time == 0.0 or (ptz_start_time - offset <= frame_time <= ptz_stop_time)
)
class PtzMotionEstimator: class PtzMotionEstimator:
def __init__(self, config: CameraConfig, ptz_stopped) -> None: def __init__(self, config: CameraConfig, ptz_start_time, ptz_stop_time) -> None:
self.frame_manager = SharedMemoryFrameManager() self.frame_manager = SharedMemoryFrameManager()
# homography is nice (zooming) but slow, translation is pan/tilt only but fast. # homography is nice (zooming) but slow, translation is pan/tilt only but fast.
self.norfair_motion_estimator = MotionEstimator( self.norfair_motion_estimator = MotionEstimator(
transformations_getter=TranslationTransformationGetter(), transformations_getter=TranslationTransformationGetter(),
min_distance=30, min_distance=30,
max_points=500, max_points=900,
) )
self.camera_config = config self.camera_config = config
self.coord_transformations = None self.coord_transformations = None
self.ptz_stopped = ptz_stopped self.ptz_start_time = ptz_start_time
self.ptz_stop_time = ptz_stop_time
logger.debug(f"Motion estimator init for cam: {config.name}") logger.debug(f"Motion estimator init for cam: {config.name}")
def motion_estimator(self, detections, frame_time, camera_name): def motion_estimator(self, detections, frame_time, camera_name):
if ( if ptz_moving_at_frame_time(
self.camera_config.onvif.autotracking.enabled frame_time, self.ptz_start_time.value, self.ptz_stop_time.value
and not self.ptz_stopped.is_set()
): ):
logger.debug( logger.debug(
f"Motion estimator running for {camera_name} - frame time: {frame_time}" f"Motion estimator running for {camera_name} - frame time: {frame_time}"
@ -76,8 +91,6 @@ class PtzMotionEstimator:
return self.coord_transformations return self.coord_transformations
return None
class PtzAutoTrackerThread(threading.Thread): class PtzAutoTrackerThread(threading.Thread):
def __init__( def __init__(
@ -198,12 +211,6 @@ class PtzAutoTracker:
move_data = self.move_queues[camera].get() move_data = self.move_queues[camera].get()
pan, tilt = move_data pan, tilt = move_data
# check if ptz is moving
self.onvif.get_camera_status(camera)
# Wait until the camera finishes moving
self.camera_metrics[camera]["ptz_stopped"].wait()
self.onvif._move_relative(camera, pan, tilt, 1) self.onvif._move_relative(camera, pan, tilt, 1)
# Wait until the camera finishes moving # Wait until the camera finishes moving
@ -237,10 +244,7 @@ class PtzAutoTracker:
def autotrack_object(self, camera, obj): def autotrack_object(self, camera, obj):
camera_config = self.config.cameras[camera] camera_config = self.config.cameras[camera]
if ( if camera_config.onvif.autotracking.enabled:
camera_config.onvif.autotracking.enabled
and self.camera_metrics[camera]["ptz_stopped"].is_set()
):
# either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive, # either this is a brand new object that's on our camera, has our label, entered the zone, is not a false positive,
# and is not initially motionless - or one we're already tracking, which assumes all those things are already true # and is not initially motionless - or one we're already tracking, which assumes all those things are already true
if ( if (
@ -259,6 +263,10 @@ class PtzAutoTracker:
) )
self.tracked_object[camera] = obj self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj) self.tracked_object_previous[camera] = copy.deepcopy(obj)
# only enqueue another move if the camera isn't moving
if self.camera_metrics[camera]["ptz_stopped"].is_set():
self.camera_metrics[camera]["ptz_stopped"].clear()
logger.debug("Autotrack: New object, moving ptz")
self._autotrack_move_ptz(camera, obj) self._autotrack_move_ptz(camera, obj)
return return
@ -271,27 +279,56 @@ class PtzAutoTracker:
and obj.obj_data["frame_time"] and obj.obj_data["frame_time"]
!= self.tracked_object_previous[camera].obj_data["frame_time"] != self.tracked_object_previous[camera].obj_data["frame_time"]
): ):
# don't move the ptz if we're relatively close to the existing box # Don't move ptz if Euclidean distance from object to center of frame is
# should we use iou or euclidean distance or both? # less than 15% of the of the larger dimension (width or height) of the frame,
# distance = math.sqrt((obj.obj_data["centroid"][0] - camera_width/2)**2 + (obj.obj_data["centroid"][1] - obj.camera_height/2)**2) # multiplied by a scaling factor for object size.
# if distance <= (self.camera_width * .15) or distance <= (self.camera_height * .15) # Adjusting this percentage slightly lower will effectively cause the camera to move
if ( # more often to keep the object in the center. Raising the percentage will cause less
intersection_over_union( # movement and will be more flexible with objects not quite being centered.
# TODO: there's probably a better way to approach this
distance = math.sqrt(
(obj.obj_data["centroid"][0] - camera_config.detect.width / 2) ** 2
+ (obj.obj_data["centroid"][1] - camera_config.detect.height / 2)
** 2
)
obj_width = obj.obj_data["box"][2] - obj.obj_data["box"][0]
obj_height = obj.obj_data["box"][3] - obj.obj_data["box"][1]
max_obj = max(obj_width, obj_height)
max_frame = max(camera_config.detect.width, camera_config.detect.height)
# larger objects should lower the threshold, smaller objects should raise it
scaling_factor = 1 - (max_obj / max_frame)
distance_threshold = 0.15 * (max_frame) * scaling_factor
iou = intersection_over_union(
self.tracked_object_previous[camera].obj_data["box"], self.tracked_object_previous[camera].obj_data["box"],
obj.obj_data["box"], obj.obj_data["box"],
) )
> 0.5
): logger.debug(
f"Distance: {distance}, threshold: {distance_threshold}, iou: {iou}"
)
if (distance < distance_threshold or iou > 0.5) and self.camera_metrics[
camera
]["ptz_stopped"].is_set():
logger.debug( logger.debug(
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}" f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
) )
self.tracked_object_previous[camera] = copy.deepcopy(obj)
return return
logger.debug( logger.debug(
f"Autotrack: Existing object (move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}" f"Autotrack: Existing object (need to move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
) )
self.tracked_object_previous[camera] = copy.deepcopy(obj) self.tracked_object_previous[camera] = copy.deepcopy(obj)
# only enqueue another move if the camera isn't moving
if self.camera_metrics[camera]["ptz_stopped"].is_set():
self.camera_metrics[camera]["ptz_stopped"].clear()
logger.debug("Autotrack: Existing object, moving ptz")
self._autotrack_move_ptz(camera, obj) self._autotrack_move_ptz(camera, obj)
return return
@ -320,6 +357,10 @@ class PtzAutoTracker:
) )
self.tracked_object[camera] = obj self.tracked_object[camera] = obj
self.tracked_object_previous[camera] = copy.deepcopy(obj) self.tracked_object_previous[camera] = copy.deepcopy(obj)
# only enqueue another move if the camera isn't moving
if self.camera_metrics[camera]["ptz_stopped"].is_set():
self.camera_metrics[camera]["ptz_stopped"].clear()
logger.debug("Autotrack: Reacquired object, moving ptz")
self._autotrack_move_ptz(camera, obj) self._autotrack_move_ptz(camera, obj)
return return
@ -334,7 +375,6 @@ class PtzAutoTracker:
f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}" f"Autotrack: End object: {obj.obj_data['id']} {obj.obj_data['box']}"
) )
self.tracked_object[camera] = None self.tracked_object[camera] = None
self.onvif.get_camera_status(camera)
def camera_maintenance(self, camera): def camera_maintenance(self, camera):
# calls get_camera_status to check/update ptz movement # calls get_camera_status to check/update ptz movement

View File

@ -1,5 +1,6 @@
"""Configure and control camera via onvif.""" """Configure and control camera via onvif."""
import datetime
import logging import logging
import site import site
from enum import Enum from enum import Enum
@ -207,7 +208,6 @@ class OnvifController:
return return
logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}") logger.debug(f"{camera_name} called RelativeMove: pan: {pan} tilt: {tilt}")
self.get_camera_status(camera_name)
if self.cams[camera_name]["active"]: if self.cams[camera_name]["active"]:
logger.warning( logger.warning(
@ -217,6 +217,11 @@ class OnvifController:
self.cams[camera_name]["active"] = True self.cams[camera_name]["active"] = True
self.camera_metrics[camera_name]["ptz_stopped"].clear() self.camera_metrics[camera_name]["ptz_stopped"].clear()
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
self.camera_metrics[camera_name][
"ptz_start_time"
].value = datetime.datetime.now().timestamp()
self.camera_metrics[camera_name]["ptz_stop_time"].value = 0
onvif: ONVIFCamera = self.cams[camera_name]["onvif"] onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["relative_move_request"] move_request = self.cams[camera_name]["relative_move_request"]
@ -345,11 +350,26 @@ class OnvifController:
if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE": if status.MoveStatus.PanTilt == "IDLE" and status.MoveStatus.Zoom == "IDLE":
self.cams[camera_name]["active"] = False self.cams[camera_name]["active"] = False
if not self.camera_metrics[camera_name]["ptz_stopped"].is_set():
self.camera_metrics[camera_name]["ptz_stopped"].set() self.camera_metrics[camera_name]["ptz_stopped"].set()
logger.debug(f"PTZ stop time: {datetime.datetime.now().timestamp()}")
self.camera_metrics[camera_name][
"ptz_stop_time"
].value = datetime.datetime.now().timestamp()
else: else:
self.cams[camera_name]["active"] = True self.cams[camera_name]["active"] = True
if self.camera_metrics[camera_name]["ptz_stopped"].is_set():
self.camera_metrics[camera_name]["ptz_stopped"].clear() self.camera_metrics[camera_name]["ptz_stopped"].clear()
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
self.camera_metrics[camera_name][
"ptz_start_time"
].value = datetime.datetime.now().timestamp()
self.camera_metrics[camera_name]["ptz_stop_time"].value = 0
return { return {
"pan": status.Position.PanTilt.x, "pan": status.Position.PanTilt.x,
"tilt": status.Position.PanTilt.y, "tilt": status.Position.PanTilt.y,

View File

@ -55,15 +55,20 @@ def frigate_distance(detection: Detection, tracked_object) -> float:
class NorfairTracker(ObjectTracker): class NorfairTracker(ObjectTracker):
def __init__(self, config: CameraConfig, ptz_autotracker_enabled, ptz_stopped): def __init__(
self,
config: CameraConfig,
ptz_autotracker_enabled,
ptz_start_time,
ptz_stop_time,
):
self.tracked_objects = {} self.tracked_objects = {}
self.disappeared = {} self.disappeared = {}
self.positions = {} self.positions = {}
self.max_disappeared = config.detect.max_disappeared self.max_disappeared = config.detect.max_disappeared
self.camera_config = config self.camera_config = config
self.detect_config = config.detect self.detect_config = config.detect
self.ptz_autotracker_enabled = ptz_autotracker_enabled.value self.ptz_autotracker_enabled = ptz_autotracker_enabled
self.ptz_stopped = ptz_stopped
self.camera_name = config.name self.camera_name = config.name
self.track_id_map = {} self.track_id_map = {}
# TODO: could also initialize a tracker per object class if there # TODO: could also initialize a tracker per object class if there
@ -74,8 +79,10 @@ class NorfairTracker(ObjectTracker):
initialization_delay=0, initialization_delay=0,
hit_counter_max=self.max_disappeared, hit_counter_max=self.max_disappeared,
) )
if self.ptz_autotracker_enabled: if self.ptz_autotracker_enabled.value:
self.ptz_motion_estimator = PtzMotionEstimator(config, self.ptz_stopped) self.ptz_motion_estimator = PtzMotionEstimator(
config, ptz_start_time, ptz_stop_time
)
def register(self, track_id, obj): def register(self, track_id, obj):
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
@ -239,7 +246,7 @@ class NorfairTracker(ObjectTracker):
coord_transformations = None coord_transformations = None
if self.ptz_autotracker_enabled: if self.ptz_autotracker_enabled.value:
coord_transformations = self.ptz_motion_estimator.motion_estimator( coord_transformations = self.ptz_motion_estimator.motion_estimator(
detections, frame_time, self.camera_name detections, frame_time, self.camera_name
) )

View File

@ -20,6 +20,8 @@ class CameraMetricsTypes(TypedDict):
improve_contrast_enabled: Synchronized improve_contrast_enabled: Synchronized
ptz_autotracker_enabled: Synchronized ptz_autotracker_enabled: Synchronized
ptz_stopped: Event ptz_stopped: Event
ptz_start_time: Synchronized
ptz_stop_time: Synchronized
motion_threshold: Synchronized motion_threshold: Synchronized
motion_contour_area: Synchronized motion_contour_area: Synchronized
process: Optional[Process] process: Optional[Process]

View File

@ -22,6 +22,7 @@ from frigate.log import LogPipe
from frigate.motion import MotionDetector from frigate.motion import MotionDetector
from frigate.motion.improved_motion import ImprovedMotionDetector from frigate.motion.improved_motion import ImprovedMotionDetector
from frigate.object_detection import RemoteObjectDetector from frigate.object_detection import RemoteObjectDetector
from frigate.ptz.autotrack import ptz_moving_at_frame_time
from frigate.track import ObjectTracker from frigate.track import ObjectTracker
from frigate.track.norfair_tracker import NorfairTracker from frigate.track.norfair_tracker import NorfairTracker
from frigate.util.builtin import EventsPerSecond from frigate.util.builtin import EventsPerSecond
@ -479,6 +480,8 @@ def track_camera(
motion_enabled = process_info["motion_enabled"] motion_enabled = process_info["motion_enabled"]
improve_contrast_enabled = process_info["improve_contrast_enabled"] improve_contrast_enabled = process_info["improve_contrast_enabled"]
ptz_autotracker_enabled = process_info["ptz_autotracker_enabled"] ptz_autotracker_enabled = process_info["ptz_autotracker_enabled"]
ptz_start_time = process_info["ptz_start_time"]
ptz_stop_time = process_info["ptz_stop_time"]
ptz_stopped = process_info["ptz_stopped"] ptz_stopped = process_info["ptz_stopped"]
motion_threshold = process_info["motion_threshold"] motion_threshold = process_info["motion_threshold"]
motion_contour_area = process_info["motion_contour_area"] motion_contour_area = process_info["motion_contour_area"]
@ -499,7 +502,9 @@ def track_camera(
name, labelmap, detection_queue, result_connection, model_config, stop_event name, labelmap, detection_queue, result_connection, model_config, stop_event
) )
object_tracker = NorfairTracker(config, ptz_autotracker_enabled, ptz_stopped) object_tracker = NorfairTracker(
config, ptz_autotracker_enabled, ptz_start_time, ptz_stop_time
)
frame_manager = SharedMemoryFrameManager() frame_manager = SharedMemoryFrameManager()
@ -520,6 +525,8 @@ def track_camera(
detection_enabled, detection_enabled,
motion_enabled, motion_enabled,
stop_event, stop_event,
ptz_start_time,
ptz_stop_time,
ptz_stopped, ptz_stopped,
) )
@ -745,6 +752,8 @@ def process_frames(
detection_enabled: mp.Value, detection_enabled: mp.Value,
motion_enabled: mp.Value, motion_enabled: mp.Value,
stop_event, stop_event,
ptz_start_time: mp.Value,
ptz_stop_time: mp.Value,
ptz_stopped: mp.Event, ptz_stopped: mp.Event,
exit_on_empty: bool = False, exit_on_empty: bool = False,
): ):
@ -781,10 +790,16 @@ def process_frames(
logger.info(f"{camera_name}: frame {frame_time} is not in memory store.") logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
continue continue
# look for motion if enabled # look for motion if enabled and ptz is not moving
# ptz_moving_at_frame_time() always returns False for
# non ptz/autotracking cameras
motion_boxes = ( motion_boxes = (
motion_detector.detect(frame) motion_detector.detect(frame)
if motion_enabled.value and ptz_stopped.is_set() if motion_enabled.value
and ptz_stopped.is_set()
and not ptz_moving_at_frame_time(
frame_time, ptz_start_time.value, ptz_stop_time.value
)
else [] else []
) )