mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-09 06:55:28 +03:00
tests
This commit is contained in:
parent
549642e97e
commit
eeff59216b
@ -1,5 +1,6 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from frigate.jobs.export import ExportJob
|
||||
from frigate.models import Export, ExportCase, Previews, Recordings
|
||||
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
||||
|
||||
@ -91,7 +92,10 @@ class TestHttpExport(BaseTestHttp):
|
||||
def test_batch_export_creates_case_and_reports_partial_success(self):
|
||||
self._insert_recording("rec-front", "front_door", 100, 200)
|
||||
|
||||
with patch("frigate.api.export._start_exporter") as start_exporter:
|
||||
with patch(
|
||||
"frigate.api.export.start_export_job",
|
||||
side_effect=lambda _config, job: job.id,
|
||||
) as start_export_job:
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.post(
|
||||
"/exports/batch",
|
||||
@ -105,7 +109,7 @@ class TestHttpExport(BaseTestHttp):
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.status_code == 202
|
||||
response_json = response.json()
|
||||
assert len(response_json["export_ids"]) == 1
|
||||
assert response_json["results"] == [
|
||||
@ -113,21 +117,116 @@ class TestHttpExport(BaseTestHttp):
|
||||
"camera": "front_door",
|
||||
"export_id": response_json["export_ids"][0],
|
||||
"success": True,
|
||||
"status": "queued",
|
||||
"error": None,
|
||||
},
|
||||
{
|
||||
"camera": "backyard",
|
||||
"export_id": None,
|
||||
"success": False,
|
||||
"status": None,
|
||||
"error": "No recordings found for time range",
|
||||
},
|
||||
]
|
||||
start_exporter.assert_called_once()
|
||||
start_export_job.assert_called_once()
|
||||
|
||||
case = ExportCase.get(ExportCase.id == response_json["export_case_id"])
|
||||
assert case.name == "Case Alpha"
|
||||
assert case.description == "Batch export"
|
||||
|
||||
def test_single_export_is_queued_immediately(self):
|
||||
self._insert_recording("rec-front", "front_door", 100, 200)
|
||||
|
||||
with patch(
|
||||
"frigate.api.export.start_export_job",
|
||||
side_effect=lambda _config, job: job.id,
|
||||
) as start_export_job:
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.post(
|
||||
"/export/front_door/start/110/end/150",
|
||||
json={
|
||||
"name": "Queued export",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 202
|
||||
response_json = response.json()
|
||||
assert response_json["success"] is True
|
||||
assert response_json["status"] == "queued"
|
||||
assert response_json["export_id"].startswith("front_door_")
|
||||
start_export_job.assert_called_once()
|
||||
|
||||
def test_single_export_returns_503_when_queue_full(self):
|
||||
self._insert_recording("rec-front", "front_door", 100, 200)
|
||||
|
||||
from frigate.jobs.export import ExportQueueFullError
|
||||
|
||||
with patch(
|
||||
"frigate.api.export.start_export_job",
|
||||
side_effect=ExportQueueFullError("Export queue is full"),
|
||||
):
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.post(
|
||||
"/export/front_door/start/110/end/150",
|
||||
json={
|
||||
"name": "Rejected export",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 503
|
||||
response_json = response.json()
|
||||
assert response_json["success"] is False
|
||||
assert "queue is full" in response_json["message"].lower()
|
||||
|
||||
def test_batch_export_returns_503_when_queue_cannot_fit_batch(self):
|
||||
self._insert_recording("rec-front", "front_door", 100, 200)
|
||||
self._insert_recording("rec-back", "backyard", 100, 200)
|
||||
|
||||
with patch(
|
||||
"frigate.api.export.available_export_queue_slots",
|
||||
return_value=1,
|
||||
):
|
||||
with patch(
|
||||
"frigate.api.export.start_export_job",
|
||||
side_effect=lambda _config, job: job.id,
|
||||
) as start_export_job:
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.post(
|
||||
"/exports/batch",
|
||||
json={
|
||||
"start_time": 110,
|
||||
"end_time": 150,
|
||||
"camera_ids": ["front_door", "backyard"],
|
||||
"new_case_name": "Overflow Case",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 503
|
||||
assert response.json()["success"] is False
|
||||
start_export_job.assert_not_called()
|
||||
|
||||
# Empty case should NOT have been created
|
||||
assert ExportCase.select().count() == 0
|
||||
|
||||
def test_get_active_export_jobs_returns_queue_state(self):
|
||||
queued_job = ExportJob(
|
||||
id="front_door_queued",
|
||||
camera="front_door",
|
||||
status="queued",
|
||||
request_start_time=100,
|
||||
request_end_time=150,
|
||||
)
|
||||
|
||||
with patch(
|
||||
"frigate.api.export.list_active_export_jobs",
|
||||
return_value=[queued_job],
|
||||
):
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.get("/jobs/export")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == [queued_job.to_dict()]
|
||||
|
||||
def test_batch_export_requires_case_target(self):
|
||||
with AuthTestClient(self.app) as client:
|
||||
response = client.post(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user