frigate/frigate/genai/__init__.py

437 lines
16 KiB
Python
Raw Normal View History

"""Generative AI module for Frigate."""
import importlib
import json
import logging
import os
import re
import time
2026-05-21 23:38:38 +03:00
from typing import Any, AsyncGenerator, Callable, Optional
import numpy as np
from pydantic import ValidationError
from frigate.config import CameraConfig, GenAIConfig, GenAIProviderEnum
from frigate.const import CLIPS_DIR
from frigate.data_processing.post.types import ReviewMetadata
from frigate.genai.manager import GenAIClientManager
from frigate.genai.prompts import (
build_object_description_prompt,
build_review_description_prompt,
build_review_description_response_format,
build_review_summary_prompt,
)
from frigate.models import Event
logger = logging.getLogger(__name__)
__all__ = [
"GenAIClient",
"GenAIClientManager",
"GenAIConfig",
"GenAIProviderEnum",
"PROVIDERS",
"load_providers",
"register_genai_provider",
]
PROVIDERS = {}
def register_genai_provider(key: GenAIProviderEnum) -> Callable:
"""Register a GenAI provider."""
def decorator(cls: type) -> type:
PROVIDERS[key] = cls
return cls
return decorator
class GenAIClient:
"""Generative AI client for Frigate."""
# Minimum seconds between re-initialization attempts when the provider was
# offline at startup
REINIT_INTERVAL = 60.0
def __init__(
self,
genai_config: GenAIConfig,
timeout: int = 120,
validate_model: bool = True,
) -> None:
self.genai_config: GenAIConfig = genai_config
self.timeout = timeout
self.validate_model = validate_model
self.provider = self._init_provider()
self._last_init_attempt = time.monotonic()
def ensure_provider(self) -> bool:
"""Ensure a provider is available, retrying initialization if needed.
Providers can fail to initialize at startup when their backing service
isn't online yet (common when both are started together). This retries
``_init_provider`` lazily throttled to ``REINIT_INTERVAL`` so the
client recovers on its own once the service is reachable, without a
config reload.
Returns True if a provider is available.
"""
if self.provider is not None:
return True
now = time.monotonic()
if now - self._last_init_attempt < self.REINIT_INTERVAL:
return False
self._last_init_attempt = now
self.provider = self._init_provider()
if self.provider is not None:
logger.info(
"GenAI provider %s is now available",
self.genai_config.provider,
)
return self.provider is not None
def generate_review_description(
self,
review_data: dict[str, Any],
thumbnails: list[bytes],
concerns: list[str],
preferred_language: str | None,
debug_save: bool,
activity_context_prompt: str,
) -> ReviewMetadata | None:
"""Generate a description for the review item activity."""
context_prompt = build_review_description_prompt(
review_data,
thumbnails,
concerns,
preferred_language,
activity_context_prompt,
)
logger.debug(
f"Sending {len(thumbnails)} images to create review description on {review_data['camera']}"
)
if debug_save:
with open(
os.path.join(
CLIPS_DIR, "genai-requests", review_data["id"], "prompt.txt"
),
"w",
) as f:
f.write(context_prompt)
response_format = build_review_description_response_format(concerns)
response = self._send(context_prompt, thumbnails, response_format)
if debug_save and response:
with open(
os.path.join(
CLIPS_DIR, "genai-requests", review_data["id"], "response.txt"
),
"w",
) as f:
f.write(response)
if response:
clean_json = re.sub(
r"\n?```$", "", re.sub(r"^```[a-zA-Z0-9]*\n?", "", response)
)
try:
metadata = ReviewMetadata.model_validate_json(clean_json)
except ValidationError as ve:
# Constraint violations (length, item count, ranges) are logged
# at debug and the response is kept anyway — a slightly
# off-spec answer is still usable, and dropping the whole
# response loses the narrative content the model produced.
for err in ve.errors():
loc = ".".join(str(p) for p in err["loc"]) or "<root>"
logger.debug(
"Review metadata soft validation: %s%s (input: %r)",
loc,
err["msg"],
err.get("input"),
)
try:
raw = json.loads(clean_json)
except json.JSONDecodeError as je:
logger.error("Failed to parse review description JSON: %s", je)
return None
# observations and confidence are required on the model; fill an empty default
# if the response omitted it so attribute access stays safe.
raw.setdefault("observations", [])
raw.setdefault("confidence", 0.0)
metadata = ReviewMetadata.model_construct(**raw)
except Exception as e:
logger.error(
f"Failed to parse review description as the response did not match expected format. {e}"
)
return None
try:
2026-03-10 16:35:00 +03:00
# Normalize confidence if model returned a percentage (e.g. 85 instead of 0.85)
if metadata.confidence > 1.0:
metadata.confidence = min(metadata.confidence / 100.0, 1.0)
# If any verified objects (contain ← separator), set to 0
if any("" in obj for obj in review_data["unified_objects"]):
metadata.potential_threat_level = 0
metadata.title = metadata.title[0].upper() + metadata.title[1:]
metadata.time = review_data["start"]
return metadata
except Exception as e:
logger.error(f"Failed to post-process review metadata: {e}")
return None
else:
logger.debug(
f"Invalid response received from GenAI provider for review description on {review_data['camera']}. Response: {response}",
)
return None
def generate_review_summary(
self,
start_ts: float,
end_ts: float,
events: list[dict[str, Any]],
preferred_language: str | None,
debug_save: bool,
) -> str | None:
"""Generate a summary of review item descriptions over a period of time."""
timeline_summary_prompt = build_review_summary_prompt(
start_ts, end_ts, events, preferred_language
)
if debug_save:
with open(
os.path.join(
CLIPS_DIR, "genai-requests", f"{start_ts}-{end_ts}", "prompt.txt"
),
"w",
) as f:
f.write(timeline_summary_prompt)
response = self._send(timeline_summary_prompt, [])
if debug_save and response:
with open(
os.path.join(
CLIPS_DIR, "genai-requests", f"{start_ts}-{end_ts}", "response.txt"
),
"w",
) as f:
f.write(response)
return response
def generate_object_description(
self,
camera_config: CameraConfig,
thumbnails: list[bytes],
event: Event,
) -> Optional[str]:
"""Generate a description for the frame."""
try:
prompt = build_object_description_prompt(camera_config, event)
except KeyError as e:
logger.error(f"Invalid key in GenAI prompt: {e}")
return None
logger.debug(f"Sending images to genai provider with prompt: {prompt}")
return self._send(prompt, thumbnails)
def _init_provider(self) -> Any:
"""Initialize the client."""
return None
def _send(
self,
prompt: str,
images: list[bytes],
response_format: Optional[dict] = None,
enable_thinking: bool = False,
) -> Optional[str]:
"""Submit a request to the provider.
``enable_thinking`` is honored only by providers that report
``supports_toggleable_thinking``. Description-style callers leave it
at the default (off) since synthesis tasks don't benefit from
reasoning traces.
"""
return None
@property
def supports_vision(self) -> bool:
"""Whether the model supports vision/image input.
Defaults to True for cloud providers. Providers that can detect
capability at runtime (e.g. llama.cpp) should override this.
"""
return True
@property
def supports_toggleable_thinking(self) -> bool:
"""Whether the configured model exposes a per-request thinking toggle."""
return False
def list_models(self) -> list[str]:
"""Return the list of model names available from this provider.
Providers should override this to query their backend.
"""
return []
def get_context_size(self) -> int:
"""Get the context window size for this provider in tokens."""
return 4096
def estimate_image_tokens(self, width: int, height: int) -> float:
"""Estimate prompt tokens consumed by a single image of the given dimensions.
Default heuristic: ~1 token per 1250 pixels. Providers that can measure or
know their model's exact image-token cost should override.
"""
return (width * height) / 1250
def embed(
self,
texts: list[str] | None = None,
images: list[bytes] | None = None,
) -> list[np.ndarray]:
"""Generate embeddings for text and/or images.
Returns list of numpy arrays (one per input). Expected dimension is 768
for Frigate semantic search compatibility.
Providers that support embeddings should override this method.
"""
logger.warning(
"%s does not support embeddings. "
"This method should be overridden by the provider implementation.",
self.__class__.__name__,
)
return []
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
enable_thinking: Optional[bool] = None,
) -> dict[str, Any]:
"""
Send chat messages to LLM with optional tool definitions.
This method handles conversation-style interactions with the LLM,
including function calling/tool usage capabilities.
Args:
messages: List of message dictionaries. Each message should have:
- 'role': str - One of 'user', 'assistant', 'system', or 'tool'
- 'content': str - The message content
- 'tool_call_id': Optional[str] - For tool responses, the ID of the tool call
- 'name': Optional[str] - For tool messages, the tool name
tools: Optional list of tool definitions in OpenAI-compatible format.
Each tool should have 'type': 'function' and 'function' with:
- 'name': str - Tool name
- 'description': str - Tool description
- 'parameters': dict - JSON schema for parameters
tool_choice: How the model should handle tools:
- 'auto': Model decides whether to call tools
- 'none': Model must not call tools
- 'required': Model must call at least one tool
- Or a dict specifying a specific tool to call
enable_thinking: Per-request thinking toggle. None means use the
provider default. Ignored by providers without a per-request
toggle (see `supports_toggleable_thinking`).
Returns:
Dictionary with:
- 'content': Optional[str] - The text response from the LLM, None if tool calls
- 'reasoning': Optional[str] - The separated reasoning/thinking trace
if the model emitted one (e.g. via OpenAI-compatible
`reasoning_content`). None when the model does not surface a
trace or the provider does not parse it.
- 'tool_calls': Optional[List[Dict]] - List of tool calls if LLM wants to call tools.
Each tool call dict has:
- 'id': str - Unique identifier for this tool call
- 'name': str - Tool name to call
- 'arguments': dict - Arguments for the tool call (parsed JSON)
- 'finish_reason': str - Reason generation stopped:
- 'stop': Normal completion
- 'tool_calls': LLM wants to call tools
- 'length': Hit token limit
- 'error': An error occurred
Streaming counterpart `chat_with_tools_stream` yields
``(kind, value)`` tuples where ``kind`` is one of:
- 'content_delta': value is a string fragment of the answer
- 'reasoning_delta': value is a string fragment of the reasoning
trace (emitted before content for thinking models)
- 'stats': value is a usage stats dict
- 'message': value is the final dict shape described above
Raises:
NotImplementedError: If the provider doesn't implement this method.
"""
# Base implementation - each provider should override this
logger.warning(
f"{self.__class__.__name__} does not support chat_with_tools. "
"This method should be overridden by the provider implementation."
)
return {
"content": None,
"reasoning": None,
"tool_calls": None,
"finish_reason": "error",
}
2026-05-21 23:38:38 +03:00
async def chat_with_tools_stream(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
enable_thinking: Optional[bool] = None,
) -> AsyncGenerator[tuple[str, Any], None]:
"""Streaming counterpart to `chat_with_tools`.
Yields ``(kind, value)`` tuples where ``kind`` is one of:
- 'content_delta': value is a string fragment of the answer
- 'reasoning_delta': value is a string fragment of the reasoning
trace (emitted before content for thinking models)
- 'stats': value is a usage stats dict
- 'message': value is the final dict shape described in
`chat_with_tools`
Argument semantics including ``enable_thinking`` match
`chat_with_tools`. Providers that don't support streaming should
override this and yield an error 'message' event.
"""
logger.warning(
f"{self.__class__.__name__} does not support chat_with_tools_stream. "
"This method should be overridden by the provider implementation."
)
yield (
"message",
{
"content": None,
"reasoning": None,
"tool_calls": None,
"finish_reason": "error",
},
)
def load_providers() -> None:
plugins_dir = os.path.join(os.path.dirname(__file__), "plugins")
for filename in os.listdir(plugins_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = f"frigate.genai.plugins.{filename[:-3]}"
importlib.import_module(module_name)