Improve timeout handling

This commit is contained in:
Nicolas Mowen 2024-10-10 10:28:29 -06:00
parent 6f335800e9
commit 9928592266
2 changed files with 28 additions and 17 deletions

View File

@ -22,7 +22,7 @@ class EmbeddingsResponder:
def check_for_request(self, process: Callable) -> None: def check_for_request(self, process: Callable) -> None:
while True: # load all messages that are queued while True: # load all messages that are queued
has_message, _, _ = zmq.select([self.socket], [], [], 1) has_message, _, _ = zmq.select([self.socket], [], [], 0.1)
if not has_message: if not has_message:
break break

View File

@ -77,9 +77,12 @@ class EmbeddingMaintainer(threading.Thread):
"""Process embeddings requests""" """Process embeddings requests"""
def handle_request(topic: str, data: str) -> str: def handle_request(topic: str, data: str) -> str:
try:
if topic == EmbeddingsRequestEnum.embed_description.value: if topic == EmbeddingsRequestEnum.embed_description.value:
return serialize( return serialize(
self.embeddings.upsert_description(data["id"], data["description"]), self.embeddings.upsert_description(
data["id"], data["description"]
),
pack=False, pack=False,
) )
elif topic == EmbeddingsRequestEnum.embed_thumbnail.value: elif topic == EmbeddingsRequestEnum.embed_thumbnail.value:
@ -89,13 +92,17 @@ class EmbeddingMaintainer(threading.Thread):
pack=False, pack=False,
) )
elif topic == EmbeddingsRequestEnum.generate_search.value: elif topic == EmbeddingsRequestEnum.generate_search.value:
return serialize(self.embeddings.text_embedding([data])[0], pack=False) return serialize(
self.embeddings.text_embedding([data])[0], pack=False
)
except Exception as e:
logger.error(f"Unable to handle embeddings request {e}")
self.embeddings_responder.check_for_request(handle_request) self.embeddings_responder.check_for_request(handle_request)
def _process_updates(self) -> None: def _process_updates(self) -> None:
"""Process event updates""" """Process event updates"""
update = self.event_subscriber.check_for_update() update = self.event_subscriber.check_for_update(timeout=0.1)
if update is None: if update is None:
return return
@ -118,13 +125,17 @@ class EmbeddingMaintainer(threading.Thread):
data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"])
self.tracked_events[data["id"]].append(data) self.tracked_events[data["id"]].append(data)
self.frame_manager.close(frame_id) self.frame_manager.close(frame_id)
else:
logger.debug(
f"Unable to create embedding for thumbnail from {camera} because frame is missing."
)
except FileNotFoundError: except FileNotFoundError:
pass pass
def _process_finalized(self) -> None: def _process_finalized(self) -> None:
"""Process the end of an event.""" """Process the end of an event."""
while True: while True:
ended = self.event_end_subscriber.check_for_update() ended = self.event_end_subscriber.check_for_update(timeout=0.1)
if ended == None: if ended == None:
break break
@ -217,7 +228,7 @@ class EmbeddingMaintainer(threading.Thread):
def _process_event_metadata(self): def _process_event_metadata(self):
# Check for regenerate description requests # Check for regenerate description requests
(topic, event_id, source) = self.event_metadata_subscriber.check_for_update( (topic, event_id, source) = self.event_metadata_subscriber.check_for_update(
timeout=1 timeout=0.1
) )
if topic is None: if topic is None: