diff --git a/frigate/api/chat.py b/frigate/api/chat.py
new file mode 100644
index 000000000..eeff3ab6d
--- /dev/null
+++ b/frigate/api/chat.py
@@ -0,0 +1,476 @@
+"""Chat and LLM tool calling APIs."""
+
+import json
+import logging
+from datetime import datetime, timezone
+from typing import Any, Dict, List
+
+from fastapi import APIRouter, Body, Depends, Request
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+
+from frigate.api.auth import (
+    allow_any_authenticated,
+    get_allowed_cameras_for_filter,
+)
+from frigate.api.defs.query.events_query_parameters import EventsQueryParams
+from frigate.api.defs.request.chat_body import ChatCompletionRequest
+from frigate.api.defs.response.chat_response import (
+    ChatCompletionResponse,
+    ChatMessageResponse,
+)
+from frigate.api.defs.tags import Tags
+from frigate.api.event import events
+from frigate.genai import get_genai_client
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(tags=[Tags.chat])
+
+
+class ToolExecuteRequest(BaseModel):
+    """Request model for tool execution."""
+
+    tool_name: str
+    arguments: Dict[str, Any]
+
+
+def get_tool_definitions() -> List[Dict[str, Any]]:
+    """
+    Get OpenAI-compatible tool definitions for Frigate.
+
+    Returns a list of tool definitions that can be used with OpenAI-compatible
+    function calling APIs.
+    """
+    return [
+        {
+            "type": "function",
+            "function": {
+                "name": "search_objects",
+                "description": (
+                    "Search for detected objects in Frigate by camera, object label, time range, "
+                    "zones, and other filters. Use this to answer questions about when "
+                    "objects were detected, what objects appeared, or to find specific object detections. "
+                    "An 'object' in Frigate represents a tracked detection (e.g., a person, package, car)."
+                ),
+                "parameters": {
+                    "type": "object",
+                    "properties": {
+                        "camera": {
+                            "type": "string",
+                            "description": "Camera name to filter by (optional). Use 'all' for all cameras.",
+                        },
+                        "label": {
+                            "type": "string",
+                            "description": "Object label to filter by (e.g., 'person', 'package', 'car').",
+                        },
+                        "after": {
+                            "type": "string",
+                            "description": "Start time in ISO 8601 format (e.g., '2024-01-01T00:00:00Z').",
+                        },
+                        "before": {
+                            "type": "string",
+                            "description": "End time in ISO 8601 format (e.g., '2024-01-01T23:59:59Z').",
+                        },
+                        "zones": {
+                            "type": "array",
+                            "items": {"type": "string"},
+                            "description": "List of zone names to filter by.",
+                        },
+                        "limit": {
+                            "type": "integer",
+                            "description": "Maximum number of objects to return (default: 10).",
+                            "default": 10,
+                        },
+                    },
+                    # "required" belongs inside the JSON schema ("parameters"),
+                    # not at the "function" level, for OpenAI-compatible APIs.
+                    "required": [],
+                },
+            },
+        },
+    ]
+
+
+@router.get(
+    "/chat/tools",
+    dependencies=[Depends(allow_any_authenticated())],
+    summary="Get available tools",
+    description="Returns OpenAI-compatible tool definitions for function calling.",
+)
+def get_tools(request: Request) -> JSONResponse:
+    """Get list of available tools for LLM function calling."""
+    tools = get_tool_definitions()
+    return JSONResponse(content={"tools": tools})
+
+
+async def _execute_search_objects(
+    request: Request,
+    arguments: Dict[str, Any],
+    allowed_cameras: List[str],
+) -> JSONResponse:
+    """
+    Execute the search_objects tool.
+
+    This searches for detected objects (events) in Frigate using the same
+    logic as the events API endpoint.
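+
+    Illustrative arguments payload (example values only, not part of the
+    API contract):
+
+        {"camera": "all", "label": "person",
+         "after": "2024-01-01T00:00:00Z", "limit": 5}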
+ """ + # Parse ISO 8601 timestamps to Unix timestamps if provided + after = arguments.get("after") + before = arguments.get("before") + + if after: + try: + after_dt = datetime.fromisoformat(after.replace("Z", "+00:00")) + after = after_dt.timestamp() + except (ValueError, AttributeError): + logger.warning(f"Invalid 'after' timestamp format: {after}") + after = None + + if before: + try: + before_dt = datetime.fromisoformat(before.replace("Z", "+00:00")) + before = before_dt.timestamp() + except (ValueError, AttributeError): + logger.warning(f"Invalid 'before' timestamp format: {before}") + before = None + + # Convert zones array to comma-separated string if provided + zones = arguments.get("zones") + if isinstance(zones, list): + zones = ",".join(zones) + elif zones is None: + zones = "all" + + # Build query parameters compatible with EventsQueryParams + query_params = EventsQueryParams( + camera=arguments.get("camera", "all"), + cameras=arguments.get("camera", "all"), + label=arguments.get("label", "all"), + labels=arguments.get("label", "all"), + zones=zones, + zone=zones, + after=after, + before=before, + limit=arguments.get("limit", 10), + ) + + try: + # Call the events endpoint function directly + # The events function is synchronous and takes params and allowed_cameras + response = events(query_params, allowed_cameras) + + # The response is already a JSONResponse with event data + # Return it as-is for the LLM + return response + except Exception as e: + logger.error(f"Error executing search_objects: {e}", exc_info=True) + return JSONResponse( + content={ + "success": False, + "message": f"Error searching objects: {str(e)}", + }, + status_code=500, + ) + + +@router.post( + "/chat/execute", + dependencies=[Depends(allow_any_authenticated())], + summary="Execute a tool", + description="Execute a tool function call from an LLM.", +) +async def execute_tool( + request: Request, + body: ToolExecuteRequest = Body(...), + allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter), +) -> JSONResponse: + """ + Execute a tool function call. + + This endpoint receives tool calls from LLMs and executes the corresponding + Frigate operations, returning results in a format the LLM can understand. + """ + tool_name = body.tool_name + arguments = body.arguments + + logger.debug(f"Executing tool: {tool_name} with arguments: {arguments}") + + if tool_name == "search_objects": + return await _execute_search_objects(request, arguments, allowed_cameras) + + return JSONResponse( + content={ + "success": False, + "message": f"Unknown tool: {tool_name}", + "tool": tool_name, + }, + status_code=400, + ) + + +async def _execute_tool_internal( + tool_name: str, + arguments: Dict[str, Any], + request: Request, + allowed_cameras: List[str], +) -> Dict[str, Any]: + """ + Internal helper to execute a tool and return the result as a dict. + + This is used by the chat completion endpoint to execute tools. 
+ """ + if tool_name == "search_objects": + response = await _execute_search_objects(request, arguments, allowed_cameras) + try: + if hasattr(response, "body"): + body_str = response.body.decode("utf-8") + return json.loads(body_str) + elif hasattr(response, "content"): + return response.content + else: + return {} + except (json.JSONDecodeError, AttributeError) as e: + logger.warning(f"Failed to extract tool result: {e}") + return {"error": "Failed to parse tool result"} + else: + return {"error": f"Unknown tool: {tool_name}"} + + +@router.post( + "/chat/completion", + response_model=ChatCompletionResponse, + dependencies=[Depends(allow_any_authenticated())], + summary="Chat completion with tool calling", + description=( + "Send a chat message to the configured GenAI provider with tool calling support. " + "The LLM can call Frigate tools to answer questions about your cameras and events." + ), +) +async def chat_completion( + request: Request, + body: ChatCompletionRequest = Body(...), + allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter), +) -> JSONResponse: + """ + Chat completion endpoint with tool calling support. + + This endpoint: + 1. Gets the configured GenAI client + 2. Gets tool definitions + 3. Sends messages + tools to LLM + 4. Handles tool_calls if present + 5. Executes tools and sends results back to LLM + 6. Repeats until final answer + 7. Returns response to user + """ + genai_client = get_genai_client(request.app.frigate_config) + if not genai_client: + return JSONResponse( + content={ + "error": "GenAI is not configured. Please configure a GenAI provider in your Frigate config.", + }, + status_code=400, + ) + + tools = get_tool_definitions() + conversation = [] + + current_datetime = datetime.now(timezone.utc) + current_date_str = current_datetime.strftime("%Y-%m-%d") + current_time_str = current_datetime.strftime("%H:%M:%S %Z") + system_prompt = f"""You are a helpful assistant for Frigate, a security camera NVR system. You help users answer questions about their cameras, detected objects, and events. + +Current date and time: {current_date_str} at {current_time_str} (UTC) + +When users ask questions about "today", "yesterday", "this week", etc., use the current date above as reference. +When searching for objects or events, use ISO 8601 format for dates (e.g., {current_date_str}T00:00:00Z for the start of today). 
+Always be accurate with time calculations based on the current date provided."""
+
+    conversation.append(
+        {
+            "role": "system",
+            "content": system_prompt,
+        }
+    )
+
+    for msg in body.messages:
+        msg_dict = {
+            "role": msg.role,
+            "content": msg.content,
+        }
+        if msg.tool_call_id:
+            msg_dict["tool_call_id"] = msg.tool_call_id
+        if msg.name:
+            msg_dict["name"] = msg.name
+        conversation.append(msg_dict)
+
+    tool_iterations = 0
+    max_iterations = body.max_tool_iterations
+
+    logger.debug(
+        f"Starting chat completion with {len(conversation)} message(s), "
+        f"{len(tools)} tool(s) available, max_iterations={max_iterations}"
+    )
+
+    try:
+        while tool_iterations < max_iterations:
+            logger.debug(
+                f"Calling LLM (iteration {tool_iterations + 1}/{max_iterations}) "
+                f"with {len(conversation)} message(s) in conversation"
+            )
+            response = genai_client.chat_with_tools(
+                messages=conversation,
+                tools=tools if tools else None,
+                tool_choice="auto",
+            )
+
+            if response.get("finish_reason") == "error":
+                logger.error("GenAI client returned an error")
+                return JSONResponse(
+                    content={
+                        "error": "An error occurred while processing your request.",
+                    },
+                    status_code=500,
+                )
+
+            assistant_message = {
+                "role": "assistant",
+                "content": response.get("content"),
+            }
+            if response.get("tool_calls"):
+                assistant_message["tool_calls"] = [
+                    {
+                        "id": tc["id"],
+                        "type": "function",
+                        "function": {
+                            "name": tc["name"],
+                            "arguments": json.dumps(tc["arguments"]),
+                        },
+                    }
+                    for tc in response["tool_calls"]
+                ]
+            conversation.append(assistant_message)
+
+            tool_calls = response.get("tool_calls")
+            if not tool_calls:
+                logger.debug(
+                    f"Chat completion finished with final answer (iterations: {tool_iterations})"
+                )
+                return JSONResponse(
+                    content=ChatCompletionResponse(
+                        message=ChatMessageResponse(
+                            role="assistant",
+                            content=response.get("content"),
+                            tool_calls=None,
+                        ),
+                        finish_reason=response.get("finish_reason", "stop"),
+                        tool_iterations=tool_iterations,
+                    ).model_dump(),
+                )
+
+            # Execute tools
+            tool_iterations += 1
+            logger.debug(
+                f"Tool calls detected (iteration {tool_iterations}/{max_iterations}): "
+                f"{len(tool_calls)} tool(s) to execute"
+            )
+            tool_results = []
+
+            for tool_call in tool_calls:
+                tool_name = tool_call["name"]
+                tool_args = tool_call["arguments"]
+                tool_call_id = tool_call["id"]
+
+                logger.debug(
+                    f"Executing tool: {tool_name} (id: {tool_call_id}) with arguments: {json.dumps(tool_args, indent=2)}"
+                )
+
+                try:
+                    tool_result = await _execute_tool_internal(
+                        tool_name, tool_args, request, allowed_cameras
+                    )
+
+                    if isinstance(tool_result, dict):
+                        result_content = json.dumps(tool_result)
+                        result_summary = tool_result
+                        if isinstance(tool_result.get("content"), list):
+                            result_count = len(tool_result.get("content", []))
+                            result_summary = {
+                                "count": result_count,
+                                "sample": tool_result.get("content", [])[:2]
+                                if result_count > 0
+                                else [],
+                            }
+                        logger.debug(
+                            f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
+                            f"Result: {json.dumps(result_summary, indent=2)}"
+                        )
+                    elif isinstance(tool_result, str):
+                        result_content = tool_result
+                        logger.debug(
+                            f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
+                            f"Result length: {len(result_content)} characters"
+                        )
+                    else:
+                        result_content = str(tool_result)
+                        logger.debug(
+                            f"Tool {tool_name} (id: {tool_call_id}) completed successfully. "
" + f"Result type: {type(tool_result).__name__}" + ) + + tool_results.append( + { + "role": "tool", + "tool_call_id": tool_call_id, + "content": result_content, + } + ) + except Exception as e: + logger.error( + f"Error executing tool {tool_name} (id: {tool_call_id}): {e}", + exc_info=True, + ) + error_content = json.dumps( + {"error": f"Tool execution failed: {str(e)}"} + ) + tool_results.append( + { + "role": "tool", + "tool_call_id": tool_call_id, + "content": error_content, + } + ) + logger.debug( + f"Tool {tool_name} (id: {tool_call_id}) failed. Error result added to conversation." + ) + + conversation.extend(tool_results) + logger.debug( + f"Added {len(tool_results)} tool result(s) to conversation. " + f"Continuing with next LLM call..." + ) + + logger.warning( + f"Max tool iterations ({max_iterations}) reached. Returning partial response." + ) + return JSONResponse( + content=ChatCompletionResponse( + message=ChatMessageResponse( + role="assistant", + content="I reached the maximum number of tool call iterations. Please try rephrasing your question.", + tool_calls=None, + ), + finish_reason="length", + tool_iterations=tool_iterations, + ).model_dump(), + ) + + except Exception as e: + logger.error(f"Error in chat completion: {e}", exc_info=True) + return JSONResponse( + content={ + "error": "An error occurred while processing your request.", + }, + status_code=500, + ) diff --git a/frigate/api/defs/request/chat_body.py b/frigate/api/defs/request/chat_body.py new file mode 100644 index 000000000..7b327bf5a --- /dev/null +++ b/frigate/api/defs/request/chat_body.py @@ -0,0 +1,34 @@ +"""Chat API request models.""" + +from typing import Optional + +from pydantic import BaseModel, Field + + +class ChatMessage(BaseModel): + """A single message in a chat conversation.""" + + role: str = Field( + description="Message role: 'user', 'assistant', 'system', or 'tool'" + ) + content: str = Field(description="Message content") + tool_call_id: Optional[str] = Field( + default=None, description="For tool messages, the ID of the tool call" + ) + name: Optional[str] = Field( + default=None, description="For tool messages, the tool name" + ) + + +class ChatCompletionRequest(BaseModel): + """Request for chat completion with tool calling.""" + + messages: list[ChatMessage] = Field( + description="List of messages in the conversation" + ) + max_tool_iterations: int = Field( + default=5, + ge=1, + le=10, + description="Maximum number of tool call iterations (default: 5)", + ) diff --git a/frigate/api/defs/response/chat_response.py b/frigate/api/defs/response/chat_response.py new file mode 100644 index 000000000..f1cc9194b --- /dev/null +++ b/frigate/api/defs/response/chat_response.py @@ -0,0 +1,37 @@ +"""Chat API response models.""" + +from typing import Any, Optional + +from pydantic import BaseModel, Field + + +class ToolCall(BaseModel): + """A tool call from the LLM.""" + + id: str = Field(description="Unique identifier for this tool call") + name: str = Field(description="Tool name to call") + arguments: dict[str, Any] = Field(description="Arguments for the tool call") + + +class ChatMessageResponse(BaseModel): + """A message in the chat response.""" + + role: str = Field(description="Message role") + content: Optional[str] = Field( + default=None, description="Message content (None if tool calls present)" + ) + tool_calls: Optional[list[ToolCall]] = Field( + default=None, description="Tool calls if LLM wants to call tools" + ) + + +class ChatCompletionResponse(BaseModel): + """Response from chat 
completion.""" + + message: ChatMessageResponse = Field(description="The assistant's message") + finish_reason: str = Field( + description="Reason generation stopped: 'stop', 'tool_calls', 'length', 'error'" + ) + tool_iterations: int = Field( + default=0, description="Number of tool call iterations performed" + ) diff --git a/frigate/api/defs/tags.py b/frigate/api/defs/tags.py index 20e4ac31b..3aaaa59ef 100644 --- a/frigate/api/defs/tags.py +++ b/frigate/api/defs/tags.py @@ -5,6 +5,7 @@ class Tags(Enum): app = "App" auth = "Auth" camera = "Camera" + chat = "Chat" events = "Events" export = "Export" classification = "Classification" diff --git a/frigate/api/fastapi_app.py b/frigate/api/fastapi_app.py index 27d844b8a..496c8fada 100644 --- a/frigate/api/fastapi_app.py +++ b/frigate/api/fastapi_app.py @@ -16,6 +16,7 @@ from frigate.api import app as main_app from frigate.api import ( auth, camera, + chat, classification, event, export, @@ -121,6 +122,7 @@ def create_fastapi_app( # Order of include_router matters: https://fastapi.tiangolo.com/tutorial/path-params/#order-matters app.include_router(auth.router) app.include_router(camera.router) + app.include_router(chat.router) app.include_router(classification.router) app.include_router(review.router) app.include_router(main_app.router) diff --git a/frigate/genai/__init__.py b/frigate/genai/__init__.py index 910fc13b9..4be75f418 100644 --- a/frigate/genai/__init__.py +++ b/frigate/genai/__init__.py @@ -285,6 +285,64 @@ Guidelines: """Get the context window size for this provider in tokens.""" return 4096 + def chat_with_tools( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]] = None, + tool_choice: Optional[str] = "auto", + ) -> dict[str, Any]: + """ + Send chat messages to LLM with optional tool definitions. + + This method handles conversation-style interactions with the LLM, + including function calling/tool usage capabilities. + + Args: + messages: List of message dictionaries. Each message should have: + - 'role': str - One of 'user', 'assistant', 'system', or 'tool' + - 'content': str - The message content + - 'tool_call_id': Optional[str] - For tool responses, the ID of the tool call + - 'name': Optional[str] - For tool messages, the tool name + tools: Optional list of tool definitions in OpenAI-compatible format. + Each tool should have 'type': 'function' and 'function' with: + - 'name': str - Tool name + - 'description': str - Tool description + - 'parameters': dict - JSON schema for parameters + tool_choice: How the model should handle tools: + - 'auto': Model decides whether to call tools + - 'none': Model must not call tools + - 'required': Model must call at least one tool + - Or a dict specifying a specific tool to call + **kwargs: Additional provider-specific parameters. + + Returns: + Dictionary with: + - 'content': Optional[str] - The text response from the LLM, None if tool calls + - 'tool_calls': Optional[List[Dict]] - List of tool calls if LLM wants to call tools. + Each tool call dict has: + - 'id': str - Unique identifier for this tool call + - 'name': str - Tool name to call + - 'arguments': dict - Arguments for the tool call (parsed JSON) + - 'finish_reason': str - Reason generation stopped: + - 'stop': Normal completion + - 'tool_calls': LLM wants to call tools + - 'length': Hit token limit + - 'error': An error occurred + + Raises: + NotImplementedError: If the provider doesn't implement this method. 
+ """ + # Base implementation - each provider should override this + logger.warning( + f"{self.__class__.__name__} does not support chat_with_tools. " + "This method should be overridden by the provider implementation." + ) + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } + def get_genai_client(config: FrigateConfig) -> Optional[GenAIClient]: """Get the GenAI client.""" diff --git a/frigate/genai/azure-openai.py b/frigate/genai/azure-openai.py index eba8b47c0..78ed376e5 100644 --- a/frigate/genai/azure-openai.py +++ b/frigate/genai/azure-openai.py @@ -1,8 +1,9 @@ """Azure OpenAI Provider for Frigate AI.""" import base64 +import json import logging -from typing import Optional +from typing import Any, Optional from urllib.parse import parse_qs, urlparse from openai import AzureOpenAI @@ -75,3 +76,93 @@ class OpenAIClient(GenAIClient): def get_context_size(self) -> int: """Get the context window size for Azure OpenAI.""" return 128000 + + def chat_with_tools( + self, + messages: list[dict[str, Any]], + tools: Optional[list[dict[str, Any]]] = None, + tool_choice: Optional[str] = "auto", + ) -> dict[str, Any]: + try: + openai_tool_choice = None + if tool_choice: + if tool_choice == "none": + openai_tool_choice = "none" + elif tool_choice == "auto": + openai_tool_choice = "auto" + elif tool_choice == "required": + openai_tool_choice = "required" + + request_params = { + "model": self.genai_config.model, + "messages": messages, + "timeout": self.timeout, + } + + if tools: + request_params["tools"] = tools + if openai_tool_choice is not None: + request_params["tool_choice"] = openai_tool_choice + + result = self.provider.chat.completions.create(**request_params) + + if ( + result is None + or not hasattr(result, "choices") + or len(result.choices) == 0 + ): + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } + + choice = result.choices[0] + message = choice.message + + content = message.content.strip() if message.content else None + + tool_calls = None + if message.tool_calls: + tool_calls = [] + for tool_call in message.tool_calls: + try: + arguments = json.loads(tool_call.function.arguments) + except (json.JSONDecodeError, AttributeError) as e: + logger.warning( + f"Failed to parse tool call arguments: {e}, " + f"tool: {tool_call.function.name if hasattr(tool_call.function, 'name') else 'unknown'}" + ) + arguments = {} + + tool_calls.append( + { + "id": tool_call.id if hasattr(tool_call, "id") else "", + "name": tool_call.function.name + if hasattr(tool_call.function, "name") + else "", + "arguments": arguments, + } + ) + + finish_reason = "error" + if hasattr(choice, "finish_reason") and choice.finish_reason: + finish_reason = choice.finish_reason + elif tool_calls: + finish_reason = "tool_calls" + elif content: + finish_reason = "stop" + + return { + "content": content, + "tool_calls": tool_calls, + "finish_reason": finish_reason, + } + + except Exception as e: + logger.warning("Azure OpenAI returned an error: %s", str(e)) + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } diff --git a/frigate/genai/gemini.py b/frigate/genai/gemini.py index f94448d75..e5cf39e4d 100644 --- a/frigate/genai/gemini.py +++ b/frigate/genai/gemini.py @@ -1,7 +1,8 @@ """Gemini Provider for Frigate AI.""" +import json import logging -from typing import Optional +from typing import Any, Optional import google.generativeai as genai from google.api_core.exceptions import GoogleAPICallError @@ -58,3 +59,188 @@ class 
         """Get the context window size for Gemini."""
         # Gemini Pro Vision has a 1M token context window
         return 1000000
+
+    def chat_with_tools(
+        self,
+        messages: list[dict[str, Any]],
+        tools: Optional[list[dict[str, Any]]] = None,
+        tool_choice: Optional[str] = "auto",
+    ) -> dict[str, Any]:
+        try:
+            if tools:
+                function_declarations = []
+                for tool in tools:
+                    if tool.get("type") == "function":
+                        func_def = tool.get("function", {})
+                        function_declarations.append(
+                            genai.protos.FunctionDeclaration(
+                                name=func_def.get("name"),
+                                description=func_def.get("description"),
+                                parameters=genai.protos.Schema(
+                                    type=genai.protos.Type.OBJECT,
+                                    properties={
+                                        prop_name: genai.protos.Schema(
+                                            type=_convert_json_type_to_gemini(
+                                                prop.get("type")
+                                            ),
+                                            description=prop.get("description"),
+                                        )
+                                        for prop_name, prop in func_def.get(
+                                            "parameters", {}
+                                        )
+                                        .get("properties", {})
+                                        .items()
+                                    },
+                                    required=func_def.get("parameters", {}).get(
+                                        "required", []
+                                    ),
+                                ),
+                            )
+                        )
+
+                tool_config = genai.protos.Tool(
+                    function_declarations=function_declarations
+                )
+
+                if tool_choice == "none":
+                    function_calling_config = genai.protos.FunctionCallingConfig(
+                        mode=genai.protos.FunctionCallingConfig.Mode.NONE
+                    )
+                elif tool_choice == "required":
+                    function_calling_config = genai.protos.FunctionCallingConfig(
+                        mode=genai.protos.FunctionCallingConfig.Mode.ANY
+                    )
+                else:
+                    function_calling_config = genai.protos.FunctionCallingConfig(
+                        mode=genai.protos.FunctionCallingConfig.Mode.AUTO
+                    )
+            else:
+                tool_config = None
+                function_calling_config = None
+
+            contents = []
+            for msg in messages:
+                role = msg.get("role")
+                content = msg.get("content", "")
+
+                if role == "system":
+                    continue
+                elif role == "user":
+                    contents.append({"role": "user", "parts": [content]})
+                elif role == "assistant":
+                    parts = [content] if content else []
+                    if "tool_calls" in msg:
+                        for tc in msg["tool_calls"]:
+                            # Wrap the FunctionCall in a Part so it is a valid
+                            # content part for the Gemini API.
+                            parts.append(
+                                genai.protos.Part(
+                                    function_call=genai.protos.FunctionCall(
+                                        name=tc["function"]["name"],
+                                        args=json.loads(tc["function"]["arguments"]),
+                                    )
+                                )
+                            )
+                    contents.append({"role": "model", "parts": parts})
+                elif role == "tool":
+                    tool_name = msg.get("name", "")
+                    tool_result = (
+                        json.loads(content) if isinstance(content, str) else content
+                    )
+                    contents.append(
+                        {
+                            "role": "function",
+                            "parts": [
+                                genai.protos.Part(
+                                    function_response=genai.protos.FunctionResponse(
+                                        name=tool_name,
+                                        response=tool_result,
+                                    )
+                                )
+                            ],
+                        }
+                    )
+
+            generation_config = genai.types.GenerationConfig(
+                candidate_count=1,
+            )
+            # The function calling mode is passed via the tool_config argument
+            # of generate_content; GenerationConfig has no such field.
+            request_tool_config = (
+                genai.protos.ToolConfig(
+                    function_calling_config=function_calling_config
+                )
+                if function_calling_config
+                else None
+            )
+
+            response = self.provider.generate_content(
+                contents,
+                tools=[tool_config] if tool_config else None,
+                tool_config=request_tool_config,
+                generation_config=generation_config,
+                request_options=genai.types.RequestOptions(timeout=self.timeout),
+            )
+
+            content = None
+            tool_calls = None
+
+            if response.candidates and response.candidates[0].content:
+                parts = response.candidates[0].content.parts
+                text_parts = [p.text for p in parts if hasattr(p, "text") and p.text]
+                if text_parts:
+                    content = " ".join(text_parts).strip()
+
+                function_calls = [
+                    p.function_call
+                    for p in parts
+                    if hasattr(p, "function_call") and p.function_call
+                ]
+                if function_calls:
+                    tool_calls = []
+                    for fc in function_calls:
+                        tool_calls.append(
+                            {
+                                "id": f"call_{hash(fc.name)}",
+                                "name": fc.name,
+                                "arguments": dict(fc.args)
+                                if hasattr(fc, "args")
+                                else {},
+                            }
+                        )
+
+            finish_reason = "error"
+            if response.candidates:
+                # Map Gemini finish-reason enums onto the OpenAI-style strings
+                # used by the rest of the chat API.
+                finish_reason_map = {
+                    genai.types.FinishReason.STOP: "stop",
+                    genai.types.FinishReason.MAX_TOKENS: "length",
+                    genai.types.FinishReason.SAFETY: "stop",
+                    genai.types.FinishReason.RECITATION: "stop",
+                    genai.types.FinishReason.OTHER: "error",
+                }
+                finish_reason = finish_reason_map.get(
+                    response.candidates[0].finish_reason, "error"
+                )
+            elif tool_calls:
+                finish_reason = "tool_calls"
+            elif content:
+                finish_reason = "stop"
+
+            return {
+                "content": content,
+                "tool_calls": tool_calls,
+                "finish_reason": finish_reason,
+            }
+
+        except GoogleAPICallError as e:
+            logger.warning("Gemini returned an error: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+        except Exception as e:
+            logger.warning("Unexpected error in Gemini chat_with_tools: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+
+
+def _convert_json_type_to_gemini(json_type: str) -> genai.protos.Type:
+    type_map = {
+        "string": genai.protos.Type.STRING,
+        "integer": genai.protos.Type.INTEGER,
+        "number": genai.protos.Type.NUMBER,
+        "boolean": genai.protos.Type.BOOLEAN,
+        "array": genai.protos.Type.ARRAY,
+        "object": genai.protos.Type.OBJECT,
+    }
+    return type_map.get(json_type, genai.protos.Type.STRING)
diff --git a/frigate/genai/llama_cpp.py b/frigate/genai/llama_cpp.py
index 45e364bc0..5523ce389 100644
--- a/frigate/genai/llama_cpp.py
+++ b/frigate/genai/llama_cpp.py
@@ -1,6 +1,7 @@
 """llama.cpp Provider for Frigate AI."""
 
 import base64
+import json
 import logging
 from typing import Any, Optional
 
@@ -99,3 +100,132 @@ class LlamaCppClient(GenAIClient):
     def get_context_size(self) -> int:
         """Get the context window size for llama.cpp."""
         return self.genai_config.provider_options.get("context_size", 4096)
+
+    def chat_with_tools(
+        self,
+        messages: list[dict[str, Any]],
+        tools: Optional[list[dict[str, Any]]] = None,
+        tool_choice: Optional[str] = "auto",
+    ) -> dict[str, Any]:
+        """
+        Send chat messages to the llama.cpp server with optional tool definitions.
+
+        Uses the OpenAI-compatible endpoint but passes through all native llama.cpp
+        parameters (like slot_id, temperature, etc.) via provider_options.
+        """
+        if self.provider is None:
+            logger.warning(
+                "llama.cpp provider has not been initialized. Check your llama.cpp configuration."
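+                # Note (assumption about typical setups): tool calling also
+                # requires the llama.cpp server to be started with --jinja so
+                # its OpenAI-compatible chat endpoint accepts the tools field.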
+            )
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+
+        try:
+            openai_tool_choice = None
+            if tool_choice:
+                if tool_choice == "none":
+                    openai_tool_choice = "none"
+                elif tool_choice == "auto":
+                    openai_tool_choice = "auto"
+                elif tool_choice == "required":
+                    openai_tool_choice = "required"
+
+            payload = {
+                "messages": messages,
+            }
+
+            if tools:
+                payload["tools"] = tools
+                if openai_tool_choice is not None:
+                    payload["tool_choice"] = openai_tool_choice
+
+            provider_opts = {
+                k: v for k, v in self.provider_options.items() if k != "context_size"
+            }
+            payload.update(provider_opts)
+
+            response = requests.post(
+                f"{self.provider}/v1/chat/completions",
+                json=payload,
+                timeout=self.timeout,
+            )
+            response.raise_for_status()
+            result = response.json()
+
+            if result is None or "choices" not in result or len(result["choices"]) == 0:
+                return {
+                    "content": None,
+                    "tool_calls": None,
+                    "finish_reason": "error",
+                }
+
+            choice = result["choices"][0]
+            message = choice.get("message", {})
+
+            content = message.get("content")
+            if content:
+                content = content.strip()
+            else:
+                content = None
+
+            tool_calls = None
+            if "tool_calls" in message and message["tool_calls"]:
+                tool_calls = []
+                for tool_call in message["tool_calls"]:
+                    # Resolve function_data before the try block so the except
+                    # handler can safely reference it in its log message.
+                    function_data = tool_call.get("function", {})
+                    try:
+                        arguments_str = function_data.get("arguments", "{}")
+                        arguments = json.loads(arguments_str)
+                    except (json.JSONDecodeError, KeyError, TypeError) as e:
+                        logger.warning(
+                            f"Failed to parse tool call arguments: {e}, "
+                            f"tool: {function_data.get('name', 'unknown')}"
+                        )
+                        arguments = {}
+
+                    tool_calls.append(
+                        {
+                            "id": tool_call.get("id", ""),
+                            "name": function_data.get("name", ""),
+                            "arguments": arguments,
+                        }
+                    )
+
+            finish_reason = "error"
+            if "finish_reason" in choice and choice["finish_reason"]:
+                finish_reason = choice["finish_reason"]
+            elif tool_calls:
+                finish_reason = "tool_calls"
+            elif content:
+                finish_reason = "stop"
+
+            return {
+                "content": content,
+                "tool_calls": tool_calls,
+                "finish_reason": finish_reason,
+            }
+
+        except requests.exceptions.Timeout as e:
+            logger.warning("llama.cpp request timed out: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+        except requests.exceptions.RequestException as e:
+            logger.warning("llama.cpp returned an error: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+        except Exception as e:
+            logger.warning("Unexpected error in llama.cpp chat_with_tools: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
diff --git a/frigate/genai/ollama.py b/frigate/genai/ollama.py
index 9f9c8a750..84f5148e5 100644
--- a/frigate/genai/ollama.py
+++ b/frigate/genai/ollama.py
@@ -1,5 +1,6 @@
 """Ollama Provider for Frigate AI."""
 
+import json
 import logging
 from typing import Any, Optional
 
@@ -77,3 +78,120 @@ class OllamaClient(GenAIClient):
         return self.genai_config.provider_options.get("options", {}).get(
             "num_ctx", 4096
         )
+
+    def chat_with_tools(
+        self,
+        messages: list[dict[str, Any]],
+        tools: Optional[list[dict[str, Any]]] = None,
+        tool_choice: Optional[str] = "auto",
+    ) -> dict[str, Any]:
+        if self.provider is None:
+            logger.warning(
+                "Ollama provider has not been initialized. Check your Ollama configuration."
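+                # Note: tool calling is also model-dependent in Ollama; models
+                # without tool support reject requests that include tools.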
+            )
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+
+        try:
+            request_messages = []
+            for msg in messages:
+                msg_dict = {
+                    "role": msg.get("role"),
+                    "content": msg.get("content", ""),
+                }
+                if msg.get("tool_call_id"):
+                    msg_dict["tool_call_id"] = msg["tool_call_id"]
+                if msg.get("name"):
+                    msg_dict["name"] = msg["name"]
+                if msg.get("tool_calls"):
+                    msg_dict["tool_calls"] = msg["tool_calls"]
+                request_messages.append(msg_dict)
+
+            request_params = {
+                "model": self.genai_config.model,
+                "messages": request_messages,
+            }
+
+            if tools:
+                request_params["tools"] = tools
+                if tool_choice:
+                    if tool_choice == "none":
+                        request_params["tool_choice"] = "none"
+                    elif tool_choice == "required":
+                        request_params["tool_choice"] = "required"
+                    elif tool_choice == "auto":
+                        request_params["tool_choice"] = "auto"
+
+            request_params.update(self.provider_options)
+
+            response = self.provider.chat(**request_params)
+
+            if not response or "message" not in response:
+                return {
+                    "content": None,
+                    "tool_calls": None,
+                    "finish_reason": "error",
+                }
+
+            message = response["message"]
+            content = (
+                message.get("content", "").strip() if message.get("content") else None
+            )
+
+            tool_calls = None
+            if "tool_calls" in message and message["tool_calls"]:
+                tool_calls = []
+                for tool_call in message["tool_calls"]:
+                    # Resolve function_data before the try block so the except
+                    # handler can safely reference it in its log message.
+                    function_data = tool_call.get("function", {})
+                    try:
+                        arguments_raw = function_data.get("arguments", "{}")
+                        # Ollama may return arguments as an already-parsed dict
+                        # rather than a JSON string; handle both forms.
+                        if isinstance(arguments_raw, dict):
+                            arguments = arguments_raw
+                        else:
+                            arguments = json.loads(arguments_raw)
+                    except (json.JSONDecodeError, KeyError, TypeError) as e:
+                        logger.warning(
+                            f"Failed to parse tool call arguments: {e}, "
+                            f"tool: {function_data.get('name', 'unknown')}"
+                        )
+                        arguments = {}
+
+                    tool_calls.append(
+                        {
+                            "id": tool_call.get("id", ""),
+                            "name": function_data.get("name", ""),
+                            "arguments": arguments,
+                        }
+                    )
+
+            finish_reason = "error"
+            if "done" in response and response["done"]:
+                if tool_calls:
+                    finish_reason = "tool_calls"
+                elif content:
+                    finish_reason = "stop"
+            elif tool_calls:
+                finish_reason = "tool_calls"
+            elif content:
+                finish_reason = "stop"
+
+            return {
+                "content": content,
+                "tool_calls": tool_calls,
+                "finish_reason": finish_reason,
+            }
+
+        except (TimeoutException, ResponseError, ConnectionError) as e:
+            logger.warning("Ollama returned an error: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
+        except Exception as e:
+            logger.warning("Unexpected error in Ollama chat_with_tools: %s", str(e))
+            return {
+                "content": None,
+                "tool_calls": None,
+                "finish_reason": "error",
+            }
diff --git a/frigate/genai/openai.py b/frigate/genai/openai.py
index 631cb3480..1998b6b60 100644
--- a/frigate/genai/openai.py
+++ b/frigate/genai/openai.py
@@ -1,8 +1,9 @@
 """OpenAI Provider for Frigate AI."""
 
 import base64
+import json
 import logging
-from typing import Optional
+from typing import Any, Optional
 
 from httpx import TimeoutException
 from openai import OpenAI
@@ -100,3 +101,113 @@ class OpenAIClient(GenAIClient):
             f"Using default context size {self.context_size} for model {self.genai_config.model}"
         )
         return self.context_size
+
+    def chat_with_tools(
+        self,
+        messages: list[dict[str, Any]],
+        tools: Optional[list[dict[str, Any]]] = None,
+        tool_choice: Optional[str] = "auto",
+    ) -> dict[str, Any]:
+        """
+        Send chat messages to OpenAI with optional tool definitions.
+
+        Implements function calling/tool usage for OpenAI models.
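+
+        Any provider_options from the config (except context_size) are passed
+        through to the API call, e.g. (illustrative) {"temperature": 0.2}.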
+ """ + try: + openai_tool_choice = None + if tool_choice: + if tool_choice == "none": + openai_tool_choice = "none" + elif tool_choice == "auto": + openai_tool_choice = "auto" + elif tool_choice == "required": + openai_tool_choice = "required" + + request_params = { + "model": self.genai_config.model, + "messages": messages, + "timeout": self.timeout, + } + + if tools: + request_params["tools"] = tools + if openai_tool_choice is not None: + request_params["tool_choice"] = openai_tool_choice + + if isinstance(self.genai_config.provider_options, dict): + excluded_options = {"context_size"} + provider_opts = { + k: v + for k, v in self.genai_config.provider_options.items() + if k not in excluded_options + } + request_params.update(provider_opts) + + result = self.provider.chat.completions.create(**request_params) + + if ( + result is None + or not hasattr(result, "choices") + or len(result.choices) == 0 + ): + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } + + choice = result.choices[0] + message = choice.message + content = message.content.strip() if message.content else None + + tool_calls = None + if message.tool_calls: + tool_calls = [] + for tool_call in message.tool_calls: + try: + arguments = json.loads(tool_call.function.arguments) + except (json.JSONDecodeError, AttributeError) as e: + logger.warning( + f"Failed to parse tool call arguments: {e}, " + f"tool: {tool_call.function.name if hasattr(tool_call.function, 'name') else 'unknown'}" + ) + arguments = {} + + tool_calls.append( + { + "id": tool_call.id if hasattr(tool_call, "id") else "", + "name": tool_call.function.name + if hasattr(tool_call.function, "name") + else "", + "arguments": arguments, + } + ) + + finish_reason = "error" + if hasattr(choice, "finish_reason") and choice.finish_reason: + finish_reason = choice.finish_reason + elif tool_calls: + finish_reason = "tool_calls" + elif content: + finish_reason = "stop" + + return { + "content": content, + "tool_calls": tool_calls, + "finish_reason": finish_reason, + } + + except TimeoutException as e: + logger.warning("OpenAI request timed out: %s", str(e)) + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + } + except Exception as e: + logger.warning("OpenAI returned an error: %s", str(e)) + return { + "content": None, + "tool_calls": None, + "finish_reason": "error", + }