frigate/frigate/stats/emitter.py
2026-03-29 12:30:59 -05:00

145 lines
4.7 KiB
Python

"""Emit stats to listeners."""
import itertools
import json
import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import FREQUENCY_STATS_POINTS
from frigate.stats.prometheus import update_metrics
from frigate.stats.util import stats_snapshot
from frigate.types import StatsTrackingTypes
logger = logging.getLogger(__name__)
MAX_STATS_POINTS = 80
class StatsEmitter(threading.Thread):
def __init__(
self,
config: FrigateConfig,
stats_tracking: StatsTrackingTypes,
stop_event: MpEvent,
):
super().__init__(name="frigate_stats_emitter")
self.config = config
self.stats_tracking = stats_tracking
self.stop_event = stop_event
self.hwaccel_errors: list[str] = []
self.stats_history: list[dict[str, Any]] = []
# create communication for stats
self.requestor = InterProcessRequestor()
def get_latest_stats(self) -> dict[str, Any]:
"""Get latest stats."""
if len(self.stats_history) > 0:
return self.stats_history[-1]
else:
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
return stats
def get_stats_history(
self, keys: Optional[list[str]] = None
) -> list[dict[str, Any]]:
"""Get stats history.
Supports dot-notation for nested keys to avoid returning large objects
when only specific subfields are needed. Handles two patterns:
- Flat dict: "service.last_updated" returns {"service": {"last_updated": ...}}
- Dict-of-dicts: "cameras.camera_fps" returns each camera entry filtered
to only include "camera_fps"
"""
if not keys:
return self.stats_history
# Pre-parse keys into top-level keys and dot-notation fields
top_level_keys: list[str] = []
nested_keys: dict[str, list[str]] = {}
for k in keys:
if "." in k:
parent_key, child_key = k.split(".", 1)
nested_keys.setdefault(parent_key, []).append(child_key)
else:
top_level_keys.append(k)
selected_stats: list[dict[str, Any]] = []
for s in self.stats_history:
selected: dict[str, Any] = {}
for k in top_level_keys:
selected[k] = s.get(k)
for parent_key, child_keys in nested_keys.items():
parent = s.get(parent_key)
if not isinstance(parent, dict):
selected[parent_key] = parent
continue
# Check if values are dicts (dict-of-dicts like cameras/detectors)
first_value = next(iter(parent.values()), None)
if isinstance(first_value, dict):
# Filter each nested entry to only requested fields
selected[parent_key] = {
entry_key: {field: entry.get(field) for field in child_keys}
for entry_key, entry in parent.items()
}
else:
# Flat dict (like service) - pick individual fields
if parent_key not in selected:
selected[parent_key] = {}
for child_key in child_keys:
selected[parent_key][child_key] = parent.get(child_key)
selected_stats.append(selected)
return selected_stats
def stats_init(config, camera_metrics, detectors, processes):
stats = {
"cameras": camera_metrics,
"detectors": detectors,
"processes": processes,
}
# Update Prometheus metrics with initial stats
update_metrics(stats)
return stats
def run(self) -> None:
time.sleep(10)
for counter in itertools.cycle(
range(int(self.config.mqtt.stats_interval / FREQUENCY_STATS_POINTS))
):
if self.stop_event.wait(FREQUENCY_STATS_POINTS):
break
logger.debug("Starting stats collection")
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
self.stats_history = self.stats_history[-MAX_STATS_POINTS:]
if counter == 0:
self.requestor.send_data("stats", json.dumps(stats))
logger.debug("Finished stats collection")
logger.info("Exiting stats emitter...")