feat: add daily recap video generation

Adds a new recap feature that composites detected people from throughout
the day onto a clean background, producing a short summary video of all
activity for a given camera.

How it works:
- Builds a clean background plate via median of sampled frames
- Extracts clip frames for each person event from recordings
- Uses per-event background subtraction (first frame of clip as reference)
  within a soft spotlight region to isolate the person
- Groups non-overlapping events to play simultaneously
- Balances groups by duration so the video stays even
- Renders at 2x speed, stitches groups into final output

New files:
- frigate/recap/ — core generation module
- frigate/api/recap.py — POST /recap/{camera}, GET /recap/{camera}
- frigate/config/recap.py — recap config section (enabled, fps, etc)
- frigate/test/test_recap.py — unit tests
- web/src/components/overlay/RecapDialog.tsx — UI component (not yet wired)

Config example:
  recap:
    enabled: true
    default_label: person
    output_fps: 10
    video_duration: 30
    background_samples: 30

Relates to #54
This commit is contained in:
ryzendigo 2026-03-21 16:36:39 +08:00
parent 6d2b84e202
commit 717b878956
10 changed files with 1201 additions and 0 deletions

View File

@ -15,4 +15,5 @@ class Tags(Enum):
notifications = "Notifications"
preview = "Preview"
recordings = "Recordings"
recap = "Recap"
review = "Review"

View File

@ -25,6 +25,7 @@ from frigate.api import (
motion_search,
notification,
preview,
recap,
record,
review,
)
@ -138,6 +139,7 @@ def create_fastapi_app(
app.include_router(preview.router)
app.include_router(notification.router)
app.include_router(export.router)
app.include_router(recap.router)
app.include_router(event.router)
app.include_router(media.router)
app.include_router(motion_search.router)

100
frigate/api/recap.py Normal file
View File

@ -0,0 +1,100 @@
"""Recap API endpoints."""
import logging
import random
import string
from typing import Optional
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from frigate.api.auth import require_camera_access, require_role
from frigate.api.defs.tags import Tags
from frigate.models import Export
from frigate.recap.recap import RecapGenerator
logger = logging.getLogger(__name__)
router = APIRouter(tags=[Tags.recap])
@router.post(
    "/recap/{camera_name}",
    summary="Generate a time-stacked recap video",
    description="Creates a video showing all detected objects from the given time range "
    "composited onto a clean background. Each detection appears at its real "
    "position with a timestamp label.",
)
def generate_recap(
    request: Request,
    camera_name: str,
    start_time: float,
    end_time: float,
    label: Optional[str] = None,
    _: str = Depends(require_role(["admin"])),
):
    """Kick off recap generation in a background thread (admin only).

    Validates the config/camera/time range, then starts a RecapGenerator
    and immediately returns the export_id the result will be stored under.
    """
    app_config = request.app.frigate_config

    def _fail(message: str, code: int) -> JSONResponse:
        # uniform error envelope for every validation failure below
        return JSONResponse(
            content={"success": False, "message": message},
            status_code=code,
        )

    if not app_config.recap.enabled:
        return _fail("recap generation is not enabled in config", 400)
    if camera_name not in app_config.cameras:
        return _fail(f"unknown camera: {camera_name}", 404)
    if end_time <= start_time:
        return _fail("end_time must be after start_time", 400)

    # short random suffix keeps repeated requests from colliding on id
    suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    export_id = f"{camera_name}_recap_{suffix}"

    RecapGenerator(
        config=app_config,
        export_id=export_id,
        camera=camera_name,
        start_time=start_time,
        end_time=end_time,
        label=label or app_config.recap.default_label,
    ).start()

    return JSONResponse(
        content={
            "success": True,
            "message": "recap generation started",
            "export_id": export_id,
        }
    )
@router.get(
    "/recap/{camera_name}",
    summary="List recap exports for a camera",
)
def get_recaps(
    request: Request,
    camera_name: str,
    _: str = Depends(require_camera_access()),
):
    """Return this camera's recap exports as dicts, newest first.

    Recaps are stored as regular Export rows; they are distinguished by
    the "_recap_" marker embedded in the export id.
    """
    query = (
        Export.select()
        .where((Export.camera == camera_name) & (Export.id.contains("_recap_")))
        .order_by(Export.date.desc())
    )
    return [row for row in query.dicts()]

View File

@ -10,6 +10,7 @@ from .logger import * # noqa: F403
from .mqtt import * # noqa: F403
from .network import * # noqa: F403
from .proxy import * # noqa: F403
from .recap import * # noqa: F403
from .telemetry import * # noqa: F403
from .tls import * # noqa: F403
from .ui import * # noqa: F403

View File

@ -69,6 +69,7 @@ from .mqtt import MqttConfig
from .network import NetworkingConfig
from .profile import ProfileDefinitionConfig
from .proxy import ProxyConfig
from .recap import RecapConfig
from .telemetry import TelemetryConfig
from .tls import TlsConfig
from .ui import UIConfig
@ -413,6 +414,11 @@ class FrigateConfig(FrigateBaseModel):
title="Proxy",
description="Settings for integrating Frigate behind a reverse proxy that passes authenticated user headers.",
)
recap: RecapConfig = Field(
default_factory=RecapConfig,
title="Recap",
description="Settings for time-stacked recap video generation that composites detected objects onto a clean background.",
)
telemetry: TelemetryConfig = Field(
default_factory=TelemetryConfig,
title="Telemetry",

46
frigate/config/recap.py Normal file
View File

@ -0,0 +1,46 @@
from pydantic import Field
from .base import FrigateBaseModel
__all__ = ["RecapConfig"]
class RecapConfig(FrigateBaseModel):
    """Config section for time-stacked recap video generation.

    Consumed by the /recap API endpoints and frigate/recap/recap.py.
    """

    # Master switch; POST /recap/{camera} returns 400 when False.
    enabled: bool = Field(
        default=False,
        title="Enable recaps",
        description="Allow generation of time-stacked recap videos that composite detected objects onto a clean background.",
    )
    # Used when the API caller does not pass an explicit label.
    default_label: str = Field(
        default="person",
        title="Default object label",
        description="The object type to include in recaps by default.",
    )
    # NOTE(review): validated here but not currently read by the generator
    # (RecapGenerator never references ghost_duration) — confirm it's needed.
    ghost_duration: float = Field(
        default=3.0,
        title="Ghost visibility duration",
        description="How long (in seconds) each detected object stays visible on the recap video.",
        ge=0.5,
        le=30.0,
    )
    # Rate frames are extracted at; encode rate is this times the speed factor.
    output_fps: int = Field(
        default=10,
        title="Output frame rate",
        description="Frame rate of the generated recap video.",
        ge=1,
        le=30,
    )
    # NOTE(review): stored by RecapGenerator but not yet used to pace the
    # render — confirm intended behavior.
    video_duration: int = Field(
        default=30,
        title="Video duration",
        description="Target length in seconds for the output video. The full time range is compressed into this duration.",
        ge=5,
        le=300,
    )
    # More samples = cleaner median background, slower build.
    background_samples: int = Field(
        default=30,
        title="Background sample count",
        description="Number of frames sampled across the time range to build the clean background plate via median.",
        ge=5,
        le=100,
    )

View File

658
frigate/recap/recap.py Normal file
View File

@ -0,0 +1,658 @@
"""Time-stacked recap video generator.
Composites detected people from throughout the day onto a single clean
background. Multiple non-overlapping events play simultaneously so you
can see all the day's activity in a short video.
Each person is extracted from their recording clip using per-event
background subtraction within a spotlight region, producing clean cutouts
without needing a segmentation model.
"""
import datetime
import logging
import os
import re
import shutil
import subprocess as sp
import threading
import time
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.config import FrigateConfig
from frigate.const import (
CACHE_DIR,
CLIPS_DIR,
EXPORT_DIR,
PROCESS_PRIORITY_LOW,
)
from frigate.models import Event, Export, Recordings
logger = logging.getLogger(__name__)
# scratch dir for per-recap intermediate segments (one subdir per export id)
RECAP_CACHE = os.path.join(CACHE_DIR, "recap")
# x264 quality for rendered segments (lower = better quality / larger files)
OUTPUT_CRF = "23"
# bg subtraction within per-event spotlight — threshold can be low
# because the reference frame matches the event's lighting exactly
BG_DIFF_THRESHOLD = 25
DILATE_ITERATIONS = 2
# spotlight params: generous area, bg sub handles the rest
SPOTLIGHT_PAD = 1.5
SPOTLIGHT_BLUR = 25
def _lower_priority():
    # Drop the child's scheduling priority so recap ffmpeg work doesn't
    # starve detection/recording; passed as preexec_fn to every subprocess.
    os.nice(PROCESS_PRIORITY_LOW)
def _get_recording_at(camera: str, ts: float) -> Optional[tuple[str, float]]:
    """Find the recording segment covering a timestamp.

    Returns (path, offset_into_file) or None.
    """
    try:
        segment = (
            Recordings.select(Recordings.path, Recordings.start_time)
            .where(
                (Recordings.camera == camera)
                & (Recordings.start_time <= ts)
                & (Recordings.end_time >= ts)
            )
            .get()
        )
    except DoesNotExist:
        return None
    return segment.path, ts - float(segment.start_time)
def _probe_resolution(ffmpeg_path: str, path: str) -> Optional[tuple[int, int]]:
    """Return (width, height) of a video file, or None on failure.

    Runs ffmpeg with a null muxer and scrapes the stream info printed to
    stderr, avoiding a separate ffprobe dependency.
    """
    try:
        probe = sp.run(
            [ffmpeg_path, "-hide_banner", "-i", path, "-f", "null", "-"],
            capture_output=True,
            timeout=10,
            preexec_fn=_lower_priority,
        )
    except sp.TimeoutExpired:
        # fix: previously an uncaught TimeoutExpired here aborted the whole
        # recap; a single stuck probe should just mean "unreadable file"
        return None
    # NOTE(review): matches the first WxH-looking token in stderr — assumes
    # nothing else (e.g. a digit-heavy path) matches first; verify if paths
    # can contain NxM patterns.
    match = re.search(r"(\d{2,5})x(\d{2,5})", probe.stderr.decode(errors="replace"))
    if not match:
        return None
    return int(match.group(1)), int(match.group(2))
def _extract_frame(
    ffmpeg_path: str, path: str, offset: float, w: int, h: int
) -> Optional[np.ndarray]:
    """Decode one BGR frame at `offset` seconds into the file.

    Returns an (h, w, 3) uint8 array, or None if decoding failed, timed
    out, or produced fewer bytes than a full frame.
    """
    try:
        p = sp.run(
            [
                ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-ss",
                f"{offset:.3f}",
                "-i",
                path,
                "-frames:v",
                "1",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "bgr24",
                "pipe:1",
            ],
            capture_output=True,
            timeout=15,
            preexec_fn=_lower_priority,
        )
    except sp.TimeoutExpired:
        # fix: one stuck decode previously raised out of the background
        # build loop and killed the whole recap; treat as a bad sample
        return None
    if p.returncode != 0 or len(p.stdout) == 0:
        return None
    expected = w * h * 3
    if len(p.stdout) < expected:
        return None
    # frombuffer view is read-only; callers copy via astype() before writing
    return np.frombuffer(p.stdout, dtype=np.uint8)[:expected].reshape((h, w, 3))
def _extract_frames_range(
    ffmpeg_path: str,
    path: str,
    offset: float,
    duration: float,
    fps: int,
    w: int,
    h: int,
) -> list[np.ndarray]:
    """Pull multiple frames from a recording at a given fps.

    Returns a list of (h, w, 3) uint8 BGR frames (read-only views sharing
    one decode buffer); empty list on failure or timeout.
    """
    try:
        p = sp.run(
            [
                ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-ss",
                f"{offset:.3f}",
                "-t",
                f"{duration:.3f}",
                "-i",
                path,
                "-vf",
                f"fps={fps}",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "bgr24",
                "pipe:1",
            ],
            capture_output=True,
            # scale the timeout with clip length, with a sane floor
            timeout=max(30, int(duration) + 15),
            preexec_fn=_lower_priority,
        )
    except sp.TimeoutExpired:
        # fix: previously an uncaught TimeoutExpired aborted the entire
        # recap; a single stuck clip should just be skipped by the caller
        return []
    if p.returncode != 0 or len(p.stdout) == 0:
        return []
    frame_size = w * h * 3
    n_frames = len(p.stdout) // frame_size
    if n_frames == 0:
        return []
    # one frombuffer+reshape instead of a per-frame slice copy
    frames = np.frombuffer(
        p.stdout[: n_frames * frame_size], dtype=np.uint8
    ).reshape((n_frames, h, w, 3))
    return list(frames)
def _build_background(
    ffmpeg_path: str,
    camera: str,
    start_time: float,
    end_time: float,
    sample_count: int,
) -> Optional[np.ndarray]:
    """Median of sampled frames — removes moving objects, keeps the static scene.

    Samples `sample_count` evenly spaced timestamps across the range and
    returns the per-pixel median frame, or None if fewer than 3 frames
    could be decoded.
    """
    interval = (end_time - start_time) / (sample_count + 1)
    size: Optional[tuple[int, int]] = None
    samples: list[np.ndarray] = []
    for n in range(1, sample_count + 1):
        found = _get_recording_at(camera, start_time + interval * n)
        if found is None:
            continue
        rec_path, offset = found
        if not os.path.isfile(rec_path):
            continue
        if size is None:
            # probe once; assumes all segments share one resolution —
            # mismatched frames are rejected by the shape check below
            size = _probe_resolution(ffmpeg_path, rec_path)
            if size is None:
                continue
        w, h = size
        frame = _extract_frame(ffmpeg_path, rec_path, offset, w, h)
        if frame is not None and frame.shape == (h, w, 3):
            samples.append(frame)
    if len(samples) < 3:
        logger.warning("only got %d bg frames, need 3+", len(samples))
        return None
    return np.median(np.stack(samples, axis=0), axis=0).astype(np.uint8)
def _relative_box_to_pixels(
box: list[float], w: int, h: int
) -> tuple[int, int, int, int]:
"""Normalized [x, y, w, h] -> pixel [x1, y1, x2, y2]."""
x1 = max(0, int(box[0] * w))
y1 = max(0, int(box[1] * h))
x2 = min(w, int((box[0] + box[2]) * w))
y2 = min(h, int((box[1] + box[3]) * h))
return x1, y1, x2, y2
def _make_spotlight(w: int, h: int, cx: int, cy: int, rx: int, ry: int) -> np.ndarray:
    """Soft elliptical spotlight mask, float32 0-1."""
    canvas = np.zeros((h, w), np.uint8)
    # filled ellipse, then blur to feather the edge
    cv2.ellipse(canvas, (cx, cy), (rx, ry), 0, 0, 360, 255, -1)
    feathered = cv2.GaussianBlur(canvas, (SPOTLIGHT_BLUR, SPOTLIGHT_BLUR), 0)
    return feathered.astype(np.float32) / 255.0
def _person_mask(
    frame: np.ndarray, ref_bg: np.ndarray, spotlight: np.ndarray
) -> np.ndarray:
    """Extract person by diffing against the per-event reference frame,
    then AND with the spotlight to contain it to the detection area.
    """
    delta = cv2.cvtColor(cv2.absdiff(frame, ref_bg), cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(delta, BG_DIFF_THRESHOLD, 255, cv2.THRESH_BINARY)
    # dilate to close gaps in the silhouette, then erode back slightly
    binary = cv2.dilate(binary, None, iterations=DILATE_ITERATIONS)
    binary = cv2.erode(binary, None, iterations=1)
    return (binary.astype(np.float32) / 255.0) * spotlight
def _mask_centroid(m: np.ndarray) -> Optional[tuple[int, int]]:
coords = np.argwhere(m > 0.3)
if len(coords) == 0:
return None
return int(coords[:, 1].mean()), int(coords[:, 0].mean())
def _interpolate_path(
path_data: list, t: float, w: int, h: int
) -> Optional[tuple[int, int]]:
"""Interpolate person position from path_data at time t."""
if not path_data or len(path_data) < 1:
return None
prev = None
for coord, ts in path_data:
if ts > t:
if prev is None:
return int(coord[0] * w), int(coord[1] * h)
pc, pt = prev
dt = ts - pt
if dt <= 0:
return int(coord[0] * w), int(coord[1] * h)
f = (t - pt) / dt
ix = pc[0] + (coord[0] - pc[0]) * f
iy = pc[1] + (coord[1] - pc[1]) * f
return int(ix * w), int(iy * h)
prev = (coord, ts)
if prev:
return int(prev[0][0] * w), int(prev[0][1] * h)
return None
def _draw_label(frame: np.ndarray, text: str, x: int, y: int):
    """Draw small white text on a black box, clamped inside the frame."""
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.28
    thickness = 1
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
    frame_h, frame_w = frame.shape[:2]
    # center horizontally on x, clamp so the box never leaves the frame
    tx = max(0, min(x - tw // 2, frame_w - tw - 3))
    ty = max(th + 3, min(y, frame_h - 2))
    cv2.rectangle(frame, (tx, ty - th - 2), (tx + tw + 2, ty + 2), (0, 0, 0), -1)
    cv2.putText(frame, text, (tx + 1, ty), font, scale, (255, 255, 255), thickness)
def _balance_groups(events: list[dict], max_per: int) -> list[list[dict]]:
"""Spread events across groups so durations are roughly even.
Longest events get their own group first, shorter ones fill in.
"""
by_len = sorted(events, key=lambda e: len(e["frames"]), reverse=True)
groups: list[list[dict]] = []
lengths: list[int] = []
for ev in by_len:
best = None
best_len = float("inf")
for i, g in enumerate(groups):
if len(g) < max_per and lengths[i] < best_len:
best = i
best_len = lengths[i]
if best is not None:
groups[best].append(ev)
lengths[best] = max(lengths[best], len(ev["frames"]))
else:
groups.append([ev])
lengths.append(len(ev["frames"]))
for g in groups:
g.sort(key=lambda e: e["time"])
return groups
class RecapGenerator(threading.Thread):
    """Background worker that renders one recap video for one camera.

    Lifecycle: insert an in-progress Export row, build the clean
    background plate, extract per-event clips, render grouped segments,
    concat them into the final mp4, write a thumbnail, and mark the row
    done. On failure the partial output and the Export row are removed.
    """

    def __init__(
        self,
        config: FrigateConfig,
        export_id: str,
        camera: str,
        start_time: float,
        end_time: float,
        label: str = "person",
    ):
        super().__init__(daemon=True)
        self.config = config
        self.export_id = export_id
        self.camera = camera
        self.start_time = start_time
        self.end_time = end_time
        self.label = label
        self.ffmpeg_path = config.ffmpeg.ffmpeg_path
        recap_cfg = config.recap
        self.output_fps = recap_cfg.output_fps
        # playback speed multiplier: frames are fed at output_fps but
        # encoded at output_fps * speed
        self.speed = 2
        # max simultaneous events composited per rendered segment
        self.max_per_group = 3
        # NOTE(review): stored but not yet used to pace the render — the
        # output length is currently determined by the grouped clips.
        self.video_duration = recap_cfg.video_duration
        self.background_samples = recap_cfg.background_samples
        Path(RECAP_CACHE).mkdir(parents=True, exist_ok=True)
        Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True)

    def _get_events(self) -> list[dict]:
        """Fetch non-false-positive events of the target label in range."""
        return list(
            Event.select(
                Event.id,
                Event.start_time,
                Event.end_time,
                Event.label,
                Event.data,
                Event.box,
                Event.top_score,
            )
            .where(Event.camera == self.camera)
            .where(Event.label == self.label)
            .where(Event.start_time >= self.start_time)
            .where(Event.start_time <= self.end_time)
            .where(Event.false_positive == False)  # noqa: E712
            .order_by(Event.start_time.asc())
            .dicts()
        )

    def run(self):
        """Thread entry point: create the Export row, generate, finalize."""
        logger.info(
            "generating recap for %s (%s to %s)",
            self.camera,
            datetime.datetime.fromtimestamp(self.start_time).isoformat(),
            datetime.datetime.fromtimestamp(self.end_time).isoformat(),
        )
        wall_start = time.monotonic()
        start_dt = datetime.datetime.fromtimestamp(self.start_time)
        end_dt = datetime.datetime.fromtimestamp(self.end_time)
        export_name = f"{self.camera} recap {start_dt.strftime('%Y-%m-%d')}"
        filename = (
            f"{self.camera}_recap_{start_dt.strftime('%Y%m%d_%H%M%S')}-"
            f"{end_dt.strftime('%Y%m%d_%H%M%S')}_{self.export_id.split('_')[-1]}.mp4"
        )
        video_path = os.path.join(EXPORT_DIR, filename)
        # insert the row up front so the UI can show an in-progress export
        Export.insert(
            {
                Export.id: self.export_id,
                Export.camera: self.camera,
                Export.name: export_name,
                Export.date: self.start_time,
                Export.video_path: video_path,
                Export.thumb_path: "",
                Export.in_progress: True,
            }
        ).execute()
        try:
            self._generate(video_path)
        except Exception:
            logger.exception("recap failed for %s", self.camera)
            Path(video_path).unlink(missing_ok=True)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        logger.info(
            "recap for %s done in %.1fs -> %s",
            self.camera,
            time.monotonic() - wall_start,
            video_path,
        )

    def _prep_events(self, events: list[dict], bg_w: int, bg_h: int) -> list[dict]:
        """Extract clip frames and a per-event bg reference for each event."""
        prepped = []
        for ev in events:
            data = ev.get("data") or {}
            box = data.get("box") or ev.get("box")
            if not box or len(box) != 4:
                continue
            ev_time = float(ev["start_time"])
            ev_end = float(ev.get("end_time") or ev_time)
            ev_dur = max(ev_end - ev_time, 0.5)
            result = _get_recording_at(self.camera, ev_time)
            if result is None:
                continue
            rec_path, offset = result
            if not os.path.isfile(rec_path):
                continue
            frames = _extract_frames_range(
                self.ffmpeg_path,
                rec_path,
                offset,
                ev_dur,
                self.output_fps,
                bg_w,
                bg_h,
            )
            if len(frames) < 3:
                continue
            # first frame is from pre-capture — use as per-event bg reference
            ref_bg = frames[0]
            event_frames = frames[2:]
            if not event_frames:
                continue
            pbox = _relative_box_to_pixels(box, bg_w, bg_h)
            ts_str = datetime.datetime.fromtimestamp(ev_time).strftime("%H:%M:%S")
            prepped.append(
                {
                    "frames": event_frames,
                    "ref_bg": ref_bg,
                    "pbox": pbox,
                    "path": data.get("path_data"),
                    "ts_str": ts_str,
                    "time": ev_time,
                }
            )
        return prepped

    def _compose_frame(
        self,
        group: list[dict],
        fi: int,
        max_frames: int,
        bg_f: np.ndarray,
        bg_w: int,
        bg_h: int,
    ) -> np.ndarray:
        """Composite frame fi of every event in the group onto the background."""
        canvas = bg_f.copy()
        label_info = []
        for ev in group:
            if fi >= len(ev["frames"]):
                continue  # this event's clip has already ended
            src = ev["frames"][fi]
            src_f = src.astype(np.float32)
            bx1, by1, bx2, by2 = ev["pbox"]
            bw = bx2 - bx1
            bh = by2 - by1
            # wall-clock time of this frame within the source clip
            ft = ev["time"] + fi / self.output_fps
            pos = None
            if ev["path"] and len(ev["path"]) >= 2:
                pos = _interpolate_path(ev["path"], ft, bg_w, bg_h)
            cx, cy = pos if pos else ((bx1 + bx2) // 2, (by1 + by2) // 2)
            rx = max(20, int(bw * SPOTLIGHT_PAD))
            ry = max(25, int(bh * SPOTLIGHT_PAD))
            sl = _make_spotlight(bg_w, bg_h, cx, cy, rx, ry)
            mask = _person_mask(src, ev["ref_bg"], sl)
            m3 = mask[:, :, np.newaxis]
            # alpha-blend the cutout over the accumulated canvas
            canvas = src_f * m3 + canvas * (1.0 - m3)
            ctr = _mask_centroid(mask)
            if ctr:
                label_info.append((ev["ts_str"], ctr[0], ctr[1] - int(bh * 0.5)))
            else:
                label_info.append((ev["ts_str"], cx, cy - int(bh * 0.5)))
        cu8 = canvas.astype(np.uint8)
        for ts, lx, ly in label_info:
            _draw_label(cu8, ts, lx, ly)
        # progress bar along the bottom edge of the frame
        cv2.rectangle(
            cu8,
            (0, bg_h - 2),
            (int(bg_w * fi / max_frames), bg_h),
            (0, 180, 255),
            -1,
        )
        return cu8

    def _render_group(
        self, group: list[dict], seg_path: str, bg_f: np.ndarray, bg_w: int, bg_h: int
    ) -> int:
        """Render one group of simultaneous events to seg_path.

        Returns the number of frames written, or 0 if encoding failed.
        """
        max_frames = max(len(e["frames"]) for e in group)
        proc = sp.Popen(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-y",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "bgr24",
                "-s",
                f"{bg_w}x{bg_h}",
                "-r",
                str(self.output_fps * self.speed),
                "-i",
                "pipe:0",
                "-c:v",
                "libx264",
                "-preset",
                "fast",
                "-crf",
                OUTPUT_CRF,
                "-pix_fmt",
                "yuv420p",
                "-movflags",
                "+faststart",
                seg_path,
            ],
            stdin=sp.PIPE,
            # fix: DEVNULL instead of PIPE — nothing drained these pipes,
            # and a full pipe buffer would deadlock the stdin.write() loop
            stdout=sp.DEVNULL,
            stderr=sp.DEVNULL,
            preexec_fn=_lower_priority,
        )
        try:
            for fi in range(max_frames):
                frame = self._compose_frame(group, fi, max_frames, bg_f, bg_w, bg_h)
                proc.stdin.write(frame.tobytes())
            proc.stdin.close()
            proc.wait(timeout=120)
        except Exception:
            proc.kill()
            proc.wait()
            raise
        return max_frames if proc.returncode == 0 else 0

    def _concat_segments(self, seg_paths: list[str], tmp_dir: str, out_path: str):
        """Losslessly stitch rendered segments into the final mp4."""
        concat_file = os.path.join(tmp_dir, "concat.txt")
        with open(concat_file, "w") as f:
            for p in seg_paths:
                f.write(f"file '{p}'\n")
        sp.run(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-f",
                "concat",
                "-safe",
                "0",
                "-i",
                concat_file,
                "-c",
                "copy",
                "-movflags",
                "+faststart",
                "-y",
                out_path,
            ],
            capture_output=True,
            timeout=300,
            preexec_fn=_lower_priority,
        )

    def _write_thumbnail(self, out_path: str, total_frames: int) -> str:
        """Grab the middle frame of the finished video as a webp thumbnail."""
        thumb_path = os.path.join(CLIPS_DIR, f"export/{self.export_id}.webp")
        sp.run(
            [
                self.ffmpeg_path,
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                out_path,
                "-vf",
                f"select=eq(n\\,{max(1, total_frames // 2)})",
                "-frames:v",
                "1",
                "-c:v",
                "libwebp",
                "-y",
                thumb_path,
            ],
            capture_output=True,
            timeout=30,
            preexec_fn=_lower_priority,
        )
        return thumb_path

    def _generate(self, out_path: str):
        """Do the actual work; exceptions propagate to run() for cleanup."""
        events = self._get_events()
        if not events:
            logger.info("no %s events for %s, nothing to do", self.label, self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        logger.info("found %d %s events", len(events), self.label)
        background = _build_background(
            self.ffmpeg_path,
            self.camera,
            self.start_time,
            self.end_time,
            self.background_samples,
        )
        if background is None:
            logger.error("couldn't build background for %s", self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        bg_h, bg_w = background.shape[:2]
        prepped = self._prep_events(events, bg_w, bg_h)
        if not prepped:
            logger.warning("no usable clips for %s", self.camera)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        groups = _balance_groups(prepped, self.max_per_group)
        logger.info(
            "%d events -> %d groups (max %d/group)",
            len(prepped),
            len(groups),
            self.max_per_group,
        )
        # render each group to a temp file, then concat
        tmp_dir = os.path.join(RECAP_CACHE, self.export_id)
        Path(tmp_dir).mkdir(parents=True, exist_ok=True)
        bg_f = background.astype(np.float32)
        seg_paths = []
        # fix: track rendered frame counts here — previously the total was
        # recomputed AFTER ev["frames"] was nulled, so it was always 0 and
        # the thumbnail always came from frame 1
        seg_frame_counts = []
        for gi, group in enumerate(groups):
            seg_path = os.path.join(tmp_dir, f"seg_{gi:04d}.mp4")
            frame_count = self._render_group(group, seg_path, bg_f, bg_w, bg_h)
            if frame_count > 0:
                seg_paths.append(seg_path)
                seg_frame_counts.append(frame_count)
            # free clip frames as we go to bound memory
            for ev in group:
                ev["frames"] = None
                ev["ref_bg"] = None
        if not seg_paths:
            logger.error("no segments rendered for %s", self.camera)
            shutil.rmtree(tmp_dir, ignore_errors=True)
            Export.delete().where(Export.id == self.export_id).execute()
            return
        self._concat_segments(seg_paths, tmp_dir, out_path)
        # fix: rmtree instead of per-file unlink + rmdir — a failed segment
        # used to leave a file behind, making rmdir() raise and fail an
        # otherwise-successful recap
        shutil.rmtree(tmp_dir, ignore_errors=True)
        thumb_path = self._write_thumbnail(out_path, sum(seg_frame_counts))
        Export.update(
            {Export.in_progress: False, Export.thumb_path: thumb_path}
        ).where(Export.id == self.export_id).execute()

221
frigate/test/test_recap.py Normal file
View File

@ -0,0 +1,221 @@
import unittest
from unittest.mock import patch
import numpy as np
from frigate.recap.recap import (
_balance_groups,
_build_background,
_draw_label,
_interpolate_path,
_make_spotlight,
_mask_centroid,
_person_mask,
_relative_box_to_pixels,
)
class TestRelativeBoxConversion(unittest.TestCase):
    """Tests for _relative_box_to_pixels (normalized box -> pixel corners)."""

    def test_basic(self):
        # 0.5*1920=960, 0.25*1080=270, (0.5+0.1)*1920=1152, (0.25+0.2)*1080=486
        x1, y1, x2, y2 = _relative_box_to_pixels([0.5, 0.25, 0.1, 0.2], 1920, 1080)
        self.assertEqual(x1, 960)
        self.assertEqual(y1, 270)
        self.assertEqual(x2, 1152)
        self.assertEqual(y2, 486)

    def test_clamps(self):
        # box extends past the right/bottom edges; output clamps to frame size
        _, _, x2, y2 = _relative_box_to_pixels([0.9, 0.9, 0.2, 0.2], 100, 100)
        self.assertEqual(x2, 100)
        self.assertEqual(y2, 100)

    def test_full_frame(self):
        x1, y1, x2, y2 = _relative_box_to_pixels([0.0, 0.0, 1.0, 1.0], 1920, 1080)
        self.assertEqual((x1, y1, x2, y2), (0, 0, 1920, 1080))

    def test_real_frigate_data(self):
        # values taken from a real event's normalized box at 640x360
        x1, y1, x2, y2 = _relative_box_to_pixels([0.65, 0.117, 0.025, 0.089], 640, 360)
        self.assertEqual(x1, 416)
        self.assertEqual(y1, 42)
        self.assertGreater(x2, x1)
        self.assertGreater(y2, y1)
class TestSpotlight(unittest.TestCase):
    """Tests for _make_spotlight (soft elliptical 0-1 mask)."""

    def test_shape_and_range(self):
        sl = _make_spotlight(100, 100, 50, 50, 20, 20)
        self.assertEqual(sl.shape, (100, 100))
        # bright at the center, ~zero far outside the ellipse
        self.assertGreater(sl[50, 50], 0.5)
        self.assertAlmostEqual(sl[0, 0], 0.0, places=1)

    def test_off_center(self):
        sl = _make_spotlight(200, 200, 10, 10, 15, 15)
        self.assertGreater(sl[10, 10], 0.5)
        self.assertAlmostEqual(sl[199, 199], 0.0, places=1)
class TestPersonMask(unittest.TestCase):
    """Tests for _person_mask (bg-subtraction mask gated by spotlight)."""

    def test_identical_frames_empty_mask(self):
        # no difference from the reference frame -> nothing is foreground
        frame = np.full((100, 100, 3), 128, np.uint8)
        ref = frame.copy()
        sl = _make_spotlight(100, 100, 50, 50, 30, 30)
        mask = _person_mask(frame, ref, sl)
        self.assertEqual(mask.sum(), 0.0)

    def test_different_region_shows_fg(self):
        ref = np.full((100, 100, 3), 50, np.uint8)
        frame = ref.copy()
        frame[40:60, 40:60] = 200  # person-sized bright block
        sl = _make_spotlight(100, 100, 50, 50, 30, 30)
        mask = _person_mask(frame, ref, sl)
        self.assertGreater(mask[50, 50], 0.0)
class TestMaskCentroid(unittest.TestCase):
    """Tests for _mask_centroid (mean position of mask pixels > 0.3)."""

    def test_centered_blob(self):
        m = np.zeros((100, 100), np.float32)
        m[40:60, 40:60] = 1.0
        cx, cy = _mask_centroid(m)
        # exact mean of indices 40..59 is 49.5; int() truncation allowed for
        self.assertAlmostEqual(cx, 50, delta=2)
        self.assertAlmostEqual(cy, 50, delta=2)

    def test_empty_mask(self):
        m = np.zeros((100, 100), np.float32)
        self.assertIsNone(_mask_centroid(m))
class TestInterpolatePath(unittest.TestCase):
    """Tests for _interpolate_path (position lookup along path_data)."""

    def test_empty(self):
        self.assertIsNone(_interpolate_path([], 1.0, 100, 100))
        self.assertIsNone(_interpolate_path(None, 1.0, 100, 100))

    def test_midpoint(self):
        # halfway in time -> halfway between the two points
        path = [((0.0, 0.0), 10.0), ((1.0, 1.0), 20.0)]
        self.assertEqual(_interpolate_path(path, 15.0, 100, 100), (50, 50))

    def test_before_first(self):
        # t before the path starts clamps to the first point
        path = [((0.25, 0.75), 10.0), ((0.5, 0.5), 20.0)]
        self.assertEqual(_interpolate_path(path, 5.0, 100, 100), (25, 75))

    def test_after_last(self):
        # t past the end clamps to the last point
        path = [((0.1, 0.2), 10.0), ((0.3, 0.4), 20.0)]
        self.assertEqual(_interpolate_path(path, 30.0, 1000, 1000), (300, 400))

    def test_real_path(self):
        # coords/timestamps copied from real event path_data
        path = [
            ([0.6219, 0.2028], 1774057715.808),
            ([0.6297, 0.2028], 1774057716.008),
            ([0.7078, 0.2167], 1774057720.019),
        ]
        pos = _interpolate_path(path, 1774057718.0, 640, 360)
        self.assertIsNotNone(pos)
        # interpolated x must lie strictly between the surrounding points
        self.assertGreater(pos[0], int(0.6297 * 640))
        self.assertLess(pos[0], int(0.7078 * 640))
class TestDrawLabel(unittest.TestCase):
    """Tests for _draw_label (timestamp text rendering onto a frame)."""

    def test_draws(self):
        f = np.zeros((200, 300, 3), np.uint8)
        _draw_label(f, "12:34:56", 100, 100)
        # some pixels must have changed from all-black
        self.assertFalse(np.all(f == 0))

    def test_edge(self):
        # label position at the corner must clamp inside the frame, not raise
        f = np.zeros((50, 50, 3), np.uint8)
        _draw_label(f, "test", 0, 5)
        self.assertFalse(np.all(f == 0))
class TestBalanceGroups(unittest.TestCase):
    """Tests for _balance_groups (duration-balanced event grouping)."""

    def test_single_event(self):
        events = [{"frames": [1] * 10, "time": 0}]
        groups = _balance_groups(events, 3)
        self.assertEqual(len(groups), 1)
        self.assertEqual(len(groups[0]), 1)

    def test_even_split(self):
        # 6 equal events with capacity 3 should yield exactly two full groups
        events = [{"frames": [1] * 100, "time": i} for i in range(6)]
        groups = _balance_groups(events, 3)
        self.assertEqual(len(groups), 2)
        self.assertEqual(len(groups[0]), 3)
        self.assertEqual(len(groups[1]), 3)

    def test_long_events_spread(self):
        events = [
            {"frames": [1] * 500, "time": 0},
            {"frames": [1] * 400, "time": 1},
            {"frames": [1] * 10, "time": 2},
            {"frames": [1] * 10, "time": 3},
        ]
        groups = _balance_groups(events, 3)
        # long events should end up in different groups
        group_maxes = [max(len(e["frames"]) for e in g) for g in groups]
        self.assertIn(500, group_maxes)
        self.assertIn(400, group_maxes)

    def test_sorted_by_time(self):
        # within each group, events must come back in chronological order
        events = [
            {"frames": [1] * 10, "time": 30},
            {"frames": [1] * 10, "time": 10},
            {"frames": [1] * 10, "time": 20},
        ]
        groups = _balance_groups(events, 3)
        times = [e["time"] for e in groups[0]]
        self.assertEqual(times, sorted(times))
class TestBuildBackground(unittest.TestCase):
    """Tests for _build_background with the recording/ffmpeg layer mocked out."""

    @patch("frigate.recap.recap._extract_frame")
    @patch("frigate.recap.recap._probe_resolution")
    @patch("frigate.recap.recap._get_recording_at")
    def test_too_few(self, mock_rec, mock_probe, mock_extract):
        # every decode fails -> fewer than 3 frames -> None
        mock_rec.return_value = ("/fake.mp4", 0.0)
        mock_probe.return_value = (100, 100)
        mock_extract.return_value = None
        self.assertIsNone(_build_background("/usr/bin/ffmpeg", "cam", 0.0, 100.0, 10))

    @patch("frigate.recap.recap.os.path.isfile", return_value=True)
    @patch("frigate.recap.recap._extract_frame")
    @patch("frigate.recap.recap._probe_resolution")
    @patch("frigate.recap.recap._get_recording_at")
    def test_median(self, mock_rec, mock_probe, mock_extract, mock_isfile):
        mock_rec.return_value = ("/fake.mp4", 0.0)
        mock_probe.return_value = (4, 4)
        # cycle through constant frames 0/100/200; median must be 100
        frames = [np.full((4, 4, 3), v, np.uint8) for v in [0, 100, 200]]
        idx = [0]

        def side_effect(*a, **kw):
            r = frames[idx[0] % 3]
            idx[0] += 1
            return r

        mock_extract.side_effect = side_effect
        result = _build_background("/usr/bin/ffmpeg", "cam", 0.0, 100.0, 5)
        self.assertIsNotNone(result)
        self.assertEqual(result[0, 0, 0], 100)
class TestRecapConfig(unittest.TestCase):
    """Tests for the RecapConfig pydantic section (defaults + bounds)."""

    def test_defaults(self):
        from frigate.config.recap import RecapConfig

        cfg = RecapConfig()
        self.assertFalse(cfg.enabled)
        self.assertEqual(cfg.default_label, "person")
        self.assertEqual(cfg.video_duration, 30)

    def test_validation(self):
        # out-of-bounds values must be rejected by the field constraints
        from pydantic import ValidationError
        from frigate.config.recap import RecapConfig

        with self.assertRaises(ValidationError):
            RecapConfig(ghost_duration=0.1)
        with self.assertRaises(ValidationError):
            RecapConfig(output_fps=60)
        with self.assertRaises(ValidationError):
            RecapConfig(video_duration=2)
        with self.assertRaises(ValidationError):
            RecapConfig(background_samples=2)
if __name__ == "__main__":
    # allow running this file directly without a test runner
    unittest.main()

View File

@ -0,0 +1,166 @@
import { useCallback, useState } from "react";
import {
Dialog,
DialogContent,
DialogFooter,
DialogHeader,
DialogTitle,
} from "../ui/dialog";
import { Button } from "../ui/button";
import { Label } from "../ui/label";
import { RadioGroup, RadioGroupItem } from "../ui/radio-group";
import { Input } from "../ui/input";
import { SelectSeparator } from "../ui/select";
import axios from "axios";
import { toast } from "sonner";
import { isDesktop } from "react-device-detect";
import { Drawer, DrawerContent } from "../ui/drawer";
import ActivityIndicator from "../indicators/activity-indicator";
// Selectable look-back windows, in hours, newest-longest first.
const RECAP_PERIODS = ["24", "12", "8", "4", "1"] as const;
type RecapPeriod = (typeof RECAP_PERIODS)[number];

type RecapDialogProps = {
  camera: string;
  open: boolean;
  onOpenChange: (open: boolean) => void;
};

/**
 * Dialog (desktop) / drawer (mobile) that starts recap generation for a
 * camera via POST /recap/{camera}. Generation is asynchronous; the user is
 * pointed at Exports for the result. Not yet wired into any page.
 */
export default function RecapDialog({
  camera,
  open,
  onOpenChange,
}: RecapDialogProps) {
  const [selectedPeriod, setSelectedPeriod] = useState<RecapPeriod>("24");
  const [label, setLabel] = useState("person");
  const [isGenerating, setIsGenerating] = useState(false);

  const onGenerate = useCallback(() => {
    // API expects epoch seconds; look back `selectedPeriod` hours from now
    const now = Date.now() / 1000;
    const hours = parseInt(selectedPeriod);
    const startTime = now - hours * 3600;
    setIsGenerating(true);
    axios
      .post(`recap/${camera}`, null, {
        params: {
          start_time: startTime,
          end_time: now,
          label,
        },
      })
      .then((response) => {
        if (response.status === 200 && response.data.success) {
          toast.success("Recap generation started", {
            position: "top-center",
            description: "Check Exports when it's done.",
          });
          onOpenChange(false);
        }
      })
      .catch((error) => {
        // surface the API's message (JSONResponse body or FastAPI detail)
        const msg =
          error.response?.data?.message ||
          error.response?.data?.detail ||
          "Unknown error";
        toast.error(`Recap failed: ${msg}`, { position: "top-center" });
      })
      .finally(() => {
        setIsGenerating(false);
      });
  }, [camera, selectedPeriod, label, onOpenChange]);

  // render as a centered dialog on desktop, bottom drawer on mobile
  const Overlay = isDesktop ? Dialog : Drawer;
  const Content = isDesktop ? DialogContent : DrawerContent;

  return (
    <Overlay open={open} onOpenChange={onOpenChange}>
      <Content
        className={
          isDesktop
            ? "sm:rounded-lg md:rounded-2xl"
            : "mx-4 rounded-lg px-4 pb-4 md:rounded-2xl"
        }
      >
        <div className="w-full">
          {isDesktop && (
            <>
              <DialogHeader>
                <DialogTitle>Generate Recap</DialogTitle>
              </DialogHeader>
              <SelectSeparator className="my-4 bg-secondary" />
            </>
          )}
          <div className={`flex flex-col gap-4 ${isDesktop ? "" : "mt-4"}`}>
            <Label className="text-sm font-medium">Time period</Label>
            <RadioGroup
              className="flex flex-col gap-3"
              defaultValue="24"
              onValueChange={(v) => setSelectedPeriod(v as RecapPeriod)}
            >
              {RECAP_PERIODS.map((period) => (
                <div key={period} className="flex items-center gap-2">
                  <RadioGroupItem
                    className={
                      period === selectedPeriod
                        ? "bg-selected from-selected/50 to-selected/90 text-selected"
                        : "bg-secondary from-secondary/50 to-secondary/90 text-secondary"
                    }
                    id={`recap-${period}`}
                    value={period}
                  />
                  <Label
                    className="cursor-pointer"
                    htmlFor={`recap-${period}`}
                  >
                    Last {period} {parseInt(period) === 1 ? "hour" : "hours"}
                  </Label>
                </div>
              ))}
            </RadioGroup>
            {/* free-text object label; defaults to the config's default_label */}
            <div className="mt-2">
              <Label className="text-sm text-secondary-foreground">
                Object type
              </Label>
              <Input
                className="text-md mt-2"
                type="text"
                value={label}
                onChange={(e) => setLabel(e.target.value)}
                placeholder="person"
              />
            </div>
          </div>
          {isDesktop && <SelectSeparator className="my-4 bg-secondary" />}
          <DialogFooter
            className={isDesktop ? "" : "mt-6 flex flex-col-reverse gap-4"}
          >
            <div
              className={`cursor-pointer p-2 text-center ${isDesktop ? "" : "w-full"}`}
              onClick={() => onOpenChange(false)}
            >
              Cancel
            </div>
            <Button
              className={isDesktop ? "" : "w-full"}
              variant="select"
              size="sm"
              disabled={isGenerating}
              onClick={onGenerate}
            >
              {isGenerating && (
                <ActivityIndicator className="mr-2 h-4 w-4" />
              )}
              Generate Recap
            </Button>
          </DialogFooter>
        </div>
      </Content>
    </Overlay>
  );
}