From 99285922662bbc59386b8ef0abb9ce190d7ef2a6 Mon Sep 17 00:00:00 2001 From: Nicolas Mowen Date: Thu, 10 Oct 2024 10:28:29 -0600 Subject: [PATCH] Improve timeout handling --- frigate/comms/embeddings_updater.py | 2 +- frigate/embeddings/maintainer.py | 43 ++++++++++++++++++----------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/frigate/comms/embeddings_updater.py b/frigate/comms/embeddings_updater.py index 8a7617630..acb11924f 100644 --- a/frigate/comms/embeddings_updater.py +++ b/frigate/comms/embeddings_updater.py @@ -22,7 +22,7 @@ class EmbeddingsResponder: def check_for_request(self, process: Callable) -> None: while True: # load all messages that are queued - has_message, _, _ = zmq.select([self.socket], [], [], 1) + has_message, _, _ = zmq.select([self.socket], [], [], 0.1) if not has_message: break diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 68c3e3686..60ad8c167 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -77,25 +77,32 @@ class EmbeddingMaintainer(threading.Thread): """Process embeddings requests""" def handle_request(topic: str, data: str) -> str: - if topic == EmbeddingsRequestEnum.embed_description.value: - return serialize( - self.embeddings.upsert_description(data["id"], data["description"]), - pack=False, - ) - elif topic == EmbeddingsRequestEnum.embed_thumbnail.value: - thumbnail = base64.b64decode(data["thumbnail"]) - return serialize( - self.embeddings.upsert_thumbnail(data["id"], thumbnail), - pack=False, - ) - elif topic == EmbeddingsRequestEnum.generate_search.value: - return serialize(self.embeddings.text_embedding([data])[0], pack=False) + try: + if topic == EmbeddingsRequestEnum.embed_description.value: + return serialize( + self.embeddings.upsert_description( + data["id"], data["description"] + ), + pack=False, + ) + elif topic == EmbeddingsRequestEnum.embed_thumbnail.value: + thumbnail = base64.b64decode(data["thumbnail"]) + return serialize( + self.embeddings.upsert_thumbnail(data["id"], thumbnail), + pack=False, + ) + elif topic == EmbeddingsRequestEnum.generate_search.value: + return serialize( + self.embeddings.text_embedding([data])[0], pack=False + ) + except Exception as e: + logger.error(f"Unable to handle embeddings request {e}") self.embeddings_responder.check_for_request(handle_request) def _process_updates(self) -> None: """Process event updates""" - update = self.event_subscriber.check_for_update() + update = self.event_subscriber.check_for_update(timeout=0.1) if update is None: return @@ -118,13 +125,17 @@ class EmbeddingMaintainer(threading.Thread): data["thumbnail"] = self._create_thumbnail(yuv_frame, data["box"]) self.tracked_events[data["id"]].append(data) self.frame_manager.close(frame_id) + else: + logger.debug( + f"Unable to create embedding for thumbnail from {camera} because frame is missing." + ) except FileNotFoundError: pass def _process_finalized(self) -> None: """Process the end of an event.""" while True: - ended = self.event_end_subscriber.check_for_update() + ended = self.event_end_subscriber.check_for_update(timeout=0.1) if ended == None: break @@ -217,7 +228,7 @@ class EmbeddingMaintainer(threading.Thread): def _process_event_metadata(self): # Check for regenerate description requests (topic, event_id, source) = self.event_metadata_subscriber.check_for_update( - timeout=1 + timeout=0.1 ) if topic is None: