diff --git a/frigate/test/test_builtin.py b/frigate/test/test_builtin.py new file mode 100644 index 0000000000..4c9db6140d --- /dev/null +++ b/frigate/test/test_builtin.py @@ -0,0 +1,53 @@ +"""Tests for frigate.util.builtin helpers.""" + +import unittest +from unittest.mock import patch + +from frigate.util.builtin import EventsPerSecond + + +class TestEventsPerSecond(unittest.TestCase): + def test_eps_is_zero_before_any_events(self) -> None: + eps = EventsPerSecond() + with patch("frigate.util.builtin.time.monotonic", return_value=100.0): + self.assertEqual(eps.eps(), 0.0) + + def test_eps_counts_events_in_window(self) -> None: + eps = EventsPerSecond(last_n_seconds=10) + clock = [1000.0] + with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]): + eps.start() + # one event per second for five seconds + for _ in range(5): + clock[0] += 1.0 + eps.update() + # five events over the five seconds since start + self.assertAlmostEqual(eps.eps(), 1.0) + + def test_old_timestamps_expire_from_window(self) -> None: + eps = EventsPerSecond(last_n_seconds=10) + clock = [0.0] + with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]): + eps.start() + for _ in range(10): + clock[0] += 1.0 + eps.update() + # jump well past the window so every timestamp ages out + clock[0] += 100.0 + self.assertEqual(eps.eps(), 0.0) + + def test_timestamps_are_memory_bounded(self) -> None: + # last_n_seconds large enough that nothing expires by time, so the + # maxlen cap is what bounds retention + eps = EventsPerSecond(max_events=5, last_n_seconds=10_000) + clock = [0.0] + with patch("frigate.util.builtin.time.monotonic", side_effect=lambda: clock[0]): + eps.start() + for _ in range(1000): + clock[0] += 0.001 + eps.update() + self.assertLessEqual(len(eps._timestamps), 5) + + +if __name__ == "__main__": + unittest.main() diff --git a/frigate/util/builtin.py b/frigate/util/builtin.py index c1c9d9d6ea..288ecd3c7c 100644 --- a/frigate/util/builtin.py +++ b/frigate/util/builtin.py @@ -2,7 +2,6 @@ import ast import copy -import datetime import logging import math import multiprocessing.queues @@ -10,7 +9,9 @@ import queue import re import shlex import struct +import time import urllib.parse +from collections import deque from collections.abc import Mapping from multiprocessing.managers import ValueProxy from pathlib import Path @@ -32,23 +33,20 @@ class EventsPerSecond: self._start = None self._max_events = max_events self._last_n_seconds = last_n_seconds - self._timestamps = [] + self._timestamps: deque[float] = deque(maxlen=max_events) def start(self) -> None: - self._start = datetime.datetime.now().timestamp() + self._start = time.monotonic() def update(self) -> None: - now = datetime.datetime.now().timestamp() + now = time.monotonic() if self._start is None: self._start = now self._timestamps.append(now) - # truncate the list when it goes 100 over the max_size - if len(self._timestamps) > self._max_events + 100: - self._timestamps = self._timestamps[(1 - self._max_events) :] self.expire_timestamps(now) def eps(self) -> float: - now = datetime.datetime.now().timestamp() + now = time.monotonic() if self._start is None: self._start = now # compute the (approximate) events in the last n seconds @@ -63,7 +61,7 @@ class EventsPerSecond: def expire_timestamps(self, now: float) -> None: threshold = now - self._last_n_seconds while self._timestamps and self._timestamps[0] < threshold: - del self._timestamps[0] + self._timestamps.popleft() class InferenceSpeed: