diff --git a/motion_estimator.py b/motion_estimator.py
new file mode 100644
index 000000000..62949341e
--- /dev/null
+++ b/motion_estimator.py
@@ -0,0 +1,287 @@
+import argparse
+from functools import partial
+
+import numpy as np
+import torch
+from norfair import (
+    AbsolutePaths,
+    Detection,
+    FixedCamera,
+    Tracker,
+    Video,
+    draw_absolute_grid,
+)
+from norfair.camera_motion import (
+    HomographyTransformationGetter,
+    MotionEstimator,
+    TranslationTransformationGetter,
+)
+from norfair.drawing import draw_tracked_objects
+
+
+def yolo_detections_to_norfair_detections(yolo_detections, track_boxes):
+    norfair_detections = []
+    boxes = []
+    detections_as_xyxy = yolo_detections.xyxy[0]
+    for detection_as_xyxy in detections_as_xyxy:
+        detection_as_xyxy = detection_as_xyxy.cpu().numpy()
+        bbox = np.array(
+            [
+                [detection_as_xyxy[0].item(), detection_as_xyxy[1].item()],
+                [detection_as_xyxy[2].item(), detection_as_xyxy[3].item()],
+            ]
+        )
+        boxes.append(bbox)
+        if track_boxes:
+            points = bbox
+            scores = np.array([detection_as_xyxy[4], detection_as_xyxy[4]])
+        else:
+            points = bbox.mean(axis=0, keepdims=True)
+            scores = detection_as_xyxy[[4]]
+
+        norfair_detections.append(
+            Detection(points=points, scores=scores, label=detection_as_xyxy[-1].item())
+        )
+
+    return norfair_detections, boxes
+
+
+def run():
+    parser = argparse.ArgumentParser(description="Track objects in a video.")
+    parser.add_argument("files", type=str, nargs="+", help="Video files to process")
+    parser.add_argument(
+        "--model",
+        type=str,
+        default="yolov5n",
+        help="YOLO model to use, possible values are yolov5n, yolov5s, yolov5m, yolov5l, yolov5x",
+    )
+    parser.add_argument(
+        "--confidence-threshold",
+        type=float,
+        help="Confidence threshold of detections",
+        default=0.15,
+    )
+    parser.add_argument(
+        "--distance-threshold",
+        type=float,
+        default=0.8,
+        help="Max distance to consider when matching detections and tracked objects",
+    )
+    parser.add_argument(
+        "--initialization-delay",
+        type=int,
+        default=3,
+        help="Min detections needed before a tracked object is initialized",
+    )
+    parser.add_argument(
+        "--track-boxes",
+        dest="track_boxes",
+        action="store_true",
+        help="Pass this flag to track bounding boxes instead of just the centroids",
+    )
+    parser.add_argument(
+        "--hit-counter-max",
+        type=int,
+        default=30,
+        help="Max frames a tracked object is kept alive without matching detections",
+    )
+    parser.add_argument(
+        "--iou-threshold", type=float, help="IoU threshold for the detector", default=0.15
+    )
+    parser.add_argument(
+        "--image-size", type=int, help="Image size for the detector", default=480
+    )
+    parser.add_argument(
+        "--classes", type=int, nargs="+", default=[0], help="Classes to track"
+    )
+    parser.add_argument(
+        "--transformation",
+        default="homography",
+        help="Type of transformation, possible values are homography, translation, none",
+    )
+    parser.add_argument(
+        "--max-points",
+        type=int,
+        default=500,
+        help="Max points sampled to calculate camera motion",
+    )
+    parser.add_argument(
+        "--min-distance",
+        type=float,
+        default=7,
+        help="Min distance between points sampled to calculate camera motion",
+    )
+    parser.add_argument(
+        "--no-mask-detections",
+        dest="mask_detections",
+        action="store_false",
+        default=True,
+        help="By default we don't sample regions where objects were detected when estimating camera motion. Pass this flag to disable this behavior",
+    )
+    parser.add_argument(
+        "--save",
+        dest="save",
+        action="store_true",
+        help="Pass this flag to save the video instead of showing the frames",
+    )
+    parser.add_argument(
+        "--output-name",
+        default=None,
+        help="Name of the output file",
+    )
+    parser.add_argument(
+        "--downsample-ratio",
+        type=int,
+        default=1,
+        help="Downsample ratio when showing frames",
+    )
+    parser.add_argument(
+        "--fixed-camera-scale",
+        type=float,
+        default=0,
+        help="Scale of the fixed camera, set to 0 to disable. Note that this only works for translation",
+    )
+    parser.add_argument(
+        "--draw-absolute-grid",
+        dest="absolute_grid",
+        action="store_true",
+        help="Pass this flag to draw an absolute grid for reference",
+    )
+    parser.add_argument(
+        "--draw-objects",
+        dest="draw_objects",
+        action="store_true",
+        help="Pass this flag to draw tracked objects as points, or as boxes if --track-boxes is used",
+    )
+    parser.add_argument(
+        "--draw-paths",
+        dest="draw_paths",
+        action="store_true",
+        help="Pass this flag to draw the paths of the objects (SLOW)",
+    )
+    parser.add_argument(
+        "--path-history",
+        type=int,
+        default=20,
+        help="Length of the paths",
+    )
+    parser.add_argument(
+        "--id-size",
+        type=float,
+        default=None,
+        help="Size multiplier of the ids when drawing. Thickness will adapt to the size",
+    )
+    parser.add_argument(
+        "--draw-flow",
+        dest="draw_flow",
+        action="store_true",
+        help="Pass this flag to draw the optical flow of the selected points",
+    )
+
+    args = parser.parse_args()
+
+    model = torch.hub.load("ultralytics/yolov5", args.model)
+    # Confidence filtering is delegated to the tracker, so the detector
+    # threshold is left at 0
+    model.conf = 0
+    model.iou = args.iou_threshold
+    model.classes = args.classes
+
+    use_fixed_camera = args.fixed_camera_scale > 0
+    # Tracked objects from the previous frame, used to mask the motion estimator
+    tracked_objects = []
+    # Process Videos
+    for input_path in args.files:
+        if args.transformation == "homography":
+            transformations_getter = HomographyTransformationGetter()
+        elif args.transformation == "translation":
+            transformations_getter = TranslationTransformationGetter()
+        elif args.transformation == "none":
+            transformations_getter = None
+        else:
+            raise ValueError(f"invalid transformation {args.transformation}")
+        if transformations_getter is not None:
+            motion_estimator = MotionEstimator(
+                max_points=args.max_points,
+                min_distance=args.min_distance,
+                transformations_getter=transformations_getter,
+                draw_flow=args.draw_flow,
+            )
+        else:
+            motion_estimator = None
+
+        if use_fixed_camera:
+            fixed_camera = FixedCamera(scale=args.fixed_camera_scale)
+
+        if args.draw_paths:
+            path_drawer = AbsolutePaths(max_history=args.path_history, thickness=2)
+
+        video = Video(input_path=input_path, output_path=args.output_name or ".")
+        show_or_write = (
+            video.write
+            if args.save
+            else partial(video.show, downsample_ratio=args.downsample_ratio)
+        )
+
+        tracker = Tracker(
+            distance_function="euclidean",
+            detection_threshold=args.confidence_threshold,
+            distance_threshold=args.distance_threshold,
+            initialization_delay=args.initialization_delay,
+            hit_counter_max=args.hit_counter_max,
+        )
+        for frame in video:
+            detections = model(frame, size=args.image_size)
+            detections, boxes = yolo_detections_to_norfair_detections(
+                detections, args.track_boxes
+            )
+
+            mask = None
+            if args.mask_detections:
+                # create a mask of ones
+                mask = np.ones(frame.shape[:2], frame.dtype)
+                # zero out the regions covered by current detections
+                for b in boxes:
+                    i = b.astype(int)
+                    mask[i[0, 1] : i[1, 1], i[0, 0] : i[1, 0]] = 0
+                # when tracking boxes, also mask previously tracked objects
+                if args.track_boxes:
+                    for obj in tracked_objects:
+                        i = obj.estimate.astype(int)
+                        mask[i[0, 1] : i[1, 1], i[0, 0] : i[1, 0]] = 0
+
+            if motion_estimator is None:
+                coord_transformations = None
+            else:
+                coord_transformations = motion_estimator.update(frame, mask)
+
+            tracked_objects = tracker.update(
+                detections=detections, coord_transformations=coord_transformations
+            )
+
+            if args.draw_objects:
+                draw_tracked_objects(
+                    frame,
+                    tracked_objects,
+                    id_size=args.id_size,
+                    id_thickness=None
+                    if args.id_size is None
+                    else int(args.id_size * 2),
+                )
+
+            if args.absolute_grid:
+                draw_absolute_grid(frame, coord_transformations)
+
+            if args.draw_paths:
+                frame = path_drawer.draw(
+                    frame, tracked_objects, coord_transform=coord_transformations
+                )
+
+            if use_fixed_camera:
+                frame = fixed_camera.adjust_frame(frame, coord_transformations)
+
+            show_or_write(frame)
+
+
+if __name__ == "__main__":
+    run()
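For reference, a typical invocation of the new script might look like the following (a sketch, assuming norfair and the ultralytics/yolov5 hub dependencies are installed; video.mp4 is a placeholder input, and the flags are the ones defined by the argparse setup above):

    python motion_estimator.py video.mp4 --transformation homography --draw-objects --draw-absolute-grid --save

Passing --transformation none disables camera-motion compensation entirely, while a --fixed-camera-scale greater than 0 (translation only) renders the frames onto a fixed, enlarged canvas instead.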