diff --git a/frigate/util/rknn_converter.py b/frigate/util/rknn_converter.py index 48ce3e0b1..3317cb34f 100644 --- a/frigate/util/rknn_converter.py +++ b/frigate/util/rknn_converter.py @@ -1,15 +1,16 @@ """RKNN model conversion utility for Frigate.""" +import fcntl import logging import os import subprocess import sys +import time from pathlib import Path -from typing import Dict, Any, Optional +from typing import Optional logger = logging.getLogger(__name__) -# Model type to RKNN conversion config mapping MODEL_TYPE_CONFIGS = { "yolo-generic": { "mean_values": [[0, 0, 0]], @@ -32,7 +33,7 @@ MODEL_TYPE_CONFIGS = { def ensure_torch_dependencies() -> bool: """Dynamically install torch dependencies if not available.""" try: - import torch + import torch # type: ignore logger.debug("PyTorch is already available") return True @@ -40,7 +41,6 @@ def ensure_torch_dependencies() -> bool: logger.info("PyTorch not found, attempting to install...") try: - # Try to install torch using pip subprocess.check_call( [ sys.executable, @@ -55,7 +55,6 @@ def ensure_torch_dependencies() -> bool: stderr=subprocess.DEVNULL, ) - # Verify installation import torch logger.info("PyTorch installed successfully") @@ -68,8 +67,8 @@ def ensure_torch_dependencies() -> bool: def ensure_rknn_toolkit() -> bool: """Ensure RKNN toolkit is available.""" try: - import rknn - from rknn.api import RKNN + import rknn # type: ignore # noqa: F401 + from rknn.api import RKNN # type: ignore # noqa: F401 logger.debug("RKNN toolkit is already available") return True @@ -109,7 +108,6 @@ def convert_onnx_to_rknn( Returns: True if conversion successful, False otherwise """ - # Ensure dependencies are available if not ensure_torch_dependencies(): logger.error("PyTorch dependencies not available") return False @@ -137,24 +135,17 @@ def convert_onnx_to_rknn( from rknn.api import RKNN logger.info(f"Converting {onnx_path} to RKNN format for {soc}") - - # Initialize RKNN rknn = RKNN(verbose=True) - - # Configure RKNN rknn.config(**config) - # Load ONNX model if rknn.load_onnx(model=onnx_path) != 0: logger.error("Failed to load ONNX model") return False - # Build RKNN model if rknn.build(do_quantization=quantization) != 0: logger.error("Failed to build RKNN model") return False - # Export RKNN model if rknn.export_rknn(output_path) != 0: logger.error("Failed to export RKNN model") return False @@ -167,6 +158,185 @@ def convert_onnx_to_rknn( return False +def cleanup_stale_lock(lock_file_path: Path) -> bool: + """ + Clean up a stale lock file if it exists and is old. + + Args: + lock_file_path: Path to the lock file + + Returns: + True if lock was cleaned up, False otherwise + """ + try: + if lock_file_path.exists(): + # Check if lock file is older than 10 minutes (stale) + lock_age = time.time() - lock_file_path.stat().st_mtime + if lock_age > 600: # 10 minutes + logger.warning( + f"Removing stale lock file: {lock_file_path} (age: {lock_age:.1f}s)" + ) + lock_file_path.unlink() + return True + except Exception as e: + logger.error(f"Error cleaning up stale lock: {e}") + + return False + + +def acquire_conversion_lock(lock_file_path: Path, timeout: int = 300) -> bool: + """ + Acquire a file-based lock for model conversion. + + Args: + lock_file_path: Path to the lock file + timeout: Maximum time to wait for lock in seconds + + Returns: + True if lock acquired, False if timeout or error + """ + try: + lock_file_path.parent.mkdir(parents=True, exist_ok=True) + cleanup_stale_lock(lock_file_path) + lock_fd = os.open(lock_file_path, os.O_CREAT | os.O_RDWR) + + # Try to acquire exclusive lock + start_time = time.time() + while time.time() - start_time < timeout: + try: + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + # Lock acquired successfully + logger.debug(f"Acquired conversion lock: {lock_file_path}") + return True + except (OSError, IOError): + # Lock is held by another process, wait and retry + if time.time() - start_time >= timeout: + logger.warning( + f"Timeout waiting for conversion lock: {lock_file_path}" + ) + os.close(lock_fd) + return False + + logger.debug("Waiting for conversion lock to be released...") + time.sleep(1) + + os.close(lock_fd) + return False + + except Exception as e: + logger.error(f"Error acquiring conversion lock: {e}") + return False + + +def release_conversion_lock(lock_file_path: Path) -> None: + """ + Release the conversion lock. + + Args: + lock_file_path: Path to the lock file + """ + try: + if lock_file_path.exists(): + lock_file_path.unlink() + logger.debug(f"Released conversion lock: {lock_file_path}") + except Exception as e: + logger.error(f"Error releasing conversion lock: {e}") + + +def is_lock_stale(lock_file_path: Path, max_age: int = 600) -> bool: + """ + Check if a lock file is stale (older than max_age seconds). + + Args: + lock_file_path: Path to the lock file + max_age: Maximum age in seconds before considering lock stale + + Returns: + True if lock is stale, False otherwise + """ + try: + if lock_file_path.exists(): + lock_age = time.time() - lock_file_path.stat().st_mtime + return lock_age > max_age + except Exception: + pass + + return False + + +def wait_for_conversion_completion( + rknn_path: Path, lock_file_path: Path, timeout: int = 300 +) -> bool: + """ + Wait for another process to complete the conversion. + + Args: + rknn_path: Path to the expected RKNN model + lock_file_path: Path to the lock file to monitor + timeout: Maximum time to wait in seconds + + Returns: + True if RKNN model appears, False if timeout + """ + start_time = time.time() + while time.time() - start_time < timeout: + # Check if RKNN model appeared + if rknn_path.exists(): + logger.info(f"RKNN model appeared: {rknn_path}") + return True + + # Check if lock file is gone (conversion completed or failed) + if not lock_file_path.exists(): + logger.info("Lock file removed, checking for RKNN model...") + if rknn_path.exists(): + logger.info(f"RKNN model found after lock removal: {rknn_path}") + return True + else: + logger.warning( + "Lock file removed but RKNN model not found, conversion may have failed" + ) + return False + + # Check if lock is stale + if is_lock_stale(lock_file_path): + logger.warning(f"Lock file is stale, attempting to clean up and retry...") + cleanup_stale_lock(lock_file_path) + # Try to acquire lock again + if acquire_conversion_lock(lock_file_path, timeout=60): + try: + # Check if RKNN file appeared while waiting + if rknn_path.exists(): + logger.info(f"RKNN model appeared while waiting: {rknn_path}") + return str(rknn_path) + + # Convert ONNX to RKNN + logger.info( + f"Retrying conversion of {rknn_path} after stale lock cleanup..." + ) + + # Get the original model path from rknn_path + base_path = rknn_path.parent / rknn_path.stem + onnx_path = base_path.with_suffix(".onnx") + + if onnx_path.exists(): + if convert_onnx_to_rknn( + str(onnx_path), str(rknn_path), "yolo-generic", False + ): + return str(rknn_path) + + logger.error("Failed to convert model after stale lock cleanup") + return None + + finally: + release_conversion_lock(lock_file_path) + + logger.debug("Waiting for RKNN model to appear...") + time.sleep(1) + + logger.warning(f"Timeout waiting for RKNN model: {rknn_path}") + return False + + def auto_convert_model( model_path: str, model_type: str, quantization: bool = False ) -> Optional[str]: @@ -181,16 +351,12 @@ def auto_convert_model( Returns: Path to the RKNN model if successful, None otherwise """ - from frigate.const import MODEL_CACHE_DIR - - # Check if model already has .rknn extension if model_path.endswith(".rknn"): return model_path # Check if equivalent .rknn file exists base_path = Path(model_path) if base_path.suffix.lower() in [".onnx", ""]: - # Remove extension if present base_name = base_path.stem if base_path.suffix else base_path.name rknn_path = base_path.parent / f"{base_name}.rknn" @@ -198,19 +364,38 @@ def auto_convert_model( logger.info(f"Found existing RKNN model: {rknn_path}") return str(rknn_path) - # Convert ONNX to RKNN - if base_path.suffix.lower() == ".onnx" or not base_path.suffix: - logger.info(f"Converting {model_path} to RKNN format...") + lock_file_path = base_path.parent / f"{base_name}.conversion.lock" - # Create output directory if it doesn't exist - rknn_path.parent.mkdir(parents=True, exist_ok=True) + if acquire_conversion_lock(lock_file_path): + try: + if rknn_path.exists(): + logger.info( + f"RKNN model appeared while waiting for lock: {rknn_path}" + ) + return str(rknn_path) - if convert_onnx_to_rknn( - str(base_path), str(rknn_path), model_type, quantization - ): + logger.info(f"Converting {model_path} to RKNN format...") + rknn_path.parent.mkdir(parents=True, exist_ok=True) + + if convert_onnx_to_rknn( + str(base_path), str(rknn_path), model_type, quantization + ): + return str(rknn_path) + else: + logger.error(f"Failed to convert {model_path} to RKNN format") + return None + + finally: + release_conversion_lock(lock_file_path) + else: + logger.info( + f"Another process is converting {model_path}, waiting for completion..." + ) + + if wait_for_conversion_completion(rknn_path, lock_file_path): return str(rknn_path) else: - logger.error(f"Failed to convert {model_path} to RKNN format") + logger.error(f"Timeout waiting for conversion of {model_path}") return None return None