From 1bbcd9a1a3fe33a9d8555601ef6e4afb0d60f489 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Mon, 15 Sep 2025 16:49:57 -0600 Subject: [PATCH] Refactor synap detector to more closely match other implementations --- frigate/detectors/plugins/synap1680.py | 96 ++++++++++---------------- 1 file changed, 35 insertions(+), 61 deletions(-) diff --git a/frigate/detectors/plugins/synap1680.py b/frigate/detectors/plugins/synap1680.py index ae7a5a9c6..802b7ab67 100644 --- a/frigate/detectors/plugins/synap1680.py +++ b/frigate/detectors/plugins/synap1680.py @@ -1,35 +1,20 @@ import logging -import numpy as np -from PIL import Image -from pydantic import Field from typing_extensions import Literal +import numpy as np from synap import Network -from synap.types import Shape, Layout, DataType -from synap.preprocessor import Preprocessor, InputData +from synap.types import Shape, Layout +from synap.preprocessor import Preprocessor from synap.postprocessor import Detector from frigate.detectors.detection_api import DetectionApi -from frigate.detectors.detector_config import BaseDetectorConfig - -try: - from tflite_runtime.interpreter import Interpreter, load_delegate -except ModuleNotFoundError: - from tensorflow.lite.python.interpreter import Interpreter, load_delegate - +from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum logger = logging.getLogger(__name__) DETECTOR_KEY = "synap1680" -# normalize the final bbox boundry -def normalize_bbox(x, y, w, h, preproc_w, preproc_h) -> tuple[float, float, float, float]: - x_norm = x / preproc_w - y_norm = y / preproc_h - w_norm = (x + w) / preproc_w - h_norm = (y + h) / preproc_h - return x_norm, y_norm, w_norm, h_norm class SynapDetectorConfig(BaseDetectorConfig): type: Literal[DETECTOR_KEY] @@ -41,63 +26,52 @@ class SynapDetector(DetectionApi): try: synap_network = Network(detector_config.model.path) - logger.info("[INIT] NPU found") + logger.info(f"Synap NPU loaded model: {detector_config.model.path}") except Exception as e: - logger.error(f"[INIT] Failed to init NPU: {e}") + logger.error(f"Failed to init Synap NPU: {e}") raise - + + self.width = detector_config.model.width + self.height = detector_config.model.height + self.model_type = detector_config.model.model_type self.network = synap_network self.network_input_details = self.network.inputs[0] - self.model_shape = self.network_input_details.shape - self.model_dtype = self.network_input_details.data_type - def detect_raw(self, tensor_input): - detections = np.zeros((20, 6), np.float32) - batch_size, height, width, channels = tensor_input.shape - target_height, target_width = self.model_shape[1], self.model_shape[2] - resized_tensors = [] - for i in range(batch_size): - image = Image.fromarray(tensor_input[i]) + # Create Inference Engine + self.preprocessor = Preprocessor() + self.detector = Detector(score_threshold=0.4, iou_threshold=0.4) - resized_image = image.resize((target_width, target_height)) + def detect_raw(self, tensor_input: np.ndarray): + postprocess_data = self.preprocessor.assign(self.network.inputs, tensor_input, Shape(tensor_input.shape), Layout.nhwc) + output_tensor_obj = self.network.predict() + output = self.detector.process(output_tensor_obj, postprocess_data) - resized_tensor = np.array(resized_image) - resized_tensors.append(resized_tensor) - - tensor_input = np.stack(resized_tensors) - if tuple(tensor_input.shape) != tuple(self.model_shape): - raise ValueError(f"diff shape {tensor_input.shape} {self.model_shape}") - - preprocessor = Preprocessor() - detector = Detector() - assigned_rect = preprocessor.assign(self.network.inputs, tensor_input, Shape(tensor_input.shape), Layout.nhwc) - outputs = self.network.predict() - result = detector.process(outputs, assigned_rect) - + if self.model_type == ModelTypeEnum.yologeneric: detections = np.zeros((20, 6), np.float32) - for i, item in enumerate(result.items): + for i, item in enumerate(output.items): if i == 20: break - if float(item.confidence) < 0.4: - continue + bb = item.bounding_box - raw_x, raw_y, raw_w, raw_h = normalize_bbox( - bb.origin.x, - bb.origin.y, - bb.size.x, - bb.size.y, - assigned_rect.size.x, - assigned_rect.size.y - ) + + # Convert corner coordinates to normalized [0,1] range + x1 = bb.origin.x / self.width # Top-left X + y1 = bb.origin.y / self.height # Top-left Y + x2 = (bb.origin.x + bb.size.x) / self.width # Bottom-right X + y2 = (bb.origin.y + bb.size.y) / self.height # Bottom-right Y + detections[i] = [ item.class_index, float(item.confidence), - raw_y, - raw_x, - raw_h, - raw_w, + y1, + x1, + y2, + x2, ] - return detections + return detections + else: + print(f"Unsupported model type: {self.model_type}") + return np.zeros((20, 6), np.float32)