filter similarity results in python, not sqlite-vec

This commit is contained in:
Josh Hawkins 2026-04-08 15:59:36 -05:00
parent 635e41fc8c
commit 82d5fbfb87
2 changed files with 44 additions and 224 deletions

View File

@ -109,11 +109,6 @@ class VLMMonitorRequest(BaseModel):
VISUAL_WEIGHT = 0.65
DESCRIPTION_WEIGHT = 0.35
# Must match or stay <= the k used by EmbeddingsContext vec searches
# (see frigate/embeddings/__init__.py search_thumbnail/search_description).
# Pre-filtering a larger pool is wasted work — vec will only rank top-k anyway.
CANDIDATE_CAP = 100
def _distance_to_score(distance: float, stats: ZScoreNormalization) -> float:
"""Convert a cosine distance to a [0, 1] similarity score.
@ -149,46 +144,6 @@ def _fuse_scores(
return VISUAL_WEIGHT * visual_score + DESCRIPTION_WEIGHT * description_score
def _build_similar_candidates_query(
anchor_id: str,
after: Optional[float],
before: Optional[float],
cameras: Optional[List[str]],
labels: Optional[List[str]],
sub_labels: Optional[List[str]],
zones: Optional[List[str]],
) -> List[str]:
"""Return up to CANDIDATE_CAP event ids eligible as similarity candidates.
Pre-filters events by the structured fields, excludes the anchor itself,
and orders by most recent first so over-cap queries keep recent events.
"""
clauses = [Event.id != anchor_id]
if after is not None:
clauses.append(Event.start_time >= after)
if before is not None:
clauses.append(Event.start_time <= before)
if cameras:
clauses.append(Event.camera.in_(cameras))
if labels:
clauses.append(Event.label.in_(labels))
if sub_labels:
clauses.append(Event.sub_label.in_(sub_labels))
if zones:
# Mirror the pattern used by frigate/api/event.py for JSON-array zone match.
zone_clauses = [Event.zones.cast("text") % f'*"{zone}"*' for zone in zones]
clauses.append(reduce(operator.or_, zone_clauses))
query = (
Event.select(Event.id)
.where(reduce(operator.and_, clauses))
.order_by(Event.start_time.desc())
.limit(CANDIDATE_CAP)
)
return [row.id for row in query]
def get_tool_definitions() -> List[Dict[str, Any]]:
"""
Get OpenAI-compatible tool definitions for Frigate.
@ -692,33 +647,17 @@ async def _execute_find_similar_objects(
limit = int(arguments.get("limit", 10))
limit = max(1, min(limit, 50))
# 4. Pre-filter candidates.
candidate_ids = _build_similar_candidates_query(
anchor_id=anchor.id,
after=after,
before=before,
cameras=cameras,
labels=labels,
sub_labels=sub_labels,
zones=zones,
)
candidate_truncated = len(candidate_ids) == CANDIDATE_CAP
if not candidate_ids:
return {
"anchor": _hydrate_event(anchor),
"results": [],
"similarity_mode": similarity_mode,
"candidate_truncated": False,
}
# 5. Run similarity searches.
# 4. Run similarity searches. We deliberately do NOT pass event_ids into
# the vec queries — the IN filter on sqlite-vec is broken in the installed
# version (see frigate/embeddings/__init__.py). Mirror the pattern used by
# frigate/api/event.py events_search: fetch top-k globally, then intersect
# with the structured filters via Peewee.
visual_distances: Dict[str, float] = {}
description_distances: Dict[str, float] = {}
try:
if similarity_mode in ("visual", "fused"):
rows = context.search_thumbnail(anchor, event_ids=candidate_ids)
rows = context.search_thumbnail(anchor)
visual_distances = {row[0]: row[1] for row in rows}
if similarity_mode in ("semantic", "fused"):
@ -727,7 +666,7 @@ async def _execute_find_similar_objects(
or anchor.sub_label
or anchor.label
)
rows = context.search_description(query_text, event_ids=candidate_ids)
rows = context.search_description(query_text)
description_distances = {row[0]: row[1] for row in rows}
except Exception:
logger.exception("Similarity search failed")
@ -736,10 +675,44 @@ async def _execute_find_similar_objects(
"message": "Failed to run similarity search.",
}
vec_ids = set(visual_distances) | set(description_distances)
vec_ids.discard(anchor.id)
# vec layer returns up to k=100 per modality; flag when we hit that ceiling
# so the LLM can mention there may be more matches beyond what we saw.
candidate_truncated = (
len(visual_distances) >= 100 or len(description_distances) >= 100
)
if not vec_ids:
return {
"anchor": _hydrate_event(anchor),
"results": [],
"similarity_mode": similarity_mode,
"candidate_truncated": candidate_truncated,
}
# 5. Apply structured filters, intersected with vec hits.
clauses = [Event.id.in_(list(vec_ids))]
if after is not None:
clauses.append(Event.start_time >= after)
if before is not None:
clauses.append(Event.start_time <= before)
if cameras:
clauses.append(Event.camera.in_(cameras))
if labels:
clauses.append(Event.label.in_(labels))
if sub_labels:
clauses.append(Event.sub_label.in_(sub_labels))
if zones:
# Mirror the pattern used by frigate/api/event.py for JSON-array zone match.
zone_clauses = [Event.zones.cast("text") % f'*"{zone}"*' for zone in zones]
clauses.append(reduce(operator.or_, zone_clauses))
eligible = {e.id: e for e in Event.select().where(reduce(operator.and_, clauses))}
# 6. Fuse and rank.
scored: List[tuple[str, float]] = []
matched_ids = set(visual_distances) | set(description_distances)
for eid in matched_ids:
for eid in eligible:
v_score = (
_distance_to_score(visual_distances[eid], context.thumb_stats)
if eid in visual_distances
@ -760,19 +733,7 @@ async def _execute_find_similar_objects(
scored.sort(key=lambda pair: pair[1], reverse=True)
scored = scored[:limit]
# 7. Hydrate.
if scored:
event_rows = {
e.id: e
for e in Event.select().where(Event.id.in_([eid for eid, _ in scored]))
}
results = [
_hydrate_event(event_rows[eid], score=score)
for eid, score in scored
if eid in event_rows
]
else:
results = []
results = [_hydrate_event(eligible[eid], score=score) for eid, score in scored]
return {
"anchor": _hydrate_event(anchor),

View File

@ -10,10 +10,8 @@ from unittest.mock import MagicMock
from playhouse.sqlite_ext import SqliteExtDatabase
from frigate.api.chat import (
CANDIDATE_CAP,
DESCRIPTION_WEIGHT,
VISUAL_WEIGHT,
_build_similar_candidates_query,
_distance_to_score,
_execute_find_similar_objects,
_fuse_scores,
@ -68,145 +66,6 @@ class TestFuseScores(unittest.TestCase):
self.assertIsNone(_fuse_scores(visual_score=None, description_score=None))
class TestBuildSimilarCandidatesQuery(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
self.tmp.close()
self.db = SqliteExtDatabase(self.tmp.name)
Event.bind(self.db, bind_refs=False, bind_backrefs=False)
self.db.connect()
self.db.create_tables([Event])
# Minimal helper for creating events.
def make_event(
event_id,
camera="driveway",
label="car",
sub_label=None,
start=1_700_000_000,
zones=None,
):
Event.create(
id=event_id,
label=label,
sub_label=sub_label,
camera=camera,
start_time=start,
end_time=start + 10,
top_score=0.9,
score=0.9,
false_positive=False,
zones=zones or [],
thumbnail="",
has_clip=True,
has_snapshot=True,
region=[0, 0, 1, 1],
box=[0, 0, 1, 1],
area=1,
retain_indefinitely=False,
ratio=1.0,
plus_id="",
model_hash="",
detector_type="",
model_type="",
data={},
)
self.make_event = make_event
def tearDown(self):
self.db.close()
os.unlink(self.tmp.name)
def test_excludes_anchor(self):
self.make_event("anchor")
self.make_event("other")
ids = _build_similar_candidates_query(
anchor_id="anchor",
after=None,
before=None,
cameras=None,
labels=["car"],
sub_labels=None,
zones=None,
)
self.assertEqual(ids, ["other"])
def test_time_range_filters(self):
self.make_event("in_range", start=1_700_000_500)
self.make_event("too_early", start=1_699_999_000)
self.make_event("too_late", start=1_700_001_000)
ids = _build_similar_candidates_query(
anchor_id="nonexistent",
after=1_700_000_000,
before=1_700_000_999,
cameras=None,
labels=["car"],
sub_labels=None,
zones=None,
)
self.assertEqual(ids, ["in_range"])
def test_camera_filter(self):
self.make_event("driveway_a", camera="driveway")
self.make_event("porch_a", camera="porch")
ids = _build_similar_candidates_query(
anchor_id="nonexistent",
after=None,
before=None,
cameras=["driveway"],
labels=["car"],
sub_labels=None,
zones=None,
)
self.assertEqual(ids, ["driveway_a"])
def test_label_filter(self):
self.make_event("car_a", label="car")
self.make_event("person_a", label="person")
ids = _build_similar_candidates_query(
anchor_id="nonexistent",
after=None,
before=None,
cameras=None,
labels=["car"],
sub_labels=None,
zones=None,
)
self.assertEqual(ids, ["car_a"])
def test_zone_any_match(self):
self.make_event("in_zone", zones=["driveway_zone"])
self.make_event("other_zone", zones=["porch_zone"])
ids = _build_similar_candidates_query(
anchor_id="nonexistent",
after=None,
before=None,
cameras=None,
labels=["car"],
sub_labels=None,
zones=["driveway_zone"],
)
self.assertEqual(ids, ["in_zone"])
def test_respects_candidate_cap(self):
for i in range(CANDIDATE_CAP + 20):
self.make_event(f"e{i:04d}", start=1_700_000_000 + i)
ids = _build_similar_candidates_query(
anchor_id="nonexistent",
after=None,
before=None,
cameras=None,
labels=["car"],
sub_labels=None,
zones=None,
)
self.assertEqual(len(ids), CANDIDATE_CAP)
# Most recent first, so we should keep the latest CANDIDATE_CAP events.
self.assertIn(f"e{CANDIDATE_CAP + 19:04d}", ids)
self.assertNotIn("e0000", ids)
class TestToolDefinition(unittest.TestCase):
def test_find_similar_objects_is_registered(self):
tools = get_tool_definitions()