Fix for multi-stream async inference

OmriAx 2025-03-06 07:47:38 +02:00
parent b8a20f5388
commit 469f9c0a83
2 changed files with 96 additions and 125 deletions


@ -94,6 +94,12 @@ With the [rocm](../configuration/object_detectors.md#amdrocm-gpu-detector) detec
### Hailo-8
| Name | Hailo8 Inference Time | Hailo8L Inference Time |
| --------------- | ---------------------- | ----------------------- |
| ssd mobilenet v1| ~ 6 ms | ~ 10 ms |
| yolov6n | ~ 7 ms | ~ 11 ms |
Frigate supports both the Hailo-8 and Hailo-8L AI Acceleration Modules on compatible hardware platforms, including the Raspberry Pi 5 with the PCIe hat from the AI Kit. The Hailo detector integration in Frigate automatically identifies your hardware type and selects the appropriate default model when a custom model isn't provided.
**Default Model Configuration:**


@ -34,69 +34,54 @@ from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
# ----------------- ResponseStore Class ----------------- #
class ResponseStore:
"""
A thread-safe hash-based response store that maps request IDs
to their results. Threads can wait on the condition variable until
their request's result appears.
"""
def __init__(self):
self.responses = {} # Maps request_id -> (original_input, infer_results)
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
def put(self, request_id, response):
with self.cond:
self.responses[request_id] = response
self.cond.notify_all()
def get(self, request_id, timeout=None):
with self.cond:
if not self.cond.wait_for(lambda: request_id in self.responses, timeout=timeout):
raise TimeoutError(f"Timeout waiting for response {request_id}")
return self.responses.pop(request_id)
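# Illustrative usage (a sketch, not part of this commit): the inference
# callback fills in a result from one thread while the detector thread
# blocks in get() on another.
#
#   store = ResponseStore()
#   # callback thread:  store.put(7, (frame, results))
#   # caller thread:    frame, results = store.get(7, timeout=10.0)
#
# wait_for() re-checks its predicate on every notify_all(), so callers
# waiting on different request IDs wake only when their own ID arrives,
# and pop() ensures each response is consumed exactly once.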
# ----------------- Utility Functions ----------------- #
def preprocess_tensor(image: np.ndarray, model_w: int, model_h: int) -> np.ndarray:
"""
Resize an image with unchanged aspect ratio using padding.
Assumes input image shape is (H, W, 3).
"""
# Remove batch dimension if present (assumes batch size of 1)
if image.ndim == 4 and image.shape[0] == 1:
image = image[0]
h, w = image.shape[:2]
# Fast path: if the image is 320x320 and the target is 640x640, just double the size.
if (w, h) == (320, 320) and (model_w, model_h) == (640, 640):
return cv2.resize(image, (model_w, model_h), interpolation=cv2.INTER_LINEAR)
# Standard processing: calculate scaling factor to maintain aspect ratio.
scale = min(model_w / w, model_h / h)
new_w, new_h = int(w * scale), int(h * scale)
# Resize with high-quality bicubic interpolation
resized_image = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_CUBIC)
# Create a new image with the target size filled with the padding color 114
padded_image = np.full((model_h, model_w, 3), 114, dtype=image.dtype)
# Calculate the center position for the resized image
x_offset = (model_w - new_w) // 2
y_offset = (model_h - new_h) // 2
padded_image[y_offset:y_offset+new_h, x_offset:x_offset+new_w] = resized_image
return padded_image
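# Worked example (illustrative): for a 640x360 frame and a 640x640 model,
# scale = min(640/640, 640/360) = 1.0, so the frame stays 640x360,
# x_offset = 0 and y_offset = (640 - 360) // 2 = 140; rows 0-139 and
# 500-639 of the output remain gray (114) letterbox padding.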
def extract_detections(input_data: list, threshold: float = 0.5) -> dict:
"""
(Legacy extraction function; not used by detect_raw below.)
Extract detections from raw inference output.
"""
boxes, scores, classes = [], [], []
num_detections = 0
for i, detection in enumerate(input_data):
if len(detection) == 0:
continue
for det in detection:
bbox, score = det[:4], det[4]
if score >= threshold:
boxes.append(bbox)
scores.append(score)
classes.append(i)
num_detections += 1
return {
'detection_boxes': boxes,
'detection_classes': classes,
'detection_scores': scores,
'num_detections': num_detections
}
# ----------------- End of Utility Functions ----------------- #
# ----------------- Global Constants ----------------- #
DETECTOR_KEY = "hailo8l"
ARCH = None
H8_DEFAULT_MODEL = "yolov6n.hef"
@ -116,32 +101,30 @@ def detect_hailo_arch():
return "hailo8l"
elif "HAILO8" in line:
return "hailo8"
logger.error(f"Inference error: Could not determine Hailo architecture from device information.")
logger.error("Inference error: Could not determine Hailo architecture.")
return None
except Exception as e:
logger.error(f"Inference error: {e}")
return None
# ----------------- HailoAsyncInference Class ----------------- #
class HailoAsyncInference:
def __init__(
self,
hef_path: str,
input_queue: queue.Queue,
output_store: ResponseStore,
batch_size: int = 1,
input_type: Optional[str] = None,
output_type: Optional[Dict[str, str]] = None,
send_original_frame: bool = False,
) -> None:
self.input_queue = input_queue
self.output_store = output_store
# Create VDevice parameters with round-robin scheduling
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
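# ROUND_ROBIN lets the HailoRT scheduler time-share the physical device
# rather than granting exclusive ownership, which is what allows async
# jobs from multiple streams to target one Hailo-8/8L module.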
# Load HEF and create the infer model
self.hef = HEF(hef_path)
self.target = VDevice(params)
self.infer_model = self.target.create_infer_model(hef_path)
@ -160,7 +143,7 @@ class HailoAsyncInference:
for output_name, output_type in output_type_dict.items():
self.infer_model.output(output_name).set_format_type(getattr(FormatType, output_type))
def callback(self, completion_info, bindings_list: List, input_batch: List, request_ids: List[int]):
if completion_info.exception:
logger.error(f"Inference error: {completion_info.exception}")
else:
@ -172,7 +155,7 @@ class HailoAsyncInference:
name: np.expand_dims(bindings.output(name).get_buffer(), axis=0)
for name in bindings._output_names
}
self.output_store.put(request_ids[i], (input_batch[i], result))
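# bindings_list, input_batch, and request_ids are pre-bound with
# functools.partial in run() below; HailoRT itself only supplies
# completion_info. Keying each result by its request ID (instead of
# pushing to a shared output queue) is the core multi-stream fix:
# concurrent callers can no longer receive each other's results.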
def _create_bindings(self, configured_infer_model) -> object:
if self.output_type is None:
@ -197,16 +180,16 @@ class HailoAsyncInference:
return self.hef.get_input_vstream_infos()[0].shape
def run(self) -> None:
# Configure the infer model once and reuse vstream settings via run_async
with self.infer_model.configure() as configured_infer_model:
while True:
batch_data = self.input_queue.get()
if batch_data is None:
break
request_id, frame_data = batch_data
preprocessed_batch = [frame_data]
request_ids = [request_id]
input_batch = preprocessed_batch # non-send_original_frame mode
bindings_list = []
for frame in preprocessed_batch:
bindings = self._create_bindings(configured_infer_model)
@ -217,12 +200,12 @@ class HailoAsyncInference:
bindings_list,
partial(
self.callback,
input_batch=input_batch,
request_ids=request_ids,
bindings_list=bindings_list,
)
)
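# Bound how far the loop runs ahead of the device: wait up to 100 ms
# (HailoRT job wait() takes a timeout in milliseconds) for the job just
# submitted; the results themselves arrive via the callback above.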
job.wait(100)
# ----------------- HailoDetector Class ----------------- #
class HailoDetector(DetectionApi):
@ -233,7 +216,6 @@ class HailoDetector(DetectionApi):
ARCH = detect_hailo_arch()
self.cache_dir = MODEL_CACHE_DIR
self.device_type = detector_config.device
self.model_height = detector_config.model.height if hasattr(detector_config.model, "height") else None
self.model_width = detector_config.model.width if hasattr(detector_config.model, "width") else None
self.model_type = detector_config.model.model_type if hasattr(detector_config.model, "model_type") else None
@ -244,21 +226,22 @@ class HailoDetector(DetectionApi):
self.set_path_and_url(detector_config.model.path)
self.working_model_path = self.check_and_prepare()
# Set up asynchronous inference
self.batch_size = 1
self.input_queue = queue.Queue()
self.response_store = ResponseStore()
self.request_counter = 0
self.request_counter_lock = threading.Lock()
try:
logger.debug(f"[INIT] Loading HEF model from {self.working_model_path}")
self.inference_engine = HailoAsyncInference(
self.working_model_path,
self.input_queue,
self.response_store,
self.batch_size
)
self.input_shape = self.inference_engine.get_input_shape()
logger.debug(f"[INIT] Model input shape: {self.input_shape}")
# Start the inference loop in a background thread
self.inference_thread = threading.Thread(target=self.inference_engine.run, daemon=True)
self.inference_thread.start()
except Exception as e:
@ -270,7 +253,6 @@ class HailoDetector(DetectionApi):
self.model_path = None
self.url = None
return
if self.is_url(path):
self.url = path
self.model_path = None
@ -283,19 +265,15 @@ class HailoDetector(DetectionApi):
@staticmethod
def extract_model_name(path: str = None, url: str = None) -> str:
if path and path.endswith(".hef"):
return os.path.basename(path)
elif url and url.endswith(".hef"):
return os.path.basename(url)
else:
print("Model name not found in path or URL. Checking default settings...")
if ARCH == "hailo8":
return H8_DEFAULT_MODEL
else:
return H8L_DEFAULT_MODEL
@staticmethod
def download_model(url: str, destination: str):
@ -311,73 +289,58 @@ class HailoDetector(DetectionApi):
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
model_name = self.extract_model_name(self.model_path, self.url)
cached_model_path = os.path.join(self.cache_dir, model_name)
if not self.model_path and not self.url:
if os.path.exists(cached_model_path):
print(f"Model found in cache: {cached_model_path}")
return cached_model_path
else:
print(f"Downloading default model: {model_name}")
if ARCH == "hailo8":
self.download_model(H8_DEFAULT_URL, cached_model_path)
else:
self.download_model(H8L_DEFAULT_URL, cached_model_path)
elif self.url:
print(f"Downloading model from URL: {self.url}")
self.download_model(self.url, cached_model_path)
elif self.model_path:
if os.path.exists(self.model_path):
print(f"Using existing model at: {self.model_path}")
return self.model_path
else:
raise FileNotFoundError(f"Model file not found at: {self.model_path}")
return cached_model_path
def _get_request_id(self) -> int:
with self.request_counter_lock:
request_id = self.request_counter
self.request_counter += 1
if self.request_counter > 1000000:
self.request_counter = 0
return request_id
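# Request IDs wrap after 1,000,000, so a collision would require a request
# to sit unanswered in the ResponseStore for a full million-ID cycle;
# detect_raw times out after 10 s, which makes that practically impossible.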
def detect_raw(self, tensor_input):
logger.debug("[DETECT_RAW] Starting detection")
request_id = self._get_request_id()
# Preprocess the input tensor
logger.debug(f"[DETECT_RAW] Starting pre processing")
tensor_input = self.preprocess(tensor_input)
# Ensure tensor_input has a batch dimension
if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3:
tensor_input = np.expand_dims(tensor_input, axis=0)
logger.debug(f"[DETECT_RAW] Expanded input shape to {tensor_input.shape}")
# Enqueue input for asynchronous inference
self.input_queue.put((request_id, tensor_input))
try:
original_input, infer_results = self.response_store.get(request_id, timeout=10.0)
except TimeoutError:
logger.error(f"Timeout waiting for inference results for request {request_id}")
return np.zeros((20, 6), dtype=np.float32)
logger.debug("[DETECT_RAW] Inference completed.")
# If infer_results is a single-element list, unwrap it.
if isinstance(infer_results, list) and len(infer_results) == 1:
infer_results = infer_results[0]
# Confidence threshold for keeping detections (adjust as needed)
threshold = 0.4
all_detections = []
# Process each detection set
for class_id, detection_set in enumerate(infer_results):
if not isinstance(detection_set, np.ndarray) or detection_set.size == 0:
continue
logger.debug(f"[DETECT_RAW] Processing detection set {class_id} with shape {detection_set.shape}")
for det in detection_set:
if det.shape[0] < 5:
continue
@ -387,39 +350,41 @@ class HailoDetector(DetectionApi):
all_detections.append([class_id, score, det[0], det[1], det[2], det[3]])
if len(all_detections) == 0:
detections_array = np.zeros((20, 6), dtype=np.float32)
else:
detections_array = np.array(all_detections, dtype=np.float32)
# Pad or truncate to exactly 20 rows
if detections_array.shape[0] > 20:
detections_array = detections_array[:20, :]
elif detections_array.shape[0] < 20:
pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32)
detections_array = np.vstack((detections_array, pad))
logger.debug(f"[DETECT_RAW] Processed detections: {detections_array}")
return detections_array
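# Frigate expects detect_raw to return a fixed (20, 6) float32 array; each
# row here is [class_id, score, <the four box coordinates from det[:4]>],
# zero-padded when fewer than 20 detections survive the threshold.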
def preprocess(self, image):
if isinstance(image, np.ndarray):
# Process the tensor input and reintroduce the batch dimension.
processed = preprocess_tensor(image, self.input_shape[1], self.input_shape[0])
return np.expand_dims(processed, axis=0)
else:
raise ValueError("Unsupported image format for preprocessing")
def close(self):
"""Properly shuts down the inference engine and releases the VDevice."""
logger.debug("[CLOSE] Closing HailoDetector")
try:
if hasattr(self, "inference_engine"):
if hasattr(self.inference_engine, "target"):
self.inference_engine.target.release()
logger.debug("Hailo VDevice released successfully")
except Exception as e:
logger.error(f"Failed to close Hailo device: {e}")
raise
def __del__(self):
"""Destructor to ensure cleanup when the object is deleted."""
self.close()
# ----------------- HailoDetectorConfig Class ----------------- #
class HailoDetectorConfig(BaseDetectorConfig):
type: Literal[DETECTOR_KEY]
device: str = Field(default="PCIe", title="Device Type")
#url: Optional[str] = Field(default=None, title="Custom Model URL")