import glob
import logging
import os
import shutil
import time
import urllib.request
import zipfile
from queue import Queue

import cv2
import numpy as np
from pydantic import BaseModel, Field
from typing_extensions import Literal

from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import (
    BaseDetectorConfig,
    ModelTypeEnum,
)
from frigate.util.file import FileLock

logger = logging.getLogger(__name__)

DETECTOR_KEY = "memryx"


# Configuration class for model settings
class ModelConfig(BaseModel):
    path: str = Field(default=None, title="Model Path")  # Path to a .zip archive containing the DFP model
    labelmap_path: str = Field(default=None, title="Path to Label Map")


class MemryXDetectorConfig(BaseDetectorConfig):
    type: Literal[DETECTOR_KEY]
    device: str = Field(default="PCIe", title="Device Path")
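
# Example Frigate detector configuration for this plugin (illustrative only; the
# device string is parsed as "PCIe:<index>" in MemryXDetector.__init__ below):
#
#   detectors:
#     memx0:
#       type: memryx
#       device: PCIe:0
#
#   model:
#     model_type: yolonas
#     width: 320
#     height: 320
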
class MemryXDetector(DetectionApi):
    type_key = DETECTOR_KEY  # Set the type key
    supported_models = [
        ModelTypeEnum.ssd,
        ModelTypeEnum.yolonas,
        ModelTypeEnum.yologeneric,  # Treated as yolov9 in MemryX implementation
        ModelTypeEnum.yolox,
    ]

    def __init__(self, detector_config):
        """Initialize MemryX detector with the provided configuration."""
        try:
            # Import MemryX SDK
            from memryx import AsyncAccl
        except ModuleNotFoundError:
            raise ImportError(
                "MemryX SDK is not installed. Install it and set up MIX environment."
            )

        model_cfg = getattr(detector_config, "model", None)

        # Check if model_type was explicitly set by the user
        if "model_type" in getattr(model_cfg, "__fields_set__", set()):
            detector_config.model.model_type = model_cfg.model_type
        else:
            logger.info(
                "model_type not set in config — defaulting to yolonas for MemryX."
            )
            detector_config.model.model_type = ModelTypeEnum.yolonas

        self.capture_queue = Queue(maxsize=10)
        self.output_queue = Queue(maxsize=10)
        self.capture_id_queue = Queue(maxsize=10)
        self.logger = logger

        self.memx_model_path = detector_config.model.path  # Path to .dfp file
        self.memx_post_model = None  # Path to .post file
        self.expected_post_model = None

        self.memx_device_path = detector_config.device  # Device path
        # Parse the device string to split PCIe:<index>
        device_str = self.memx_device_path
        self.device_id = []
        self.device_id.append(int(device_str.split(":")[1]))

        self.memx_model_height = detector_config.model.height
        self.memx_model_width = detector_config.model.width
        self.memx_model_type = detector_config.model.model_type

        self.cache_dir = "/memryx_models"

        if self.memx_model_type == ModelTypeEnum.yologeneric:
            model_mapping = {
                (640, 640): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_640.zip",
                    "yolov9_640",
                ),
                (320, 320): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_320.zip",
                    "yolov9_320",
                ),
            }
            self.model_url, self.model_folder = model_mapping.get(
                (self.memx_model_height, self.memx_model_width),
                (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolov9_320.zip",
                    "yolov9_320",
                ),
            )
            self.expected_dfp_model = "YOLO_v9_small_onnx.dfp"

        elif self.memx_model_type == ModelTypeEnum.yolonas:
            model_mapping = {
                (640, 640): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_640.zip",
                    "yolonas_640",
                ),
                (320, 320): (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_320.zip",
                    "yolonas_320",
                ),
            }
            self.model_url, self.model_folder = model_mapping.get(
                (self.memx_model_height, self.memx_model_width),
                (
                    "https://developer.memryx.com/example_files/2p0_frigate/yolonas_320.zip",
                    "yolonas_320",
                ),
            )
            self.expected_dfp_model = "yolo_nas_s.dfp"
            self.expected_post_model = "yolo_nas_s_post.onnx"

        elif self.memx_model_type == ModelTypeEnum.yolox:
            self.model_folder = "yolox"
            self.model_url = (
                "https://developer.memryx.com/example_files/2p0_frigate/yolox.zip"
            )
            self.expected_dfp_model = "YOLOX_640_640_3_onnx.dfp"
            self.set_strides_grids()

        elif self.memx_model_type == ModelTypeEnum.ssd:
            self.model_folder = "ssd"
            self.model_url = (
                "https://developer.memryx.com/example_files/2p0_frigate/ssd.zip"
            )
            self.expected_dfp_model = "SSDlite_MobileNet_v2_320_320_3_onnx.dfp"
            self.expected_post_model = "SSDlite_MobileNet_v2_320_320_3_onnx_post.onnx"

        self.check_and_prepare_model()
        logger.info(
            f"Initializing MemryX with model: {self.memx_model_path} on device {self.memx_device_path}"
        )

        try:
            # Load MemryX Model
            logger.info(f"dfp path: {self.memx_model_path}")

            # Initialization code
            # Load MemryX Model with a device target
            self.accl = AsyncAccl(
                self.memx_model_path,
                device_ids=self.device_id,  # AsyncAccl device ids
                local_mode=True,
            )

            # Models that use cropped post-processing sections (YOLO-NAS and SSD)
            # --> These will be moved to pure numpy in the future to improve performance on low-end CPUs
            if self.memx_post_model:
                self.accl.set_postprocessing_model(self.memx_post_model, model_idx=0)

            self.accl.connect_input(self.process_input)
            self.accl.connect_output(self.process_output)

            logger.info(
                f"Loaded MemryX model from {self.memx_model_path} and {self.memx_post_model}"
            )

        except Exception as e:
            logger.error(f"Failed to initialize MemryX model: {e}")
            raise

    def check_and_prepare_model(self):
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir, exist_ok=True)

        lock_path = os.path.join(self.cache_dir, f".{self.model_folder}.lock")
        lock = FileLock(lock_path, timeout=60)

        with lock:
            # ---------- CASE 1: user provided a custom model path ----------
            if self.memx_model_path:
                if not self.memx_model_path.endswith(".zip"):
                    raise ValueError(
                        f"Invalid model path: {self.memx_model_path}. "
                        "Only .zip files are supported. Please provide a .zip model archive."
                    )
                if not os.path.exists(self.memx_model_path):
                    raise FileNotFoundError(
                        f"Custom model zip not found: {self.memx_model_path}"
                    )

                logger.info(f"User provided zip model: {self.memx_model_path}")

                # Extract custom zip into a separate area so it never clashes with MemryX cache
                custom_dir = os.path.join(
                    self.cache_dir, "custom_models", self.model_folder
                )
                if os.path.isdir(custom_dir):
                    shutil.rmtree(custom_dir)
                os.makedirs(custom_dir, exist_ok=True)

                with zipfile.ZipFile(self.memx_model_path, "r") as zip_ref:
                    zip_ref.extractall(custom_dir)
                logger.info(f"Custom model extracted to {custom_dir}.")

                # Find .dfp and optional *_post.onnx recursively
                dfp_candidates = glob.glob(
                    os.path.join(custom_dir, "**", "*.dfp"), recursive=True
                )
                post_candidates = glob.glob(
                    os.path.join(custom_dir, "**", "*_post.onnx"), recursive=True
                )

                if not dfp_candidates:
                    raise FileNotFoundError(
                        "No .dfp file found in custom model zip after extraction."
                    )

                self.memx_model_path = dfp_candidates[0]

                # Handle post model requirements by model type
                if self.memx_model_type in [
                    ModelTypeEnum.yolonas,
                    ModelTypeEnum.ssd,
                ]:
                    if not post_candidates:
                        raise FileNotFoundError(
                            f"No *_post.onnx file found in custom model zip for {self.memx_model_type.name}."
                        )
                    self.memx_post_model = post_candidates[0]
                elif self.memx_model_type in [
                    ModelTypeEnum.yolox,
                    ModelTypeEnum.yologeneric,
                ]:
                    # Explicitly ignore any post model even if present
                    self.memx_post_model = None
                else:
                    # Future model types can optionally use post if present
                    self.memx_post_model = (
                        post_candidates[0] if post_candidates else None
                    )

                logger.info(f"Using custom model: {self.memx_model_path}")
                return

            # ---------- CASE 2: no custom model path -> use MemryX cached models ----------
            model_subdir = os.path.join(self.cache_dir, self.model_folder)
            dfp_path = os.path.join(model_subdir, self.expected_dfp_model)
            post_path = (
                os.path.join(model_subdir, self.expected_post_model)
                if self.expected_post_model
                else None
            )

            dfp_exists = os.path.exists(dfp_path)
            post_exists = os.path.exists(post_path) if post_path else True

            if dfp_exists and post_exists:
                logger.info("Using cached models.")
                self.memx_model_path = dfp_path
                self.memx_post_model = post_path
                return

            # ---------- CASE 3: download MemryX model (no cache) ----------
            logger.info(
                f"Model files not found locally. Downloading from {self.model_url}..."
            )
            zip_path = os.path.join(self.cache_dir, f"{self.model_folder}.zip")

            try:
                if not os.path.exists(zip_path):
                    urllib.request.urlretrieve(self.model_url, zip_path)
                    logger.info(f"Model ZIP downloaded to {zip_path}. Extracting...")

                if not os.path.exists(model_subdir):
                    with zipfile.ZipFile(zip_path, "r") as zip_ref:
                        zip_ref.extractall(self.cache_dir)
                    logger.info(f"Model extracted to {self.cache_dir}.")

                # Re-assign model paths after extraction
                self.memx_model_path = os.path.join(
                    model_subdir, self.expected_dfp_model
                )
                self.memx_post_model = (
                    os.path.join(model_subdir, self.expected_post_model)
                    if self.expected_post_model
                    else None
                )

            finally:
                if os.path.exists(zip_path):
                    try:
                        os.remove(zip_path)
                        logger.info("Cleaned up ZIP file after extraction.")
                    except Exception as e:
                        logger.warning(
                            f"Failed to remove downloaded zip {zip_path}: {e}"
                        )

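    # Asynchronous dataflow (as wired up in __init__):
    #   Frigate calls send_input() -> frame goes on capture_queue (id on capture_id_queue)
    #   AsyncAccl pulls frames via the process_input() callback and runs them on the MX3
    #   AsyncAccl hands raw outputs to process_output(), which post-processes into output_queue
    #   Frigate calls receive_output() to pair the connection id with its detections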
    def send_input(self, connection_id, tensor_input: np.ndarray):
        """Pre-process (if needed) and send frame to MemryX input queue"""
        if tensor_input is None:
            raise ValueError("[send_input] No image data provided for inference")

        if self.memx_model_type == ModelTypeEnum.yolonas:
            if tensor_input.ndim == 4 and tensor_input.shape[1:] == (320, 320, 3):
                logger.debug("Transposing tensor from NHWC to NCHW for YOLO-NAS")
                tensor_input = np.transpose(
                    tensor_input, (0, 3, 1, 2)
                )  # (1, H, W, C) → (1, C, H, W)
            tensor_input = tensor_input.astype(np.float32)
            tensor_input /= 255

        if self.memx_model_type == ModelTypeEnum.yolox:
            # Remove batch dim → (3, 640, 640)
            tensor_input = tensor_input.squeeze(0)

            # Convert CHW to HWC for OpenCV
            tensor_input = np.transpose(tensor_input, (1, 2, 0))  # (640, 640, 3)

            padded_img = np.ones((640, 640, 3), dtype=np.uint8) * 114

            scale = min(
                640 / float(tensor_input.shape[0]), 640 / float(tensor_input.shape[1])
            )
            sx, sy = (
                int(tensor_input.shape[1] * scale),
                int(tensor_input.shape[0] * scale),
            )

            resized_img = cv2.resize(
                tensor_input, (sx, sy), interpolation=cv2.INTER_LINEAR
            )
            padded_img[:sy, :sx] = resized_img.astype(np.uint8)

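            # The 2x2 strided slicing below reproduces the YOLOX "Focus"
            # (space-to-depth) stem in NumPy: the 640x640x3 padded image becomes
            # a 320x320x12 tensor by stacking the four pixel phases as channels.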
            # Step 4: Slice the padded image into 4 quadrants and concatenate them into 12 channels
            x0 = padded_img[0::2, 0::2, :]  # Top-left
            x1 = padded_img[1::2, 0::2, :]  # Bottom-left
            x2 = padded_img[0::2, 1::2, :]  # Top-right
            x3 = padded_img[1::2, 1::2, :]  # Bottom-right

            # Step 5: Concatenate along the channel dimension (axis 2)
            concatenated_img = np.concatenate([x0, x1, x2, x3], axis=2)
            tensor_input = concatenated_img.astype(np.float32)
            # Convert to CHW format (12, 320, 320)
            tensor_input = np.transpose(tensor_input, (2, 0, 1))

            # Add batch dimension → (1, 12, 320, 320)
            tensor_input = np.expand_dims(tensor_input, axis=0)

        # Send frame to MemryX for processing
        self.capture_queue.put(tensor_input)
        self.capture_id_queue.put(connection_id)

    def process_input(self):
        """Input callback function: wait for frames in the input queue, preprocess, and send to MX3 (return)"""
        while True:
            try:
                # Wait for a frame from the queue (blocking call)
                frame = self.capture_queue.get(
                    block=True
                )  # Blocks until data is available

                return frame

            except Exception as e:
                logger.info(f"[process_input] Error processing input: {e}")
                time.sleep(0.1)  # Prevent busy waiting in case of error

    def receive_output(self):
        """Retrieve the connection id and its corresponding detections from the MemryX output queues"""
        connection_id = (
            self.capture_id_queue.get()
        )  # Get the corresponding connection ID
        detections = self.output_queue.get()  # Get detections from MemryX

        return connection_id, detections

    def post_process_yolonas(self, output):
        predictions = output[0]

        detections = np.zeros((20, 6), np.float32)

        for i, prediction in enumerate(predictions):
            if i == 20:
                break

            (_, x_min, y_min, x_max, y_max, confidence, class_id) = prediction

            if class_id < 0:
                break

            detections[i] = [
                class_id,
                confidence,
                y_min / self.memx_model_height,
                x_min / self.memx_model_width,
                y_max / self.memx_model_height,
                x_max / self.memx_model_width,
            ]

        # Return the list of final detections
        self.output_queue.put(detections)

    def process_yolo(self, class_id, conf, pos):
        """
        Takes in class ID, confidence score, and array of [x, y, w, h] that describes detection position,
        returns an array that's easily passable back to Frigate.
        """
        return [
            class_id,  # class ID
            conf,  # confidence score
            (pos[1] - (pos[3] / 2)) / self.memx_model_height,  # y_min
            (pos[0] - (pos[2] / 2)) / self.memx_model_width,  # x_min
            (pos[1] + (pos[3] / 2)) / self.memx_model_height,  # y_max
            (pos[0] + (pos[2] / 2)) / self.memx_model_width,  # x_max
        ]

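    # set_strides_grids precomputes the YOLOX decode tables: for each stride in
    # (8, 16, 32) it stores the per-cell (x, y) grid offsets and the matching
    # stride values, which post_process_yolox uses to map raw head outputs back
    # to pixel coordinates (xy = (pred_xy + grid) * stride, wh = exp(pred_wh) * stride).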
    def set_strides_grids(self):
        grids = []
        expanded_strides = []

        strides = [8, 16, 32]

        hsize_list = [self.memx_model_height // stride for stride in strides]
        wsize_list = [self.memx_model_width // stride for stride in strides]

        for hsize, wsize, stride in zip(hsize_list, wsize_list, strides):
            xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
            grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
            grids.append(grid)
            shape = grid.shape[:2]
            expanded_strides.append(np.full((*shape, 1), stride))
        self.grids = np.concatenate(grids, 1)
        self.expanded_strides = np.concatenate(expanded_strides, 1)

    def sigmoid(self, x: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-x))

    def onnx_concat(self, inputs: list, axis: int) -> np.ndarray:
        # Ensure all inputs are numpy arrays
        if not all(isinstance(x, np.ndarray) for x in inputs):
            raise TypeError("All inputs must be numpy arrays.")

        # Ensure shapes match on non-concat axes
        ref_shape = list(inputs[0].shape)
        for i, tensor in enumerate(inputs[1:], start=1):
            for ax in range(len(ref_shape)):
                if ax == axis:
                    continue
                if tensor.shape[ax] != ref_shape[ax]:
                    raise ValueError(
                        f"Shape mismatch at axis {ax} between input[0] and input[{i}]"
                    )

        return np.concatenate(inputs, axis=axis)

    def onnx_reshape(self, data: np.ndarray, shape: np.ndarray) -> np.ndarray:
        # Ensure shape is a 1D array of integers
        target_shape = shape.astype(int).tolist()

        # Use NumPy reshape with dynamic handling of -1
        reshaped = np.reshape(data, target_shape)

        return reshaped

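    # post_process_yolox decodes the raw YOLOX head outputs in NumPy. The numeric
    # suffixes (785, 794, 795, ...) appear to be the names of the corresponding
    # ONNX tensors: each of the three heads contributes a box branch, an
    # objectness branch, and a class branch, which are sigmoid-activated,
    # concatenated, reshaped, and then decoded with the precomputed grids/strides.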
    def post_process_yolox(self, output):
        output_785 = output[0]  # 785
        output_794 = output[1]  # 794
        output_795 = output[2]  # 795
        output_811 = output[3]  # 811
        output_820 = output[4]  # 820
        output_821 = output[5]  # 821
        output_837 = output[6]  # 837
        output_846 = output[7]  # 846
        output_847 = output[8]  # 847

        output_795 = self.sigmoid(output_795)
        output_785 = self.sigmoid(output_785)
        output_821 = self.sigmoid(output_821)
        output_811 = self.sigmoid(output_811)
        output_847 = self.sigmoid(output_847)
        output_837 = self.sigmoid(output_837)

        concat_1 = self.onnx_concat([output_794, output_795, output_785], axis=1)
        concat_2 = self.onnx_concat([output_820, output_821, output_811], axis=1)
        concat_3 = self.onnx_concat([output_846, output_847, output_837], axis=1)

        shape = np.array([1, 85, -1], dtype=np.int64)

        reshape_1 = self.onnx_reshape(concat_1, shape)
        reshape_2 = self.onnx_reshape(concat_2, shape)
        reshape_3 = self.onnx_reshape(concat_3, shape)

        concat_out = self.onnx_concat([reshape_1, reshape_2, reshape_3], axis=2)

        output = concat_out.transpose(0, 2, 1)  # (1, 8400, 85) for 640x640 input

        self.num_classes = output.shape[2] - 5

        # [x, y, w, h, box_score, class_no_1, ..., class_no_80]
        results = output

        results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
        results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides
        image_pred = results[0, ...]

        class_conf = np.max(
            image_pred[:, 5 : 5 + self.num_classes], axis=1, keepdims=True
        )
        class_pred = np.argmax(image_pred[:, 5 : 5 + self.num_classes], axis=1)
        class_pred = np.expand_dims(class_pred, axis=1)

        conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()
        # Detections ordered as (cx, cy, w, h, obj_conf, class_conf, class_pred)
        detections = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
        detections = detections[conf_mask]

        # Sort by class confidence (index 5) and keep top 20 detections
        ordered = detections[detections[:, 5].argsort()[::-1]][:20]

        # Prepare a final detections array of shape (20, 6)
        final_detections = np.zeros((20, 6), np.float32)
        for i, object_detected in enumerate(ordered):
            final_detections[i] = self.process_yolo(
                object_detected[6], object_detected[5], object_detected[:4]
            )

        self.output_queue.put(final_detections)

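    # The SSDlite post-processing model emits already-decoded boxes: outputs[0] is
    # (1, num_dets, 5) with corner coordinates plus a confidence score, and
    # outputs[1] holds the matching class labels. Detections are thresholded at
    # 0.45, run through OpenCV NMS, then normalized into Frigate's
    # [class_id, confidence, y_min, x_min, y_max, x_max] layout.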
    def post_process_ssdlite(self, outputs):
        dets = outputs[0].squeeze(0)  # (1, num_dets, 5) -> (num_dets, 5)
        labels = outputs[1].squeeze(0)

        detections = []

        for i in range(dets.shape[0]):
            x_min, y_min, x_max, y_max, confidence = dets[i]
            class_id = int(labels[i])  # Convert label to integer

            if confidence < 0.45:
                continue  # Skip detections below threshold

            # Convert coordinates to integers
            x_min, y_min, x_max, y_max = map(int, [x_min, y_min, x_max, y_max])

            # Append valid detections [class_id, confidence, x_min, y_min, x_max, y_max]
            detections.append([class_id, confidence, x_min, y_min, x_max, y_max])

        final_detections = np.zeros((20, 6), np.float32)

        if len(detections) == 0:
            # logger.info("No detections found.")
            self.output_queue.put(final_detections)
            return

        # Convert to NumPy array
        detections = np.array(detections, dtype=np.float32)

        # Apply Non-Maximum Suppression (NMS)
        bboxes = detections[:, 2:6].tolist()  # (x_min, y_min, x_max, y_max)
        scores = detections[:, 1].tolist()  # Confidence scores

        indices = cv2.dnn.NMSBoxes(bboxes, scores, 0.45, 0.5)

        if len(indices) > 0:
            indices = indices.flatten()[:20]  # Keep only the top 20 detections
            selected_detections = detections[indices]

            # Normalize coordinates AFTER NMS
            for i, det in enumerate(selected_detections):
                class_id, confidence, x_min, y_min, x_max, y_max = det

                # Normalize coordinates
                x_min /= self.memx_model_width
                y_min /= self.memx_model_height
                x_max /= self.memx_model_width
                y_max /= self.memx_model_height

                final_detections[i] = [class_id, confidence, y_min, x_min, y_max, x_max]

        self.output_queue.put(final_detections)

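    # _generate_anchors builds the flattened list of cell-center coordinates
    # (col + 0.5, row + 0.5) for the three YOLOv9 feature maps, and
    # _generate_scales pairs each cell with its stride (8, 16, or 32) so decoded
    # distances can be converted back to input-image pixels in dist2bbox().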
    def _generate_anchors(self, sizes=[80, 40, 20]):
        """Generate anchor points for YOLOv9 style processing"""
        yscales = []
        xscales = []
        for s in sizes:
            r = np.arange(s) + 0.5
            yscales.append(np.repeat(r, s))
            xscales.append(np.repeat(r[None, ...], s, axis=0).flatten())

        yscales = np.concatenate(yscales)
        xscales = np.concatenate(xscales)
        anchors = np.stack([xscales, yscales], axis=1)
        return anchors

    def _generate_scales(self, sizes=[80, 40, 20]):
        """Generate scaling factors for each detection level"""
        factors = [8, 16, 32]
        s = np.concatenate([np.ones([int(s * s)]) * f for s, f in zip(sizes, factors)])
        return s[:, None]

    @staticmethod
    def _softmax(x: np.ndarray, axis: int) -> np.ndarray:
        """Efficient softmax implementation"""
        x = x - np.max(x, axis=axis, keepdims=True)
        np.exp(x, out=x)
        x /= np.sum(x, axis=axis, keepdims=True)
        return x

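    # Distribution Focal Loss (DFL) decoding: each box side is predicted as a
    # 16-bin discrete distribution rather than a single scalar. Taking the
    # softmax over the bins and then the expected value (sum of bin index *
    # probability) yields the predicted distance, in grid-cell units, for each
    # of the 4 sides.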
    def dfl(self, x: np.ndarray) -> np.ndarray:
        """Distribution Focal Loss decoding - YOLOv9 style"""
        x = x.reshape(-1, 4, 16)
        weights = np.arange(16, dtype=np.float32)
        p = self._softmax(x, axis=2)
        p = p * weights[None, None, :]
        out = np.sum(p, axis=2, keepdims=False)
        return out

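    # dist2bbox turns the decoded (left, top, right, bottom) distances into boxes:
    # subtracting/adding them from each anchor's cell center gives the two
    # corners, which are converted to (center_x, center_y, w, h) and scaled from
    # grid units to input-image pixels by the per-cell stride.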
    def dist2bbox(
        self, x: np.ndarray, anchors: np.ndarray, scales: np.ndarray
    ) -> np.ndarray:
        """Convert distances to bounding boxes - YOLOv9 style"""
        lt = x[:, :2]
        rb = x[:, 2:]

        x1y1 = anchors - lt
        x2y2 = anchors + rb

        wh = x2y2 - x1y1
        c_xy = (x1y1 + x2y2) / 2

        out = np.concatenate([c_xy, wh], axis=1)
        out = out * scales
        return out

    def post_process_yolo_optimized(self, outputs):
        """
        Custom YOLOv9 post-processing optimized for MemryX ONNX outputs.
        Implements DFL decoding, confidence filtering, and NMS in pure NumPy.
        """
        # YOLOv9 outputs: 6 outputs (lbox, lcls, mbox, mcls, sbox, scls)
        conv_out1, conv_out2, conv_out3, conv_out4, conv_out5, conv_out6 = outputs

        # Determine grid sizes based on input resolution
        # YOLOv9 uses 3 detection heads with strides [8, 16, 32]
        # Grid sizes = input_size / stride
        sizes = [
            self.memx_model_height
            // 8,  # stride-8 head (80x80 grid at 640x640, 40x40 at 320x320)
            self.memx_model_height
            // 16,  # stride-16 head (40x40 grid at 640x640, 20x20 at 320x320)
            self.memx_model_height
            // 32,  # stride-32 head (20x20 grid at 640x640, 10x10 at 320x320)
        ]

        # Generate anchors and scales if not already done
        if not hasattr(self, "anchors"):
            self.anchors = self._generate_anchors(sizes)
            self.scales = self._generate_scales(sizes)

        # Process outputs in YOLOv9 format: reshape and moveaxis for ONNX format
        lbox = np.moveaxis(conv_out1, 1, -1)  # Large boxes
        lcls = np.moveaxis(conv_out2, 1, -1)  # Large classes
        mbox = np.moveaxis(conv_out3, 1, -1)  # Medium boxes
        mcls = np.moveaxis(conv_out4, 1, -1)  # Medium classes
        sbox = np.moveaxis(conv_out5, 1, -1)  # Small boxes
        scls = np.moveaxis(conv_out6, 1, -1)  # Small classes

        # Determine number of classes dynamically from the class output shape
        # lcls shape should be (batch, height, width, num_classes)
        num_classes = lcls.shape[-1]

        # Validate that all class outputs have the same number of classes
        if not (mcls.shape[-1] == num_classes and scls.shape[-1] == num_classes):
            raise ValueError(
                f"Class output shapes mismatch: lcls={lcls.shape}, mcls={mcls.shape}, scls={scls.shape}"
            )

        # Concatenate boxes and classes
        boxes = np.concatenate(
            [
                lbox.reshape(-1, 64),  # 64 is for 4 bbox coords * 16 DFL bins
                mbox.reshape(-1, 64),
                sbox.reshape(-1, 64),
            ],
            axis=0,
        )

        classes = np.concatenate(
            [
                lcls.reshape(-1, num_classes),
                mcls.reshape(-1, num_classes),
                scls.reshape(-1, num_classes),
            ],
            axis=0,
        )

        # Apply sigmoid to classes
        classes = self.sigmoid(classes)

        # Apply DFL to box predictions
        boxes = self.dfl(boxes)

        # YOLOv9 postprocessing with confidence filtering and NMS
        confidence_thres = 0.4
        iou_thres = 0.6

        # Find the class with the highest score for each detection
        max_scores = np.max(classes, axis=1)  # Maximum class score for each detection
        class_ids = np.argmax(classes, axis=1)  # Index of the best class

        # Filter out detections with scores below the confidence threshold
        valid_indices = np.where(max_scores >= confidence_thres)[0]
        if len(valid_indices) == 0:
            # Return empty detections array
            final_detections = np.zeros((20, 6), np.float32)
            return final_detections

        # Select only valid detections
        valid_boxes = boxes[valid_indices]
        valid_class_ids = class_ids[valid_indices]
        valid_scores = max_scores[valid_indices]

        # Convert distances to actual bounding boxes using anchors and scales
        valid_boxes = self.dist2bbox(
            valid_boxes, self.anchors[valid_indices], self.scales[valid_indices]
        )

        # Convert bounding box coordinates from (x_center, y_center, w, h) to (x_min, y_min, x_max, y_max)
        x_center, y_center, width, height = (
            valid_boxes[:, 0],
            valid_boxes[:, 1],
            valid_boxes[:, 2],
            valid_boxes[:, 3],
        )
        x_min = x_center - width / 2
        y_min = y_center - height / 2
        x_max = x_center + width / 2
        y_max = y_center + height / 2

        # Convert to format expected by cv2.dnn.NMSBoxes: [x, y, width, height]
        boxes_for_nms = []
        scores_for_nms = []

        for i in range(len(valid_indices)):
            # Ensure coordinates are within bounds and positive
            x_min_clipped = max(0, x_min[i])
            y_min_clipped = max(0, y_min[i])
            x_max_clipped = min(self.memx_model_width, x_max[i])
            y_max_clipped = min(self.memx_model_height, y_max[i])

            width_clipped = x_max_clipped - x_min_clipped
            height_clipped = y_max_clipped - y_min_clipped

            if width_clipped > 0 and height_clipped > 0:
                boxes_for_nms.append(
                    [x_min_clipped, y_min_clipped, width_clipped, height_clipped]
                )
                scores_for_nms.append(float(valid_scores[i]))

        final_detections = np.zeros((20, 6), np.float32)

        if len(boxes_for_nms) == 0:
            return final_detections

        # Apply NMS using OpenCV
        indices = cv2.dnn.NMSBoxes(
            boxes_for_nms, scores_for_nms, confidence_thres, iou_thres
        )

        if len(indices) > 0:
            # Flatten indices if they are returned as a list of arrays
            if isinstance(indices[0], list) or isinstance(indices[0], np.ndarray):
                indices = [i[0] for i in indices]

            # Limit to top 20 detections
            indices = indices[:20]

            # Convert to Frigate format: [class_id, confidence, y_min, x_min, y_max, x_max] (normalized)
            for i, idx in enumerate(indices):
                class_id = valid_class_ids[idx]
                confidence = valid_scores[idx]

                # Get the box coordinates
                box = boxes_for_nms[idx]
                x_min_norm = box[0] / self.memx_model_width
                y_min_norm = box[1] / self.memx_model_height
                x_max_norm = (box[0] + box[2]) / self.memx_model_width
                y_max_norm = (box[1] + box[3]) / self.memx_model_height

                final_detections[i] = [
                    class_id,
                    confidence,
                    y_min_norm,  # Frigate expects y_min first
                    x_min_norm,
                    y_max_norm,
                    x_max_norm,
                ]

        return final_detections

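    # Note on queue handling: for yologeneric the detections are placed on
    # output_queue here, while the yolonas/yolox/ssd post-processing functions
    # put their results on output_queue themselves before returning.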
    def process_output(self, *outputs):
        """Output callback function -- receives frames from the MX3 and triggers post-processing"""
        if self.memx_model_type == ModelTypeEnum.yologeneric:
            # Use complete YOLOv9-style postprocessing (includes NMS)
            final_detections = self.post_process_yolo_optimized(outputs)

            self.output_queue.put(final_detections)

        elif self.memx_model_type == ModelTypeEnum.yolonas:
            return self.post_process_yolonas(outputs)

        elif self.memx_model_type == ModelTypeEnum.yolox:
            return self.post_process_yolox(outputs)

        elif self.memx_model_type == ModelTypeEnum.ssd:
            return self.post_process_ssdlite(outputs)

        else:
            raise Exception(
                f"{self.memx_model_type} is currently not supported for memryx. See the docs for more info on supported models."
            )

    def detect_raw(self, tensor_input: np.ndarray):
        """Synchronous detection is unused; this detector runs through the async
        send_input()/receive_output() path."""
        return 0