Api docs updates (#20388)

* Update classification API docs

* Add information to events api

* Fix tag

* Add exports

* Add generic response to model for classification apis

* Add preview API information

* Cleanup

* Cleanup
This commit is contained in:
Nicolas Mowen 2025-10-08 13:55:38 -06:00 committed by GitHub
parent 28e3f83ae3
commit 3c7e36fb16
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1932 additions and 618 deletions

File diff suppressed because it is too large Load Diff

View File

@ -16,8 +16,14 @@ from playhouse.shortcuts import model_to_dict
from frigate.api.auth import require_role
from frigate.api.defs.request.classification_body import (
AudioTranscriptionBody,
DeleteFaceImagesBody,
RenameFaceBody,
)
from frigate.api.defs.response.classification_response import (
FaceRecognitionResponse,
FacesResponse,
)
from frigate.api.defs.response.generic_response import GenericResponse
from frigate.api.defs.tags import Tags
from frigate.config import FrigateConfig
from frigate.config.camera import DetectConfig
@ -28,10 +34,18 @@ from frigate.util.path import get_event_snapshot
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.events])
router = APIRouter(tags=[Tags.classification])
@router.get("/faces")
@router.get(
"/faces",
response_model=FacesResponse,
summary="Get all registered faces",
description="""Returns a dictionary mapping face names to lists of image filenames.
Each key represents a registered face name, and the value is a list of image
files associated with that face. Supported image formats include .webp, .png,
.jpg, and .jpeg.""",
)
def get_faces():
face_dict: dict[str, list[str]] = {}
@ -55,7 +69,15 @@ def get_faces():
return JSONResponse(status_code=200, content=face_dict)
@router.post("/faces/reprocess", dependencies=[Depends(require_role(["admin"]))])
@router.post(
"/faces/reprocess",
dependencies=[Depends(require_role(["admin"]))],
summary="Reprocess a face training image",
description="""Reprocesses a face training image to update the prediction.
Requires face recognition to be enabled in the configuration. The training file
must exist in the faces/train directory. Returns a success response or an error
message if face recognition is not enabled or the training file is invalid.""",
)
def reclassify_face(request: Request, body: dict = None):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -88,7 +110,17 @@ def reclassify_face(request: Request, body: dict = None):
)
@router.post("/faces/train/{name}/classify")
@router.post(
"/faces/train/{name}/classify",
response_model=GenericResponse,
summary="Classify and save a face training image",
description="""Adds a training image to a specific face name for face recognition.
Accepts either a training file from the train directory or an event_id to extract
the face from. The image is saved to the face's directory and the face classifier
is cleared to incorporate the new training data. Returns a success message with
the new filename or an error if face recognition is not enabled, the file/event
is invalid, or the face cannot be extracted.""",
)
def train_face(request: Request, name: str, body: dict = None):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -192,7 +224,16 @@ def train_face(request: Request, name: str, body: dict = None):
)
@router.post("/faces/{name}/create", dependencies=[Depends(require_role(["admin"]))])
@router.post(
"/faces/{name}/create",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Create a new face name",
description="""Creates a new folder for a face name in the faces directory.
This is used to organize face training images. The face name is sanitized and
spaces are replaced with underscores. Returns a success message or an error if
face recognition is not enabled.""",
)
async def create_face(request: Request, name: str):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -209,7 +250,16 @@ async def create_face(request: Request, name: str):
)
@router.post("/faces/{name}/register", dependencies=[Depends(require_role(["admin"]))])
@router.post(
"/faces/{name}/register",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Register a face image",
description="""Registers a face image for a specific face name by uploading an image file.
The uploaded image is processed and added to the face recognition system. Returns a
success response with details about the registration, or an error if face recognition
is not enabled or the image cannot be processed.""",
)
async def register_face(request: Request, name: str, file: UploadFile):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -235,7 +285,14 @@ async def register_face(request: Request, name: str, file: UploadFile):
)
@router.post("/faces/recognize")
@router.post(
"/faces/recognize",
response_model=FaceRecognitionResponse,
summary="Recognize a face from an uploaded image",
description="""Recognizes a face from an uploaded image file by comparing it against
registered faces in the system. Returns the recognized face name and confidence score,
or an error if face recognition is not enabled or the image cannot be processed.""",
)
async def recognize_face(request: Request, file: UploadFile):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -261,28 +318,38 @@ async def recognize_face(request: Request, file: UploadFile):
)
@router.post("/faces/{name}/delete", dependencies=[Depends(require_role(["admin"]))])
def deregister_faces(request: Request, name: str, body: dict = None):
@router.post(
"/faces/{name}/delete",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete face images",
description="""Deletes specific face images for a given face name. The image IDs must belong
to the specified face folder. To delete an entire face folder, all image IDs in that
folder must be sent. Returns a success message or an error if face recognition is not enabled.""",
)
def deregister_faces(request: Request, name: str, body: DeleteFaceImagesBody):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
status_code=400,
content={"message": "Face recognition is not enabled.", "success": False},
)
json: dict[str, Any] = body or {}
list_of_ids = json.get("ids", "")
context: EmbeddingsContext = request.app.embeddings
context.delete_face_ids(
name, map(lambda file: sanitize_filename(file), list_of_ids)
)
context.delete_face_ids(name, map(lambda file: sanitize_filename(file), body.ids))
return JSONResponse(
content=({"success": True, "message": "Successfully deleted faces."}),
status_code=200,
)
@router.put("/faces/{old_name}/rename", dependencies=[Depends(require_role(["admin"]))])
@router.put(
"/faces/{old_name}/rename",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Rename a face name",
description="""Renames a face name in the system. The old name must exist and the new
name must be valid. Returns a success message or an error if face recognition is not enabled.""",
)
def rename_face(request: Request, old_name: str, body: RenameFaceBody):
if not request.app.frigate_config.face_recognition.enabled:
return JSONResponse(
@ -311,7 +378,14 @@ def rename_face(request: Request, old_name: str, body: RenameFaceBody):
)
@router.put("/lpr/reprocess")
@router.put(
"/lpr/reprocess",
summary="Reprocess a license plate",
description="""Reprocesses a license plate image to update the plate.
Requires license plate recognition to be enabled in the configuration. The event_id
must exist in the database. Returns a success message or an error if license plate
recognition is not enabled or the event_id is invalid.""",
)
def reprocess_license_plate(request: Request, event_id: str):
if not request.app.frigate_config.lpr.enabled:
message = "License plate recognition is not enabled."
@ -344,7 +418,14 @@ def reprocess_license_plate(request: Request, event_id: str):
)
@router.put("/reindex", dependencies=[Depends(require_role(["admin"]))])
@router.put(
"/reindex",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Reindex embeddings",
description="""Reindexes the embeddings for all tracked objects.
Requires semantic search to be enabled in the configuration. Returns a success message or an error if semantic search is not enabled.""",
)
def reindex_embeddings(request: Request):
if not request.app.frigate_config.semantic_search.enabled:
message = (
@ -390,7 +471,14 @@ def reindex_embeddings(request: Request):
)
@router.put("/audio/transcribe")
@router.put(
"/audio/transcribe",
response_model=GenericResponse,
summary="Transcribe audio",
description="""Transcribes audio from a specific event.
Requires audio transcription to be enabled in the configuration. The event_id
must exist in the database. Returns a success message or an error if audio transcription is not enabled or the event_id is invalid.""",
)
def transcribe_audio(request: Request, body: AudioTranscriptionBody):
event_id = body.event_id
@ -448,7 +536,12 @@ def transcribe_audio(request: Request, body: AudioTranscriptionBody):
# custom classification training
@router.get("/classification/{name}/dataset")
@router.get(
"/classification/{name}/dataset",
summary="Get classification dataset",
description="""Gets the dataset for a specific classification model.
The name must exist in the classification models. Returns a success message or an error if the name is invalid.""",
)
def get_classification_dataset(name: str):
dataset_dict: dict[str, list[str]] = {}
@ -474,7 +567,12 @@ def get_classification_dataset(name: str):
return JSONResponse(status_code=200, content=dataset_dict)
@router.get("/classification/{name}/train")
@router.get(
"/classification/{name}/train",
summary="Get classification train images",
description="""Gets the train images for a specific classification model.
The name must exist in the classification models. Returns a success message or an error if the name is invalid.""",
)
def get_classification_images(name: str):
train_dir = os.path.join(CLIPS_DIR, sanitize_filename(name), "train")
@ -492,7 +590,13 @@ def get_classification_images(name: str):
)
@router.post("/classification/{name}/train")
@router.post(
"/classification/{name}/train",
response_model=GenericResponse,
summary="Train a classification model",
description="""Trains a specific classification model.
The name must exist in the classification models. Returns a success message or an error if the name is invalid.""",
)
async def train_configured_model(request: Request, name: str):
config: FrigateConfig = request.app.frigate_config
@ -517,7 +621,11 @@ async def train_configured_model(request: Request, name: str):
@router.post(
"/classification/{name}/dataset/{category}/delete",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete classification dataset images",
description="""Deletes specific dataset images for a given classification model and category.
The image IDs must belong to the specified category. Returns a success message or an error if the name or category is invalid.""",
)
def delete_classification_dataset_images(
request: Request, name: str, category: str, body: dict = None
@ -555,7 +663,11 @@ def delete_classification_dataset_images(
@router.post(
"/classification/{name}/dataset/categorize",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Categorize a classification image",
description="""Categorizes a specific classification image for a given classification model and category.
The image must exist in the specified category. Returns a success message or an error if the name or category is invalid.""",
)
def categorize_classification_image(request: Request, name: str, body: dict = None):
config: FrigateConfig = request.app.frigate_config
@ -610,7 +722,11 @@ def categorize_classification_image(request: Request, name: str, body: dict = No
@router.post(
"/classification/{name}/train/delete",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete classification train images",
description="""Deletes specific train images for a given classification model.
The image IDs must belong to the specified train folder. Returns a success message or an error if the name is invalid.""",
)
def delete_classification_train_images(request: Request, name: str, body: dict = None):
config: FrigateConfig = request.app.frigate_config

View File

@ -1,4 +1,6 @@
from pydantic import BaseModel
from typing import List
from pydantic import BaseModel, Field
class RenameFaceBody(BaseModel):
@ -7,3 +9,9 @@ class RenameFaceBody(BaseModel):
class AudioTranscriptionBody(BaseModel):
event_id: str
class DeleteFaceImagesBody(BaseModel):
ids: List[str] = Field(
description="List of image filenames to delete from the face folder"
)

View File

@ -0,0 +1,38 @@
from typing import Dict, List, Optional
from pydantic import BaseModel, Field, RootModel
class FacesResponse(RootModel[Dict[str, List[str]]]):
"""Response model for the get_faces endpoint.
Returns a mapping of face names to lists of image filenames.
Each face name corresponds to a directory in the faces folder,
and the list contains the names of image files for that face.
Example:
{
"john_doe": ["face1.webp", "face2.jpg"],
"jane_smith": ["face3.png"]
}
"""
root: Dict[str, List[str]] = Field(
default_factory=dict,
description="Dictionary mapping face names to lists of image filenames",
)
class FaceRecognitionResponse(BaseModel):
"""Response model for face recognition endpoint.
Returns the result of attempting to recognize a face from an uploaded image.
"""
success: bool = Field(description="Whether the face recognition was successful")
score: Optional[float] = Field(
default=None, description="Confidence score of the recognition (0-1)"
)
face_name: Optional[str] = Field(
default=None, description="The recognized face name if successful"
)

View File

@ -0,0 +1,30 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class ExportModel(BaseModel):
"""Model representing a single export."""
id: str = Field(description="Unique identifier for the export")
camera: str = Field(description="Camera name associated with this export")
name: str = Field(description="Friendly name of the export")
date: float = Field(description="Unix timestamp when the export was created")
video_path: str = Field(description="File path to the exported video")
thumb_path: str = Field(description="File path to the export thumbnail")
in_progress: bool = Field(
description="Whether the export is currently being processed"
)
class StartExportResponse(BaseModel):
"""Response model for starting an export."""
success: bool = Field(description="Whether the export was started successfully")
message: str = Field(description="Status or error message")
export_id: Optional[str] = Field(
default=None, description="The export ID if successfully started"
)
ExportsResponse = List[ExportModel]

View File

@ -0,0 +1,17 @@
from typing import List
from pydantic import BaseModel, Field
class PreviewModel(BaseModel):
"""Model representing a single preview clip."""
camera: str = Field(description="Camera name for this preview")
src: str = Field(description="Path to the preview video file")
type: str = Field(description="MIME type of the preview video (video/mp4)")
start: float = Field(description="Unix timestamp when the preview starts")
end: float = Field(description="Unix timestamp when the preview ends")
PreviewsResponse = List[PreviewModel]
PreviewFramesResponse = List[str]

View File

@ -10,5 +10,5 @@ class Tags(Enum):
review = "Review"
export = "Export"
events = "Events"
classification = "classification"
classification = "Classification"
auth = "Auth"

View File

@ -65,7 +65,12 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.events])
@router.get("/events", response_model=list[EventResponse])
@router.get(
"/events",
response_model=list[EventResponse],
summary="Get events",
description="Returns a list of events.",
)
def events(
params: EventsQueryParams = Depends(),
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
@ -334,7 +339,14 @@ def events(
return JSONResponse(content=list(events))
@router.get("/events/explore", response_model=list[EventResponse])
@router.get(
"/events/explore",
response_model=list[EventResponse],
summary="Get summary of objects.",
description="""Gets a summary of objects from the database.
Returns a list of objects with a max of `limit` objects for each label.
""",
)
def events_explore(
limit: int = 10,
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
@ -419,7 +431,14 @@ def events_explore(
return JSONResponse(content=processed_events)
@router.get("/event_ids", response_model=list[EventResponse])
@router.get(
"/event_ids",
response_model=list[EventResponse],
summary="Get events by ids.",
description="""Gets events by a list of ids.
Returns a list of events.
""",
)
async def event_ids(ids: str, request: Request):
ids = ids.split(",")
@ -446,7 +465,13 @@ async def event_ids(ids: str, request: Request):
)
@router.get("/events/search")
@router.get(
"/events/search",
summary="Search events.",
description="""Searches for events in the database.
Returns a list of events.
""",
)
def events_search(
request: Request,
params: EventsSearchQueryParams = Depends(),
@ -832,7 +857,12 @@ def events_summary(
return JSONResponse(content=[e for e in groups.dicts()])
@router.get("/events/{event_id}", response_model=EventResponse)
@router.get(
"/events/{event_id}",
response_model=EventResponse,
summary="Get event by id.",
description="Gets an event by its id.",
)
async def event(event_id: str, request: Request):
try:
event = Event.get(Event.id == event_id)
@ -846,6 +876,11 @@ async def event(event_id: str, request: Request):
"/events/{event_id}/retain",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Set event retain indefinitely.",
description="""Sets an event to retain indefinitely.
Returns a success message or an error if the event is not found.
NOTE: This is a legacy endpoint and is not supported in the frontend.
""",
)
def set_retain(event_id: str):
try:
@ -865,7 +900,14 @@ def set_retain(event_id: str):
)
@router.post("/events/{event_id}/plus", response_model=EventUploadPlusResponse)
@router.post(
"/events/{event_id}/plus",
response_model=EventUploadPlusResponse,
summary="Send event to Frigate+.",
description="""Sends an event to Frigate+.
Returns a success message or an error if the event is not found.
""",
)
async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = None):
if not request.app.frigate_config.plus_api.is_active():
message = "PLUS_API_KEY environment variable is not set"
@ -978,7 +1020,14 @@ async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = N
)
@router.put("/events/{event_id}/false_positive", response_model=EventUploadPlusResponse)
@router.put(
"/events/{event_id}/false_positive",
response_model=EventUploadPlusResponse,
summary="Submit false positive to Frigate+",
description="""Submit an event as a false positive to Frigate+.
This endpoint is the same as the standard Frigate+ submission endpoint,
but is specifically for marking an event as a false positive.""",
)
async def false_positive(request: Request, event_id: str):
if not request.app.frigate_config.plus_api.is_active():
message = "PLUS_API_KEY environment variable is not set"
@ -1072,6 +1121,11 @@ async def false_positive(request: Request, event_id: str):
"/events/{event_id}/retain",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Stop event from being retained indefinitely.",
description="""Stops an event from being retained indefinitely.
Returns a success message or an error if the event is not found.
NOTE: This is a legacy endpoint and is not supported in the frontend.
""",
)
async def delete_retain(event_id: str, request: Request):
try:
@ -1096,6 +1150,10 @@ async def delete_retain(event_id: str, request: Request):
"/events/{event_id}/sub_label",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Set event sub label.",
description="""Sets an event's sub label.
Returns a success message or an error if the event is not found.
""",
)
async def set_sub_label(
request: Request,
@ -1151,6 +1209,10 @@ async def set_sub_label(
"/events/{event_id}/recognized_license_plate",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Set event license plate.",
description="""Sets an event's license plate.
Returns a success message or an error if the event is not found.
""",
)
async def set_plate(
request: Request,
@ -1207,6 +1269,10 @@ async def set_plate(
"/events/{event_id}/description",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Set event description.",
description="""Sets an event's description.
Returns a success message or an error if the event is not found.
""",
)
async def set_description(
request: Request,
@ -1259,6 +1325,10 @@ async def set_description(
"/events/{event_id}/description/regenerate",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Regenerate event description.",
description="""Regenerates an event's description.
Returns a success message or an error if the event is not found.
""",
)
async def regenerate_description(
request: Request, event_id: str, params: RegenerateQueryParameters = Depends()
@ -1308,6 +1378,10 @@ async def regenerate_description(
"/description/generate",
response_model=GenericResponse,
# dependencies=[Depends(require_role(["admin"]))],
summary="Generate description embedding.",
description="""Generates an embedding for an event's description.
Returns a success message or an error if the event is not found.
""",
)
def generate_description_embedding(
request: Request,
@ -1368,6 +1442,10 @@ async def delete_single_event(event_id: str, request: Request) -> dict:
"/events/{event_id}",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete event.",
description="""Deletes an event from the database.
Returns a success message or an error if the event is not found.
""",
)
async def delete_event(request: Request, event_id: str):
result = await delete_single_event(event_id, request)
@ -1379,6 +1457,10 @@ async def delete_event(request: Request, event_id: str):
"/events/",
response_model=EventMultiDeleteResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete events.",
description="""Deletes a list of events from the database.
Returns a success message or an error if the events are not found.
""",
)
async def delete_events(request: Request, body: EventsDeleteBody):
if not body.event_ids:
@ -1409,6 +1491,13 @@ async def delete_events(request: Request, body: EventsDeleteBody):
"/events/{camera_name}/{label}/create",
response_model=EventCreateResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Create manual event.",
description="""Creates a manual event in the database.
Returns a success message or an error if the event is not found.
NOTES:
- Creating a manual event does not trigger an update to /events MQTT topic.
- If a duration is set to null, the event will need to be ended manually by calling /events/{event_id}/end.
""",
)
def create_event(
request: Request,
@ -1466,6 +1555,11 @@ def create_event(
"/events/{event_id}/end",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="End manual event.",
description="""Ends a manual event.
Returns a success message or an error if the event is not found.
NOTE: This should only be used for manual events.
""",
)
async def end_event(request: Request, event_id: str, body: EventsEndBody):
try:
@ -1493,6 +1587,10 @@ async def end_event(request: Request, event_id: str, body: EventsEndBody):
"/trigger/embedding",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
summary="Create trigger embedding.",
description="""Creates a trigger embedding for a specific trigger.
Returns a success message or an error if the trigger is not found.
""",
)
def create_trigger_embedding(
request: Request,
@ -1645,6 +1743,10 @@ def create_trigger_embedding(
"/trigger/embedding/{camera_name}/{name}",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
summary="Update trigger embedding.",
description="""Updates a trigger embedding for a specific trigger.
Returns a success message or an error if the trigger is not found.
""",
)
def update_trigger_embedding(
request: Request,
@ -1806,6 +1908,10 @@ def update_trigger_embedding(
"/trigger/embedding/{camera_name}/{name}",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete trigger embedding.",
description="""Deletes a trigger embedding for a specific trigger.
Returns a success message or an error if the trigger is not found.
""",
)
def delete_trigger_embedding(
request: Request,
@ -1877,6 +1983,10 @@ def delete_trigger_embedding(
"/triggers/status/{camera_name}",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
summary="Get triggers status.",
description="""Gets the status of all triggers for a specific camera.
Returns a success message or an error if the camera is not found.
""",
)
def get_triggers_status(
camera_name: str,

View File

@ -19,6 +19,12 @@ from frigate.api.auth import (
)
from frigate.api.defs.request.export_recordings_body import ExportRecordingsBody
from frigate.api.defs.request.export_rename_body import ExportRenameBody
from frigate.api.defs.response.export_response import (
ExportModel,
ExportsResponse,
StartExportResponse,
)
from frigate.api.defs.response.generic_response import GenericResponse
from frigate.api.defs.tags import Tags
from frigate.const import EXPORT_DIR
from frigate.models import Export, Previews, Recordings
@ -34,7 +40,13 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.export])
@router.get("/exports")
@router.get(
"/exports",
response_model=ExportsResponse,
summary="Get exports",
description="""Gets all exports from the database for cameras the user has access to.
Returns a list of exports ordered by date (most recent first).""",
)
def get_exports(
allowed_cameras: List[str] = Depends(get_allowed_cameras_for_filter),
):
@ -50,7 +62,13 @@ def get_exports(
@router.post(
"/export/{camera_name}/start/{start_time}/end/{end_time}",
response_model=StartExportResponse,
dependencies=[Depends(require_camera_access)],
summary="Start recording export",
description="""Starts an export of a recording for the specified time range.
The export can be from recordings or preview footage. Returns the export ID if
successful, or an error message if the camera is invalid or no recordings/previews
are found for the time range.""",
)
def export_recording(
request: Request,
@ -148,7 +166,13 @@ def export_recording(
@router.patch(
"/export/{event_id}/rename", dependencies=[Depends(require_role(["admin"]))]
"/export/{event_id}/rename",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Rename export",
description="""Renames an export.
NOTE: This changes the friendly name of the export, not the filename.
""",
)
async def export_rename(event_id: str, body: ExportRenameBody, request: Request):
try:
@ -178,7 +202,12 @@ async def export_rename(event_id: str, body: ExportRenameBody, request: Request)
)
@router.delete("/export/{event_id}", dependencies=[Depends(require_role(["admin"]))])
@router.delete(
"/export/{event_id}",
response_model=GenericResponse,
dependencies=[Depends(require_role(["admin"]))],
summary="Delete export",
)
async def export_delete(event_id: str, request: Request):
try:
export: Export = Export.get(Export.id == event_id)
@ -232,7 +261,13 @@ async def export_delete(event_id: str, request: Request):
)
@router.get("/exports/{export_id}")
@router.get(
"/exports/{export_id}",
response_model=ExportModel,
summary="Get a single export",
description="""Gets a specific export by ID. The user must have access to the camera
associated with the export.""",
)
async def get_export(export_id: str, request: Request):
try:
export = Export.get(Export.id == export_id)

View File

@ -19,7 +19,13 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.notifications])
@router.get("/notifications/pubkey")
@router.get(
"/notifications/pubkey",
summary="Get VAPID public key",
description="""Gets the VAPID public key for the notifications.
Returns the public key or an error if notifications are not enabled.
""",
)
def get_vapid_pub_key(request: Request):
config = request.app.frigate_config
notifications_enabled = config.notifications.enabled
@ -39,7 +45,13 @@ def get_vapid_pub_key(request: Request):
return JSONResponse(content=utils.b64urlencode(raw_pub), status_code=200)
@router.post("/notifications/register")
@router.post(
"/notifications/register",
summary="Register notifications",
description="""Registers a notifications subscription.
Returns a success message or an error if the subscription is not provided.
""",
)
def register_notifications(request: Request, body: dict = None):
if request.app.frigate_config.auth.enabled:
# FIXME: For FastAPI the remote-user is not being populated

View File

@ -9,6 +9,10 @@ from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from frigate.api.auth import require_camera_access
from frigate.api.defs.response.preview_response import (
PreviewFramesResponse,
PreviewsResponse,
)
from frigate.api.defs.tags import Tags
from frigate.const import BASE_DIR, CACHE_DIR, PREVIEW_FRAME_TYPE
from frigate.models import Previews
@ -21,7 +25,13 @@ router = APIRouter(tags=[Tags.preview])
@router.get(
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}",
response_model=PreviewsResponse,
dependencies=[Depends(require_camera_access)],
summary="Get preview clips for time range",
description="""Gets all preview clips for a specified camera and time range.
Returns a list of preview video clips that overlap with the requested time period,
ordered by start time. Use camera_name='all' to get previews from all cameras.
Returns an error if no previews are found.""",
)
def preview_ts(camera_name: str, start_ts: float, end_ts: float):
"""Get all mp4 previews relevant for time period."""
@ -77,7 +87,13 @@ def preview_ts(camera_name: str, start_ts: float, end_ts: float):
@router.get(
"/preview/{year_month}/{day}/{hour}/{camera_name}/{tz_name}",
response_model=PreviewsResponse,
dependencies=[Depends(require_camera_access)],
summary="Get preview clips for specific hour",
description="""Gets all preview clips for a specific hour in a given timezone.
Converts the provided date/time from the specified timezone to UTC and retrieves
all preview clips for that hour. Use camera_name='all' to get previews from all cameras.
The tz_name should be a timezone like 'America/New_York' (use commas instead of slashes).""",
)
def preview_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name: str):
"""Get all mp4 previews relevant for time period given the timezone"""
@ -95,7 +111,12 @@ def preview_hour(year_month: str, day: int, hour: int, camera_name: str, tz_name
@router.get(
"/preview/{camera_name}/start/{start_ts}/end/{end_ts}/frames",
response_model=PreviewFramesResponse,
dependencies=[Depends(require_camera_access)],
summary="Get cached preview frame filenames",
description="""Gets a list of cached preview frame filenames for a specific camera and time range.
Returns an array of filenames for preview frames that fall within the specified time period,
sorted in chronological order. These are individual frame images cached for quick preview display.""",
)
def get_preview_frames_from_cache(camera_name: str, start_ts: float, end_ts: float):
"""Get list of cached preview frames"""