From e85c571a1962d2e7e10be83c2520f7a00c382bad Mon Sep 17 00:00:00 2001 From: leccelecce Date: Tue, 10 Mar 2026 11:54:20 +0000 Subject: [PATCH] Enable event snapshot API to honour query params --- frigate/api/media.py | 45 ++++----- frigate/events/maintainer.py | 27 +++-- frigate/track/tracked_object.py | 105 ++++--------------- frigate/util/file.py | 165 +++++++++++++++++++++++++++++- frigate/util/image.py | 172 ++++++++++++++++++++++++++++++++ 5 files changed, 393 insertions(+), 121 deletions(-) diff --git a/frigate/api/media.py b/frigate/api/media.py index 903cf60c0..6645db923 100644 --- a/frigate/api/media.py +++ b/frigate/api/media.py @@ -45,8 +45,8 @@ from frigate.const import ( from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment from frigate.output.preview import get_most_recent_preview_frame from frigate.track.object_processing import TrackedObjectProcessor -from frigate.util.file import get_event_thumbnail_bytes -from frigate.util.image import get_image_from_recording +from frigate.util.file import get_event_snapshot_bytes, get_event_thumbnail_bytes +from frigate.util.image import get_image_from_recording, get_image_quality_params logger = logging.getLogger(__name__) @@ -147,14 +147,7 @@ async def latest_frame( "paths": params.paths, "regions": params.regions, } - quality = params.quality - - if extension == Extension.png: - quality_params = None - elif extension == Extension.webp: - quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality] - else: # jpg or jpeg - quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality] + quality_params = get_image_quality_params(extension.value, params.quality) if camera_name in request.app.frigate_config.cameras: frame = frame_processor.get_current_frame(camera_name, draw_options) @@ -729,7 +722,7 @@ async def vod_clip( @router.get( "/events/{event_id}/snapshot.jpg", - description="Returns a snapshot image for the specified object id. NOTE: The query params only take affect while the event is in-progress. Once the event has ended the snapshot configuration is used.", + description="Returns a snapshot image for the specified object id.", ) async def event_snapshot( request: Request, @@ -748,11 +741,19 @@ async def event_snapshot( content={"success": False, "message": "Snapshot not available"}, status_code=404, ) - # read snapshot from disk - with open( - os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"), "rb" - ) as image_file: - jpg_bytes = image_file.read() + jpg_bytes, frame_time = get_event_snapshot_bytes( + event, + ext="jpg", + timestamp=params.timestamp, + bounding_box=params.bbox, + crop=params.crop, + height=params.height, + quality=params.quality, + timestamp_style=request.app.frigate_config.cameras[ + event.camera + ].timestamp_style, + colormap=request.app.frigate_config.model.colormap, + ) except DoesNotExist: # see if the object is currently being tracked try: @@ -865,13 +866,11 @@ async def event_thumbnail( (0, 0, 0), ) - quality_params = None - if extension in (Extension.jpg, Extension.jpeg): - quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), 70] - elif extension == Extension.webp: - quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), 60] - - _, img = cv2.imencode(f".{extension.value}", thumbnail, quality_params) + _, img = cv2.imencode( + f".{extension.value}", + thumbnail, + get_image_quality_params(extension.value, None), + ) thumbnail_bytes = img.tobytes() return Response( diff --git a/frigate/events/maintainer.py b/frigate/events/maintainer.py index 77f6eee5f..ff08f94db 100644 --- a/frigate/events/maintainer.py +++ b/frigate/events/maintainer.py @@ -158,36 +158,33 @@ class EventProcessor(threading.Thread): end_time = ( None if event_data["end_time"] is None else event_data["end_time"] ) + snapshot = event_data["snapshot"] # score of the snapshot - score = ( - None - if event_data["snapshot"] is None - else event_data["snapshot"]["score"] - ) + score = None if snapshot is None else snapshot["score"] # detection region in the snapshot region = ( None - if event_data["snapshot"] is None + if snapshot is None else to_relative_box( width, height, - event_data["snapshot"]["region"], + snapshot["region"], ) ) # bounding box for the snapshot box = ( None - if event_data["snapshot"] is None + if snapshot is None else to_relative_box( width, height, - event_data["snapshot"]["box"], + snapshot["box"], ) ) attributes = ( None - if event_data["snapshot"] is None + if snapshot is None else [ { "box": to_relative_box( @@ -198,9 +195,14 @@ class EventProcessor(threading.Thread): "label": a["label"], "score": a["score"], } - for a in event_data["snapshot"]["attributes"] + for a in snapshot["attributes"] ] ) + snapshot_frame_time = None if snapshot is None else snapshot["frame_time"] + snapshot_area = None if snapshot is None else snapshot["area"] + snapshot_estimated_speed = ( + None if snapshot is None else snapshot["current_estimated_speed"] + ) # keep these from being set back to false because the event # may have started while recordings/snapshots/alerts/detections were enabled @@ -229,6 +231,9 @@ class EventProcessor(threading.Thread): "score": score, "top_score": event_data["top_score"], "attributes": attributes, + "snapshot_frame_time": snapshot_frame_time, + "snapshot_area": snapshot_area, + "snapshot_estimated_speed": snapshot_estimated_speed, "average_estimated_speed": event_data["average_estimated_speed"], "velocity_angle": event_data["velocity_angle"], "type": "object", diff --git a/frigate/track/tracked_object.py b/frigate/track/tracked_object.py index 4eb600fb8..01abe846d 100644 --- a/frigate/track/tracked_object.py +++ b/frigate/track/tracked_object.py @@ -22,9 +22,7 @@ from frigate.review.types import SeverityEnum from frigate.util.builtin import sanitize_float from frigate.util.image import ( area, - calculate_region, - draw_box_with_label, - draw_timestamp, + get_snapshot_bytes, is_better_thumbnail, ) from frigate.util.object import box_inside @@ -495,89 +493,24 @@ class TrackedObject: ) return None, None - if bounding_box: - thickness = 2 - color = self.colormap.get(self.obj_data["label"], (255, 255, 255)) - - # draw the bounding boxes on the frame - box = self.thumbnail_data["box"] - draw_box_with_label( - best_frame, - box[0], - box[1], - box[2], - box[3], - self.obj_data["label"], - f"{int(self.thumbnail_data['score'] * 100)}% {int(self.thumbnail_data['area'])}" - + ( - f" {self.thumbnail_data['current_estimated_speed']:.1f}" - if self.thumbnail_data["current_estimated_speed"] != 0 - else "" - ), - thickness=thickness, - color=color, - ) - - # draw any attributes - for attribute in self.thumbnail_data["attributes"]: - box = attribute["box"] - box_area = int((box[2] - box[0]) * (box[3] - box[1])) - draw_box_with_label( - best_frame, - box[0], - box[1], - box[2], - box[3], - attribute["label"], - f"{attribute['score']:.0%} {str(box_area)}", - thickness=thickness, - color=color, - ) - - if crop: - box = self.thumbnail_data["box"] - box_size = 300 - region = calculate_region( - best_frame.shape, - box[0], - box[1], - box[2], - box[3], - box_size, - multiplier=1.1, - ) - best_frame = best_frame[region[1] : region[3], region[0] : region[2]] - - if height: - width = int(height * best_frame.shape[1] / best_frame.shape[0]) - best_frame = cv2.resize( - best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA - ) - if timestamp: - colors = self.camera_config.timestamp_style.color - draw_timestamp( - best_frame, - self.thumbnail_data["frame_time"], - self.camera_config.timestamp_style.format, - font_effect=self.camera_config.timestamp_style.effect, - font_thickness=self.camera_config.timestamp_style.thickness, - font_color=(colors.blue, colors.green, colors.red), - position=self.camera_config.timestamp_style.position, - ) - - quality_params = [] - - if ext == "jpg": - quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70] - elif ext == "webp": - quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60] - - ret, jpg = cv2.imencode(f".{ext}", best_frame, quality_params) - - if ret: - return jpg.tobytes(), frame_time - else: - return None, None + return get_snapshot_bytes( + best_frame, + frame_time, + ext=ext, + timestamp=timestamp, + bounding_box=bounding_box, + crop=crop, + height=height, + quality=quality, + label=self.obj_data["label"], + box=self.thumbnail_data["box"], + score=self.thumbnail_data["score"], + area=self.thumbnail_data["area"], + attributes=self.thumbnail_data["attributes"], + color=self.colormap.get(self.obj_data["label"], (255, 255, 255)), + timestamp_style=self.camera_config.timestamp_style, + estimated_speed=self.thumbnail_data["current_estimated_speed"], + ) def write_snapshot_to_disk(self) -> None: snapshot_config: SnapshotsConfig = self.camera_config.snapshots diff --git a/frigate/util/file.py b/frigate/util/file.py index 22be3e511..ed6c899b1 100644 --- a/frigate/util/file.py +++ b/frigate/util/file.py @@ -5,14 +5,16 @@ import fcntl import logging import os import time +from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Any, Optional import cv2 from numpy import ndarray from frigate.const import CLIPS_DIR, THUMB_DIR from frigate.models import Event +from frigate.util.image import get_snapshot_bytes, relative_box_to_absolute logger = logging.getLogger(__name__) @@ -35,6 +37,167 @@ def get_event_snapshot(event: Event) -> ndarray: return cv2.imread(f"{os.path.join(CLIPS_DIR, media_name)}.jpg") +def _load_event_snapshot_image(event: Event) -> tuple[ndarray | None, bool]: + clean_snapshot_paths = [ + os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}-clean.webp"), + os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}-clean.png"), + ] + + for image_path in clean_snapshot_paths: + if not os.path.exists(image_path): + continue + + image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED) + if image is None: + logger.warning("Unable to load clean snapshot from %s", image_path) + continue + + if len(image.shape) == 2: + image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) + elif image.shape[2] == 4: + image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR) + + return image, True + + snapshot_path = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg") + if not os.path.exists(snapshot_path): + return None, False + + image = cv2.imread(snapshot_path, cv2.IMREAD_COLOR) + if image is None: + logger.warning("Unable to load snapshot from %s", snapshot_path) + return None, False + + return image, False + + +def get_event_snapshot_bytes( + event: Event, + *, + ext: str, + timestamp: bool = False, + bounding_box: bool = False, + crop: bool = False, + height: int | None = None, + quality: int | None = None, + timestamp_style: Any | None = None, + colormap: dict[str, tuple[int, int, int]] | None = None, +) -> tuple[bytes | None, float]: + best_frame, is_clean_snapshot = _load_event_snapshot_image(event) + if best_frame is None: + return None, 0 + + frame_time = _get_event_snapshot_frame_time(event) + box = relative_box_to_absolute( + best_frame.shape, + event.data.get("box") if event.data else None, + ) + + if (bounding_box or crop or timestamp) and not is_clean_snapshot: + logger.warning( + "Unable to fully honor snapshot query parameters for completed event %s because the clean snapshot is unavailable.", + event.id, + ) + + return get_snapshot_bytes( + best_frame, + frame_time, + ext=ext, + timestamp=timestamp and is_clean_snapshot, + bounding_box=bounding_box and is_clean_snapshot, + crop=crop and is_clean_snapshot, + height=height, + quality=quality, + label=event.label, + box=box, + score=_get_event_snapshot_score(event), + area=_get_event_snapshot_area(event), + attributes=_get_event_snapshot_attributes( + best_frame.shape, + event.data.get("attributes") if event.data else None, + ), + color=(colormap or {}).get(event.label, (255, 255, 255)), + timestamp_style=timestamp_style, + estimated_speed=_get_event_snapshot_estimated_speed(event), + ) + + +def _as_timestamp(value: Any) -> float: + if isinstance(value, datetime): + return value.timestamp() + + return float(value) + + +def _get_event_snapshot_frame_time(event: Event) -> float: + if event.data: + snapshot_frame_time = event.data.get("snapshot_frame_time") + if snapshot_frame_time is not None: + return _as_timestamp(snapshot_frame_time) + + frame_time = event.data.get("frame_time") + if frame_time is not None: + return _as_timestamp(frame_time) + + return _as_timestamp(event.start_time) + + +def _get_event_snapshot_attributes( + frame_shape: tuple[int, ...], attributes: list[dict[str, Any]] | None +) -> list[dict[str, Any]]: + absolute_attributes: list[dict[str, Any]] = [] + + for attribute in attributes or []: + box = relative_box_to_absolute(frame_shape, attribute.get("box")) + if box is None: + continue + + absolute_attributes.append( + { + "box": box, + "label": attribute.get("label", "attribute"), + "score": attribute.get("score", 0), + } + ) + + return absolute_attributes + + +def _get_event_snapshot_score(event: Event) -> float: + if event.data: + score = event.data.get("score") + if score is not None: + return score + + top_score = event.data.get("top_score") + if top_score is not None: + return top_score + + return event.top_score or event.score or 0 + + +def _get_event_snapshot_area(event: Event) -> int | None: + if event.data: + area = event.data.get("snapshot_area") + if area is not None: + return int(area) + + return None + + +def _get_event_snapshot_estimated_speed(event: Event) -> float: + if event.data: + estimated_speed = event.data.get("snapshot_estimated_speed") + if estimated_speed is not None: + return float(estimated_speed) + + average_speed = event.data.get("average_estimated_speed") + if average_speed is not None: + return float(average_speed) + + return 0 + + ### Deletion diff --git a/frigate/util/image.py b/frigate/util/image.py index ea9fb0a0a..d4e1edb53 100644 --- a/frigate/util/image.py +++ b/frigate/util/image.py @@ -270,6 +270,178 @@ def draw_box_with_label( ) +def get_image_quality_params(ext: str, quality: Optional[int]) -> list[int]: + if ext in ("jpg", "jpeg"): + return [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70] + + if ext == "webp": + return [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60] + + return [] + + +def relative_box_to_absolute( + frame_shape: tuple[int, ...], box: list[float] | tuple[float, ...] | None +) -> tuple[int, int, int, int] | None: + if box is None or len(box) != 4: + return None + + frame_height = frame_shape[0] + frame_width = frame_shape[1] + x_min = int(box[0] * frame_width) + y_min = int(box[1] * frame_height) + x_max = x_min + int(box[2] * frame_width) + y_max = y_min + int(box[3] * frame_height) + + x_min = max(0, min(frame_width - 1, x_min)) + y_min = max(0, min(frame_height - 1, y_min)) + x_max = max(x_min + 1, min(frame_width - 1, x_max)) + y_max = max(y_min + 1, min(frame_height - 1, y_max)) + + return (x_min, y_min, x_max, y_max) + + +def _format_snapshot_label( + score: float | None, + area: int | None, + box: tuple[int, int, int, int] | None, + estimated_speed: float = 0, +) -> str: + score_value = score or 0 + score_text = ( + f"{int(score_value * 100)}%" if score_value <= 1 else f"{int(score_value)}%" + ) + + if area is None and box is not None: + area = int((box[2] - box[0]) * (box[3] - box[1])) + + label = f"{score_text} {int(area or 0)}" + if estimated_speed: + label = f"{label} {estimated_speed:.1f}" + + return label + + +def draw_snapshot_bounding_boxes( + frame: np.ndarray, + label: str, + box: tuple[int, int, int, int] | None, + score: float | None, + area: int | None, + attributes: list[dict[str, Any]] | None, + color: tuple[int, int, int], + estimated_speed: float = 0, +) -> None: + if box is None: + return + + draw_box_with_label( + frame, + box[0], + box[1], + box[2], + box[3], + label, + _format_snapshot_label(score, area, box, estimated_speed), + thickness=2, + color=color, + ) + + for attribute in attributes or []: + attribute_box = attribute.get("box") + if attribute_box is None: + continue + + box_area = int( + (attribute_box[2] - attribute_box[0]) + * (attribute_box[3] - attribute_box[1]) + ) + draw_box_with_label( + frame, + attribute_box[0], + attribute_box[1], + attribute_box[2], + attribute_box[3], + attribute.get("label", "attribute"), + f"{attribute.get('score', 0):.0%} {box_area}", + thickness=2, + color=color, + ) + + +def get_snapshot_bytes( + frame: np.ndarray, + frame_time: float, + ext: str, + *, + timestamp: bool = False, + bounding_box: bool = False, + crop: bool = False, + height: int | None = None, + quality: int | None = None, + label: str, + box: tuple[int, int, int, int] | None, + score: float | None, + area: int | None, + attributes: list[dict[str, Any]] | None, + color: tuple[int, int, int], + timestamp_style: Any | None = None, + estimated_speed: float = 0, +) -> tuple[bytes | None, float]: + best_frame = frame.copy() + + if bounding_box and box: + draw_snapshot_bounding_boxes( + best_frame, + label, + box, + score, + area, + attributes, + color, + estimated_speed, + ) + + if crop and box: + region = calculate_region( + best_frame.shape, + box[0], + box[1], + box[2], + box[3], + 300, + multiplier=1.1, + ) + best_frame = best_frame[region[1] : region[3], region[0] : region[2]] + + if height: + width = int(height * best_frame.shape[1] / best_frame.shape[0]) + best_frame = cv2.resize( + best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA + ) + + if timestamp and timestamp_style is not None: + colors = timestamp_style.color + draw_timestamp( + best_frame, + frame_time, + timestamp_style.format, + font_effect=timestamp_style.effect, + font_thickness=timestamp_style.thickness, + font_color=(colors.blue, colors.green, colors.red), + position=timestamp_style.position, + ) + + ret, img = cv2.imencode( + f".{ext}", best_frame, get_image_quality_params(ext, quality) + ) + + if ret: + return img.tobytes(), frame_time + + return None, frame_time + + def grab_cv2_contours(cnts): # if the length the contours tuple returned by cv2.findContours # is '2' then we are using either OpenCV v2.4, v4-beta, or