Add review mypy

This commit is contained in:
Nicolas Mowen 2026-03-25 16:50:29 -06:00
parent dcc9b6ebb1
commit e3f2ba3e42

View File

@ -69,7 +69,9 @@ class PendingReviewSegment:
self.last_alert_time = frame_time
# thumbnail
self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8)
self._frame: np.ndarray[Any, Any] = np.zeros(
(THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8
)
self.has_frame = False
self.frame_active_count = 0
self.frame_path = os.path.join(
@ -77,8 +79,11 @@ class PendingReviewSegment:
)
def update_frame(
self, camera_config: CameraConfig, frame, objects: list[dict[str, Any]]
):
self,
camera_config: CameraConfig,
frame: np.ndarray,
objects: list[dict[str, Any]],
) -> None:
min_x = camera_config.frame_shape[1]
min_y = camera_config.frame_shape[0]
max_x = 0
@ -114,7 +119,7 @@ class PendingReviewSegment:
self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
)
def save_full_frame(self, camera_config: CameraConfig, frame):
def save_full_frame(self, camera_config: CameraConfig, frame: np.ndarray) -> None:
color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420)
width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0])
self._frame = cv2.resize(
@ -171,7 +176,7 @@ class ActiveObjects:
# get current categorization of objects to know if
# these objects are currently being categorized
self.categorized_objects = {
self.categorized_objects: dict[str, list[dict[str, Any]]] = {
"alerts": [],
"detections": [],
}
@ -309,7 +314,7 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -318,7 +323,7 @@ class ReviewSegmentMaintainer(threading.Thread):
self,
segment: PendingReviewSegment,
camera_config: CameraConfig,
frame,
frame: Optional[np.ndarray],
objects: list[dict[str, Any]],
prev_data: dict[str, Any],
) -> None:
@ -337,7 +342,7 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(
f"{segment.camera}/review_status", segment.severity.value.upper()
)
@ -346,7 +351,7 @@ class ReviewSegmentMaintainer(threading.Thread):
self,
segment: PendingReviewSegment,
prev_data: dict[str, Any],
) -> float:
) -> Any:
"""End segment."""
final_data = segment.get_data(ended=True)
end_time = final_data[ReviewSegment.end_time.name]
@ -360,17 +365,18 @@ class ReviewSegmentMaintainer(threading.Thread):
"reviews",
json.dumps(review_update),
)
self.review_publisher.publish(review_update, segment.camera)
self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type]
self.requestor.send_data(f"{segment.camera}/review_status", "NONE")
self.active_review_segments[segment.camera] = None
return end_time
def forcibly_end_segment(self, camera: str) -> float:
def forcibly_end_segment(self, camera: str) -> Any:
"""Forcibly end the pending segment for a camera."""
segment = self.active_review_segments.get(camera)
if segment:
prev_data = segment.get_data(False)
return self._publish_segment_end(segment, prev_data)
return None
def update_existing_segment(
self,
@ -492,8 +498,11 @@ class ReviewSegmentMaintainer(threading.Thread):
except FileNotFoundError:
return
if segment.severity == SeverityEnum.alert and frame_time > (
segment.last_alert_time + camera_config.review.alerts.cutoff_time
if (
segment.severity == SeverityEnum.alert
and segment.last_alert_time is not None
and frame_time
> (segment.last_alert_time + camera_config.review.alerts.cutoff_time)
):
needs_new_detection = (
segment.last_detection_time > segment.last_alert_time
@ -516,23 +525,18 @@ class ReviewSegmentMaintainer(threading.Thread):
new_zones.update(o["current_zones"])
if new_detections:
self.active_review_segments[activity.camera_config.name] = (
PendingReviewSegment(
activity.camera_config.name,
end_time,
SeverityEnum.detection,
new_detections,
sub_labels={},
audio=set(),
zones=list(new_zones),
)
new_segment = PendingReviewSegment(
segment.camera,
end_time,
SeverityEnum.detection,
new_detections,
sub_labels={},
audio=set(),
zones=list(new_zones),
)
self._publish_segment_start(
self.active_review_segments[activity.camera_config.name]
)
self.active_review_segments[
activity.camera_config.name
].last_detection_time = last_detection_time
self.active_review_segments[segment.camera] = new_segment
self._publish_segment_start(new_segment)
new_segment.last_detection_time = last_detection_time
elif segment.severity == SeverityEnum.detection and frame_time > (
segment.last_detection_time
+ camera_config.review.detections.cutoff_time
@ -581,7 +585,7 @@ class ReviewSegmentMaintainer(threading.Thread):
zones.append(zone)
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
new_segment = PendingReviewSegment(
camera,
frame_time,
severity,
@ -590,6 +594,7 @@ class ReviewSegmentMaintainer(threading.Thread):
audio=set(),
zones=zones,
)
self.active_review_segments[camera] = new_segment
try:
yuv_frame = self.frame_manager.get(
@ -600,11 +605,11 @@ class ReviewSegmentMaintainer(threading.Thread):
logger.debug(f"Failed to get frame {frame_name} from SHM")
return
self.active_review_segments[camera].update_frame(
new_segment.update_frame(
camera_config, yuv_frame, activity.get_all_objects()
)
self.frame_manager.close(frame_name)
self._publish_segment_start(self.active_review_segments[camera])
self._publish_segment_start(new_segment)
except FileNotFoundError:
return
@ -621,9 +626,14 @@ class ReviewSegmentMaintainer(threading.Thread):
for camera in updated_topics["enabled"]:
self.forcibly_end_segment(camera)
(topic, data) = self.detection_subscriber.check_for_update(timeout=1)
result = self.detection_subscriber.check_for_update(timeout=1)
if not topic:
if not result:
continue
topic, data = result
if not topic or not data:
continue
if topic == DetectionTypeEnum.video.value:
@ -712,10 +722,13 @@ class ReviewSegmentMaintainer(threading.Thread):
if topic == DetectionTypeEnum.api:
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[
camera
].review.detections.labels
if (
self.config.cameras[camera].review.detections.enabled
and manual_info["label"].split(": ")[0]
in self.config.cameras[camera].review.detections.labels
and det_labels is not None
and manual_info["label"].split(": ")[0] in det_labels
):
current_segment.last_detection_time = manual_info[
"end_time"
@ -744,14 +757,15 @@ class ReviewSegmentMaintainer(threading.Thread):
):
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[
camera
].review.detections.labels
if (
not self.config.cameras[
camera
].review.detections.enabled
or manual_info["label"].split(": ")[0]
not in self.config.cameras[
camera
].review.detections.labels
or det_labels is None
or manual_info["label"].split(": ")[0] not in det_labels
):
current_segment.severity = SeverityEnum.alert
elif (
@ -828,17 +842,18 @@ class ReviewSegmentMaintainer(threading.Thread):
severity = None
# manual_info["label"] contains 'label: sub_label'
# so split out the label without modifying manual_info
det_labels = self.config.cameras[camera].review.detections.labels
if (
self.config.cameras[camera].review.detections.enabled
and manual_info["label"].split(": ")[0]
in self.config.cameras[camera].review.detections.labels
and det_labels is not None
and manual_info["label"].split(": ")[0] in det_labels
):
severity = SeverityEnum.detection
elif self.config.cameras[camera].review.alerts.enabled:
severity = SeverityEnum.alert
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
api_segment = PendingReviewSegment(
camera,
frame_time,
severity,
@ -847,32 +862,25 @@ class ReviewSegmentMaintainer(threading.Thread):
[],
set(),
)
self.active_review_segments[camera] = api_segment
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[
camera
].last_alert_time = sys.maxsize
self.active_review_segments[
camera
].last_detection_time = sys.maxsize
api_segment.last_alert_time = sys.maxsize
api_segment.last_detection_time = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[
camera
].last_alert_time = manual_info["end_time"]
self.active_review_segments[
camera
].last_detection_time = manual_info["end_time"]
api_segment.last_alert_time = manual_info["end_time"]
api_segment.last_detection_time = manual_info["end_time"]
else:
logger.warning(
f"Manual event API has been called for {camera}, but alerts and detections are disabled. This manual event will not appear as an alert or detection."
)
elif topic == DetectionTypeEnum.lpr:
if self.config.cameras[camera].review.detections.enabled:
self.active_review_segments[camera] = PendingReviewSegment(
lpr_segment = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.detection,
@ -881,25 +889,18 @@ class ReviewSegmentMaintainer(threading.Thread):
[],
set(),
)
self.active_review_segments[camera] = lpr_segment
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[
camera
].last_alert_time = sys.maxsize
self.active_review_segments[
camera
].last_detection_time = sys.maxsize
lpr_segment.last_alert_time = sys.maxsize
lpr_segment.last_detection_time = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[
camera
].last_alert_time = manual_info["end_time"]
self.active_review_segments[
camera
].last_detection_time = manual_info["end_time"]
lpr_segment.last_alert_time = manual_info["end_time"]
lpr_segment.last_detection_time = manual_info["end_time"]
else:
logger.warning(
f"Dedicated LPR camera API has been called for {camera}, but detections are disabled. LPR events will not appear as a detection."