combine event tests and refactor for camera access

This commit is contained in:
Josh Hawkins 2025-09-10 15:13:13 -05:00
parent 32b599d011
commit 69d3fd9269
3 changed files with 176 additions and 338 deletions

View File

@ -112,7 +112,7 @@ class BaseTestHttp(unittest.TestCase):
except OSError:
pass
def create_app(self, stats=None):
def create_app(self, stats=None, event_metadata_publisher=None):
return create_fastapi_app(
FrigateConfig(**self.minimal_config),
self.db,
@ -121,7 +121,7 @@ class BaseTestHttp(unittest.TestCase):
None,
None,
stats,
None,
event_metadata_publisher,
None,
)
@ -134,12 +134,13 @@ class BaseTestHttp(unittest.TestCase):
top_score: int = 100,
score: int = 0,
data: Json = {},
camera: str = "front_door",
) -> Event:
"""Inserts a basic event model with a given id."""
return Event.insert(
id=id,
label="Mock",
camera="front_door",
camera=camera,
start_time=start_time,
end_time=end_time,
top_score=top_score,
@ -158,15 +159,23 @@ class BaseTestHttp(unittest.TestCase):
def insert_mock_review_segment(
self,
id: str,
start_time: float = datetime.datetime.now().timestamp(),
end_time: float = datetime.datetime.now().timestamp() + 20,
start_time: float | None = None,
end_time: float | None = None,
severity: SeverityEnum = SeverityEnum.alert,
data: Json = {},
data: dict | None = None,
camera: str = "front_door",
) -> ReviewSegment:
"""Inserts a review segment model with a given id."""
if start_time is None:
start_time = datetime.datetime.now().timestamp()
if end_time is None:
end_time = start_time + 20
if data is None:
data = {}
return ReviewSegment.insert(
id=id,
camera="front_door",
camera=camera,
start_time=start_time,
end_time=end_time,
severity=severity,

View File

@ -1,16 +1,34 @@
from datetime import datetime
from typing import Any
from unittest.mock import Mock
from fastapi.testclient import TestClient
from playhouse.shortcuts import model_to_dict
from frigate.models import Event, Recordings, ReviewSegment
from frigate.api.auth import get_allowed_cameras_for_filter, get_current_user
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.models import Event, Recordings, ReviewSegment, Timeline
from frigate.test.http_api.base_http_test import BaseTestHttp
class TestHttpApp(BaseTestHttp):
def setUp(self):
super().setUp([Event, Recordings, ReviewSegment])
super().setUp([Event, Recordings, ReviewSegment, Timeline])
self.app = super().create_app()
# Mock auth to bypass camera access for tests
async def mock_get_current_user(request: Any):
return {"username": "test_user", "role": "admin"}
self.app.dependency_overrides[get_current_user] = mock_get_current_user
self.app.dependency_overrides[get_allowed_cameras_for_filter] = lambda: [
"front_door"
]
def tearDown(self):
self.app.dependency_overrides.clear()
super().tearDown()
####################################################################################################################
################################### GET /events Endpoint #########################################################
####################################################################################################################
@ -135,3 +153,143 @@ class TestHttpApp(BaseTestHttp):
assert len(events) == 2
assert events[0]["id"] == id
assert events[1]["id"] == id2
def test_get_good_event(self):
id = "123456.random"
with TestClient(self.app) as client:
super().insert_mock_event(id)
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["id"] == model_to_dict(Event.get(Event.id == id))["id"]
def test_get_bad_event(self):
id = "123456.random"
bad_id = "654321.other"
with TestClient(self.app) as client:
super().insert_mock_event(id)
event_response = client.get(f"/events/{bad_id}")
assert event_response.status_code == 404
assert event_response.json() == "Event not found"
def test_delete_event(self):
id = "123456.random"
with TestClient(self.app) as client:
super().insert_mock_event(id)
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
response = client.delete(f"/events/{id}", headers={"remote-role": "admin"})
assert response.status_code == 200
event_after_delete = client.get(f"/events/{id}")
assert event_after_delete.status_code == 404
def test_event_retention(self):
id = "123456.random"
with TestClient(self.app) as client:
super().insert_mock_event(id)
client.post(f"/events/{id}/retain", headers={"remote-role": "admin"})
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["retain_indefinitely"] is True
client.delete(f"/events/{id}/retain", headers={"remote-role": "admin"})
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["retain_indefinitely"] is False
def test_event_time_filtering(self):
morning_id = "123456.random"
evening_id = "654321.random"
morning = 1656590400 # 06/30/2022 6 am (GMT)
evening = 1656633600 # 06/30/2022 6 pm (GMT)
with TestClient(self.app) as client:
super().insert_mock_event(morning_id, morning)
super().insert_mock_event(evening_id, evening)
# both events come back
events = client.get("/events").json()
print("events!!!", events)
assert events
assert len(events) == 2
# morning event is excluded
events = client.get(
"/events",
params={"time_range": "07:00,24:00"},
).json()
assert events
assert len(events) == 1
# evening event is excluded
events = client.get(
"/events",
params={"time_range": "00:00,18:00"},
).json()
assert events
assert len(events) == 1
def test_set_delete_sub_label(self):
mock_event_updater = Mock(spec=EventMetadataPublisher)
app = super().create_app(event_metadata_publisher=mock_event_updater)
id = "123456.random"
sub_label = "sub"
def update_event(payload: Any, topic: str):
event = Event.get(id=id)
event.sub_label = payload[1]
event.save()
mock_event_updater.publish.side_effect = update_event
with TestClient(app) as client:
super().insert_mock_event(id)
new_sub_label_response = client.post(
f"/events/{id}/sub_label",
json={"subLabel": sub_label},
headers={"remote-role": "admin"},
)
assert new_sub_label_response.status_code == 200
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["sub_label"] == sub_label
empty_sub_label_response = client.post(
f"/events/{id}/sub_label",
json={"subLabel": ""},
headers={"remote-role": "admin"},
)
assert empty_sub_label_response.status_code == 200
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["sub_label"] == None
def test_sub_label_list(self):
mock_event_updater = Mock(spec=EventMetadataPublisher)
app = super().create_app(event_metadata_publisher=mock_event_updater)
app.event_metadata_publisher = mock_event_updater
id = "123456.random"
sub_label = "sub"
def update_event(payload: Any, _: str):
event = Event.get(id=id)
event.sub_label = payload[1]
event.save()
mock_event_updater.publish.side_effect = update_event
with TestClient(app) as client:
super().insert_mock_event(id)
client.post(
f"/events/{id}/sub_label",
json={"subLabel": sub_label},
headers={"remote-role": "admin"},
)
sub_labels = client.get("/sub_labels").json()
assert sub_labels
assert sub_labels == [sub_label]

View File

@ -1,329 +0,0 @@
import datetime
import logging
import os
import unittest
from typing import Any
from unittest.mock import Mock
from fastapi.testclient import TestClient
from peewee_migrate import Router
from playhouse.shortcuts import model_to_dict
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.api.fastapi_app import create_fastapi_app
from frigate.comms.event_metadata_updater import EventMetadataPublisher
from frigate.config import FrigateConfig
from frigate.const import BASE_DIR, CACHE_DIR
from frigate.models import Event, Recordings, Timeline
from frigate.test.const import TEST_DB, TEST_DB_CLEANUPS
class TestHttp(unittest.TestCase):
def setUp(self):
# setup clean database for each test run
migrate_db = SqliteExtDatabase("test.db")
del logging.getLogger("peewee_migrate").handlers[:]
router = Router(migrate_db)
router.run()
migrate_db.close()
self.db = SqliteQueueDatabase(TEST_DB)
models = [Event, Recordings, Timeline]
self.db.bind(models)
self.minimal_config = {
"mqtt": {"host": "mqtt"},
"cameras": {
"front_door": {
"ffmpeg": {
"inputs": [
{"path": "rtsp://10.0.0.1:554/video", "roles": ["detect"]}
]
},
"detect": {
"height": 1080,
"width": 1920,
"fps": 5,
},
}
},
}
self.test_stats = {
"camera_fps": 5.0,
"process_fps": 5.0,
"skipped_fps": 0.0,
"detection_fps": 13.7,
"detectors": {
"cpu1": {
"detection_start": 0.0,
"inference_speed": 91.43,
"pid": 42,
},
"cpu2": {
"detection_start": 0.0,
"inference_speed": 84.99,
"pid": 44,
},
},
"front_door": {
"camera_fps": 0.0,
"capture_pid": 53,
"detection_fps": 0.0,
"pid": 52,
"process_fps": 0.0,
"skipped_fps": 0.0,
},
"service": {
"storage": {
"/dev/shm": {
"free": 50.5,
"mount_type": "tmpfs",
"total": 67.1,
"used": 16.6,
},
os.path.join(BASE_DIR, "clips"): {
"free": 42429.9,
"mount_type": "ext4",
"total": 244529.7,
"used": 189607.0,
},
os.path.join(BASE_DIR, "recordings"): {
"free": 0.2,
"mount_type": "ext4",
"total": 8.0,
"used": 7.8,
},
CACHE_DIR: {
"free": 976.8,
"mount_type": "tmpfs",
"total": 1000.0,
"used": 23.2,
},
},
"uptime": 101113,
"version": "0.10.1",
"latest_version": "0.11",
},
}
def tearDown(self):
if not self.db.is_closed():
self.db.close()
try:
for file in TEST_DB_CLEANUPS:
os.remove(file)
except OSError:
pass
def __init_app(self, updater: Any | None = None) -> Any:
return create_fastapi_app(
FrigateConfig(**self.minimal_config),
self.db,
None,
None,
None,
None,
None,
updater,
None,
)
def test_get_good_event(self):
app = self.__init_app()
id = "123456.random"
with TestClient(app) as client:
_insert_mock_event(id)
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["id"] == model_to_dict(Event.get(Event.id == id))["id"]
def test_get_bad_event(self):
app = self.__init_app()
id = "123456.random"
bad_id = "654321.other"
with TestClient(app) as client:
_insert_mock_event(id)
event_response = client.get(f"/events/{bad_id}")
assert event_response.status_code == 404
assert event_response.json() == "Event not found"
def test_delete_event(self):
app = self.__init_app()
id = "123456.random"
with TestClient(app) as client:
_insert_mock_event(id)
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
client.delete(f"/events/{id}", headers={"remote-role": "admin"})
event = client.get(f"/events/{id}").json()
assert event == "Event not found"
def test_event_retention(self):
app = self.__init_app()
id = "123456.random"
with TestClient(app) as client:
_insert_mock_event(id)
client.post(f"/events/{id}/retain", headers={"remote-role": "admin"})
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["retain_indefinitely"] is True
client.delete(f"/events/{id}/retain", headers={"remote-role": "admin"})
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["retain_indefinitely"] is False
def test_event_time_filtering(self):
app = self.__init_app()
morning_id = "123456.random"
evening_id = "654321.random"
morning = 1656590400 # 06/30/2022 6 am (GMT)
evening = 1656633600 # 06/30/2022 6 pm (GMT)
with TestClient(app) as client:
_insert_mock_event(morning_id, morning)
_insert_mock_event(evening_id, evening)
# both events come back
events = client.get("/events").json()
assert events
assert len(events) == 2
# morning event is excluded
events = client.get(
"/events",
params={"time_range": "07:00,24:00"},
).json()
assert events
# assert len(events) == 1
# evening event is excluded
events = client.get(
"/events",
params={"time_range": "00:00,18:00"},
).json()
assert events
assert len(events) == 1
def test_set_delete_sub_label(self):
mock_event_updater = Mock(spec=EventMetadataPublisher)
app = app = self.__init_app(updater=mock_event_updater)
id = "123456.random"
sub_label = "sub"
def update_event(payload: Any, topic: str):
event = Event.get(id=id)
event.sub_label = payload[1]
event.save()
mock_event_updater.publish.side_effect = update_event
with TestClient(app) as client:
_insert_mock_event(id)
new_sub_label_response = client.post(
f"/events/{id}/sub_label",
json={"subLabel": sub_label},
headers={"remote-role": "admin"},
)
assert new_sub_label_response.status_code == 200
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["sub_label"] == sub_label
empty_sub_label_response = client.post(
f"/events/{id}/sub_label",
json={"subLabel": ""},
headers={"remote-role": "admin"},
)
assert empty_sub_label_response.status_code == 200
event = client.get(f"/events/{id}").json()
assert event
assert event["id"] == id
assert event["sub_label"] == None
def test_sub_label_list(self):
mock_event_updater = Mock(spec=EventMetadataPublisher)
app = self.__init_app(updater=mock_event_updater)
id = "123456.random"
sub_label = "sub"
def update_event(payload: Any, _: str):
event = Event.get(id=id)
event.sub_label = payload[1]
event.save()
mock_event_updater.publish.side_effect = update_event
with TestClient(app) as client:
_insert_mock_event(id)
client.post(
f"/events/{id}/sub_label",
json={"subLabel": sub_label},
headers={"remote-role": "admin"},
)
sub_labels = client.get("/sub_labels").json()
assert sub_labels
assert sub_labels == [sub_label]
def test_config(self):
app = self.__init_app()
with TestClient(app) as client:
config = client.get("/config").json()
assert config
assert config["cameras"]["front_door"]
def test_recordings(self):
app = self.__init_app()
id = "123456.random"
with TestClient(app) as client:
_insert_mock_recording(id)
response = client.get("/front_door/recordings")
assert response.status_code == 200
recording = response.json()
assert recording
assert recording[0]["id"] == id
def _insert_mock_event(
id: str,
start_time: datetime.datetime = datetime.datetime.now().timestamp(),
) -> Event:
"""Inserts a basic event model with a given id."""
return Event.insert(
id=id,
label="Mock",
camera="front_door",
start_time=start_time,
end_time=start_time + 20,
top_score=100,
false_positive=False,
zones=list(),
thumbnail="",
region=[],
box=[],
area=0,
has_clip=True,
has_snapshot=True,
).execute()
def _insert_mock_recording(id: str) -> Event:
"""Inserts a basic recording model with a given id."""
return Recordings.insert(
id=id,
camera="front_door",
path=f"/recordings/{id}",
start_time=datetime.datetime.now().timestamp() - 60,
end_time=datetime.datetime.now().timestamp() - 50,
duration=10,
motion=True,
objects=True,
).execute()