frigate/frigate/genai/llama_cpp.py

443 lines
16 KiB
Python
Raw Normal View History

"""llama.cpp Provider for Frigate AI."""
import base64
import json
import logging
from typing import Any, Optional
import httpx
2026-02-19 18:13:50 +03:00
import numpy as np
import requests
from frigate.config import GenAIProviderEnum
from frigate.genai import GenAIClient, register_genai_provider
from frigate.genai.utils import parse_tool_calls_from_message
logger = logging.getLogger(__name__)
@register_genai_provider(GenAIProviderEnum.llamacpp)
class LlamaCppClient(GenAIClient):
"""Generative AI client for Frigate using llama.cpp server."""
LOCAL_OPTIMIZED_OPTIONS = {
"temperature": 0.7,
"repeat_penalty": 1.05,
"top_p": 0.8,
}
provider: str # base_url
provider_options: dict[str, Any]
def _init_provider(self):
"""Initialize the client."""
self.provider_options = {
**self.LOCAL_OPTIMIZED_OPTIONS,
**self.genai_config.provider_options,
}
return (
self.genai_config.base_url.rstrip("/")
if self.genai_config.base_url
else None
)
def _send(self, prompt: str, images: list[bytes]) -> Optional[str]:
"""Submit a request to llama.cpp server."""
if self.provider is None:
logger.warning(
"llama.cpp provider has not been initialized, a description will not be generated. Check your llama.cpp configuration."
)
return None
try:
content = []
for image in images:
encoded_image = base64.b64encode(image).decode("utf-8")
content.append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encoded_image}",
},
}
)
content.append(
{
"type": "text",
"text": prompt,
}
)
# Build request payload with llama.cpp native options
payload = {
"model": self.genai_config.model,
"messages": [
{
"role": "user",
"content": content,
},
],
**self.provider_options,
}
response = requests.post(
f"{self.provider}/v1/chat/completions",
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
if (
result is not None
and "choices" in result
and len(result["choices"]) > 0
):
choice = result["choices"][0]
if "message" in choice and "content" in choice["message"]:
return choice["message"]["content"].strip()
return None
except Exception as e:
logger.warning("llama.cpp returned an error: %s", str(e))
return None
def get_context_size(self) -> int:
"""Get the context window size for llama.cpp."""
return int(self.provider_options.get("context_size", 4096))
def _build_payload(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]],
tool_choice: Optional[str],
stream: bool = False,
) -> dict[str, Any]:
"""Build request payload for chat completions (sync or stream)."""
openai_tool_choice = None
if tool_choice:
if tool_choice == "none":
openai_tool_choice = "none"
elif tool_choice == "auto":
openai_tool_choice = "auto"
elif tool_choice == "required":
openai_tool_choice = "required"
payload: dict[str, Any] = {
"messages": messages,
"model": self.genai_config.model,
}
if stream:
payload["stream"] = True
if tools:
payload["tools"] = tools
if openai_tool_choice is not None:
payload["tool_choice"] = openai_tool_choice
provider_opts = {
k: v for k, v in self.provider_options.items() if k != "context_size"
}
payload.update(provider_opts)
return payload
def _message_from_choice(self, choice: dict[str, Any]) -> dict[str, Any]:
"""Parse OpenAI-style choice into {content, tool_calls, finish_reason}."""
message = choice.get("message", {})
content = message.get("content")
content = content.strip() if content else None
tool_calls = parse_tool_calls_from_message(message)
finish_reason = choice.get("finish_reason") or (
"tool_calls" if tool_calls else "stop" if content else "error"
)
return {
"content": content,
"tool_calls": tool_calls,
"finish_reason": finish_reason,
}
@staticmethod
def _streamed_tool_calls_to_list(
tool_calls_by_index: dict[int, dict[str, Any]],
) -> Optional[list[dict[str, Any]]]:
"""Convert streamed tool_calls index map to list of {id, name, arguments}."""
if not tool_calls_by_index:
return None
result = []
for idx in sorted(tool_calls_by_index.keys()):
t = tool_calls_by_index[idx]
args_str = t.get("arguments") or "{}"
try:
arguments = json.loads(args_str)
except json.JSONDecodeError:
arguments = {}
result.append(
{
"id": t.get("id", ""),
"name": t.get("name", ""),
"arguments": arguments,
}
)
return result if result else None
2026-02-19 18:13:50 +03:00
def embed(
self,
texts: list[str] | None = None,
images: list[bytes] | None = None,
) -> list[np.ndarray]:
"""Generate embeddings via llama.cpp /embeddings endpoint.
Supports batch requests. Uses content format with prompt_string and
multimodal_data for images (PR #15108). Server must be started with
--embeddings and --mmproj for multimodal support.
"""
if self.provider is None:
logger.warning(
"llama.cpp provider has not been initialized. Check your llama.cpp configuration."
)
return []
texts = texts or []
images = images or []
if not texts and not images:
return []
EMBEDDING_DIM = 768
content = []
for text in texts:
content.append({"prompt_string": text})
for img in images:
encoded = base64.b64encode(img).decode("utf-8")
content.append({"prompt_string": "", "multimodal_data": [encoded]})
try:
response = requests.post(
f"{self.provider}/embeddings",
json={"content": content},
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
items = result.get("data", result) if isinstance(result, dict) else result
if not isinstance(items, list):
logger.warning("llama.cpp embeddings returned unexpected format")
return []
embeddings = []
for item in items:
emb = item.get("embedding") if isinstance(item, dict) else None
if emb is None:
logger.warning("llama.cpp embeddings item missing embedding field")
continue
arr = np.array(emb, dtype=np.float32)
orig_dim = arr.size
if orig_dim != EMBEDDING_DIM:
if orig_dim > EMBEDDING_DIM:
arr = arr[:EMBEDDING_DIM]
logger.debug(
"Truncated llama.cpp embedding from %d to %d dimensions",
orig_dim,
EMBEDDING_DIM,
)
else:
arr = np.pad(
arr,
(0, EMBEDDING_DIM - orig_dim),
mode="constant",
constant_values=0,
)
logger.debug(
"Padded llama.cpp embedding from %d to %d dimensions",
orig_dim,
EMBEDDING_DIM,
)
embeddings.append(arr)
return embeddings
except requests.exceptions.Timeout:
logger.warning("llama.cpp embeddings request timed out")
return []
except requests.exceptions.RequestException as e:
error_detail = str(e)
if hasattr(e, "response") and e.response is not None:
try:
error_detail = f"{str(e)} - Response: {e.response.text[:500]}"
except Exception:
pass
logger.warning("llama.cpp embeddings error: %s", error_detail)
return []
except Exception as e:
logger.warning("Unexpected error in llama.cpp embeddings: %s", str(e))
return []
def chat_with_tools(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
) -> dict[str, Any]:
"""
Send chat messages to llama.cpp server with optional tool definitions.
Uses the OpenAI-compatible endpoint but passes through all native llama.cpp
parameters (like slot_id, temperature, etc.) via provider_options.
"""
if self.provider is None:
logger.warning(
"llama.cpp provider has not been initialized. Check your llama.cpp configuration."
)
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
try:
payload = self._build_payload(messages, tools, tool_choice, stream=False)
response = requests.post(
f"{self.provider}/v1/chat/completions",
json=payload,
timeout=self.timeout,
)
response.raise_for_status()
result = response.json()
if result is None or "choices" not in result or len(result["choices"]) == 0:
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
return self._message_from_choice(result["choices"][0])
except requests.exceptions.Timeout as e:
logger.warning("llama.cpp request timed out: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except requests.exceptions.RequestException as e:
error_detail = str(e)
if hasattr(e, "response") and e.response is not None:
try:
error_detail = f"{str(e)} - Response: {e.response.text[:500]}"
except Exception:
pass
logger.warning("llama.cpp returned an error: %s", error_detail)
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
except Exception as e:
logger.warning("Unexpected error in llama.cpp chat_with_tools: %s", str(e))
return {
"content": None,
"tool_calls": None,
"finish_reason": "error",
}
async def chat_with_tools_stream(
self,
messages: list[dict[str, Any]],
tools: Optional[list[dict[str, Any]]] = None,
tool_choice: Optional[str] = "auto",
):
"""Stream chat with tools via OpenAI-compatible streaming API."""
if self.provider is None:
logger.warning(
"llama.cpp provider has not been initialized. Check your llama.cpp configuration."
)
yield (
"message",
{
"content": None,
"tool_calls": None,
"finish_reason": "error",
},
)
return
try:
payload = self._build_payload(messages, tools, tool_choice, stream=True)
content_parts: list[str] = []
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
async with httpx.AsyncClient(timeout=float(self.timeout)) as client:
async with client.stream(
"POST",
f"{self.provider}/v1/chat/completions",
json=payload,
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if not line.startswith("data: "):
continue
data_str = line[6:].strip()
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
except json.JSONDecodeError:
continue
choices = data.get("choices") or []
if not choices:
continue
delta = choices[0].get("delta", {})
if choices[0].get("finish_reason"):
finish_reason = choices[0]["finish_reason"]
if delta.get("content"):
content_parts.append(delta["content"])
yield ("content_delta", delta["content"])
for tc in delta.get("tool_calls") or []:
idx = tc.get("index", 0)
fn = tc.get("function") or {}
if idx not in tool_calls_by_index:
tool_calls_by_index[idx] = {
"id": tc.get("id", ""),
"name": tc.get("name") or fn.get("name", ""),
"arguments": "",
}
t = tool_calls_by_index[idx]
if tc.get("id"):
t["id"] = tc["id"]
name = tc.get("name") or fn.get("name")
if name:
t["name"] = name
arg = tc.get("arguments") or fn.get("arguments")
if arg is not None:
t["arguments"] += (
arg if isinstance(arg, str) else json.dumps(arg)
)
full_content = "".join(content_parts).strip() or None
tool_calls_list = self._streamed_tool_calls_to_list(tool_calls_by_index)
if tool_calls_list:
finish_reason = "tool_calls"
yield (
"message",
{
"content": full_content,
"tool_calls": tool_calls_list,
"finish_reason": finish_reason,
},
)
except httpx.HTTPStatusError as e:
logger.warning("llama.cpp streaming HTTP error: %s", e)
yield (
"message",
{
"content": None,
"tool_calls": None,
"finish_reason": "error",
},
)
except Exception as e:
logger.warning(
"Unexpected error in llama.cpp chat_with_tools_stream: %s", str(e)
)
yield (
"message",
{
"content": None,
"tool_calls": None,
"finish_reason": "error",
},
)