Add CPU Compatibility Check

This commit is contained in:
Weitheng Haw 2025-08-13 13:01:38 +08:00
parent 34bf1b21df
commit 4a5325c9ab
5 changed files with 181 additions and 14 deletions

View File

@ -125,14 +125,23 @@ class EmbeddingMaintainer(threading.Thread):
db.bind(models)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)
try:
self.embeddings = Embeddings(config, db, metrics)
# Check if we need to re-index events
if config.semantic_search.reindex:
self.embeddings.reindex()
# Check if we need to re-index events
if config.semantic_search.reindex:
self.embeddings.reindex()
# Sync semantic search triggers in db with config
self.embeddings.sync_triggers()
# Sync semantic search triggers in db with config
self.embeddings.sync_triggers()
except Exception as e:
logger.error(
f"Failed to initialize semantic search embeddings: {e}. "
"Continuing without semantic search. Face recognition and LPR will still work."
)
self.embeddings = None
# Disable semantic search in runtime to prevent further attempts
config.semantic_search.enabled = False
# create communication for updating event descriptions
self.requestor = InterProcessRequestor()
@ -283,7 +292,7 @@ class EmbeddingMaintainer(threading.Thread):
def _handle_request(topic: str, data: dict[str, Any]) -> str:
try:
# First handle the embedding-specific topics when semantic search is enabled
if self.config.semantic_search.enabled:
if self.config.semantic_search.enabled and self.embeddings is not None:
if topic == EmbeddingsRequestEnum.embed_description.value:
return serialize(
self.embeddings.embed_description(
@ -331,7 +340,7 @@ class EmbeddingMaintainer(threading.Thread):
if not camera or source_type != EventTypeEnum.tracked_object:
return
if self.config.semantic_search.enabled:
if self.config.semantic_search.enabled and self.embeddings is not None:
self.embeddings.update_stats()
camera_config = self.config.cameras[camera]
@ -634,7 +643,10 @@ class EmbeddingMaintainer(threading.Thread):
if not self.config.semantic_search.enabled:
return
self.embeddings.embed_thumbnail(event_id, thumbnail)
if self.embeddings is not None:
self.embeddings.embed_thumbnail(event_id, thumbnail)
else:
logger.debug(f"Skipping thumbnail embedding for {event_id} - semantic search not available")
def _process_genai_description(
self, event: Event, camera_config: CameraConfig, thumbnail
@ -716,7 +728,7 @@ class EmbeddingMaintainer(threading.Thread):
)
# Embed the description
if self.config.semantic_search.enabled:
if self.config.semantic_search.enabled and self.embeddings is not None:
self.embeddings.embed_description(event.id, description)
# Check semantic trigger for this description

View File

@ -4,6 +4,17 @@ import logging
import os
import warnings
# Check CPU compatibility before importing transformers
from frigate.util.cpu_compatibility import ensure_cpu_compatibility
# Setup logger early for compatibility warnings
logger = logging.getLogger(__name__)
# Log any CPU compatibility warnings
compatibility_warning = ensure_cpu_compatibility()
if compatibility_warning:
logger.warning(compatibility_warning)
# importing this without pytorch or others causes a warning
# https://github.com/huggingface/transformers/issues/27214
# suppressed by setting env TRANSFORMERS_NO_ADVISORY_WARNINGS=1

View File

@ -6,6 +6,18 @@ import os
import numpy as np
from PIL import Image
# Check CPU compatibility before importing transformers
from frigate.util.cpu_compatibility import ensure_cpu_compatibility
# Setup logger early for compatibility warnings
logger = logging.getLogger(__name__)
# Log any CPU compatibility warnings
compatibility_warning = ensure_cpu_compatibility()
if compatibility_warning:
logger.warning(compatibility_warning)
from transformers import AutoTokenizer
from transformers.utils.logging import disable_progress_bar, set_verbosity_error

View File

@ -60,11 +60,49 @@ class ClassificationTrainingProcess(FrigateProcess):
def __train_classification_model(self) -> bool:
"""Train a classification model."""
# Check CPU compatibility before attempting to import TensorFlow
from frigate.util.cpu_compatibility import check_cpu_has_avx
if not check_cpu_has_avx():
logger.error(
f"Cannot train classification model '{self.model_name}': "
"CPU does not support AVX instructions required by TensorFlow 2.19+. "
"Live model training is not available on this hardware."
)
# Notify that training failed due to hardware incompatibility
requestor = InterProcessRequestor()
requestor.send_data(
UPDATE_MODEL_STATE,
{
"model": self.model_name,
"state": ModelStatusTypesEnum.error,
"message": "CPU incompatible with TensorFlow - AVX instruction set required",
},
)
requestor.stop()
return False
# import in the function so that tensorflow is not initialized multiple times
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
try:
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.preprocessing.image import ImageDataGenerator
except (ImportError, Exception) as e:
logger.error(
f"Failed to import TensorFlow for model training '{self.model_name}': {e}"
)
requestor = InterProcessRequestor()
requestor.send_data(
UPDATE_MODEL_STATE,
{
"model": self.model_name,
"state": ModelStatusTypesEnum.error,
"message": f"TensorFlow import failed: {str(e)}",
},
)
requestor.stop()
return False
logger.info(f"Kicking off classification training for {self.model_name}.")
dataset_dir = os.path.join(CLIPS_DIR, self.model_name, "dataset")

View File

@ -0,0 +1,94 @@
"""CPU compatibility detection utilities."""
import logging
import os
import subprocess
import sys
from functools import lru_cache
from typing import Optional
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def check_cpu_has_avx() -> bool:
    """Check if CPU supports AVX instructions required by TensorFlow 2.19+.

    Detection order:
      1. ``/proc/cpuinfo`` on Linux (the common Frigate deployment target).
      2. ``lscpu`` as a fallback — also tried when ``/proc/cpuinfo`` is
         unreadable (the previous version skipped the fallback on any
         Linux read failure).

    The result is cached for the life of the process via ``lru_cache``.

    Returns:
        bool: True if CPU has AVX support, False otherwise. Detection
        failure is treated as "no AVX" for safety.
    """
    log = logging.getLogger(__name__)

    # Try Linux method first (most common for Frigate)
    if sys.platform.startswith("linux"):
        try:
            with open("/proc/cpuinfo", "r") as f:
                cpuinfo = f.read().lower()
            # "avx" also matches avx2/avx512* flags, all of which imply
            # base AVX support, so a substring test is sufficient.
            return "avx" in cpuinfo
        except OSError:
            # Fall through to the lscpu fallback instead of giving up.
            log.debug("Could not read /proc/cpuinfo, falling back to lscpu")

    # Fallback to lscpu command (non-Linux, or /proc/cpuinfo unreadable)
    try:
        result = subprocess.run(
            ["lscpu"],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0:
            return "avx" in result.stdout.lower()
    except FileNotFoundError:
        log.debug("Could not find lscpu command")
    except subprocess.TimeoutExpired:
        log.debug("CPU detection timed out")
    except Exception as e:
        log.debug(f"Error detecting CPU capabilities: {e}")

    # If we can't detect, assume no AVX for safety
    log.warning(
        "Could not detect CPU AVX support, assuming incompatible with TensorFlow 2.19+"
    )
    return False
def prevent_tensorflow_import() -> None:
    """Block future TensorFlow imports by installing a stub module.

    Used when the CPU lacks the instruction sets TensorFlow requires.
    Any attribute access on the stub raises ImportError, so code paths
    that reach for TensorFlow fail loudly instead of crashing the process.
    """
    # If the real package is already loaded there is nothing we can do.
    if "tensorflow" in sys.modules:
        return

    class MockTensorflow:
        """Stub module: every attribute access raises ImportError."""

        def __getattr__(self, name):
            raise ImportError(
                "TensorFlow is disabled due to CPU incompatibility. "
                "Using ONNX backends instead."
            )

    # Register stubs so `import tensorflow` resolves to the mock
    # before any real import can happen.
    for module_name in ("tensorflow", "tensorflow.python"):
        sys.modules[module_name] = MockTensorflow()

    # Environment hints to discourage import attempts by other libraries.
    # NOTE(review): USE_TENSORFLOW is not a standard TF variable --
    # presumably consumed elsewhere in this project; confirm.
    os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
    os.environ["USE_TENSORFLOW"] = "0"

    logger.info("TensorFlow import disabled for CPU compatibility")
def ensure_cpu_compatibility() -> Optional[str]:
    """Run the CPU capability check and disable TensorFlow if unsupported.

    Returns:
        Optional[str]: Warning message if compatibility issues were
        detected, None if the CPU is fully supported.
    """
    if check_cpu_has_avx():
        return None

    # No AVX: stub out TensorFlow so later imports fail loudly and fast.
    prevent_tensorflow_import()
    return (
        "CPU does not support AVX instructions required by TensorFlow 2.19+. "
        "Live classification model training will be disabled. "
        "Face recognition, LPR, and semantic search will use ONNX backends."
    )