Update timouts

This commit is contained in:
Nicolas Mowen 2024-10-10 13:54:49 -06:00
parent 157b1771ce
commit ff36b8b88c
3 changed files with 9 additions and 7 deletions

View File

@ -25,9 +25,9 @@ class EmbeddingsResponder:
def check_for_request(self, process: Callable) -> None: def check_for_request(self, process: Callable) -> None:
while True: # load all messages that are queued while True: # load all messages that are queued
logger.debug("Checking for embeddings requests") print("Checking for embeddings requests")
has_message, _, _ = zmq.select([self.socket], [], [], 0.1) has_message, _, _ = zmq.select([self.socket], [], [], 0.1)
logger.debug(f"has a request? {has_message}") print(f"has a request? {has_message}")
if not has_message: if not has_message:
break break

View File

@ -43,6 +43,7 @@ def manage_embeddings(config: FrigateConfig) -> None:
listen() listen()
# Configure Frigate DB # Configure Frigate DB
print("connecting to db in embed")
db = SqliteVecQueueDatabase( db = SqliteVecQueueDatabase(
config.database.path, config.database.path,
pragmas={ pragmas={
@ -54,6 +55,7 @@ def manage_embeddings(config: FrigateConfig) -> None:
load_vec_extension=True, load_vec_extension=True,
) )
models = [Event] models = [Event]
print("binding db to model")
db.bind(models) db.bind(models)
print("creating embedding maintainer") print("creating embedding maintainer")

View File

@ -84,7 +84,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_requests(self) -> None: def _process_requests(self) -> None:
"""Process embeddings requests""" """Process embeddings requests"""
def handle_request(topic: str, data: str) -> str: def _handle_request(topic: str, data: str) -> str:
print(f"Handling embeddings request of type {topic} with data {data}") print(f"Handling embeddings request of type {topic} with data {data}")
try: try:
@ -108,11 +108,11 @@ class EmbeddingMaintainer(threading.Thread):
except Exception as e: except Exception as e:
logger.error(f"Unable to handle embeddings request {e}") logger.error(f"Unable to handle embeddings request {e}")
self.embeddings_responder.check_for_request(handle_request) self.embeddings_responder.check_for_request(_handle_request)
def _process_updates(self) -> None: def _process_updates(self) -> None:
"""Process event updates""" """Process event updates"""
update = self.event_subscriber.check_for_update() update = self.event_subscriber.check_for_update(timeout=0.1)
if update is None: if update is None:
return return
@ -150,7 +150,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_finalized(self) -> None: def _process_finalized(self) -> None:
"""Process the end of an event.""" """Process the end of an event."""
while True: while True:
ended = self.event_end_subscriber.check_for_update() ended = self.event_end_subscriber.check_for_update(timeout=0.1)
if ended == None: if ended == None:
break break
@ -245,7 +245,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_event_metadata(self): def _process_event_metadata(self):
# Check for regenerate description requests # Check for regenerate description requests
(topic, event_id, source) = self.event_metadata_subscriber.check_for_update() (topic, event_id, source) = self.event_metadata_subscriber.check_for_update(timeout=0.1)
if topic is None: if topic is None:
return return