From eaa7e078957a1b2b788f710fb0a48c5f795fc8ec Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Thu, 27 Mar 2025 10:47:11 -0500 Subject: [PATCH] threading --- frigate/api/classification.py | 28 ++++++++++++++++++++++++---- frigate/embeddings/embeddings.py | 29 +++++++++++++++++++++++++++++ frigate/embeddings/maintainer.py | 4 ++-- 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/frigate/api/classification.py b/frigate/api/classification.py index cdeb31df7..70a641908 100644 --- a/frigate/api/classification.py +++ b/frigate/api/classification.py @@ -320,7 +320,27 @@ def reindex_embeddings(request: Request): context: EmbeddingsContext = request.app.embeddings response = context.reindex_embeddings() - return JSONResponse( - content=response, - status_code=200, - ) + if response == "started": + return JSONResponse( + content={ + "success": True, + "message": "Embeddings reindexing has started.", + }, + status_code=202, # 202 Accepted + ) + elif response == "in_progress": + return JSONResponse( + content={ + "success": False, + "message": "Embeddings reindexing is already in progress.", + }, + status_code=409, # 409 Conflict + ) + else: + return JSONResponse( + content={ + "success": False, + "message": "Failed to start reindexing.", + }, + status_code=500, + ) diff --git a/frigate/embeddings/embeddings.py b/frigate/embeddings/embeddings.py index 7e866d1fe..d2053f5ee 100644 --- a/frigate/embeddings/embeddings.py +++ b/frigate/embeddings/embeddings.py @@ -3,6 +3,7 @@ import datetime import logging import os +import threading import time from numpy import ndarray @@ -74,6 +75,10 @@ class Embeddings: self.metrics = metrics self.requestor = InterProcessRequestor() + self.reindex_lock = threading.Lock() + self.reindex_thread = None + self.reindex_running = False + # Create tables if they don't exist self.db.create_embeddings_tables() @@ -368,3 +373,27 @@ class Embeddings: totals["status"] = "completed" self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals) + + def start_reindex(self) -> bool: + """Start reindexing in a separate thread if not already running.""" + with self.reindex_lock: + if self.reindex_running: + logger.warning("Reindex embeddings is already running.") + return False + + # Mark as running and start the thread + self.reindex_running = True + self.reindex_thread = threading.Thread( + target=self._reindex_wrapper, daemon=True + ) + self.reindex_thread.start() + return True + + def _reindex_wrapper(self) -> None: + """Wrapper to run reindex and reset running flag when done.""" + try: + self.reindex() + finally: + with self.reindex_lock: + self.reindex_running = False + self.reindex_thread = None diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 6bd065ce3..85b0e6d54 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -207,8 +207,8 @@ class EmbeddingMaintainer(threading.Thread): pack=False, ) elif topic == EmbeddingsRequestEnum.reindex.value: - self.embeddings.reindex() - return "Embeddings reindex in progress" + response = self.embeddings.start_reindex() + return "started" if response else "in_progress" processors = [self.realtime_processors, self.post_processors] for processor_list in processors: