This commit is contained in:
Daniel 2026-06-20 09:05:14 -04:00 committed by GitHub
commit 1b349cca2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 63 additions and 9 deletions

View File

@ -0,0 +1,53 @@
"""Tests for frigate.util.builtin helpers."""
import unittest
from unittest.mock import patch
from frigate.util.builtin import EventsPerSecond
class TestEventsPerSecond(unittest.TestCase):
def test_eps_is_zero_before_any_events(self) -> None:
eps = EventsPerSecond()
with patch("frigate.util.builtin.time.monotonic", return_value=100.0):
self.assertEqual(eps.eps(), 0.0)
def test_eps_counts_events_in_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [1000.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
# one event per second for five seconds
for _ in range(5):
clock[0] += 1.0
eps.update()
# five events over the five seconds since start
self.assertAlmostEqual(eps.eps(), 1.0)
def test_old_timestamps_expire_from_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(10):
clock[0] += 1.0
eps.update()
# jump well past the window so every timestamp ages out
clock[0] += 100.0
self.assertEqual(eps.eps(), 0.0)
def test_timestamps_are_memory_bounded(self) -> None:
# last_n_seconds large enough that nothing expires by time, so the
# maxlen cap is what bounds retention
eps = EventsPerSecond(max_events=5, last_n_seconds=10_000)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(1000):
clock[0] += 0.001
eps.update()
self.assertLessEqual(len(eps._timestamps), 5)
if __name__ == "__main__":
unittest.main()

View File

@ -2,7 +2,6 @@
import ast
import copy
import datetime
import logging
import math
import multiprocessing.queues
@ -10,7 +9,9 @@ import queue
import re
import shlex
import struct
import time
import urllib.parse
from collections import deque
from collections.abc import Mapping
from multiprocessing.managers import ValueProxy
from pathlib import Path
@ -32,23 +33,23 @@ class EventsPerSecond:
self._start = None
self._max_events = max_events
self._last_n_seconds = last_n_seconds
self._timestamps = []
# bounded deque: O(1) expiry from the left and an automatic cap on
# retained timestamps (the time-based expiry keeps it far smaller in
# practice, this is just a memory backstop)
self._timestamps: deque[float] = deque(maxlen=max_events)
def start(self) -> None:
self._start = datetime.datetime.now().timestamp()
self._start = time.monotonic()
def update(self) -> None:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now)
def eps(self) -> float:
now = datetime.datetime.now().timestamp()
now = time.monotonic()
if self._start is None:
self._start = now
# compute the (approximate) events in the last n seconds
@ -63,7 +64,7 @@ class EventsPerSecond:
def expire_timestamps(self, now: float) -> None:
threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0]
self._timestamps.popleft()
class InferenceSpeed: