Refactor synap detector to more closely match other implementations

Nicolas Mowen 2025-09-15 16:49:57 -06:00
parent 49c15ce6c3
commit 1bbcd9a1a3


@@ -1,35 +1,20 @@
 import logging
 
-import numpy as np
-from PIL import Image
-from pydantic import Field
 from typing_extensions import Literal
+import numpy as np
 
 from synap import Network
-from synap.types import Shape, Layout, DataType
-from synap.preprocessor import Preprocessor, InputData
+from synap.types import Shape, Layout
+from synap.preprocessor import Preprocessor
 from synap.postprocessor import Detector
 
 from frigate.detectors.detection_api import DetectionApi
-from frigate.detectors.detector_config import BaseDetectorConfig
-
-try:
-    from tflite_runtime.interpreter import Interpreter, load_delegate
-except ModuleNotFoundError:
-    from tensorflow.lite.python.interpreter import Interpreter, load_delegate
+from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
 
 logger = logging.getLogger(__name__)
 
 DETECTOR_KEY = "synap1680"
 
-
-# normalize the final bbox boundry
-def normalize_bbox(x, y, w, h, preproc_w, preproc_h) -> tuple[float, float, float, float]:
-    x_norm = x / preproc_w
-    y_norm = y / preproc_h
-    w_norm = (x + w) / preproc_w
-    h_norm = (y + h) / preproc_h
-    return x_norm, y_norm, w_norm, h_norm
-
-
 class SynapDetectorConfig(BaseDetectorConfig):
     type: Literal[DETECTOR_KEY]
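
For context on the removed normalize_bbox helper: despite its w_norm/h_norm names, it returned normalized top-left and bottom-right corners, which is the same arithmetic the refactored detect_raw in the next hunk inlines against the configured model width and height. A minimal sketch of that shared computation, using a hypothetical normalize_corners helper and illustrative numbers only:

    def normalize_corners(x, y, w, h, frame_w, frame_h):
        # Scale the top-left and bottom-right corners to [0, 1],
        # matching what both the old helper and the new inline code compute.
        x1 = x / frame_w
        y1 = y / frame_h
        x2 = (x + w) / frame_w
        y2 = (y + h) / frame_h
        return x1, y1, x2, y2

    # Example: a 100x50 box with its top-left corner at (320, 160) on a 640x640 input.
    normalize_corners(320, 160, 100, 50, 640, 640)  # -> (0.5, 0.25, 0.65625, 0.328125)

The practical difference is only the divisor: the old code normalized by the preprocessor's assigned rect size, while the new code normalizes by detector_config.model.width and height.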
@@ -41,63 +26,52 @@ class SynapDetector(DetectionApi):
         try:
             synap_network = Network(detector_config.model.path)
-            logger.info("[INIT] NPU found")
+            logger.info(f"Synap NPU loaded model: {detector_config.model.path}")
         except Exception as e:
-            logger.error(f"[INIT] Failed to init NPU: {e}")
+            logger.error(f"Failed to init Synap NPU: {e}")
             raise
 
+        self.width = detector_config.model.width
+        self.height = detector_config.model.height
+        self.model_type = detector_config.model.model_type
         self.network = synap_network
         self.network_input_details = self.network.inputs[0]
-        self.model_shape = self.network_input_details.shape
-        self.model_dtype = self.network_input_details.data_type
 
-    def detect_raw(self, tensor_input):
-        detections = np.zeros((20, 6), np.float32)
-        batch_size, height, width, channels = tensor_input.shape
-        target_height, target_width = self.model_shape[1], self.model_shape[2]
-        resized_tensors = []
-        for i in range(batch_size):
-            image = Image.fromarray(tensor_input[i])
-            resized_image = image.resize((target_width, target_height))
-            resized_tensor = np.array(resized_image)
-            resized_tensors.append(resized_tensor)
-        tensor_input = np.stack(resized_tensors)
-        if tuple(tensor_input.shape) != tuple(self.model_shape):
-            raise ValueError(f"diff shape {tensor_input.shape} {self.model_shape}")
-        preprocessor = Preprocessor()
-        detector = Detector()
-        assigned_rect = preprocessor.assign(self.network.inputs, tensor_input, Shape(tensor_input.shape), Layout.nhwc)
-        outputs = self.network.predict()
-        result = detector.process(outputs, assigned_rect)
+        # Create Inference Engine
+        self.preprocessor = Preprocessor()
+        self.detector = Detector(score_threshold=0.4, iou_threshold=0.4)
 
-        detections = np.zeros((20, 6), np.float32)
-        for i, item in enumerate(result.items):
-            if i == 20:
-                break
-            if float(item.confidence) < 0.4:
-                continue
-            bb = item.bounding_box
-            raw_x, raw_y, raw_w, raw_h = normalize_bbox(
-                bb.origin.x,
-                bb.origin.y,
-                bb.size.x,
-                bb.size.y,
-                assigned_rect.size.x,
-                assigned_rect.size.y
-            )
-            detections[i] = [
-                item.class_index,
-                float(item.confidence),
-                raw_y,
-                raw_x,
-                raw_h,
-                raw_w,
-            ]
-        return detections
+    def detect_raw(self, tensor_input: np.ndarray):
+        postprocess_data = self.preprocessor.assign(self.network.inputs, tensor_input, Shape(tensor_input.shape), Layout.nhwc)
+        output_tensor_obj = self.network.predict()
+        output = self.detector.process(output_tensor_obj, postprocess_data)
+
+        if self.model_type == ModelTypeEnum.yologeneric:
+            detections = np.zeros((20, 6), np.float32)
+
+            for i, item in enumerate(output.items):
+                if i == 20:
+                    break
+
+                bb = item.bounding_box
+
+                # Convert corner coordinates to normalized [0,1] range
+                x1 = bb.origin.x / self.width  # Top-left X
+                y1 = bb.origin.y / self.height  # Top-left Y
+                x2 = (bb.origin.x + bb.size.x) / self.width  # Bottom-right X
+                y2 = (bb.origin.y + bb.size.y) / self.height  # Bottom-right Y
+
+                detections[i] = [
+                    item.class_index,
+                    float(item.confidence),
+                    y1,
+                    x1,
+                    y2,
+                    x2,
+                ]
+            return detections
+        else:
+            print(f"Unsupported model type: {self.model_type}")
+            return np.zeros((20, 6), np.float32)
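
For orientation, the refactored flow keeps the three-step Synap pipeline shown in the diff (preprocessor.assign, network.predict, detector.process) and returns Frigate's fixed-size (20, 6) float32 array with rows ordered as [class_id, confidence, y_min, x_min, y_max, x_max]. A minimal usage sketch; the already constructed SynapDetector instance named detector and the 320x320 NHWC input are assumptions for illustration:

    import numpy as np

    frame = np.zeros((1, 320, 320, 3), dtype=np.uint8)  # assumed NHWC input matching the model
    results = detector.detect_raw(frame)                # float32 array of shape (20, 6)

    for class_id, confidence, y_min, x_min, y_max, x_max in results:
        if confidence == 0.0:
            continue  # unfilled rows stay zero
        print(int(class_id), float(confidence), (x_min, y_min, x_max, y_max))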