Add support for a looping GenAI process to monitor a camera (#22556)

* Add support for VLM monitoring a camera

* Cleanup

* Cleanup
This commit is contained in:
Nicolas Mowen 2026-03-20 17:44:02 -06:00 committed by GitHub
parent acd10d0e08
commit a8da4c4521
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 670 additions and 5 deletions

View File

@ -26,6 +26,11 @@ from frigate.api.defs.response.chat_response import (
from frigate.api.defs.tags import Tags
from frigate.api.event import events
from frigate.genai.utils import build_assistant_message_for_conversation
from frigate.jobs.vlm_watch import (
get_vlm_watch_job,
start_vlm_watch_job,
stop_vlm_watch_job,
)
logger = logging.getLogger(__name__)
@ -82,6 +87,16 @@ class ToolExecuteRequest(BaseModel):
arguments: Dict[str, Any]
class VLMMonitorRequest(BaseModel):
    """Request model for starting a VLM watch job."""

    # Camera id to monitor; must exist in the Frigate config.
    camera: str
    # Natural-language condition the VLM should watch for.
    condition: str
    # Maximum watch time before the job gives up, in minutes.
    max_duration_minutes: int = 60
    # Object labels that trigger a VLM check; empty means any detection.
    labels: List[str] = []
    # Zone names to filter by; empty means any zone.
    zones: List[str] = []
def get_tool_definitions() -> List[Dict[str, Any]]:
"""
Get OpenAI-compatible tool definitions for Frigate.
@ -95,9 +110,11 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
"function": {
"name": "search_objects",
"description": (
"Search for detected objects in Frigate by camera, object label, time range, "
"zones, and other filters. Use this to answer questions about when "
"objects were detected, what objects appeared, or to find specific object detections. "
"Search the historical record of detected objects in Frigate. "
"Use this ONLY for questions about the PAST — e.g. 'did anyone come by today?', "
"'when was the last car?', 'show me detections from yesterday'. "
"Do NOT use this for monitoring or alerting requests about future events — "
"use start_camera_watch instead for those. "
"An 'object' in Frigate represents a tracked detection (e.g., a person, package, car). "
"When the user asks about a specific name (person, delivery company, animal, etc.), "
"filter by sub_label only and do not set label."
@ -217,6 +234,65 @@ def get_tool_definitions() -> List[Dict[str, Any]]:
},
},
},
{
"type": "function",
"function": {
"name": "start_camera_watch",
"description": (
"Start a continuous VLM watch job that monitors a camera and sends a notification "
"when a specified condition is met. Use this when the user wants to be alerted about "
"a future event, e.g. 'tell me when guests arrive' or 'notify me when the package is picked up'. "
"Only one watch job can run at a time. Returns a job ID."
),
"parameters": {
"type": "object",
"properties": {
"camera": {
"type": "string",
"description": "Camera ID to monitor.",
},
"condition": {
"type": "string",
"description": (
"Natural-language description of the condition to watch for, "
"e.g. 'a person arrives at the front door'."
),
},
"max_duration_minutes": {
"type": "integer",
"description": "Maximum time to watch before giving up (minutes, default 60).",
"default": 60,
},
"labels": {
"type": "array",
"items": {"type": "string"},
"description": "Object labels that should trigger a VLM check (e.g. ['person', 'car']). If omitted, any detection on the camera triggers a check.",
},
"zones": {
"type": "array",
"items": {"type": "string"},
"description": "Zone names to filter by. If specified, only detections in these zones trigger a VLM check.",
},
},
"required": ["camera", "condition"],
},
},
},
{
"type": "function",
"function": {
"name": "stop_camera_watch",
"description": (
"Cancel the currently running VLM watch job. Use this when the user wants to "
"stop a previously started watch, e.g. 'stop watching the front door'."
),
"parameters": {
"type": "object",
"properties": {},
"required": [],
},
},
},
]
@ -565,16 +641,75 @@ async def _execute_tool_internal(
)
return {"error": "Camera parameter is required"}
return await _execute_get_live_context(request, camera, allowed_cameras)
elif tool_name == "start_camera_watch":
return await _execute_start_camera_watch(request, arguments)
elif tool_name == "stop_camera_watch":
return _execute_stop_camera_watch()
else:
logger.error(
"Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context. "
"Arguments received: %s",
"Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context, "
"start_camera_watch, stop_camera_watch. Arguments received: %s",
tool_name,
json.dumps(arguments),
)
return {"error": f"Unknown tool: {tool_name}"}
async def _execute_start_camera_watch(
    request: Request,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Tool executor: start a VLM watch job from LLM-supplied arguments.

    Validates the (untrusted) tool-call arguments, checks that the camera
    exists and that a vision-capable GenAI client is configured, then
    delegates to start_vlm_watch_job. Returns a tool-result dict; errors
    are reported in the dict rather than raised.
    """
    # Arguments come from an LLM tool call, so defend against None values
    # and wrong types instead of assuming well-formed input.
    camera = str(arguments.get("camera") or "").strip()
    condition = str(arguments.get("condition") or "").strip()
    try:
        max_duration_minutes = int(arguments.get("max_duration_minutes") or 60)
    except (TypeError, ValueError):
        max_duration_minutes = 60
    if max_duration_minutes <= 0:
        # Non-positive durations would time the job out immediately.
        max_duration_minutes = 60
    labels = arguments.get("labels") or []
    zones = arguments.get("zones") or []

    if not camera or not condition:
        return {"error": "camera and condition are required."}

    config = request.app.frigate_config
    if camera not in config.cameras:
        return {"error": f"Camera '{camera}' not found."}

    genai_manager = request.app.genai_manager
    # Prefer a dedicated vision client; fall back to the tool client.
    vision_client = genai_manager.vision_client or genai_manager.tool_client
    if vision_client is None:
        return {"error": "No vision/GenAI provider configured."}

    try:
        job_id = start_vlm_watch_job(
            camera=camera,
            condition=condition,
            max_duration_minutes=max_duration_minutes,
            config=config,
            frame_processor=request.app.detected_frames_processor,
            genai_manager=genai_manager,
            dispatcher=request.app.dispatcher,
            labels=labels,
            zones=zones,
        )
    except RuntimeError as e:
        # Most commonly raised when a watch job is already running.
        logger.error("Failed to start VLM watch job: %s", e, exc_info=True)
        return {"error": "Failed to start VLM watch job."}

    return {
        "success": True,
        "job_id": job_id,
        "message": (
            f"Now watching '{camera}' for: {condition}. "
            f"You'll receive a notification when the condition is met (timeout: {max_duration_minutes} min)."
        ),
    }
def _execute_stop_camera_watch() -> Dict[str, Any]:
    """Tool executor: cancel the active VLM watch job, if any."""
    if stop_vlm_watch_job():
        return {"success": True, "message": "Watch job cancelled."}
    return {"success": False, "message": "No active watch job to cancel."}
async def _execute_pending_tools(
pending_tool_calls: List[Dict[str, Any]],
request: Request,
@ -991,3 +1126,95 @@ Always be accurate with time calculations based on the current date provided.{ca
},
status_code=500,
)
# ---------------------------------------------------------------------------
# VLM Monitor endpoints
# ---------------------------------------------------------------------------
@router.post(
    "/vlm/monitor",
    dependencies=[Depends(allow_any_authenticated())],
    summary="Start a VLM watch job",
    description=(
        "Start monitoring a camera with the vision provider. "
        "The VLM analyzes live frames until the specified condition is met, "
        "then sends a notification. Only one watch job can run at a time."
    ),
)
async def start_vlm_monitor(
    request: Request,
    body: VLMMonitorRequest,
) -> JSONResponse:
    """Start a VLM watch job via the HTTP API.

    Returns 201 with the new job id on success, 404 for an unknown camera,
    400 when no vision-capable GenAI client is configured, and 409 when a
    watch job is already running.
    """
    app = request.app
    frigate_config = app.frigate_config
    manager = app.genai_manager

    # Unknown camera -> 404.
    if body.camera not in frigate_config.cameras:
        return JSONResponse(
            content={"success": False, "message": f"Camera '{body.camera}' not found."},
            status_code=404,
        )

    # A vision-capable client is required; the tool client is the fallback.
    if (manager.vision_client or manager.tool_client) is None:
        return JSONResponse(
            content={
                "success": False,
                "message": "No vision/GenAI provider configured.",
            },
            status_code=400,
        )

    try:
        job_id = start_vlm_watch_job(
            camera=body.camera,
            condition=body.condition,
            max_duration_minutes=body.max_duration_minutes,
            config=frigate_config,
            frame_processor=app.detected_frames_processor,
            genai_manager=manager,
            dispatcher=app.dispatcher,
            labels=body.labels,
            zones=body.zones,
        )
    except RuntimeError as e:
        # Raised when another watch job is already active -> 409 Conflict.
        logger.error("Failed to start VLM watch job: %s", e, exc_info=True)
        return JSONResponse(
            content={"success": False, "message": "Failed to start VLM watch job."},
            status_code=409,
        )

    return JSONResponse(
        content={"success": True, "job_id": job_id},
        status_code=201,
    )
@router.get(
    "/vlm/monitor",
    dependencies=[Depends(allow_any_authenticated())],
    summary="Get current VLM watch job",
    description="Returns the current (or most recently completed) VLM watch job.",
)
async def get_vlm_monitor() -> JSONResponse:
    """Report the state of the current (or most recent) VLM watch job."""
    job = get_vlm_watch_job()
    if job is not None:
        # Flatten the job's fields alongside the "active" flag.
        return JSONResponse(content={"active": True, **job.to_dict()}, status_code=200)
    return JSONResponse(content={"active": False}, status_code=200)
@router.delete(
    "/vlm/monitor",
    dependencies=[Depends(allow_any_authenticated())],
    summary="Cancel the current VLM watch job",
    description="Cancels the running watch job if one exists.",
)
async def cancel_vlm_monitor() -> JSONResponse:
    """Cancel the running VLM watch job, returning 404 when none is active."""
    if stop_vlm_watch_job():
        return JSONResponse(content={"success": True}, status_code=200)
    return JSONResponse(
        content={"success": False, "message": "No active watch job to cancel."},
        status_code=404,
    )

View File

@ -210,6 +210,15 @@ class WebPushClient(Communicator):
logger.debug(f"Notifications for {camera} are currently suspended.")
return
self.send_trigger(decoded)
elif topic == "camera_monitoring":
decoded = json.loads(payload)
camera = decoded["camera"]
if not self.config.cameras[camera].notifications.enabled:
return
if self.is_camera_suspended(camera):
logger.debug(f"Notifications for {camera} are currently suspended.")
return
self.send_camera_monitoring(decoded)
elif topic == "notification_test":
if not self.config.notifications.enabled and not any(
cam.notifications.enabled for cam in self.config.cameras.values()
@ -477,6 +486,30 @@ class WebPushClient(Communicator):
self.cleanup_registrations()
def send_camera_monitoring(self, payload: dict[str, Any]) -> None:
    """Send a web push notification for a VLM camera-monitoring alert.

    The payload arrives on the "camera_monitoring" topic and carries the
    camera id plus the VLM's reasoning, which becomes the message body.
    """
    camera: str = payload["camera"]
    # Prefer a configured friendly name; otherwise derive one from the id.
    friendly_name = getattr(self.config.cameras[camera], "friendly_name", None)
    camera_name: str = friendly_name or titlecase(camera.replace("_", " "))

    self.check_registrations()

    reasoning: str = payload.get("reasoning", "")
    title = f"{camera_name}: Monitoring Alert"
    # Clamp the body to 200 chars so the push message stays short.
    if len(reasoning) > 200:
        message = reasoning[:197] + "..."
    else:
        message = reasoning

    logger.debug(f"Sending camera monitoring push notification for {camera_name}")
    for user in self.web_pushers:
        self.send_push_notification(
            user=user,
            payload=payload,
            title=title,
            message=message,
        )
    self.cleanup_registrations()
def stop(self) -> None:
    """Shut down the web push client, blocking until the worker thread exits."""
    logger.info("Closing notification queue")
    self.notification_thread.join()

405
frigate/jobs/vlm_watch.py Normal file
View File

@ -0,0 +1,405 @@
"""VLM watch job: continuously monitors a camera and notifies when a condition is met."""
import base64
import json
import logging
import re
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Optional
import cv2
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import UPDATE_JOB_STATE
from frigate.jobs.job import Job
from frigate.types import JobStatusTypesEnum
logger = logging.getLogger(__name__)
# Polling interval bounds (seconds)
_MIN_INTERVAL = 1
_MAX_INTERVAL = 300
# Max user/assistant turn pairs to keep in conversation history
_MAX_HISTORY = 10
@dataclass
class VLMWatchJob(Job):
    """Job state for a VLM watch monitor."""

    job_type: str = "vlm_watch"
    # Camera id being monitored.
    camera: str = ""
    # Natural-language condition the VLM is watching for.
    condition: str = ""
    # Maximum watch time before the job times out, in minutes.
    max_duration_minutes: int = 60
    # Object labels that trigger an early VLM check; empty means any.
    labels: list[str] = field(default_factory=list)
    # Zone names to filter detections by; empty means any zone.
    zones: list[str] = field(default_factory=list)
    # Most recent reasoning string returned by the VLM.
    last_reasoning: str = ""
    # Number of VLM analysis iterations completed so far.
    iteration_count: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict snapshot of the job (for API/IPC payloads)."""
        return asdict(self)
class VLMWatchRunner(threading.Thread):
    """Background thread that polls a camera with the vision client until a condition is met.

    Lifecycle: run() loops until the condition is met, the job times out,
    the cancel event is set, or an unexpected error escapes. Each loop
    iteration captures a live frame, asks the VLM about it, then waits for
    the VLM-suggested interval — waking early if a matching detection
    event fires on the target camera.
    """

    def __init__(
        self,
        job: VLMWatchJob,
        config: FrigateConfig,
        cancel_event: threading.Event,
        frame_processor,
        genai_manager,
        dispatcher,
    ) -> None:
        super().__init__(daemon=True, name=f"vlm_watch_{job.id}")
        self.job = job
        self.config = config
        # Set externally (stop_vlm_watch_job) to request cancellation.
        self.cancel_event = cancel_event
        # Source of live camera frames via get_current_frame(camera, {}).
        self.frame_processor = frame_processor
        # Provides vision_client / tool_client for the VLM calls.
        self.genai_manager = genai_manager
        # Used to publish "camera_monitoring" alerts; may be None.
        self.dispatcher = dispatcher
        # IPC channel for broadcasting job-state updates.
        self.requestor = InterProcessRequestor()
        # Subscribes to video detection events so waits can end early.
        self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value)
        # Rolling chat history sent to the VLM (system prompt + turns).
        self.conversation: list[dict[str, Any]] = []

    def run(self) -> None:
        """Main monitor loop; owns all of the job's status transitions."""
        self.job.status = JobStatusTypesEnum.running
        self.job.start_time = time.time()
        self._broadcast_status()
        self.conversation = [{"role": "system", "content": self._build_system_prompt()}]
        max_end_time = self.job.start_time + self.job.max_duration_minutes * 60
        try:
            while not self.cancel_event.is_set():
                if time.time() > max_end_time:
                    logger.debug(
                        "VLM watch job %s timed out after %d minutes",
                        self.job.id,
                        self.job.max_duration_minutes,
                    )
                    # A timeout is surfaced as a failed job with a message.
                    self.job.status = JobStatusTypesEnum.failed
                    self.job.error_message = f"Monitor timed out after {self.job.max_duration_minutes} minutes"
                    break
                next_run_in = self._run_iteration()
                if self.job.status == JobStatusTypesEnum.success:
                    break
                self._wait_for_trigger(next_run_in)
        except Exception as e:
            logger.exception("VLM watch job %s failed: %s", self.job.id, e)
            self.job.status = JobStatusTypesEnum.failed
            self.job.error_message = str(e)
        finally:
            # Still "running" here means the loop exited via cancel_event.
            if self.job.status == JobStatusTypesEnum.running:
                self.job.status = JobStatusTypesEnum.cancelled
            self.job.end_time = time.time()
            self._broadcast_status()
            # Best-effort teardown of comms resources.
            try:
                self.detection_subscriber.stop()
            except Exception:
                pass
            try:
                self.requestor.stop()
            except Exception:
                pass

    def _run_iteration(self) -> float:
        """Run one VLM analysis iteration. Returns seconds until next run."""
        vision_client = (
            self.genai_manager.vision_client or self.genai_manager.tool_client
        )
        if vision_client is None:
            logger.warning("VLM watch job %s: no vision client available", self.job.id)
            return 30
        frame = self.frame_processor.get_current_frame(self.job.camera, {})
        if frame is None:
            logger.debug(
                "VLM watch job %s: frame unavailable for camera %s",
                self.job.id,
                self.job.camera,
            )
            self.job.last_reasoning = "Camera frame unavailable"
            return 10
        # Downscale frame to 480p max height
        h, w = frame.shape[:2]
        if h > 480:
            scale = 480.0 / h
            frame = cv2.resize(
                frame, (int(w * scale), 480), interpolation=cv2.INTER_AREA
            )
        # JPEG-encode and base64 the frame for the OpenAI-style image message.
        _, enc = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        b64 = base64.b64encode(enc.tobytes()).decode()
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.conversation.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": f"Frame captured at {timestamp}."},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/jpeg;base64,{b64}"},
                    },
                ],
            }
        )
        response = vision_client.chat_with_tools(
            messages=self.conversation,
            tools=None,
            tool_choice=None,
        )
        response_str = response.get("content") or ""
        if not response_str:
            logger.warning(
                "VLM watch job %s: empty response from vision client", self.job.id
            )
            # Remove the user message we just added so we don't leave a dangling turn
            self.conversation.pop()
            return 30
        logger.debug("VLM watch job %s response: %s", self.job.id, response_str)
        self.conversation.append({"role": "assistant", "content": response_str})
        # Keep system prompt + last _MAX_HISTORY user/assistant pairs
        max_msgs = 1 + _MAX_HISTORY * 2
        if len(self.conversation) > max_msgs:
            self.conversation = [self.conversation[0]] + self.conversation[
                -(max_msgs - 1) :
            ]
        try:
            # Strip optional markdown code fences before parsing the JSON verdict.
            clean = re.sub(
                r"\n?```$", "", re.sub(r"^```[a-zA-Z0-9]*\n?", "", response_str)
            )
            parsed = json.loads(clean)
            condition_met = bool(parsed.get("condition_met", False))
            # Clamp the VLM-suggested interval into [_MIN_INTERVAL, _MAX_INTERVAL].
            next_run_in = max(
                _MIN_INTERVAL,
                min(_MAX_INTERVAL, int(parsed.get("next_run_in", 30))),
            )
            reasoning = str(parsed.get("reasoning", ""))
        except (json.JSONDecodeError, ValueError, TypeError) as e:
            logger.warning(
                "VLM watch job %s: failed to parse VLM response: %s", self.job.id, e
            )
            return 30
        self.job.last_reasoning = reasoning
        self.job.iteration_count += 1
        self._broadcast_status()
        if condition_met:
            # NOTE(review): "%s%s" joins camera and reasoning with no separator.
            logger.debug(
                "VLM watch job %s: condition met on camera %s%s",
                self.job.id,
                self.job.camera,
                reasoning,
            )
            self._send_notification(reasoning)
            self.job.status = JobStatusTypesEnum.success
            return 0
        return next_run_in

    def _wait_for_trigger(self, max_wait: float) -> None:
        """Wait up to max_wait seconds, returning early if a relevant detection fires on the target camera."""
        deadline = time.time() + max_wait
        while not self.cancel_event.is_set():
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Poll in <=1s slices so cancellation stays responsive.
            topic, payload = self.detection_subscriber.check_for_update(
                timeout=min(1.0, remaining)
            )
            if topic is None or payload is None:
                continue
            # payload = (camera, frame_name, frame_time, tracked_objects, motion_boxes, regions)
            cam = payload[0]
            tracked_objects = payload[3]
            logger.debug(
                "VLM watch job %s: detection event cam=%s (want %s), objects=%s",
                self.job.id,
                cam,
                self.job.camera,
                [
                    {"label": o.get("label"), "zones": o.get("current_zones")}
                    for o in (tracked_objects or [])
                ],
            )
            if cam != self.job.camera or not tracked_objects:
                continue
            if self._detection_matches_filters(tracked_objects):
                logger.debug(
                    "VLM watch job %s: woken early by detection event on %s",
                    self.job.id,
                    self.job.camera,
                )
                break

    def _detection_matches_filters(self, tracked_objects: list) -> bool:
        """Return True if any tracked object passes the label and zone filters."""
        labels = self.job.labels
        zones = self.job.zones
        for obj in tracked_objects:
            # An empty filter list means "match everything" for that dimension.
            label_ok = not labels or obj.get("label") in labels
            zone_ok = not zones or bool(set(obj.get("current_zones", [])) & set(zones))
            if label_ok and zone_ok:
                return True
        return False

    def _build_system_prompt(self) -> str:
        """Build the system prompt instructing the VLM to emit a JSON verdict per frame."""
        focus_text = ""
        if self.job.labels or self.job.zones:
            parts = []
            if self.job.labels:
                parts.append(f"object types: {', '.join(self.job.labels)}")
            if self.job.zones:
                parts.append(f"zones: {', '.join(self.job.zones)}")
            focus_text = f"\nFocus on {' and '.join(parts)}.\n"
        return (
            f'You are monitoring a security camera. Your task: determine when "{self.job.condition}" occurs.\n'
            f"{focus_text}\n"
            f"You will receive a sequence of frames over time. Use the conversation history to understand "
            f"what is stationary vs. actively changing.\n\n"
            f"For each frame respond with JSON only:\n"
            f'{{"condition_met": <true/false>, "next_run_in": <integer seconds 1-300>, "reasoning": "<brief explanation>"}}\n\n'
            f"Guidelines for next_run_in:\n"
            f"- Scene is empty / nothing of interest visible: 60-300.\n"
            f"- Relevant object(s) visible anywhere in frame (even outside the target zone): 3-10. "
            f"They may be moving toward the zone.\n"
            f"- Condition is actively forming (object approaching zone or threshold): 1-5.\n"
            f"- Set condition_met to true only when you are confident the condition is currently met.\n"
            f"- Keep reasoning to 1-2 sentences."
        )

    def _send_notification(self, reasoning: str) -> None:
        """Publish a camera_monitoring event so downstream handlers (web push, MQTT) can notify users."""
        payload = {
            "camera": self.job.camera,
            "condition": self.job.condition,
            "reasoning": reasoning,
            "job_id": self.job.id,
        }
        if self.dispatcher:
            try:
                self.dispatcher.publish("camera_monitoring", json.dumps(payload))
            except Exception as e:
                logger.warning(
                    "VLM watch job %s: failed to publish alert: %s", self.job.id, e
                )

    def _broadcast_status(self) -> None:
        """Best-effort broadcast of the current job state over IPC."""
        try:
            self.requestor.send_data(UPDATE_JOB_STATE, self.job.to_dict())
        except Exception as e:
            logger.warning(
                "VLM watch job %s: failed to broadcast status: %s", self.job.id, e
            )
# Module-level singleton (only one watch job at a time)
# Current (or most recently finished) job; None until the first start.
_current_job: Optional[VLMWatchJob] = None
# Cancellation signal for the current job's runner thread.
_cancel_event: Optional[threading.Event] = None
# Guards _current_job/_cancel_event across start/stop callers.
_job_lock = threading.Lock()
def start_vlm_watch_job(
    camera: str,
    condition: str,
    max_duration_minutes: int,
    config: FrigateConfig,
    frame_processor,
    genai_manager,
    dispatcher,
    labels: list[str] | None = None,
    zones: list[str] | None = None,
) -> str:
    """Start a new VLM watch job. Returns the job ID.

    Raises RuntimeError if a job is already running.
    """
    global _current_job, _cancel_event

    with _job_lock:
        # Refuse to start while a job is still queued or running.
        active_states = (JobStatusTypesEnum.queued, JobStatusTypesEnum.running)
        if _current_job is not None and _current_job.status in active_states:
            raise RuntimeError(
                f"A VLM watch job is already running (id={_current_job.id}). "
                "Cancel it before starting a new one."
            )

        cancel_ev = threading.Event()
        job = VLMWatchJob(
            camera=camera,
            condition=condition,
            max_duration_minutes=max_duration_minutes,
            labels=labels or [],
            zones=zones or [],
        )
        # Publish the singleton state before the thread starts.
        _current_job = job
        _cancel_event = cancel_ev

        runner = VLMWatchRunner(
            job=job,
            config=config,
            cancel_event=cancel_ev,
            frame_processor=frame_processor,
            genai_manager=genai_manager,
            dispatcher=dispatcher,
        )
        runner.start()

        logger.debug(
            "Started VLM watch job %s: camera=%s, condition=%r, max_duration=%dm",
            job.id,
            camera,
            condition,
            max_duration_minutes,
        )
        return job.id
def stop_vlm_watch_job() -> bool:
    """Cancel the current VLM watch job. Returns True if a job was cancelled."""
    global _current_job, _cancel_event

    with _job_lock:
        job = _current_job
        # Nothing to do unless a job exists and is still queued/running.
        if job is None:
            return False
        if job.status not in (JobStatusTypesEnum.queued, JobStatusTypesEnum.running):
            return False
        if _cancel_event:
            # Wake the runner thread so it can exit its loop promptly.
            _cancel_event.set()
        job.status = JobStatusTypesEnum.cancelled
        logger.debug("Cancelled VLM watch job %s", job.id)
        return True
def get_vlm_watch_job() -> Optional[VLMWatchJob]:
    """Return the current (or most recent) VLM watch job."""
    # May be a finished job; callers inspect job.status to distinguish.
    return _current_job