This commit is contained in:
Daniel 2026-06-20 21:05:26 +00:00 committed by GitHub
commit aa8b8a2ba9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 9 deletions

View File

@ -0,0 +1,41 @@
"""Tests for frigate.util.builtin helpers."""
import unittest
from unittest.mock import patch
from frigate.util.builtin import EventsPerSecond
class TestEventsPerSecond(unittest.TestCase):
def test_eps_is_zero_before_any_events(self) -> None:
eps = EventsPerSecond()
with patch("frigate.util.builtin.time.monotonic", return_value=100.0):
self.assertEqual(eps.eps(), 0.0)
def test_eps_counts_events_in_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [1000.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
# one event per second for five seconds
for _ in range(5):
clock[0] += 1.0
eps.update()
# five events over the five seconds since start
self.assertAlmostEqual(eps.eps(), 1.0)
def test_old_timestamps_expire_from_window(self) -> None:
eps = EventsPerSecond(last_n_seconds=10)
clock = [0.0]
with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]):
eps.start()
for _ in range(10):
clock[0] += 1.0
eps.update()
# jump well past the window so every timestamp ages out
clock[0] += 100.0
self.assertEqual(eps.eps(), 0.0)
if __name__ == "__main__":
unittest.main()

View File

@ -2,7 +2,6 @@
import ast import ast
import copy import copy
import datetime
import logging import logging
import math import math
import multiprocessing.queues import multiprocessing.queues
@ -10,7 +9,9 @@ import queue
import re import re
import shlex import shlex
import struct import struct
import time
import urllib.parse import urllib.parse
from collections import deque
from collections.abc import Mapping from collections.abc import Mapping
from multiprocessing.managers import ValueProxy from multiprocessing.managers import ValueProxy
from pathlib import Path from pathlib import Path
@ -32,23 +33,20 @@ class EventsPerSecond:
self._start = None self._start = None
self._max_events = max_events self._max_events = max_events
self._last_n_seconds = last_n_seconds self._last_n_seconds = last_n_seconds
self._timestamps = [] self._timestamps: deque[float] = deque(maxlen=max_events)
def start(self) -> None: def start(self) -> None:
self._start = datetime.datetime.now().timestamp() self._start = time.monotonic()
def update(self) -> None: def update(self) -> None:
now = datetime.datetime.now().timestamp() now = time.monotonic()
if self._start is None: if self._start is None:
self._start = now self._start = now
self._timestamps.append(now) self._timestamps.append(now)
# truncate the list when it goes 100 over the max_size
if len(self._timestamps) > self._max_events + 100:
self._timestamps = self._timestamps[(1 - self._max_events) :]
self.expire_timestamps(now) self.expire_timestamps(now)
def eps(self) -> float: def eps(self) -> float:
now = datetime.datetime.now().timestamp() now = time.monotonic()
if self._start is None: if self._start is None:
self._start = now self._start = now
# compute the (approximate) events in the last n seconds # compute the (approximate) events in the last n seconds
@ -63,7 +61,7 @@ class EventsPerSecond:
def expire_timestamps(self, now: float) -> None: def expire_timestamps(self, now: float) -> None:
threshold = now - self._last_n_seconds threshold = now - self._last_n_seconds
while self._timestamps and self._timestamps[0] < threshold: while self._timestamps and self._timestamps[0] < threshold:
del self._timestamps[0] self._timestamps.popleft()
class InferenceSpeed: class InferenceSpeed: