From ec1e074c08516b40e68d50d4c593db51c37c8b13 Mon Sep 17 00:00:00 2001 From: Dan Brown Date: Tue, 2 Dec 2025 12:21:54 +0100 Subject: [PATCH] Adds support for YOLO v9 models running on Google Coral --- docs/docs/configuration/object_detectors.md | 58 +++- frigate/detectors/plugins/edgetpu_tfl.py | 350 ++++++++++++++++++-- 2 files changed, 383 insertions(+), 25 deletions(-) diff --git a/docs/docs/configuration/object_detectors.md b/docs/docs/configuration/object_detectors.md index 23d571bc7..14531625f 100644 --- a/docs/docs/configuration/object_detectors.md +++ b/docs/docs/configuration/object_detectors.md @@ -13,7 +13,7 @@ Frigate supports multiple different detectors that work on different types of ha **Most Hardware** -- [Coral EdgeTPU](#edge-tpu-detector): The Google Coral EdgeTPU is available in USB and m.2 format allowing for a wide range of compatibility with devices. +- [Coral EdgeTPU](#edge-tpu-detector): The Google Coral EdgeTPU is available in USB, Mini PCIe, and m.2 formats allowing for a wide range of compatibility with devices. - [Hailo](#hailo-8): The Hailo8 and Hailo8L AI Acceleration module is available in m.2 format with a HAT for RPi devices, offering a wide range of compatibility with devices. - [MemryX](#memryx-mx3): The MX3 Acceleration module is available in m.2 format, offering broad compatibility across various platforms. - [DeGirum](#degirum): Service for using hardware devices in the cloud or locally. Hardware and models provided on the cloud on [their website](https://hub.degirum.com). @@ -69,12 +69,10 @@ Frigate provides the following builtin detector types: `cpu`, `edgetpu`, `hailo8 ## Edge TPU Detector -The Edge TPU detector type runs a TensorFlow Lite model utilizing the Google Coral delegate for hardware acceleration. To configure an Edge TPU detector, set the `"type"` attribute to `"edgetpu"`. +The Edge TPU detector type runs TensorFlow Lite models utilizing the Google Coral delegate for hardware acceleration. To configure an Edge TPU detector, set the `"type"` attribute to `"edgetpu"`. The Edge TPU device can be specified using the `"device"` attribute according to the [Documentation for the TensorFlow Lite Python API](https://coral.ai/docs/edgetpu/multiple-edgetpu/#using-the-tensorflow-lite-python-api). If not set, the delegate will use the first device it finds. -A TensorFlow Lite model is provided in the container at `/edgetpu_model.tflite` and is used by this detector type by default. To provide your own model, bind mount the file into the container and provide the path with `model.path`. - :::tip See [common Edge TPU troubleshooting steps](/troubleshooting/edgetpu) if the Edge TPU is not detected. @@ -146,6 +144,58 @@ detectors: device: pci ``` +### EdgeTPU Supported Models + +| Model | Notes | +| ------------------------------------- | ------------------------------------------- | +| [MobileNet v2](#ssdlite-mobilenet-v2) | Default model | +| [YOLOv9](#yolo-v9) | More accurate but slower than default model | + +#### SSDLite MobileNet v2 + +A TensorFlow Lite model is provided in the container at `/edgetpu_model.tflite` and is used by this detector type by default. To provide your own model, bind mount the file into the container and provide the path with `model.path`. + +A Tensorflow Lite is provided in the container at `/openvino-model/ssdlite_mobilenet_v2.xml` and is used by this detector type by default. The model comes from Intel's Open Model Zoo [SSDLite MobileNet V2](https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/public/ssdlite_mobilenet_v2) and is converted to an INT8 precision model. + +#### YOLO v9 + +[YOLOv9](https://github.com/dbro/frigate-detector-edgetpu-yolo9/releases/download/v1.0/yolov9-s-relu6-best_320_int8_edgetpu.tflite) models that are compiled for Tensorflow Lite and properly quantized are supported, but not included by default. To provide your own model, bind mount the file into the container and provide the path with `model.path`. Note that the model may require a custom label file (eg. [use this 17 label file](https://raw.githubusercontent.com/dbro/frigate-detector-edgetpu-yolo9/refs/heads/main/labels-coco17.txt) for the model linked above.) + +:::tip + +The YOLO detector has been designed to support YOLOv9 models, and may support other YOLO model architectures as well. + +::: + +
+ YOLOv9 Setup & Config + +:::warning + +If you are using a Frigate+ YOLOv9 model, you should not define any of the below `model` parameters in your config except for `path`. See [the Frigate+ model docs](/plus/first_model#step-3-set-your-model-id-in-the-config) for more information on setting up your model. + +::: + +After placing the downloaded files for the tflite model and labels in your config folder, you can use the following configuration: + +```yaml +detectors: + coral: + type: edgetpu + device: usb + +model: + model_type: yolo-generic + width: 320 # <--- should match the imgsize of the model, typically 320 + height: 320 # <--- should match the imgsize of the model, typically 320 + path: /config/model_cache/yolov9-s-relu6-best_320_int8_edgetpu.tflite + labelmap_path: /labelmap/labels-coco-17.txt +``` + +Note that the labelmap uses a subset of the complete COCO label set that has only 17 objects. + +
+ --- ## Hailo-8 diff --git a/frigate/detectors/plugins/edgetpu_tfl.py b/frigate/detectors/plugins/edgetpu_tfl.py index 246d2dd41..1ce10cc74 100644 --- a/frigate/detectors/plugins/edgetpu_tfl.py +++ b/frigate/detectors/plugins/edgetpu_tfl.py @@ -1,19 +1,21 @@ import logging +import math import os +import cv2 import numpy as np from pydantic import Field from typing_extensions import Literal from frigate.detectors.detection_api import DetectionApi -from frigate.detectors.detector_config import BaseDetectorConfig +from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum +from frigate.util.model import post_process_yolo try: from tflite_runtime.interpreter import Interpreter, load_delegate except ModuleNotFoundError: from tensorflow.lite.python.interpreter import Interpreter, load_delegate - logger = logging.getLogger(__name__) DETECTOR_KEY = "edgetpu" @@ -22,12 +24,20 @@ DETECTOR_KEY = "edgetpu" class EdgeTpuDetectorConfig(BaseDetectorConfig): type: Literal[DETECTOR_KEY] device: str = Field(default=None, title="Device Type") + # model_type inherited from BaseDetectorConfig, but can override default class EdgeTpuTfl(DetectionApi): type_key = DETECTOR_KEY + supported_models = [ + ModelTypeEnum.ssd, + ModelTypeEnum.yologeneric, + ] def __init__(self, detector_config: EdgeTpuDetectorConfig): + logger.info( + f"Initializing {DETECTOR_KEY} detector with support for SSD and YOLOv9 models" + ) device_config = {} if detector_config.device is not None: device_config = {"device": detector_config.device} @@ -63,31 +73,329 @@ class EdgeTpuTfl(DetectionApi): self.tensor_input_details = self.interpreter.get_input_details() self.tensor_output_details = self.interpreter.get_output_details() - self.model_type = detector_config.model.model_type + self.model_width = detector_config.model.width + self.model_height = detector_config.model.height + + self.min_score = 0.4 + try: + self.min_score = detector_config.model.min_score + except AttributeError: + pass + + self.max_detections = 20 + try: + self.max_detections = detector_config.model.max_detections + except AttributeError: + pass + + model_type = detector_config.model.model_type + self.yolo_model = model_type == ModelTypeEnum.yologeneric + self.model_requires_int8 = self.tensor_input_details[0]["dtype"] == np.int8 + if self.model_requires_int8: + logger.info("Detection model requires int8 format input") + + if self.yolo_model: + logger.info( + f"Preparing YOLO postprocessing for {len(self.tensor_output_details)}-tensor output" + ) + if len(self.tensor_output_details) > 1: # expecting 2 or 3 + self.reg_max = 16 # = 64 dfl_channels // 4 # YOLO standard + self.min_logit_value = np.log( + self.min_score / (1 - self.min_score) + ) # for filtering + self._generate_anchors_and_strides() # decode bounding box DFL + self.project = np.arange( + self.reg_max, dtype=np.float32 + ) # for decoding bounding box DFL information + + # Determine YOLO tensor indices and quantization scales for + # boxes and class_scores the tensor ordering and names are + # not reliable, so use tensor shape to detect which tensor + # holds boxes or class scores. + # The tensors have shapes (B, N, C) + # where N is the number of candidates (=2100 for 320x320) + # this may guess wrong if the number of classes is exactly 64 + output_boxes_index = None + output_classes_index = None + for i, x in enumerate(self.tensor_output_details): + # the nominal index seems to start at 1 instead of 0 + if len(x["shape"]) == 3 and x["shape"][2] == 64: + output_boxes_index = i + elif len(x["shape"]) == 3 and x["shape"][2] > 1: + # require the number of classes to be more than 1 + # to differentiate from (not used) max score tensor + output_classes_index = i + if output_boxes_index is None or output_classes_index is None: + logger.warning( + "Unrecognized model output, unexpected tensor shapes." + ) + output_classes_index = ( + 0 + if (output_boxes_index is None or output_classes_index == 1) + else 1 + ) # 0 is default guess + output_boxes_index = 1 if (output_boxes_index == 0) else 0 + scores_details = self.tensor_output_details[output_classes_index] + classes_count = scores_details["shape"][2] + self.scores_tensor_index = scores_details["index"] + self.scores_scale, self.scores_zero_point = scores_details[ + "quantization" + ] + # calculate the quantized version of the min_score + self.min_score_quantized = int( + (self.min_logit_value / self.scores_scale) + self.scores_zero_point + ) + self.logit_shift_to_positive_values = ( + max( + 0, math.ceil((128 + self.scores_zero_point) * self.scores_scale) + ) + + 1 + ) # round up + + boxes_details = self.tensor_output_details[output_boxes_index] + self.boxes_tensor_index = boxes_details["index"] + self.boxes_scale, self.boxes_zero_point = boxes_details["quantization"] + logger.info( + f"Using tensor index {output_boxes_index} for boxes(DFL), {output_classes_index} for {classes_count} class scores" + ) + + else: + if model_type not in [ModelTypeEnum.ssd, None]: + logger.warning( + f"Unsupported model_type '{model_type}' for EdgeTPU detector, falling back to SSD" + ) + logger.info("Using SSD preprocessing/postprocessing") + + # SSD model indices (4 outputs: boxes, class_ids, scores, count) + for x in self.tensor_output_details: + if len(x["shape"]) == 3: + self.output_boxes_index = x["index"] + elif len(x["shape"]) == 1: + self.output_count_index = x["index"] + + self.output_class_ids_index = None + self.output_class_scores_index = None + + def _generate_anchors_and_strides(self): + # for decoding the bounding box DFL information into xy coordinates + all_anchors = [] + all_strides = [] + strides = (8, 16, 32) # YOLO's small, medium, large detection heads + + for stride in strides: + feat_h, feat_w = self.model_height // stride, self.model_width // stride + + grid_y, grid_x = np.meshgrid( + np.arange(feat_h, dtype=np.float32), + np.arange(feat_w, dtype=np.float32), + indexing="ij", + ) + + grid_coords = np.stack((grid_x.flatten(), grid_y.flatten()), axis=1) + anchor_points = grid_coords + 0.5 + + all_anchors.append(anchor_points) + all_strides.append(np.full((feat_h * feat_w, 1), stride, dtype=np.float32)) + + self.anchors = np.concatenate(all_anchors, axis=0) + self.anchor_strides = np.concatenate(all_strides, axis=0) + + def determine_indexes_for_non_yolo_models(self): + """Legacy method for SSD models.""" + if ( + self.output_class_ids_index is None + or self.output_class_scores_index is None + ): + for i in range(4): + index = self.tensor_output_details[i]["index"] + if ( + index != self.output_boxes_index + and index != self.output_count_index + ): + if ( + np.mod(np.float32(self.interpreter.tensor(index)()[0][0]), 1) + == 0.0 + ): + self.output_class_ids_index = index + else: + self.output_scores_index = index def detect_raw(self, tensor_input): + if self.model_requires_int8: + tensor_input = np.bitwise_xor(tensor_input, 128).view( + np.int8 + ) # shift by -128 self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) self.interpreter.invoke() - boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] - class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] - scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] - count = int( - self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] - ) + if self.yolo_model: + if len(self.tensor_output_details) == 1: + # Single-tensor YOLO model + # model output is (1, NC+4, 2100) for 320x320 image size + # boxes as xywh (normalized to [0,1]) + # followed by NC class probabilities (also [0,1]) + # BEWARE the tensor has only one quantization scale/zero_point, + # so it should be assembled carefully to have a range of [0,1] + outputs = [] + for output in self.tensor_output_details: + x = self.interpreter.get_tensor(output["index"]) + scale, zero_point = output["quantization"] + x = (x.astype(np.float32) - zero_point) * scale + # Denormalize xywh by image size + x[:, [0, 2]] *= self.model_width + x[:, [1, 3]] *= self.model_height + outputs.append(x) - detections = np.zeros((20, 6), np.float32) + return post_process_yolo(outputs, self.model_width, self.model_height) - for i in range(count): - if scores[i] < 0.4 or i == 20: - break - detections[i] = [ - class_ids[i], - float(scores[i]), - boxes[i][0], - boxes[i][1], - boxes[i][2], - boxes[i][3], + else: + # Multi-tensor YOLO model with (non-standard B(H*W)C output format). + # (the comments indicate the shape of tensors, + # using "2100" as the anchor count (for image size of 320x320), + # "NC" as number of classes, + # "N" as the count that survive after min-score filtering) + # TENSOR A) class scores (1, 2100, NC) with logit values + # TENSOR B) box coordinates (1, 2100, 64) encoded as dfl scores + # Recommend that the model clamp the logit values in tensor (A) + # to the range [-4,+4] to preserve precision from [2%,98%] + # and because NMS requires the min_score parameter to be >= 0 + + # don't dequantize scores data yet, wait until the low-confidence + # candidates are filtered out from the overall result set. + # This reduces the work and makes post-processing faster. + # this method works with raw quantized numbers when possible, + # which relies on the value of the scale factor to be >0. + # This speeds up max and argmax operations. + # Get max confidence for each detection and create the mask + detections = np.zeros( + (self.max_detections, 6), np.float32 + ) # initialize zero results + scores_output_quantized = self.interpreter.get_tensor( + self.scores_tensor_index + )[ + 0 + ] # (2100, NC) + max_scores_quantized = np.max( + scores_output_quantized, axis=1 + ) # (2100,) + mask = max_scores_quantized >= self.min_score_quantized # (2100,) + + if not np.any(mask): + return detections # empty results + + max_scores_filtered_shiftedpositive = ( + (max_scores_quantized[mask] - self.scores_zero_point) + * self.scores_scale + ) + self.logit_shift_to_positive_values # (N,1) shifted logit values + scores_output_quantized_filtered = scores_output_quantized[mask] + + # dequantize boxes. NMS needs them to be in float format + # remove candidates with probabilities < threshold + boxes_output_quantized_filtered = ( + self.interpreter.get_tensor(self.boxes_tensor_index)[0] + )[ + mask + ] # (N, 64) + boxes_output_filtered = ( + boxes_output_quantized_filtered.astype(np.float32) + - self.boxes_zero_point + ) * self.boxes_scale + + # 2. Decode DFL to distances (ltrb) + dfl_distributions = boxes_output_filtered.reshape( + -1, 4, self.reg_max + ) # (N, 4, 16) + + # Softmax over the 16 bins + dfl_max = np.max(dfl_distributions, axis=2, keepdims=True) + dfl_exp = np.exp(dfl_distributions - dfl_max) + dfl_probs = dfl_exp / np.sum( + dfl_exp, axis=2, keepdims=True + ) # (N, 4, 16) + + # Weighted sum: (N, 4, 16) * (16,) -> (N, 4) + distances = np.einsum("pcr,r->pc", dfl_probs, self.project) + + # Calculate box corners in pixel coordinates + anchors_filtered = self.anchors[mask] + anchor_strides_filtered = self.anchor_strides[mask] + x1y1 = ( + anchors_filtered - distances[:, [0, 1]] + ) * anchor_strides_filtered # (N, 2) + x2y2 = ( + anchors_filtered + distances[:, [2, 3]] + ) * anchor_strides_filtered # (N, 2) + boxes_filtered_decoded = np.concatenate((x1y1, x2y2), axis=-1) # (N, 4) + + # 9. Apply NMS. Use logit scores here to defer sigmoid() + # until after filtering out redundant boxes + # Shift the logit scores to be non-negative (required by cv2) + indices = cv2.dnn.NMSBoxes( + bboxes=boxes_filtered_decoded, + scores=max_scores_filtered_shiftedpositive, + score_threshold=( + self.min_logit_value + self.logit_shift_to_positive_values + ), + nms_threshold=0.4, # should this be a model config setting? + ) + num_detections = len(indices) + if num_detections == 0: + return detections # empty results + + nms_indices = np.array(indices, dtype=np.int32).ravel() # or .flatten() + if num_detections > self.max_detections: + nms_indices = nms_indices[: self.max_detections] + num_detections = self.max_detections + kept_logits_quantized = scores_output_quantized_filtered[nms_indices] + class_ids_post_nms = np.argmax(kept_logits_quantized, axis=1) + + # Extract the final boxes and scores using fancy indexing + final_boxes = boxes_filtered_decoded[nms_indices] + final_scores_logits = ( + max_scores_filtered_shiftedpositive[nms_indices] + - self.logit_shift_to_positive_values + ) # Unshifted logits + + # Detections array format: [class_id, score, ymin, xmin, ymax, xmax] + detections[:num_detections, 0] = class_ids_post_nms + detections[:num_detections, 1] = 1.0 / ( + 1.0 + np.exp(-final_scores_logits) + ) # sigmoid + detections[:num_detections, 2] = final_boxes[:, 1] / self.model_height + detections[:num_detections, 3] = final_boxes[:, 0] / self.model_width + detections[:num_detections, 4] = final_boxes[:, 3] / self.model_height + detections[:num_detections, 5] = final_boxes[:, 2] / self.model_width + return detections + + else: + # Default SSD model + self.determine_indexes_for_non_yolo_models() + boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] + class_ids = self.interpreter.tensor( + self.tensor_output_details[1]["index"] + )()[0] + scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[ + 0 ] + count = int( + self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] + ) - return detections + detections = np.zeros((self.max_detections, 6), np.float32) + + for i in range(count): + if scores[i] < self.min_score: + break + if i == self.max_detections: + logger.info(f"Too many detections ({count})!") + break + detections[i] = [ + class_ids[i], + float(scores[i]), + boxes[i][0], + boxes[i][1], + boxes[i][2], + boxes[i][3], + ] + + return detections