Compare commits

...

10 Commits

Author SHA1 Message Date
ryzendigo
0d1783f268
Merge fb721b3ec9 into 77831304a7 2026-04-23 19:36:18 +01:00
Josh Hawkins
77831304a7
Camera access fixes (#22987)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* only send monitoring notifications to users with camera access

* check access to similarity search event id camera

* require admin role for storage usage endpoint

* check camera access for jsmpeg and birdseye cameras

* tests

* formatting
2026-04-23 12:27:49 -06:00
Josh Hawkins
1a6d04fde7
use object-anchored snapshot crops for classification wizard examples (#22985) 2026-04-23 08:53:48 -05:00
Josh Hawkins
4a1b7a1629
enforce python-level timeout on ffprobe subprocesses (#22984) 2026-04-23 07:16:22 -06:00
Nicolas Mowen
8eace9c3e7
WebUI tweaks (#22980)
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
* Use escape key to go back to main camera dashboard

* Add icon showing when review item is needing review
2026-04-22 21:37:17 -05:00
ryzendigo
fb721b3ec9 fix: correct balance_groups test to match actual algorithm behavior 2026-03-21 17:52:25 +08:00
ryzendigo
0115265cb6 fix: use dependencies=[] for auth deps, fix balance test 2026-03-21 17:21:48 +08:00
ryzendigo
2b3f32e5df fix: clean up comment formatting 2026-03-21 16:42:18 +08:00
ryzendigo
17e5211991 feat(recap): add auto-generation scheduler and more config options
New config options:
- auto_generate: trigger daily recap at a scheduled time
- schedule_time: HH:MM for when to run (default 02:00)
- cameras: list of cameras to process (empty = all)
- speed: playback speed multiplier (1-8x, default 2)
- max_per_group: how many events play simultaneously (1-10, default 3)

New scheduler thread (recap/scheduler.py) checks once per minute,
generates yesterday's recap for each configured camera when the
scheduled time hits. Validated schedule_time format in config.
2026-03-21 16:40:49 +08:00
ryzendigo
717b878956 feat: add daily recap video generation
Adds a new recap feature that composites detected people from throughout
the day onto a clean background, producing a short summary video of all
activity for a given camera.

How it works:
- Builds a clean background plate via median of sampled frames
- Extracts clip frames for each person event from recordings
- Uses per-event background subtraction (first frame of clip as reference)
  within a soft spotlight region to isolate the person
- Groups non-overlapping events to play simultaneously
- Balances groups by duration so the video stays even
- Renders at 2x speed, stitches groups into final output

New files:
- frigate/recap/ — core generation module
- frigate/api/recap.py — POST /recap/{camera}, GET /recap/{camera}
- frigate/config/recap.py — recap config section (enabled, fps, etc)
- frigate/test/test_recap.py — unit tests
- web/src/components/overlay/RecapDialog.tsx — UI component (not yet wired)

Config example:
  recap:
    enabled: true
    default_label: person
    output_fps: 10
    video_duration: 30
    background_samples: 30

Relates to #54
2026-03-21 16:36:39 +08:00
27 changed files with 1759 additions and 65 deletions

View File

@ -15,4 +15,5 @@ class Tags(Enum):
notifications = "Notifications"
preview = "Preview"
recordings = "Recordings"
recap = "Recap"
review = "Review"

View File

@ -754,6 +754,15 @@ def events_search(
status_code=404,
)
if search_event.camera not in allowed_cameras:
return JSONResponse(
content={
"success": False,
"message": "Event not found",
},
status_code=404,
)
thumb_result = context.search_thumbnail(search_event)
thumb_ids = {result[0]: result[1] for result in thumb_result}
search_results = {

View File

@ -25,6 +25,7 @@ from frigate.api import (
motion_search,
notification,
preview,
recap,
record,
review,
)
@ -138,6 +139,7 @@ def create_fastapi_app(
app.include_router(preview.router)
app.include_router(notification.router)
app.include_router(export.router)
app.include_router(recap.router)
app.include_router(event.router)
app.include_router(media.router)
app.include_router(motion_search.router)

100
frigate/api/recap.py Normal file
View File

@ -0,0 +1,100 @@
"""Recap API endpoints."""
import logging
import random
import string
from typing import Optional
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from frigate.api.auth import require_camera_access, require_role
from frigate.api.defs.tags import Tags
from frigate.models import Export
from frigate.recap.recap import RecapGenerator
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.recap])
@router.post(
    "/recap/{camera_name}",
    summary="Generate a time-stacked recap video",
    description="Creates a video showing all detected objects from the given time range "
    "composited onto a clean background. Each detection appears at its real "
    "position with a timestamp label.",
    dependencies=[Depends(require_role(["admin"]))],
)
def generate_recap(
    request: Request,
    camera_name: str,
    start_time: float,
    end_time: float,
    label: Optional[str] = None,
):
    """Kick off asynchronous recap generation for a camera.

    Validates that the feature is enabled, the camera exists, and the time
    range is sane, then starts a background RecapGenerator thread and
    immediately returns the export id it will publish under.
    """
    frigate_config = request.app.frigate_config

    # The feature must be explicitly enabled in the config.
    if not frigate_config.recap.enabled:
        return JSONResponse(
            content={
                "success": False,
                "message": "recap generation is not enabled in config",
            },
            status_code=400,
        )

    if camera_name not in frigate_config.cameras:
        return JSONResponse(
            content={"success": False, "message": f"unknown camera: {camera_name}"},
            status_code=404,
        )

    if end_time <= start_time:
        return JSONResponse(
            content={"success": False, "message": "end_time must be after start_time"},
            status_code=400,
        )

    # Random suffix keeps ids unique across repeated requests.
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    export_id = f"{camera_name}_recap_{suffix}"

    worker = RecapGenerator(
        config=frigate_config,
        export_id=export_id,
        camera=camera_name,
        start_time=start_time,
        end_time=end_time,
        label=label or frigate_config.recap.default_label,
    )
    worker.start()

    return JSONResponse(
        content={
            "success": True,
            "message": "recap generation started",
            "export_id": export_id,
        }
    )
@router.get(
    "/recap/{camera_name}",
    summary="List recap exports for a camera",
    dependencies=[Depends(require_camera_access)],
)
def get_recaps(
    request: Request,
    camera_name: str,
):
    """Return recap export rows for one camera, newest first.

    Recaps are stored as regular Export rows; they are identified by the
    "_recap_" marker embedded in their ids.
    """
    query = (
        Export.select()
        .where(Export.camera == camera_name)
        .where(Export.id.contains("_recap_"))
        .order_by(Export.date.desc())
        .dicts()
    )
    return list(query)

View File

@ -35,7 +35,7 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.recordings])
@router.get("/recordings/storage", dependencies=[Depends(allow_any_authenticated())])
@router.get("/recordings/storage", dependencies=[Depends(require_role(["admin"]))])
def get_recordings_storage_usage(request: Request):
recording_stats = request.app.stats_emitter.get_latest_stats()["service"][
"storage"

View File

@ -549,6 +549,14 @@ class WebPushClient(Communicator):
logger.debug(f"Sending camera monitoring push notification for {camera_name}")
for user in self.web_pushers:
if not self._user_has_camera_access(user, camera):
logger.debug(
"Skipping notification for user %s - no access to camera %s",
user,
camera,
)
continue
self.send_push_notification(
user=user,
payload=payload,

View File

@ -10,6 +10,7 @@ from .logger import * # noqa: F403
from .mqtt import * # noqa: F403
from .network import * # noqa: F403
from .proxy import * # noqa: F403
from .recap import * # noqa: F403
from .telemetry import * # noqa: F403
from .tls import * # noqa: F403
from .ui import * # noqa: F403

View File

@ -70,6 +70,7 @@ from .mqtt import MqttConfig
from .network import NetworkingConfig
from .profile import ProfileDefinitionConfig
from .proxy import ProxyConfig
from .recap import RecapConfig
from .telemetry import TelemetryConfig
from .tls import TlsConfig
from .ui import UIConfig
@ -414,6 +415,11 @@ class FrigateConfig(FrigateBaseModel):
title="Proxy",
description="Settings for integrating Frigate behind a reverse proxy that passes authenticated user headers.",
)
recap: RecapConfig = Field(
default_factory=RecapConfig,
title="Recap",
description="Settings for time-stacked recap video generation that composites detected objects onto a clean background.",
)
telemetry: TelemetryConfig = Field(
default_factory=TelemetryConfig,
title="Telemetry",

89
frigate/config/recap.py Normal file
View File

@ -0,0 +1,89 @@
from pydantic import Field, field_validator
from .base import FrigateBaseModel
__all__ = ["RecapConfig"]
class RecapConfig(FrigateBaseModel):
    """Config section for time-stacked recap video generation.

    All knobs are optional; the feature is off by default. `schedule_time`
    is validated to be a well-formed 24h HH:MM value.
    """

    enabled: bool = Field(
        default=False,
        title="Enable recaps",
        description="Allow generation of time-stacked recap videos that composite detected objects onto a clean background.",
    )
    auto_generate: bool = Field(
        default=False,
        title="Auto-generate daily",
        description="Automatically generate a recap for the previous day at the scheduled time.",
    )
    schedule_time: str = Field(
        default="02:00",
        title="Schedule time",
        description="Time of day (HH:MM, 24h format) to auto-generate the previous day's recap. Only used when auto_generate is true.",
    )
    # default_factory avoids sharing a mutable default between model instances
    cameras: list[str] = Field(
        default_factory=list,
        title="Cameras",
        description="List of camera names to generate recaps for. Empty list means all cameras.",
    )
    default_label: str = Field(
        default="person",
        title="Default object label",
        description="The object type to include in recaps.",
    )
    speed: int = Field(
        default=2,
        title="Playback speed",
        description="Speed multiplier for the output video.",
        ge=1,
        le=8,
    )
    max_per_group: int = Field(
        default=3,
        title="Max events per group",
        description="Maximum number of events to composite simultaneously. Higher values pack more into the video but can get crowded.",
        ge=1,
        le=10,
    )
    ghost_duration: float = Field(
        default=3.0,
        title="Ghost visibility duration",
        description="How long (in seconds of video time) each detection stays visible when path data is unavailable.",
        ge=0.5,
        le=30.0,
    )
    output_fps: int = Field(
        default=10,
        title="Output frame rate",
        description="Frame rate of the generated recap video.",
        ge=1,
        le=30,
    )
    video_duration: int = Field(
        default=30,
        title="Minimum video duration",
        description="Minimum length in seconds for the output video. Actual length depends on event count and durations.",
        ge=5,
        le=300,
    )
    background_samples: int = Field(
        default=30,
        title="Background sample count",
        description="Number of frames sampled across the time range to build the clean background plate via median.",
        ge=5,
        le=100,
    )

    @field_validator("schedule_time")
    @classmethod
    def validate_schedule_time(cls, v: str) -> str:
        """Reject anything that does not parse as a valid 24h HH:MM string."""
        parts = v.split(":")
        if len(parts) != 2:
            raise ValueError("schedule_time must be HH:MM format")
        try:
            h, m = int(parts[0]), int(parts[1])
        except ValueError:
            # suppress the raw int() traceback; our message is the useful one
            raise ValueError("schedule_time must be HH:MM format") from None
        if not (0 <= h <= 23 and 0 <= m <= 59):
            raise ValueError("schedule_time hours must be 0-23 and minutes 0-59")
        return v

View File

@ -19,6 +19,7 @@ import numpy as np
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import BirdseyeModeEnum, FfmpegConfig, FrigateConfig
from frigate.const import BASE_DIR, BIRDSEYE_PIPE, INSTALL_DIR, UPDATE_BIRDSEYE_LAYOUT
from frigate.output.ws_auth import ws_has_camera_access
from frigate.util.image import (
SharedMemoryFrameManager,
copy_yuv_to_position,
@ -236,12 +237,14 @@ class BroadcastThread(threading.Thread):
converter: FFMpegConverter,
websocket_server: Any,
stop_event: MpEvent,
config: FrigateConfig,
):
super().__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
self.config = config
def run(self) -> None:
while not self.stop_event.is_set():
@ -256,6 +259,7 @@ class BroadcastThread(threading.Thread):
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
and ws_has_camera_access(ws, self.camera, self.config)
):
try:
ws.send(buf, binary=True)
@ -806,7 +810,11 @@ class Birdseye:
config.birdseye.restream,
)
self.broadcaster = BroadcastThread(
"birdseye", self.converter, websocket_server, stop_event
"birdseye",
self.converter,
websocket_server,
stop_event,
config,
)
self.birdseye_manager = BirdsEyeFrameManager(self.config, stop_event)
self.frame_manager = SharedMemoryFrameManager()

View File

@ -7,7 +7,8 @@ import threading
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import CameraConfig, FfmpegConfig
from frigate.config import CameraConfig, FfmpegConfig, FrigateConfig
from frigate.output.ws_auth import ws_has_camera_access
logger = logging.getLogger(__name__)
@ -102,12 +103,14 @@ class BroadcastThread(threading.Thread):
converter: FFMpegConverter,
websocket_server: Any,
stop_event: MpEvent,
config: FrigateConfig,
):
super().__init__()
self.camera = camera
self.converter = converter
self.websocket_server = websocket_server
self.stop_event = stop_event
self.config = config
def run(self) -> None:
while not self.stop_event.is_set():
@ -122,6 +125,7 @@ class BroadcastThread(threading.Thread):
if (
not ws.terminated
and ws.environ["PATH_INFO"] == f"/{self.camera}"
and ws_has_camera_access(ws, self.camera, self.config)
):
try:
ws.send(buf, binary=True)
@ -135,7 +139,11 @@ class BroadcastThread(threading.Thread):
class JsmpegCamera:
def __init__(
self, config: CameraConfig, stop_event: MpEvent, websocket_server: Any
self,
config: CameraConfig,
frigate_config: FrigateConfig,
stop_event: MpEvent,
websocket_server: Any,
) -> None:
self.config = config
self.input: queue.Queue[bytes] = queue.Queue(maxsize=config.detect.fps)
@ -154,7 +162,11 @@ class JsmpegCamera:
config.live.quality,
)
self.broadcaster = BroadcastThread(
config.name or "", self.converter, websocket_server, stop_event
config.name or "",
self.converter,
websocket_server,
stop_event,
frigate_config,
)
self.converter.start()

View File

@ -32,6 +32,7 @@ from frigate.const import (
from frigate.output.birdseye import Birdseye
from frigate.output.camera import JsmpegCamera
from frigate.output.preview import PreviewRecorder
from frigate.output.ws_auth import ws_has_camera_access
from frigate.util.image import SharedMemoryFrameManager, get_blank_yuv_frame
from frigate.util.process import FrigateProcess
@ -102,7 +103,7 @@ class OutputProcess(FrigateProcess):
) -> None:
camera_config = self.config.cameras[camera]
jsmpeg_cameras[camera] = JsmpegCamera(
camera_config, self.stop_event, websocket_server
camera_config, self.config, self.stop_event, websocket_server
)
preview_recorders[camera] = PreviewRecorder(camera_config)
preview_write_times[camera] = 0
@ -262,6 +263,7 @@ class OutputProcess(FrigateProcess):
# send camera frame to ffmpeg process if websockets are connected
if any(
ws.environ["PATH_INFO"].endswith(camera)
and ws_has_camera_access(ws, camera, self.config)
for ws in websocket_server.manager
):
# write to the converter for the camera if clients are listening to the specific camera
@ -275,6 +277,7 @@ class OutputProcess(FrigateProcess):
self.config.birdseye.restream
or any(
ws.environ["PATH_INFO"].endswith("birdseye")
and ws_has_camera_access(ws, "birdseye", self.config)
for ws in websocket_server.manager
)
)

43
frigate/output/ws_auth.py Normal file
View File

@ -0,0 +1,43 @@
"""Authorization helpers for JSMPEG websocket clients."""
from typing import Any
from frigate.config import FrigateConfig
from frigate.models import User
def _get_valid_ws_roles(ws: Any, config: FrigateConfig) -> list[str]:
role_header = ws.environ.get("HTTP_REMOTE_ROLE", "")
roles = [
role.strip()
for role in role_header.split(config.proxy.separator)
if role.strip()
]
return [role for role in roles if role in config.auth.roles]
def ws_has_camera_access(ws: Any, camera_name: str, config: FrigateConfig) -> bool:
    """Return True when a websocket client is authorized for the camera path."""
    valid_roles = _get_valid_ws_roles(ws, config)
    # No recognized role header -> deny by default.
    if not valid_roles:
        return False

    role_map = config.auth.roles

    def _is_unrestricted(role: str) -> bool:
        # "admin", or a role with no camera list configured, sees everything
        return role == "admin" or not role_map.get(role)

    # Birdseye is a composite stream, so only users with unrestricted access
    # should receive it.
    if camera_name == "birdseye":
        return any(_is_unrestricted(r) for r in valid_roles)

    camera_names = set(config.cameras.keys())
    for role in valid_roles:
        if _is_unrestricted(role):
            return True
        if camera_name in User.get_allowed_cameras(role, role_map, camera_names):
            return True
    return False

View File

658
frigate/recap/recap.py Normal file
View File

@ -0,0 +1,658 @@
"""Time-stacked recap video generator.
Composites detected people from throughout the day onto a single clean
background. Multiple non-overlapping events play simultaneously so you
can see all the day's activity in a short video.
Each person is extracted from their recording clip using per-event
background subtraction within a spotlight region, producing clean cutouts
without needing a segmentation model.
"""
import datetime
import logging
import os
import re
import subprocess as sp
import threading
import time
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.config import FrigateConfig
from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
EXPORT_DIR,
PROCESS_PRIORITY_LOW,
)
from frigate.models import Event, Export, Recordings
logger = logging.getLogger(__name__)
# working directory for per-recap temp files (one subdir per export id)
RECAP_CACHE = os.path.join(CACHE_DIR, "recap")
# x264 constant rate factor passed to ffmpeg for segment encoding
OUTPUT_CRF = "23"
# bg subtraction within per-event spotlight - threshold can be low
# because the reference frame matches the event's lighting exactly
BG_DIFF_THRESHOLD = 25
# dilation passes applied when cleaning the foreground mask
DILATE_ITERATIONS = 2
# spotlight params: generous area, bg sub handles the rest
SPOTLIGHT_PAD = 1.5
# Gaussian kernel size used to soften the spotlight edge
SPOTLIGHT_BLUR = 25
def _lower_priority():
    """Drop this process's scheduling priority.

    Used as subprocess preexec_fn so ffmpeg children spawned for recap
    generation do not starve higher-priority Frigate work.
    """
    os.nice(PROCESS_PRIORITY_LOW)
def _get_recording_at(camera: str, ts: float) -> Optional[tuple[str, float]]:
    """Find the recording segment covering a timestamp.

    Returns (path, offset_into_file) or None.
    """
    query = (
        Recordings.select(Recordings.path, Recordings.start_time)
        .where(Recordings.camera == camera)
        .where(Recordings.start_time <= ts)
        .where(Recordings.end_time >= ts)
    )
    try:
        segment = query.get()
    except DoesNotExist:
        return None
    return segment.path, ts - float(segment.start_time)
def _probe_resolution(ffmpeg_path: str, path: str) -> Optional[tuple[int, int]]:
    """Parse the "WxH" resolution from ffmpeg's stderr banner for a file.

    NOTE(review): this matches the first WxH-looking token anywhere in the
    banner, which also echoes the input path — a path containing something
    like "640x480" could be matched instead of the stream size. Confirm
    recording paths can never contain such a pattern.
    """
    result = sp.run(
        [ffmpeg_path, "-hide_banner", "-i", path, "-f", "null", "-"],
        capture_output=True,
        timeout=10,
        preexec_fn=_lower_priority,
    )
    found = re.search(r"(\d{2,5})x(\d{2,5})", result.stderr.decode(errors="replace"))
    if found is None:
        return None
    return int(found.group(1)), int(found.group(2))
def _extract_frame(
    ffmpeg_path: str, path: str, offset: float, w: int, h: int
) -> Optional[np.ndarray]:
    """Decode a single raw BGR frame `offset` seconds into the file.

    Returns an (h, w, 3) uint8 array or None when decoding fails or ffmpeg
    produces less data than one full frame.
    """
    cmd = [
        ffmpeg_path,
        "-hide_banner",
        "-loglevel",
        "error",
        "-ss",
        f"{offset:.3f}",
        "-i",
        path,
        "-frames:v",
        "1",
        "-f",
        "rawvideo",
        "-pix_fmt",
        "bgr24",
        "pipe:1",
    ]
    result = sp.run(
        cmd,
        capture_output=True,
        timeout=15,
        preexec_fn=_lower_priority,
    )
    if result.returncode != 0 or not result.stdout:
        return None
    needed = w * h * 3
    if len(result.stdout) < needed:
        return None
    return np.frombuffer(result.stdout, dtype=np.uint8)[:needed].reshape((h, w, 3))
def _extract_frames_range(
    ffmpeg_path: str,
    path: str,
    offset: float,
    duration: float,
    fps: int,
    w: int,
    h: int,
) -> list[np.ndarray]:
    """Pull multiple frames from a recording at a given fps.

    Decodes `duration` seconds starting at `offset` and returns a list of
    (h, w, 3) uint8 arrays; an empty list on decode failure.
    """
    result = sp.run(
        [
            ffmpeg_path,
            "-hide_banner",
            "-loglevel",
            "error",
            "-ss",
            f"{offset:.3f}",
            "-t",
            f"{duration:.3f}",
            "-i",
            path,
            "-vf",
            f"fps={fps}",
            "-f",
            "rawvideo",
            "-pix_fmt",
            "bgr24",
            "pipe:1",
        ],
        capture_output=True,
        # scale the timeout with the clip length, with a sane floor
        timeout=max(30, int(duration) + 15),
        preexec_fn=_lower_priority,
    )
    if result.returncode != 0 or not result.stdout:
        return []
    raw = result.stdout
    stride = w * h * 3
    frames: list[np.ndarray] = []
    pos = 0
    # slice complete frames only; a trailing partial frame is discarded
    while pos + stride <= len(raw):
        chunk = np.frombuffer(raw[pos : pos + stride], dtype=np.uint8)
        frames.append(chunk.reshape((h, w, 3)))
        pos += stride
    return frames
def _build_background(
    ffmpeg_path: str,
    camera: str,
    start_time: float,
    end_time: float,
    sample_count: int,
) -> Optional[np.ndarray]:
    """Median of sampled frames - removes moving objects, keeps the static scene."""
    span = end_time - start_time
    interval = span / (sample_count + 1)
    size: Optional[tuple[int, int]] = None
    collected: list[np.ndarray] = []
    for idx in range(1, sample_count + 1):
        sample_ts = start_time + interval * idx
        hit = _get_recording_at(camera, sample_ts)
        if hit is None:
            continue
        rec_path, offset = hit
        if not os.path.isfile(rec_path):
            continue
        # probe the resolution once, from the first readable segment
        if size is None:
            size = _probe_resolution(ffmpeg_path, rec_path)
            if size is None:
                continue
        w, h = size
        frame = _extract_frame(ffmpeg_path, rec_path, offset, w, h)
        if frame is not None and frame.shape == (h, w, 3):
            collected.append(frame)
    if len(collected) < 3:
        logger.warning("only got %d bg frames, need 3+", len(collected))
        return None
    return np.median(np.stack(collected, axis=0), axis=0).astype(np.uint8)
def _relative_box_to_pixels(
box: list[float], w: int, h: int
) -> tuple[int, int, int, int]:
"""Normalized [x, y, w, h] -> pixel [x1, y1, x2, y2]."""
x1 = max(0, int(box[0] * w))
y1 = max(0, int(box[1] * h))
x2 = min(w, int((box[0] + box[2]) * w))
y2 = min(h, int((box[1] + box[3]) * h))
return x1, y1, x2, y2
def _make_spotlight(w: int, h: int, cx: int, cy: int, rx: int, ry: int) -> np.ndarray:
    """Soft elliptical spotlight mask, float32 0-1."""
    mask = np.zeros((h, w), np.uint8)
    # filled ellipse centered on the detection, then blurred for a soft edge
    cv2.ellipse(mask, (cx, cy), (rx, ry), 0, 0, 360, 255, -1)
    blurred = cv2.GaussianBlur(mask, (SPOTLIGHT_BLUR, SPOTLIGHT_BLUR), 0)
    return blurred.astype(np.float32) / 255.0
def _person_mask(
    frame: np.ndarray, ref_bg: np.ndarray, spotlight: np.ndarray
) -> np.ndarray:
    """Extract person by diffing against the per-event reference frame,
    then AND with the spotlight to contain it to the detection area.
    """
    delta = cv2.cvtColor(cv2.absdiff(frame, ref_bg), cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(delta, BG_DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)
    # dilate then erode closes small holes in the foreground blob
    binary = cv2.dilate(binary, None, iterations=DILATE_ITERATIONS)
    binary = cv2.erode(binary, None, iterations=1)
    return spotlight * (binary.astype(np.float32) / 255.0)
def _mask_centroid(m: np.ndarray) -> Optional[tuple[int, int]]:
coords = np.argwhere(m > 0.3)
if len(coords) == 0:
return None
return int(coords[:, 1].mean()), int(coords[:, 0].mean())
def _interpolate_path(
path_data: list, t: float, w: int, h: int
) -> Optional[tuple[int, int]]:
"""Interpolate person position from path_data at time t."""
if not path_data or len(path_data) < 1:
return None
prev = None
for coord, ts in path_data:
if ts > t:
if prev is None:
return int(coord[0] * w), int(coord[1] * h)
pc, pt = prev
dt = ts - pt
if dt <= 0:
return int(coord[0] * w), int(coord[1] * h)
f = (t - pt) / dt
ix = pc[0] + (coord[0] - pc[0]) * f
iy = pc[1] + (coord[1] - pc[1]) * f
return int(ix * w), int(iy * h)
prev = (coord, ts)
if prev:
return int(prev[0][0] * w), int(prev[0][1] * h)
return None
def _draw_label(frame: np.ndarray, text: str, x: int, y: int):
    """Draw a small white-on-black label, clamped so it stays inside the frame."""
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.28
    thickness = 1
    (text_w, text_h), _ = cv2.getTextSize(text, font, scale, thickness)
    # center horizontally on x, then clamp both axes into the frame
    left = max(0, min(x - text_w // 2, frame.shape[1] - text_w - 3))
    baseline = max(text_h + 3, min(y, frame.shape[0] - 2))
    cv2.rectangle(
        frame,
        (left, baseline - text_h - 2),
        (left + text_w + 2, baseline + 2),
        (0, 0, 0),
        -1,
    )
    cv2.putText(
        frame, text, (left + 1, baseline), font, scale, (255, 255, 255), thickness
    )
def _balance_groups(events: list[dict], max_per: int) -> list[list[dict]]:
"""Spread events across groups so durations are roughly even.
Longest events get their own group first, shorter ones fill in.
"""
by_len = sorted(events, key=lambda e: len(e["frames"]), reverse=True)
groups: list[list[dict]] = []
lengths: list[int] = []
for ev in by_len:
best = None
best_len = float("inf")
for i, g in enumerate(groups):
if len(g) < max_per and lengths[i] < best_len:
best = i
best_len = lengths[i]
if best is not None:
groups[best].append(ev)
lengths[best] = max(lengths[best], len(ev["frames"]))
else:
groups.append([ev])
lengths.append(len(ev["frames"]))
for g in groups:
g.sort(key=lambda e: e["time"])
return groups
class RecapGenerator(threading.Thread):
    """Background thread that renders a time-stacked recap video.

    Gathers events for one camera/time range, builds a clean median
    background, composites each event's extracted person frames onto it
    (multiple non-overlapping events per group play simultaneously), and
    publishes the result as an Export row.
    """

    def __init__(
        self,
        config: FrigateConfig,
        export_id: str,
        camera: str,
        start_time: float,
        end_time: float,
        label: str = "person",
    ):
        super().__init__(daemon=True)
        self.config = config
        self.export_id = export_id
        self.camera = camera
        self.start_time = start_time
        self.end_time = end_time
        self.label = label
        self.ffmpeg_path = config.ffmpeg.ffmpeg_path
        recap_cfg = config.recap
        self.output_fps = recap_cfg.output_fps
        self.speed = recap_cfg.speed
        self.max_per_group = recap_cfg.max_per_group
        self.video_duration = recap_cfg.video_duration
        self.background_samples = recap_cfg.background_samples
        Path(RECAP_CACHE).mkdir(parents=True, exist_ok=True)
        Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True)

    def _get_events(self) -> list[dict]:
        """Fetch non-false-positive events of the configured label that start
        inside the requested window, oldest first."""
        return list(
            Event.select(
                Event.id,
                Event.start_time,
                Event.end_time,
                Event.label,
                Event.data,
                Event.box,
                Event.top_score,
            )
            .where(Event.camera == self.camera)
            .where(Event.label == self.label)
            .where(Event.start_time >= self.start_time)
            .where(Event.start_time <= self.end_time)
            .where(Event.false_positive == False)  # noqa: E712
            .order_by(Event.start_time.asc())
            .dicts()
        )

    def run(self):
        """Thread entry point: create the Export row, generate the video,
        and clean up the row plus any partial file on failure."""
        logger.info(
            "generating recap for %s (%s to %s)",
            self.camera,
            datetime.datetime.fromtimestamp(self.start_time).isoformat(),
            datetime.datetime.fromtimestamp(self.end_time).isoformat(),
        )
        wall_start = time.monotonic()
        start_dt = datetime.datetime.fromtimestamp(self.start_time)
        end_dt = datetime.datetime.fromtimestamp(self.end_time)
        export_name = f"{self.camera} recap {start_dt.strftime('%Y-%m-%d')}"
        filename = (
            f"{self.camera}_recap_{start_dt.strftime('%Y%m%d_%H%M%S')}-"
            f"{end_dt.strftime('%Y%m%d_%H%M%S')}_{self.export_id.split('_')[-1]}.mp4"
        )
        video_path = os.path.join(EXPORT_DIR, filename)
        # insert the row up front so the UI can show an in-progress export
        Export.insert(
            {
                Export.id: self.export_id,
                Export.camera: self.camera,
                Export.name: export_name,
                Export.date: self.start_time,
                Export.video_path: video_path,
                Export.thumb_path: "",
                Export.in_progress: True,
            }
        ).execute()
        try:
            self._generate(video_path)
        except Exception:
            logger.exception("recap failed for %s", self.camera)
            Path(video_path).unlink(missing_ok=True)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        logger.info(
            "recap for %s done in %.1fs -> %s",
            self.camera,
            time.monotonic() - wall_start,
            video_path,
        )

    def _generate(self, out_path: str):
        """Build the recap: gather events, build the background, render each
        group to a temp segment, concat, and write the thumbnail.

        Deletes the Export row and returns early when there is nothing
        usable to render.
        """
        events = self._get_events()
        if not events:
            logger.info("no %s events for %s, nothing to do", self.label, self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        logger.info("found %d %s events", len(events), self.label)

        background = _build_background(
            self.ffmpeg_path,
            self.camera,
            self.start_time,
            self.end_time,
            self.background_samples,
        )
        if background is None:
            logger.error("couldn't build background for %s", self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        bg_h, bg_w = background.shape[:2]
        bg_f = background.astype(np.float32)

        prepped = self._prepare_clips(events, bg_w, bg_h)
        if not prepped:
            logger.warning("no usable clips for %s", self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return

        groups = _balance_groups(prepped, self.max_per_group)
        logger.info(
            "%d events -> %d groups (max %d/group)",
            len(prepped),
            len(groups),
            self.max_per_group,
        )

        # render each group to a temp file, then concat
        tmp_dir = os.path.join(RECAP_CACHE, self.export_id)
        Path(tmp_dir).mkdir(parents=True, exist_ok=True)
        seg_paths: list[str] = []
        seg_frame_counts: list[int] = []
        for gi, group in enumerate(groups):
            seg_path = os.path.join(tmp_dir, f"seg_{gi:04d}.mp4")
            frame_count = self._render_segment(group, bg_f, bg_w, bg_h, seg_path)
            # free frame buffers as we go to keep peak memory bounded
            for ev in group:
                ev["frames"] = None
                ev["ref_bg"] = None
            if frame_count is not None:
                seg_paths.append(seg_path)
                seg_frame_counts.append(frame_count)

        if not seg_paths:
            logger.error("no segments rendered for %s", self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return

        self._concat_segments(seg_paths, tmp_dir, out_path)
        # BUGFIX: the per-segment frame counts are captured before the frame
        # buffers are freed. The previous code recomputed the total from
        # groups whose "frames" were already set to None, so the conditional
        # always yielded 0 and the thumbnail came from frame 1 instead of
        # the middle of the video.
        self._write_thumbnail(out_path, sum(seg_frame_counts))

    def _prepare_clips(self, events: list[dict], bg_w: int, bg_h: int) -> list[dict]:
        """Extract frames and metadata for each event that has a usable
        detection box and on-disk recording.

        The first extracted frame comes from pre-capture and is kept as the
        per-event background-subtraction reference.
        """
        prepped = []
        for ev in events:
            data = ev.get("data") or {}
            box = data.get("box") or ev.get("box")
            if not box or len(box) != 4:
                continue
            ev_time = float(ev["start_time"])
            ev_end = float(ev.get("end_time") or ev_time)
            ev_dur = max(ev_end - ev_time, 0.5)
            result = _get_recording_at(self.camera, ev_time)
            if result is None:
                continue
            rec_path, offset = result
            if not os.path.isfile(rec_path):
                continue
            frames = _extract_frames_range(
                self.ffmpeg_path,
                rec_path,
                offset,
                ev_dur,
                self.output_fps,
                bg_w,
                bg_h,
            )
            if len(frames) < 3:
                continue
            # first frame is from pre-capture - use as per-event bg reference
            ref_bg = frames[0]
            event_frames = frames[2:]
            if not event_frames:
                continue
            pbox = _relative_box_to_pixels(box, bg_w, bg_h)
            ts_str = datetime.datetime.fromtimestamp(ev_time).strftime("%H:%M:%S")
            prepped.append(
                {
                    "frames": event_frames,
                    "ref_bg": ref_bg,
                    "pbox": pbox,
                    "path": data.get("path_data"),
                    "ts_str": ts_str,
                    "time": ev_time,
                }
            )
        return prepped

    def _render_segment(
        self,
        group: list[dict],
        bg_f: np.ndarray,
        bg_w: int,
        bg_h: int,
        seg_path: str,
    ) -> Optional[int]:
        """Composite one group of events over the background and encode it.

        Streams raw BGR frames into an ffmpeg x264 encoder. Returns the
        number of frames written when encoding succeeds, None when ffmpeg
        exits non-zero. Any exception while streaming kills the encoder and
        propagates to the caller.
        """
        max_frames = max(len(e["frames"]) for e in group)
        proc = sp.Popen(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-y",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "bgr24",
                "-s",
                f"{bg_w}x{bg_h}",
                "-r",
                str(self.output_fps * self.speed),
                "-i",
                "pipe:0",
                "-c:v",
                "libx264",
                "-preset",
                "fast",
                "-crf",
                OUTPUT_CRF,
                "-pix_fmt",
                "yuv420p",
                "-movflags",
                "+faststart",
                seg_path,
            ],
            stdin=sp.PIPE,
            stdout=sp.PIPE,
            stderr=sp.PIPE,
            preexec_fn=_lower_priority,
        )
        try:
            for fi in range(max_frames):
                canvas = bg_f.copy()
                label_info = []
                for ev in group:
                    # shorter events simply stop contributing once exhausted
                    if fi >= len(ev["frames"]):
                        continue
                    src = ev["frames"][fi]
                    src_f = src.astype(np.float32)
                    bx1, by1, bx2, by2 = ev["pbox"]
                    bw = bx2 - bx1
                    bh = by2 - by1
                    # wall-clock time of this frame within the event
                    ft = ev["time"] + fi / self.output_fps
                    pos = None
                    if ev["path"] and len(ev["path"]) >= 2:
                        pos = _interpolate_path(ev["path"], ft, bg_w, bg_h)
                    cx, cy = pos if pos else ((bx1 + bx2) // 2, (by1 + by2) // 2)
                    rx = max(20, int(bw * SPOTLIGHT_PAD))
                    ry = max(25, int(bh * SPOTLIGHT_PAD))
                    sl = _make_spotlight(bg_w, bg_h, cx, cy, rx, ry)
                    mask = _person_mask(src, ev["ref_bg"], sl)
                    m3 = mask[:, :, np.newaxis]
                    # alpha-blend the cutout over the running canvas
                    canvas = src_f * m3 + canvas * (1.0 - m3)
                    ctr = _mask_centroid(mask)
                    if ctr:
                        label_info.append(
                            (ev["ts_str"], ctr[0], ctr[1] - int(bh * 0.5))
                        )
                    else:
                        label_info.append((ev["ts_str"], cx, cy - int(bh * 0.5)))
                cu8 = canvas.astype(np.uint8)
                for ts, lx, ly in label_info:
                    _draw_label(cu8, ts, lx, ly)
                # progress bar along the bottom edge
                cv2.rectangle(
                    cu8,
                    (0, bg_h - 2),
                    (int(bg_w * fi / max_frames), bg_h),
                    (0, 180, 255),
                    -1,
                )
                proc.stdin.write(cu8.tobytes())
            proc.stdin.close()
            proc.wait(timeout=120)
        except Exception:
            proc.kill()
            proc.wait()
            raise
        return max_frames if proc.returncode == 0 else None

    def _concat_segments(self, seg_paths: list[str], tmp_dir: str, out_path: str):
        """Losslessly concatenate the segment files into the final mp4 and
        clean up the temp directory."""
        concat_file = os.path.join(tmp_dir, "concat.txt")
        with open(concat_file, "w") as f:
            for p in seg_paths:
                f.write(f"file '{p}'\n")
        sp.run(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                concat_file,
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                "-y",
                out_path,
            ],
            capture_output=True,
            timeout=300,
            preexec_fn=_lower_priority,
        )
        # BUGFIX: remove everything in the temp dir, not only the segments we
        # concatenated — a failed segment could leave a file behind, making
        # rmdir() raise and run()'s handler delete a finished export.
        for leftover in Path(tmp_dir).iterdir():
            leftover.unlink(missing_ok=True)
        Path(tmp_dir).rmdir()

    def _write_thumbnail(self, out_path: str, total_frames: int):
        """Grab the middle frame of the finished video as a webp thumbnail
        and mark the Export row as complete.

        total_frames is the number of frames actually encoded across all
        successful segments.
        """
        thumb_path = os.path.join(CLIPS_DIR, f"export/{self.export_id}.webp")
        sp.run(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                out_path,
                "-vf",
                f"select=eq(n\\,{max(1, total_frames // 2)})",
                "-frames:v",
                "1",
                "-c:v",
                "libwebp",
                "-y",
                thumb_path,
            ],
            capture_output=True,
            timeout=30,
            preexec_fn=_lower_priority,
        )
        Export.update(
            {Export.in_progress: False, Export.thumb_path: thumb_path}
        ).where(Export.id == self.export_id).execute()

View File

@ -0,0 +1,94 @@
"""Scheduled daily recap generation.
Runs as a background thread, checks once per minute if it's time
to generate recaps for the previous day.
"""
import logging
import random
import string
import threading
import time
from datetime import datetime, timedelta
from frigate.config import FrigateConfig
from frigate.recap.recap import RecapGenerator
logger = logging.getLogger(__name__)
class RecapScheduler(threading.Thread):
    """Triggers daily recap generation at the configured time.

    Wakes roughly once per minute; when the configured schedule time for
    the current day has been reached and no run has happened yet that day,
    it starts a RecapGenerator for every configured camera.
    """

    def __init__(self, config: FrigateConfig):
        super().__init__(daemon=True, name="recap_scheduler")
        self.config = config
        # date of the last triggered run; guards against double-runs in a day
        self._last_run_date = None

    def run(self):
        recap_cfg = self.config.recap

        if not recap_cfg.enabled or not recap_cfg.auto_generate:
            logger.info("recap scheduler not enabled, exiting")
            return

        hour, minute = (int(x) for x in recap_cfg.schedule_time.split(":"))
        logger.info(
            "recap scheduler started, will run daily at %02d:%02d", hour, minute
        )

        # If today's scheduled time is already in the past at startup, mark
        # today as done rather than firing a late (possibly very stale) run.
        startup = datetime.now()
        if startup >= startup.replace(
            hour=hour, minute=minute, second=0, microsecond=0
        ):
            self._last_run_date = startup.date()

        while True:
            now = datetime.now()
            today = now.date()
            scheduled = now.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )

            # Compare against ">= scheduled" instead of requiring an exact
            # hour/minute match: if sleep() drifts past the target minute,
            # an exact match would silently skip the whole day.
            if now >= scheduled and self._last_run_date != today:
                self._last_run_date = today
                try:
                    self._generate_all()
                except Exception:
                    # a failed generation must not kill the scheduler thread
                    logger.exception("recap auto-generation failed")

            # sleep until next minute
            time.sleep(60)

    def _generate_all(self):
        """Start a RecapGenerator covering yesterday for each camera."""
        recap_cfg = self.config.recap
        yesterday = datetime.now() - timedelta(days=1)
        start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
        end = start + timedelta(days=1)

        # figure out which cameras to process; empty config means all cameras
        camera_names = (
            list(recap_cfg.cameras)
            if recap_cfg.cameras
            else list(self.config.cameras.keys())
        )

        logger.info(
            "auto-generating recaps for %d cameras (%s)",
            len(camera_names),
            start.strftime("%Y-%m-%d"),
        )

        for camera in camera_names:
            if camera not in self.config.cameras:
                logger.warning("recap: camera %s not found, skipping", camera)
                continue

            # random suffix keeps export ids unique across days/cameras
            export_id = (
                f"{camera}_recap_"
                f"{''.join(random.choices(string.ascii_lowercase + string.digits, k=6))}"
            )
            try:
                generator = RecapGenerator(
                    config=self.config,
                    export_id=export_id,
                    camera=camera,
                    start_time=start.timestamp(),
                    end_time=end.timestamp(),
                    label=recap_cfg.default_label,
                )
                generator.start()
            except Exception:
                # one bad camera must not block the remaining cameras
                logger.exception("recap failed to start for %s", camera)
                continue

            logger.info("recap started for %s (export_id=%s)", camera, export_id)

View File

@ -23,6 +23,26 @@ class TestHttpApp(BaseTestHttp):
response_json = response.json()
assert response_json == self.test_stats
def test_recordings_storage_requires_admin(self):
    """Storage usage endpoint rejects non-admin roles and serves admins."""
    stats_emitter = Mock(spec=StatsEmitter)
    stats_emitter.get_latest_stats.return_value = self.test_stats
    app = super().create_app(stats_emitter)
    app.storage_maintainer = Mock()
    app.storage_maintainer.calculate_camera_usages.return_value = {
        "front_door": {"usage": 2.0},
    }

    with AuthTestClient(app) as client:
        # a proxied viewer role must be rejected outright
        denied = client.get(
            "/recordings/storage",
            headers={"remote-user": "viewer", "remote-role": "viewer"},
        )
        assert denied.status_code == 403

        # the default client (no role headers) is permitted
        allowed = client.get("/recordings/storage")
        assert allowed.status_code == 200
        assert allowed.json()["front_door"]["usage_percent"] == 25.0
def test_config_set_in_memory_replaces_objects_track_list(self):
self.minimal_config["cameras"]["front_door"]["objects"] = {
"track": ["person", "car"],

View File

@ -219,6 +219,25 @@ class TestHttpApp(BaseTestHttp):
assert len(events) == 1
assert events[0]["id"] == event_id
def test_similarity_search_hides_unauthorized_anchor_event(self):
    """Similarity search must 404 on an anchor event the user cannot see."""
    embeddings_mock = Mock()
    self.app.frigate_config.semantic_search.enabled = True
    self.app.embeddings = embeddings_mock

    with AuthTestClient(self.app) as client:
        super().insert_mock_event("hidden.anchor", camera="back_door")
        response = client.get(
            "/events/search",
            params={"search_type": "similarity", "event_id": "hidden.anchor"},
        )
        # a hidden event behaves exactly like a missing event
        assert response.status_code == 404
        assert response.json()["message"] == "Event not found"
        # and no embedding lookup was ever attempted
        embeddings_mock.search_thumbnail.assert_not_called()
def test_get_good_event(self):
id = "123456.random"

View File

@ -145,9 +145,12 @@ class TestExecuteFindSimilarObjects(unittest.TestCase):
embeddings=embeddings,
frigate_config=SimpleNamespace(
semantic_search=SimpleNamespace(enabled=semantic_enabled),
cameras={"driveway": object()},
auth=SimpleNamespace(roles={"admin": [], "viewer": ["driveway"]}),
proxy=SimpleNamespace(separator=","),
),
)
return SimpleNamespace(app=app)
return SimpleNamespace(app=app, headers={})
def test_semantic_search_disabled_returns_error(self):
req = self._make_request(semantic_enabled=False)
@ -180,7 +183,7 @@ class TestExecuteFindSimilarObjects(unittest.TestCase):
_execute_find_similar_objects(
req,
{"event_id": "anchor", "cameras": ["nonexistent_cam"]},
allowed_cameras=["nonexistent_cam"],
allowed_cameras=["driveway"],
)
)
self.assertEqual(result["results"], [])

View File

@ -0,0 +1,57 @@
"""Tests for JSMPEG websocket authorization."""
import unittest
from types import SimpleNamespace
from frigate.config import FrigateConfig
from frigate.output.ws_auth import ws_has_camera_access
class TestWsHasCameraAccess(unittest.TestCase):
    """Authorization checks for JSMPEG/birdseye websocket camera streams."""

    @staticmethod
    def _camera_config(address: str) -> dict:
        """Minimal camera config with a single detect input."""
        return {
            "ffmpeg": {"inputs": [{"path": address, "roles": ["detect"]}]},
            "detect": {"height": 1080, "width": 1920, "fps": 5},
        }

    def setUp(self):
        # limited_user may only see front_door; other roles are unrestricted
        self.config = FrigateConfig(
            mqtt={"host": "mqtt"},
            auth={"roles": {"limited_user": ["front_door"]}},
            cameras={
                "front_door": self._camera_config("rtsp://10.0.0.1:554/video"),
                "back_door": self._camera_config("rtsp://10.0.0.2:554/video"),
            },
        )

    def _make_ws(self, role: str):
        # ws_has_camera_access reads the proxied role from the WS environ
        return SimpleNamespace(environ={"HTTP_REMOTE_ROLE": role})

    def test_restricted_role_only_gets_allowed_camera(self):
        ws = self._make_ws("limited_user")
        self.assertTrue(ws_has_camera_access(ws, "front_door", self.config))
        self.assertFalse(ws_has_camera_access(ws, "back_door", self.config))

    def test_unrestricted_role_can_access_any_camera(self):
        ws = self._make_ws("viewer")
        for camera in ("front_door", "back_door"):
            self.assertTrue(ws_has_camera_access(ws, camera, self.config))

    def test_birdseye_requires_unrestricted_access(self):
        # birdseye aggregates all cameras, so restricted roles are denied
        for role, expected in (
            ("admin", True),
            ("viewer", True),
            ("limited_user", False),
        ):
            self.assertEqual(
                ws_has_camera_access(self._make_ws(role), "birdseye", self.config),
                expected,
            )

269
frigate/test/test_recap.py Normal file
View File

@ -0,0 +1,269 @@
import unittest
from unittest.mock import patch
import numpy as np
from frigate.recap.recap import (
_balance_groups,
_build_background,
_draw_label,
_interpolate_path,
_make_spotlight,
_mask_centroid,
_person_mask,
_relative_box_to_pixels,
)
class TestRelativeBoxConversion(unittest.TestCase):
    """_relative_box_to_pixels maps a relative [x, y, w, h] box to pixel corners."""

    def test_basic(self):
        x1, y1, x2, y2 = _relative_box_to_pixels([0.5, 0.25, 0.1, 0.2], 1920, 1080)
        self.assertEqual((x1, y1, x2, y2), (960, 270, 1152, 486))

    def test_clamps(self):
        # a box spilling past the frame edge is clamped to the frame size
        _, _, x2, y2 = _relative_box_to_pixels([0.9, 0.9, 0.2, 0.2], 100, 100)
        self.assertEqual((x2, y2), (100, 100))

    def test_full_frame(self):
        x1, y1, x2, y2 = _relative_box_to_pixels([0.0, 0.0, 1.0, 1.0], 1920, 1080)
        self.assertEqual((x1, y1, x2, y2), (0, 0, 1920, 1080))

    def test_real_frigate_data(self):
        # values observed from real tracked-object data
        x1, y1, x2, y2 = _relative_box_to_pixels([0.65, 0.117, 0.025, 0.089], 640, 360)
        self.assertEqual((x1, y1), (416, 42))
        self.assertLess(x1, x2)
        self.assertLess(y1, y2)
class TestSpotlight(unittest.TestCase):
    """_make_spotlight returns a 2D mask that is bright at the target point."""

    def test_shape_and_range(self):
        mask = _make_spotlight(100, 100, 50, 50, 20, 20)
        self.assertEqual(mask.shape, (100, 100))
        # strong at the center, near zero in the far corner
        self.assertGreater(mask[50, 50], 0.5)
        self.assertAlmostEqual(mask[0, 0], 0.0, places=1)

    def test_off_center(self):
        mask = _make_spotlight(200, 200, 10, 10, 15, 15)
        self.assertGreater(mask[10, 10], 0.5)
        self.assertAlmostEqual(mask[199, 199], 0.0, places=1)
class TestPersonMask(unittest.TestCase):
    """_person_mask extracts foreground pixels (vs reference) inside a spotlight."""

    def test_identical_frames_empty_mask(self):
        reference = np.full((100, 100, 3), 128, np.uint8)
        spotlight = _make_spotlight(100, 100, 50, 50, 30, 30)
        # no difference from the reference -> nothing masked
        mask = _person_mask(reference.copy(), reference, spotlight)
        self.assertEqual(mask.sum(), 0.0)

    def test_different_region_shows_fg(self):
        background = np.full((100, 100, 3), 50, np.uint8)
        frame = background.copy()
        frame[40:60, 40:60] = 200  # person-sized bright block
        spotlight = _make_spotlight(100, 100, 50, 50, 30, 30)
        mask = _person_mask(frame, background, spotlight)
        self.assertGreater(mask[50, 50], 0.0)
class TestMaskCentroid(unittest.TestCase):
    """_mask_centroid returns the center of mass of a float mask, or None."""

    def test_centered_blob(self):
        mask = np.zeros((100, 100), np.float32)
        mask[40:60, 40:60] = 1.0
        center_x, center_y = _mask_centroid(mask)
        self.assertAlmostEqual(center_x, 50, delta=2)
        self.assertAlmostEqual(center_y, 50, delta=2)

    def test_empty_mask(self):
        # an all-zero mask has no centroid
        self.assertIsNone(_mask_centroid(np.zeros((100, 100), np.float32)))
class TestInterpolatePath(unittest.TestCase):
    """_interpolate_path linearly interpolates a timed path into pixel coords."""

    def test_empty(self):
        for path in ([], None):
            self.assertIsNone(_interpolate_path(path, 1.0, 100, 100))

    def test_midpoint(self):
        waypoints = [((0.0, 0.0), 10.0), ((1.0, 1.0), 20.0)]
        self.assertEqual(_interpolate_path(waypoints, 15.0, 100, 100), (50, 50))

    def test_before_first(self):
        # timestamps before the path clamp to the first point
        waypoints = [((0.25, 0.75), 10.0), ((0.5, 0.5), 20.0)]
        self.assertEqual(_interpolate_path(waypoints, 5.0, 100, 100), (25, 75))

    def test_after_last(self):
        # timestamps after the path clamp to the last point
        waypoints = [((0.1, 0.2), 10.0), ((0.3, 0.4), 20.0)]
        self.assertEqual(_interpolate_path(waypoints, 30.0, 1000, 1000), (300, 400))

    def test_real_path(self):
        waypoints = [
            ([0.6219, 0.2028], 1774057715.808),
            ([0.6297, 0.2028], 1774057716.008),
            ([0.7078, 0.2167], 1774057720.019),
        ]
        pos = _interpolate_path(waypoints, 1774057718.0, 640, 360)
        self.assertIsNotNone(pos)
        # x must fall strictly between the bracketing samples
        self.assertLess(int(0.6297 * 640), pos[0])
        self.assertLess(pos[0], int(0.7078 * 640))
class TestDrawLabel(unittest.TestCase):
    """_draw_label must write visible (non-zero) pixels onto the frame."""

    def test_draws(self):
        frame = np.zeros((200, 300, 3), np.uint8)
        _draw_label(frame, "12:34:56", 100, 100)
        self.assertTrue(frame.any())

    def test_edge(self):
        # labels anchored at the frame edge must still render something
        frame = np.zeros((50, 50, 3), np.uint8)
        _draw_label(frame, "test", 0, 5)
        self.assertTrue(frame.any())
class TestBalanceGroups(unittest.TestCase):
    """_balance_groups splits events into size-balanced, time-sorted groups."""

    def test_single_event(self):
        groups = _balance_groups([{"frames": [1] * 10, "time": 0}], 3)
        self.assertEqual([len(g) for g in groups], [1])

    def test_even_split(self):
        events = [{"frames": [1] * 100, "time": i} for i in range(6)]
        groups = _balance_groups(events, 3)
        self.assertEqual([len(g) for g in groups], [3, 3])

    def test_long_events_packed_with_short(self):
        events = [
            {"frames": [1] * n, "time": t}
            for t, n in enumerate((500, 400, 10, 10))
        ]
        groups = _balance_groups(events, 2)
        # the algorithm packs into the shortest available group,
        # so 500 and 400 end up together (both long), short ones together
        self.assertEqual(len(groups), 2)
        lengths = sorted(
            (len(e["frames"]) for g in groups for e in g), reverse=True
        )
        self.assertEqual(lengths, [500, 400, 10, 10])

    def test_sorted_by_time(self):
        events = [{"frames": [1] * 10, "time": t} for t in (30, 10, 20)]
        groups = _balance_groups(events, 3)
        times = [e["time"] for e in groups[0]]
        self.assertEqual(times, sorted(times))
class TestBuildBackground(unittest.TestCase):
    """Tests for _build_background with recording/probe/extract dependencies mocked."""

    # NOTE: decorators are applied bottom-up, so mock args arrive in reverse order.
    @patch("frigate.recap.recap._extract_frame")
    @patch("frigate.recap.recap._probe_resolution")
    @patch("frigate.recap.recap._get_recording_at")
    def test_too_few(self, mock_rec, mock_probe, mock_extract):
        # every frame extraction fails -> no background can be built
        mock_rec.return_value = ("/fake.mp4", 0.0)
        mock_probe.return_value = (100, 100)
        mock_extract.return_value = None
        self.assertIsNone(_build_background("/usr/bin/ffmpeg", "cam", 0.0, 100.0, 10))

    @patch("frigate.recap.recap.os.path.isfile", return_value=True)
    @patch("frigate.recap.recap._extract_frame")
    @patch("frigate.recap.recap._probe_resolution")
    @patch("frigate.recap.recap._get_recording_at")
    def test_median(self, mock_rec, mock_probe, mock_extract, mock_isfile):
        # sampled frames cycle through brightness 0/100/200; the per-pixel
        # median of those values is 100, which the result should reflect
        mock_rec.return_value = ("/fake.mp4", 0.0)
        mock_probe.return_value = (4, 4)
        frames = [np.full((4, 4, 3), v, np.uint8) for v in [0, 100, 200]]
        idx = [0]  # mutable counter shared with the side_effect closure

        def side_effect(*a, **kw):
            r = frames[idx[0] % 3]
            idx[0] += 1
            return r

        mock_extract.side_effect = side_effect
        result = _build_background("/usr/bin/ffmpeg", "cam", 0.0, 100.0, 5)
        self.assertIsNotNone(result)
        self.assertEqual(result[0, 0, 0], 100)
class TestRecapConfig(unittest.TestCase):
    """Defaults and validation behavior of the RecapConfig model."""

    def test_defaults(self):
        from frigate.config.recap import RecapConfig

        cfg = RecapConfig()
        observed = (
            cfg.enabled,
            cfg.auto_generate,
            cfg.schedule_time,
            cfg.cameras,
            cfg.default_label,
            cfg.speed,
            cfg.max_per_group,
            cfg.video_duration,
        )
        self.assertEqual(observed, (False, False, "02:00", [], "person", 2, 3, 30))

    def test_custom_values(self):
        from frigate.config.recap import RecapConfig

        cfg = RecapConfig(
            enabled=True,
            auto_generate=True,
            schedule_time="03:30",
            cameras=["front", "back"],
            speed=4,
            max_per_group=5,
        )
        self.assertTrue(cfg.auto_generate)
        self.assertEqual(
            (cfg.schedule_time, cfg.cameras, cfg.speed, cfg.max_per_group),
            ("03:30", ["front", "back"], 4, 5),
        )

    def test_validation_ranges(self):
        from pydantic import ValidationError

        from frigate.config.recap import RecapConfig

        # each kwarg set is outside the model's accepted range
        rejected = (
            {"ghost_duration": 0.1},
            {"output_fps": 60},
            {"video_duration": 2},
            {"background_samples": 2},
            {"speed": 0},
            {"speed": 10},
            {"max_per_group": 0},
        )
        for kwargs in rejected:
            with self.subTest(kwargs=kwargs):
                with self.assertRaises(ValidationError):
                    RecapConfig(**kwargs)

    def test_schedule_time_validation(self):
        from pydantic import ValidationError

        from frigate.config.recap import RecapConfig

        for bad in ("25:00", "abc", "12:60"):
            with self.subTest(value=bad):
                with self.assertRaises(ValidationError):
                    RecapConfig(schedule_time=bad)

        # midnight and the last minute of the day are valid edge cases
        for good in ("00:00", "23:59"):
            self.assertEqual(RecapConfig(schedule_time=good).schedule_time, good)
# Allow running this test module directly with `python test_recap.py`.
if __name__ == "__main__":
    unittest.main()

View File

@ -0,0 +1,29 @@
"""Tests for camera monitoring notification authorization."""
import unittest
from types import SimpleNamespace
from unittest.mock import MagicMock
from frigate.comms.webpush import WebPushClient
class TestCameraMonitoringNotifications(unittest.TestCase):
    """send_camera_monitoring must only notify users who can see the camera."""

    def test_send_camera_monitoring_filters_by_camera_access(self):
        # build a WebPushClient without running __init__ (bypasses whatever
        # setup it performs) and hand-wire just the state the method reads
        client = WebPushClient.__new__(WebPushClient)
        client.config = SimpleNamespace(
            cameras={"front_door": SimpleNamespace(friendly_name=None)}
        )
        client.web_pushers = {"allowed": [], "denied": []}
        client.user_cameras = {"allowed": {"front_door"}, "denied": set()}
        client.check_registrations = MagicMock()
        client.cleanup_registrations = MagicMock()
        client.send_push_notification = MagicMock()

        client.send_camera_monitoring(
            {"camera": "front_door", "message": "Monitoring condition met"}
        )

        # only the user with access to front_door receives a push
        client.send_push_notification.assert_called_once()
        self.assertEqual(
            client.send_push_notification.call_args.kwargs["user"], "allowed"
        )

View File

@ -24,8 +24,12 @@ from frigate.log import redirect_output_to_logger, suppress_stderr_during
from frigate.models import Event, Recordings, ReviewSegment
from frigate.types import ModelStatusTypesEnum
from frigate.util.downloader import ModelDownloader
from frigate.util.file import get_event_thumbnail_bytes
from frigate.util.image import get_image_from_recording
from frigate.util.file import get_event_thumbnail_bytes, load_event_snapshot_image
from frigate.util.image import (
calculate_region,
get_image_from_recording,
relative_box_to_absolute,
)
from frigate.util.process import FrigateProcess
BATCH_SIZE = 16
@ -713,7 +717,7 @@ def collect_object_classification_examples(
This function:
1. Queries events for the specified label
2. Selects 100 balanced events across different cameras and times
3. Retrieves thumbnails for selected events (with 33% center crop applied)
3. Crops each event's clean snapshot around the object bounding box
4. Selects 24 most visually distinct thumbnails
5. Saves to dataset directory
@ -832,66 +836,106 @@ def _select_balanced_events(
def _extract_event_thumbnails(events: list[Event], output_dir: str) -> list[str]:
    """
    Extract a training image for each event.

    Preferred path: load the full-frame clean snapshot and crop around the
    stored bounding box with the same calculate_region(..., max(w, h), 1.0)
    call the live ObjectClassificationProcessor uses, so wizard examples
    are framed like inference-time inputs.

    Fallback: if no clean snapshot exists (snapshots disabled, or only a
    legacy annotated JPG is on disk), center-crop the stored thumbnail
    using a step ladder sized from the box/region area ratio.

    Args:
        events: List of Event objects
        output_dir: Directory to save crops

    Returns:
        List of paths to successfully extracted images
    """
    image_paths = []

    for idx, event in enumerate(events):
        try:
            img = _load_event_classification_crop(event)

            if img is None:
                continue

            # all wizard examples are normalized to the classifier input size
            resized = cv2.resize(img, (224, 224))
            output_path = os.path.join(output_dir, f"thumbnail_{idx:04d}.jpg")
            cv2.imwrite(output_path, resized)
            image_paths.append(output_path)
        except Exception as e:
            # best-effort: a single bad event must not abort example collection
            logger.debug(f"Failed to extract image for event {event.id}: {e}")
            continue

    return image_paths
def _load_event_classification_crop(event: Event) -> np.ndarray | None:
    """Prefer a snapshot-based object crop; fall back to a center-cropped thumbnail."""
    # --- preferred path: crop the clean snapshot around the stored box ---
    if event.data and "box" in event.data:
        snapshot, _ = load_event_snapshot_image(event, clean_only=True)

        if snapshot is not None:
            abs_box = relative_box_to_absolute(snapshot.shape, event.data["box"])

            if abs_box is not None:
                xmin, ymin, xmax, ymax = abs_box
                w_px = xmax - xmin
                h_px = ymax - ymin

                if w_px > 0 and h_px > 0:
                    # same region expansion the live classifier uses, so the
                    # example is framed like an inference-time input
                    rx1, ry1, rx2, ry2 = calculate_region(
                        snapshot.shape,
                        xmin,
                        ymin,
                        xmax,
                        ymax,
                        max(w_px, h_px),
                        1.0,
                    )
                    snapshot_crop = snapshot[ry1:ry2, rx1:rx2]

                    if snapshot_crop.size > 0:
                        return snapshot_crop

    # --- fallback: center-crop the stored thumbnail ---
    thumbnail_bytes = get_event_thumbnail_bytes(event)

    if not thumbnail_bytes:
        return None

    img = cv2.imdecode(np.frombuffer(thumbnail_bytes, np.uint8), cv2.IMREAD_COLOR)

    if img is None or img.size == 0:
        return None

    height, width = img.shape[:2]
    crop_fraction = 1.0

    if event.data and "box" in event.data and "region" in event.data:
        box = event.data["box"]
        region = event.data["region"]

        if len(box) == 4 and len(region) == 4:
            # smaller objects (relative to their detection region) get a
            # tighter center crop so they still fill a useful share of the image
            area_ratio = (box[2] * box[3]) / (region[2] * region[3])

            for threshold, fraction in (
                (0.05, 0.4),
                (0.10, 0.5),
                (0.20, 0.65),
                (0.35, 0.80),
            ):
                if area_ratio < threshold:
                    crop_fraction = fraction
                    break
            else:
                crop_fraction = 0.95

    crop_w = int(width * crop_fraction)
    crop_h = int(height * crop_fraction)
    left = (width - crop_w) // 2
    top = (height - crop_h) // 2
    center_crop = img[top : top + crop_h, left : left + crop_w]

    if center_crop.size == 0:
        return None

    return center_crop

View File

@ -726,7 +726,20 @@ def ffprobe_stream(ffmpeg, path: str, detailed: bool = False) -> sp.CompletedPro
if detailed and format_entries:
cmd.extend(["-show_entries", f"format={format_entries}"])
cmd.extend(["-loglevel", "error", clean_path])
return sp.run(cmd, capture_output=True)
try:
return sp.run(cmd, capture_output=True, timeout=6)
except sp.TimeoutExpired as e:
logger.info(
"ffprobe timed out while probing %s (transport=%s)",
clean_camera_user_pass(path),
rtsp_transport or "default",
)
return sp.CompletedProcess(
args=cmd,
returncode=1,
stdout=e.stdout or b"",
stderr=(e.stderr or b"") + b"\nffprobe timed out",
)
result = run()
@ -832,11 +845,23 @@ async def get_video_properties(
"-show_streams",
url,
]
proc = None
try:
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, _ = await proc.communicate()
try:
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=6)
except asyncio.TimeoutError:
logger.info(
"ffprobe timed out while probing %s (transport=%s)",
clean_camera_user_pass(url),
rtsp_transport or "default",
)
proc.kill()
await proc.wait()
return False, 0, 0, None, -1
if proc.returncode != 0:
return False, 0, 0, None, -1

View File

@ -17,6 +17,9 @@ import { useUserPersistence } from "@/hooks/use-user-persistence";
import { Skeleton } from "../ui/skeleton";
import { Button } from "../ui/button";
import { FaCircleCheck } from "react-icons/fa6";
import { FaExclamationTriangle } from "react-icons/fa";
import { MdOutlinePersonSearch } from "react-icons/md";
import { ThreatLevel } from "@/types/review";
import { cn } from "@/lib/utils";
import { useTranslation } from "react-i18next";
import { getTranslatedLabel } from "@/utils/i18n";
@ -127,6 +130,11 @@ export function AnimatedEventCard({
true,
);
const threatLevel = useMemo<ThreatLevel>(
() => (event.data.metadata?.potential_threat_level ?? 0) as ThreatLevel,
[event],
);
const aspectRatio = useMemo(() => {
if (
!config ||
@ -152,7 +160,15 @@ export function AnimatedEventCard({
<Tooltip>
<TooltipTrigger asChild>
<Button
className="pointer-events-none absolute left-2 top-1 z-40 bg-gray-500 bg-gradient-to-br from-gray-400 to-gray-500 opacity-0 transition-opacity group-hover:pointer-events-auto group-hover:opacity-100"
className={cn(
"absolute left-2 top-1 z-40 transition-opacity",
threatLevel === ThreatLevel.SECURITY_CONCERN &&
"pointer-events-auto bg-severity_alert opacity-100 hover:bg-severity_alert",
threatLevel === ThreatLevel.NEEDS_REVIEW &&
"pointer-events-auto bg-severity_detection opacity-100 hover:bg-severity_detection",
threatLevel === ThreatLevel.NORMAL &&
"pointer-events-none bg-gray-500 bg-gradient-to-br from-gray-400 to-gray-500 opacity-0 group-hover:pointer-events-auto group-hover:opacity-100",
)}
size="xs"
aria-label={t("markAsReviewed")}
onClick={async () => {
@ -160,7 +176,13 @@ export function AnimatedEventCard({
updateEvents();
}}
>
<FaCircleCheck className="size-3 text-white" />
{threatLevel === ThreatLevel.SECURITY_CONCERN ? (
<FaExclamationTriangle className="size-3 text-white" />
) : threatLevel === ThreatLevel.NEEDS_REVIEW ? (
<MdOutlinePersonSearch className="size-3 text-white" />
) : (
<FaCircleCheck className="size-3 text-white" />
)}
</Button>
</TooltipTrigger>
<TooltipContent>{t("markAsReviewed")}</TooltipContent>

View File

@ -0,0 +1,166 @@
import { useCallback, useState } from "react";
import {
Dialog,
DialogContent,
DialogFooter,
DialogHeader,
DialogTitle,
} from "../ui/dialog";
import { Button } from "../ui/button";
import { Label } from "../ui/label";
import { RadioGroup, RadioGroupItem } from "../ui/radio-group";
import { Input } from "../ui/input";
import { SelectSeparator } from "../ui/select";
import axios from "axios";
import { toast } from "sonner";
import { isDesktop } from "react-device-detect";
import { Drawer, DrawerContent } from "../ui/drawer";
import ActivityIndicator from "../indicators/activity-indicator";
// Selectable recap lookback windows, in hours (rendered in this order).
const RECAP_PERIODS = ["24", "12", "8", "4", "1"] as const;
type RecapPeriod = (typeof RECAP_PERIODS)[number];

type RecapDialogProps = {
  camera: string;
  open: boolean;
  onOpenChange: (open: boolean) => void;
};
// Dialog (desktop) / drawer (mobile) that lets the user start server-side
// recap generation for one camera over a selectable lookback window.
export default function RecapDialog({
  camera,
  open,
  onOpenChange,
}: RecapDialogProps) {
  // selected lookback window in hours (string, matching RadioGroup values)
  const [selectedPeriod, setSelectedPeriod] = useState<RecapPeriod>("24");
  // object label to include in the recap; free text defaulting to "person"
  const [label, setLabel] = useState("person");
  const [isGenerating, setIsGenerating] = useState(false);

  // POST the recap request; on success close the dialog — generation
  // continues server-side and the result appears under Exports.
  const onGenerate = useCallback(() => {
    const now = Date.now() / 1000; // API expects unix seconds
    const hours = parseInt(selectedPeriod);
    const startTime = now - hours * 3600;

    setIsGenerating(true);
    axios
      .post(`recap/${camera}`, null, {
        params: {
          start_time: startTime,
          end_time: now,
          label,
        },
      })
      .then((response) => {
        if (response.status === 200 && response.data.success) {
          toast.success("Recap generation started", {
            position: "top-center",
            description: "Check Exports when it's done.",
          });
          onOpenChange(false);
        }
      })
      .catch((error) => {
        // surface whichever error field the API populated
        const msg =
          error.response?.data?.message ||
          error.response?.data?.detail ||
          "Unknown error";
        toast.error(`Recap failed: ${msg}`, { position: "top-center" });
      })
      .finally(() => {
        setIsGenerating(false);
      });
  }, [camera, selectedPeriod, label, onOpenChange]);

  // render as a centered dialog on desktop, bottom drawer on mobile
  const Overlay = isDesktop ? Dialog : Drawer;
  const Content = isDesktop ? DialogContent : DrawerContent;

  return (
    <Overlay open={open} onOpenChange={onOpenChange}>
      <Content
        className={
          isDesktop
            ? "sm:rounded-lg md:rounded-2xl"
            : "mx-4 rounded-lg px-4 pb-4 md:rounded-2xl"
        }
      >
        <div className="w-full">
          {isDesktop && (
            <>
              <DialogHeader>
                <DialogTitle>Generate Recap</DialogTitle>
              </DialogHeader>
              <SelectSeparator className="my-4 bg-secondary" />
            </>
          )}
          <div className={`flex flex-col gap-4 ${isDesktop ? "" : "mt-4"}`}>
            <Label className="text-sm font-medium">Time period</Label>
            <RadioGroup
              className="flex flex-col gap-3"
              defaultValue="24"
              onValueChange={(v) => setSelectedPeriod(v as RecapPeriod)}
            >
              {RECAP_PERIODS.map((period) => (
                <div key={period} className="flex items-center gap-2">
                  <RadioGroupItem
                    className={
                      period === selectedPeriod
                        ? "bg-selected from-selected/50 to-selected/90 text-selected"
                        : "bg-secondary from-secondary/50 to-secondary/90 text-secondary"
                    }
                    id={`recap-${period}`}
                    value={period}
                  />
                  <Label
                    className="cursor-pointer"
                    htmlFor={`recap-${period}`}
                  >
                    Last {period} {parseInt(period) === 1 ? "hour" : "hours"}
                  </Label>
                </div>
              ))}
            </RadioGroup>
            <div className="mt-2">
              <Label className="text-sm text-secondary-foreground">
                Object type
              </Label>
              <Input
                className="text-md mt-2"
                type="text"
                value={label}
                onChange={(e) => setLabel(e.target.value)}
                placeholder="person"
              />
            </div>
          </div>
          {isDesktop && <SelectSeparator className="my-4 bg-secondary" />}
          <DialogFooter
            className={isDesktop ? "" : "mt-6 flex flex-col-reverse gap-4"}
          >
            <div
              className={`cursor-pointer p-2 text-center ${isDesktop ? "" : "w-full"}`}
              onClick={() => onOpenChange(false)}
            >
              Cancel
            </div>
            <Button
              className={isDesktop ? "" : "w-full"}
              variant="select"
              size="sm"
              disabled={isGenerating}
              onClick={onGenerate}
            >
              {isGenerating && (
                <ActivityIndicator className="mr-2 h-4 w-4" />
              )}
              Generate Recap
            </Button>
          </DialogFooter>
        </div>
      </Content>
    </Overlay>
  );
}

View File

@ -389,7 +389,7 @@ export default function LiveCameraView({
return "mse";
}, [lowBandwidth, mic, webRTC, isRestreamed]);
useKeyboardListener(["m"], (key, modifiers) => {
useKeyboardListener(["m", "Escape"], (key, modifiers) => {
if (!modifiers.down) {
return true;
}
@ -407,6 +407,12 @@ export default function LiveCameraView({
return true;
}
break;
case "Escape":
if (!fullscreen) {
navigate(-1);
return true;
}
break;
}
return false;