remove 1-tensor processing. add pre_process() function

This commit is contained in:
Dan Brown 2025-12-02 16:01:58 +01:00
parent 1664b2f3bb
commit 6e288839be

View File

@ -75,76 +75,80 @@ class EdgeTpuTfl(DetectionApi):
self.min_score = 0.4 self.min_score = 0.4
self.max_detections = 20 self.max_detections = 20
model_type = detector_config.model.model_type self.model_type = detector_config.model.model_type
self.model_requires_int8 = self.tensor_input_details[0]["dtype"] == np.int8 self.model_requires_int8 = self.tensor_input_details[0]["dtype"] == np.int8
if model_type == ModelTypeEnum.yologeneric if self.model_type == ModelTypeEnum.yologeneric:
logger.debug( logger.debug("Using YOLO preprocessing/postprocessing")
f"Using YOLO postprocessing for {len(self.tensor_output_details)}-tensor output"
)
if len(self.tensor_output_details) > 1: # expecting 2 or 3
self.reg_max = 16 # = 64 dfl_channels // 4 # YOLO standard
self.min_logit_value = np.log(
self.min_score / (1 - self.min_score)
) # for filtering
self._generate_anchors_and_strides() # decode bounding box DFL
self.project = np.arange(
self.reg_max, dtype=np.float32
) # for decoding bounding box DFL information
# Determine YOLO tensor indices and quantization scales for if len(self.tensor_output_details) not in [2,3]:
# boxes and class_scores the tensor ordering and names are logger.error(
# not reliable, so use tensor shape to detect which tensor f"Invalid count of output tensors in YOLO model. Found {len(self.tensor_output_details)}, expecting 2 or 3."
# holds boxes or class scores.
# The tensors have shapes (B, N, C)
# where N is the number of candidates (=2100 for 320x320)
# this may guess wrong if the number of classes is exactly 64
output_boxes_index = None
output_classes_index = None
for i, x in enumerate(self.tensor_output_details):
# the nominal index seems to start at 1 instead of 0
if len(x["shape"]) == 3 and x["shape"][2] == 64:
output_boxes_index = i
elif len(x["shape"]) == 3 and x["shape"][2] > 1:
# require the number of classes to be more than 1
# to differentiate from (not used) max score tensor
output_classes_index = i
if output_boxes_index is None or output_classes_index is None:
logger.warning(
"Unrecognized model output, unexpected tensor shapes."
)
output_classes_index = (
0
if (output_boxes_index is None or output_classes_index == 1)
else 1
) # 0 is default guess
output_boxes_index = 1 if (output_boxes_index == 0) else 0
scores_details = self.tensor_output_details[output_classes_index]
classes_count = scores_details["shape"][2]
self.scores_tensor_index = scores_details["index"]
self.scores_scale, self.scores_zero_point = scores_details[
"quantization"
]
# calculate the quantized version of the min_score
self.min_score_quantized = int(
(self.min_logit_value / self.scores_scale) + self.scores_zero_point
) )
self.logit_shift_to_positive_values = ( raise
max(
0, math.ceil((128 + self.scores_zero_point) * self.scores_scale)
)
+ 1
) # round up
boxes_details = self.tensor_output_details[output_boxes_index] self.reg_max = 16 # = 64 dfl_channels // 4 # YOLO standard
self.boxes_tensor_index = boxes_details["index"] self.min_logit_value = np.log(
self.boxes_scale, self.boxes_zero_point = boxes_details["quantization"] self.min_score / (1 - self.min_score)
) # for filtering
self._generate_anchors_and_strides() # decode bounding box DFL
self.project = np.arange(
self.reg_max, dtype=np.float32
) # for decoding bounding box DFL information
# Determine YOLO tensor indices and quantization scales for
# boxes and class_scores the tensor ordering and names are
# not reliable, so use tensor shape to detect which tensor
# holds boxes or class scores.
# The tensors have shapes (B, N, C)
# where N is the number of candidates (=2100 for 320x320)
# this may guess wrong if the number of classes is exactly 64
output_boxes_index = None
output_classes_index = None
for i, x in enumerate(self.tensor_output_details):
# the nominal index seems to start at 1 instead of 0
if len(x["shape"]) == 3 and x["shape"][2] == 64:
output_boxes_index = i
elif len(x["shape"]) == 3 and x["shape"][2] > 1:
# require the number of classes to be more than 1
# to differentiate from (not used) max score tensor
output_classes_index = i
if output_boxes_index is None or output_classes_index is None:
logger.warning(
"Unrecognized model output, unexpected tensor shapes."
)
output_classes_index = (
0
if (output_boxes_index is None or output_classes_index == 1)
else 1
) # 0 is default guess
output_boxes_index = 1 if (output_boxes_index == 0) else 0
scores_details = self.tensor_output_details[output_classes_index]
classes_count = scores_details["shape"][2]
self.scores_tensor_index = scores_details["index"]
self.scores_scale, self.scores_zero_point = scores_details[
"quantization"
]
# calculate the quantized version of the min_score
self.min_score_quantized = int(
(self.min_logit_value / self.scores_scale) + self.scores_zero_point
)
self.logit_shift_to_positive_values = (
max(
0, math.ceil((128 + self.scores_zero_point) * self.scores_scale)
)
+ 1
) # round up
boxes_details = self.tensor_output_details[output_boxes_index]
self.boxes_tensor_index = boxes_details["index"]
self.boxes_scale, self.boxes_zero_point = boxes_details["quantization"]
else: else:
if model_type not in [ModelTypeEnum.ssd, None]: if self.model_type not in [ModelTypeEnum.ssd, None]:
logger.warning( logger.warning(
f"Unsupported model_type '{model_type}' for EdgeTPU detector, falling back to SSD" f"Unsupported model_type '{self.model_type}' for EdgeTPU detector, falling back to SSD"
) )
logger.debug("Using SSD preprocessing/postprocessing") logger.debug("Using SSD preprocessing/postprocessing")
@ -202,155 +206,133 @@ class EdgeTpuTfl(DetectionApi):
else: else:
self.output_scores_index = index self.output_scores_index = index
def detect_raw(self, tensor_input): def pre_process(self, tensor_input):
if self.model_requires_int8: if self.model_requires_int8:
tensor_input = np.bitwise_xor(tensor_input, 128).view( tensor_input = np.bitwise_xor(tensor_input, 128).view(
np.int8 np.int8
) # shift by -128 ) # shift by -128
return tensor_input
def detect_raw(self, tensor_input):
tensor_input = self.pre_process(tensor_input)
self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
self.interpreter.invoke() self.interpreter.invoke()
if model_type == ModelTypeEnum.yologeneric if self.model_type == ModelTypeEnum.yologeneric:
output_tensor_count = len(self.tensor_output_details) # Multi-tensor YOLO model with (non-standard B(H*W)C output format).
if output_tensor_count == 1: # (the comments indicate the shape of tensors,
# Single-tensor YOLO model # using "2100" as the anchor count (for image size of 320x320),
# model output is (1, NC+4, 2100) for 320x320 image size # "NC" as number of classes,
# boxes as xywh (normalized to [0,1]) # "N" as the count that survive after min-score filtering)
# followed by NC class probabilities (also [0,1]) # TENSOR A) class scores (1, 2100, NC) with logit values
# BEWARE the tensor has only one quantization scale/zero_point, # TENSOR B) box coordinates (1, 2100, 64) encoded as dfl scores
# so it should be assembled carefully to have a range of [0,1] # Recommend that the model clamp the logit values in tensor (A)
outputs = [] # to the range [-4,+4] to preserve precision from [2%,98%]
for output in self.tensor_output_details: # and because NMS requires the min_score parameter to be >= 0
x = self.interpreter.get_tensor(output["index"])
scale, zero_point = output["quantization"]
x = (x.astype(np.float32) - zero_point) * scale
# Denormalize xywh by image size
x[:, [0, 2]] *= self.model_width
x[:, [1, 3]] *= self.model_height
outputs.append(x)
return post_process_yolo(outputs, self.model_width, self.model_height) # don't dequantize scores data yet, wait until the low-confidence
# candidates are filtered out from the overall result set.
# This reduces the work and makes post-processing faster.
# this method works with raw quantized numbers when possible,
# which relies on the value of the scale factor to be >0.
# This speeds up max and argmax operations.
# Get max confidence for each detection and create the mask
detections = np.zeros(
(self.max_detections, 6), np.float32
) # initialize zero results
scores_output_quantized = self.interpreter.get_tensor(
self.scores_tensor_index
)[0] # (2100, NC)
max_scores_quantized = np.max(
scores_output_quantized, axis=1
) # (2100,)
mask = max_scores_quantized >= self.min_score_quantized # (2100,)
elif output_tensor_count in [2,3]: if not np.any(mask):
# Multi-tensor YOLO model with (non-standard B(H*W)C output format). return detections # empty results
# (the comments indicate the shape of tensors,
# using "2100" as the anchor count (for image size of 320x320),
# "NC" as number of classes,
# "N" as the count that survive after min-score filtering)
# TENSOR A) class scores (1, 2100, NC) with logit values
# TENSOR B) box coordinates (1, 2100, 64) encoded as dfl scores
# Recommend that the model clamp the logit values in tensor (A)
# to the range [-4,+4] to preserve precision from [2%,98%]
# and because NMS requires the min_score parameter to be >= 0
# don't dequantize scores data yet, wait until the low-confidence max_scores_filtered_shiftedpositive = (
# candidates are filtered out from the overall result set. (max_scores_quantized[mask] - self.scores_zero_point)
# This reduces the work and makes post-processing faster. * self.scores_scale
# this method works with raw quantized numbers when possible, ) + self.logit_shift_to_positive_values # (N,1) shifted logit values
# which relies on the value of the scale factor to be >0. scores_output_quantized_filtered = scores_output_quantized[mask]
# This speeds up max and argmax operations.
# Get max confidence for each detection and create the mask
detections = np.zeros(
(self.max_detections, 6), np.float32
) # initialize zero results
scores_output_quantized = self.interpreter.get_tensor(
self.scores_tensor_index
)[0] # (2100, NC)
max_scores_quantized = np.max(
scores_output_quantized, axis=1
) # (2100,)
mask = max_scores_quantized >= self.min_score_quantized # (2100,)
if not np.any(mask): # dequantize boxes. NMS needs them to be in float format
return detections # empty results # remove candidates with probabilities < threshold
boxes_output_quantized_filtered = (
self.interpreter.get_tensor(self.boxes_tensor_index)[0]
)[mask] # (N, 64)
boxes_output_filtered = (
boxes_output_quantized_filtered.astype(np.float32)
- self.boxes_zero_point
) * self.boxes_scale
max_scores_filtered_shiftedpositive = ( # 2. Decode DFL to distances (ltrb)
(max_scores_quantized[mask] - self.scores_zero_point) dfl_distributions = boxes_output_filtered.reshape(
* self.scores_scale -1, 4, self.reg_max
) + self.logit_shift_to_positive_values # (N,1) shifted logit values ) # (N, 4, 16)
scores_output_quantized_filtered = scores_output_quantized[mask]
# dequantize boxes. NMS needs them to be in float format # Softmax over the 16 bins
# remove candidates with probabilities < threshold dfl_max = np.max(dfl_distributions, axis=2, keepdims=True)
boxes_output_quantized_filtered = ( dfl_exp = np.exp(dfl_distributions - dfl_max)
self.interpreter.get_tensor(self.boxes_tensor_index)[0] dfl_probs = dfl_exp / np.sum(
)[mask] # (N, 64) dfl_exp, axis=2, keepdims=True
boxes_output_filtered = ( ) # (N, 4, 16)
boxes_output_quantized_filtered.astype(np.float32)
- self.boxes_zero_point
) * self.boxes_scale
# 2. Decode DFL to distances (ltrb) # Weighted sum: (N, 4, 16) * (16,) -> (N, 4)
dfl_distributions = boxes_output_filtered.reshape( distances = np.einsum("pcr,r->pc", dfl_probs, self.project)
-1, 4, self.reg_max
) # (N, 4, 16)
# Softmax over the 16 bins # Calculate box corners in pixel coordinates
dfl_max = np.max(dfl_distributions, axis=2, keepdims=True) anchors_filtered = self.anchors[mask]
dfl_exp = np.exp(dfl_distributions - dfl_max) anchor_strides_filtered = self.anchor_strides[mask]
dfl_probs = dfl_exp / np.sum( x1y1 = (
dfl_exp, axis=2, keepdims=True anchors_filtered - distances[:, [0, 1]]
) # (N, 4, 16) ) * anchor_strides_filtered # (N, 2)
x2y2 = (
anchors_filtered + distances[:, [2, 3]]
) * anchor_strides_filtered # (N, 2)
boxes_filtered_decoded = np.concatenate((x1y1, x2y2), axis=-1) # (N, 4)
# Weighted sum: (N, 4, 16) * (16,) -> (N, 4) # 9. Apply NMS. Use logit scores here to defer sigmoid()
distances = np.einsum("pcr,r->pc", dfl_probs, self.project) # until after filtering out redundant boxes
# Shift the logit scores to be non-negative (required by cv2)
indices = cv2.dnn.NMSBoxes(
bboxes=boxes_filtered_decoded,
scores=max_scores_filtered_shiftedpositive,
score_threshold=(
self.min_logit_value + self.logit_shift_to_positive_values
),
nms_threshold=0.4, # should this be a model config setting?
)
num_detections = len(indices)
if num_detections == 0:
return detections # empty results
# Calculate box corners in pixel coordinates nms_indices = np.array(indices, dtype=np.int32).ravel() # or .flatten()
anchors_filtered = self.anchors[mask] if num_detections > self.max_detections:
anchor_strides_filtered = self.anchor_strides[mask] nms_indices = nms_indices[: self.max_detections]
x1y1 = ( num_detections = self.max_detections
anchors_filtered - distances[:, [0, 1]] kept_logits_quantized = scores_output_quantized_filtered[nms_indices]
) * anchor_strides_filtered # (N, 2) class_ids_post_nms = np.argmax(kept_logits_quantized, axis=1)
x2y2 = (
anchors_filtered + distances[:, [2, 3]]
) * anchor_strides_filtered # (N, 2)
boxes_filtered_decoded = np.concatenate((x1y1, x2y2), axis=-1) # (N, 4)
# 9. Apply NMS. Use logit scores here to defer sigmoid() # Extract the final boxes and scores using fancy indexing
# until after filtering out redundant boxes final_boxes = boxes_filtered_decoded[nms_indices]
# Shift the logit scores to be non-negative (required by cv2) final_scores_logits = (
indices = cv2.dnn.NMSBoxes( max_scores_filtered_shiftedpositive[nms_indices]
bboxes=boxes_filtered_decoded, - self.logit_shift_to_positive_values
scores=max_scores_filtered_shiftedpositive, ) # Unshifted logits
score_threshold=(
self.min_logit_value + self.logit_shift_to_positive_values
),
nms_threshold=0.4, # should this be a model config setting?
)
num_detections = len(indices)
if num_detections == 0:
return detections # empty results
nms_indices = np.array(indices, dtype=np.int32).ravel() # or .flatten() # Detections array format: [class_id, score, ymin, xmin, ymax, xmax]
if num_detections > self.max_detections: detections[:num_detections, 0] = class_ids_post_nms
nms_indices = nms_indices[: self.max_detections] detections[:num_detections, 1] = 1.0 / (
num_detections = self.max_detections 1.0 + np.exp(-final_scores_logits)
kept_logits_quantized = scores_output_quantized_filtered[nms_indices] ) # sigmoid
class_ids_post_nms = np.argmax(kept_logits_quantized, axis=1) detections[:num_detections, 2] = final_boxes[:, 1] / self.model_height
detections[:num_detections, 3] = final_boxes[:, 0] / self.model_width
# Extract the final boxes and scores using fancy indexing detections[:num_detections, 4] = final_boxes[:, 3] / self.model_height
final_boxes = boxes_filtered_decoded[nms_indices] detections[:num_detections, 5] = final_boxes[:, 2] / self.model_width
final_scores_logits = ( return detections
max_scores_filtered_shiftedpositive[nms_indices]
- self.logit_shift_to_positive_values
) # Unshifted logits
# Detections array format: [class_id, score, ymin, xmin, ymax, xmax]
detections[:num_detections, 0] = class_ids_post_nms
detections[:num_detections, 1] = 1.0 / (
1.0 + np.exp(-final_scores_logits)
) # sigmoid
detections[:num_detections, 2] = final_boxes[:, 1] / self.model_height
detections[:num_detections, 3] = final_boxes[:, 0] / self.model_width
detections[:num_detections, 4] = final_boxes[:, 3] / self.model_height
detections[:num_detections, 5] = final_boxes[:, 2] / self.model_width
return detections
else:
logger.error(
f"Invalid count of output tensors in YOLO model. Found {output_tensor_count}, expecting 1/2/3."
)
raise
else: else:
# Default SSD model # Default SSD model