frigate/frigate/api/motion_search.py
Josh Hawkins 2babfd2ec9
Improve motion review and add motion search (#22253)
* implement motion search and motion previews

* tweaks

* fix merge issue

* fix copilot instructions
2026-03-05 17:53:48 -06:00

293 lines
8.9 KiB
Python

"""Motion search API for detecting changes within a region of interest."""
import logging
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from frigate.api.auth import require_camera_access
from frigate.api.defs.tags import Tags
from frigate.jobs.motion_search import (
cancel_motion_search_job,
get_motion_search_job,
start_motion_search_job,
)
from frigate.types import JobStatusTypesEnum
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router that the motion search endpoints below register on; every route
# attached to it is tagged with Tags.motion_search.
router = APIRouter(tags=[Tags.motion_search])
# Body schema for the "start motion search" endpoint. The ge/le bounds declared
# on the numeric fields are enforced by pydantic when the request is parsed, so
# the handler only has to check what cannot be expressed here.
class MotionSearchRequest(BaseModel):
    """Request body for motion search."""

    # Search window. The ordering start < end is not enforced on the model;
    # start_motion_search checks it.
    start_time: float = Field(description="Start timestamp for the search range")
    end_time: float = Field(description="End timestamp for the search range")
    # Only the element type (list of float lists) is constrained here; the
    # minimum vertex count is checked in start_motion_search.
    polygon_points: List[List[float]] = Field(
        description="List of [x, y] normalized coordinates (0-1) defining the ROI polygon"
    )
    # Per-pixel difference threshold, bounded to 1..255.
    threshold: int = Field(
        default=30,
        ge=1,
        le=255,
        description="Pixel difference threshold (1-255)",
    )
    # Expressed as a percentage of the ROI (0.1..100.0), not a pixel count.
    min_area: float = Field(
        default=5.0,
        ge=0.1,
        le=100.0,
        description="Minimum change area as a percentage of the ROI",
    )
    # Sampling stride over frames, bounded to 1..30.
    frame_skip: int = Field(
        default=5,
        ge=1,
        le=30,
        description="Process every Nth frame (1=all frames, 5=every 5th frame)",
    )
    # Opt-in flag forwarded unchanged to the job; off by default.
    parallel: bool = Field(
        default=False,
        description="Enable parallel scanning across segments",
    )
    # Cap on returned results, bounded to 1..200.
    max_results: int = Field(
        default=25,
        ge=1,
        le=200,
        description="Maximum number of search results to return",
    )
# One entry of the "results" list declared on MotionSearchStatusResponse.
class MotionSearchResult(BaseModel):
    """A single search result with timestamp and change info."""

    # Both fields are required (no defaults).
    timestamp: float = Field(description="Timestamp where change was detected")
    change_percentage: float = Field(description="Percentage of ROI area that changed")
# Schema for the optional "metrics" object of the status response. Every field
# defaults to zero, so a partially populated metrics dict still validates.
# NOTE(review): presumably mirrors the keys produced by job.metrics.to_dict() in
# the status endpoint — confirm against the jobs module.
class MotionSearchMetricsResponse(BaseModel):
    """Metrics collected during motion search execution."""

    # Segment-level counters.
    segments_scanned: int = 0
    segments_processed: int = 0
    metadata_inactive_segments: int = 0
    heatmap_roi_skip_segments: int = 0
    fallback_full_range_segments: int = 0
    # Frame-level counter.
    frames_decoded: int = 0
    # Elapsed time in seconds (float, unlike the integer counters above).
    wall_time_seconds: float = 0.0
    segments_with_errors: int = 0
# Declared response schema of the "start motion search" endpoint. All three
# fields are required; the handler's error responses (404/400) omit job_id and
# are returned as raw JSONResponse objects.
class MotionSearchStartResponse(BaseModel):
    """Response when motion search job starts."""

    success: bool
    message: str
    # Identifier to use with the status and cancel endpoints.
    job_id: str
# Declared response schema of the status endpoint. Only success/message/status
# are required; the remaining fields are optional because the handler fills
# them in depending on the job state.
class MotionSearchStatusResponse(BaseModel):
    """Response containing job status and results."""

    success: bool
    message: str
    status: str  # "queued", "running", "success", "failed", or "cancelled"
    # Set for finished jobs, and for in-progress jobs that already have results.
    results: Optional[List[MotionSearchResult]] = None
    # Omitted by the handler for failed jobs.
    total_frames_processed: Optional[int] = None
    # Only set by the handler when the job failed.
    error_message: Optional[str] = None
    # Only set by the handler when the job carries metrics.
    metrics: Optional[MotionSearchMetricsResponse] = None
@router.post(
    "/{camera_name}/search/motion",
    response_model=MotionSearchStartResponse,
    dependencies=[Depends(require_camera_access)],
    summary="Start motion search job",
    description="""Starts an asynchronous search for significant motion changes within
a user-defined Region of Interest (ROI) over a specified time range. Returns a job_id
that can be used to poll for results.""",
)
async def start_motion_search(
    request: Request,
    camera_name: str,
    body: MotionSearchRequest,
):
    """Validate a motion search request and hand it to the jobs module.

    Args:
        request: Incoming request; ``request.app.frigate_config`` supplies the
            camera configuration.
        camera_name: Camera taken from the URL path.
        body: Parsed search parameters (numeric bounds are already enforced
            by the model).

    Returns:
        A JSONResponse:
        * 404 when ``camera_name`` is not a configured camera,
        * 400 when the polygon has fewer than 3 points, when any point is not
          an ``[x, y]`` pair, or when the time range is empty, reversed or
          not comparable (NaN),
        * otherwise ``{"success": True, "message": ..., "job_id": ...}``.
    """
    config = request.app.frigate_config

    if camera_name not in config.cameras:
        return JSONResponse(
            content={"success": False, "message": f"Camera {camera_name} not found"},
            status_code=404,
        )

    # Validate polygon has at least 3 points
    if len(body.polygon_points) < 3:
        return JSONResponse(
            content={
                "success": False,
                "message": "Polygon must have at least 3 points",
            },
            status_code=400,
        )

    # The model only guarantees "list of float lists", so a vertex such as
    # [0.1] or [0.1, 0.2, 0.3] would otherwise reach the background job.
    # Reject it here, where the caller can still be given a clear 400.
    if any(len(point) != 2 for point in body.polygon_points):
        return JSONResponse(
            content={
                "success": False,
                "message": "Each polygon point must be an [x, y] pair",
            },
            status_code=400,
        )

    # Validate time range. Written as `not (start < end)` rather than
    # `start >= end` because every comparison with NaN is False: the latter
    # would let a NaN timestamp slip through, the former rejects it.
    if not (body.start_time < body.end_time):
        return JSONResponse(
            content={
                "success": False,
                "message": "Start time must be before end time",
            },
            status_code=400,
        )

    # Start the job using the jobs module
    job_id = start_motion_search_job(
        config=config,
        camera_name=camera_name,
        start_time=body.start_time,
        end_time=body.end_time,
        polygon_points=body.polygon_points,
        threshold=body.threshold,
        min_area=body.min_area,
        frame_skip=body.frame_skip,
        parallel=body.parallel,
        max_results=body.max_results,
    )

    return JSONResponse(
        content={
            "success": True,
            "message": "Search job started",
            "job_id": job_id,
        }
    )
@router.get(
    "/{camera_name}/search/motion/{job_id}",
    response_model=MotionSearchStatusResponse,
    dependencies=[Depends(require_camera_access)],
    summary="Get motion search job status",
    description="Returns the status and results (if complete) of a motion search job.",
)
async def get_motion_search_status_endpoint(
    request: Request,
    camera_name: str,
    job_id: str,
):
    """Report the current state of a motion search job.

    Args:
        request: Incoming request; ``request.app.frigate_config`` supplies the
            camera configuration.
        camera_name: Camera taken from the URL path.
        job_id: Identifier returned when the job was started.

    Returns:
        A JSONResponse: 404 when the camera or the job is unknown, otherwise a
        payload with ``success``, ``status`` and ``message`` plus state-specific
        fields (``error_message``, ``results``, ``total_frames_processed``,
        ``metrics``).
    """
    if camera_name not in request.app.frigate_config.cameras:
        return JSONResponse(
            status_code=404,
            content={"success": False, "message": f"Camera {camera_name} not found"},
        )

    # NOTE(review): the job is looked up by id only; nothing visible here ties
    # it to camera_name — confirm whether that check belongs in this handler.
    job = get_motion_search_job(job_id)
    if not job:
        return JSONResponse(
            status_code=404,
            content={"success": False, "message": "Job not found"},
        )

    state = job.status
    # Anything other than "failed" (including still-running) counts as success.
    payload: dict[str, Any] = {
        "success": state != JobStatusTypesEnum.failed,
        "status": state,
    }

    if state == JobStatusTypesEnum.failed:
        payload["message"] = job.error_message or "Search failed"
        payload["error_message"] = job.error_message
    elif state == JobStatusTypesEnum.cancelled:
        payload["message"] = "Search cancelled"
        payload["total_frames_processed"] = job.total_frames_processed
    elif state == JobStatusTypesEnum.success:
        # A finished job always reports a results list, empty if none stored.
        stored = job.results or {}
        payload["message"] = "Search complete"
        payload["results"] = stored.get("results", [])
        payload["total_frames_processed"] = stored.get(
            "total_frames_processed", job.total_frames_processed
        )
    else:
        # Queued or running: report progress, plus partial results if any.
        payload["message"] = "Job processing"
        payload["total_frames_processed"] = job.total_frames_processed
        partial = job.results
        if partial:
            payload["results"] = partial.get("results", [])
            payload["total_frames_processed"] = partial.get(
                "total_frames_processed", job.total_frames_processed
            )

    # Metrics are attached in every state, whenever the job carries them.
    if job.metrics:
        payload["metrics"] = job.metrics.to_dict()

    return JSONResponse(content=payload)
@router.post(
    "/{camera_name}/search/motion/{job_id}/cancel",
    dependencies=[Depends(require_camera_access)],
    summary="Cancel motion search job",
    description="Cancels an active motion search job if it is still processing.",
)
async def cancel_motion_search_endpoint(
    request: Request,
    camera_name: str,
    job_id: str,
):
    """Request cancellation of a queued or running motion search job.

    Args:
        request: Incoming request; ``request.app.frigate_config`` supplies the
            camera configuration.
        camera_name: Camera taken from the URL path.
        job_id: Identifier returned when the job was started.

    Returns:
        A JSONResponse: 404 when the camera or the job is unknown; a success
        payload with the job's current status when it is no longer queued or
        running; a success payload with status "cancelled" when cancellation
        was accepted; 500 when the jobs module reports that it was not.
    """
    if camera_name not in request.app.frigate_config.cameras:
        return JSONResponse(
            status_code=404,
            content={"success": False, "message": f"Camera {camera_name} not found"},
        )

    job = get_motion_search_job(job_id)
    if not job:
        return JSONResponse(
            status_code=404,
            content={"success": False, "message": "Job not found"},
        )

    # Only queued/running jobs can be cancelled; anything else is reported
    # back as already finished, which is still a successful outcome.
    current = job.status
    still_active = current in (
        JobStatusTypesEnum.queued,
        JobStatusTypesEnum.running,
    )
    if not still_active:
        return JSONResponse(
            content={
                "success": True,
                "message": "Job already finished",
                "status": current,
            }
        )

    # Request cancellation; handle the failure path first.
    if not cancel_motion_search_job(job_id):
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "message": "Failed to cancel job",
            },
        )

    return JSONResponse(
        content={
            "success": True,
            "message": "Search cancelled",
            "status": "cancelled",
        }
    )