Enable event snapshot API to honour query params

This commit is contained in:
leccelecce 2026-03-10 11:54:20 +00:00
parent 5254bfd00e
commit e85c571a19
5 changed files with 393 additions and 121 deletions

View File

@ -45,8 +45,8 @@ from frigate.const import (
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.output.preview import get_most_recent_preview_frame from frigate.output.preview import get_most_recent_preview_frame
from frigate.track.object_processing import TrackedObjectProcessor from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.file import get_event_thumbnail_bytes from frigate.util.file import get_event_snapshot_bytes, get_event_thumbnail_bytes
from frigate.util.image import get_image_from_recording from frigate.util.image import get_image_from_recording, get_image_quality_params
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -147,14 +147,7 @@ async def latest_frame(
"paths": params.paths, "paths": params.paths,
"regions": params.regions, "regions": params.regions,
} }
quality = params.quality quality_params = get_image_quality_params(extension.value, params.quality)
if extension == Extension.png:
quality_params = None
elif extension == Extension.webp:
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
else: # jpg or jpeg
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
if camera_name in request.app.frigate_config.cameras: if camera_name in request.app.frigate_config.cameras:
frame = frame_processor.get_current_frame(camera_name, draw_options) frame = frame_processor.get_current_frame(camera_name, draw_options)
@ -729,7 +722,7 @@ async def vod_clip(
@router.get( @router.get(
"/events/{event_id}/snapshot.jpg", "/events/{event_id}/snapshot.jpg",
description="Returns a snapshot image for the specified object id. NOTE: The query params only take affect while the event is in-progress. Once the event has ended the snapshot configuration is used.", description="Returns a snapshot image for the specified object id.",
) )
async def event_snapshot( async def event_snapshot(
request: Request, request: Request,
@ -748,11 +741,19 @@ async def event_snapshot(
content={"success": False, "message": "Snapshot not available"}, content={"success": False, "message": "Snapshot not available"},
status_code=404, status_code=404,
) )
# read snapshot from disk jpg_bytes, frame_time = get_event_snapshot_bytes(
with open( event,
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"), "rb" ext="jpg",
) as image_file: timestamp=params.timestamp,
jpg_bytes = image_file.read() bounding_box=params.bbox,
crop=params.crop,
height=params.height,
quality=params.quality,
timestamp_style=request.app.frigate_config.cameras[
event.camera
].timestamp_style,
colormap=request.app.frigate_config.model.colormap,
)
except DoesNotExist: except DoesNotExist:
# see if the object is currently being tracked # see if the object is currently being tracked
try: try:
@ -865,13 +866,11 @@ async def event_thumbnail(
(0, 0, 0), (0, 0, 0),
) )
quality_params = None _, img = cv2.imencode(
if extension in (Extension.jpg, Extension.jpeg): f".{extension.value}",
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), 70] thumbnail,
elif extension == Extension.webp: get_image_quality_params(extension.value, None),
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), 60] )
_, img = cv2.imencode(f".{extension.value}", thumbnail, quality_params)
thumbnail_bytes = img.tobytes() thumbnail_bytes = img.tobytes()
return Response( return Response(

View File

@ -158,36 +158,33 @@ class EventProcessor(threading.Thread):
end_time = ( end_time = (
None if event_data["end_time"] is None else event_data["end_time"] None if event_data["end_time"] is None else event_data["end_time"]
) )
snapshot = event_data["snapshot"]
# score of the snapshot # score of the snapshot
score = ( score = None if snapshot is None else snapshot["score"]
None
if event_data["snapshot"] is None
else event_data["snapshot"]["score"]
)
# detection region in the snapshot # detection region in the snapshot
region = ( region = (
None None
if event_data["snapshot"] is None if snapshot is None
else to_relative_box( else to_relative_box(
width, width,
height, height,
event_data["snapshot"]["region"], snapshot["region"],
) )
) )
# bounding box for the snapshot # bounding box for the snapshot
box = ( box = (
None None
if event_data["snapshot"] is None if snapshot is None
else to_relative_box( else to_relative_box(
width, width,
height, height,
event_data["snapshot"]["box"], snapshot["box"],
) )
) )
attributes = ( attributes = (
None None
if event_data["snapshot"] is None if snapshot is None
else [ else [
{ {
"box": to_relative_box( "box": to_relative_box(
@ -198,9 +195,14 @@ class EventProcessor(threading.Thread):
"label": a["label"], "label": a["label"],
"score": a["score"], "score": a["score"],
} }
for a in event_data["snapshot"]["attributes"] for a in snapshot["attributes"]
] ]
) )
snapshot_frame_time = None if snapshot is None else snapshot["frame_time"]
snapshot_area = None if snapshot is None else snapshot["area"]
snapshot_estimated_speed = (
None if snapshot is None else snapshot["current_estimated_speed"]
)
# keep these from being set back to false because the event # keep these from being set back to false because the event
# may have started while recordings/snapshots/alerts/detections were enabled # may have started while recordings/snapshots/alerts/detections were enabled
@ -229,6 +231,9 @@ class EventProcessor(threading.Thread):
"score": score, "score": score,
"top_score": event_data["top_score"], "top_score": event_data["top_score"],
"attributes": attributes, "attributes": attributes,
"snapshot_frame_time": snapshot_frame_time,
"snapshot_area": snapshot_area,
"snapshot_estimated_speed": snapshot_estimated_speed,
"average_estimated_speed": event_data["average_estimated_speed"], "average_estimated_speed": event_data["average_estimated_speed"],
"velocity_angle": event_data["velocity_angle"], "velocity_angle": event_data["velocity_angle"],
"type": "object", "type": "object",

View File

@ -22,9 +22,7 @@ from frigate.review.types import SeverityEnum
from frigate.util.builtin import sanitize_float from frigate.util.builtin import sanitize_float
from frigate.util.image import ( from frigate.util.image import (
area, area,
calculate_region, get_snapshot_bytes,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail, is_better_thumbnail,
) )
from frigate.util.object import box_inside from frigate.util.object import box_inside
@ -495,90 +493,25 @@ class TrackedObject:
) )
return None, None return None, None
if bounding_box: return get_snapshot_bytes(
thickness = 2
color = self.colormap.get(self.obj_data["label"], (255, 255, 255))
# draw the bounding boxes on the frame
box = self.thumbnail_data["box"]
draw_box_with_label(
best_frame, best_frame,
box[0], frame_time,
box[1], ext=ext,
box[2], timestamp=timestamp,
box[3], bounding_box=bounding_box,
self.obj_data["label"], crop=crop,
f"{int(self.thumbnail_data['score'] * 100)}% {int(self.thumbnail_data['area'])}" height=height,
+ ( quality=quality,
f" {self.thumbnail_data['current_estimated_speed']:.1f}" label=self.obj_data["label"],
if self.thumbnail_data["current_estimated_speed"] != 0 box=self.thumbnail_data["box"],
else "" score=self.thumbnail_data["score"],
), area=self.thumbnail_data["area"],
thickness=thickness, attributes=self.thumbnail_data["attributes"],
color=color, color=self.colormap.get(self.obj_data["label"], (255, 255, 255)),
timestamp_style=self.camera_config.timestamp_style,
estimated_speed=self.thumbnail_data["current_estimated_speed"],
) )
# draw any attributes
for attribute in self.thumbnail_data["attributes"]:
box = attribute["box"]
box_area = int((box[2] - box[0]) * (box[3] - box[1]))
draw_box_with_label(
best_frame,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%} {str(box_area)}",
thickness=thickness,
color=color,
)
if crop:
box = self.thumbnail_data["box"]
box_size = 300
region = calculate_region(
best_frame.shape,
box[0],
box[1],
box[2],
box[3],
box_size,
multiplier=1.1,
)
best_frame = best_frame[region[1] : region[3], region[0] : region[2]]
if height:
width = int(height * best_frame.shape[1] / best_frame.shape[0])
best_frame = cv2.resize(
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
)
if timestamp:
colors = self.camera_config.timestamp_style.color
draw_timestamp(
best_frame,
self.thumbnail_data["frame_time"],
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(colors.blue, colors.green, colors.red),
position=self.camera_config.timestamp_style.position,
)
quality_params = []
if ext == "jpg":
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]
elif ext == "webp":
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60]
ret, jpg = cv2.imencode(f".{ext}", best_frame, quality_params)
if ret:
return jpg.tobytes(), frame_time
else:
return None, None
def write_snapshot_to_disk(self) -> None: def write_snapshot_to_disk(self) -> None:
snapshot_config: SnapshotsConfig = self.camera_config.snapshots snapshot_config: SnapshotsConfig = self.camera_config.snapshots
jpg_bytes, _ = self.get_img_bytes( jpg_bytes, _ = self.get_img_bytes(

View File

@ -5,14 +5,16 @@ import fcntl
import logging import logging
import os import os
import time import time
from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Any, Optional
import cv2 import cv2
from numpy import ndarray from numpy import ndarray
from frigate.const import CLIPS_DIR, THUMB_DIR from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.models import Event from frigate.models import Event
from frigate.util.image import get_snapshot_bytes, relative_box_to_absolute
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -35,6 +37,167 @@ def get_event_snapshot(event: Event) -> ndarray:
return cv2.imread(f"{os.path.join(CLIPS_DIR, media_name)}.jpg") return cv2.imread(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
def _load_event_snapshot_image(event: Event) -> tuple[ndarray | None, bool]:
    """Load the saved snapshot frame for an event from disk.

    Prefers the "clean" snapshot (no overlays) so query parameters can be
    honored, falling back to the regular jpg snapshot otherwise.

    Returns (frame, is_clean) where frame is a 3-channel BGR image, or
    (None, False) when nothing usable is on disk.
    """
    base = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}")

    # Try the clean variants first (webp preferred, then png).
    for suffix in ("-clean.webp", "-clean.png"):
        candidate = f"{base}{suffix}"
        if not os.path.exists(candidate):
            continue

        frame = cv2.imread(candidate, cv2.IMREAD_UNCHANGED)
        if frame is None:
            logger.warning("Unable to load clean snapshot from %s", candidate)
            continue

        # Normalize to 3-channel BGR for downstream drawing/encoding.
        if frame.ndim == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        elif frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
        return frame, True

    # Fall back to the (possibly already annotated) jpg snapshot.
    annotated = f"{base}.jpg"
    if not os.path.exists(annotated):
        return None, False

    frame = cv2.imread(annotated, cv2.IMREAD_COLOR)
    if frame is None:
        logger.warning("Unable to load snapshot from %s", annotated)
        return None, False

    return frame, False
def get_event_snapshot_bytes(
    event: Event,
    *,
    ext: str,
    timestamp: bool = False,
    bounding_box: bool = False,
    crop: bool = False,
    height: int | None = None,
    quality: int | None = None,
    timestamp_style: Any | None = None,
    colormap: dict[str, tuple[int, int, int]] | None = None,
) -> tuple[bytes | None, float]:
    """Render an event's stored snapshot, honoring the API query params.

    Overlay parameters (timestamp / bounding_box / crop) are only applied
    when the clean snapshot is available, because the plain jpg snapshot
    may already contain annotations.

    Returns (encoded_bytes, frame_time), or (None, 0) when no snapshot
    image exists on disk.
    """
    frame, have_clean = _load_event_snapshot_image(event)
    if frame is None:
        return None, 0

    if not have_clean and (bounding_box or crop or timestamp):
        logger.warning(
            "Unable to fully honor snapshot query parameters for completed event %s because the clean snapshot is unavailable.",
            event.id,
        )

    data = event.data or {}
    abs_box = relative_box_to_absolute(frame.shape, data.get("box"))
    abs_attributes = _get_event_snapshot_attributes(
        frame.shape, data.get("attributes")
    )
    label_color = (colormap or {}).get(event.label, (255, 255, 255))

    return get_snapshot_bytes(
        frame,
        _get_event_snapshot_frame_time(event),
        ext=ext,
        timestamp=timestamp and have_clean,
        bounding_box=bounding_box and have_clean,
        crop=crop and have_clean,
        height=height,
        quality=quality,
        label=event.label,
        box=abs_box,
        score=_get_event_snapshot_score(event),
        area=_get_event_snapshot_area(event),
        attributes=abs_attributes,
        color=label_color,
        timestamp_style=timestamp_style,
        estimated_speed=_get_event_snapshot_estimated_speed(event),
    )
def _as_timestamp(value: Any) -> float:
    """Coerce a datetime or numeric value into a unix timestamp (float)."""
    return value.timestamp() if isinstance(value, datetime) else float(value)
def _get_event_snapshot_frame_time(event: Event) -> float:
    """Return the best-known capture time of the event's snapshot.

    Preference order: data["snapshot_frame_time"] (recorded at event end),
    then data["frame_time"], then the event's start_time.
    """
    data = event.data or {}
    for key in ("snapshot_frame_time", "frame_time"):
        value = data.get(key)
        if value is not None:
            return _as_timestamp(value)
    return _as_timestamp(event.start_time)
def _get_event_snapshot_attributes(
    frame_shape: tuple[int, ...], attributes: list[dict[str, Any]] | None
) -> list[dict[str, Any]]:
    """Convert stored relative attribute boxes to absolute pixel coordinates.

    Attributes whose box cannot be converted are dropped; label and score
    fall back to placeholder defaults when missing.
    """
    converted: list[dict[str, Any]] = []
    if not attributes:
        return converted

    for attr in attributes:
        absolute = relative_box_to_absolute(frame_shape, attr.get("box"))
        if absolute is None:
            continue
        converted.append(
            {
                "box": absolute,
                "label": attr.get("label", "attribute"),
                "score": attr.get("score", 0),
            }
        )
    return converted
def _get_event_snapshot_score(event: Event) -> float:
    """Score associated with the snapshot, falling back to the event's best score.

    Preference order: data["score"], data["top_score"], then the model
    columns top_score / score; 0 when nothing is recorded.
    """
    data = event.data or {}
    for key in ("score", "top_score"):
        value = data.get(key)
        if value is not None:
            return value
    return event.top_score or event.score or 0
def _get_event_snapshot_area(event: Event) -> int | None:
    """Pixel area of the snapshot's bounding box, or None if never recorded."""
    area = (event.data or {}).get("snapshot_area")
    return None if area is None else int(area)
def _get_event_snapshot_estimated_speed(event: Event) -> float:
    """Estimated speed at snapshot time, falling back to the event average.

    Returns 0 when no speed information was recorded for the event.
    """
    data = event.data or {}
    for key in ("snapshot_estimated_speed", "average_estimated_speed"):
        value = data.get(key)
        if value is not None:
            return float(value)
    return 0
### Deletion ### Deletion

View File

@ -270,6 +270,178 @@ def draw_box_with_label(
) )
def get_image_quality_params(ext: str, quality: Optional[int]) -> list[int]:
    """Build the cv2.imencode parameter list for the given extension.

    jpg/jpeg default to quality 70 and webp to 60 when no explicit quality
    is given; png (and any other extension) takes no quality flags.
    """
    if ext == "webp":
        return [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60]
    if ext in ("jpg", "jpeg"):
        return [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]
    return []
def relative_box_to_absolute(
frame_shape: tuple[int, ...], box: list[float] | tuple[float, ...] | None
) -> tuple[int, int, int, int] | None:
if box is None or len(box) != 4:
return None
frame_height = frame_shape[0]
frame_width = frame_shape[1]
x_min = int(box[0] * frame_width)
y_min = int(box[1] * frame_height)
x_max = x_min + int(box[2] * frame_width)
y_max = y_min + int(box[3] * frame_height)
x_min = max(0, min(frame_width - 1, x_min))
y_min = max(0, min(frame_height - 1, y_min))
x_max = max(x_min + 1, min(frame_width - 1, x_max))
y_max = max(y_min + 1, min(frame_height - 1, y_max))
return (x_min, y_min, x_max, y_max)
def _format_snapshot_label(
score: float | None,
area: int | None,
box: tuple[int, int, int, int] | None,
estimated_speed: float = 0,
) -> str:
score_value = score or 0
score_text = (
f"{int(score_value * 100)}%" if score_value <= 1 else f"{int(score_value)}%"
)
if area is None and box is not None:
area = int((box[2] - box[0]) * (box[3] - box[1]))
label = f"{score_text} {int(area or 0)}"
if estimated_speed:
label = f"{label} {estimated_speed:.1f}"
return label
def draw_snapshot_bounding_boxes(
    frame: np.ndarray,
    label: str,
    box: tuple[int, int, int, int] | None,
    score: float | None,
    area: int | None,
    attributes: list[dict[str, Any]] | None,
    color: tuple[int, int, int],
    estimated_speed: float = 0,
) -> None:
    """Draw the object's box plus any attribute boxes onto frame, in place.

    A missing object box is a no-op (attribute boxes are skipped too).
    """
    if box is None:
        return

    thickness = 2

    # Main object box with a "<score>% <area>[ <speed>]" caption.
    draw_box_with_label(
        frame,
        box[0],
        box[1],
        box[2],
        box[3],
        label,
        _format_snapshot_label(score, area, box, estimated_speed),
        thickness=thickness,
        color=color,
    )

    # Attribute boxes (e.g. face, license plate) with "<score>% <area>" captions.
    for attr in attributes or []:
        attr_box = attr.get("box")
        if attr_box is None:
            continue
        attr_area = int(
            (attr_box[2] - attr_box[0]) * (attr_box[3] - attr_box[1])
        )
        draw_box_with_label(
            frame,
            attr_box[0],
            attr_box[1],
            attr_box[2],
            attr_box[3],
            attr.get("label", "attribute"),
            f"{attr.get('score', 0):.0%} {attr_area}",
            thickness=thickness,
            color=color,
        )
def get_snapshot_bytes(
    frame: np.ndarray,
    frame_time: float,
    ext: str,
    *,
    timestamp: bool = False,
    bounding_box: bool = False,
    crop: bool = False,
    height: int | None = None,
    quality: int | None = None,
    label: str,
    box: tuple[int, int, int, int] | None,
    score: float | None,
    area: int | None,
    attributes: list[dict[str, Any]] | None,
    color: tuple[int, int, int],
    timestamp_style: Any | None = None,
    estimated_speed: float = 0,
) -> tuple[bytes | None, float]:
    """Annotate and encode a snapshot frame.

    Applies, in order: bounding boxes, crop around the object, resize to
    the requested height, timestamp overlay; then encodes as `ext`.
    The input frame is never mutated (a copy is annotated).

    Returns (encoded_bytes, frame_time); bytes is None if encoding fails.
    """
    canvas = frame.copy()

    if bounding_box and box:
        draw_snapshot_bounding_boxes(
            canvas,
            label,
            box,
            score,
            area,
            attributes,
            color,
            estimated_speed,
        )

    if crop and box:
        # Zoom in on a padded region around the object's box.
        region = calculate_region(
            canvas.shape,
            box[0],
            box[1],
            box[2],
            box[3],
            300,
            multiplier=1.1,
        )
        canvas = canvas[region[1] : region[3], region[0] : region[2]]

    if height:
        # Scale to the requested height, preserving aspect ratio.
        new_width = int(height * canvas.shape[1] / canvas.shape[0])
        canvas = cv2.resize(
            canvas, dsize=(new_width, height), interpolation=cv2.INTER_AREA
        )

    if timestamp and timestamp_style is not None:
        ts_color = timestamp_style.color
        draw_timestamp(
            canvas,
            frame_time,
            timestamp_style.format,
            font_effect=timestamp_style.effect,
            font_thickness=timestamp_style.thickness,
            font_color=(ts_color.blue, ts_color.green, ts_color.red),
            position=timestamp_style.position,
        )

    ok, encoded = cv2.imencode(
        f".{ext}", canvas, get_image_quality_params(ext, quality)
    )
    return (encoded.tobytes(), frame_time) if ok else (None, frame_time)
def grab_cv2_contours(cnts): def grab_cv2_contours(cnts):
# if the length the contours tuple returned by cv2.findContours # if the length the contours tuple returned by cv2.findContours
# is '2' then we are using either OpenCV v2.4, v4-beta, or # is '2' then we are using either OpenCV v2.4, v4-beta, or