diff --git a/frigate/detectors/plugins/memryx.py b/frigate/detectors/plugins/memryx.py index f4977acad..2fb87d018 100644 --- a/frigate/detectors/plugins/memryx.py +++ b/frigate/detectors/plugins/memryx.py @@ -44,7 +44,7 @@ class MemryXDetector(DetectionApi): ModelTypeEnum.yolox, ] - def __init__(self, detector_config): + def __init__(self, detector_config, stop_event=None): """Initialize MemryX detector with the provided configuration.""" try: # Import MemryX SDK @@ -54,6 +54,9 @@ class MemryXDetector(DetectionApi): "MemryX SDK is not installed. Install it and set up MIX environment." ) return + + # Get stop_event from detector_config + self.stop_event = getattr(detector_config, "_stop_event", stop_event) model_cfg = getattr(detector_config, "model", None) @@ -363,26 +366,50 @@ class MemryXDetector(DetectionApi): def process_input(self): """Input callback function: wait for frames in the input queue, preprocess, and send to MX3 (return)""" while True: + # Check if shutdown is requested + if self.stop_event.is_set(): + logger.debug("[process_input] Stop event detected, returning None") + return None try: - # Wait for a frame from the queue (blocking call) + # Wait for a frame from the queue with timeout to check stop_event periodically frame = self.capture_queue.get( - block=True - ) # Blocks until data is available + block=True, + timeout=0.5 + ) return frame except Exception as e: - logger.info(f"[process_input] Error processing input: {e}") - time.sleep(0.1) # Prevent busy waiting in case of error + # Silently handle queue.Empty timeouts (expected during normal operation) + # Log any other unexpected exceptions + if "Empty" not in str(type(e).__name__): + logger.warning(f"[process_input] Unexpected error: {e}") + # Loop continues and will check stop_event at the top def receive_output(self): """Retrieve processed results from MemryX output queue + a copy of the original frame""" - connection_id = ( - self.capture_id_queue.get() - ) # Get the corresponding connection ID - detections = self.output_queue.get() # Get detections from MemryX + try: + # Get connection ID with timeout + connection_id = ( + self.capture_id_queue.get( + block=True, + timeout=1.0 + ) + ) # Get the corresponding connection ID + detections = self.output_queue.get() # Get detections from MemryX - return connection_id, detections + return connection_id, detections + + except Exception as e: + # On timeout or stop event, return None + if self.stop_event.is_set(): + logger.debug("[receive_output] Stop event detected, exiting") + # Silently handle queue.Empty timeouts, they're expected during normal operation + elif "Empty" not in str(type(e).__name__): + logger.warning(f"[receive_output] Error receiving output: {e}") + + return None, None + def post_process_yolonas(self, output): predictions = output[0] @@ -830,6 +857,15 @@ class MemryXDetector(DetectionApi): raise Exception( f"{self.memx_model_type} is currently not supported for memryx. See the docs for more info on supported models." ) + + def shutdown(self): + """Gracefully shutdown the MemryX accelerator""" + try: + if hasattr(self, "accl") and self.accl is not None: + self.accl.shutdown() + logger.info("MemryX accelerator shutdown complete") + except Exception as e: + logger.error(f"Error during MemryX shutdown: {e}") def detect_raw(self, tensor_input: np.ndarray): """Removed synchronous detect_raw() function so that we only use async""" diff --git a/frigate/object_detection/base.py b/frigate/object_detection/base.py index bb5f83fab..41ce93589 100644 --- a/frigate/object_detection/base.py +++ b/frigate/object_detection/base.py @@ -43,6 +43,7 @@ class BaseLocalDetector(ObjectDetector): self, detector_config: BaseDetectorConfig = None, labels: str = None, + stop_event: MpEvent = None, ): self.fps = EventsPerSecond() if labels is None: @@ -58,6 +59,10 @@ class BaseLocalDetector(ObjectDetector): self.input_transform = None self.dtype = InputDTypeEnum.int + # Attach stop_event to detector_config so detectors can access it + if detector_config and stop_event: + detector_config._stop_event = stop_event + self.detect_api = create_detector(detector_config) def _transform_input(self, tensor_input: np.ndarray) -> np.ndarray: @@ -240,6 +245,10 @@ class AsyncDetectorRunner(FrigateProcess): while not self.stop_event.is_set(): connection_id, detections = self._detector.async_receive_output() + # Handle timeout case (queue.Empty) - just continue + if connection_id is None: + continue + if not self.send_times: # guard; shouldn't happen if send/recv are balanced continue @@ -266,21 +275,40 @@ class AsyncDetectorRunner(FrigateProcess): self._frame_manager = SharedMemoryFrameManager() self._publisher = ObjectDetectorPublisher() - self._detector = AsyncLocalObjectDetector(detector_config=self.detector_config) + self._detector = AsyncLocalObjectDetector( + detector_config=self.detector_config, stop_event=self.stop_event + ) for name in self.cameras: self.create_output_shm(name) - t_detect = threading.Thread(target=self._detect_worker, daemon=True) - t_result = threading.Thread(target=self._result_worker, daemon=True) + t_detect = threading.Thread(target=self._detect_worker, daemon=False) + t_result = threading.Thread(target=self._result_worker, daemon=False) t_detect.start() t_result.start() - while not self.stop_event.is_set(): - time.sleep(0.5) + try: + while not self.stop_event.is_set(): + time.sleep(0.5) - self._publisher.stop() - logger.info("Exited async detection process...") + logger.info( + "Stop event detected, waiting for detector threads to finish..." + ) + + # Wait for threads to finish processing + t_detect.join(timeout=5) + t_result.join(timeout=5) + + # Explicitly shutdown MemryX accelerator + if hasattr(self._detector.detect_api, "shutdown"): + logger.info("Calling MemryX shutdown method...") + self._detector.detect_api.shutdown() + + self._publisher.stop() + except Exception as e: + logger.error(f"Error during async detector shutdown: {e}") + finally: + logger.info("Exited Async detection process...") class ObjectDetectProcess: @@ -308,7 +336,7 @@ class ObjectDetectProcess: # if the process has already exited on its own, just return if self.detect_process and self.detect_process.exitcode: return - self.detect_process.terminate() + logging.info("Waiting for detection process to exit gracefully...") self.detect_process.join(timeout=30) if self.detect_process.exitcode is None: