basic zooming

This commit is contained in:
Josh Hawkins 2023-09-01 08:10:12 -05:00
parent 144264e760
commit c7bf8cb4a6
4 changed files with 131 additions and 15 deletions

View File

@ -179,6 +179,9 @@ class FrigateApp:
"ptz_stop_time": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"ptz_zoom_level": mp.Value("d", 0.0), # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
}
self.ptz_metrics[camera_name]["ptz_stopped"].set()
self.feature_metrics[camera_name] = {

View File

@ -11,7 +11,7 @@ from multiprocessing.synchronize import Event as MpEvent
import cv2
import numpy as np
from norfair.camera_motion import MotionEstimator, TranslationTransformationGetter
from norfair.camera_motion import HomographyTransformationGetter, MotionEstimator
from frigate.config import CameraConfig, FrigateConfig
from frigate.ptz.onvif import OnvifController
@ -57,7 +57,7 @@ class PtzMotionEstimator:
logger.debug("Motion estimator reset")
# homography is nice (zooming) but slow, translation is pan/tilt only but fast.
self.norfair_motion_estimator = MotionEstimator(
transformations_getter=TranslationTransformationGetter(),
transformations_getter=HomographyTransformationGetter(),
min_distance=30,
max_points=900,
)
@ -97,9 +97,10 @@ class PtzMotionEstimator:
self.frame_manager.close(frame_id)
logger.debug(
f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
)
# doesn't work with homography
# logger.debug(
# f"Motion estimator transformation: {self.coord_transformations.rel_to_abs((0,0))}"
# )
return self.coord_transformations
@ -200,7 +201,7 @@ class PtzAutoTracker:
while True:
try:
move_data = self.move_queues[camera].get()
frame_time, pan, tilt = move_data
frame_time, pan, tilt, zoom = move_data
# if we're receiving move requests during a PTZ move, ignore them
if ptz_moving_at_frame_time(
@ -211,15 +212,18 @@ class PtzAutoTracker:
# instead of dequeueing this might be a good place to preemptively move based
# on an estimate - for fast moving objects, etc.
logger.debug(
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}"
f"Move queue: PTZ moving, dequeueing move request - frame time: {frame_time}, final pan: {pan}, final tilt: {tilt}, final zoom: {zoom}"
)
continue
else:
if abs(zoom) > 0:
self.onvif._zoom_relative(camera, zoom, 1)
# on some cameras with cheaper motors it seems like small values can cause jerky movement
# TODO: double check, might not need this
if abs(pan) > 0.02 or abs(tilt) > 0.02:
self.onvif._move_relative(camera, pan, tilt, 0, 1)
self.onvif._move_relative(camera, pan, tilt, 1)
else:
logger.debug(
f"Not moving, pan and tilt too small: {pan}, {tilt}"
@ -233,13 +237,15 @@ class PtzAutoTracker:
except queue.Empty:
continue
def _enqueue_move(self, camera, frame_time, pan, tilt):
move_data = (frame_time, pan, tilt)
def _enqueue_move(self, camera, frame_time, pan, tilt, zoom):
move_data = (frame_time, pan, tilt, zoom)
if (
frame_time > self.ptz_metrics[camera]["ptz_start_time"].value
and frame_time > self.ptz_metrics[camera]["ptz_stop_time"].value
):
logger.debug(f"enqueue pan: {pan}, enqueue tilt: {tilt}")
logger.debug(
f"enqueue pan: {pan}, enqueue tilt: {tilt}, enqueue zoom: {zoom}"
)
self.move_queues[camera].put(move_data)
def _autotrack_move_ptz(self, camera, obj):
@ -254,7 +260,31 @@ class PtzAutoTracker:
tilt = (0.5 - (obj.obj_data["centroid"][1] / camera_height)) * 2
# ideas: check object velocity for camera speed?
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt)
self._enqueue_move(camera, obj.obj_data["frame_time"], pan, tilt, 0)
def _autotrack_zoom_ptz(self, camera, obj):
camera_config = self.config.cameras[camera]
# frame width and height
camera_width = camera_config.frame_shape[1]
camera_height = camera_config.frame_shape[0]
bb_left, bb_top, bb_right, bb_bottom = obj.obj_data["box"]
zoom_level = self.ptz_metrics[camera]["ptz_zoom_level"].value
if -1 <= zoom_level < 1:
if (
bb_left > 0.1 * camera_width
and bb_right < 0.9 * camera_width
and bb_top > 0.1 * camera_height
and bb_bottom < 0.9 * camera_height
):
zoom = 0.1 # Zoom in
else:
zoom = -0.1 # Zoom out
self._enqueue_move(camera, obj.obj_data["frame_time"], 0, 0, zoom)
def autotrack_object(self, camera, obj):
camera_config = self.config.cameras[camera]
@ -331,6 +361,10 @@ class PtzAutoTracker:
logger.debug(
f"Autotrack: Existing object (do NOT move ptz): {obj.obj_data['id']} {obj.obj_data['box']} {obj.obj_data['frame_time']}"
)
# no need to move, but try zooming
self._autotrack_zoom_ptz(camera, obj)
return
logger.debug(
@ -339,6 +373,9 @@ class PtzAutoTracker:
self.tracked_object_previous[camera] = copy.deepcopy(obj)
self._autotrack_move_ptz(camera, obj)
# try zooming too
self._autotrack_zoom_ptz(camera, obj)
return
if (

View File

@ -73,6 +73,10 @@ class OnvifController:
return False
ptz = onvif.create_ptz_service()
request = ptz.create_type("GetConfigurations")
configs = ptz.GetConfigurations(request)[0]
request = ptz.create_type("GetConfigurationOptions")
request.ConfigurationToken = profile.PTZConfiguration.token
ptz_config = ptz.GetConfigurationOptions(request)
@ -158,6 +162,10 @@ class OnvifController:
if ptz_config.Spaces and ptz_config.Spaces.RelativeZoomTranslationSpace:
supported_features.append("zoom-r")
self.cams[camera_name][
"relative_zoom_range"
] = ptz_config.Spaces.RelativeZoomTranslationSpace[0]
self.cams[camera_name]["zoom_limits"] = configs.ZoomLimits
if fov_space_id is not None:
supported_features.append("pt-r-fov")
@ -214,7 +222,7 @@ class OnvifController:
onvif.get_service("ptz").ContinuousMove(move_request)
def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None:
def _move_relative(self, camera_name: str, pan, tilt, speed) -> None:
if "pt-r-fov" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).")
return
@ -267,11 +275,14 @@ class OnvifController:
move_request.Translation.PanTilt.y = tilt
if "zoom-r" in self.cams[camera_name]["features"]:
move_request.Speed["Zoom"] = 0
move_request.Translation.Zoom.x = zoom
move_request.Translation.Zoom.x = 0
onvif.get_service("ptz").RelativeMove(move_request)
# reset after the move request
move_request.Translation.PanTilt.x = 0
move_request.Translation.PanTilt.y = 0
self.cams[camera_name]["active"] = False
def _move_to_preset(self, camera_name: str, preset: str) -> None:
@ -311,6 +322,54 @@ class OnvifController:
onvif.get_service("ptz").ContinuousMove(move_request)
def _zoom_relative(self, camera_name: str, zoom, speed) -> None:
if "zoom-r" not in self.cams[camera_name]["features"]:
logger.error(f"{camera_name} does not support ONVIF RelativeMove zooming.")
return
logger.debug(f"{camera_name} called RelativeMove: zoom: {zoom}")
if self.cams[camera_name]["active"]:
logger.warning(
f"{camera_name} is already performing an action, not moving..."
)
return
self.cams[camera_name]["active"] = True
self.ptz_metrics[camera_name]["ptz_stopped"].clear()
logger.debug(f"PTZ start time: {datetime.datetime.now().timestamp()}")
self.ptz_metrics[camera_name][
"ptz_start_time"
].value = datetime.datetime.now().timestamp()
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
onvif: ONVIFCamera = self.cams[camera_name]["onvif"]
move_request = self.cams[camera_name]["relative_move_request"]
# function takes in -1 to 1 for zoom, interpolate to the values of the camera.
zoom = numpy.interp(
zoom,
[-1, 1],
[
self.cams[camera_name]["relative_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["relative_zoom_range"]["XRange"]["Max"],
],
)
move_request.Speed = {"Zoom": speed}
move_request.Translation.PanTilt.x = 0
move_request.Translation.PanTilt.y = 0
move_request.Translation.Zoom.x = zoom
logger.debug(f"Relative zoom: {zoom}")
onvif.get_service("ptz").RelativeMove(move_request)
# reset after the move request
move_request.Translation.Zoom.x = 0
self.cams[camera_name]["active"] = False
def handle_command(
self, camera_name: str, command: OnvifCommandEnum, param: str = ""
) -> None:
@ -384,6 +443,22 @@ class OnvifController:
].value = datetime.datetime.now().timestamp()
self.ptz_metrics[camera_name]["ptz_stop_time"].value = 0
logger.debug(f"PTZ zoom level: {status.Position.Zoom.x}")
self.ptz_metrics[camera_name]["ptz_zoom_level"].value = numpy.interp(
status.Position.Zoom.x,
[
self.cams[camera_name]["zoom_limits"]["Range"]["XRange"]["Min"],
self.cams[camera_name]["zoom_limits"]["Range"]["XRange"]["Max"],
],
[
self.cams[camera_name]["relative_zoom_range"]["XRange"]["Min"],
self.cams[camera_name]["relative_zoom_range"]["XRange"]["Max"],
],
)
logger.debug(
f"Relative zoom level: {self.ptz_metrics[camera_name]['ptz_zoom_level'].value}"
)
return {
"pan": status.Position.PanTilt.x,
"tilt": status.Position.PanTilt.y,

View File

@ -31,6 +31,7 @@ class PTZMetricsTypes(TypedDict):
ptz_reset: Event
ptz_start_time: Synchronized
ptz_stop_time: Synchronized
ptz_zoom_level: Synchronized
class FeatureMetricsTypes(TypedDict):