Use core mask for rknn

This commit is contained in:
Nicolas Mowen 2025-09-13 20:28:09 -06:00
parent 0b8ac5c6ee
commit 43412f6390
9 changed files with 329 additions and 318 deletions

View File

@ -1,80 +0,0 @@
"""Base runner implementation for ONNX models."""
from abc import ABC, abstractmethod
from typing import Any
from frigate.detectors.plugins.onnx import CudaGraphRunner
import onnxruntime as ort
from frigate.detectors.plugins.openvino import OpenVINOModelRunner
from frigate.detectors.plugins.rknn import RKNNModelRunner
from frigate.util.model import get_ort_providers
from frigate.util.rknn_converter import auto_convert_model, is_rknn_compatible
class BaseModelRunner(ABC):
"""Abstract base class for model runners."""
def __init__(self, model_path: str, device: str, **kwargs):
self.model_path = model_path
self.device = device
@abstractmethod
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
pass
@abstractmethod
def get_input_width(self) -> int:
"""Get the input width of the model."""
pass
@abstractmethod
def run(self, input: dict[str, Any]) -> Any | None:
"""Run inference with the model."""
pass
class ONNXModelRunner(BaseModelRunner):
"""Run ONNX models using ONNX Runtime."""
def __init__(self, ort: ort.InferenceSession):
self.ort = ort
def get_input_names(self) -> list[str]:
return [input.name for input in self.ort.get_inputs()]
def get_input_width(self) -> int:
"""Get the input width of the model."""
return self.ort.get_inputs()[0].shape[3]
def run(self, input: dict[str, Any]) -> Any | None:
return self.ort.run(None, input)
def get_optimized_runner(model_path: str, device: str, complex_model: bool = True, **kwargs) -> BaseModelRunner:
"""Get an optimized runner for the hardware."""
if device == "CPU":
return ONNXModelRunner(model_path, device, **kwargs)
if is_rknn_compatible(model_path):
rknn_path = auto_convert_model(model_path)
if rknn_path:
return RKNNModelRunner(rknn_path)
providers, options = get_ort_providers(device == "CPU", device, **kwargs)
if "OpenVINOExecutionProvider" in providers:
return OpenVINOModelRunner(model_path, device, **kwargs)
ort = ort.InferenceSession(
model_path,
providers=providers,
provider_options=options,
)
if not complex_model and providers[0] == "CUDAExecutionProvider":
return CudaGraphRunner(ort, options[0]["device_id"])
return ONNXModelRunner(model_path, device, **kwargs)

View File

@ -0,0 +1,320 @@
"""Base runner implementation for ONNX models."""
import logging
import os
from abc import ABC, abstractmethod
from typing import Any
import numpy as np
import onnxruntime as ort
from frigate.util.model import get_ort_providers
from frigate.util.rknn_converter import auto_convert_model, is_rknn_compatible
logger = logging.getLogger(__name__)
# Import OpenVINO only when needed to avoid circular dependencies
try:
import openvino as ov
except ImportError:
ov = None
class BaseModelRunner(ABC):
"""Abstract base class for model runners."""
def __init__(self, model_path: str, device: str, **kwargs):
self.model_path = model_path
self.device = device
@abstractmethod
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
pass
@abstractmethod
def get_input_width(self) -> int:
"""Get the input width of the model."""
pass
@abstractmethod
def run(self, input: dict[str, Any]) -> Any | None:
"""Run inference with the model."""
pass
class ONNXModelRunner(BaseModelRunner):
"""Run ONNX models using ONNX Runtime."""
def __init__(self, ort: ort.InferenceSession):
self.ort = ort
def get_input_names(self) -> list[str]:
return [input.name for input in self.ort.get_inputs()]
def get_input_width(self) -> int:
"""Get the input width of the model."""
return self.ort.get_inputs()[0].shape[3]
def run(self, input: dict[str, Any]) -> Any | None:
return self.ort.run(None, input)
class CudaGraphRunner(BaseModelRunner):
"""Encapsulates CUDA Graph capture and replay using ONNX Runtime IOBinding.
This runner assumes a single tensor input and binds all model outputs.
NOTE: CUDA Graphs limit supported model operations, so they are not usable
for more complex models like CLIP or PaddleOCR.
"""
def __init__(self, session: ort.InferenceSession, cuda_device_id: int):
self._session = session
self._cuda_device_id = cuda_device_id
self._captured = False
self._io_binding: ort.IOBinding | None = None
self._input_name: str | None = None
self._output_names: list[str] | None = None
self._input_ortvalue: ort.OrtValue | None = None
self._output_ortvalues: ort.OrtValue | None = None
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
return [input.name for input in self._session.get_inputs()]
def get_input_width(self) -> int:
"""Get the input width of the model."""
return self._session.get_inputs()[0].shape[3]
def run(self, input_name: str, tensor_input: np.ndarray):
tensor_input = np.ascontiguousarray(tensor_input)
if not self._captured:
# Prepare IOBinding with CUDA buffers and let ORT allocate outputs on device
self._io_binding = self._session.io_binding()
self._input_name = input_name
self._output_names = [o.name for o in self._session.get_outputs()]
self._input_ortvalue = ort.OrtValue.ortvalue_from_numpy(
tensor_input, "cuda", self._cuda_device_id
)
self._io_binding.bind_ortvalue_input(self._input_name, self._input_ortvalue)
for name in self._output_names:
# Bind outputs to CUDA and allow ORT to allocate appropriately
self._io_binding.bind_output(name, "cuda", self._cuda_device_id)
# First IOBinding run to allocate, execute, and capture CUDA Graph
ro = ort.RunOptions()
self._session.run_with_iobinding(self._io_binding, ro)
self._captured = True
return self._io_binding.copy_outputs_to_cpu()
# Replay using updated input, copy results to CPU
self._input_ortvalue.update_inplace(tensor_input)
ro = ort.RunOptions()
self._session.run_with_iobinding(self._io_binding, ro)
return self._io_binding.copy_outputs_to_cpu()
class OpenVINOModelRunner(BaseModelRunner):
"""OpenVINO model runner that handles inference efficiently."""
def __init__(self, model_path: str, device: str, **kwargs):
self.model_path = model_path
self.device = device
if not os.path.isfile(model_path):
raise FileNotFoundError(f"OpenVINO model file {model_path} not found.")
if ov is None:
raise ImportError(
"OpenVINO is not available. Please install openvino package."
)
self.ov_core = ov.Core()
# Apply performance optimization
self.ov_core.set_property(device, {"PERF_COUNT": "NO"})
# Compile model
self.compiled_model = self.ov_core.compile_model(
model=model_path, device_name=device
)
# Create reusable inference request
self.infer_request = self.compiled_model.create_infer_request()
input_shape = self.compiled_model.inputs[0].get_shape()
self.input_tensor = ov.Tensor(ov.Type.f32, input_shape)
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
return [input.get_any_name() for input in self.compiled_model.inputs]
def get_input_width(self) -> int:
"""Get the input width of the model."""
input_shape = self.compiled_model.inputs[0].get_shape()
# Assuming NCHW format, width is the last dimension
return int(input_shape[-1])
def run(self, input_data: np.ndarray) -> list[np.ndarray]:
"""Run inference with the model.
Args:
input_data: Input tensor data
Returns:
List of output tensors
"""
# Copy input data to pre-allocated tensor
np.copyto(self.input_tensor.data, input_data)
# Run inference
self.infer_request.infer(self.input_tensor)
# Get all output tensors
outputs = []
for i in range(len(self.compiled_model.outputs)):
outputs.append(self.infer_request.get_output_tensor(i).data)
return outputs
class RKNNModelRunner(BaseModelRunner):
"""Run RKNN models for embeddings."""
def __init__(self, model_path: str, model_type: str = None, core_mask: int = 0):
self.model_path = model_path
self.model_type = model_type
self.core_mask = core_mask
self.rknn = None
self._load_model()
def _load_model(self):
"""Load the RKNN model."""
try:
from rknnlite.api import RKNNLite
self.rknn = RKNNLite(verbose=False)
if self.rknn.load_rknn(self.model_path) != 0:
logger.error(f"Failed to load RKNN model: {self.model_path}")
raise RuntimeError("Failed to load RKNN model")
if self.rknn.init_runtime(core_mask=self.core_mask) != 0:
logger.error("Failed to initialize RKNN runtime")
raise RuntimeError("Failed to initialize RKNN runtime")
logger.info(f"Successfully loaded RKNN model: {self.model_path}")
except ImportError:
logger.error("RKNN Lite not available")
raise ImportError("RKNN Lite not available")
except Exception as e:
logger.error(f"Error loading RKNN model: {e}")
raise
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
# For detection models, we typically use "input" as the default input name
# For CLIP models, we need to determine the model type from the path
model_name = os.path.basename(self.model_path).lower()
if "vision" in model_name:
return ["pixel_values"]
elif "arcface" in model_name:
return ["data"]
else:
# Default fallback - try to infer from model type
if self.model_type and "jina-clip" in self.model_type:
if "vision" in self.model_type:
return ["pixel_values"]
# Generic fallback
return ["input"]
def get_input_width(self) -> int:
"""Get the input width of the model."""
# For CLIP vision models, this is typically 224
model_name = os.path.basename(self.model_path).lower()
if "vision" in model_name:
return 224 # CLIP V1 uses 224x224
elif "arcface" in model_name:
return 112
# For detection models, we can't easily determine this from the RKNN model
# The calling code should provide this information
return -1
def run(self, inputs: dict[str, Any]) -> Any:
"""Run inference with the RKNN model."""
if not self.rknn:
raise RuntimeError("RKNN model not loaded")
try:
input_names = self.get_input_names()
rknn_inputs = []
for name in input_names:
if name in inputs:
if name == "pixel_values":
# RKNN expects NHWC format, but ONNX typically provides NCHW
# Transpose from [batch, channels, height, width] to [batch, height, width, channels]
pixel_data = inputs[name]
if len(pixel_data.shape) == 4 and pixel_data.shape[1] == 3:
# Transpose from NCHW to NHWC
pixel_data = np.transpose(pixel_data, (0, 2, 3, 1))
rknn_inputs.append(pixel_data)
else:
rknn_inputs.append(inputs[name])
outputs = self.rknn.inference(inputs=rknn_inputs)
return outputs
except Exception as e:
logger.error(f"Error during RKNN inference: {e}")
raise
def __del__(self):
"""Cleanup when the runner is destroyed."""
if self.rknn:
try:
self.rknn.release()
except Exception:
pass
def get_optimized_runner(
model_path: str, device: str, complex_model: bool = True, **kwargs
) -> BaseModelRunner:
"""Get an optimized runner for the hardware."""
if is_rknn_compatible(model_path):
rknn_path = auto_convert_model(model_path)
if rknn_path:
return RKNNModelRunner(rknn_path)
providers, options = get_ort_providers(device == "CPU", device, **kwargs)
if device == "CPU":
return ONNXModelRunner(
ort.InferenceSession(
model_path,
providers=providers,
provider_options=options,
)
)
if "OpenVINOExecutionProvider" in providers:
return OpenVINOModelRunner(model_path, device, **kwargs)
ortSession = ort.InferenceSession(
model_path,
providers=providers,
provider_options=options,
)
if not complex_model and providers[0] == "CUDAExecutionProvider":
return CudaGraphRunner(ortSession, options[0]["device_id"])
return ONNXModelRunner(ortSession)

