mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-03-13 03:47:34 +03:00
Merge f537c13bbc into 192aba901a
This commit is contained in:
commit
621f17e5b1
@ -616,13 +616,11 @@ record:
|
||||
# never stored, so setting the mode to "all" here won't bring them back.
|
||||
mode: motion
|
||||
|
||||
# Optional: Configuration for the jpg snapshots written to the clips directory for each tracked object
|
||||
# Optional: Configuration for the snapshots written to the clips directory for each tracked object
|
||||
# NOTE: Can be overridden at the camera level
|
||||
snapshots:
|
||||
# Optional: Enable writing jpg snapshot to /media/frigate/clips (default: shown below)
|
||||
# Optional: Enable writing snapshot images to /media/frigate/clips (default: shown below)
|
||||
enabled: False
|
||||
# Optional: save a clean copy of the snapshot image (default: shown below)
|
||||
clean_copy: True
|
||||
# Optional: print a timestamp on the snapshots (default: shown below)
|
||||
timestamp: False
|
||||
# Optional: draw bounding box on the snapshots (default: shown below)
|
||||
@ -640,8 +638,8 @@ snapshots:
|
||||
# Optional: Per object retention days
|
||||
objects:
|
||||
person: 15
|
||||
# Optional: quality of the encoded jpeg, 0-100 (default: shown below)
|
||||
quality: 70
|
||||
# Optional: quality of the encoded snapshot image, 0-100 (default: shown below)
|
||||
quality: 60
|
||||
|
||||
# Optional: Configuration for semantic search capability
|
||||
semantic_search:
|
||||
|
||||
@ -3,7 +3,7 @@ id: snapshots
|
||||
title: Snapshots
|
||||
---
|
||||
|
||||
Frigate can save a snapshot image to `/media/frigate/clips` for each object that is detected named as `<camera>-<id>.jpg`. They are also accessible [via the api](../integrations/api/event-snapshot-events-event-id-snapshot-jpg-get.api.mdx)
|
||||
Frigate can save a snapshot image to `/media/frigate/clips` for each object that is detected named as `<camera>-<id>-clean.webp`. They are also accessible [via the api](../integrations/api/event-snapshot-events-event-id-snapshot-jpg-get.api.mdx)
|
||||
|
||||
Snapshots are accessible in the UI in the Explore pane. This allows for quick submission to the Frigate+ service.
|
||||
|
||||
@ -13,21 +13,19 @@ Snapshots sent via MQTT are configured in the [config file](/configuration) unde
|
||||
|
||||
## Frame Selection
|
||||
|
||||
Frigate does not save every frame — it picks a single "best" frame for each tracked object and uses it for both the snapshot and clean copy. As the object is tracked across frames, Frigate continuously evaluates whether the current frame is better than the previous best based on detection confidence, object size, and the presence of key attributes like faces or license plates. Frames where the object touches the edge of the frame are deprioritized. The snapshot is written to disk once tracking ends using whichever frame was determined to be the best.
|
||||
Frigate does not save every frame. It picks a single "best" frame for each tracked object based on detection confidence, object size, and the presence of key attributes like faces or license plates. Frames where the object touches the edge of the frame are deprioritized. That best frame is written to disk once tracking ends.
|
||||
|
||||
MQTT snapshots are published more frequently — each time a better thumbnail frame is found during tracking, or when the current best image is older than `best_image_timeout` (default: 60s). These use their own annotation settings configured under `cameras -> your_camera -> mqtt`.
|
||||
|
||||
## Clean Copy
|
||||
## Rendering
|
||||
|
||||
Frigate can produce up to two snapshot files per event, each used in different places:
|
||||
Frigate stores a single clean snapshot on disk:
|
||||
|
||||
| Version | File | Annotations | Used by |
|
||||
| --- | --- | --- | --- |
|
||||
| **Regular snapshot** | `<camera>-<id>.jpg` | Respects your `timestamp`, `bounding_box`, `crop`, and `height` settings | API (`/api/events/<id>/snapshot.jpg`), MQTT (`<camera>/<label>/snapshot`), Explore pane in the UI |
|
||||
| **Clean copy** | `<camera>-<id>-clean.webp` | Always unannotated — no bounding box, no timestamp, no crop, full resolution | API (`/api/events/<id>/snapshot-clean.webp`), [Frigate+](/plus/first_model) submissions, "Download Clean Snapshot" in the UI |
|
||||
| API / Use | Result |
|
||||
| --- | --- |
|
||||
| Stored file | `<camera>-<id>-clean.webp`, always unannotated |
|
||||
| `/api/events/<id>/snapshot.jpg` | Starts from the camera's `snapshots` defaults, then applies any query param overrides at request time |
|
||||
| `/api/events/<id>/snapshot-clean.webp` | Returns the same stored snapshot without annotations |
|
||||
| [Frigate+](/plus/first_model) submission | Uses the same stored clean snapshot |
|
||||
|
||||
MQTT snapshots are configured separately under `cameras -> your_camera -> mqtt` and are unrelated to the clean copy.
|
||||
|
||||
The clean copy is required for submitting events to [Frigate+](/plus/first_model) — if you plan to use Frigate+, keep `clean_copy` enabled regardless of your other snapshot settings.
|
||||
|
||||
If you are not using Frigate+ and `timestamp`, `bounding_box`, and `crop` are all disabled, the regular snapshot is already effectively clean, so `clean_copy` provides no benefit and only uses additional disk space. You can safely set `clean_copy: False` in this case.
|
||||
MQTT snapshots are configured separately under `cameras -> your_camera -> mqtt` and are unrelated to the stored event snapshot.
|
||||
|
||||
@ -25,10 +25,9 @@ Yes. Subscriptions to Frigate+ provide access to the infrastructure used to trai
|
||||
|
||||
### Why can't I submit images to Frigate+?
|
||||
|
||||
If you've configured your API key and the Frigate+ Settings page in the UI shows that the key is active, you need to ensure that you've enabled both snapshots and `clean_copy` snapshots for the cameras you'd like to submit images for. Note that `clean_copy` is enabled by default when snapshots are enabled.
|
||||
If you've configured your API key and the Frigate+ Settings page in the UI shows that the key is active, you need to ensure that snapshots are enabled for the cameras you'd like to submit images for.
|
||||
|
||||
```yaml
|
||||
snapshots:
|
||||
enabled: true
|
||||
clean_copy: true
|
||||
```
|
||||
|
||||
6
docs/static/frigate-api.yaml
vendored
6
docs/static/frigate-api.yaml
vendored
@ -4120,10 +4120,7 @@ paths:
|
||||
tags:
|
||||
- Media
|
||||
summary: Event Snapshot
|
||||
description: >-
|
||||
Returns a snapshot image for the specified object id. NOTE: The query
|
||||
params only take affect while the event is in-progress. Once the event
|
||||
has ended the snapshot configuration is used.
|
||||
description: Returns a snapshot image for the specified object id.
|
||||
operationId: event_snapshot_events__event_id__snapshot_jpg_get
|
||||
parameters:
|
||||
- name: event_id
|
||||
@ -4180,7 +4177,6 @@ paths:
|
||||
anyOf:
|
||||
- type: integer
|
||||
- type: "null"
|
||||
default: 70
|
||||
title: Quality
|
||||
responses:
|
||||
"200":
|
||||
|
||||
@ -35,7 +35,7 @@ class MediaEventsSnapshotQueryParams(BaseModel):
|
||||
bbox: Optional[int] = None
|
||||
crop: Optional[int] = None
|
||||
height: Optional[int] = None
|
||||
quality: Optional[int] = 70
|
||||
quality: Optional[int] = None
|
||||
|
||||
|
||||
class MediaMjpegFeedQueryParams(BaseModel):
|
||||
|
||||
@ -12,7 +12,6 @@ from pathlib import Path
|
||||
from typing import List
|
||||
from urllib.parse import unquote
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.params import Depends
|
||||
@ -61,7 +60,7 @@ from frigate.const import CLIPS_DIR, TRIGGER_DIR
|
||||
from frigate.embeddings import EmbeddingsContext
|
||||
from frigate.models import Event, ReviewSegment, Timeline, Trigger
|
||||
from frigate.track.object_processing import TrackedObject
|
||||
from frigate.util.file import get_event_thumbnail_bytes
|
||||
from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_image
|
||||
from frigate.util.time import get_dst_transitions, get_tz_modifiers
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -1081,30 +1080,8 @@ async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = N
|
||||
content=({"success": False, "message": message}), status_code=400
|
||||
)
|
||||
|
||||
# load clean.webp or clean.png (legacy)
|
||||
try:
|
||||
filename_webp = f"{event.camera}-{event.id}-clean.webp"
|
||||
filename_png = f"{event.camera}-{event.id}-clean.png"
|
||||
|
||||
image_path = None
|
||||
if os.path.exists(os.path.join(CLIPS_DIR, filename_webp)):
|
||||
image_path = os.path.join(CLIPS_DIR, filename_webp)
|
||||
elif os.path.exists(os.path.join(CLIPS_DIR, filename_png)):
|
||||
image_path = os.path.join(CLIPS_DIR, filename_png)
|
||||
|
||||
if image_path is None:
|
||||
logger.error(f"Unable to find clean snapshot for event: {event.id}")
|
||||
return JSONResponse(
|
||||
content=(
|
||||
{
|
||||
"success": False,
|
||||
"message": "Unable to find clean snapshot for event",
|
||||
}
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
image = cv2.imread(image_path)
|
||||
image, is_clean_snapshot = load_event_snapshot_image(event, clean_only=True)
|
||||
except Exception:
|
||||
logger.error(f"Unable to load clean snapshot for event: {event.id}")
|
||||
return JSONResponse(
|
||||
@ -1114,11 +1091,14 @@ async def send_to_plus(request: Request, event_id: str, body: SubmitPlusBody = N
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
if image is None or image.size == 0:
|
||||
logger.error(f"Unable to load clean snapshot for event: {event.id}")
|
||||
if not is_clean_snapshot or image is None or image.size == 0:
|
||||
logger.error(f"Unable to find clean snapshot for event: {event.id}")
|
||||
return JSONResponse(
|
||||
content=(
|
||||
{"success": False, "message": "Unable to load clean snapshot for event"}
|
||||
{
|
||||
"success": False,
|
||||
"message": "Unable to find clean snapshot for event",
|
||||
}
|
||||
),
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
@ -37,7 +37,6 @@ from frigate.camera.state import CameraState
|
||||
from frigate.config import FrigateConfig
|
||||
from frigate.const import (
|
||||
CACHE_DIR,
|
||||
CLIPS_DIR,
|
||||
INSTALL_DIR,
|
||||
MAX_SEGMENT_DURATION,
|
||||
PREVIEW_FRAME_TYPE,
|
||||
@ -45,8 +44,13 @@ from frigate.const import (
|
||||
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
|
||||
from frigate.output.preview import get_most_recent_preview_frame
|
||||
from frigate.track.object_processing import TrackedObjectProcessor
|
||||
from frigate.util.file import get_event_thumbnail_bytes
|
||||
from frigate.util.image import get_image_from_recording
|
||||
from frigate.util.file import (
|
||||
get_event_snapshot_bytes,
|
||||
get_event_snapshot_path,
|
||||
get_event_thumbnail_bytes,
|
||||
load_event_snapshot_image,
|
||||
)
|
||||
from frigate.util.image import get_image_from_recording, get_image_quality_params
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -110,6 +114,24 @@ def imagestream(
|
||||
)
|
||||
|
||||
|
||||
def _resolve_snapshot_settings(
|
||||
snapshot_config: Any, params: MediaEventsSnapshotQueryParams
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"timestamp": snapshot_config.timestamp
|
||||
if params.timestamp is None
|
||||
else bool(params.timestamp),
|
||||
"bounding_box": snapshot_config.bounding_box
|
||||
if params.bbox is None
|
||||
else bool(params.bbox),
|
||||
"crop": snapshot_config.crop if params.crop is None else bool(params.crop),
|
||||
"height": snapshot_config.height if params.height is None else params.height,
|
||||
"quality": snapshot_config.quality
|
||||
if params.quality is None
|
||||
else params.quality,
|
||||
}
|
||||
|
||||
|
||||
@router.get("/{camera_name}/ptz/info", dependencies=[Depends(require_camera_access)])
|
||||
async def camera_ptz_info(request: Request, camera_name: str):
|
||||
if camera_name in request.app.frigate_config.cameras:
|
||||
@ -147,14 +169,7 @@ async def latest_frame(
|
||||
"paths": params.paths,
|
||||
"regions": params.regions,
|
||||
}
|
||||
quality = params.quality
|
||||
|
||||
if extension == Extension.png:
|
||||
quality_params = None
|
||||
elif extension == Extension.webp:
|
||||
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality]
|
||||
else: # jpg or jpeg
|
||||
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
|
||||
quality_params = get_image_quality_params(extension.value, params.quality)
|
||||
|
||||
if camera_name in request.app.frigate_config.cameras:
|
||||
frame = frame_processor.get_current_frame(camera_name, draw_options)
|
||||
@ -729,7 +744,7 @@ async def vod_clip(
|
||||
|
||||
@router.get(
|
||||
"/events/{event_id}/snapshot.jpg",
|
||||
description="Returns a snapshot image for the specified object id. NOTE: The query params only take affect while the event is in-progress. Once the event has ended the snapshot configuration is used.",
|
||||
description="Returns a snapshot image for the specified object id.",
|
||||
)
|
||||
async def event_snapshot(
|
||||
request: Request,
|
||||
@ -748,11 +763,22 @@ async def event_snapshot(
|
||||
content={"success": False, "message": "Snapshot not available"},
|
||||
status_code=404,
|
||||
)
|
||||
# read snapshot from disk
|
||||
with open(
|
||||
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg"), "rb"
|
||||
) as image_file:
|
||||
jpg_bytes = image_file.read()
|
||||
snapshot_settings = _resolve_snapshot_settings(
|
||||
request.app.frigate_config.cameras[event.camera].snapshots, params
|
||||
)
|
||||
jpg_bytes, frame_time = get_event_snapshot_bytes(
|
||||
event,
|
||||
ext="jpg",
|
||||
timestamp=snapshot_settings["timestamp"],
|
||||
bounding_box=snapshot_settings["bounding_box"],
|
||||
crop=snapshot_settings["crop"],
|
||||
height=snapshot_settings["height"],
|
||||
quality=snapshot_settings["quality"],
|
||||
timestamp_style=request.app.frigate_config.cameras[
|
||||
event.camera
|
||||
].timestamp_style,
|
||||
colormap=request.app.frigate_config.model.colormap,
|
||||
)
|
||||
except DoesNotExist:
|
||||
# see if the object is currently being tracked
|
||||
try:
|
||||
@ -763,13 +789,16 @@ async def event_snapshot(
|
||||
if event_id in camera_state.tracked_objects:
|
||||
tracked_obj = camera_state.tracked_objects.get(event_id)
|
||||
if tracked_obj is not None:
|
||||
snapshot_settings = _resolve_snapshot_settings(
|
||||
camera_state.camera_config.snapshots, params
|
||||
)
|
||||
jpg_bytes, frame_time = tracked_obj.get_img_bytes(
|
||||
ext="jpg",
|
||||
timestamp=params.timestamp,
|
||||
bounding_box=params.bbox,
|
||||
crop=params.crop,
|
||||
height=params.height,
|
||||
quality=params.quality,
|
||||
timestamp=snapshot_settings["timestamp"],
|
||||
bounding_box=snapshot_settings["bounding_box"],
|
||||
crop=snapshot_settings["crop"],
|
||||
height=snapshot_settings["height"],
|
||||
quality=snapshot_settings["quality"],
|
||||
)
|
||||
await require_camera_access(camera_state.name, request=request)
|
||||
except Exception:
|
||||
@ -865,13 +894,11 @@ async def event_thumbnail(
|
||||
(0, 0, 0),
|
||||
)
|
||||
|
||||
quality_params = None
|
||||
if extension in (Extension.jpg, Extension.jpeg):
|
||||
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
|
||||
elif extension == Extension.webp:
|
||||
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), 60]
|
||||
|
||||
_, img = cv2.imencode(f".{extension.value}", thumbnail, quality_params)
|
||||
_, img = cv2.imencode(
|
||||
f".{extension.value}",
|
||||
thumbnail,
|
||||
get_image_quality_params(extension.value, None),
|
||||
)
|
||||
thumbnail_bytes = img.tobytes()
|
||||
|
||||
return Response(
|
||||
@ -1029,14 +1056,16 @@ def clear_region_grid(request: Request, camera_name: str):
|
||||
)
|
||||
def event_snapshot_clean(request: Request, event_id: str, download: bool = False):
|
||||
webp_bytes = None
|
||||
event_complete = False
|
||||
try:
|
||||
event = Event.get(Event.id == event_id)
|
||||
event_complete = event.end_time is not None
|
||||
snapshot_config = request.app.frigate_config.cameras[event.camera].snapshots
|
||||
if not (snapshot_config.enabled and event.has_snapshot):
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Snapshots and clean_copy must be enabled in the config",
|
||||
"message": "Snapshots must be enabled in the config",
|
||||
},
|
||||
status_code=404,
|
||||
)
|
||||
@ -1068,54 +1097,10 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
|
||||
)
|
||||
if webp_bytes is None:
|
||||
try:
|
||||
# webp
|
||||
clean_snapshot_path_webp = os.path.join(
|
||||
CLIPS_DIR, f"{event.camera}-{event.id}-clean.webp"
|
||||
image_path, is_clean_snapshot = get_event_snapshot_path(
|
||||
event, clean_only=True
|
||||
)
|
||||
# png (legacy)
|
||||
clean_snapshot_path_png = os.path.join(
|
||||
CLIPS_DIR, f"{event.camera}-{event.id}-clean.png"
|
||||
)
|
||||
|
||||
if os.path.exists(clean_snapshot_path_webp):
|
||||
with open(clean_snapshot_path_webp, "rb") as image_file:
|
||||
webp_bytes = image_file.read()
|
||||
elif os.path.exists(clean_snapshot_path_png):
|
||||
# convert png to webp and save for future use
|
||||
png_image = cv2.imread(clean_snapshot_path_png, cv2.IMREAD_UNCHANGED)
|
||||
if png_image is None:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Invalid png snapshot",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
ret, webp_data = cv2.imencode(
|
||||
".webp", png_image, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
|
||||
)
|
||||
if not ret:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Unable to convert png to webp",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
webp_bytes = webp_data.tobytes()
|
||||
|
||||
# save the converted webp for future requests
|
||||
try:
|
||||
with open(clean_snapshot_path_webp, "wb") as f:
|
||||
f.write(webp_bytes)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to save converted webp for event {event.id}: {e}"
|
||||
)
|
||||
# continue since we now have the data to return
|
||||
else:
|
||||
if not is_clean_snapshot or image_path is None:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
@ -1123,6 +1108,34 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
|
||||
},
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
if image_path.endswith(".webp"):
|
||||
with open(image_path, "rb") as image_file:
|
||||
webp_bytes = image_file.read()
|
||||
else:
|
||||
image = load_event_snapshot_image(event, clean_only=True)[0]
|
||||
if image is None:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Unable to load clean snapshot for event",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
ret, webp_data = cv2.imencode(
|
||||
".webp", image, get_image_quality_params("webp", None)
|
||||
)
|
||||
if not ret:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"success": False,
|
||||
"message": "Unable to convert snapshot to webp",
|
||||
},
|
||||
status_code=400,
|
||||
)
|
||||
|
||||
webp_bytes = webp_data.tobytes()
|
||||
except Exception:
|
||||
logger.error(f"Unable to load clean snapshot for event: {event.id}")
|
||||
return JSONResponse(
|
||||
@ -1135,7 +1148,7 @@ def event_snapshot_clean(request: Request, event_id: str, download: bool = False
|
||||
|
||||
headers = {
|
||||
"Content-Type": "image/webp",
|
||||
"Cache-Control": "private, max-age=31536000",
|
||||
"Cache-Control": "private, max-age=31536000" if event_complete else "no-cache",
|
||||
}
|
||||
|
||||
if download:
|
||||
|
||||
@ -532,21 +532,19 @@ class CameraState:
|
||||
) -> None:
|
||||
img_frame = frame if frame is not None else self.get_current_frame()
|
||||
|
||||
# write clean snapshot if enabled
|
||||
if self.camera_config.snapshots.clean_copy:
|
||||
ret, webp = cv2.imencode(
|
||||
".webp", img_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 80]
|
||||
)
|
||||
ret, webp = cv2.imencode(
|
||||
".webp", img_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 80]
|
||||
)
|
||||
|
||||
if ret:
|
||||
with open(
|
||||
os.path.join(
|
||||
CLIPS_DIR,
|
||||
f"{self.camera_config.name}-{event_id}-clean.webp",
|
||||
),
|
||||
"wb",
|
||||
) as p:
|
||||
p.write(webp.tobytes())
|
||||
if ret:
|
||||
with open(
|
||||
os.path.join(
|
||||
CLIPS_DIR,
|
||||
f"{self.camera_config.name}-{event_id}-clean.webp",
|
||||
),
|
||||
"wb",
|
||||
) as p:
|
||||
p.write(webp.tobytes())
|
||||
|
||||
# write jpg snapshot with optional annotations
|
||||
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
|
||||
|
||||
@ -32,11 +32,6 @@ class SnapshotsConfig(FrigateBaseModel):
|
||||
title="Snapshots enabled",
|
||||
description="Enable or disable saving snapshots for all cameras; can be overridden per-camera.",
|
||||
)
|
||||
clean_copy: bool = Field(
|
||||
default=True,
|
||||
title="Save clean copy",
|
||||
description="Save an unannotated clean copy of snapshots in addition to annotated ones.",
|
||||
)
|
||||
timestamp: bool = Field(
|
||||
default=False,
|
||||
title="Timestamp overlay",
|
||||
@ -68,9 +63,9 @@ class SnapshotsConfig(FrigateBaseModel):
|
||||
description="Retention settings for saved snapshots including default days and per-object overrides.",
|
||||
)
|
||||
quality: int = Field(
|
||||
default=70,
|
||||
title="JPEG quality",
|
||||
description="JPEG encode quality for saved snapshots (0-100).",
|
||||
default=60,
|
||||
title="Snapshot quality",
|
||||
description="Encode quality for saved snapshots (0-100).",
|
||||
ge=0,
|
||||
le=100,
|
||||
)
|
||||
|
||||
@ -933,11 +933,6 @@ class FrigateConfig(FrigateBaseModel):
|
||||
f"Camera {camera.name} has audio transcription enabled, but audio detection is not enabled for this camera. Audio detection must be enabled for cameras with audio transcription when it is disabled globally."
|
||||
)
|
||||
|
||||
if self.plus_api and not self.snapshots.clean_copy:
|
||||
logger.warning(
|
||||
"Frigate+ is configured but clean snapshots are not enabled, submissions to Frigate+ will not be possible./"
|
||||
)
|
||||
|
||||
# Validate auth roles against cameras
|
||||
camera_names = set(self.cameras.keys())
|
||||
|
||||
|
||||
@ -20,7 +20,7 @@ from frigate.genai import GenAIClient
|
||||
from frigate.models import Event
|
||||
from frigate.types import TrackedObjectUpdateTypesEnum
|
||||
from frigate.util.builtin import EventsPerSecond, InferenceSpeed
|
||||
from frigate.util.file import get_event_thumbnail_bytes
|
||||
from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_image
|
||||
from frigate.util.image import create_thumbnail, ensure_jpeg_bytes
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@ -224,39 +224,28 @@ class ObjectDescriptionProcessor(PostProcessorApi):
|
||||
def _read_and_crop_snapshot(self, event: Event) -> bytes | None:
|
||||
"""Read, decode, and crop the snapshot image."""
|
||||
|
||||
snapshot_file = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")
|
||||
|
||||
if not os.path.isfile(snapshot_file):
|
||||
logger.error(
|
||||
f"Cannot load snapshot for {event.id}, file not found: {snapshot_file}"
|
||||
)
|
||||
return None
|
||||
|
||||
try:
|
||||
with open(snapshot_file, "rb") as image_file:
|
||||
snapshot_image = image_file.read()
|
||||
img, _ = load_event_snapshot_image(event)
|
||||
if img is None:
|
||||
logger.error(f"Cannot load snapshot for {event.id}, file not found")
|
||||
return None
|
||||
|
||||
img = cv2.imdecode(
|
||||
np.frombuffer(snapshot_image, dtype=np.int8),
|
||||
cv2.IMREAD_COLOR,
|
||||
)
|
||||
# Crop snapshot based on region
|
||||
# provide full image if region doesn't exist (manual events)
|
||||
height, width = img.shape[:2]
|
||||
x1_rel, y1_rel, width_rel, height_rel = event.data.get(
|
||||
"region", [0, 0, 1, 1]
|
||||
)
|
||||
x1, y1 = int(x1_rel * width), int(y1_rel * height)
|
||||
|
||||
# Crop snapshot based on region
|
||||
# provide full image if region doesn't exist (manual events)
|
||||
height, width = img.shape[:2]
|
||||
x1_rel, y1_rel, width_rel, height_rel = event.data.get(
|
||||
"region", [0, 0, 1, 1]
|
||||
)
|
||||
x1, y1 = int(x1_rel * width), int(y1_rel * height)
|
||||
cropped_image = img[
|
||||
y1 : y1 + int(height_rel * height),
|
||||
x1 : x1 + int(width_rel * width),
|
||||
]
|
||||
|
||||
cropped_image = img[
|
||||
y1 : y1 + int(height_rel * height),
|
||||
x1 : x1 + int(width_rel * width),
|
||||
]
|
||||
_, buffer = cv2.imencode(".jpg", cropped_image)
|
||||
|
||||
_, buffer = cv2.imencode(".jpg", cropped_image)
|
||||
|
||||
return buffer.tobytes()
|
||||
return buffer.tobytes()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
@ -158,36 +158,33 @@ class EventProcessor(threading.Thread):
|
||||
end_time = (
|
||||
None if event_data["end_time"] is None else event_data["end_time"]
|
||||
)
|
||||
snapshot = event_data["snapshot"]
|
||||
# score of the snapshot
|
||||
score = (
|
||||
None
|
||||
if event_data["snapshot"] is None
|
||||
else event_data["snapshot"]["score"]
|
||||
)
|
||||
score = None if snapshot is None else snapshot["score"]
|
||||
# detection region in the snapshot
|
||||
region = (
|
||||
None
|
||||
if event_data["snapshot"] is None
|
||||
if snapshot is None
|
||||
else to_relative_box(
|
||||
width,
|
||||
height,
|
||||
event_data["snapshot"]["region"],
|
||||
snapshot["region"],
|
||||
)
|
||||
)
|
||||
# bounding box for the snapshot
|
||||
box = (
|
||||
None
|
||||
if event_data["snapshot"] is None
|
||||
if snapshot is None
|
||||
else to_relative_box(
|
||||
width,
|
||||
height,
|
||||
event_data["snapshot"]["box"],
|
||||
snapshot["box"],
|
||||
)
|
||||
)
|
||||
|
||||
attributes = (
|
||||
None
|
||||
if event_data["snapshot"] is None
|
||||
if snapshot is None
|
||||
else [
|
||||
{
|
||||
"box": to_relative_box(
|
||||
@ -198,9 +195,14 @@ class EventProcessor(threading.Thread):
|
||||
"label": a["label"],
|
||||
"score": a["score"],
|
||||
}
|
||||
for a in event_data["snapshot"]["attributes"]
|
||||
for a in snapshot["attributes"]
|
||||
]
|
||||
)
|
||||
snapshot_frame_time = None if snapshot is None else snapshot["frame_time"]
|
||||
snapshot_area = None if snapshot is None else snapshot["area"]
|
||||
snapshot_estimated_speed = (
|
||||
None if snapshot is None else snapshot["current_estimated_speed"]
|
||||
)
|
||||
|
||||
# keep these from being set back to false because the event
|
||||
# may have started while recordings/snapshots/alerts/detections were enabled
|
||||
@ -229,6 +231,10 @@ class EventProcessor(threading.Thread):
|
||||
"score": score,
|
||||
"top_score": event_data["top_score"],
|
||||
"attributes": attributes,
|
||||
"snapshot_clean": event_data.get("snapshot_clean", False),
|
||||
"snapshot_frame_time": snapshot_frame_time,
|
||||
"snapshot_area": snapshot_area,
|
||||
"snapshot_estimated_speed": snapshot_estimated_speed,
|
||||
"average_estimated_speed": event_data["average_estimated_speed"],
|
||||
"velocity_angle": event_data["velocity_angle"],
|
||||
"type": "object",
|
||||
@ -306,8 +312,11 @@ class EventProcessor(threading.Thread):
|
||||
"type": event_data["type"],
|
||||
"score": event_data["score"],
|
||||
"top_score": event_data["score"],
|
||||
"snapshot_clean": event_data.get("snapshot_clean", False),
|
||||
},
|
||||
}
|
||||
if event_data.get("draw") is not None:
|
||||
event[Event.data]["draw"] = event_data["draw"]
|
||||
if event_data.get("recognized_license_plate") is not None:
|
||||
event[Event.data]["recognized_license_plate"] = event_data[
|
||||
"recognized_license_plate"
|
||||
|
||||
@ -1,13 +1,19 @@
|
||||
"""Unit tests for recordings/media API endpoints."""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import patch
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import pytz
|
||||
from fastapi import Request
|
||||
|
||||
from frigate.api.auth import get_allowed_cameras_for_filter, get_current_user
|
||||
from frigate.models import Recordings
|
||||
from frigate.models import Event, Recordings
|
||||
from frigate.test.http_api.base_http_test import AuthTestClient, BaseTestHttp
|
||||
from frigate.util import file as file_util
|
||||
|
||||
|
||||
class TestHttpMedia(BaseTestHttp):
|
||||
@ -15,8 +21,19 @@ class TestHttpMedia(BaseTestHttp):
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
super().setUp([Recordings])
|
||||
super().setUp([Event, Recordings])
|
||||
self.minimal_config["cameras"]["front_door"]["snapshots"] = {
|
||||
"enabled": True,
|
||||
"bounding_box": True,
|
||||
"height": 40,
|
||||
"timestamp": False,
|
||||
}
|
||||
self.app = super().create_app()
|
||||
self.clips_dir = tempfile.TemporaryDirectory()
|
||||
self.clips_dir_patcher = patch.object(
|
||||
file_util, "CLIPS_DIR", self.clips_dir.name
|
||||
)
|
||||
self.clips_dir_patcher.start()
|
||||
|
||||
# Mock get_current_user for all tests
|
||||
async def mock_get_current_user(request: Request):
|
||||
@ -41,9 +58,17 @@ class TestHttpMedia(BaseTestHttp):
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
self.clips_dir_patcher.stop()
|
||||
self.clips_dir.cleanup()
|
||||
self.app.dependency_overrides.clear()
|
||||
super().tearDown()
|
||||
|
||||
def _write_clean_snapshot(self, event_id: str, image: np.ndarray) -> None:
|
||||
assert cv2.imwrite(
|
||||
os.path.join(self.clips_dir.name, f"front_door-{event_id}-clean.webp"),
|
||||
image,
|
||||
)
|
||||
|
||||
def test_recordings_summary_across_dst_spring_forward(self):
|
||||
"""
|
||||
Test recordings summary across spring DST transition (spring forward).
|
||||
@ -403,3 +428,49 @@ class TestHttpMedia(BaseTestHttp):
|
||||
assert len(summary) == 1
|
||||
assert "2024-03-10" in summary
|
||||
assert summary["2024-03-10"] is True
|
||||
|
||||
def test_event_snapshot_helpers_read_clean_webp(self):
|
||||
event_id = "clean-webp"
|
||||
image = np.zeros((100, 200, 3), np.uint8)
|
||||
|
||||
self.insert_mock_event(
|
||||
event_id,
|
||||
data={
|
||||
"box": [0.25, 0.25, 0.25, 0.5],
|
||||
"score": 0.85,
|
||||
"attributes": [],
|
||||
"snapshot_clean": True,
|
||||
},
|
||||
)
|
||||
self._write_clean_snapshot(event_id, image)
|
||||
|
||||
event = Event.get(Event.id == event_id)
|
||||
snapshot_image, is_clean = file_util.load_event_snapshot_image(
|
||||
event, clean_only=True
|
||||
)
|
||||
|
||||
assert is_clean
|
||||
assert snapshot_image is not None
|
||||
assert snapshot_image.shape[:2] == image.shape[:2]
|
||||
|
||||
rendered_bytes, _ = file_util.get_event_snapshot_bytes(
|
||||
event,
|
||||
ext="jpg",
|
||||
timestamp=False,
|
||||
bounding_box=True,
|
||||
crop=False,
|
||||
height=40,
|
||||
quality=None,
|
||||
timestamp_style=self.app.frigate_config.cameras[
|
||||
event.camera
|
||||
].timestamp_style,
|
||||
colormap=self.app.frigate_config.model.colormap,
|
||||
)
|
||||
assert rendered_bytes is not None
|
||||
|
||||
rendered_image = cv2.imdecode(
|
||||
np.frombuffer(rendered_bytes, dtype=np.uint8),
|
||||
cv2.IMREAD_COLOR,
|
||||
)
|
||||
assert rendered_image.shape[0] == 40
|
||||
assert rendered_image.max() > 0
|
||||
|
||||
@ -1208,7 +1208,7 @@ class TestConfig(unittest.TestCase):
|
||||
|
||||
frigate_config = FrigateConfig(**config)
|
||||
assert frigate_config.cameras["back"].snapshots.bounding_box
|
||||
assert frigate_config.cameras["back"].snapshots.quality == 70
|
||||
assert frigate_config.cameras["back"].snapshots.quality == 60
|
||||
|
||||
def test_global_snapshots_merge(self):
|
||||
config = {
|
||||
|
||||
@ -547,7 +547,9 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
"has_clip": self.config.cameras[camera_name].record.enabled
|
||||
and include_recording,
|
||||
"has_snapshot": True,
|
||||
"snapshot_clean": True,
|
||||
"type": source_type,
|
||||
"draw": draw,
|
||||
},
|
||||
)
|
||||
)
|
||||
@ -603,6 +605,7 @@ class TrackedObjectProcessor(threading.Thread):
|
||||
"has_clip": self.config.cameras[camera_name].record.enabled
|
||||
and include_recording,
|
||||
"has_snapshot": True,
|
||||
"snapshot_clean": True,
|
||||
"type": "api",
|
||||
"recognized_license_plate": plate,
|
||||
"recognized_license_plate_score": score,
|
||||
|
||||
@ -13,7 +13,6 @@ import numpy as np
|
||||
from frigate.config import (
|
||||
CameraConfig,
|
||||
FilterConfig,
|
||||
SnapshotsConfig,
|
||||
UIConfig,
|
||||
)
|
||||
from frigate.const import CLIPS_DIR, REPLAY_CAMERA_PREFIX, THUMB_DIR
|
||||
@ -22,9 +21,7 @@ from frigate.review.types import SeverityEnum
|
||||
from frigate.util.builtin import sanitize_float
|
||||
from frigate.util.image import (
|
||||
area,
|
||||
calculate_region,
|
||||
draw_box_with_label,
|
||||
draw_timestamp,
|
||||
get_snapshot_bytes,
|
||||
is_better_thumbnail,
|
||||
)
|
||||
from frigate.util.object import box_inside
|
||||
@ -393,6 +390,7 @@ class TrackedObject:
|
||||
"camera": self.camera_config.name,
|
||||
"frame_time": self.obj_data["frame_time"],
|
||||
"snapshot": self.thumbnail_data,
|
||||
"snapshot_clean": True,
|
||||
"label": self.obj_data["label"],
|
||||
"sub_label": self.obj_data.get("sub_label"),
|
||||
"top_score": self.top_score,
|
||||
@ -449,27 +447,15 @@ class TrackedObject:
|
||||
return img.tobytes()
|
||||
|
||||
def get_clean_webp(self) -> bytes | None:
|
||||
if self.thumbnail_data is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
best_frame = cv2.cvtColor(
|
||||
self.frame_cache[self.thumbnail_data["frame_time"]]["frame"],
|
||||
cv2.COLOR_YUV2BGR_I420,
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
f"Unable to create clean webp because frame {self.thumbnail_data['frame_time']} is not in the cache"
|
||||
)
|
||||
return None
|
||||
|
||||
ret, webp = cv2.imencode(
|
||||
".webp", best_frame, [int(cv2.IMWRITE_WEBP_QUALITY), 60]
|
||||
webp_bytes, _ = self.get_img_bytes(
|
||||
ext="webp",
|
||||
timestamp=False,
|
||||
bounding_box=False,
|
||||
crop=False,
|
||||
height=None,
|
||||
quality=self.camera_config.snapshots.quality,
|
||||
)
|
||||
if ret:
|
||||
return webp.tobytes()
|
||||
else:
|
||||
return None
|
||||
return webp_bytes
|
||||
|
||||
def get_img_bytes(
|
||||
self,
|
||||
@ -491,131 +477,42 @@ class TrackedObject:
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
f"Unable to create jpg because frame {frame_time} is not in the cache"
|
||||
f"Unable to create snapshot because frame {frame_time} is not in the cache"
|
||||
)
|
||||
return None, None
|
||||
|
||||
if bounding_box:
|
||||
thickness = 2
|
||||
color = self.colormap.get(self.obj_data["label"], (255, 255, 255))
|
||||
|
||||
# draw the bounding boxes on the frame
|
||||
box = self.thumbnail_data["box"]
|
||||
draw_box_with_label(
|
||||
best_frame,
|
||||
box[0],
|
||||
box[1],
|
||||
box[2],
|
||||
box[3],
|
||||
self.obj_data["label"],
|
||||
f"{int(self.thumbnail_data['score'] * 100)}% {int(self.thumbnail_data['area'])}"
|
||||
+ (
|
||||
f" {self.thumbnail_data['current_estimated_speed']:.1f}"
|
||||
if self.thumbnail_data["current_estimated_speed"] != 0
|
||||
else ""
|
||||
),
|
||||
thickness=thickness,
|
||||
color=color,
|
||||
)
|
||||
|
||||
# draw any attributes
|
||||
for attribute in self.thumbnail_data["attributes"]:
|
||||
box = attribute["box"]
|
||||
box_area = int((box[2] - box[0]) * (box[3] - box[1]))
|
||||
draw_box_with_label(
|
||||
best_frame,
|
||||
box[0],
|
||||
box[1],
|
||||
box[2],
|
||||
box[3],
|
||||
attribute["label"],
|
||||
f"{attribute['score']:.0%} {str(box_area)}",
|
||||
thickness=thickness,
|
||||
color=color,
|
||||
)
|
||||
|
||||
if crop:
|
||||
box = self.thumbnail_data["box"]
|
||||
box_size = 300
|
||||
region = calculate_region(
|
||||
best_frame.shape,
|
||||
box[0],
|
||||
box[1],
|
||||
box[2],
|
||||
box[3],
|
||||
box_size,
|
||||
multiplier=1.1,
|
||||
)
|
||||
best_frame = best_frame[region[1] : region[3], region[0] : region[2]]
|
||||
|
||||
if height:
|
||||
width = int(height * best_frame.shape[1] / best_frame.shape[0])
|
||||
best_frame = cv2.resize(
|
||||
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
|
||||
)
|
||||
if timestamp:
|
||||
colors = self.camera_config.timestamp_style.color
|
||||
draw_timestamp(
|
||||
best_frame,
|
||||
self.thumbnail_data["frame_time"],
|
||||
self.camera_config.timestamp_style.format,
|
||||
font_effect=self.camera_config.timestamp_style.effect,
|
||||
font_thickness=self.camera_config.timestamp_style.thickness,
|
||||
font_color=(colors.blue, colors.green, colors.red),
|
||||
position=self.camera_config.timestamp_style.position,
|
||||
)
|
||||
|
||||
quality_params = []
|
||||
|
||||
if ext == "jpg":
|
||||
quality_params = [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]
|
||||
elif ext == "webp":
|
||||
quality_params = [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60]
|
||||
|
||||
ret, jpg = cv2.imencode(f".{ext}", best_frame, quality_params)
|
||||
|
||||
if ret:
|
||||
return jpg.tobytes(), frame_time
|
||||
else:
|
||||
return None, None
|
||||
return get_snapshot_bytes(
|
||||
best_frame,
|
||||
frame_time,
|
||||
ext=ext,
|
||||
timestamp=timestamp,
|
||||
bounding_box=bounding_box,
|
||||
crop=crop,
|
||||
height=height,
|
||||
quality=quality,
|
||||
label=self.obj_data["label"],
|
||||
box=self.thumbnail_data["box"],
|
||||
score=self.thumbnail_data["score"],
|
||||
area=self.thumbnail_data["area"],
|
||||
attributes=self.thumbnail_data["attributes"],
|
||||
color=self.colormap.get(self.obj_data["label"], (255, 255, 255)),
|
||||
timestamp_style=self.camera_config.timestamp_style,
|
||||
estimated_speed=self.thumbnail_data["current_estimated_speed"],
|
||||
)
|
||||
|
||||
def write_snapshot_to_disk(self) -> None:
|
||||
snapshot_config: SnapshotsConfig = self.camera_config.snapshots
|
||||
jpg_bytes, _ = self.get_img_bytes(
|
||||
ext="jpg",
|
||||
timestamp=snapshot_config.timestamp,
|
||||
bounding_box=snapshot_config.bounding_box,
|
||||
crop=snapshot_config.crop,
|
||||
height=snapshot_config.height,
|
||||
quality=snapshot_config.quality,
|
||||
)
|
||||
if jpg_bytes is None:
|
||||
webp_bytes = self.get_clean_webp()
|
||||
if webp_bytes is None:
|
||||
logger.warning(f"Unable to save snapshot for {self.obj_data['id']}.")
|
||||
else:
|
||||
with open(
|
||||
os.path.join(
|
||||
CLIPS_DIR, f"{self.camera_config.name}-{self.obj_data['id']}.jpg"
|
||||
CLIPS_DIR,
|
||||
f"{self.camera_config.name}-{self.obj_data['id']}-clean.webp",
|
||||
),
|
||||
"wb",
|
||||
) as j:
|
||||
j.write(jpg_bytes)
|
||||
|
||||
# write clean snapshot if enabled
|
||||
if snapshot_config.clean_copy:
|
||||
webp_bytes = self.get_clean_webp()
|
||||
if webp_bytes is None:
|
||||
logger.warning(
|
||||
f"Unable to save clean snapshot for {self.obj_data['id']}."
|
||||
)
|
||||
else:
|
||||
with open(
|
||||
os.path.join(
|
||||
CLIPS_DIR,
|
||||
f"{self.camera_config.name}-{self.obj_data['id']}-clean.webp",
|
||||
),
|
||||
"wb",
|
||||
) as p:
|
||||
p.write(webp_bytes)
|
||||
) as p:
|
||||
p.write(webp_bytes)
|
||||
|
||||
def write_thumbnail_to_disk(self) -> None:
|
||||
if not self.camera_config.name:
|
||||
|
||||
@ -133,6 +133,18 @@ def cleanup_camera_files(
|
||||
except Exception as e:
|
||||
logger.error("Failed to remove snapshot %s: %s", snapshot, e)
|
||||
|
||||
for snapshot in glob.glob(os.path.join(CLIPS_DIR, f"{camera_name}-*-clean.webp")):
|
||||
try:
|
||||
os.remove(snapshot)
|
||||
except Exception as e:
|
||||
logger.error("Failed to remove snapshot %s: %s", snapshot, e)
|
||||
|
||||
for snapshot in glob.glob(os.path.join(CLIPS_DIR, f"{camera_name}-*-clean.png")):
|
||||
try:
|
||||
os.remove(snapshot)
|
||||
except Exception as e:
|
||||
logger.error("Failed to remove snapshot %s: %s", snapshot, e)
|
||||
|
||||
# Remove review thumbnail files
|
||||
for thumb in glob.glob(
|
||||
os.path.join(CLIPS_DIR, "review", f"thumb-{camera_name}-*.webp")
|
||||
|
||||
@ -586,6 +586,23 @@ def migrate_018_0(config: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]
|
||||
|
||||
new_config["cameras"][name] = camera_config
|
||||
|
||||
# Remove deprecated clean_copy from global snapshots config
|
||||
if new_config.get("snapshots", {}).get("clean_copy") is not None:
|
||||
del new_config["snapshots"]["clean_copy"]
|
||||
if not new_config["snapshots"]:
|
||||
del new_config["snapshots"]
|
||||
|
||||
# Remove deprecated clean_copy from camera snapshots configs
|
||||
for name, camera in new_config.get("cameras", {}).items():
|
||||
camera_config: dict[str, dict[str, Any]] = camera.copy()
|
||||
|
||||
if camera_config.get("snapshots", {}).get("clean_copy") is not None:
|
||||
del camera_config["snapshots"]["clean_copy"]
|
||||
if not camera_config["snapshots"]:
|
||||
del camera_config["snapshots"]
|
||||
|
||||
new_config["cameras"][name] = camera_config
|
||||
|
||||
new_config["version"] = "0.18-0"
|
||||
return new_config
|
||||
|
||||
|
||||
@ -5,14 +5,16 @@ import fcntl
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
import cv2
|
||||
from numpy import ndarray
|
||||
|
||||
from frigate.const import CLIPS_DIR, THUMB_DIR
|
||||
from frigate.models import Event
|
||||
from frigate.util.image import get_snapshot_bytes, relative_box_to_absolute
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -30,9 +32,224 @@ def get_event_thumbnail_bytes(event: Event) -> bytes | None:
|
||||
return None
|
||||
|
||||
|
||||
def get_event_snapshot(event: Event) -> ndarray:
|
||||
media_name = f"{event.camera}-{event.id}"
|
||||
return cv2.imread(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
|
||||
def get_event_snapshot(event: Event) -> ndarray | None:
|
||||
image, _ = load_event_snapshot_image(event)
|
||||
return image
|
||||
|
||||
|
||||
def _load_snapshot_image(image_path: str) -> ndarray | None:
|
||||
image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
|
||||
if image is None:
|
||||
return None
|
||||
|
||||
if len(image.shape) == 2:
|
||||
return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
|
||||
|
||||
if len(image.shape) == 3 and image.shape[2] == 4:
|
||||
return cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
|
||||
|
||||
return image
|
||||
|
||||
|
||||
def _event_snapshot_is_clean(event: Event) -> bool:
|
||||
return bool(event.data and event.data.get("snapshot_clean"))
|
||||
|
||||
|
||||
def get_event_snapshot_path(
|
||||
event: Event, *, clean_only: bool = False
|
||||
) -> tuple[str | None, bool]:
|
||||
clean_snapshot_paths = [
|
||||
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}-clean.webp"),
|
||||
os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}-clean.png"),
|
||||
]
|
||||
|
||||
for image_path in clean_snapshot_paths:
|
||||
if os.path.exists(image_path):
|
||||
return image_path, True
|
||||
|
||||
snapshot_path = os.path.join(CLIPS_DIR, f"{event.camera}-{event.id}.jpg")
|
||||
if not os.path.exists(snapshot_path):
|
||||
return None, False
|
||||
|
||||
is_clean_snapshot = _event_snapshot_is_clean(event)
|
||||
if clean_only and not is_clean_snapshot:
|
||||
return None, False
|
||||
|
||||
return snapshot_path, is_clean_snapshot
|
||||
|
||||
|
||||
def load_event_snapshot_image(
|
||||
event: Event, *, clean_only: bool = False
|
||||
) -> tuple[ndarray | None, bool]:
|
||||
image_path, is_clean_snapshot = get_event_snapshot_path(
|
||||
event, clean_only=clean_only
|
||||
)
|
||||
if image_path is None:
|
||||
return None, False
|
||||
|
||||
image = _load_snapshot_image(image_path)
|
||||
if image is None:
|
||||
logger.warning("Unable to load snapshot from %s", image_path)
|
||||
return None, False
|
||||
|
||||
return image, is_clean_snapshot
|
||||
|
||||
|
||||
def _get_event_snapshot_overlay_boxes(
|
||||
frame_shape: tuple[int, ...], event: Event
|
||||
) -> list[dict[str, Any]]:
|
||||
overlay_boxes: list[dict[str, Any]] = []
|
||||
draw_data = event.data.get("draw") if event.data else {}
|
||||
draw_boxes = draw_data.get("boxes", []) if isinstance(draw_data, dict) else []
|
||||
|
||||
for draw_box in draw_boxes:
|
||||
box = relative_box_to_absolute(frame_shape, draw_box.get("box"))
|
||||
if box is None:
|
||||
continue
|
||||
|
||||
draw_color = draw_box.get("color", (255, 0, 0))
|
||||
color = (
|
||||
tuple(draw_color) if isinstance(draw_color, (list, tuple)) else (255, 0, 0)
|
||||
)
|
||||
overlay_boxes.append(
|
||||
{
|
||||
"box": box,
|
||||
"label": event.label,
|
||||
"score": draw_box.get("score"),
|
||||
"color": color,
|
||||
}
|
||||
)
|
||||
|
||||
return overlay_boxes
|
||||
|
||||
|
||||
def get_event_snapshot_bytes(
|
||||
event: Event,
|
||||
*,
|
||||
ext: str,
|
||||
timestamp: bool = False,
|
||||
bounding_box: bool = False,
|
||||
crop: bool = False,
|
||||
height: int | None = None,
|
||||
quality: int | None = None,
|
||||
timestamp_style: Any | None = None,
|
||||
colormap: dict[str, tuple[int, int, int]] | None = None,
|
||||
) -> tuple[bytes | None, float]:
|
||||
best_frame, is_clean_snapshot = load_event_snapshot_image(event)
|
||||
if best_frame is None:
|
||||
return None, 0
|
||||
|
||||
frame_time = _get_event_snapshot_frame_time(event)
|
||||
box = relative_box_to_absolute(
|
||||
best_frame.shape,
|
||||
event.data.get("box") if event.data else None,
|
||||
)
|
||||
overlay_boxes = _get_event_snapshot_overlay_boxes(best_frame.shape, event)
|
||||
|
||||
if (bounding_box or crop or timestamp) and not is_clean_snapshot:
|
||||
logger.warning(
|
||||
"Unable to fully honor snapshot query parameters for completed event %s because the clean snapshot is unavailable.",
|
||||
event.id,
|
||||
)
|
||||
|
||||
return get_snapshot_bytes(
|
||||
best_frame,
|
||||
frame_time,
|
||||
ext=ext,
|
||||
timestamp=timestamp and is_clean_snapshot,
|
||||
bounding_box=bounding_box and is_clean_snapshot,
|
||||
crop=crop and is_clean_snapshot,
|
||||
height=height,
|
||||
quality=quality,
|
||||
label=event.label,
|
||||
box=box,
|
||||
score=_get_event_snapshot_score(event),
|
||||
area=_get_event_snapshot_area(event),
|
||||
attributes=_get_event_snapshot_attributes(
|
||||
best_frame.shape,
|
||||
event.data.get("attributes") if event.data else None,
|
||||
),
|
||||
color=(colormap or {}).get(event.label, (255, 255, 255)),
|
||||
overlay_boxes=overlay_boxes,
|
||||
timestamp_style=timestamp_style,
|
||||
estimated_speed=_get_event_snapshot_estimated_speed(event),
|
||||
)
|
||||
|
||||
|
||||
def _as_timestamp(value: Any) -> float:
|
||||
if isinstance(value, datetime):
|
||||
return value.timestamp()
|
||||
|
||||
return float(value)
|
||||
|
||||
|
||||
def _get_event_snapshot_frame_time(event: Event) -> float:
|
||||
if event.data:
|
||||
snapshot_frame_time = event.data.get("snapshot_frame_time")
|
||||
if snapshot_frame_time is not None:
|
||||
return _as_timestamp(snapshot_frame_time)
|
||||
|
||||
frame_time = event.data.get("frame_time")
|
||||
if frame_time is not None:
|
||||
return _as_timestamp(frame_time)
|
||||
|
||||
return _as_timestamp(event.start_time)
|
||||
|
||||
|
||||
def _get_event_snapshot_attributes(
|
||||
frame_shape: tuple[int, ...], attributes: list[dict[str, Any]] | None
|
||||
) -> list[dict[str, Any]]:
|
||||
absolute_attributes: list[dict[str, Any]] = []
|
||||
|
||||
for attribute in attributes or []:
|
||||
box = relative_box_to_absolute(frame_shape, attribute.get("box"))
|
||||
if box is None:
|
||||
continue
|
||||
|
||||
absolute_attributes.append(
|
||||
{
|
||||
"box": box,
|
||||
"label": attribute.get("label", "attribute"),
|
||||
"score": attribute.get("score", 0),
|
||||
}
|
||||
)
|
||||
|
||||
return absolute_attributes
|
||||
|
||||
|
||||
def _get_event_snapshot_score(event: Event) -> float:
|
||||
if event.data:
|
||||
score = event.data.get("score")
|
||||
if score is not None:
|
||||
return score
|
||||
|
||||
top_score = event.data.get("top_score")
|
||||
if top_score is not None:
|
||||
return top_score
|
||||
|
||||
return event.top_score or event.score or 0
|
||||
|
||||
|
||||
def _get_event_snapshot_area(event: Event) -> int | None:
|
||||
if event.data:
|
||||
area = event.data.get("snapshot_area")
|
||||
if area is not None:
|
||||
return int(area)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _get_event_snapshot_estimated_speed(event: Event) -> float:
|
||||
if event.data:
|
||||
estimated_speed = event.data.get("snapshot_estimated_speed")
|
||||
if estimated_speed is not None:
|
||||
return float(estimated_speed)
|
||||
|
||||
average_speed = event.data.get("average_estimated_speed")
|
||||
if average_speed is not None:
|
||||
return float(average_speed)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
### Deletion
|
||||
|
||||
@ -270,6 +270,229 @@ def draw_box_with_label(
|
||||
)
|
||||
|
||||
|
||||
def get_image_quality_params(ext: str, quality: Optional[int]) -> list[int]:
|
||||
if ext in ("jpg", "jpeg"):
|
||||
return [int(cv2.IMWRITE_JPEG_QUALITY), quality or 70]
|
||||
|
||||
if ext == "webp":
|
||||
return [int(cv2.IMWRITE_WEBP_QUALITY), quality or 60]
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def relative_box_to_absolute(
|
||||
frame_shape: tuple[int, ...], box: list[float] | tuple[float, ...] | None
|
||||
) -> tuple[int, int, int, int] | None:
|
||||
if box is None or len(box) != 4:
|
||||
return None
|
||||
|
||||
frame_height = frame_shape[0]
|
||||
frame_width = frame_shape[1]
|
||||
x_min = int(box[0] * frame_width)
|
||||
y_min = int(box[1] * frame_height)
|
||||
x_max = x_min + int(box[2] * frame_width)
|
||||
y_max = y_min + int(box[3] * frame_height)
|
||||
|
||||
x_min = max(0, min(frame_width - 1, x_min))
|
||||
y_min = max(0, min(frame_height - 1, y_min))
|
||||
x_max = max(x_min + 1, min(frame_width - 1, x_max))
|
||||
y_max = max(y_min + 1, min(frame_height - 1, y_max))
|
||||
|
||||
return (x_min, y_min, x_max, y_max)
|
||||
|
||||
|
||||
def _format_snapshot_label(
|
||||
score: float | None,
|
||||
area: int | None,
|
||||
box: tuple[int, int, int, int] | None,
|
||||
estimated_speed: float = 0,
|
||||
) -> str:
|
||||
score_value = score or 0
|
||||
score_text = (
|
||||
f"{int(score_value * 100)}%" if score_value <= 1 else f"{int(score_value)}%"
|
||||
)
|
||||
|
||||
if area is None and box is not None:
|
||||
area = int((box[2] - box[0]) * (box[3] - box[1]))
|
||||
|
||||
label = f"{score_text} {int(area or 0)}"
|
||||
if estimated_speed:
|
||||
label = f"{label} {estimated_speed:.1f}"
|
||||
|
||||
return label
|
||||
|
||||
|
||||
def draw_snapshot_bounding_boxes(
|
||||
frame: np.ndarray,
|
||||
label: str,
|
||||
box: tuple[int, int, int, int] | None,
|
||||
score: float | None,
|
||||
area: int | None,
|
||||
attributes: list[dict[str, Any]] | None,
|
||||
color: tuple[int, int, int],
|
||||
estimated_speed: float = 0,
|
||||
) -> None:
|
||||
if box is None:
|
||||
return
|
||||
|
||||
draw_box_with_label(
|
||||
frame,
|
||||
box[0],
|
||||
box[1],
|
||||
box[2],
|
||||
box[3],
|
||||
label,
|
||||
_format_snapshot_label(score, area, box, estimated_speed),
|
||||
thickness=2,
|
||||
color=color,
|
||||
)
|
||||
|
||||
for attribute in attributes or []:
|
||||
attribute_box = attribute.get("box")
|
||||
if attribute_box is None:
|
||||
continue
|
||||
|
||||
box_area = int(
|
||||
(attribute_box[2] - attribute_box[0])
|
||||
* (attribute_box[3] - attribute_box[1])
|
||||
)
|
||||
draw_box_with_label(
|
||||
frame,
|
||||
attribute_box[0],
|
||||
attribute_box[1],
|
||||
attribute_box[2],
|
||||
attribute_box[3],
|
||||
attribute.get("label", "attribute"),
|
||||
f"{attribute.get('score', 0):.0%} {box_area}",
|
||||
thickness=2,
|
||||
color=color,
|
||||
)
|
||||
|
||||
|
||||
def _get_snapshot_overlay_box_label(
|
||||
score: float | int | None, box: tuple[int, int, int, int]
|
||||
) -> str:
|
||||
area = int((box[2] - box[0]) * (box[3] - box[1]))
|
||||
|
||||
if score is None:
|
||||
return f"- {area}"
|
||||
|
||||
score_value = float(score)
|
||||
score_text = (
|
||||
f"{int(score_value * 100)}%" if score_value <= 1 else f"{int(score_value)}%"
|
||||
)
|
||||
return f"{score_text} {area}"
|
||||
|
||||
|
||||
def draw_snapshot_overlay_boxes(
|
||||
frame: np.ndarray,
|
||||
overlay_boxes: list[dict[str, Any]] | None,
|
||||
default_label: str,
|
||||
default_color: tuple[int, int, int],
|
||||
) -> None:
|
||||
for overlay_box in overlay_boxes or []:
|
||||
box = overlay_box.get("box")
|
||||
if box is None:
|
||||
continue
|
||||
|
||||
box_color = overlay_box.get("color", default_color)
|
||||
color = (
|
||||
tuple(box_color) if isinstance(box_color, (list, tuple)) else default_color
|
||||
)
|
||||
draw_box_with_label(
|
||||
frame,
|
||||
box[0],
|
||||
box[1],
|
||||
box[2],
|
||||
box[3],
|
||||
overlay_box.get("label", default_label),
|
||||
_get_snapshot_overlay_box_label(overlay_box.get("score"), box),
|
||||
thickness=2,
|
||||
color=color,
|
||||
)
|
||||
|
||||
|
||||
def get_snapshot_bytes(
|
||||
frame: np.ndarray,
|
||||
frame_time: float,
|
||||
ext: str,
|
||||
*,
|
||||
timestamp: bool = False,
|
||||
bounding_box: bool = False,
|
||||
crop: bool = False,
|
||||
height: int | None = None,
|
||||
quality: int | None = None,
|
||||
label: str,
|
||||
box: tuple[int, int, int, int] | None,
|
||||
score: float | None,
|
||||
area: int | None,
|
||||
attributes: list[dict[str, Any]] | None,
|
||||
color: tuple[int, int, int],
|
||||
overlay_boxes: list[dict[str, Any]] | None = None,
|
||||
timestamp_style: Any | None = None,
|
||||
estimated_speed: float = 0,
|
||||
) -> tuple[bytes | None, float]:
|
||||
best_frame = frame.copy()
|
||||
crop_box = box
|
||||
|
||||
if crop_box is None and overlay_boxes and len(overlay_boxes) == 1:
|
||||
crop_box = overlay_boxes[0].get("box")
|
||||
|
||||
if bounding_box and box:
|
||||
draw_snapshot_bounding_boxes(
|
||||
best_frame,
|
||||
label,
|
||||
box,
|
||||
score,
|
||||
area,
|
||||
attributes,
|
||||
color,
|
||||
estimated_speed,
|
||||
)
|
||||
|
||||
if bounding_box and overlay_boxes:
|
||||
draw_snapshot_overlay_boxes(best_frame, overlay_boxes, label, color)
|
||||
|
||||
if crop and crop_box:
|
||||
region = calculate_region(
|
||||
best_frame.shape,
|
||||
crop_box[0],
|
||||
crop_box[1],
|
||||
crop_box[2],
|
||||
crop_box[3],
|
||||
300,
|
||||
multiplier=1.1,
|
||||
)
|
||||
best_frame = best_frame[region[1] : region[3], region[0] : region[2]]
|
||||
|
||||
if height:
|
||||
width = int(height * best_frame.shape[1] / best_frame.shape[0])
|
||||
best_frame = cv2.resize(
|
||||
best_frame, dsize=(width, height), interpolation=cv2.INTER_AREA
|
||||
)
|
||||
|
||||
if timestamp and timestamp_style is not None:
|
||||
colors = timestamp_style.color
|
||||
draw_timestamp(
|
||||
best_frame,
|
||||
frame_time,
|
||||
timestamp_style.format,
|
||||
font_effect=timestamp_style.effect,
|
||||
font_thickness=timestamp_style.thickness,
|
||||
font_color=(colors.blue, colors.green, colors.red),
|
||||
position=timestamp_style.position,
|
||||
)
|
||||
|
||||
ret, img = cv2.imencode(
|
||||
f".{ext}", best_frame, get_image_quality_params(ext, quality)
|
||||
)
|
||||
|
||||
if ret:
|
||||
return img.tobytes(), frame_time
|
||||
|
||||
return None, frame_time
|
||||
|
||||
|
||||
def grab_cv2_contours(cnts):
|
||||
# if the length the contours tuple returned by cv2.findContours
|
||||
# is '2' then we are using either OpenCV v2.4, v4-beta, or
|
||||
|
||||
@ -246,8 +246,8 @@ def sync_recordings(
|
||||
def sync_event_snapshots(dry_run: bool = False, force: bool = False) -> SyncResult:
|
||||
"""Sync event snapshots - delete files not referenced by any event.
|
||||
|
||||
Event snapshots are stored at: CLIPS_DIR/{camera}-{event_id}.jpg
|
||||
Also checks for clean variants: {camera}-{event_id}-clean.webp and -clean.png
|
||||
Event snapshots are stored at: CLIPS_DIR/{camera}-{event_id}-clean.webp
|
||||
Also checks legacy variants: {camera}-{event_id}.jpg and -clean.png
|
||||
"""
|
||||
result = SyncResult(media_type="event_snapshots")
|
||||
|
||||
|
||||
@ -166,8 +166,7 @@ export default function SearchResultActions({
|
||||
</a>
|
||||
</MenuItem>
|
||||
)}
|
||||
{searchResult.has_snapshot &&
|
||||
config?.cameras[searchResult.camera].snapshots.clean_copy && (
|
||||
{searchResult.has_snapshot && (
|
||||
<MenuItem aria-label={t("itemMenu.downloadCleanSnapshot.aria")}>
|
||||
<a
|
||||
className="flex items-center"
|
||||
|
||||
@ -94,8 +94,7 @@ export default function DetailActionsMenu({
|
||||
</a>
|
||||
</DropdownMenuItem>
|
||||
)}
|
||||
{search.has_snapshot &&
|
||||
config?.cameras[search.camera].snapshots.clean_copy && (
|
||||
{search.has_snapshot && (
|
||||
<DropdownMenuItem>
|
||||
<a
|
||||
className="w-full"
|
||||
|
||||
@ -263,7 +263,6 @@ export interface CameraConfig {
|
||||
};
|
||||
snapshots: {
|
||||
bounding_box: boolean;
|
||||
clean_copy: boolean;
|
||||
crop: boolean;
|
||||
enabled: boolean;
|
||||
height: number | null;
|
||||
@ -580,7 +579,6 @@ export interface FrigateConfig {
|
||||
|
||||
snapshots: {
|
||||
bounding_box: boolean;
|
||||
clean_copy: boolean;
|
||||
crop: boolean;
|
||||
enabled: boolean;
|
||||
height: number | null;
|
||||
|
||||
@ -8,7 +8,6 @@ import axios from "axios";
|
||||
import { FrigateConfig } from "@/types/frigateConfig";
|
||||
import { CheckCircle2, XCircle } from "lucide-react";
|
||||
import { Trans, useTranslation } from "react-i18next";
|
||||
import { IoIosWarning } from "react-icons/io";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Link } from "react-router-dom";
|
||||
import { LuExternalLink } from "react-icons/lu";
|
||||
@ -197,15 +196,6 @@ export default function FrigatePlusSettingsView({
|
||||
document.title = t("documentTitle.frigatePlus");
|
||||
}, [t]);
|
||||
|
||||
const needCleanSnapshots = () => {
|
||||
if (!config) {
|
||||
return false;
|
||||
}
|
||||
return Object.values(config.cameras).some(
|
||||
(camera) => camera.snapshots.enabled && !camera.snapshots.clean_copy,
|
||||
);
|
||||
};
|
||||
|
||||
if (!config) {
|
||||
return <ActivityIndicator />;
|
||||
}
|
||||
@ -440,8 +430,7 @@ export default function FrigatePlusSettingsView({
|
||||
)}
|
||||
</td>
|
||||
<td className="px-4 py-2 text-center">
|
||||
{camera.snapshots?.enabled &&
|
||||
camera.snapshots?.clean_copy ? (
|
||||
{camera.snapshots?.enabled ? (
|
||||
<CheckCircle2 className="mx-auto size-5 text-green-500" />
|
||||
) : (
|
||||
<XCircle className="mx-auto size-5 text-danger" />
|
||||
@ -453,18 +442,6 @@ export default function FrigatePlusSettingsView({
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
{needCleanSnapshots() && (
|
||||
<div className="rounded-lg border border-secondary-foreground bg-secondary p-4 text-sm text-danger">
|
||||
<div className="flex items-center gap-2">
|
||||
<IoIosWarning className="mr-2 size-5 text-danger" />
|
||||
<div className="max-w-[85%] text-sm">
|
||||
<Trans ns="views/settings">
|
||||
frigatePlus.snapshotConfig.cleanCopyWarning
|
||||
</Trans>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
}
|
||||
/>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user