diff --git a/frigate/detectors/plugins/edgetpu_tfl.py b/frigate/detectors/plugins/edgetpu_tfl.py
index 3176df0e4..aac9f4e89 100644
--- a/frigate/detectors/plugins/edgetpu_tfl.py
+++ b/frigate/detectors/plugins/edgetpu_tfl.py
@@ -75,76 +75,80 @@ class EdgeTpuTfl(DetectionApi):
         self.min_score = 0.4
         self.max_detections = 20
 
-        model_type = detector_config.model.model_type
+        self.model_type = detector_config.model.model_type
         self.model_requires_int8 = self.tensor_input_details[0]["dtype"] == np.int8
 
-        if model_type == ModelTypeEnum.yologeneric:
-            logger.debug(
-                f"Using YOLO postprocessing for {len(self.tensor_output_details)}-tensor output"
-            )
-            if len(self.tensor_output_details) > 1:  # expecting 2 or 3
-                self.reg_max = 16  # = 64 dfl_channels // 4  # YOLO standard
-                self.min_logit_value = np.log(
-                    self.min_score / (1 - self.min_score)
-                )  # for filtering
-                self._generate_anchors_and_strides()  # decode bounding box DFL
-                self.project = np.arange(
-                    self.reg_max, dtype=np.float32
-                )  # for decoding bounding box DFL information
+        if self.model_type == ModelTypeEnum.yologeneric:
+            logger.debug("Using YOLO preprocessing/postprocessing")
 
-                # Determine YOLO tensor indices and quantization scales for
-                # boxes and class_scores the tensor ordering and names are
-                # not reliable, so use tensor shape to detect which tensor
-                # holds boxes or class scores.
-                # The tensors have shapes (B, N, C)
-                # where N is the number of candidates (=2100 for 320x320)
-                # this may guess wrong if the number of classes is exactly 64
-                output_boxes_index = None
-                output_classes_index = None
-                for i, x in enumerate(self.tensor_output_details):
-                    # the nominal index seems to start at 1 instead of 0
-                    if len(x["shape"]) == 3 and x["shape"][2] == 64:
-                        output_boxes_index = i
-                    elif len(x["shape"]) == 3 and x["shape"][2] > 1:
-                        # require the number of classes to be more than 1
-                        # to differentiate from (not used) max score tensor
-                        output_classes_index = i
-                if output_boxes_index is None or output_classes_index is None:
-                    logger.warning(
-                        "Unrecognized model output, unexpected tensor shapes."
-                    )
-                    output_classes_index = (
-                        0
-                        if (output_boxes_index is None or output_classes_index == 1)
-                        else 1
-                    )  # 0 is default guess
-                    output_boxes_index = 1 if (output_boxes_index == 0) else 0
-
-                scores_details = self.tensor_output_details[output_classes_index]
-                classes_count = scores_details["shape"][2]
-                self.scores_tensor_index = scores_details["index"]
-                self.scores_scale, self.scores_zero_point = scores_details[
-                    "quantization"
-                ]
-                # calculate the quantized version of the min_score
-                self.min_score_quantized = int(
-                    (self.min_logit_value / self.scores_scale) + self.scores_zero_point
+            if len(self.tensor_output_details) not in [2,3]:
+                logger.error(
+                    f"Invalid count of output tensors in YOLO model. Found {len(self.tensor_output_details)}, expecting 2 or 3."
                 )
-                self.logit_shift_to_positive_values = (
-                    max(
-                        0, math.ceil((128 + self.scores_zero_point) * self.scores_scale)
-                    )
-                    + 1
-                )  # round up
+                raise
 
-                boxes_details = self.tensor_output_details[output_boxes_index]
-                self.boxes_tensor_index = boxes_details["index"]
-                self.boxes_scale, self.boxes_zero_point = boxes_details["quantization"]
+            self.reg_max = 16  # = 64 dfl_channels // 4  # YOLO standard
+            self.min_logit_value = np.log(
+                self.min_score / (1 - self.min_score)
+            )  # for filtering
+            self._generate_anchors_and_strides()  # decode bounding box DFL
+            self.project = np.arange(
+                self.reg_max, dtype=np.float32
+            )  # for decoding bounding box DFL information
+
+            # Determine YOLO tensor indices and quantization scales for
+            # boxes and class_scores the tensor ordering and names are
+            # not reliable, so use tensor shape to detect which tensor
+            # holds boxes or class scores.
+            # The tensors have shapes (B, N, C)
+            # where N is the number of candidates (=2100 for 320x320)
+            # this may guess wrong if the number of classes is exactly 64
+            output_boxes_index = None
+            output_classes_index = None
+            for i, x in enumerate(self.tensor_output_details):
+                # the nominal index seems to start at 1 instead of 0
+                if len(x["shape"]) == 3 and x["shape"][2] == 64:
+                    output_boxes_index = i
+                elif len(x["shape"]) == 3 and x["shape"][2] > 1:
+                    # require the number of classes to be more than 1
+                    # to differentiate from (not used) max score tensor
+                    output_classes_index = i
+            if output_boxes_index is None or output_classes_index is None:
+                logger.warning(
+                    "Unrecognized model output, unexpected tensor shapes."
+                )
+                output_classes_index = (
+                    0
+                    if (output_boxes_index is None or output_classes_index == 1)
+                    else 1
+                )  # 0 is default guess
+                output_boxes_index = 1 if (output_boxes_index == 0) else 0
+
+            scores_details = self.tensor_output_details[output_classes_index]
+            classes_count = scores_details["shape"][2]
+            self.scores_tensor_index = scores_details["index"]
+            self.scores_scale, self.scores_zero_point = scores_details[
+                "quantization"
+            ]
+            # calculate the quantized version of the min_score
+            self.min_score_quantized = int(
+                (self.min_logit_value / self.scores_scale) + self.scores_zero_point
+            )
+            self.logit_shift_to_positive_values = (
+                max(
+                    0, math.ceil((128 + self.scores_zero_point) * self.scores_scale)
+                )
+                + 1
+            )  # round up
+
+            boxes_details = self.tensor_output_details[output_boxes_index]
+            self.boxes_tensor_index = boxes_details["index"]
+            self.boxes_scale, self.boxes_zero_point = boxes_details["quantization"]
         else:
-            if model_type not in [ModelTypeEnum.ssd, None]:
+            if self.model_type not in [ModelTypeEnum.ssd, None]:
                 logger.warning(
-                    f"Unsupported model_type '{model_type}' for EdgeTPU detector, falling back to SSD"
+                    f"Unsupported model_type '{self.model_type}' for EdgeTPU detector, falling back to SSD"
                 )
 
             logger.debug("Using SSD preprocessing/postprocessing")
@@ -202,155 +206,133 @@ class EdgeTpuTfl(DetectionApi):
             else:
                 self.output_scores_index = index
 
-    def detect_raw(self, tensor_input):
+    def pre_process(self, tensor_input):
         if self.model_requires_int8:
             tensor_input = np.bitwise_xor(tensor_input, 128).view(
                 np.int8
             )  # shift by -128
+        return tensor_input
+
+    def detect_raw(self, tensor_input):
+        tensor_input = self.pre_process(tensor_input)
+
         self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
         self.interpreter.invoke()
 
-        if model_type == ModelTypeEnum.yologeneric:
-            output_tensor_count = len(self.tensor_output_details)
-            if output_tensor_count == 1:
-                # Single-tensor YOLO model
-                # model output is (1, NC+4, 2100) for 320x320 image size
-                # boxes as xywh (normalized to [0,1])
-                # followed by NC class probabilities (also [0,1])
-                # BEWARE the tensor has only one quantization scale/zero_point,
-                # so it should be assembled carefully to have a range of [0,1]
-                outputs = []
-                for output in self.tensor_output_details:
-                    x = self.interpreter.get_tensor(output["index"])
-                    scale, zero_point = output["quantization"]
-                    x = (x.astype(np.float32) - zero_point) * scale
-                    # Denormalize xywh by image size
-                    x[:, [0, 2]] *= self.model_width
-                    x[:, [1, 3]] *= self.model_height
-                    outputs.append(x)
+        if self.model_type == ModelTypeEnum.yologeneric:
+            # Multi-tensor YOLO model with (non-standard B(H*W)C output format).
+            # (the comments indicate the shape of tensors,
+            # using "2100" as the anchor count (for image size of 320x320),
+            # "NC" as number of classes,
+            # "N" as the count that survive after min-score filtering)
+            # TENSOR A) class scores (1, 2100, NC) with logit values
+            # TENSOR B) box coordinates (1, 2100, 64) encoded as dfl scores
+            # Recommend that the model clamp the logit values in tensor (A)
+            # to the range [-4,+4] to preserve precision from [2%,98%]
+            # and because NMS requires the min_score parameter to be >= 0
 
-                return post_process_yolo(outputs, self.model_width, self.model_height)
+            # don't dequantize scores data yet, wait until the low-confidence
+            # candidates are filtered out from the overall result set.
+            # This reduces the work and makes post-processing faster.
+            # this method works with raw quantized numbers when possible,
+            # which relies on the value of the scale factor to be >0.
+            # This speeds up max and argmax operations.
+            # Get max confidence for each detection and create the mask
+            detections = np.zeros(
+                (self.max_detections, 6), np.float32
+            )  # initialize zero results
+            scores_output_quantized = self.interpreter.get_tensor(
+                self.scores_tensor_index
+            )[0]  # (2100, NC)
+            max_scores_quantized = np.max(
+                scores_output_quantized, axis=1
+            )  # (2100,)
+            mask = max_scores_quantized >= self.min_score_quantized  # (2100,)
 
-            elif output_tensor_count in [2,3]:
-                # Multi-tensor YOLO model with (non-standard B(H*W)C output format).
-                # (the comments indicate the shape of tensors,
-                # using "2100" as the anchor count (for image size of 320x320),
-                # "NC" as number of classes,
-                # "N" as the count that survive after min-score filtering)
-                # TENSOR A) class scores (1, 2100, NC) with logit values
-                # TENSOR B) box coordinates (1, 2100, 64) encoded as dfl scores
-                # Recommend that the model clamp the logit values in tensor (A)
-                # to the range [-4,+4] to preserve precision from [2%,98%]
-                # and because NMS requires the min_score parameter to be >= 0
+            if not np.any(mask):
+                return detections  # empty results
 
-                # don't dequantize scores data yet, wait until the low-confidence
-                # candidates are filtered out from the overall result set.
-                # This reduces the work and makes post-processing faster.
-                # this method works with raw quantized numbers when possible,
-                # which relies on the value of the scale factor to be >0.
-                # This speeds up max and argmax operations.
-                # Get max confidence for each detection and create the mask
-                detections = np.zeros(
-                    (self.max_detections, 6), np.float32
-                )  # initialize zero results
-                scores_output_quantized = self.interpreter.get_tensor(
-                    self.scores_tensor_index
-                )[0]  # (2100, NC)
-                max_scores_quantized = np.max(
-                    scores_output_quantized, axis=1
-                )  # (2100,)
-                mask = max_scores_quantized >= self.min_score_quantized  # (2100,)
+            max_scores_filtered_shiftedpositive = (
+                (max_scores_quantized[mask] - self.scores_zero_point)
+                * self.scores_scale
+            ) + self.logit_shift_to_positive_values  # (N,1) shifted logit values
+            scores_output_quantized_filtered = scores_output_quantized[mask]
 
-                if not np.any(mask):
-                    return detections  # empty results
+            # dequantize boxes. NMS needs them to be in float format
+            # remove candidates with probabilities < threshold
+            boxes_output_quantized_filtered = (
+                self.interpreter.get_tensor(self.boxes_tensor_index)[0]
+            )[mask]  # (N, 64)
+            boxes_output_filtered = (
+                boxes_output_quantized_filtered.astype(np.float32)
+                - self.boxes_zero_point
+            ) * self.boxes_scale
 
-                max_scores_filtered_shiftedpositive = (
-                    (max_scores_quantized[mask] - self.scores_zero_point)
-                    * self.scores_scale
-                ) + self.logit_shift_to_positive_values  # (N,1) shifted logit values
-                scores_output_quantized_filtered = scores_output_quantized[mask]
+            # 2. Decode DFL to distances (ltrb)
+            dfl_distributions = boxes_output_filtered.reshape(
+                -1, 4, self.reg_max
+            )  # (N, 4, 16)
 
-                # dequantize boxes. NMS needs them to be in float format
-                # remove candidates with probabilities < threshold
-                boxes_output_quantized_filtered = (
-                    self.interpreter.get_tensor(self.boxes_tensor_index)[0]
-                )[mask]  # (N, 64)
-                boxes_output_filtered = (
-                    boxes_output_quantized_filtered.astype(np.float32)
-                    - self.boxes_zero_point
-                ) * self.boxes_scale
+            # Softmax over the 16 bins
+            dfl_max = np.max(dfl_distributions, axis=2, keepdims=True)
+            dfl_exp = np.exp(dfl_distributions - dfl_max)
+            dfl_probs = dfl_exp / np.sum(
+                dfl_exp, axis=2, keepdims=True
+            )  # (N, 4, 16)
 
-                # 2. Decode DFL to distances (ltrb)
-                dfl_distributions = boxes_output_filtered.reshape(
-                    -1, 4, self.reg_max
-                )  # (N, 4, 16)
+            # Weighted sum: (N, 4, 16) * (16,) -> (N, 4)
+            distances = np.einsum("pcr,r->pc", dfl_probs, self.project)
 
-                # Softmax over the 16 bins
-                dfl_max = np.max(dfl_distributions, axis=2, keepdims=True)
-                dfl_exp = np.exp(dfl_distributions - dfl_max)
-                dfl_probs = dfl_exp / np.sum(
-                    dfl_exp, axis=2, keepdims=True
-                )  # (N, 4, 16)
+            # Calculate box corners in pixel coordinates
+            anchors_filtered = self.anchors[mask]
+            anchor_strides_filtered = self.anchor_strides[mask]
+            x1y1 = (
+                anchors_filtered - distances[:, [0, 1]]
+            ) * anchor_strides_filtered  # (N, 2)
+            x2y2 = (
+                anchors_filtered + distances[:, [2, 3]]
+            ) * anchor_strides_filtered  # (N, 2)
+            boxes_filtered_decoded = np.concatenate((x1y1, x2y2), axis=-1)  # (N, 4)
 
-                # Weighted sum: (N, 4, 16) * (16,) -> (N, 4)
-                distances = np.einsum("pcr,r->pc", dfl_probs, self.project)
+            # 9. Apply NMS. Use logit scores here to defer sigmoid()
+            # until after filtering out redundant boxes
+            # Shift the logit scores to be non-negative (required by cv2)
+            indices = cv2.dnn.NMSBoxes(
+                bboxes=boxes_filtered_decoded,
+                scores=max_scores_filtered_shiftedpositive,
+                score_threshold=(
+                    self.min_logit_value + self.logit_shift_to_positive_values
+                ),
+                nms_threshold=0.4,  # should this be a model config setting?
+            )
+            num_detections = len(indices)
+            if num_detections == 0:
+                return detections  # empty results
 
-                # Calculate box corners in pixel coordinates
-                anchors_filtered = self.anchors[mask]
-                anchor_strides_filtered = self.anchor_strides[mask]
-                x1y1 = (
-                    anchors_filtered - distances[:, [0, 1]]
-                ) * anchor_strides_filtered  # (N, 2)
-                x2y2 = (
-                    anchors_filtered + distances[:, [2, 3]]
-                ) * anchor_strides_filtered  # (N, 2)
-                boxes_filtered_decoded = np.concatenate((x1y1, x2y2), axis=-1)  # (N, 4)
+            nms_indices = np.array(indices, dtype=np.int32).ravel()  # or .flatten()
+            if num_detections > self.max_detections:
+                nms_indices = nms_indices[: self.max_detections]
+                num_detections = self.max_detections
+            kept_logits_quantized = scores_output_quantized_filtered[nms_indices]
+            class_ids_post_nms = np.argmax(kept_logits_quantized, axis=1)
 
-                # 9. Apply NMS. Use logit scores here to defer sigmoid()
-                # until after filtering out redundant boxes
-                # Shift the logit scores to be non-negative (required by cv2)
-                indices = cv2.dnn.NMSBoxes(
-                    bboxes=boxes_filtered_decoded,
-                    scores=max_scores_filtered_shiftedpositive,
-                    score_threshold=(
-                        self.min_logit_value + self.logit_shift_to_positive_values
-                    ),
-                    nms_threshold=0.4,  # should this be a model config setting?
-                )
-                num_detections = len(indices)
-                if num_detections == 0:
-                    return detections  # empty results
+            # Extract the final boxes and scores using fancy indexing
+            final_boxes = boxes_filtered_decoded[nms_indices]
+            final_scores_logits = (
+                max_scores_filtered_shiftedpositive[nms_indices]
+                - self.logit_shift_to_positive_values
+            )  # Unshifted logits
 
-                nms_indices = np.array(indices, dtype=np.int32).ravel()  # or .flatten()
-                if num_detections > self.max_detections:
-                    nms_indices = nms_indices[: self.max_detections]
-                    num_detections = self.max_detections
-                kept_logits_quantized = scores_output_quantized_filtered[nms_indices]
-                class_ids_post_nms = np.argmax(kept_logits_quantized, axis=1)
-
-                # Extract the final boxes and scores using fancy indexing
-                final_boxes = boxes_filtered_decoded[nms_indices]
-                final_scores_logits = (
-                    max_scores_filtered_shiftedpositive[nms_indices]
-                    - self.logit_shift_to_positive_values
-                )  # Unshifted logits
-
-                # Detections array format: [class_id, score, ymin, xmin, ymax, xmax]
-                detections[:num_detections, 0] = class_ids_post_nms
-                detections[:num_detections, 1] = 1.0 / (
-                    1.0 + np.exp(-final_scores_logits)
-                )  # sigmoid
-                detections[:num_detections, 2] = final_boxes[:, 1] / self.model_height
-                detections[:num_detections, 3] = final_boxes[:, 0] / self.model_width
-                detections[:num_detections, 4] = final_boxes[:, 3] / self.model_height
-                detections[:num_detections, 5] = final_boxes[:, 2] / self.model_width
-                return detections
-
-            else:
-                logger.error(
-                    f"Invalid count of output tensors in YOLO model. Found {output_tensor_count}, expecting 1/2/3."
-                )
-                raise
+            # Detections array format: [class_id, score, ymin, xmin, ymax, xmax]
+            detections[:num_detections, 0] = class_ids_post_nms
+            detections[:num_detections, 1] = 1.0 / (
+                1.0 + np.exp(-final_scores_logits)
+            )  # sigmoid
+            detections[:num_detections, 2] = final_boxes[:, 1] / self.model_height
+            detections[:num_detections, 3] = final_boxes[:, 0] / self.model_width
+            detections[:num_detections, 4] = final_boxes[:, 3] / self.model_height
+            detections[:num_detections, 5] = final_boxes[:, 2] / self.model_width
+            return detections
         else:
             # Default SSD model