diff --git a/frigate/test/test_video.py b/frigate/test/test_video.py index 99736f658..f2667bb62 100644 --- a/frigate/test/test_video.py +++ b/frigate/test/test_video.py @@ -6,7 +6,7 @@ from norfair.drawing.color import Palette from norfair.drawing.drawer import Drawer from frigate.util.image import intersection -from frigate.video import ( +from frigate.util.object import ( get_cluster_boundary, get_cluster_candidates, get_cluster_region, diff --git a/frigate/util/object.py b/frigate/util/object.py index ec06602d5..ae2a96a3d 100644 --- a/frigate/util/object.py +++ b/frigate/util/object.py @@ -2,6 +2,7 @@ import datetime import logging +import math import cv2 import numpy as np @@ -11,7 +12,10 @@ from frigate.config import CameraConfig, ModelConfig from frigate.detectors.detector_config import PixelFormatEnum from frigate.models import Event, Regions, Timeline from frigate.util.image import ( + area, calculate_region, + intersection, + intersection_over_union, yuv_region_2_bgr, yuv_region_2_rgb, yuv_region_2_yuv, @@ -273,3 +277,171 @@ def create_tensor_input(frame, model_config: ModelConfig, region): # Expand dimensions since the model expects images to have shape: [1, height, width, 3] return np.expand_dims(cropped_frame, axis=0) + + +def box_overlaps(b1, b2): + if b1[2] < b2[0] or b1[0] > b2[2] or b1[1] > b2[3] or b1[3] < b2[1]: + return False + return True + + +def box_inside(b1, b2): + # check if b2 is inside b1 + if b2[0] >= b1[0] and b2[1] >= b1[1] and b2[2] <= b1[2] and b2[3] <= b1[3]: + return True + return False + + +def reduce_boxes(boxes, iou_threshold=0.0): + clusters = [] + + for box in boxes: + matched = 0 + for cluster in clusters: + if intersection_over_union(box, cluster) > iou_threshold: + matched = 1 + cluster[0] = min(cluster[0], box[0]) + cluster[1] = min(cluster[1], box[1]) + cluster[2] = max(cluster[2], box[2]) + cluster[3] = max(cluster[3], box[3]) + + if not matched: + clusters.append(list(box)) + + return [tuple(c) for c in clusters] + + +def intersects_any(box_a, boxes): + for box in boxes: + if box_overlaps(box_a, box): + return True + return False + + +def inside_any(box_a, boxes): + for box in boxes: + # check if box_a is inside of box + if box_inside(box, box_a): + return True + return False + + +def get_cluster_boundary(box, min_region): + # compute the max region size for the current box (box is 10% of region) + box_width = box[2] - box[0] + box_height = box[3] - box[1] + max_region_area = abs(box_width * box_height) / 0.1 + max_region_size = max(min_region, int(math.sqrt(max_region_area))) + + centroid = (box_width / 2 + box[0], box_height / 2 + box[1]) + + max_x_dist = int(max_region_size - box_width / 2 * 1.1) + max_y_dist = int(max_region_size - box_height / 2 * 1.1) + + return [ + int(centroid[0] - max_x_dist), + int(centroid[1] - max_y_dist), + int(centroid[0] + max_x_dist), + int(centroid[1] + max_y_dist), + ] + + +def get_cluster_candidates( + frame_shape, min_region, boxes, region_grid: list[list[dict[str, any]]] +): + # and create a cluster of other boxes using it's max region size + # only include boxes where the region is an appropriate(except the region could possibly be smaller?) + # size in the cluster. in order to be in the cluster, the furthest corner needs to be within x,y offset + # determined by the max_region size minus half the box + 20% + # TODO: see if we can do this with numpy + cluster_candidates = [] + used_boxes = [] + # loop over each box + for current_index, b in enumerate(boxes): + if current_index in used_boxes: + continue + cluster = [current_index] + used_boxes.append(current_index) + cluster_boundary = get_cluster_boundary(b, min_region) + # find all other boxes that fit inside the boundary + for compare_index, compare_box in enumerate(boxes): + if compare_index in used_boxes: + continue + + # if the box is not inside the potential cluster area, cluster them + if not box_inside(cluster_boundary, compare_box): + continue + + # get the region if you were to add this box to the cluster + potential_cluster = cluster + [compare_index] + cluster_region = get_cluster_region( + frame_shape, min_region, potential_cluster, boxes + ) + # if region could be smaller and either box would be too small + # for the resulting region, dont cluster + should_cluster = True + if (cluster_region[2] - cluster_region[0]) > min_region: + for b in potential_cluster: + box = boxes[b] + # boxes should be more than 5% of the area of the region + if area(box) / area(cluster_region) < 0.05: + should_cluster = False + break + + if should_cluster: + cluster.append(compare_index) + used_boxes.append(compare_index) + cluster_candidates.append(cluster) + + # return the unique clusters only + unique = {tuple(sorted(c)) for c in cluster_candidates} + return [list(tup) for tup in unique] + + +def get_cluster_region(frame_shape, min_region, cluster, boxes): + min_x = frame_shape[1] + min_y = frame_shape[0] + max_x = 0 + max_y = 0 + for b in cluster: + min_x = min(boxes[b][0], min_x) + min_y = min(boxes[b][1], min_y) + max_x = max(boxes[b][2], max_x) + max_y = max(boxes[b][3], max_y) + return calculate_region( + frame_shape, min_x, min_y, max_x, max_y, min_region, multiplier=1.2 + ) + + +def get_consolidated_object_detections(detected_object_groups): + """Drop detections that overlap too much""" + consolidated_detections = [] + for group in detected_object_groups.values(): + # if the group only has 1 item, skip + if len(group) == 1: + consolidated_detections.append(group[0]) + continue + + # sort smallest to largest by area + sorted_by_area = sorted(group, key=lambda g: g[3]) + + for current_detection_idx in range(0, len(sorted_by_area)): + current_detection = sorted_by_area[current_detection_idx][2] + overlap = 0 + for to_check_idx in range( + min(current_detection_idx + 1, len(sorted_by_area)), + len(sorted_by_area), + ): + to_check = sorted_by_area[to_check_idx][2] + intersect_box = intersection(current_detection, to_check) + # if 90% of smaller detection is inside of another detection, consolidate + if ( + intersect_box is not None + and area(intersect_box) / area(current_detection) > 0.9 + ): + overlap = 1 + break + if overlap == 0: + consolidated_detections.append(sorted_by_area[current_detection_idx]) + + return consolidated_detections diff --git a/frigate/video.py b/frigate/video.py index b5735fcf6..891958f79 100755 --- a/frigate/video.py +++ b/frigate/video.py @@ -1,6 +1,5 @@ import datetime import logging -import math import multiprocessing as mp import os import queue @@ -27,16 +26,19 @@ from frigate.util.builtin import EventsPerSecond from frigate.util.image import ( FrameManager, SharedMemoryFrameManager, - area, calculate_region, draw_box_with_label, - intersection, - intersection_over_union, ) from frigate.util.object import ( + box_inside, create_tensor_input, + get_cluster_candidates, + get_cluster_region, get_cluster_region_from_grid, + get_consolidated_object_detections, get_min_region_size, + inside_any, + intersects_any, is_object_filtered, ) from frigate.util.services import listen @@ -445,53 +447,6 @@ def track_camera( logger.info(f"{name}: exiting subprocess") -def box_overlaps(b1, b2): - if b1[2] < b2[0] or b1[0] > b2[2] or b1[1] > b2[3] or b1[3] < b2[1]: - return False - return True - - -def box_inside(b1, b2): - # check if b2 is inside b1 - if b2[0] >= b1[0] and b2[1] >= b1[1] and b2[2] <= b1[2] and b2[3] <= b1[3]: - return True - return False - - -def reduce_boxes(boxes, iou_threshold=0.0): - clusters = [] - - for box in boxes: - matched = 0 - for cluster in clusters: - if intersection_over_union(box, cluster) > iou_threshold: - matched = 1 - cluster[0] = min(cluster[0], box[0]) - cluster[1] = min(cluster[1], box[1]) - cluster[2] = max(cluster[2], box[2]) - cluster[3] = max(cluster[3], box[3]) - - if not matched: - clusters.append(list(box)) - - return [tuple(c) for c in clusters] - - -def intersects_any(box_a, boxes): - for box in boxes: - if box_overlaps(box_a, box): - return True - return False - - -def inside_any(box_a, boxes): - for box in boxes: - # check if box_a is inside of box - if box_inside(box, box_a): - return True - return False - - def detect( detect_config: DetectConfig, object_detector, @@ -536,127 +491,6 @@ def detect( return detections -def get_cluster_boundary(box, min_region): - # compute the max region size for the current box (box is 10% of region) - box_width = box[2] - box[0] - box_height = box[3] - box[1] - max_region_area = abs(box_width * box_height) / 0.1 - max_region_size = max(min_region, int(math.sqrt(max_region_area))) - - centroid = (box_width / 2 + box[0], box_height / 2 + box[1]) - - max_x_dist = int(max_region_size - box_width / 2 * 1.1) - max_y_dist = int(max_region_size - box_height / 2 * 1.1) - - return [ - int(centroid[0] - max_x_dist), - int(centroid[1] - max_y_dist), - int(centroid[0] + max_x_dist), - int(centroid[1] + max_y_dist), - ] - - -def get_cluster_candidates( - frame_shape, min_region, boxes, region_grid: list[list[dict[str, any]]] -): - # and create a cluster of other boxes using it's max region size - # only include boxes where the region is an appropriate(except the region could possibly be smaller?) - # size in the cluster. in order to be in the cluster, the furthest corner needs to be within x,y offset - # determined by the max_region size minus half the box + 20% - # TODO: see if we can do this with numpy - cluster_candidates = [] - used_boxes = [] - # loop over each box - for current_index, b in enumerate(boxes): - if current_index in used_boxes: - continue - cluster = [current_index] - used_boxes.append(current_index) - cluster_boundary = get_cluster_boundary(b, min_region) - # find all other boxes that fit inside the boundary - for compare_index, compare_box in enumerate(boxes): - if compare_index in used_boxes: - continue - - # if the box is not inside the potential cluster area, cluster them - if not box_inside(cluster_boundary, compare_box): - continue - - # get the region if you were to add this box to the cluster - potential_cluster = cluster + [compare_index] - cluster_region = get_cluster_region( - frame_shape, min_region, potential_cluster, boxes - ) - # if region could be smaller and either box would be too small - # for the resulting region, dont cluster - should_cluster = True - if (cluster_region[2] - cluster_region[0]) > min_region: - for b in potential_cluster: - box = boxes[b] - # boxes should be more than 5% of the area of the region - if area(box) / area(cluster_region) < 0.05: - should_cluster = False - break - - if should_cluster: - cluster.append(compare_index) - used_boxes.append(compare_index) - cluster_candidates.append(cluster) - - # return the unique clusters only - unique = {tuple(sorted(c)) for c in cluster_candidates} - return [list(tup) for tup in unique] - - -def get_cluster_region(frame_shape, min_region, cluster, boxes): - min_x = frame_shape[1] - min_y = frame_shape[0] - max_x = 0 - max_y = 0 - for b in cluster: - min_x = min(boxes[b][0], min_x) - min_y = min(boxes[b][1], min_y) - max_x = max(boxes[b][2], max_x) - max_y = max(boxes[b][3], max_y) - return calculate_region( - frame_shape, min_x, min_y, max_x, max_y, min_region, multiplier=1.2 - ) - - -def get_consolidated_object_detections(detected_object_groups): - """Drop detections that overlap too much""" - consolidated_detections = [] - for group in detected_object_groups.values(): - # if the group only has 1 item, skip - if len(group) == 1: - consolidated_detections.append(group[0]) - continue - - # sort smallest to largest by area - sorted_by_area = sorted(group, key=lambda g: g[3]) - - for current_detection_idx in range(0, len(sorted_by_area)): - current_detection = sorted_by_area[current_detection_idx][2] - overlap = 0 - for to_check_idx in range( - min(current_detection_idx + 1, len(sorted_by_area)), - len(sorted_by_area), - ): - to_check = sorted_by_area[to_check_idx][2] - intersect_box = intersection(current_detection, to_check) - # if 90% of smaller detection is inside of another detection, consolidate - if ( - intersect_box is not None - and area(intersect_box) / area(current_detection) > 0.9 - ): - overlap = 1 - break - if overlap == 0: - consolidated_detections.append(sorted_by_area[current_detection_idx]) - - return consolidated_detections - - def process_frames( camera_name: str, frame_queue: mp.Queue,