use a different strategy for clustering motion and object boxes

This commit is contained in:
Blake Blackshear 2023-06-10 14:13:23 -05:00
parent beb59458d7
commit cfa8a39720
2 changed files with 284 additions and 23 deletions

171
frigate/test/test_video.py Normal file
View File

@ -0,0 +1,171 @@
import unittest
import cv2
import numpy as np
from norfair.drawing.color import Palette
from norfair.drawing.drawer import Drawer
from frigate.video import (
get_cluster_boundary,
get_cluster_candidates,
get_cluster_regions,
)
def draw_box(frame, box, color=(255, 0, 0), thickness=2):
cv2.rectangle(
frame,
(box[0], box[1]),
(box[2], box[3]),
color,
thickness,
)
def save_clusters_image(name, boxes, candidates, regions=[]):
canvas = np.zeros((1000, 2000, 3), np.uint8)
for cluster in candidates:
color = Palette.choose_color(np.random.rand())
for b in cluster:
box = boxes[b]
draw_box(canvas, box, color, 2)
# bottom right
text_anchor = (
box[2],
box[3],
)
canvas = Drawer.text(
canvas,
str(b),
position=text_anchor,
size=None,
color=(255, 255, 255),
thickness=None,
)
for r in regions:
draw_box(canvas, r, (0, 255, 0), 2)
cv2.imwrite(
f"debug/frames/{name}.jpg",
canvas,
)
def save_cluster_boundary_image(name, boxes, bounding_boxes):
canvas = np.zeros((1000, 2000, 3), np.uint8)
color = Palette.choose_color(np.random.rand())
for box in boxes:
draw_box(canvas, box, color, 2)
for bound in bounding_boxes:
draw_box(canvas, bound, (0, 255, 0), 2)
cv2.imwrite(
f"debug/frames/{name}.jpg",
canvas,
)
class TestConfig(unittest.TestCase):
def setUp(self):
self.frame_shape = (1000, 2000)
self.min_region_size = 160
def test_cluster_candidates(self):
boxes = [(100, 100, 200, 200), (202, 150, 252, 200), (900, 900, 950, 950)]
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
# save_clusters_image("cluster_candidates", boxes, cluster_candidates)
assert len(cluster_candidates) == 2
def test_cluster_boundary(self):
boxes = [(100, 100, 200, 200), (215, 215, 325, 325)]
boundary_boxes = [get_cluster_boundary(box) for box in boxes]
# save_cluster_boundary_image("bound", boxes, boundary_boxes)
assert len(boundary_boxes) == 2
def test_cluster_regions(self):
boxes = [(100, 100, 200, 200), (202, 150, 252, 200), (900, 900, 950, 950)]
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
regions = get_cluster_regions(
self.frame_shape, self.min_region_size, cluster_candidates, boxes
)
# save_clusters_image("regions", boxes, cluster_candidates, regions)
assert len(regions) == 2
def test_box_too_small_for_cluster(self):
boxes = [(100, 100, 600, 600), (655, 100, 700, 145)]
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
regions = get_cluster_regions(
self.frame_shape, self.min_region_size, cluster_candidates, boxes
)
# save_clusters_image("too_small", boxes, cluster_candidates, regions)
assert len(cluster_candidates) == 2
assert len(regions) == 2
def test_redundant_clusters(self):
boxes = [(100, 100, 200, 200), (215, 215, 325, 325)]
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
regions = get_cluster_regions(
self.frame_shape, self.min_region_size, cluster_candidates, boxes
)
# save_clusters_image("redundant", boxes, cluster_candidates, regions)
assert len(cluster_candidates) == 2
assert all([len(c) == 1 for c in cluster_candidates])
assert len(regions) == 2
def test_combine_boxes(self):
boxes = [
(460, 0, 561, 144),
(565, 0, 586, 71),
]
# boundary_boxes = [get_cluster_boundary(box) for box in boxes]
# save_cluster_boundary_image("combine_bound", boxes, boundary_boxes)
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
regions = get_cluster_regions(
self.frame_shape, self.min_region_size, cluster_candidates, boxes
)
# save_clusters_image("combine", boxes, cluster_candidates, regions)
assert len(regions) == 1
def test_dont_combine_boxes(self):
boxes = [(460, 0, 532, 129), (586, 0, 606, 46)]
# boundary_boxes = [get_cluster_boundary(box) for box in boxes]
# save_cluster_boundary_image("dont_combine_bound", boxes, boundary_boxes)
cluster_candidates = get_cluster_candidates(
self.frame_shape, self.min_region_size, boxes
)
regions = get_cluster_regions(
self.frame_shape, self.min_region_size, cluster_candidates, boxes
)
# save_clusters_image("dont_combine", boxes, cluster_candidates, regions)
assert len(regions) == 2

View File

@ -1,5 +1,6 @@
import datetime
import logging
import math
import multiprocessing as mp
import os
import queue
@ -507,6 +508,13 @@ def box_overlaps(b1, b2):
return True
def box_inside(b1, b2):
# check if b2 is inside b1
if b2[0] >= b1[0] and b2[1] >= b1[1] and b2[2] <= b1[2] and b2[3] <= b1[3]:
return True
return False
def reduce_boxes(boxes, iou_threshold=0.0):
clusters = []
@ -577,6 +585,102 @@ def detect(
return detections
def get_cluster_boundary(box):
# compute the max region size for the current box (box is 20% of region)
box_width = box[2] - box[0]
box_height = box[3] - box[1]
max_region_area = abs(box_width * box_height) / 0.2
max_region_size = max(160, int(math.sqrt(max_region_area)))
centroid = (box_width / 2 + box[0], box_height / 2 + box[1])
max_x_dist = int(max_region_size - box_width / 2 * 1.1)
max_y_dist = int(max_region_size - box_height / 2 * 1.1)
return [
int(centroid[0] - max_x_dist),
int(centroid[1] - max_y_dist),
int(centroid[0] + max_x_dist),
int(centroid[1] + max_y_dist),
]
def get_cluster_candidates(frame_shape, min_region, boxes):
# and create a cluster of other boxes using it's max region size
# only include boxes where the region is an appropriate(except the region could possibly be smaller?)
# size in the cluster. in order to be in the cluster, the furthest corner needs to be within x,y offset
# determined by the max_region size minus half the box + 20%
# TODO: see if we can do this with numpy
cluster_candidates = []
used_boxes = []
# loop over each box
for current_index, b in enumerate(boxes):
if current_index in used_boxes:
continue
cluster = [current_index]
used_boxes.append(current_index)
cluster_boundary = get_cluster_boundary(b)
# find all other boxes that fit inside the boundary
for compare_index, compare_box in enumerate(boxes):
if compare_index in used_boxes:
continue
# get the region if you were to add this box to the cluster
cluster_regions = get_cluster_regions(
frame_shape, min_region, [cluster + [compare_index]], boxes
)
# if adding the box would result in multiple regions, dont cluster
if len(cluster_regions) > 1:
continue
# if the box is inside the potential cluster area, cluster them
if box_inside(cluster_boundary, compare_box):
cluster.append(compare_index)
used_boxes.append(compare_index)
cluster_candidates.append(cluster)
# return the unique clusters only
unique = {tuple(sorted(c)) for c in cluster_candidates}
return [list(tup) for tup in unique]
def get_cluster_regions(frame_shape, min_region, clusters, boxes):
regions = []
for c in clusters:
min_x = frame_shape[1]
min_y = frame_shape[0]
max_x = 0
max_y = 0
for b in c:
min_x = min(boxes[b][0], min_x)
min_y = min(boxes[b][1], min_y)
max_x = max(boxes[b][2], max_x)
max_y = max(boxes[b][3], max_y)
region = calculate_region(
frame_shape, min_x, min_y, max_x, max_y, min_region, multiplier=1.2
)
regions.append(region)
# TODO: move this to a dedicated check function so it doesnt run again
# now check each box in the region to ensure it's not too small
# if it is, create a dedicated region for it
if len(c) > 1 and (region[2] - region[0]) > min_region:
for b in c:
box = boxes[b]
if area(box) / area(region) < 0.05:
regions.append(
calculate_region(
frame_shape,
box[0],
box[1],
box[2],
box[3],
min_region,
multiplier=1.35,
)
)
return regions
def process_frames(
camera_name: str,
frame_queue: mp.Queue,
@ -605,6 +709,8 @@ def process_frames(
startup_scan_counter = 0
region_min_size = int(max(model_config.height, model_config.width) / 2)
while not stop_event.is_set():
if exit_on_empty and frame_queue.empty():
logger.info("Exiting track_objects...")
@ -661,31 +767,15 @@ def process_frames(
if obj["id"] not in stationary_object_ids
]
# combine motion boxes with known locations of existing objects
combined_boxes = reduce_boxes(motion_boxes + tracked_object_boxes)
combined_boxes = motion_boxes + tracked_object_boxes
region_min_size = max(model_config.height, model_config.width)
# compute regions
regions = [
calculate_region(
frame_shape,
a[0],
a[1],
a[2],
a[3],
region_min_size,
multiplier=random.uniform(1.2, 1.5),
cluster_candidates = get_cluster_candidates(
frame_shape, region_min_size, combined_boxes
)
for a in combined_boxes
]
# consolidate regions with heavy overlap
regions = [
calculate_region(
frame_shape, a[0], a[1], a[2], a[3], region_min_size, multiplier=1.0
regions = get_cluster_regions(
frame_shape, region_min_size, cluster_candidates, combined_boxes
)
for a in reduce_boxes(regions, 0.4)
]
# if starting up, get the next startup scan region
if startup_scan_counter < 9: