threading

This commit is contained in:
Josh Hawkins 2025-03-27 10:47:11 -05:00
parent 265b9fa698
commit eaa7e07895
3 changed files with 55 additions and 6 deletions

View File

@ -320,7 +320,27 @@ def reindex_embeddings(request: Request):
context: EmbeddingsContext = request.app.embeddings
response = context.reindex_embeddings()
return JSONResponse(
content=response,
status_code=200,
)
if response == "started":
return JSONResponse(
content={
"success": True,
"message": "Embeddings reindexing has started.",
},
status_code=202, # 202 Accepted
)
elif response == "in_progress":
return JSONResponse(
content={
"success": False,
"message": "Embeddings reindexing is already in progress.",
},
status_code=409, # 409 Conflict
)
else:
return JSONResponse(
content={
"success": False,
"message": "Failed to start reindexing.",
},
status_code=500,
)

View File

@ -3,6 +3,7 @@
import datetime
import logging
import os
import threading
import time
from numpy import ndarray
@ -74,6 +75,10 @@ class Embeddings:
self.metrics = metrics
self.requestor = InterProcessRequestor()
self.reindex_lock = threading.Lock()
self.reindex_thread = None
self.reindex_running = False
# Create tables if they don't exist
self.db.create_embeddings_tables()
@ -368,3 +373,27 @@ class Embeddings:
totals["status"] = "completed"
self.requestor.send_data(UPDATE_EMBEDDINGS_REINDEX_PROGRESS, totals)
def start_reindex(self) -> bool:
"""Start reindexing in a separate thread if not already running."""
with self.reindex_lock:
if self.reindex_running:
logger.warning("Reindex embeddings is already running.")
return False
# Mark as running and start the thread
self.reindex_running = True
self.reindex_thread = threading.Thread(
target=self._reindex_wrapper, daemon=True
)
self.reindex_thread.start()
return True
def _reindex_wrapper(self) -> None:
"""Wrapper to run reindex and reset running flag when done."""
try:
self.reindex()
finally:
with self.reindex_lock:
self.reindex_running = False
self.reindex_thread = None

View File

@ -207,8 +207,8 @@ class EmbeddingMaintainer(threading.Thread):
pack=False,
)
elif topic == EmbeddingsRequestEnum.reindex.value:
self.embeddings.reindex()
return "Embeddings reindex in progress"
response = self.embeddings.start_reindex()
return "started" if response else "in_progress"
processors = [self.realtime_processors, self.post_processors]
for processor_list in processors: