Add model conversion lock so it doesn't break when multiple detectors are defined

This commit is contained in:
Nicolas Mowen 2025-08-20 14:57:56 -06:00
parent ed8af79efa
commit fa2e628337

View File

@ -1,15 +1,16 @@
"""RKNN model conversion utility for Frigate.""" """RKNN model conversion utility for Frigate."""
import fcntl
import logging import logging
import os import os
import subprocess import subprocess
import sys import sys
import time
from pathlib import Path from pathlib import Path
from typing import Dict, Any, Optional from typing import Optional
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Model type to RKNN conversion config mapping
MODEL_TYPE_CONFIGS = { MODEL_TYPE_CONFIGS = {
"yolo-generic": { "yolo-generic": {
"mean_values": [[0, 0, 0]], "mean_values": [[0, 0, 0]],
@ -32,7 +33,7 @@ MODEL_TYPE_CONFIGS = {
def ensure_torch_dependencies() -> bool: def ensure_torch_dependencies() -> bool:
"""Dynamically install torch dependencies if not available.""" """Dynamically install torch dependencies if not available."""
try: try:
import torch import torch # type: ignore
logger.debug("PyTorch is already available") logger.debug("PyTorch is already available")
return True return True
@ -40,7 +41,6 @@ def ensure_torch_dependencies() -> bool:
logger.info("PyTorch not found, attempting to install...") logger.info("PyTorch not found, attempting to install...")
try: try:
# Try to install torch using pip
subprocess.check_call( subprocess.check_call(
[ [
sys.executable, sys.executable,
@ -55,7 +55,6 @@ def ensure_torch_dependencies() -> bool:
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
# Verify installation
import torch import torch
logger.info("PyTorch installed successfully") logger.info("PyTorch installed successfully")
@ -68,8 +67,8 @@ def ensure_torch_dependencies() -> bool:
def ensure_rknn_toolkit() -> bool: def ensure_rknn_toolkit() -> bool:
"""Ensure RKNN toolkit is available.""" """Ensure RKNN toolkit is available."""
try: try:
import rknn import rknn # type: ignore # noqa: F401
from rknn.api import RKNN from rknn.api import RKNN # type: ignore # noqa: F401
logger.debug("RKNN toolkit is already available") logger.debug("RKNN toolkit is already available")
return True return True
@ -109,7 +108,6 @@ def convert_onnx_to_rknn(
Returns: Returns:
True if conversion successful, False otherwise True if conversion successful, False otherwise
""" """
# Ensure dependencies are available
if not ensure_torch_dependencies(): if not ensure_torch_dependencies():
logger.error("PyTorch dependencies not available") logger.error("PyTorch dependencies not available")
return False return False
@ -137,24 +135,17 @@ def convert_onnx_to_rknn(
from rknn.api import RKNN from rknn.api import RKNN
logger.info(f"Converting {onnx_path} to RKNN format for {soc}") logger.info(f"Converting {onnx_path} to RKNN format for {soc}")
# Initialize RKNN
rknn = RKNN(verbose=True) rknn = RKNN(verbose=True)
# Configure RKNN
rknn.config(**config) rknn.config(**config)
# Load ONNX model
if rknn.load_onnx(model=onnx_path) != 0: if rknn.load_onnx(model=onnx_path) != 0:
logger.error("Failed to load ONNX model") logger.error("Failed to load ONNX model")
return False return False
# Build RKNN model
if rknn.build(do_quantization=quantization) != 0: if rknn.build(do_quantization=quantization) != 0:
logger.error("Failed to build RKNN model") logger.error("Failed to build RKNN model")
return False return False
# Export RKNN model
if rknn.export_rknn(output_path) != 0: if rknn.export_rknn(output_path) != 0:
logger.error("Failed to export RKNN model") logger.error("Failed to export RKNN model")
return False return False
@ -167,6 +158,185 @@ def convert_onnx_to_rknn(
return False return False
def cleanup_stale_lock(lock_file_path: Path) -> bool:
    """
    Remove the lock file if it is old enough to be considered stale.

    A lock file whose modification time lies more than 10 minutes in the
    past is deleted. A missing lock file is not an error.

    Args:
        lock_file_path: Path to the lock file

    Returns:
        True if this call removed the lock file, False otherwise
    """
    try:
        # Stat directly instead of checking exists() first: another process
        # may remove the file between the two calls.
        lock_age = time.time() - lock_file_path.stat().st_mtime

        if lock_age > 600:  # 10 minutes
            logger.warning(
                f"Removing stale lock file: {lock_file_path} (age: {lock_age:.1f}s)"
            )
            lock_file_path.unlink()
            return True
    except (FileNotFoundError, NotADirectoryError):
        # No lock file, or it was removed concurrently: nothing to clean up.
        return False
    except Exception as e:
        logger.error(f"Error cleaning up stale lock: {e}")

    return False
# File descriptors of the conversion locks currently held by this process,
# keyed by lock file path. The descriptor has to stay open for as long as the
# lock is held, so it is remembered here until release_conversion_lock()
# closes it.
_HELD_LOCK_FDS = {}


def _lock_fd_matches_path(lock_fd: int, lock_file_path: Path) -> bool:
    """Return True if lock_fd still refers to the file at lock_file_path."""
    try:
        return os.path.samestat(os.fstat(lock_fd), os.stat(lock_file_path))
    except FileNotFoundError:
        return False


def acquire_conversion_lock(lock_file_path: Path, timeout: int = 300) -> bool:
    """
    Acquire a file-based lock for model conversion.

    The lock is an exclusive, non-blocking flock() on the lock file, retried
    once per second until it succeeds or the timeout elapses. On success the
    descriptor is kept open and must be released with
    release_conversion_lock().

    Args:
        lock_file_path: Path to the lock file
        timeout: Maximum time to wait for lock in seconds

    Returns:
        True if lock acquired, False if timeout or error
    """
    lock_fd = None

    try:
        lock_file_path.parent.mkdir(parents=True, exist_ok=True)
        cleanup_stale_lock(lock_file_path)

        start_time = time.time()
        while time.time() - start_time < timeout:
            if lock_fd is None:
                lock_fd = os.open(lock_file_path, os.O_CREAT | os.O_RDWR)

            try:
                fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                # Lock is held by another process, wait and retry
                logger.debug("Waiting for conversion lock to be released...")
                time.sleep(1)
                continue

            if _lock_fd_matches_path(lock_fd, lock_file_path):
                key = str(lock_file_path)

                # Do not leak a descriptor left over from an earlier
                # acquisition of the same path.
                previous_fd = _HELD_LOCK_FDS.pop(key, None)
                if previous_fd is not None:
                    os.close(previous_fd)

                _HELD_LOCK_FDS[key] = lock_fd
                lock_fd = None  # ownership moved to _HELD_LOCK_FDS
                logger.debug(f"Acquired conversion lock: {lock_file_path}")
                return True

            # The lock file was unlinked (released or cleaned up) while we
            # were waiting, so we locked an orphaned file that no newcomer
            # will ever see. Drop it and retry on a fresh file.
            os.close(lock_fd)
            lock_fd = None

        logger.warning(f"Timeout waiting for conversion lock: {lock_file_path}")
        return False
    except Exception as e:
        logger.error(f"Error acquiring conversion lock: {e}")
        return False
    finally:
        # Only reached with an open descriptor when the lock was not acquired.
        if lock_fd is not None:
            os.close(lock_fd)


def release_conversion_lock(lock_file_path: Path) -> None:
    """
    Release the conversion lock.

    Removes the lock file and closes the descriptor stored by
    acquire_conversion_lock(), which drops the flock().

    Args:
        lock_file_path: Path to the lock file
    """
    lock_fd = _HELD_LOCK_FDS.pop(str(lock_file_path), None)

    try:
        # Unlink while the flock is still held: a waiter that obtains the
        # lock afterwards can then detect that its file is gone and retry.
        if lock_file_path.exists():
            lock_file_path.unlink()
            logger.debug(f"Released conversion lock: {lock_file_path}")
    except Exception as e:
        logger.error(f"Error releasing conversion lock: {e}")
    finally:
        if lock_fd is not None:
            try:
                os.close(lock_fd)
            except OSError as e:
                logger.error(f"Error releasing conversion lock: {e}")
def is_lock_stale(lock_file_path: Path, max_age: int = 600) -> bool:
    """
    Check if a lock file is stale (older than max_age seconds).

    Age is measured from the lock file's modification time. A lock file that
    does not exist or cannot be inspected is reported as not stale.

    Args:
        lock_file_path: Path to the lock file
        max_age: Maximum age in seconds before considering lock stale

    Returns:
        True if lock is stale, False otherwise
    """
    try:
        # Stat directly rather than checking exists() first, so a lock that is
        # removed concurrently is handled by the same code path.
        modified_at = lock_file_path.stat().st_mtime
    except OSError:
        return False

    return time.time() - modified_at > max_age
def wait_for_conversion_completion(
    rknn_path: Path,
    lock_file_path: Path,
    timeout: int = 300,
    model_type: str = "yolo-generic",
    quantization: bool = False,
) -> bool:
    """
    Wait for another process to complete the conversion.

    Polls once per second until the RKNN model exists, the lock file
    disappears, or the timeout elapses. If the lock file turns out to be
    stale, it is cleaned up and this process attempts the conversion itself
    from the ONNX file next to rknn_path.

    Args:
        rknn_path: Path to the expected RKNN model
        lock_file_path: Path to the lock file to monitor
        timeout: Maximum time to wait in seconds
        model_type: Model type passed to the converter when the conversion is
            retried after a stale lock
        quantization: Whether to quantize when the conversion is retried
            after a stale lock

    Returns:
        True if the RKNN model is available, False on timeout or failure
    """
    start_time = time.time()

    while time.time() - start_time < timeout:
        # Check if RKNN model appeared
        if rknn_path.exists():
            logger.info(f"RKNN model appeared: {rknn_path}")
            return True

        # Check if lock file is gone (conversion completed or failed)
        if not lock_file_path.exists():
            logger.info("Lock file removed, checking for RKNN model...")
            if rknn_path.exists():
                logger.info(f"RKNN model found after lock removal: {rknn_path}")
                return True

            logger.warning(
                "Lock file removed but RKNN model not found, conversion may have failed"
            )
            return False

        # Check if lock is stale
        if is_lock_stale(lock_file_path):
            logger.warning("Lock file is stale, attempting to clean up and retry...")
            cleanup_stale_lock(lock_file_path)

            # Try to take over the conversion ourselves
            if acquire_conversion_lock(lock_file_path, timeout=60):
                try:
                    # Check if RKNN file appeared while waiting
                    if rknn_path.exists():
                        logger.info(f"RKNN model appeared while waiting: {rknn_path}")
                        return True

                    logger.info(
                        f"Retrying conversion of {rknn_path} after stale lock cleanup..."
                    )

                    # Swap only the final suffix so dotted model names such as
                    # "model.v2.rknn" map to "model.v2.onnx".
                    onnx_path = rknn_path.with_suffix(".onnx")
                    if onnx_path.exists() and convert_onnx_to_rknn(
                        str(onnx_path), str(rknn_path), model_type, quantization
                    ):
                        return True

                    logger.error("Failed to convert model after stale lock cleanup")
                    return False
                finally:
                    release_conversion_lock(lock_file_path)

        logger.debug("Waiting for RKNN model to appear...")
        time.sleep(1)

    logger.warning(f"Timeout waiting for RKNN model: {rknn_path}")
    return False
def auto_convert_model( def auto_convert_model(
model_path: str, model_type: str, quantization: bool = False model_path: str, model_type: str, quantization: bool = False
) -> Optional[str]: ) -> Optional[str]:
@ -181,16 +351,12 @@ def auto_convert_model(
Returns: Returns:
Path to the RKNN model if successful, None otherwise Path to the RKNN model if successful, None otherwise
""" """
from frigate.const import MODEL_CACHE_DIR
# Check if model already has .rknn extension
if model_path.endswith(".rknn"): if model_path.endswith(".rknn"):
return model_path return model_path
# Check if equivalent .rknn file exists # Check if equivalent .rknn file exists
base_path = Path(model_path) base_path = Path(model_path)
if base_path.suffix.lower() in [".onnx", ""]: if base_path.suffix.lower() in [".onnx", ""]:
# Remove extension if present
base_name = base_path.stem if base_path.suffix else base_path.name base_name = base_path.stem if base_path.suffix else base_path.name
rknn_path = base_path.parent / f"{base_name}.rknn" rknn_path = base_path.parent / f"{base_name}.rknn"
@ -198,19 +364,38 @@ def auto_convert_model(
logger.info(f"Found existing RKNN model: {rknn_path}") logger.info(f"Found existing RKNN model: {rknn_path}")
return str(rknn_path) return str(rknn_path)
# Convert ONNX to RKNN lock_file_path = base_path.parent / f"{base_name}.conversion.lock"
if base_path.suffix.lower() == ".onnx" or not base_path.suffix:
logger.info(f"Converting {model_path} to RKNN format...")
# Create output directory if it doesn't exist if acquire_conversion_lock(lock_file_path):
rknn_path.parent.mkdir(parents=True, exist_ok=True) try:
if rknn_path.exists():
logger.info(
f"RKNN model appeared while waiting for lock: {rknn_path}"
)
return str(rknn_path)
if convert_onnx_to_rknn( logger.info(f"Converting {model_path} to RKNN format...")
str(base_path), str(rknn_path), model_type, quantization rknn_path.parent.mkdir(parents=True, exist_ok=True)
):
if convert_onnx_to_rknn(
str(base_path), str(rknn_path), model_type, quantization
):
return str(rknn_path)
else:
logger.error(f"Failed to convert {model_path} to RKNN format")
return None
finally:
release_conversion_lock(lock_file_path)
else:
logger.info(
f"Another process is converting {model_path}, waiting for completion..."
)
if wait_for_conversion_completion(rknn_path, lock_file_path):
return str(rknn_path) return str(rknn_path)
else: else:
logger.error(f"Failed to convert {model_path} to RKNN format") logger.error(f"Timeout waiting for conversion of {model_path}")
return None return None
return None return None