perf(util): use monotonic clock and bounded deque in EventsPerSecond

EventsPerSecond is updated on every captured frame, every detection and
every processed frame across all cameras and detectors. The previous
implementation derived timestamps from datetime.now().timestamp() (wall
clock), so an NTP or manual clock adjustment could skew the rolling-window
expiry; it also stored timestamps in a list and expired them with
del self._timestamps[0] (O(n) per removal) plus a periodic slice-copy to
cap growth.

Switch to time.monotonic() for the interval math (correct by construction
and immune to wall-clock jumps) and a collections.deque(maxlen=...) so
expiry is O(1) (popleft) and retention is bounded automatically. This
mirrors the deque-based expiry already used in video/ffmpeg.py and
watchdog.py. Observable output is unchanged.

Adds frigate/test/test_builtin.py covering rate calculation, window
expiry and the memory bound.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Daniel-dev22 2026-06-20 09:04:17 -04:00
parent 5003ab895c
commit bf3190a6d7
2 changed files with 60 additions and 9 deletions

View File

@ -0,0 +1,53 @@
"""Tests for frigate.util.builtin helpers."""
import unittest
from unittest.mock import patch
from frigate.util.builtin import EventsPerSecond
class TestEventsPerSecond(unittest.TestCase):
def test_eps_is_zero_before_any_events(self) -> None:
eps = EventsPerSecond()
with patch("frigate.util.builtin.time.monotonic", return_value=100.0):
self.assertEqual(eps.eps(), 0.0)
def test_eps_counts_events_in_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [1000.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
# one event per second for five seconds
for _ in range(5):
clock[0] += 1.0
eps.update()
# five events over the five seconds since start
self.assertAlmostEqual(eps.eps(), 1.0)
def test_old_timestamps_expire_from_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(10):
clock[0] += 1.0
eps.update()
# jump well past the window so every timestamp ages out
clock[0] += 100.0
self.assertEqual(eps.eps(), 0.0)
def test_timestamps_are_memory_bounded(self) -> None:
# last_n_seconds large enough that nothing expires by time, so the
# maxlen cap is what bounds retention
eps = EventsPerSecond(max_events=5, last_n_seconds=10_000)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(1000):
clock[0] += 0.001
eps.update()
self.assertLessEqual(len(eps._timestamps), 5)
if __name__ == "__main__":
unittest.main()

View File

@ -2,7 +2,6 @@
import ast
import copy
import datetime
import logging
import math
import multiprocessing.queues
@ -10,7 +9,9 @@ import queue
import re
import shlex
import struct
import time
import urllib.parse
from collections import deque
from collections.abc import Mapping
from multiprocessing.managers import ValueProxy
from pathlib import Path
@ -32,23 +33,20 @@ class EventsPerSecond:
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
self._timestamps: deque[float] = deque(maxlen=max_events)
def start(self) -> None:
self._start = datetime.datetime.now().timestamp()
self._start = time.monotonic()
def update(self) -> None:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self) -> float:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
@ -63,7 +61,7 @@ class EventsPerSecond:
def expire_timestamps(self, now: float) -> None:
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
self._timestamps.popleft()
class InferenceSpeed: