Use thread lock for OpenVINO to avoid concurrent inference requests with JinaV2

Nicolas Mowen 2025-11-07 08:46:43 -07:00
parent 2376bcaf97
commit ab3ded38e6


@@ -3,6 +3,7 @@
 import logging
 import os
 import platform
+import threading
 from abc import ABC, abstractmethod
 from typing import Any
 
@@ -290,6 +291,10 @@ class OpenVINOModelRunner(BaseModelRunner):
         self.infer_request = self.compiled_model.create_infer_request()
         self.input_tensor: ov.Tensor | None = None
 
+        # Thread lock to prevent concurrent inference (needed for JinaV2 which shares
+        # one runner between text and vision embeddings called from different threads)
+        self._inference_lock = threading.Lock()
+
         if not self.complex_model:
             try:
                 input_shape = self.compiled_model.inputs[0].get_shape()
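The lock created here is what serializes the run() path changed in the next hunk. Below is a minimal, dependency-free sketch of the failure mode it prevents; FakeInferRequest is a hypothetical stand-in for OpenVINO's InferRequest, which similarly owns a single set of input/output buffers, so two threads interleaving infer() calls can read back each other's data.

import threading
import time


class FakeInferRequest:
    """Hypothetical stand-in for ov.InferRequest: one shared input buffer."""

    def __init__(self) -> None:
        self._input = 0

    def infer(self, value: int) -> int:
        self._input = value  # write the shared input buffer
        time.sleep(0.001)  # simulated inference; another thread can interleave here
        return self._input * 2  # read the buffer back; it may have been overwritten


class Runner:
    def __init__(self) -> None:
        self.infer_request = FakeInferRequest()
        self._inference_lock = threading.Lock()

    def run(self, value: int) -> int:
        # Same pattern as the commit: serialize all access to infer_request.
        with self._inference_lock:
            return self.infer_request.infer(value)


def worker(runner: Runner, value: int, errors: list) -> None:
    for _ in range(100):
        if runner.run(value) != value * 2:
            errors.append(value)


runner = Runner()
errors: list = []
threads = [threading.Thread(target=worker, args=(runner, v, errors)) for v in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# With the lock held in run(), errors stays empty; remove the "with" and
# mismatches appear, which is the JinaV2 text/vision race in miniature.
print(f"mismatched results: {len(errors)}")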
@@ -333,67 +338,70 @@ class OpenVINOModelRunner(BaseModelRunner):
         Returns:
             List of output tensors
         """
+        # Lock prevents concurrent access to infer_request
+        # Needed for JinaV2: genai thread (text) + embeddings thread (vision)
+        with self._inference_lock:
             # Handle single input case for backward compatibility
             if (
                 len(inputs) == 1
                 and len(self.compiled_model.inputs) == 1
                 and self.input_tensor is not None
             ):
                 # Single input case - use the pre-allocated tensor for efficiency
                 input_data = list(inputs.values())[0]
                 np.copyto(self.input_tensor.data, input_data)
                 self.infer_request.infer(self.input_tensor)
             else:
                 if self.complex_model:
                     try:
                         # This ensures the model starts with a clean state for each sequence
                         # Important for RNN models like PaddleOCR recognition
                         self.infer_request.reset_state()
                     except Exception:
                         # this will raise an exception for models with AUTO set as the device
                         pass
 
                 # Multiple inputs case - set each input by name
                 for input_name, input_data in inputs.items():
                     # Find the input by name and its index
                     input_port = None
                     input_index = None
                     for idx, port in enumerate(self.compiled_model.inputs):
                         if port.get_any_name() == input_name:
                             input_port = port
                             input_index = idx
                             break
 
                     if input_port is None:
                         raise ValueError(f"Input '{input_name}' not found in model")
 
                     # Create tensor with the correct element type
                     input_element_type = input_port.get_element_type()
 
                     # Ensure input data matches the expected dtype to prevent type mismatches
                     # that can occur with models like Jina-CLIP v2 running on OpenVINO
                     expected_dtype = input_element_type.to_dtype()
                     if input_data.dtype != expected_dtype:
                         logger.debug(
                             f"Converting input '{input_name}' from {input_data.dtype} to {expected_dtype}"
                         )
                         input_data = input_data.astype(expected_dtype)
 
                     input_tensor = ov.Tensor(input_element_type, input_data.shape)
                     np.copyto(input_tensor.data, input_data)
 
                     # Set the input tensor for the specific port index
                     self.infer_request.set_input_tensor(input_index, input_tensor)
 
                 # Run inference
                 self.infer_request.infer()
 
             # Get all output tensors
             outputs = []
             for i in range(len(self.compiled_model.outputs)):
                 outputs.append(self.infer_request.get_output_tensor(i).data)
 
             return outputs
 
 
 class RKNNModelRunner(BaseModelRunner):
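Note the lock's scope in the hunk above: it covers input tensor setup, infer(), and the get_output_tensor() reads, so a second caller cannot start mutating the shared request while the first is still extracting results. Below is a caller-side sketch of the JinaV2 sharing pattern under the same assumption; StubRunner and the input names are illustrative stand-ins, not the project's real API.

import threading

import numpy as np


class StubRunner:
    """Illustrative stand-in with the same run(inputs) contract as the diff."""

    def __init__(self) -> None:
        self._inference_lock = threading.Lock()
        self._output = np.zeros(4, dtype=np.float32)  # shared "output tensor"

    def run(self, inputs: dict[str, np.ndarray]) -> list[np.ndarray]:
        with self._inference_lock:
            self._output[:] = next(iter(inputs.values()))[:4]  # fake inference
            # Reading (here, copying) the output inside the lock mirrors the
            # commit, where get_output_tensor() is also called under the lock.
            return [self._output.copy()]


runner = StubRunner()
results: dict[str, np.ndarray] = {}


def embed(name: str, fill: float) -> None:
    # One thread embeds text, another embeds images, both through one runner,
    # which is the sharing pattern the commit message describes for JinaV2.
    results[name] = runner.run({name: np.full(8, fill, dtype=np.float32)})[0]


threads = [
    threading.Thread(target=embed, args=("input_ids", 1.0)),
    threading.Thread(target=embed, args=("pixel_values", 2.0)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert results["input_ids"][0] == 1.0
assert results["pixel_values"][0] == 2.0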