frigate/frigate/stats/emitter.py
Josh Hawkins f44f485f48
Improve metrics UI performance (#22691)
* embed cpu/mem stats into detectors, cameras, and processes

so history consumers don't need the full cpu_usages dict

* support dot-notation for nested keys

to avoid returning large objects when only specific subfields are needed

* fix setLastUpdated being called inside useMemo

this triggered a setState-during-render warning, so moved to a useEffect

* frontend types

* frontend

hide instead of unmount all graphs - re-rendering is much more expensive and disruptive than the amount of dom memory required

keep track of visited tabs to keep them mounted rather than re-mounting or mounting all tabs

add isActive prop to all charts to re-trigger animation when switching metrics tabs

fix chart data padding bug where the loop used number of series rather than number of data points

fix bug where only a shallow copy of the array was used for mutation

fix missing key prop causing console logs

* add isactive after rebase

* formatting

* skip None values in filtered output for dot notation
2026-03-29 11:58:47 -06:00

150 lines
4.9 KiB
Python

"""Emit stats to listeners."""
import itertools
import json
import logging
import threading
import time
from multiprocessing.synchronize import Event as MpEvent
from typing import Any, Optional
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import FREQUENCY_STATS_POINTS
from frigate.stats.prometheus import update_metrics
from frigate.stats.util import stats_snapshot
from frigate.types import StatsTrackingTypes
logger = logging.getLogger(__name__)
MAX_STATS_POINTS = 80
class StatsEmitter(threading.Thread):
def __init__(
self,
config: FrigateConfig,
stats_tracking: StatsTrackingTypes,
stop_event: MpEvent,
):
super().__init__(name="frigate_stats_emitter")
self.config = config
self.stats_tracking = stats_tracking
self.stop_event = stop_event
self.hwaccel_errors: list[str] = []
self.stats_history: list[dict[str, Any]] = []
# create communication for stats
self.requestor = InterProcessRequestor()
def get_latest_stats(self) -> dict[str, Any]:
"""Get latest stats."""
if len(self.stats_history) > 0:
return self.stats_history[-1]
else:
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
return stats
def get_stats_history(
self, keys: Optional[list[str]] = None
) -> list[dict[str, Any]]:
"""Get stats history.
Supports dot-notation for nested keys to avoid returning large objects
when only specific subfields are needed. Handles two patterns:
- Flat dict: "service.last_updated" returns {"service": {"last_updated": ...}}
- Dict-of-dicts: "cameras.camera_fps" returns each camera entry filtered
to only include "camera_fps"
"""
if not keys:
return self.stats_history
# Pre-parse keys into top-level keys and dot-notation fields
top_level_keys: list[str] = []
nested_keys: dict[str, list[str]] = {}
for k in keys:
if "." in k:
parent_key, child_key = k.split(".", 1)
nested_keys.setdefault(parent_key, []).append(child_key)
else:
top_level_keys.append(k)
selected_stats: list[dict[str, Any]] = []
for s in self.stats_history:
selected: dict[str, Any] = {}
for k in top_level_keys:
selected[k] = s.get(k)
for parent_key, child_keys in nested_keys.items():
parent = s.get(parent_key)
if not isinstance(parent, dict):
selected[parent_key] = parent
continue
# Check if values are dicts (dict-of-dicts like cameras/detectors)
first_value = next(iter(parent.values()), None)
if isinstance(first_value, dict):
# Filter each nested entry to only requested fields,
# omitting None values to preserve key-absence semantics
selected[parent_key] = {
entry_key: {
field: val
for field in child_keys
if (val := entry.get(field)) is not None
}
for entry_key, entry in parent.items()
}
else:
# Flat dict (like service) - pick individual fields
if parent_key not in selected:
selected[parent_key] = {}
for child_key in child_keys:
selected[parent_key][child_key] = parent.get(child_key)
selected_stats.append(selected)
return selected_stats
def stats_init(config, camera_metrics, detectors, processes):
stats = {
"cameras": camera_metrics,
"detectors": detectors,
"processes": processes,
}
# Update Prometheus metrics with initial stats
update_metrics(stats)
return stats
def run(self) -> None:
time.sleep(10)
for counter in itertools.cycle(
range(int(self.config.mqtt.stats_interval / FREQUENCY_STATS_POINTS))
):
if self.stop_event.wait(FREQUENCY_STATS_POINTS):
break
logger.debug("Starting stats collection")
stats = stats_snapshot(
self.config, self.stats_tracking, self.hwaccel_errors
)
self.stats_history.append(stats)
self.stats_history = self.stats_history[-MAX_STATS_POINTS:]
if counter == 0:
self.requestor.send_data("stats", json.dumps(stats))
logger.debug("Finished stats collection")
logger.info("Exiting stats emitter...")