diff --git a/frigate/api/chat.py b/frigate/api/chat.py index 900fa86cc..136c425f2 100644 --- a/frigate/api/chat.py +++ b/frigate/api/chat.py @@ -26,6 +26,11 @@ from frigate.api.defs.response.chat_response import ( from frigate.api.defs.tags import Tags from frigate.api.event import events from frigate.genai.utils import build_assistant_message_for_conversation +from frigate.jobs.vlm_watch import ( + get_vlm_watch_job, + start_vlm_watch_job, + stop_vlm_watch_job, +) logger = logging.getLogger(__name__) @@ -82,6 +87,16 @@ class ToolExecuteRequest(BaseModel): arguments: Dict[str, Any] +class VLMMonitorRequest(BaseModel): + """Request model for starting a VLM watch job.""" + + camera: str + condition: str + max_duration_minutes: int = 60 + labels: List[str] = [] + zones: List[str] = [] + + def get_tool_definitions() -> List[Dict[str, Any]]: """ Get OpenAI-compatible tool definitions for Frigate. @@ -95,9 +110,11 @@ def get_tool_definitions() -> List[Dict[str, Any]]: "function": { "name": "search_objects", "description": ( - "Search for detected objects in Frigate by camera, object label, time range, " - "zones, and other filters. Use this to answer questions about when " - "objects were detected, what objects appeared, or to find specific object detections. " + "Search the historical record of detected objects in Frigate. " + "Use this ONLY for questions about the PAST — e.g. 'did anyone come by today?', " + "'when was the last car?', 'show me detections from yesterday'. " + "Do NOT use this for monitoring or alerting requests about future events — " + "use start_camera_watch instead for those. " "An 'object' in Frigate represents a tracked detection (e.g., a person, package, car). " "When the user asks about a specific name (person, delivery company, animal, etc.), " "filter by sub_label only and do not set label." 
@@ -217,6 +234,65 @@ def get_tool_definitions() -> List[Dict[str, Any]]: }, }, }, + { + "type": "function", + "function": { + "name": "start_camera_watch", + "description": ( + "Start a continuous VLM watch job that monitors a camera and sends a notification " + "when a specified condition is met. Use this when the user wants to be alerted about " + "a future event, e.g. 'tell me when guests arrive' or 'notify me when the package is picked up'. " + "Only one watch job can run at a time. Returns a job ID." + ), + "parameters": { + "type": "object", + "properties": { + "camera": { + "type": "string", + "description": "Camera ID to monitor.", + }, + "condition": { + "type": "string", + "description": ( + "Natural-language description of the condition to watch for, " + "e.g. 'a person arrives at the front door'." + ), + }, + "max_duration_minutes": { + "type": "integer", + "description": "Maximum time to watch before giving up (minutes, default 60).", + "default": 60, + }, + "labels": { + "type": "array", + "items": {"type": "string"}, + "description": "Object labels that should trigger a VLM check (e.g. ['person', 'car']). If omitted, any detection on the camera triggers a check.", + }, + "zones": { + "type": "array", + "items": {"type": "string"}, + "description": "Zone names to filter by. If specified, only detections in these zones trigger a VLM check.", + }, + }, + "required": ["camera", "condition"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "stop_camera_watch", + "description": ( + "Cancel the currently running VLM watch job. Use this when the user wants to " + "stop a previously started watch, e.g. 'stop watching the front door'." 
+ ), + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + }, ] @@ -565,16 +641,75 @@ async def _execute_tool_internal( ) return {"error": "Camera parameter is required"} return await _execute_get_live_context(request, camera, allowed_cameras) + elif tool_name == "start_camera_watch": + return await _execute_start_camera_watch(request, arguments) + elif tool_name == "stop_camera_watch": + return _execute_stop_camera_watch() else: logger.error( - "Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context. " - "Arguments received: %s", + "Tool call failed: unknown tool %r. Expected one of: search_objects, get_live_context, " + "start_camera_watch, stop_camera_watch. Arguments received: %s", tool_name, json.dumps(arguments), ) return {"error": f"Unknown tool: {tool_name}"} +async def _execute_start_camera_watch( + request: Request, + arguments: Dict[str, Any], +) -> Dict[str, Any]: + camera = arguments.get("camera", "").strip() + condition = arguments.get("condition", "").strip() + max_duration_minutes = int(arguments.get("max_duration_minutes", 60)) + labels = arguments.get("labels") or [] + zones = arguments.get("zones") or [] + + if not camera or not condition: + return {"error": "camera and condition are required."} + + config = request.app.frigate_config + if camera not in config.cameras: + return {"error": f"Camera '{camera}' not found."} + + genai_manager = request.app.genai_manager + vision_client = genai_manager.vision_client or genai_manager.tool_client + if vision_client is None: + return {"error": "No vision/GenAI provider configured."} + + try: + job_id = start_vlm_watch_job( + camera=camera, + condition=condition, + max_duration_minutes=max_duration_minutes, + config=config, + frame_processor=request.app.detected_frames_processor, + genai_manager=genai_manager, + dispatcher=request.app.dispatcher, + labels=labels, + zones=zones, + ) + except RuntimeError as e: + logger.error("Failed to start 
VLM watch job: %s", e, exc_info=True) + return {"error": "Failed to start VLM watch job."} + + return { + "success": True, + "job_id": job_id, + "message": ( + f"Now watching '{camera}' for: {condition}. " + f"You'll receive a notification when the condition is met (timeout: {max_duration_minutes} min)." + ), + } + + +def _execute_stop_camera_watch() -> Dict[str, Any]: + cancelled = stop_vlm_watch_job() + if cancelled: + return {"success": True, "message": "Watch job cancelled."} + return {"success": False, "message": "No active watch job to cancel."} + + async def _execute_pending_tools( pending_tool_calls: List[Dict[str, Any]], request: Request, @@ -991,3 +1126,95 @@ Always be accurate with time calculations based on the current date provided.{ca }, status_code=500, ) + + +# --------------------------------------------------------------------------- +# VLM Monitor endpoints +# --------------------------------------------------------------------------- + + +@router.post( + "/vlm/monitor", + dependencies=[Depends(allow_any_authenticated())], + summary="Start a VLM watch job", + description=( + "Start monitoring a camera with the vision provider. " + "The VLM analyzes live frames until the specified condition is met, " + "then sends a notification. Only one watch job can run at a time." 
+ ), +) +async def start_vlm_monitor( + request: Request, + body: VLMMonitorRequest, +) -> JSONResponse: + config = request.app.frigate_config + genai_manager = request.app.genai_manager + + if body.camera not in config.cameras: + return JSONResponse( + content={"success": False, "message": f"Camera '{body.camera}' not found."}, + status_code=404, + ) + + vision_client = genai_manager.vision_client or genai_manager.tool_client + if vision_client is None: + return JSONResponse( + content={ + "success": False, + "message": "No vision/GenAI provider configured.", + }, + status_code=400, + ) + + try: + job_id = start_vlm_watch_job( + camera=body.camera, + condition=body.condition, + max_duration_minutes=body.max_duration_minutes, + config=config, + frame_processor=request.app.detected_frames_processor, + genai_manager=genai_manager, + dispatcher=request.app.dispatcher, + labels=body.labels, + zones=body.zones, + ) + except RuntimeError as e: + logger.error("Failed to start VLM watch job: %s", e, exc_info=True) + return JSONResponse( + content={"success": False, "message": "Failed to start VLM watch job."}, + status_code=409, + ) + + return JSONResponse( + content={"success": True, "job_id": job_id}, + status_code=201, + ) + + +@router.get( + "/vlm/monitor", + dependencies=[Depends(allow_any_authenticated())], + summary="Get current VLM watch job", + description="Returns the current (or most recently completed) VLM watch job.", +) +async def get_vlm_monitor() -> JSONResponse: + job = get_vlm_watch_job() + if job is None: + return JSONResponse(content={"active": False}, status_code=200) + return JSONResponse(content={"active": True, **job.to_dict()}, status_code=200) + + +@router.delete( + "/vlm/monitor", + dependencies=[Depends(allow_any_authenticated())], + summary="Cancel the current VLM watch job", + description="Cancels the running watch job if one exists.", +) +async def cancel_vlm_monitor() -> JSONResponse: + cancelled = stop_vlm_watch_job() + if not cancelled: + 
return JSONResponse( + content={"success": False, "message": "No active watch job to cancel."}, + status_code=404, + ) + return JSONResponse(content={"success": True}, status_code=200) diff --git a/frigate/comms/webpush.py b/frigate/comms/webpush.py index 62cc12c9a..ad8142f6f 100644 --- a/frigate/comms/webpush.py +++ b/frigate/comms/webpush.py @@ -210,6 +210,15 @@ class WebPushClient(Communicator): logger.debug(f"Notifications for {camera} are currently suspended.") return self.send_trigger(decoded) + elif topic == "camera_monitoring": + decoded = json.loads(payload) + camera = decoded["camera"] + if not self.config.cameras[camera].notifications.enabled: + return + if self.is_camera_suspended(camera): + logger.debug(f"Notifications for {camera} are currently suspended.") + return + self.send_camera_monitoring(decoded) elif topic == "notification_test": if not self.config.notifications.enabled and not any( cam.notifications.enabled for cam in self.config.cameras.values() @@ -477,6 +486,30 @@ class WebPushClient(Communicator): self.cleanup_registrations() + def send_camera_monitoring(self, payload: dict[str, Any]) -> None: + camera: str = payload["camera"] + camera_name: str = getattr( + self.config.cameras[camera], "friendly_name", None + ) or titlecase(camera.replace("_", " ")) + + self.check_registrations() + + reasoning: str = payload.get("reasoning", "") + title = f"{camera_name}: Monitoring Alert" + message = (reasoning[:197] + "...") if len(reasoning) > 200 else reasoning + + logger.debug(f"Sending camera monitoring push notification for {camera_name}") + + for user in self.web_pushers: + self.send_push_notification( + user=user, + payload=payload, + title=title, + message=message, + ) + + self.cleanup_registrations() + def stop(self) -> None: logger.info("Closing notification queue") self.notification_thread.join() diff --git a/frigate/jobs/vlm_watch.py b/frigate/jobs/vlm_watch.py new file mode 100644 index 000000000..dae5e5f41 --- /dev/null +++ 
b/frigate/jobs/vlm_watch.py @@ -0,0 +1,405 @@ +"""VLM watch job: continuously monitors a camera and notifies when a condition is met.""" + +import base64 +import json +import logging +import re +import threading +import time +from dataclasses import asdict, dataclass, field +from datetime import datetime +from typing import Any, Optional + +import cv2 + +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum +from frigate.comms.inter_process import InterProcessRequestor +from frigate.config import FrigateConfig +from frigate.const import UPDATE_JOB_STATE +from frigate.jobs.job import Job +from frigate.types import JobStatusTypesEnum + +logger = logging.getLogger(__name__) + +# Polling interval bounds (seconds) +_MIN_INTERVAL = 1 +_MAX_INTERVAL = 300 + +# Max user/assistant turn pairs to keep in conversation history +_MAX_HISTORY = 10 + + +@dataclass +class VLMWatchJob(Job): + """Job state for a VLM watch monitor.""" + + job_type: str = "vlm_watch" + camera: str = "" + condition: str = "" + max_duration_minutes: int = 60 + labels: list = field(default_factory=list) + zones: list = field(default_factory=list) + last_reasoning: str = "" + iteration_count: int = 0 + + def to_dict(self) -> dict[str, Any]: + return asdict(self) + + +class VLMWatchRunner(threading.Thread): + """Background thread that polls a camera with the vision client until a condition is met.""" + + def __init__( + self, + job: VLMWatchJob, + config: FrigateConfig, + cancel_event: threading.Event, + frame_processor, + genai_manager, + dispatcher, + ) -> None: + super().__init__(daemon=True, name=f"vlm_watch_{job.id}") + self.job = job + self.config = config + self.cancel_event = cancel_event + self.frame_processor = frame_processor + self.genai_manager = genai_manager + self.dispatcher = dispatcher + self.requestor = InterProcessRequestor() + self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.video.value) + self.conversation: list[dict[str, Any]] = [] + + 
def run(self) -> None: + self.job.status = JobStatusTypesEnum.running + self.job.start_time = time.time() + self._broadcast_status() + self.conversation = [{"role": "system", "content": self._build_system_prompt()}] + + max_end_time = self.job.start_time + self.job.max_duration_minutes * 60 + + try: + while not self.cancel_event.is_set(): + if time.time() > max_end_time: + logger.debug( + "VLM watch job %s timed out after %d minutes", + self.job.id, + self.job.max_duration_minutes, + ) + self.job.status = JobStatusTypesEnum.failed + self.job.error_message = f"Monitor timed out after {self.job.max_duration_minutes} minutes" + break + + next_run_in = self._run_iteration() + + if self.job.status == JobStatusTypesEnum.success: + break + + self._wait_for_trigger(next_run_in) + + except Exception as e: + logger.exception("VLM watch job %s failed: %s", self.job.id, e) + self.job.status = JobStatusTypesEnum.failed + self.job.error_message = str(e) + + finally: + if self.job.status == JobStatusTypesEnum.running: + self.job.status = JobStatusTypesEnum.cancelled + self.job.end_time = time.time() + self._broadcast_status() + try: + self.detection_subscriber.stop() + except Exception: + pass + try: + self.requestor.stop() + except Exception: + pass + + def _run_iteration(self) -> float: + """Run one VLM analysis iteration. 
Returns seconds until next run.""" + vision_client = ( + self.genai_manager.vision_client or self.genai_manager.tool_client + ) + if vision_client is None: + logger.warning("VLM watch job %s: no vision client available", self.job.id) + return 30 + + frame = self.frame_processor.get_current_frame(self.job.camera, {}) + if frame is None: + logger.debug( + "VLM watch job %s: frame unavailable for camera %s", + self.job.id, + self.job.camera, + ) + self.job.last_reasoning = "Camera frame unavailable" + return 10 + + # Downscale frame to 480p max height + h, w = frame.shape[:2] + if h > 480: + scale = 480.0 / h + frame = cv2.resize( + frame, (int(w * scale), 480), interpolation=cv2.INTER_AREA + ) + + _, enc = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) + b64 = base64.b64encode(enc.tobytes()).decode() + + timestamp = datetime.now().strftime("%H:%M:%S") + self.conversation.append( + { + "role": "user", + "content": [ + {"type": "text", "text": f"Frame captured at {timestamp}."}, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{b64}"}, + }, + ], + } + ) + + response = vision_client.chat_with_tools( + messages=self.conversation, + tools=None, + tool_choice=None, + ) + response_str = response.get("content") or "" + + if not response_str: + logger.warning( + "VLM watch job %s: empty response from vision client", self.job.id + ) + # Remove the user message we just added so we don't leave a dangling turn + self.conversation.pop() + return 30 + + logger.debug("VLM watch job %s response: %s", self.job.id, response_str) + + self.conversation.append({"role": "assistant", "content": response_str}) + + # Keep system prompt + last _MAX_HISTORY user/assistant pairs + max_msgs = 1 + _MAX_HISTORY * 2 + if len(self.conversation) > max_msgs: + self.conversation = [self.conversation[0]] + self.conversation[ + -(max_msgs - 1) : + ] + + try: + clean = re.sub( + r"\n?```$", "", re.sub(r"^```[a-zA-Z0-9]*\n?", "", response_str) + ) + parsed = 
json.loads(clean) + condition_met = bool(parsed.get("condition_met", False)) + next_run_in = max( + _MIN_INTERVAL, + min(_MAX_INTERVAL, int(parsed.get("next_run_in", 30))), + ) + reasoning = str(parsed.get("reasoning", "")) + except (json.JSONDecodeError, ValueError, TypeError) as e: + logger.warning( + "VLM watch job %s: failed to parse VLM response: %s", self.job.id, e + ) + return 30 + + self.job.last_reasoning = reasoning + self.job.iteration_count += 1 + self._broadcast_status() + + if condition_met: + logger.debug( + "VLM watch job %s: condition met on camera %s — %s", + self.job.id, + self.job.camera, + reasoning, + ) + self._send_notification(reasoning) + self.job.status = JobStatusTypesEnum.success + return 0 + + return next_run_in + + def _wait_for_trigger(self, max_wait: float) -> None: + """Wait up to max_wait seconds, returning early if a relevant detection fires on the target camera.""" + deadline = time.time() + max_wait + while not self.cancel_event.is_set(): + remaining = deadline - time.time() + if remaining <= 0: + break + topic, payload = self.detection_subscriber.check_for_update( + timeout=min(1.0, remaining) + ) + if topic is None or payload is None: + continue + # payload = (camera, frame_name, frame_time, tracked_objects, motion_boxes, regions) + cam = payload[0] + tracked_objects = payload[3] + logger.debug( + "VLM watch job %s: detection event cam=%s (want %s), objects=%s", + self.job.id, + cam, + self.job.camera, + [ + {"label": o.get("label"), "zones": o.get("current_zones")} + for o in (tracked_objects or []) + ], + ) + if cam != self.job.camera or not tracked_objects: + continue + if self._detection_matches_filters(tracked_objects): + logger.debug( + "VLM watch job %s: woken early by detection event on %s", + self.job.id, + self.job.camera, + ) + break + + def _detection_matches_filters(self, tracked_objects: list) -> bool: + """Return True if any tracked object passes the label and zone filters.""" + labels = self.job.labels + zones 
= self.job.zones + for obj in tracked_objects: + label_ok = not labels or obj.get("label") in labels + zone_ok = not zones or bool(set(obj.get("current_zones", [])) & set(zones)) + if label_ok and zone_ok: + return True + return False + + def _build_system_prompt(self) -> str: + focus_text = "" + if self.job.labels or self.job.zones: + parts = [] + if self.job.labels: + parts.append(f"object types: {', '.join(self.job.labels)}") + if self.job.zones: + parts.append(f"zones: {', '.join(self.job.zones)}") + focus_text = f"\nFocus on {' and '.join(parts)}.\n" + + return ( + f'You are monitoring a security camera. Your task: determine when "{self.job.condition}" occurs.\n' + f"{focus_text}\n" + f"You will receive a sequence of frames over time. Use the conversation history to understand " + f"what is stationary vs. actively changing.\n\n" + f"For each frame respond with JSON only:\n" + f'{{"condition_met": <true|false>, "next_run_in": <seconds until next check, 1-300>, "reasoning": "<short explanation>"}}\n\n' + f"Guidelines for next_run_in:\n" + f"- Scene is empty / nothing of interest visible: 60-300.\n" + f"- Relevant object(s) visible anywhere in frame (even outside the target zone): 3-10. " + f"They may be moving toward the zone.\n" + f"- Condition is actively forming (object approaching zone or threshold): 1-5.\n" + f"- Set condition_met to true only when you are confident the condition is currently met.\n" + f"- Keep reasoning to 1-2 sentences."
+ ) + + def _send_notification(self, reasoning: str) -> None: + """Publish a camera_monitoring event so downstream handlers (web push, MQTT) can notify users.""" + payload = { + "camera": self.job.camera, + "condition": self.job.condition, + "reasoning": reasoning, + "job_id": self.job.id, + } + + if self.dispatcher: + try: + self.dispatcher.publish("camera_monitoring", json.dumps(payload)) + except Exception as e: + logger.warning( + "VLM watch job %s: failed to publish alert: %s", self.job.id, e + ) + + def _broadcast_status(self) -> None: + try: + self.requestor.send_data(UPDATE_JOB_STATE, self.job.to_dict()) + except Exception as e: + logger.warning( + "VLM watch job %s: failed to broadcast status: %s", self.job.id, e + ) + + +# Module-level singleton (only one watch job at a time) +_current_job: Optional[VLMWatchJob] = None +_cancel_event: Optional[threading.Event] = None +_job_lock = threading.Lock() + + +def start_vlm_watch_job( + camera: str, + condition: str, + max_duration_minutes: int, + config: FrigateConfig, + frame_processor, + genai_manager, + dispatcher, + labels: list[str] | None = None, + zones: list[str] | None = None, +) -> str: + """Start a new VLM watch job. Returns the job ID. + + Raises RuntimeError if a job is already running. + """ + global _current_job, _cancel_event + + with _job_lock: + if _current_job is not None and _current_job.status in ( + JobStatusTypesEnum.queued, + JobStatusTypesEnum.running, + ): + raise RuntimeError( + f"A VLM watch job is already running (id={_current_job.id}). " + "Cancel it before starting a new one." 
+ ) + + job = VLMWatchJob( + camera=camera, + condition=condition, + max_duration_minutes=max_duration_minutes, + labels=labels or [], + zones=zones or [], + ) + cancel_ev = threading.Event() + _current_job = job + _cancel_event = cancel_ev + + runner = VLMWatchRunner( + job=job, + config=config, + cancel_event=cancel_ev, + frame_processor=frame_processor, + genai_manager=genai_manager, + dispatcher=dispatcher, + ) + runner.start() + + logger.debug( + "Started VLM watch job %s: camera=%s, condition=%r, max_duration=%dm", + job.id, + camera, + condition, + max_duration_minutes, + ) + return job.id + + +def stop_vlm_watch_job() -> bool: + """Cancel the current VLM watch job. Returns True if a job was cancelled.""" + global _current_job, _cancel_event + + with _job_lock: + if _current_job is None or _current_job.status not in ( + JobStatusTypesEnum.queued, + JobStatusTypesEnum.running, + ): + return False + + if _cancel_event: + _cancel_event.set() + + _current_job.status = JobStatusTypesEnum.cancelled + logger.debug("Cancelled VLM watch job %s", _current_job.id) + return True + + +def get_vlm_watch_job() -> Optional[VLMWatchJob]: + """Return the current (or most recent) VLM watch job.""" + return _current_job