Refactor attribute saving

This commit is contained in:
Nicolas Mowen 2024-10-01 07:09:24 -06:00
parent fafe5623d1
commit 6c971ca37a
4 changed files with 113 additions and 19 deletions

View File

@ -227,8 +227,8 @@ class TrackedObject:
if self.attributes[attr["label"]] < attr["score"]:
self.attributes[attr["label"]] = attr["score"]
# populate the sub_label for car with highest scoring logo
if self.obj_data["label"] == "car":
# populate the sub_label for object with highest scoring logo
if self.obj_data["label"] in ["car", "package", "person"]:
recognized_logos = {
k: self.attributes[k]
for k in ["ups", "fedex", "amazon"]

View File

@ -0,0 +1,50 @@
import unittest
from frigate.track.object_attribute import ObjectAttribute
class TestAttribute(unittest.TestCase):
def test_overlapping_object_selection(self) -> None:
attribute = ObjectAttribute(
(
"amazon",
0.80078125,
(847, 242, 883, 255),
468,
2.769230769230769,
(702, 134, 1050, 482),
)
)
objects = [
{
"label": "car",
"score": 0.98828125,
"box": (728, 223, 1266, 719),
"area": 266848,
"ratio": 1.0846774193548387,
"region": (349, 0, 1397, 1048),
"frame_time": 1727785394.498972,
"centroid": (997, 471),
"id": "1727785349.150633-408hal",
"start_time": 1727785349.150633,
"motionless_count": 362,
"position_changes": 0,
"score_history": [0.98828125, 0.95703125, 0.98828125, 0.98828125],
},
{
"label": "person",
"score": 0.76953125,
"box": (826, 172, 939, 417),
"area": 27685,
"ratio": 0.46122448979591835,
"region": (702, 134, 1050, 482),
"frame_time": 1727785394.498972,
"centroid": (882, 294),
"id": "1727785390.499768-9fbhem",
"start_time": 1727785390.499768,
"motionless_count": 2,
"position_changes": 1,
"score_history": [0.8828125, 0.83984375, 0.91796875, 0.94140625],
},
]
assert attribute.find_best_object(objects) == "1727785390.499768-9fbhem"

View File

@ -0,0 +1,44 @@
"""Object attribute."""
from frigate.util.object import area, box_inside
class ObjectAttribute:
def __init__(self, raw_data: tuple) -> None:
self.label = raw_data[0]
self.score = raw_data[1]
self.box = raw_data[2]
self.area = raw_data[3]
self.ratio = raw_data[4]
self.region = raw_data[5]
def get_tracking_data(self) -> dict[str, any]:
"""Return data saved to the object."""
return {
"label": self.label,
"score": self.score,
"box": self.box,
}
def find_best_object(self, objects: list[dict[str, any]]) -> str:
"""Find the best attribute for each object and return its ID."""
best_object_area = None
best_object_id = None
for obj in objects:
if not box_inside(obj["box"], self.box):
continue
object_area = area(obj["box"])
# if multiple objects have the same attribute then they
# are overlapping, it is most likely that the smaller object
# is the one with the attribute
if best_object_area is None:
best_object_area = object_area
best_object_id = obj["id"]
elif object_area < best_object_area:
best_object_area = object_area
best_object_id = obj["id"]
return best_object_id

View File

@ -27,6 +27,7 @@ from frigate.object_detection import RemoteObjectDetector
from frigate.ptz.autotrack import ptz_moving_at_frame_time
from frigate.track import ObjectTracker
from frigate.track.norfair_tracker import NorfairTracker
from frigate.track.object_attribute import ObjectAttribute
from frigate.util.builtin import EventsPerSecond, get_tomorrow_at_time
from frigate.util.image import (
FrameManager,
@ -734,29 +735,28 @@ def process_frames(
object_tracker.update_frame_times(frame_time)
# group the attribute detections based on what label they apply to
attribute_detections = {}
attribute_detections: dict[str, ObjectAttribute] = {}
for label, attribute_labels in model_config.attributes_map.items():
attribute_detections[label] = [
d for d in consolidated_detections if d[0] in attribute_labels
ObjectAttribute(d)
for d in consolidated_detections
if d[0] in attribute_labels
]
# build detections and add attributes
# build detections
detections = {}
for obj in object_tracker.tracked_objects.values():
attributes = []
# if the objects label has associated attribute detections
if obj["label"] in attribute_detections.keys():
# add them to attributes if they intersect
for attribute_detection in attribute_detections[obj["label"]]:
if box_inside(obj["box"], (attribute_detection[2])):
attributes.append(
{
"label": attribute_detection[0],
"score": attribute_detection[1],
"box": attribute_detection[2],
}
)
detections[obj["id"]] = {**obj, "attributes": attributes}
detections[obj["id"]] = {**obj, "attributes": []}
# find the best object for each attribute to be assigned to
all_objects: list[dict[str, any]] = object_tracker.tracked_objects.values()
for attributes in attribute_detections.values():
for attribute in attributes:
filtered_objects = filter(lambda o: o["label"] in attribute_detections.keys(), all_objects)
selected_object_id = attribute.find_best_object(filtered_objects)
if selected_object_id is not None:
detections[selected_object_id]["attributes"].append(attribute.get_tracking_data())
# debug object tracking
if False: