Refactor active objects to class

This commit is contained in:
Nicolas Mowen 2025-08-30 08:23:49 -06:00
parent d78b6e528b
commit c5f83b947a

View File

@ -148,6 +148,92 @@ class PendingReviewSegment:
)
class ActiveObjects:
def __init__(
self,
frame_time: float,
camera_config: CameraConfig,
all_objects: list[TrackedObject],
):
# get current categorization of objects to know if
# these objects are currently being categorized
self.categorized_objects = {
"alerts": [],
"detections": [],
}
for o in all_objects:
if (
o["motionless_count"] >= camera_config.detect.stationary.threshold
and not o["pending_loitering"]
):
# no stationary objects unless loitering
continue
if o["position_changes"] == 0:
# object must have moved at least once
continue
if o["frame_time"] != frame_time:
# object must be detected in this frame
continue
if o["false_positive"]:
# object must not be a false positive
continue
if (
o["label"] in camera_config.review.alerts.labels
and (
not camera_config.review.alerts.required_zones
or (
len(o["current_zones"]) > 0
and set(o["current_zones"])
& o(camera_config.review.alerts.required_zones)
)
)
and camera_config.review.alerts.enabled
):
self.categorized_objects["alerts"].append(o)
if (
(
camera_config.review.detections.labels is None
or o["label"] in camera_config.review.detections.labels
)
and (
not camera_config.review.detections.required_zones
or (
len(o["current_zones"]) > 0
and set(o["current_zones"])
& set(camera_config.review.detections.required_zones)
)
)
and camera_config.review.detections.enabled
):
self.categorized_objects["detections"].append(o)
def has_active_objects(self) -> bool:
return (
len(self.categorized_objects["alerts"]) > 0
or len(self.categorized_objects["detections"]) > 0
)
def get_activity_category(self) -> SeverityEnum | None:
if len(self.categorized_objects["alerts"]) > 0:
return SeverityEnum.alert
if len(self.categorized_objects["detections"]) > 0:
return SeverityEnum.detection
return None
def get_all_objects(self) -> list[TrackedObject]:
return (
self.categorized_objects["alerts"] + self.categorized_objects["detections"]
)
class ReviewSegmentMaintainer(threading.Thread):
"""Maintain review segments."""
@ -273,13 +359,11 @@ class ReviewSegmentMaintainer(threading.Thread):
camera_config = self.config.cameras[segment.camera]
# get active objects + objects loitering in loitering zones
active_objects = get_active_objects(
frame_time, camera_config, objects
) + get_loitering_objects(frame_time, camera_config, objects)
activity = ActiveObjects(frame_time, camera_config, objects)
prev_data = segment.get_data(False)
has_activity = False
if len(active_objects) > 0:
if activity.has_active_objects():
has_activity = True
should_update_image = False
should_update_state = False
@ -287,7 +371,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if frame_time > segment.last_update:
segment.last_update = frame_time
for object in active_objects:
for object in activity.get_all_objects():
if not object["sub_label"]:
segment.detections[object["id"]] = object["label"]
elif object["sub_label"][0] in self.config.model.all_attributes:
@ -301,16 +385,7 @@ class ReviewSegmentMaintainer(threading.Thread):
# mark this review as alert
if (
segment.severity != SeverityEnum.alert
and object["label"] in camera_config.review.alerts.labels
and (
not camera_config.review.alerts.required_zones
or (
len(object["current_zones"]) > 0
and set(object["current_zones"])
& set(camera_config.review.alerts.required_zones)
)
)
and camera_config.review.alerts.enabled
and activity.get_activity_category() == SeverityEnum.alert
):
segment.severity = SeverityEnum.alert
should_update_state = True
@ -322,7 +397,7 @@ class ReviewSegmentMaintainer(threading.Thread):
if zone not in segment.zones:
segment.zones.append(zone)
if len(active_objects) > segment.frame_active_count:
if len(activity.get_all_objects()) > segment.frame_active_count:
should_update_state = True
should_update_image = True
@ -343,7 +418,11 @@ class ReviewSegmentMaintainer(threading.Thread):
yuv_frame = None
self._publish_segment_update(
segment, camera_config, yuv_frame, active_objects, prev_data
segment,
camera_config,
yuv_frame,
activity.get_all_objects(),
prev_data,
)
self.frame_manager.close(frame_name)
except FileNotFoundError:
@ -384,15 +463,30 @@ class ReviewSegmentMaintainer(threading.Thread):
) -> None:
"""Check if a new review segment should be created."""
camera_config = self.config.cameras[camera]
active_objects = get_active_objects(frame_time, camera_config, objects)
activity = ActiveObjects(frame_time, camera_config, objects)
if len(active_objects) > 0:
if activity.has_active_objects():
detections: dict[str, str] = {}
sub_labels: dict[str, str] = {}
zones: list[str] = []
severity = None
for object in active_objects:
# if activity is alert category mark this review as alert
if (
severity != SeverityEnum.alert
and activity.get_activity_category() == SeverityEnum.alert
):
severity = SeverityEnum.alert
# if object is detection label and not already higher severity
# mark this review as detection
if (
not severity
and activity.get_activity_category() == SeverityEnum.detection
):
severity = SeverityEnum.detection
for object in activity.get_all_objects():
if not object["sub_label"]:
detections[object["id"]] = object["label"]
elif object["sub_label"][0] in self.config.model.all_attributes:
@ -401,46 +495,6 @@ class ReviewSegmentMaintainer(threading.Thread):
detections[object["id"]] = f"{object['label']}-verified"
sub_labels[object["id"]] = object["sub_label"][0]
# if object is alert label
# and has entered required zones or required zones is not set
# mark this review as alert
if (
severity != SeverityEnum.alert
and object["label"] in camera_config.review.alerts.labels
and (
not camera_config.review.alerts.required_zones
or (
len(object["current_zones"]) > 0
and set(object["current_zones"])
& set(camera_config.review.alerts.required_zones)
)
)
and camera_config.review.alerts.enabled
):
severity = SeverityEnum.alert
# if object is detection label
# and review is not already a detection or alert
# and has entered required zones or required zones is not set
# mark this review as detection
if (
not severity
and (
camera_config.review.detections.labels is None
or object["label"] in (camera_config.review.detections.labels)
)
and (
not camera_config.review.detections.required_zones
or (
len(object["current_zones"]) > 0
and set(object["current_zones"])
& set(camera_config.review.detections.required_zones)
)
)
and camera_config.review.detections.enabled
):
severity = SeverityEnum.detection
for zone in object["current_zones"]:
if zone not in zones:
zones.append(zone)
@ -466,7 +520,7 @@ class ReviewSegmentMaintainer(threading.Thread):
return
self.active_review_segments[camera].update_frame(
camera_config, yuv_frame, active_objects
camera_config, yuv_frame, activity.get_all_objects()
)
self.frame_manager.close(frame_name)
self._publish_segment_start(self.active_review_segments[camera])
@ -717,46 +771,3 @@ class ReviewSegmentMaintainer(threading.Thread):
self.requestor.stop()
self.detection_subscriber.stop()
logger.info("Exiting review maintainer...")
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
"""get active objects for detection."""
return [
o
for o in all_objects
if o["motionless_count"]
< camera_config.detect.stationary.threshold # no stationary objects
and o["position_changes"] > 0 # object must have moved at least once
and o["frame_time"] == frame_time # object must be detected in this frame
and not o["false_positive"] # object must not be a false positive
and (
o["label"] in camera_config.review.alerts.labels
or (
camera_config.review.detections.labels is None
or o["label"] in camera_config.review.detections.labels
)
) # object must be in the alerts or detections label list
]
def get_loitering_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]
) -> list[TrackedObject]:
"""get loitering objects for detection."""
return [
o
for o in all_objects
if o["pending_loitering"] # object must be pending loitering
and o["position_changes"] > 0 # object must have moved at least once
and o["frame_time"] == frame_time # object must be detected in this frame
and not o["false_positive"] # object must not be a false positive
and (
o["label"] in camera_config.review.alerts.labels
or (
camera_config.review.detections.labels is None
or o["label"] in camera_config.review.detections.labels
)
) # object must be in the alerts or detections label list
]