From 2bfe2341f0d2b825471401233d27fc486bd9fd13 Mon Sep 17 00:00:00 2001 From: mjq2020 Date: Sun, 6 Apr 2025 18:49:42 +0100 Subject: [PATCH] optim: multi batch inference --- frigate/detectors/plugins/hailo8l.py | 122 +++++++++++++++------------ 1 file changed, 66 insertions(+), 56 deletions(-) diff --git a/frigate/detectors/plugins/hailo8l.py b/frigate/detectors/plugins/hailo8l.py index ad86ca03d1..ddd7f84eb9 100755 --- a/frigate/detectors/plugins/hailo8l.py +++ b/frigate/detectors/plugins/hailo8l.py @@ -46,17 +46,19 @@ class ResponseStore: self.cond = threading.Condition(self.lock) def put(self, request_id, response): - with self.cond: - self.responses[request_id] = response - self.cond.notify_all() + self.responses[request_id] = response + self.cond.notify_all() def get(self, request_id, timeout=None): with self.cond: if not self.cond.wait_for( - lambda: request_id in self.responses, timeout=timeout + lambda: all(ri in self.responses for ri in request_id), timeout=timeout ): raise TimeoutError(f"Timeout waiting for response {request_id}") - return self.responses.pop(request_id) + results = [] + for ri in request_id: + results.append(self.responses.pop(ri)) + return results # ----------------- Utility Functions ----------------- # @@ -167,15 +169,18 @@ class HailoAsyncInference: if completion_info.exception: logger.error(f"Inference error: {completion_info.exception}") else: - for i, bindings in enumerate(bindings_list): - if len(bindings._output_names) == 1: - result = bindings.output().get_buffer() - else: - result = { - name: np.expand_dims(bindings.output(name).get_buffer(), axis=0) - for name in bindings._output_names - } - self.output_store.put(request_ids[i], (input_batch[i], result)) + with self.output_store.cond: + for i, bindings in enumerate(bindings_list): + if len(bindings._output_names) == 1: + result = bindings.output().get_buffer() + else: + result = { + name: np.expand_dims( + bindings.output(name).get_buffer(), axis=0 + ) + for name in bindings._output_names + } + self.output_store.put(request_ids[i], (input_batch[i], result)) def _create_bindings(self, configured_infer_model) -> object: if self.output_type is None: @@ -207,9 +212,7 @@ class HailoAsyncInference: batch_data = self.input_queue.get() if batch_data is None: break - request_id, frame_data = batch_data - preprocessed_batch = [frame_data] - request_ids = [request_id] + request_ids, preprocessed_batch = batch_data input_batch = preprocessed_batch # non-send_original_frame mode bindings_list = [] @@ -364,59 +367,66 @@ class HailoDetector(DetectionApi): raise FileNotFoundError(f"Model file not found at: {self.model_path}") return cached_model_path - def _get_request_id(self) -> int: + def _get_request_id(self, batch) -> int: with self.request_counter_lock: - request_id = self.request_counter - self.request_counter += 1 - if self.request_counter > 1000000: - self.request_counter = 0 - return request_id + request_ids = [] + for i in range(batch): + request_id = self.request_counter + self.request_counter += 1 + if self.request_counter > 1000000: + self.request_counter = 0 + request_ids.append(request_id) + return request_ids def detect_raw(self, tensor_input): - request_id = self._get_request_id() + request_ids = self._get_request_id(tensor_input.shape[0]) - tensor_input = self.preprocess(tensor_input) + # tensor_input = self.preprocess(tensor_input) if isinstance(tensor_input, np.ndarray) and len(tensor_input.shape) == 3: tensor_input = np.expand_dims(tensor_input, axis=0) - self.input_queue.put((request_id, tensor_input)) + self.input_queue.put((request_ids, tensor_input)) try: - original_input, infer_results = self.response_store.get( - request_id, timeout=10.0 - ) + batch_infer_results = self.response_store.get(request_ids, timeout=10.0) + except TimeoutError: logger.error( - f"Timeout waiting for inference results for request {request_id}" + f"Timeout waiting for inference results for request {request_ids}" ) - return np.zeros((20, 6), dtype=np.float32) + return np.zeros((0, 20, 6), dtype=np.float32) + results = [] + for original_input, infer_results in batch_infer_results: + if isinstance(infer_results, list) and len(infer_results) == 1: + infer_results = infer_results[0] - if isinstance(infer_results, list) and len(infer_results) == 1: - infer_results = infer_results[0] - - threshold = 0.4 - all_detections = [] - for class_id, detection_set in enumerate(infer_results): - if not isinstance(detection_set, np.ndarray) or detection_set.size == 0: - continue - for det in detection_set: - if det.shape[0] < 5: + threshold = 0.4 + all_detections = [] + for class_id, detection_set in enumerate(infer_results): + if not isinstance(detection_set, np.ndarray) or detection_set.size == 0: continue - score = float(det[4]) - if score < threshold: - continue - all_detections.append([class_id, score, det[0], det[1], det[2], det[3]]) + for det in detection_set: + if det.shape[0] < 5: + continue + score = float(det[4]) + if score < threshold: + continue + all_detections.append( + [class_id, score, det[0], det[1], det[2], det[3]] + ) - if len(all_detections) == 0: - detections_array = np.zeros((20, 6), dtype=np.float32) - else: - detections_array = np.array(all_detections, dtype=np.float32) - if detections_array.shape[0] > 20: - detections_array = detections_array[:20, :] - elif detections_array.shape[0] < 20: - pad = np.zeros((20 - detections_array.shape[0], 6), dtype=np.float32) - detections_array = np.vstack((detections_array, pad)) - - return detections_array + if len(all_detections) == 0: + detections_array = np.zeros((20, 6), dtype=np.float32) + else: + detections_array = np.array(all_detections, dtype=np.float32) + if detections_array.shape[0] > 20: + detections_array = detections_array[:20, :] + elif detections_array.shape[0] < 20: + pad = np.zeros( + (20 - detections_array.shape[0], 6), dtype=np.float32 + ) + detections_array = np.vstack((detections_array, pad)) + results.append(detections_array) + return np.stack(results, axis=0) def preprocess(self, image): if isinstance(image, np.ndarray):