Implement LLM Chat API with tool calling support (#21731)

* Implement initial tools definition APIs

* Add initial chat completion API with tool support

* Implement other providers

* Cleanup
Nicolas Mowen 2026-01-20 08:13:12 -07:00 committed by GitHub
parent 16d94c3cfa
commit 31ee62b760
11 changed files with 1247 additions and 3 deletions

frigate/api/chat.py Normal file

@@ -0,0 +1,476 @@
"""Chat and LLM tool calling APIs."""
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List
from fastapi import APIRouter, Body, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from frigate.api.auth import (
allow_any_authenticated,
get_allowed_cameras_for_filter,
)
from frigate.api.defs.query.events_query_parameters import EventsQueryParams
from frigate.api.defs.request.chat_body import ChatCompletionRequest
from frigate.api.defs.response.chat_response import (
ChatCompletionResponse,
ChatMessageResponse,
)
from frigate.api.defs.tags import Tags
from frigate.api.event import events
from frigate.genai import get_genai_client
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.chat])
class ToolExecuteRequest(BaseModel):
"""Request model for tool execution."""
tool_name: str
arguments: Dict[str, Any]
def get_tool_definitions() -> List[Dict[str, Any]]:
"""
Get OpenAI-compatible tool definitions for Frigate.
Returns a list of tool definitions that can be used with OpenAI-compatible
function calling APIs.
"""
return [
{
"type": "function",
"function": {
"name": "search_objects",
"description": (
"Search for detected objects in Frigate by camera, object label, time range, "
"zones, and other filters. Use this to answer questions about when "
"objects were detected, what objects appeared, or to find specific object detections. "
"An 'object' in Frigate represents a tracked detection (e.g., a person, package, car)."
),
"parameters": {
"type": "object",
"properties": {
"camera": {
"type": "string",
"description": "Camera name to filter by (optional). Use 'all' for all cameras.",
},
"label": {
"type": "string",
"description": "Object label to filter by (e.g., 'person', 'package', 'car').",
},
"after": {
"type": "string",
"description": "Start time in ISO 8601 format (e.g., '2024-01-01T00:00:00Z').",
},
"before": {
"type": "string",
"description": "End time in ISO 8601 format (e.g., '2024-01-01T23:59:59Z').",
},
"zones": {
"type": "array",
"items": {"type": "string"},
"description": "List of zone names to filter by.",
},
"limit": {
"type": "integer",
"description": "Maximum number of objects to return (default: 10).",
"default": 10,
},
},
                    "required": [],
                },
            },
},
]
@router.get(
"/chat/tools",
dependencies=[Depends(allow_any_authenticated())],
summary="Get available tools",
description="Returns OpenAI-compatible tool definitions for function calling.",
)
def get_tools(request: Request) -> JSONResponse:
"""Get list of available tools for LLM function calling."""
tools = get_tool_definitions()
return JSONResponse(content={"tools": tools})
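For orientation, a minimal client sketch of this endpoint (the base URL and an already-authenticated session are assumptions for the example, not part of this commit):

# Hypothetical client usage -- base URL and auth are assumptions.
import requests

BASE = "http://localhost:5000/api"

resp = requests.get(f"{BASE}/chat/tools", timeout=10)
resp.raise_for_status()
tools = resp.json()["tools"]
# Each entry is an OpenAI-style function definition that can be passed
# straight through as the `tools` parameter of a chat completion call.
print([t["function"]["name"] for t in tools])  # ['search_objects']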
async def _execute_search_objects(
request: Request,
arguments: Dict[str, Any],
allowed_cameras: List[str],
) -> JSONResponse:
"""
Execute the search_objects tool.
This searches for detected objects (events) in Frigate using the same
logic as the events API endpoint.
"""
# Parse ISO 8601 timestamps to Unix timestamps if provided
after = arguments.get("after")
before = arguments.get("before")
if after:
try:
after_dt = datetime.fromisoformat(after.replace("Z", "+00:00"))
after = after_dt.timestamp()
except (ValueError, AttributeError):
logger.warning(f"Invalid 'after' timestamp format: {after}")
after = None
if before:
try:
before_dt = datetime.fromisoformat(before.replace("Z", "+00:00"))
before = before_dt.timestamp()
except (ValueError, AttributeError):
logger.warning(f"Invalid 'before' timestamp format: {before}")
before = None
# Convert zones array to comma-separated string if provided
zones = arguments.get("zones")
if isinstance(zones, list):
zones = ",".join(zones)
elif zones is None:
zones = "all"
# Build query parameters compatible with EventsQueryParams
query_params = EventsQueryParams(
camera=arguments.get("camera", "all"),
cameras=arguments.get("camera", "all"),
label=arguments.get("label", "all"),
labels=arguments.get("label", "all"),
zones=zones,
zone=zones,
after=after,
before=before,
limit=arguments.get("limit", 10),
)
try:
# Call the events endpoint function directly
# The events function is synchronous and takes params and allowed_cameras
response = events(query_params, allowed_cameras)
# The response is already a JSONResponse with event data
# Return it as-is for the LLM
return response
except Exception as e:
logger.error(f"Error executing search_objects: {e}", exc_info=True)
return JSONResponse(
content={
"success": False,
"message": f"Error searching objects: {str(e)}",
},
status_code=500,
)
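The `Z`-suffix replacement above matters because `datetime.fromisoformat` only accepts a trailing `Z` on Python 3.11+; a quick sketch of the normalization:

# Sketch of the timestamp normalization used by _execute_search_objects.
from datetime import datetime

after = "2024-01-01T00:00:00Z"
after_ts = datetime.fromisoformat(after.replace("Z", "+00:00")).timestamp()
print(after_ts)  # 1704067200.0 (Unix seconds, UTC)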
@router.post(
"/chat/execute",
dependencies=[Depends(allow_any_authenticated())],
summary="Execute a tool",
description="Execute a tool function call from an LLM.",
)
async def execute_tool(
request: Request,
body: ToolExecuteRequest = Body(...),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
) -> JSONResponse:
"""
Execute a tool function call.
This endpoint receives tool calls from LLMs and executes the corresponding
Frigate operations, returning results in a format the LLM can understand.
"""
tool_name = body.tool_name
arguments = body.arguments
logger.debug(f"Executing tool: {tool_name} with arguments: {arguments}")
if tool_name == "search_objects":
return await _execute_search_objects(request, arguments, allowed_cameras)
return JSONResponse(
content={
"success": False,
"message": f"Unknown tool: {tool_name}",
"tool": tool_name,
},
status_code=400,
)
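A hedged example of calling this endpoint directly (base URL and auth are assumptions; the body shape follows ToolExecuteRequest above):

# Hypothetical direct tool invocation -- values are illustrative.
import requests

BASE = "http://localhost:5000/api"

resp = requests.post(
    f"{BASE}/chat/execute",
    json={
        "tool_name": "search_objects",
        "arguments": {
            "label": "person",
            "after": "2024-01-01T00:00:00Z",
            "limit": 5,
        },
    },
    timeout=30,
)
print(resp.status_code, resp.json())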
async def _execute_tool_internal(
tool_name: str,
arguments: Dict[str, Any],
request: Request,
allowed_cameras: List[str],
) -> Dict[str, Any]:
"""
Internal helper to execute a tool and return the result as a dict.
This is used by the chat completion endpoint to execute tools.
"""
if tool_name == "search_objects":
response = await _execute_search_objects(request, arguments, allowed_cameras)
try:
if hasattr(response, "body"):
body_str = response.body.decode("utf-8")
return json.loads(body_str)
elif hasattr(response, "content"):
return response.content
else:
return {}
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(f"Failed to extract tool result: {e}")
return {"error": "Failed to parse tool result"}
else:
return {"error": f"Unknown tool: {tool_name}"}
@router.post(
"/chat/completion",
response_model=ChatCompletionResponse,
dependencies=[Depends(allow_any_authenticated())],
summary="Chat completion with tool calling",
description=(
"Send a chat message to the configured GenAI provider with tool calling support. "
"The LLM can call Frigate tools to answer questions about your cameras and events."
),
)
async def chat_completion(
request: Request,
body: ChatCompletionRequest = Body(...),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
) -> JSONResponse:
"""
Chat completion endpoint with tool calling support.
This endpoint:
1. Gets the configured GenAI client
2. Gets tool definitions
3. Sends messages + tools to LLM
4. Handles tool_calls if present
5. Executes tools and sends results back to LLM
6. Repeats until final answer
7. Returns response to user
"""
genai_client = get_genai_client(request.app.frigate_config)
if not genai_client:
return JSONResponse(
content={
"error": "GenAI is not configured. Please configure a GenAI provider in your Frigate config.",
},
status_code=400,
)
tools = get_tool_definitions()
conversation = []
current_datetime = datetime.now(timezone.utc)
current_date_str = current_datetime.strftime("%Y-%m-%d")
current_time_str = current_datetime.strftime("%H:%M:%S %Z")
system_prompt = f"""You are a helpful assistant for Frigate, a security camera NVR system. You help users answer questions about their cameras, detected objects, and events.
Current date and time: {current_date_str} at {current_time_str} (UTC)
When users ask questions about "today", "yesterday", "this week", etc., use the current date above as reference.
When searching for objects or events, use ISO 8601 format for dates (e.g., {current_date_str}T00:00:00Z for the start of today).
Always be accurate with time calculations based on the current date provided."""
conversation.append(
{
"role": "system",
"content": system_prompt,
}
)
for msg in body.messages:
msg_dict = {
"role": msg.role,
"content": msg.content,
}
if msg.tool_call_id:
msg_dict["tool_call_id"] = msg.tool_call_id
if msg.name:
msg_dict["name"] = msg.name
conversation.append(msg_dict)
tool_iterations = 0
max_iterations = body.max_tool_iterations
logger.debug(
f"Starting chat completion with {len(conversation)} message(s), "
f"{len(tools)} tool(s) available, max_iterations={max_iterations}"
)
try:
while tool_iterations < max_iterations:
logger.debug(
f"Calling LLM (iteration {tool_iterations + 1}/{max_iterations}) "
f"with {len(conversation)} message(s) in conversation"
)
response = genai_client.chat_with_tools(
messages=conversation,
tools=tools if tools else None,
tool_choice="auto",
)
if response.get("finish_reason") == "error":
logger.error("GenAI client returned an error")
return JSONResponse(
content={
"error": "An error occurred while processing your request.",
},
status_code=500,
)
assistant_message = {
"role": "assistant",
"content": response.get("content"),
}
if response.get("tool_calls"):
assistant_message["tool_calls"] = [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["arguments"]),
},
}
for tc in response["tool_calls"]
]
conversation.append(assistant_message)
tool_calls = response.get("tool_calls")
if not tool_calls:
logger.debug(
f"Chat completion finished with final answer (iterations: {tool_iterations})"
)
return JSONResponse(
content=ChatCompletionResponse(
message=ChatMessageResponse(
role="assistant",
content=response.get("content"),
tool_calls=None,
),
finish_reason=response.get("finish_reason", "stop"),
tool_iterations=tool_iterations,
).model_dump(),
)
# Execute tools
tool_iterations += 1
logger.debug(
f"Tool calls detected (iteration {tool_iterations}/{max_iterations}): "
f"{len(tool_calls)} tool(s) to execute"
)
tool_results = []
for tool_call in tool_calls:
tool_name = tool_call["name"]
tool_args = tool_call["arguments"]
tool_call_id = tool_call["id"]
logger.debug(
f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}"
)
try:
tool_result = await _execute_tool_internal(
tool_name, tool_args, request, allowed_cameras
)
if isinstance(tool_result, dict):
result_content = json.dumps(tool_result)
result_summary = tool_result
                        # Already inside the dict branch, so only the
                        # "content" field's type needs checking here.
                        if isinstance(tool_result.get("content"), list):
                            result_count = len(tool_result["content"])
                            result_summary = {
                                "count": result_count,
                                "sample": tool_result["content"][:2],
                            }
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result: {json.dumps(result_summary, indent=2)}"
)
elif isinstance(tool_result, str):
result_content = tool_result
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result length: {len(result_content)} characters"
)
else:
result_content = str(tool_result)
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
f"Result type: {type(tool_result).__name__}"
)
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": result_content,
}
)
except Exception as e:
logger.error(
f"Error executing tool {tool_name} (id: {tool_call_id}): {e}",
exc_info=True,
)
error_content = json.dumps(
{"error": f"Tool execution failed: {str(e)}"}
)
tool_results.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": error_content,
}
)
logger.debug(
f"Tool {tool_name} (id: {tool_call_id}) failed. Error result added to conversation."
)
conversation.extend(tool_results)
logger.debug(
f"Added {len(tool_results)} tool result(s) to conversation. "
f"Continuing with next LLM call..."
)
logger.warning(
f"Max tool iterations ({max_iterations}) reached. Returning partial response."
)
return JSONResponse(
content=ChatCompletionResponse(
message=ChatMessageResponse(
role="assistant",
content="I reached the maximum number of tool call iterations. Please try rephrasing your question.",
tool_calls=None,
),
finish_reason="length",
tool_iterations=tool_iterations,
).model_dump(),
)
except Exception as e:
logger.error(f"Error in chat completion: {e}", exc_info=True)
return JSONResponse(
content={
"error": "An error occurred while processing your request.",
},
status_code=500,
)
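Putting the loop together, a hedged end-to-end sketch of calling the endpoint (base URL and auth are assumptions; the request and response shapes follow ChatCompletionRequest and ChatCompletionResponse, defined in the model files below):

# Hypothetical end-to-end chat completion call -- values illustrative.
import requests

BASE = "http://localhost:5000/api"

resp = requests.post(
    f"{BASE}/chat/completion",
    json={
        "messages": [
            {"role": "user", "content": "Were any people detected today?"}
        ],
        "max_tool_iterations": 5,
    },
    timeout=120,
)
data = resp.json()
# e.g. {"message": {"role": "assistant", "content": "...", "tool_calls": None},
#       "finish_reason": "stop", "tool_iterations": 1}
print(data["message"]["content"], data["tool_iterations"])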

frigate/api/defs/request/chat_body.py Normal file

@@ -0,0 +1,34 @@
"""Chat API request models."""
from typing import Optional
from pydantic import BaseModel, Field
class ChatMessage(BaseModel):
"""A single message in a chat conversation."""
role: str = Field(
description="Message role: 'user', 'assistant', 'system', or 'tool'"
)
content: str = Field(description="Message content")
tool_call_id: Optional[str] = Field(
default=None, description="For tool messages, the ID of the tool call"
)
name: Optional[str] = Field(
default=None, description="For tool messages, the tool name"
)
class ChatCompletionRequest(BaseModel):
"""Request for chat completion with tool calling."""
messages: list[ChatMessage] = Field(
description="List of messages in the conversation"
)
max_tool_iterations: int = Field(
default=5,
ge=1,
le=10,
description="Maximum number of tool call iterations (default: 5)",
)
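As an illustration of what this model accepts (values invented for the example; the defaults and bounds come from the fields above):

# Illustrative validation of a tool round-trip conversation.
from frigate.api.defs.request.chat_body import ChatCompletionRequest

req = ChatCompletionRequest.model_validate(
    {
        "messages": [
            {"role": "user", "content": "Any packages today?"},
            {
                "role": "tool",
                "content": '{"count": 1}',
                "tool_call_id": "call_0",
                "name": "search_objects",
            },
        ],
        # max_tool_iterations omitted -> defaults to 5 (must be 1..10)
    }
)
assert req.max_tool_iterations == 5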

frigate/api/defs/response/chat_response.py Normal file

@@ -0,0 +1,37 @@
"""Chat API response models."""
from typing import Any, Optional
from pydantic import BaseModel, Field
class ToolCall(BaseModel):
"""A tool call from the LLM."""
id: str = Field(description="Unique identifier for this tool call")
name: str = Field(description="Tool name to call")
arguments: dict[str, Any] = Field(description="Arguments for the tool call")
class ChatMessageResponse(BaseModel):
"""A message in the chat response."""
role: str = Field(description="Message role")
content: Optional[str] = Field(
default=None, description="Message content (None if tool calls present)"
)
tool_calls: Optional[list[ToolCall]] = Field(
default=None, description="Tool calls if LLM wants to call tools"
)
class ChatCompletionResponse(BaseModel):
"""Response from chat completion."""
message: ChatMessageResponse = Field(description="The assistant's message")
finish_reason: str = Field(
description="Reason generation stopped: 'stop', 'tool_calls', 'length', 'error'"
)
tool_iterations: int = Field(
default=0, description="Number of tool call iterations performed"
)
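And the corresponding response shape, sketched with invented values:

# Illustrative response construction mirroring /chat/completion output.
from frigate.api.defs.response.chat_response import (
    ChatCompletionResponse,
    ChatMessageResponse,
    ToolCall,
)

resp = ChatCompletionResponse(
    message=ChatMessageResponse(
        role="assistant",
        content=None,  # None because tool calls are present
        tool_calls=[
            ToolCall(
                id="call_0",
                name="search_objects",
                arguments={"label": "person"},
            )
        ],
    ),
    finish_reason="tool_calls",
)
print(resp.model_dump())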

frigate/api/defs/tags.py

@@ -5,6 +5,7 @@ class Tags(Enum):
    app = "App"
    auth = "Auth"
    camera = "Camera"
    chat = "Chat"
    events = "Events"
    export = "Export"
    classification = "Classification"


@@ -16,6 +16,7 @@ from frigate.api import app as main_app
from frigate.api import (
    auth,
    camera,
    chat,
    classification,
    event,
    export,
@@ -121,6 +122,7 @@ def create_fastapi_app(
    # Order of include_router matters: https://fastapi.tiangolo.com/tutorial/path-params/#order-matters
    app.include_router(auth.router)
    app.include_router(camera.router)
    app.include_router(chat.router)
    app.include_router(classification.router)
    app.include_router(review.router)
    app.include_router(main_app.router)

frigate/genai/__init__.py

@@ -285,6 +285,64 @@ Guidelines:
        """Get the context window size for this provider in tokens."""
        return 4096
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
"""
Send chat messages to LLM with optional tool definitions.
This method handles conversation-style interactions with the LLM,
including function calling/tool usage capabilities.
Args:
messages: List of message dictionaries. Each message should have:
- 'role': str - One of 'user', 'assistant', 'system', or 'tool'
- 'content': str - The message content
- 'tool_call_id': Optional[str] - For tool responses, the ID of the tool call
- 'name': Optional[str] - For tool messages, the tool name
tools: Optional list of tool definitions in OpenAI-compatible format.
Each tool should have 'type': 'function' and 'function' with:
- 'name': str - Tool name
- 'description': str - Tool description
- 'parameters': dict - JSON schema for parameters
tool_choice: How the model should handle tools:
- 'auto': Model decides whether to call tools
- 'none': Model must not call tools
- 'required': Model must call at least one tool
Returns:
Dictionary with:
- 'content': Optional[str] - The text response from the LLM, None if tool calls
- 'tool_calls': Optional[List[Dict]] - List of tool calls if LLM wants to call tools.
Each tool call dict has:
- 'id': str - Unique identifier for this tool call
- 'name': str - Tool name to call
- 'arguments': dict - Arguments for the tool call (parsed JSON)
- 'finish_reason': str - Reason generation stopped:
- 'stop': Normal completion
- 'tool_calls': LLM wants to call tools
- 'length': Hit token limit
- 'error': An error occurred
Raises:
NotImplementedError: If the provider doesn't implement this method.
"""
# Base implementation - each provider should override this
logger.warning(
f"{self.__class__.__name__} does not support chat_with_tools. "
"This method should be overridden by the provider implementation."
)
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
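To make the contract concrete, a minimal sketch of a provider override (a hypothetical provider, not part of this commit) that satisfies the documented return shape:

# Hypothetical provider: the minimum a subclass must return.
class EchoClient(GenAIClient):
    def chat_with_tools(
        self,
        messages: list[dict[str, Any]],
        tools: Optional[list[dict[str, Any]]] = None,
        tool_choice: Optional[str] = "auto",
    ) -> dict[str, Any]:
        last_user = next(
            (m["content"] for m in reversed(messages) if m["role"] == "user"),
            "",
        )
        # A real provider would call its model here; this one just
        # echoes the last user message and never requests a tool call.
        return {
            "content": f"You said: {last_user}",
            "tool_calls": None,
            "finish_reason": "stop",
        }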
def get_genai_client(config: FrigateConfig) -> Optional[GenAIClient]:
    """Get the GenAI client."""


@@ -1,8 +1,9 @@
"""Azure OpenAI Provider for Frigate AI."""
import base64
import json
import logging
from typing import Any, Optional
from urllib.parse import parse_qs, urlparse
from openai import AzureOpenAI
@@ -75,3 +76,93 @@ class OpenAIClient(GenAIClient):
    def get_context_size(self) -> int:
        """Get the context window size for Azure OpenAI."""
        return 128000
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
try:
openai_tool_choice = None
if tool_choice:
if tool_choice == "none":
openai_tool_choice = "none"
elif tool_choice == "auto":
openai_tool_choice = "auto"
elif tool_choice == "required":
openai_tool_choice = "required"
request_params = {
"model": self.genai_config.model,
"messages": messages,
"timeout": self.timeout,
}
if tools:
request_params["tools"] = tools
if openai_tool_choice is not None:
request_params["tool_choice"] = openai_tool_choice
result = self.provider.chat.completions.create(**request_params)
if (
result is None
or not hasattr(result, "choices")
or len(result.choices) == 0
):
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
choice = result.choices[0]
message = choice.message
content = message.content.strip() if message.content else None
tool_calls = None
if message.tool_calls:
tool_calls = []
for tool_call in message.tool_calls:
try:
arguments = json.loads(tool_call.function.arguments)
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(
f"Failed to parse tool call arguments: {e}, "
f"tool: {tool_call.function.name if hasattr(tool_call.function, 'name') else 'unknown'}"
)
arguments = {}
tool_calls.append(
{
"id": tool_call.id if hasattr(tool_call, "id") else "",
"name": tool_call.function.name
if hasattr(tool_call.function, "name")
else "",
"arguments": arguments,
}
)
finish_reason = "error"
if hasattr(choice, "finish_reason") and choice.finish_reason:
finish_reason = choice.finish_reason
elif tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
except Exception as e:
logger.warning("Azure OpenAI returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}


@@ -1,7 +1,8 @@
"""Gemini Provider for Frigate AI."""
import json
import logging
from typing import Any, Optional
import google.generativeai as genai
from google.api_core.exceptions import GoogleAPICallError
@@ -58,3 +59,188 @@ class GeminiClient(GenAIClient):
        """Get the context window size for Gemini."""
        # Gemini Pro Vision has a 1M token context window
        return 1000000
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
try:
if tools:
function_declarations = []
for tool in tools:
if tool.get("type") == "function":
func_def = tool.get("function", {})
function_declarations.append(
genai.protos.FunctionDeclaration(
name=func_def.get("name"),
description=func_def.get("description"),
parameters=genai.protos.Schema(
type=genai.protos.Type.OBJECT,
properties={
prop_name: genai.protos.Schema(
type=_convert_json_type_to_gemini(
prop.get("type")
),
description=prop.get("description"),
)
for prop_name, prop in func_def.get(
"parameters", {}
)
.get("properties", {})
.items()
},
required=func_def.get("parameters", {}).get(
"required", []
),
),
)
)
tool_config = genai.protos.Tool(
function_declarations=function_declarations
)
if tool_choice == "none":
function_calling_config = genai.protos.FunctionCallingConfig(
mode=genai.protos.FunctionCallingConfig.Mode.NONE
)
elif tool_choice == "required":
function_calling_config = genai.protos.FunctionCallingConfig(
mode=genai.protos.FunctionCallingConfig.Mode.ANY
)
else:
function_calling_config = genai.protos.FunctionCallingConfig(
mode=genai.protos.FunctionCallingConfig.Mode.AUTO
)
else:
tool_config = None
function_calling_config = None
contents = []
for msg in messages:
role = msg.get("role")
content = msg.get("content", "")
if role == "system":
continue
elif role == "user":
contents.append({"role": "user", "parts": [content]})
elif role == "assistant":
parts = [content] if content else []
if "tool_calls" in msg:
for tc in msg["tool_calls"]:
parts.append(
genai.protos.FunctionCall(
name=tc["function"]["name"],
args=json.loads(tc["function"]["arguments"]),
)
)
contents.append({"role": "model", "parts": parts})
elif role == "tool":
tool_name = msg.get("name", "")
tool_result = (
json.loads(content) if isinstance(content, str) else content
)
contents.append(
{
"role": "function",
"parts": [
genai.protos.FunctionResponse(
name=tool_name,
response=tool_result,
)
],
}
)
            generation_config = genai.types.GenerationConfig(
                candidate_count=1,
            )
            # The function calling mode must be passed via tool_config;
            # GenerationConfig has no function_calling_config field, so
            # setting one there would be silently ignored.
            tool_config_proto = (
                genai.protos.ToolConfig(
                    function_calling_config=function_calling_config
                )
                if function_calling_config
                else None
            )
            response = self.provider.generate_content(
                contents,
                tools=[tool_config] if tool_config else None,
                tool_config=tool_config_proto,
                generation_config=generation_config,
                request_options=genai.types.RequestOptions(timeout=self.timeout),
            )
content = None
tool_calls = None
if response.candidates and response.candidates[0].content:
parts = response.candidates[0].content.parts
text_parts = [p.text for p in parts if hasattr(p, "text") and p.text]
if text_parts:
content = " ".join(text_parts).strip()
function_calls = [
p.function_call
for p in parts
if hasattr(p, "function_call") and p.function_call
]
if function_calls:
tool_calls = []
for fc in function_calls:
tool_calls.append(
{
"id": f"call_{hash(fc.name)}",
"name": fc.name,
"arguments": dict(fc.args)
if hasattr(fc, "args")
else {},
}
)
finish_reason = "error"
if response.candidates:
finish_reason_map = {
genai.types.FinishReason.STOP: "stop",
genai.types.FinishReason.MAX_TOKENS: "length",
genai.types.FinishReason.SAFETY: "stop",
genai.types.FinishReason.RECITATION: "stop",
genai.types.FinishReason.OTHER: "error",
}
finish_reason = finish_reason_map.get(
response.candidates[0].finish_reason, "error"
)
elif tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
except GoogleAPICallError as e:
logger.warning("Gemini returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except Exception as e:
logger.warning("Unexpected error in Gemini chat_with_tools: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
def _convert_json_type_to_gemini(json_type: str) -> genai.protos.Type:
type_map = {
"string": genai.protos.Type.STRING,
"integer": genai.protos.Type.INTEGER,
"number": genai.protos.Type.NUMBER,
"boolean": genai.protos.Type.BOOLEAN,
"array": genai.protos.Type.ARRAY,
"object": genai.protos.Type.OBJECT,
}
return type_map.get(json_type, genai.protos.Type.STRING)
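A few spot checks of the type mapping (these follow directly from the table above):

# Illustrative: JSON Schema types map onto Gemini proto types,
# with unknown types falling back to STRING.
assert _convert_json_type_to_gemini("string") == genai.protos.Type.STRING
assert _convert_json_type_to_gemini("array") == genai.protos.Type.ARRAY
assert _convert_json_type_to_gemini("null") == genai.protos.Type.STRING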


@@ -1,6 +1,7 @@
"""llama.cpp Provider for Frigate AI."""
import base64
import json
import logging
from typing import Any, Optional
@@ -99,3 +100,132 @@ class LlamaCppClient(GenAIClient):
    def get_context_size(self) -> int:
        """Get the context window size for llama.cpp."""
        return self.genai_config.provider_options.get("context_size", 4096)
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
"""
Send chat messages to llama.cpp server with optional tool definitions.
Uses the OpenAI-compatible endpoint but passes through all native llama.cpp
parameters (like slot_id, temperature, etc.) via provider_options.
"""
if self.provider is None:
logger.warning(
"llama.cpp provider has not been initialized. Check your llama.cpp configuration."
)
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
try:
openai_tool_choice = None
if tool_choice:
if tool_choice == "none":
openai_tool_choice = "none"
elif tool_choice == "auto":
openai_tool_choice = "auto"
elif tool_choice == "required":
openai_tool_choice = "required"
payload = {
"messages": messages,
}
if tools:
payload["tools"] = tools
if openai_tool_choice is not None:
payload["tool_choice"] = openai_tool_choice
provider_opts = {
k: v for k, v in self.provider_options.items() if k != "context_size"
}
payload.update(provider_opts)
response = requests.post(
f"{self.provider}/v1/chat/completions",
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
if result is None or "choices" not in result or len(result["choices"]) == 0:
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
choice = result["choices"][0]
message = choice.get("message", {})
content = message.get("content")
if content:
content = content.strip()
else:
content = None
tool_calls = None
if "tool_calls" in message and message["tool_calls"]:
tool_calls = []
                for tool_call in message["tool_calls"]:
                    # Resolve function_data before the try block so the
                    # except handler can safely reference it when logging.
                    function_data = tool_call.get("function", {})
                    try:
                        arguments_str = function_data.get("arguments", "{}")
                        arguments = json.loads(arguments_str)
                    except (json.JSONDecodeError, KeyError, TypeError) as e:
                        logger.warning(
                            f"Failed to parse tool call arguments: {e}, "
                            f"tool: {function_data.get('name', 'unknown')}"
                        )
                        arguments = {}
tool_calls.append(
{
"id": tool_call.get("id", ""),
"name": function_data.get("name", ""),
"arguments": arguments,
}
)
finish_reason = "error"
if "finish_reason" in choice and choice["finish_reason"]:
finish_reason = choice["finish_reason"]
elif tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
except requests.exceptions.Timeout as e:
logger.warning("llama.cpp request timed out: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except requests.exceptions.RequestException as e:
logger.warning("llama.cpp returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except Exception as e:
logger.warning("Unexpected error in llama.cpp chat_with_tools: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
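Since everything in provider_options except context_size is merged into the HTTP payload, native llama.cpp sampling parameters pass straight through; a small sketch with invented values:

# Illustrative payload assembly: temperature is forwarded verbatim,
# while context_size is kept out of the request body.
provider_options = {"context_size": 8192, "temperature": 0.2}
payload = {"messages": [{"role": "user", "content": "hi"}]}
payload.update(
    {k: v for k, v in provider_options.items() if k != "context_size"}
)
print(payload)  # {'messages': [...], 'temperature': 0.2}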


@@ -1,5 +1,6 @@
"""Ollama Provider for Frigate AI."""
import json
import logging
from typing import Any, Optional
@@ -77,3 +78,120 @@ class OllamaClient(GenAIClient):
        return self.genai_config.provider_options.get("options", {}).get(
            "num_ctx", 4096
        )
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
if self.provider is None:
logger.warning(
"Ollama provider has not been initialized. Check your Ollama configuration."
)
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
try:
request_messages = []
for msg in messages:
msg_dict = {
"role": msg.get("role"),
"content": msg.get("content", ""),
}
if msg.get("tool_call_id"):
msg_dict["tool_call_id"] = msg["tool_call_id"]
if msg.get("name"):
msg_dict["name"] = msg["name"]
if msg.get("tool_calls"):
msg_dict["tool_calls"] = msg["tool_calls"]
request_messages.append(msg_dict)
request_params = {
"model": self.genai_config.model,
"messages": request_messages,
}
if tools:
request_params["tools"] = tools
if tool_choice:
if tool_choice == "none":
request_params["tool_choice"] = "none"
elif tool_choice == "required":
request_params["tool_choice"] = "required"
elif tool_choice == "auto":
request_params["tool_choice"] = "auto"
request_params.update(self.provider_options)
response = self.provider.chat(**request_params)
if not response or "message" not in response:
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
message = response["message"]
content = (
message.get("content", "").strip() if message.get("content") else None
)
tool_calls = None
if "tool_calls" in message and message["tool_calls"]:
tool_calls = []
                for tool_call in message["tool_calls"]:
                    # Resolve function_data before the try block so the
                    # except handler can safely reference it when logging.
                    function_data = tool_call.get("function", {})
                    try:
                        arguments_raw = function_data.get("arguments", "{}")
                        # Ollama may return arguments already parsed as a
                        # dict rather than as a JSON string; accept both.
                        if isinstance(arguments_raw, dict):
                            arguments = arguments_raw
                        else:
                            arguments = json.loads(arguments_raw)
                    except (json.JSONDecodeError, KeyError, TypeError) as e:
                        logger.warning(
                            f"Failed to parse tool call arguments: {e}, "
                            f"tool: {function_data.get('name', 'unknown')}"
                        )
                        arguments = {}
tool_calls.append(
{
"id": tool_call.get("id", ""),
"name": function_data.get("name", ""),
"arguments": arguments,
}
)
finish_reason = "error"
if "done" in response and response["done"]:
if tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
elif tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
except (TimeoutException, ResponseError, ConnectionError) as e:
logger.warning("Ollama returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except Exception as e:
logger.warning("Unexpected error in Ollama chat_with_tools: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}


@@ -1,8 +1,9 @@
"""OpenAI Provider for Frigate AI."""
import base64
import json
import logging
from typing import Any, Optional
from httpx import TimeoutException
from openai import OpenAI
@@ -100,3 +101,113 @@ class OpenAIClient(GenAIClient):
            f"Using default context size {self.context_size} for model {self.genai_config.model}"
        )
        return self.context_size
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
"""
Send chat messages to OpenAI with optional tool definitions.
Implements function calling/tool usage for OpenAI models.
"""
try:
openai_tool_choice = None
if tool_choice:
if tool_choice == "none":
openai_tool_choice = "none"
elif tool_choice == "auto":
openai_tool_choice = "auto"
elif tool_choice == "required":
openai_tool_choice = "required"
request_params = {
"model": self.genai_config.model,
"messages": messages,
"timeout": self.timeout,
}
if tools:
request_params["tools"] = tools
if openai_tool_choice is not None:
request_params["tool_choice"] = openai_tool_choice
if isinstance(self.genai_config.provider_options, dict):
excluded_options = {"context_size"}
provider_opts = {
k: v
for k, v in self.genai_config.provider_options.items()
if k not in excluded_options
}
request_params.update(provider_opts)
result = self.provider.chat.completions.create(**request_params)
if (
result is None
or not hasattr(result, "choices")
or len(result.choices) == 0
):
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
choice = result.choices[0]
message = choice.message
content = message.content.strip() if message.content else None
tool_calls = None
if message.tool_calls:
tool_calls = []
for tool_call in message.tool_calls:
try:
arguments = json.loads(tool_call.function.arguments)
except (json.JSONDecodeError, AttributeError) as e:
logger.warning(
f"Failed to parse tool call arguments: {e}, "
f"tool: {tool_call.function.name if hasattr(tool_call.function, 'name') else 'unknown'}"
)
arguments = {}
tool_calls.append(
{
"id": tool_call.id if hasattr(tool_call, "id") else "",
"name": tool_call.function.name
if hasattr(tool_call.function, "name")
else "",
"arguments": arguments,
}
)
finish_reason = "error"
if hasattr(choice, "finish_reason") and choice.finish_reason:
finish_reason = choice.finish_reason
elif tool_calls:
finish_reason = "tool_calls"
elif content:
finish_reason = "stop"
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
except TimeoutException as e:
logger.warning("OpenAI request timed out: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except Exception as e:
logger.warning("OpenAI returned an error: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}