Support token streaming stats

This commit is contained in:
Nicolas Mowen 2026-05-14 09:35:57 -06:00
parent 78fc472026
commit 475f87d40b
5 changed files with 150 additions and 0 deletions

View File

@ -10,6 +10,7 @@ from openai import AzureOpenAI
from frigate.config import GenAIProviderEnum
from frigate.genai import GenAIClient, register_genai_provider
from frigate.genai.openai import _stats_from_openai_usage
logger = logging.getLogger(__name__)
@ -210,6 +211,7 @@ class OpenAIClient(GenAIClient):
"messages": messages,
"timeout": self.timeout,
"stream": True,
"stream_options": {"include_usage": True},
}
if tools:
@ -221,10 +223,15 @@ class OpenAIClient(GenAIClient):
content_parts: list[str] = []
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
usage_stats: Optional[dict[str, Any]] = None
stream = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
for chunk in stream:
chunk_usage = getattr(chunk, "usage", None)
if chunk_usage is not None:
usage_stats = _stats_from_openai_usage(chunk_usage)
if not chunk or not chunk.choices:
continue
@ -284,6 +291,9 @@ class OpenAIClient(GenAIClient):
)
finish_reason = "tool_calls"
if usage_stats is not None:
yield ("stats", usage_stats)
yield (
"message",
{

View File

@ -14,6 +14,20 @@ from frigate.genai import GenAIClient, register_genai_provider
logger = logging.getLogger(__name__)
def _stats_from_gemini_usage(usage: Any) -> Optional[dict[str, Any]]:
    """Translate a Gemini usage object into a token-count stats dict.

    Reads ``prompt_token_count`` and ``candidates_token_count`` from
    ``usage`` and reports each one that is an int under the keys
    ``prompt_tokens`` and ``completion_tokens`` respectively.

    Returns:
        The stats dict, or None when neither attribute holds an int
        (including when the attributes are missing altogether).
    """
    collected: dict[str, Any] = {}
    for stat_key, attr_name in (
        ("prompt_tokens", "prompt_token_count"),
        ("completion_tokens", "candidates_token_count"),
    ):
        count = getattr(usage, attr_name, None)
        if isinstance(count, int):
            collected[stat_key] = count
    # An empty dict means nothing usable was reported.
    return collected if collected else None
@register_genai_provider(GenAIProviderEnum.gemini)
class GeminiClient(GenAIClient):
"""Generative AI client for Frigate using Gemini."""
@ -471,6 +485,7 @@ class GeminiClient(GenAIClient):
content_parts: list[str] = []
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
usage_stats: Optional[dict[str, Any]] = None
stream = await self.provider.aio.models.generate_content_stream(
model=self.genai_config.model,
@ -479,6 +494,12 @@ class GeminiClient(GenAIClient):
)
async for chunk in stream:
chunk_usage = getattr(chunk, "usage_metadata", None)
if chunk_usage is not None:
maybe_stats = _stats_from_gemini_usage(chunk_usage)
if maybe_stats is not None:
usage_stats = maybe_stats
if not chunk or not chunk.candidates:
continue
@ -565,6 +586,9 @@ class GeminiClient(GenAIClient):
)
finish_reason = "tool_calls"
if usage_stats is not None:
yield ("stats", usage_stats)
yield (
"message",
{

View File

@ -18,6 +18,52 @@ from frigate.genai.utils import parse_tool_calls_from_message
logger = logging.getLogger(__name__)
def _stats_from_llama_cpp_chunk(data: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Build a stats dict from a llama.cpp streaming chunk.

    Final-chunk `usage` carries authoritative token counts. Per-chunk
    `timings` (enabled via timings_per_token) carries the running token
    counts (prompt_n, predicted_n) and generation rate, so live updates
    work mid-stream.

    Args:
        data: The decoded JSON payload of one streamed chunk. Anything that
            is not a dict is treated as carrying no stats.

    Returns:
        A dict with any of ``prompt_tokens``, ``completion_tokens``,
        ``completion_duration_ms`` and ``tokens_per_second``, or None when
        the chunk provides no integer token count at all.
    """
    # The payload comes straight from json.loads, so it may be valid JSON
    # that is not an object; report "no stats" instead of raising.
    if not isinstance(data, dict):
        return None

    usage = data.get("usage")
    if not isinstance(usage, dict):
        # Covers both a null/absent field and a malformed value.
        usage = {}
    timings = data.get("timings")
    if not isinstance(timings, dict):
        timings = {}

    # Prefer the `usage` counts; fall back to the running `timings` counts.
    prompt_tokens = usage.get("prompt_tokens")
    if not isinstance(prompt_tokens, int):
        prompt_tokens = timings.get("prompt_n")
    completion_tokens = usage.get("completion_tokens")
    if not isinstance(completion_tokens, int):
        completion_tokens = timings.get("predicted_n")

    stats: dict[str, Any] = {}
    if isinstance(prompt_tokens, int):
        stats["prompt_tokens"] = prompt_tokens
    if isinstance(completion_tokens, int):
        stats["completion_tokens"] = completion_tokens
    if not stats:
        # Timing figures alone are never reported without a token count.
        return None

    predicted_ms = timings.get("predicted_ms")
    if isinstance(predicted_ms, (int, float)) and predicted_ms > 0:
        stats["completion_duration_ms"] = float(predicted_ms)
    tps = timings.get("predicted_per_second")
    if isinstance(tps, (int, float)) and tps > 0:
        stats["tokens_per_second"] = float(tps)
    return stats
def _parse_launch_arg(args: list[str], flag: str) -> str | None:
"""Return the value following `flag` in a positional argv list, or None."""
try:
@ -462,6 +508,8 @@ class LlamaCppClient(GenAIClient):
}
if stream:
payload["stream"] = True
payload["stream_options"] = {"include_usage": True}
payload["timings_per_token"] = True
if tools:
payload["tools"] = tools
if openai_tool_choice is not None:
@ -724,6 +772,9 @@ class LlamaCppClient(GenAIClient):
data = json.loads(data_str)
except json.JSONDecodeError:
continue
maybe_stats = _stats_from_llama_cpp_chunk(data)
if maybe_stats is not None:
yield ("stats", maybe_stats)
choices = data.get("choices") or []
if not choices:
continue

View File

@ -18,6 +18,37 @@ from frigate.genai.utils import parse_tool_calls_from_message
logger = logging.getLogger(__name__)
def _extract_ollama_stats(response: Any) -> Optional[dict[str, Any]]:
    """Derive token statistics from an Ollama response or final stream chunk.

    Ollama reports ``eval_count`` / ``eval_duration`` for generation and
    ``prompt_eval_count`` for the context size; durations are nanoseconds.
    Fields are read through ``response.get`` when the object has one, and
    as plain attributes otherwise.

    Returns:
        A dict with any of ``prompt_tokens``, ``completion_tokens``,
        ``completion_duration_ms`` and ``tokens_per_second``, or None when
        the response is falsy or reports neither token count.
    """
    if not response:
        return None

    supports_get = hasattr(response, "get")

    def lookup(field: str) -> Any:
        if supports_get:
            return response.get(field)
        return getattr(response, field, None)

    generated = lookup("eval_count")
    generation_ns = lookup("eval_duration")
    prompt_size = lookup("prompt_eval_count")

    # A duration on its own is not reported: at least one count is required.
    if generated is None and prompt_size is None:
        return None

    has_generated = isinstance(generated, int)
    summary: dict[str, Any] = {}
    if isinstance(prompt_size, int):
        summary["prompt_tokens"] = prompt_size
    if has_generated:
        summary["completion_tokens"] = generated

    # Timing-derived fields need a positive integer duration.
    if not isinstance(generation_ns, int) or generation_ns <= 0:
        return summary or None

    summary["completion_duration_ms"] = generation_ns / 1_000_000
    if has_generated and generated > 0:
        summary["tokens_per_second"] = generated / (generation_ns / 1_000_000_000)
    return summary or None
def _normalize_multimodal_content(
content: Any,
) -> tuple[Optional[str], Optional[list[bytes]]]:
@ -403,6 +434,9 @@ class OllamaClient(GenAIClient):
content = result.get("content")
if content:
yield ("content_delta", content)
stats = _extract_ollama_stats(response)
if stats is not None:
yield ("stats", stats)
yield ("message", result)
return
@ -416,6 +450,7 @@ class OllamaClient(GenAIClient):
)
content_parts: list[str] = []
final_message: dict[str, Any] | None = None
final_chunk: Any = None
stream = await async_client.chat(**request_params)
async for chunk in stream:
if not chunk or "message" not in chunk:
@ -426,6 +461,7 @@ class OllamaClient(GenAIClient):
content_parts.append(delta)
yield ("content_delta", delta)
if chunk.get("done"):
final_chunk = chunk
full_content = "".join(content_parts).strip() or None
final_message = {
"content": full_content,
@ -434,6 +470,10 @@ class OllamaClient(GenAIClient):
}
break
stats = _extract_ollama_stats(final_chunk)
if stats is not None:
yield ("stats", stats)
if final_message is not None:
yield ("message", final_message)
else:

View File

@ -14,6 +14,22 @@ from frigate.genai import GenAIClient, register_genai_provider
logger = logging.getLogger(__name__)
def _stats_from_openai_usage(usage: Any) -> Optional[dict[str, Any]]:
    """Convert an OpenAI-compatible usage object into a stats dict.

    Reads the ``prompt_tokens`` and ``completion_tokens`` attributes of
    ``usage`` and keeps each one that is an int, under the same key.

    Returns:
        The stats dict, or None when ``usage`` is None or neither attribute
        holds an int.
    """
    if usage is None:
        return None

    token_counts = {
        "prompt_tokens": getattr(usage, "prompt_tokens", None),
        "completion_tokens": getattr(usage, "completion_tokens", None),
    }
    result: dict[str, Any] = {
        key: value for key, value in token_counts.items() if isinstance(value, int)
    }
    # Nothing usable collapses to None rather than an empty dict.
    return result or None
@register_genai_provider(GenAIProviderEnum.openai)
class OpenAIClient(GenAIClient):
"""Generative AI client for Frigate using OpenAI."""
@ -298,6 +314,7 @@ class OpenAIClient(GenAIClient):
"messages": messages,
"timeout": self.timeout,
"stream": True,
"stream_options": {"include_usage": True},
}
if tools:
@ -318,10 +335,15 @@ class OpenAIClient(GenAIClient):
content_parts: list[str] = []
tool_calls_by_index: dict[int, dict[str, Any]] = {}
finish_reason = "stop"
usage_stats: Optional[dict[str, Any]] = None
stream = self.provider.chat.completions.create(**request_params) # type: ignore[call-overload]
for chunk in stream:
chunk_usage = getattr(chunk, "usage", None)
if chunk_usage is not None:
usage_stats = _stats_from_openai_usage(chunk_usage)
if not chunk or not chunk.choices:
continue
@ -381,6 +403,9 @@ class OpenAIClient(GenAIClient):
)
finish_reason = "tool_calls"
if usage_stats is not None:
yield ("stats", usage_stats)
yield (
"message",
{