api to create, edit, delete triggers

This commit is contained in:
Josh Hawkins 2025-06-29 16:41:34 -05:00
parent 783c20a6fc
commit fbc9c2c101
3 changed files with 274 additions and 2 deletions

View File

@ -1,4 +1,4 @@
from typing import List, Optional, Union
from typing import List, Literal, Optional, Union
from pydantic import BaseModel, Field
@ -45,3 +45,9 @@ class EventsDeleteBody(BaseModel):
class SubmitPlusBody(BaseModel):
include_annotation: int = Field(default=1)
class TriggerEmbeddingBody(BaseModel):
type: Literal["text", "image", "both"]
data: str
threshold: float = Field(default=0.5, ge=0.0, le=1.0)

View File

@ -1,5 +1,6 @@
"""Event apis."""
import base64
import datetime
import logging
import os
@ -10,6 +11,7 @@ from pathlib import Path
from urllib.parse import unquote
import cv2
import numpy as np
from fastapi import APIRouter, Request
from fastapi.params import Depends
from fastapi.responses import JSONResponse
@ -34,6 +36,7 @@ from frigate.api.defs.request.events_body import (
EventsLPRBody,
EventsSubLabelBody,
SubmitPlusBody,
TriggerEmbeddingBody,
)
from frigate.api.defs.response.event_response import (
EventCreateResponse,
@ -46,9 +49,10 @@ from frigate.api.defs.tags import Tags
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.const import CLIPS_DIR
from frigate.embeddings import EmbeddingsContext
from frigate.models import Event, ReviewSegment, Timeline
from frigate.models import Event, ReviewSegment, Timeline, Trigger
from frigate.track.object_processing import TrackedObject
from frigate.util.builtin import get_tz_modifiers
from frigate.util.path import get_event_thumbnail_bytes
logger = logging.getLogger(__name__)
@ -1435,3 +1439,264 @@ def end_event(request: Request, event_id: str, body: EventsEndBody):
content=({"success": True, "message": "Event successfully ended."}),
status_code=200,
)
@router.post(
"/trigger/embedding",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
)
def create_trigger_embedding(
request: Request,
body: TriggerEmbeddingBody,
camera: str,
name: str,
):
try:
if not request.app.frigate_config.semantic_search.enabled:
return JSONResponse(
content={
"success": False,
"message": "Semantic search is not enabled",
},
status_code=400,
)
# Check if trigger already exists
if (
Trigger.select()
.where(Trigger.camera == camera, Trigger.name == name)
.exists()
):
return JSONResponse(
content={
"success": False,
"message": f"Trigger {camera}:{name} already exists",
},
status_code=400,
)
context: EmbeddingsContext = request.app.embeddings
# Generate embedding based on type
embedding = None
if body.type == "text" or body.type == "both":
embedding = context.generate_description_embedding(body.data)
elif body.type == "image":
try:
event: Event = Event.get(Event.id == body.data)
except DoesNotExist:
# TODO: check triggers directory for image
return JSONResponse(
content={
"success": False,
"message": f"Failed to fetch event for {body.type} trigger",
},
status_code=400,
)
# Skip the event if not an object
if event.data.get("type") != "object":
return
if thumbnail := get_event_thumbnail_bytes(event):
cursor = context.db.execute_sql(
"""
SELECT thumbnail_embedding FROM vec_thumbnails WHERE id = ?
""",
[body.data],
)
row = cursor.fetchone() if cursor else None
if row:
query_embedding = row[0]
embedding = np.frombuffer(query_embedding, dtype=np.float32)
else:
# TODO: fixme
# Extract valid thumbnail
thumbnail = get_event_thumbnail_bytes(event)
# TODO: save image to the triggers directory
embedding = context.generate_image_embedding(
body.data, (base64.b64encode(thumbnail).decode("ASCII"))
)
if embedding is None:
return JSONResponse(
content={
"success": False,
"message": f"Failed to generate embedding for {body.type} trigger",
},
status_code=400,
)
Trigger.create(
camera=camera,
name=name,
type=body.type,
data=body.data,
threshold=body.threshold,
model=request.app.frigate_config.semantic_search.model,
embedding=np.array(embedding, dtype=np.float32).tobytes(),
triggering_event_id="",
last_triggered=None,
)
return JSONResponse(
content={
"success": True,
"message": f"Trigger created successfully for {camera}:{name}",
},
status_code=200,
)
except Exception as e:
return JSONResponse(
content={
"success": False,
"message": f"Error creating trigger embedding: {str(e)}",
},
status_code=500,
)
@router.put(
"/trigger/embedding/{camera}/{name}",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
)
def update_trigger_embedding(
request: Request,
camera: str,
name: str,
body: TriggerEmbeddingBody,
):
try:
if not request.app.frigate_config.semantic_search.enabled:
return JSONResponse(
content={
"success": False,
"message": "Semantic search is not enabled",
},
status_code=400,
)
context: EmbeddingsContext = request.app.embeddings
# Generate embedding based on type
embedding = None
if body.type == "text" or body.type == "both":
embedding = context.generate_description_embedding(body.data)
elif body.type == "image":
try:
event: Event = Event.get(Event.id == body.data)
except DoesNotExist:
# TODO: check triggers directory for image
return JSONResponse(
content={
"success": False,
"message": f"Failed to fetch event for {body.type} trigger",
},
status_code=400,
)
# Skip the event if not an object
if event.data.get("type") != "object":
return
# Extract valid thumbnail
thumbnail = get_event_thumbnail_bytes(event)
# TODO: save image to the triggers directory
embedding = context.generate_image_embedding(
body.data, (base64.b64encode(thumbnail).decode("ASCII"))
)
if embedding is None:
return JSONResponse(
content={
"success": False,
"message": f"Failed to generate embedding for {body.type} trigger",
},
status_code=400,
)
updated = (
Trigger.update(
data=body.data,
model=request.app.frigate_config.semantic_search.model,
embedding=np.array(embedding, dtype=np.float32).tobytes(),
threshold=body.threshold,
triggering_event_id="",
last_triggered=None,
)
.where(Trigger.camera == camera, Trigger.name == name)
.execute()
)
if updated == 0:
return JSONResponse(
content={
"success": False,
"message": f"Trigger {camera}:{name} not found",
},
status_code=404,
)
return JSONResponse(
content={
"success": True,
"message": f"Trigger updated successfully for {camera}:{name}",
},
status_code=200,
)
except Exception as e:
return JSONResponse(
content={
"success": False,
"message": f"Error updating trigger embedding: {str(e)}",
},
status_code=500,
)
@router.delete(
"/trigger/embedding/{camera}/{name}",
response_model=dict,
dependencies=[Depends(require_role(["admin"]))],
)
def delete_trigger_embedding(
camera: str,
name: str,
):
try:
deleted = (
Trigger.delete()
.where(Trigger.camera == camera, Trigger.name == name)
.execute()
)
if deleted == 0:
return JSONResponse(
content={
"success": False,
"message": f"Trigger {camera}:{name} not found",
},
status_code=404,
)
return JSONResponse(
content={
"success": True,
"message": f"Trigger deleted successfully for {camera}:{name}",
},
status_code=200,
)
except Exception as e:
return JSONResponse(
content={
"success": False,
"message": f"Error deleting trigger embedding: {str(e)}",
},
status_code=500,
)

View File

@ -23,6 +23,7 @@ class CameraConfigUpdateEnum(str, Enum):
record = "record"
remove = "remove" # for removing a camera
review = "review"
semantic_search = "semantic_search" # for semantic search triggers
snapshots = "snapshots"
zones = "zones"