mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-02-03 17:55:21 +03:00
Initial commit to enable Yolox models with OpenVINO in Frigate
This commit is contained in:
parent
116edce3dc
commit
2d8ca064fe
@ -22,6 +22,9 @@ class InputTensorEnum(str, Enum):
|
|||||||
nchw = "nchw"
|
nchw = "nchw"
|
||||||
nhwc = "nhwc"
|
nhwc = "nhwc"
|
||||||
|
|
||||||
|
class ModelTypeEnum(str, Enum):
|
||||||
|
ssd = "ssd"
|
||||||
|
yolox = "yolox"
|
||||||
|
|
||||||
class ModelConfig(BaseModel):
|
class ModelConfig(BaseModel):
|
||||||
path: Optional[str] = Field(title="Custom Object detection model path.")
|
path: Optional[str] = Field(title="Custom Object detection model path.")
|
||||||
@ -37,6 +40,9 @@ class ModelConfig(BaseModel):
|
|||||||
input_pixel_format: PixelFormatEnum = Field(
|
input_pixel_format: PixelFormatEnum = Field(
|
||||||
default=PixelFormatEnum.rgb, title="Model Input Pixel Color Format"
|
default=PixelFormatEnum.rgb, title="Model Input Pixel Color Format"
|
||||||
)
|
)
|
||||||
|
model_type: ModelTypeEnum = Field(
|
||||||
|
default=ModelTypeEnum.ssd, title="Object Detection Model Type"
|
||||||
|
)
|
||||||
_merged_labelmap: Optional[Dict[int, str]] = PrivateAttr()
|
_merged_labelmap: Optional[Dict[int, str]] = PrivateAttr()
|
||||||
_colormap: Dict[int, Tuple[int, int, int]] = PrivateAttr()
|
_colormap: Dict[int, Tuple[int, int, int]] = PrivateAttr()
|
||||||
|
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import openvino.runtime as ov
|
|||||||
|
|
||||||
from frigate.detectors.detection_api import DetectionApi
|
from frigate.detectors.detection_api import DetectionApi
|
||||||
from frigate.detectors.detector_config import BaseDetectorConfig
|
from frigate.detectors.detector_config import BaseDetectorConfig
|
||||||
|
from frigate.config import ModelTypeEnum
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
from pydantic import Extra, Field
|
from pydantic import Extra, Field
|
||||||
|
|
||||||
@ -24,6 +25,14 @@ class OvDetector(DetectionApi):
|
|||||||
def __init__(self, detector_config: OvDetectorConfig):
|
def __init__(self, detector_config: OvDetectorConfig):
|
||||||
self.ov_core = ov.Core()
|
self.ov_core = ov.Core()
|
||||||
self.ov_model = self.ov_core.read_model(detector_config.model.path)
|
self.ov_model = self.ov_core.read_model(detector_config.model.path)
|
||||||
|
self.ov_model_type = detector_config.model.model_type
|
||||||
|
|
||||||
|
self.num_classes = 80 # TODO
|
||||||
|
self.h = detector_config.model.height # 416
|
||||||
|
self.w = detector_config.model.width # 416
|
||||||
|
logger.info(self.ov_model_type)
|
||||||
|
if(self.ov_model_type == ModelTypeEnum.yolox):
|
||||||
|
self.set_strides_grids()
|
||||||
|
|
||||||
self.interpreter = self.ov_core.compile_model(
|
self.interpreter = self.ov_core.compile_model(
|
||||||
model=self.ov_model, device_name=detector_config.device
|
model=self.ov_model, device_name=detector_config.device
|
||||||
@ -39,28 +48,86 @@ class OvDetector(DetectionApi):
|
|||||||
logger.info(f"Model has {self.output_indexes} Output Tensors")
|
logger.info(f"Model has {self.output_indexes} Output Tensors")
|
||||||
break
|
break
|
||||||
|
|
||||||
def detect_raw(self, tensor_input):
|
def set_strides_grids(self):
|
||||||
|
grids = []
|
||||||
|
expanded_strides = []
|
||||||
|
|
||||||
|
strides = [8, 16, 32]
|
||||||
|
|
||||||
|
hsizes = [self.h // stride for stride in strides]
|
||||||
|
wsizes = [self.w // stride for stride in strides]
|
||||||
|
|
||||||
|
for hsize, wsize, stride in zip(hsizes, wsizes, strides):
|
||||||
|
xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize))
|
||||||
|
grid = np.stack((xv, yv), 2).reshape(1, -1, 2)
|
||||||
|
grids.append(grid)
|
||||||
|
shape = grid.shape[:2]
|
||||||
|
expanded_strides.append(np.full((*shape, 1), stride))
|
||||||
|
|
||||||
|
self.grids = np.concatenate(grids, 1)
|
||||||
|
self.expanded_strides = np.concatenate(expanded_strides, 1)
|
||||||
|
|
||||||
|
def detect_raw(self, tensor_input):
|
||||||
infer_request = self.interpreter.create_infer_request()
|
infer_request = self.interpreter.create_infer_request()
|
||||||
infer_request.infer([tensor_input])
|
infer_request.infer([tensor_input])
|
||||||
|
|
||||||
results = infer_request.get_output_tensor()
|
if(self.ov_model_type == ModelTypeEnum.ssd):
|
||||||
|
results = infer_request.get_output_tensor()
|
||||||
|
|
||||||
detections = np.zeros((20, 6), np.float32)
|
detections = np.zeros((20, 6), np.float32)
|
||||||
i = 0
|
i = 0
|
||||||
for object_detected in results.data[0, 0, :]:
|
for object_detected in results.data[0, 0, :]:
|
||||||
if object_detected[0] != -1:
|
if object_detected[0] != -1:
|
||||||
logger.debug(object_detected)
|
logger.debug(object_detected)
|
||||||
if object_detected[2] < 0.1 or i == 20:
|
if object_detected[2] < 0.1 or i == 20:
|
||||||
break
|
break
|
||||||
detections[i] = [
|
detections[i] = [
|
||||||
object_detected[1], # Label ID
|
object_detected[1], # Label ID
|
||||||
float(object_detected[2]), # Confidence
|
float(object_detected[2]), # Confidence
|
||||||
object_detected[4], # y_min
|
object_detected[4], # y_min
|
||||||
object_detected[3], # x_min
|
object_detected[3], # x_min
|
||||||
object_detected[6], # y_max
|
object_detected[6], # y_max
|
||||||
object_detected[5], # x_max
|
object_detected[5], # x_max
|
||||||
]
|
]
|
||||||
i += 1
|
i += 1
|
||||||
|
return detections
|
||||||
|
|
||||||
return detections
|
elif(self.ov_model_type == ModelTypeEnum.yolox):
|
||||||
|
out_tensor = infer_request.get_output_tensor()
|
||||||
|
results = out_tensor.data
|
||||||
|
results[..., :2] = (results[..., :2] + self.grids) * self.expanded_strides
|
||||||
|
results[..., 2:4] = np.exp(results[..., 2:4]) * self.expanded_strides
|
||||||
|
image_pred = results[0, ...]
|
||||||
|
|
||||||
|
class_conf = np.max(image_pred[:, 5:5+self.num_classes], axis=1, keepdims=True)
|
||||||
|
class_pred = np.argmax(image_pred[: , 5:5+self.num_classes], axis=1)
|
||||||
|
class_pred = np.expand_dims(class_pred, axis=1)
|
||||||
|
|
||||||
|
conf_mask = (image_pred[:, 4] * class_conf.squeeze() >= 0.3).squeeze()
|
||||||
|
# Detections ordered as (x1, y1, x2, y2, obj_conf, class_conf, class_pred)
|
||||||
|
dets = np.concatenate((image_pred[:, :5], class_conf, class_pred), axis=1)
|
||||||
|
dets = dets[conf_mask]
|
||||||
|
|
||||||
|
ordered = dets[dets[:, 5].argsort()[::-1]][:20]
|
||||||
|
|
||||||
|
detections = np.zeros((20, 6), np.float32)
|
||||||
|
i = 0
|
||||||
|
|
||||||
|
for object_detected in ordered:
|
||||||
|
if i < 20:
|
||||||
|
# [x, y, h, w, box_score, class_no_1, ..., class_no_80],
|
||||||
|
detections[i] = [
|
||||||
|
object_detected[6], # Label ID
|
||||||
|
object_detected[5], # Confidence
|
||||||
|
(object_detected[1]-(object_detected[3]/2))/self.h, # y_min
|
||||||
|
(object_detected[0]-(object_detected[2]/2))/self.w, # x_min
|
||||||
|
(object_detected[1]+(object_detected[3]/2))/self.h, # y_max
|
||||||
|
(object_detected[0]+(object_detected[2]/2))/self.w, # x_max
|
||||||
|
]
|
||||||
|
#logger.info(object_detected)
|
||||||
|
#logger.info(detections[i])
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
return detections
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user