View File

@ -5,8 +5,8 @@ import onnxruntime as ort
from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.base_runner import BaseModelRunner
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detection_runners import CudaGraphRunner
from frigate.detectors.detector_config import (
BaseDetectorConfig,
ModelTypeEnum,
@ -24,64 +24,6 @@ logger = logging.getLogger(__name__)
DETECTOR_KEY = "onnx"
class CudaGraphRunner(BaseModelRunner):
"""Encapsulates CUDA Graph capture and replay using ONNX Runtime IOBinding.
This runner assumes a single tensor input and binds all model outputs.
NOTE: CUDA Graphs limit supported model operations, so they are not usable
for more complex models like CLIP or PaddleOCR.
"""
def __init__(self, session: ort.InferenceSession, cuda_device_id: int):
self._session = session
self._cuda_device_id = cuda_device_id
self._captured = False
self._io_binding: ort.IOBinding | None = None
self._input_name: str | None = None
self._output_names: list[str] | None = None
self._input_ortvalue: ort.OrtValue | None = None
self._output_ortvalues: ort.OrtValue | None = None
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
return [input.name for input in self._session.get_inputs()]
def get_input_width(self) -> int:
"""Get the input width of the model."""
return self._session.get_inputs()[0].shape[3]
def run(self, input_name: str, tensor_input: np.ndarray):
tensor_input = np.ascontiguousarray(tensor_input)
if not self._captured:
# Prepare IOBinding with CUDA buffers and let ORT allocate outputs on device
self._io_binding = self._session.io_binding()
self._input_name = input_name
self._output_names = [o.name for o in self._session.get_outputs()]
self._input_ortvalue = ort.OrtValue.ortvalue_from_numpy(
tensor_input, "cuda", self._cuda_device_id
)
self._io_binding.bind_ortvalue_input(self._input_name, self._input_ortvalue)
for name in self._output_names:
# Bind outputs to CUDA and allow ORT to allocate appropriately
self._io_binding.bind_output(name, "cuda", self._cuda_device_id)
# First IOBinding run to allocate, execute, and capture CUDA Graph
ro = ort.RunOptions()
self._session.run_with_iobinding(self._io_binding, ro)
self._captured = True
return self._io_binding.copy_outputs_to_cpu()
# Replay using updated input, copy results to CPU
self._input_ortvalue.update_inplace(tensor_input)
ro = ort.RunOptions()
self._session.run_with_iobinding(self._io_binding, ro)
return self._io_binding.copy_outputs_to_cpu()
class ONNXDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default="AUTO", title="Device Type")

View File

@ -1,5 +1,4 @@
import logging
import os
import numpy as np
import openvino as ov
@ -7,6 +6,7 @@ from pydantic import Field
from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detection_runners import OpenVINOModelRunner
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
from frigate.util.model import (
post_process_dfine,
@ -24,75 +24,6 @@ class OvDetectorConfig(BaseDetectorConfig):
device: str = Field(default=None, title="Device Type")
"""OpenVINO model runner implementation."""
import logging
import os
import numpy as np
import openvino as ov
logger = logging.getLogger(__name__)
class OpenVINOModelRunner:
"""OpenVINO model runner that handles inference efficiently."""
def __init__(self, model_path: str, device: str, **kwargs):
self.model_path = model_path
self.device = device
if not os.path.isfile(model_path):
raise FileNotFoundError(f"OpenVINO model file {model_path} not found.")
self.ov_core = ov.Core()
# Apply performance optimization
self.ov_core.set_property(device, {"PERF_COUNT": "NO"})
# Compile model
self.compiled_model = self.ov_core.compile_model(
model=model_path, device_name=device
)
# Create reusable inference request
self.infer_request = self.compiled_model.create_infer_request()
input_shape = self.compiled_model.inputs[0].get_shape()
self.input_tensor = ov.Tensor(ov.Type.f32, input_shape)
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
return [input.get_any_name() for input in self.compiled_model.inputs]
def get_input_width(self) -> int:
"""Get the input width of the model."""
input_shape = self.compiled_model.inputs[0].get_shape()
# Assuming NCHW format, width is the last dimension
return int(input_shape[-1])
def run(self, input_data: np.ndarray) -> list[np.ndarray]:
"""Run inference with the model.
Args:
input_data: Input tensor data
Returns:
List of output tensors
"""
# Copy input data to pre-allocated tensor
np.copyto(self.input_tensor.data, input_data)
# Run inference
self.infer_request.infer(self.input_tensor)
# Get all output tensors
outputs = []
for i in range(len(self.compiled_model.outputs)):
outputs.append(self.infer_request.get_output_tensor(i).data)
return outputs
class OvDetector(DetectionApi):
type_key = DETECTOR_KEY
supported_models = [

View File

@ -2,15 +2,15 @@ import logging
import os.path
import re
import urllib.request
from typing import Any, Literal
from typing import Literal
import cv2
import numpy as np
from pydantic import Field
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.base_runner import BaseModelRunner
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detection_runners import RKNNModelRunner
from frigate.detectors.detector_config import BaseDetectorConfig, ModelTypeEnum
from frigate.util.model import post_process_yolo
from frigate.util.rknn_converter import auto_convert_model
@ -35,108 +35,6 @@ class RknnDetectorConfig(BaseDetectorConfig):
num_cores: int = Field(default=0, ge=0, le=3, title="Number of NPU cores to use.")
class RKNNModelRunner(BaseModelRunner):
"""Run RKNN models for embeddings."""
def __init__(self, model_path: str, model_type: str = None):
self.model_path = model_path
self.model_type = model_type
self.rknn = None
self._load_model()
def _load_model(self):
"""Load the RKNN model."""
try:
from rknnlite.api import RKNNLite
self.rknn = RKNNLite(verbose=False)
if self.rknn.load_rknn(self.model_path) != 0:
logger.error(f"Failed to load RKNN model: {self.model_path}")
raise RuntimeError("Failed to load RKNN model")
if self.rknn.init_runtime() != 0:
logger.error("Failed to initialize RKNN runtime")
raise RuntimeError("Failed to initialize RKNN runtime")
logger.info(f"Successfully loaded RKNN model: {self.model_path}")
except ImportError:
logger.error("RKNN Lite not available")
raise ImportError("RKNN Lite not available")
except Exception as e:
logger.error(f"Error loading RKNN model: {e}")
raise
def get_input_names(self) -> list[str]:
"""Get input names for the model."""
# For detection models, we typically use "input" as the default input name
# For CLIP models, we need to determine the model type from the path
model_name = os.path.basename(self.model_path).lower()
if "vision" in model_name:
return ["pixel_values"]
elif "arcface" in model_name:
return ["data"]
else:
# Default fallback - try to infer from model type
if self.model_type and "jina-clip" in self.model_type:
if "vision" in self.model_type:
return ["pixel_values"]
# Generic fallback
return ["input"]
def get_input_width(self) -> int:
"""Get the input width of the model."""
# For CLIP vision models, this is typically 224
model_name = os.path.basename(self.model_path).lower()
if "vision" in model_name:
return 224 # CLIP V1 uses 224x224
elif "arcface" in model_name:
return 112
# For detection models, we can't easily determine this from the RKNN model
# The calling code should provide this information
return -1
def run(self, inputs: dict[str, Any]) -> Any:
"""Run inference with the RKNN model."""
if not self.rknn:
raise RuntimeError("RKNN model not loaded")
try:
input_names = self.get_input_names()
rknn_inputs = []
for name in input_names:
if name in inputs:
if name == "pixel_values":
# RKNN expects NHWC format, but ONNX typically provides NCHW
# Transpose from [batch, channels, height, width] to [batch, height, width, channels]
pixel_data = inputs[name]
if len(pixel_data.shape) == 4 and pixel_data.shape[1] == 3:
# Transpose from NCHW to NHWC
pixel_data = np.transpose(pixel_data, (0, 2, 3, 1))
rknn_inputs.append(pixel_data)
else:
rknn_inputs.append(inputs[name])
outputs = self.rknn.inference(inputs=rknn_inputs)
return outputs
except Exception as e:
logger.error(f"Error during RKNN inference: {e}")
raise
def __del__(self):
"""Cleanup when the runner is destroyed."""
if self.rknn:
try:
self.rknn.release()
except Exception:
pass
class Rknn(DetectionApi):
type_key = DETECTOR_KEY
@ -164,12 +62,12 @@ class Rknn(DetectionApi):
"For more information, see: https://docs.deci.ai/super-gradients/latest/LICENSE.YOLONAS.html"
)
# Initialize the RKNN model runner
self.runner = RKNNModelRunner(
model_path=model_props["path"],
model_type=config.model.model_type.value
if config.model.model_type
else None,
core_mask=core_mask,
)
def __del__(self):

View File

@ -6,7 +6,7 @@ import os
import numpy as np
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.base_runner import get_optimized_runner
from frigate.detectors.detection_runners import get_optimized_runner
from frigate.log import redirect_output_to_logger
from frigate.util.downloader import ModelDownloader

View File

@ -7,7 +7,7 @@ import warnings
# importing this without pytorch or others causes a warning
# https://github.com/huggingface/transformers/issues/27214
# suppressed by setting env TRANSFORMERS_NO_ADVISORY_WARNINGS=1
from frigate.detectors.base_runner import BaseModelRunner, get_optimized_runner
from frigate.detectors.detection_runners import BaseModelRunner, get_optimized_runner
from transformers import AutoFeatureExtractor, AutoTokenizer
from transformers.utils.logging import disable_progress_bar

View File

@ -6,7 +6,7 @@ import os
import numpy as np
from PIL import Image
from frigate.detectors.base_runner import get_optimized_runner
from frigate.detectors.detection_runners import get_optimized_runner
from transformers import AutoTokenizer
from transformers.utils.logging import disable_progress_bar, set_verbosity_error

View File

@ -7,7 +7,7 @@ import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import MODEL_CACHE_DIR
from frigate.detectors.base_runner import BaseModelRunner, get_optimized_runner
from frigate.detectors.detection_runners import BaseModelRunner, get_optimized_runner
from frigate.types import ModelStatusTypesEnum
from frigate.util.downloader import ModelDownloader