From e3f2ba3e424cdd2f5d13339f77d04214079d4d42 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Wed, 25 Mar 2026 16:50:29 -0600 Subject: [PATCH] Add review mypy --- frigate/review/maintainer.py | 137 ++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index 3faf8a99b..cfc59744c 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -69,7 +69,9 @@ class PendingReviewSegment: self.last_alert_time = frame_time # thumbnail - self._frame = np.zeros((THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8) + self._frame: np.ndarray[Any, Any] = np.zeros( + (THUMB_HEIGHT * 3 // 2, THUMB_WIDTH), np.uint8 + ) self.has_frame = False self.frame_active_count = 0 self.frame_path = os.path.join( @@ -77,8 +79,11 @@ class PendingReviewSegment: ) def update_frame( - self, camera_config: CameraConfig, frame, objects: list[dict[str, Any]] - ): + self, + camera_config: CameraConfig, + frame: np.ndarray, + objects: list[dict[str, Any]], + ) -> None: min_x = camera_config.frame_shape[1] min_y = camera_config.frame_shape[0] max_x = 0 @@ -114,7 +119,7 @@ class PendingReviewSegment: self.frame_path, self._frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60] ) - def save_full_frame(self, camera_config: CameraConfig, frame): + def save_full_frame(self, camera_config: CameraConfig, frame: np.ndarray) -> None: color_frame = cv2.cvtColor(frame, cv2.COLOR_YUV2BGR_I420) width = int(THUMB_HEIGHT * color_frame.shape[1] / color_frame.shape[0]) self._frame = cv2.resize( @@ -171,7 +176,7 @@ class ActiveObjects: # get current categorization of objects to know if # these objects are currently being categorized - self.categorized_objects = { + self.categorized_objects: dict[str, list[dict[str, Any]]] = { "alerts": [], "detections": [], } @@ -309,7 +314,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -318,7 +323,7 @@ class ReviewSegmentMaintainer(threading.Thread): self, segment: PendingReviewSegment, camera_config: CameraConfig, - frame, + frame: Optional[np.ndarray], objects: list[dict[str, Any]], prev_data: dict[str, Any], ) -> None: @@ -337,7 +342,7 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data( f"{segment.camera}/review_status", segment.severity.value.upper() ) @@ -346,7 +351,7 @@ class ReviewSegmentMaintainer(threading.Thread): self, segment: PendingReviewSegment, prev_data: dict[str, Any], - ) -> float: + ) -> Any: """End segment.""" final_data = segment.get_data(ended=True) end_time = final_data[ReviewSegment.end_time.name] @@ -360,17 +365,18 @@ class ReviewSegmentMaintainer(threading.Thread): "reviews", json.dumps(review_update), ) - self.review_publisher.publish(review_update, segment.camera) + self.review_publisher.publish(review_update, segment.camera) # type: ignore[arg-type] self.requestor.send_data(f"{segment.camera}/review_status", "NONE") self.active_review_segments[segment.camera] = None return end_time - def forcibly_end_segment(self, camera: str) -> float: + def forcibly_end_segment(self, camera: str) -> Any: """Forcibly end the pending segment for a camera.""" segment = self.active_review_segments.get(camera) if segment: prev_data = segment.get_data(False) return self._publish_segment_end(segment, prev_data) + return None def update_existing_segment( self, @@ -492,8 +498,11 @@ class ReviewSegmentMaintainer(threading.Thread): except FileNotFoundError: return - if segment.severity == SeverityEnum.alert and frame_time > ( - segment.last_alert_time + camera_config.review.alerts.cutoff_time + if ( + segment.severity == SeverityEnum.alert + and segment.last_alert_time is not None + and frame_time + > (segment.last_alert_time + camera_config.review.alerts.cutoff_time) ): needs_new_detection = ( segment.last_detection_time > segment.last_alert_time @@ -516,23 +525,18 @@ class ReviewSegmentMaintainer(threading.Thread): new_zones.update(o["current_zones"]) if new_detections: - self.active_review_segments[activity.camera_config.name] = ( - PendingReviewSegment( - activity.camera_config.name, - end_time, - SeverityEnum.detection, - new_detections, - sub_labels={}, - audio=set(), - zones=list(new_zones), - ) + new_segment = PendingReviewSegment( + segment.camera, + end_time, + SeverityEnum.detection, + new_detections, + sub_labels={}, + audio=set(), + zones=list(new_zones), ) - self._publish_segment_start( - self.active_review_segments[activity.camera_config.name] - ) - self.active_review_segments[ - activity.camera_config.name - ].last_detection_time = last_detection_time + self.active_review_segments[segment.camera] = new_segment + self._publish_segment_start(new_segment) + new_segment.last_detection_time = last_detection_time elif segment.severity == SeverityEnum.detection and frame_time > ( segment.last_detection_time + camera_config.review.detections.cutoff_time @@ -581,7 +585,7 @@ class ReviewSegmentMaintainer(threading.Thread): zones.append(zone) if severity: - self.active_review_segments[camera] = PendingReviewSegment( + new_segment = PendingReviewSegment( camera, frame_time, severity, @@ -590,6 +594,7 @@ class ReviewSegmentMaintainer(threading.Thread): audio=set(), zones=zones, ) + self.active_review_segments[camera] = new_segment try: yuv_frame = self.frame_manager.get( @@ -600,11 +605,11 @@ class ReviewSegmentMaintainer(threading.Thread): logger.debug(f"Failed to get frame {frame_name} from SHM") return - self.active_review_segments[camera].update_frame( + new_segment.update_frame( camera_config, yuv_frame, activity.get_all_objects() ) self.frame_manager.close(frame_name) - self._publish_segment_start(self.active_review_segments[camera]) + self._publish_segment_start(new_segment) except FileNotFoundError: return @@ -621,9 +626,14 @@ class ReviewSegmentMaintainer(threading.Thread): for camera in updated_topics["enabled"]: self.forcibly_end_segment(camera) - (topic, data) = self.detection_subscriber.check_for_update(timeout=1) + result = self.detection_subscriber.check_for_update(timeout=1) - if not topic: + if not result: + continue + + topic, data = result + + if not topic or not data: continue if topic == DetectionTypeEnum.video.value: @@ -712,10 +722,13 @@ class ReviewSegmentMaintainer(threading.Thread): if topic == DetectionTypeEnum.api: # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[ + camera + ].review.detections.labels if ( self.config.cameras[camera].review.detections.enabled - and manual_info["label"].split(": ")[0] - in self.config.cameras[camera].review.detections.labels + and det_labels is not None + and manual_info["label"].split(": ")[0] in det_labels ): current_segment.last_detection_time = manual_info[ "end_time" @@ -744,14 +757,15 @@ class ReviewSegmentMaintainer(threading.Thread): ): # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[ + camera + ].review.detections.labels if ( not self.config.cameras[ camera ].review.detections.enabled - or manual_info["label"].split(": ")[0] - not in self.config.cameras[ - camera - ].review.detections.labels + or det_labels is None + or manual_info["label"].split(": ")[0] not in det_labels ): current_segment.severity = SeverityEnum.alert elif ( @@ -828,17 +842,18 @@ class ReviewSegmentMaintainer(threading.Thread): severity = None # manual_info["label"] contains 'label: sub_label' # so split out the label without modifying manual_info + det_labels = self.config.cameras[camera].review.detections.labels if ( self.config.cameras[camera].review.detections.enabled - and manual_info["label"].split(": ")[0] - in self.config.cameras[camera].review.detections.labels + and det_labels is not None + and manual_info["label"].split(": ")[0] in det_labels ): severity = SeverityEnum.detection elif self.config.cameras[camera].review.alerts.enabled: severity = SeverityEnum.alert if severity: - self.active_review_segments[camera] = PendingReviewSegment( + api_segment = PendingReviewSegment( camera, frame_time, severity, @@ -847,32 +862,25 @@ class ReviewSegmentMaintainer(threading.Thread): [], set(), ) + self.active_review_segments[camera] = api_segment if manual_info["state"] == ManualEventState.start: self.indefinite_events[camera][manual_info["event_id"]] = ( manual_info["label"] ) # temporarily make it so this event can not end - self.active_review_segments[ - camera - ].last_alert_time = sys.maxsize - self.active_review_segments[ - camera - ].last_detection_time = sys.maxsize + api_segment.last_alert_time = sys.maxsize + api_segment.last_detection_time = sys.maxsize elif manual_info["state"] == ManualEventState.complete: - self.active_review_segments[ - camera - ].last_alert_time = manual_info["end_time"] - self.active_review_segments[ - camera - ].last_detection_time = manual_info["end_time"] + api_segment.last_alert_time = manual_info["end_time"] + api_segment.last_detection_time = manual_info["end_time"] else: logger.warning( f"Manual event API has been called for {camera}, but alerts and detections are disabled. This manual event will not appear as an alert or detection." ) elif topic == DetectionTypeEnum.lpr: if self.config.cameras[camera].review.detections.enabled: - self.active_review_segments[camera] = PendingReviewSegment( + lpr_segment = PendingReviewSegment( camera, frame_time, SeverityEnum.detection, @@ -881,25 +889,18 @@ class ReviewSegmentMaintainer(threading.Thread): [], set(), ) + self.active_review_segments[camera] = lpr_segment if manual_info["state"] == ManualEventState.start: self.indefinite_events[camera][manual_info["event_id"]] = ( manual_info["label"] ) # temporarily make it so this event can not end - self.active_review_segments[ - camera - ].last_alert_time = sys.maxsize - self.active_review_segments[ - camera - ].last_detection_time = sys.maxsize + lpr_segment.last_alert_time = sys.maxsize + lpr_segment.last_detection_time = sys.maxsize elif manual_info["state"] == ManualEventState.complete: - self.active_review_segments[ - camera - ].last_alert_time = manual_info["end_time"] - self.active_review_segments[ - camera - ].last_detection_time = manual_info["end_time"] + lpr_segment.last_alert_time = manual_info["end_time"] + lpr_segment.last_detection_time = manual_info["end_time"] else: logger.warning( f"Dedicated LPR camera API has been called for {camera}, but detections are disabled. LPR events will not appear as a detection."