diff --git a/frigate/comms/recordings_updater.py b/frigate/comms/recordings_updater.py new file mode 100644 index 000000000..862ec1041 --- /dev/null +++ b/frigate/comms/recordings_updater.py @@ -0,0 +1,36 @@ +"""Facilitates communication between processes.""" + +import logging +from enum import Enum + +from .zmq_proxy import Publisher, Subscriber + +logger = logging.getLogger(__name__) + + +class RecordingsDataTypeEnum(str, Enum): + all = "" + recordings_available_through = "recordings_available_through" + + +class RecordingsDataPublisher(Publisher): + """Publishes latest recording data.""" + + topic_base = "recordings/" + + def __init__(self, topic: RecordingsDataTypeEnum) -> None: + topic = topic.value + super().__init__(topic) + + def publish(self, payload: tuple[str, float]) -> None: + super().publish(payload) + + +class RecordingsDataSubscriber(Subscriber): + """Receives latest recording data.""" + + topic_base = "recordings/" + + def __init__(self, topic: RecordingsDataTypeEnum) -> None: + topic = topic.value + super().__init__(topic) diff --git a/frigate/embeddings/classification.py b/frigate/embeddings/classification.py index 2a8fa916b..57e2d264a 100644 --- a/frigate/embeddings/classification.py +++ b/frigate/embeddings/classification.py @@ -9,6 +9,10 @@ from peewee import DoesNotExist from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber +from frigate.comms.recordings_updater import ( + RecordingsDataSubscriber, + RecordingsDataTypeEnum, +) from frigate.config import FrigateConfig from frigate.embeddings.functions.embeddings_mixin import ( EmbeddingsMixin, @@ -40,6 +44,12 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin): self.stop_event = stop_event self.event_subscriber = EventUpdateSubscriber() self.event_end_subscriber = EventEndSubscriber() + self.recordings_subscriber = RecordingsDataSubscriber( + RecordingsDataTypeEnum.recordings_available_through + ) + + # recordings data + self.recordings_available_through: dict[str, float] = {} # Share required attributes and objects self.face_detector = face_detector @@ -58,10 +68,12 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin): def run(self) -> None: """Run classification for finalized events.""" while not self.stop_event.is_set(): - self._process_updates() + self._process_recordings_updates() + self._process_event_updates() self.event_subscriber.stop() self.event_end_subscriber.stop() + self.recordings_subscriber.stop() logger.info("Exiting classification maintainer...") def _fetch_cropped_recording_snapshot( @@ -127,7 +139,25 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin): return yuv_image.tobytes() - def _process_updates(self) -> None: + def _process_recordings_updates(self) -> None: + """Process recordings updates.""" + while True: + recordings_data = self.recordings_subscriber.check_for_update(timeout=0.01) + + if recordings_data == None: + break + + camera, recordings_available_through_timestamp = recordings_data + + self.recordings_available_through[camera] = ( + recordings_available_through_timestamp + ) + + logger.debug( + f"{camera} now has recordings available through {recordings_available_through_timestamp}" + ) + + def _process_event_updates(self) -> None: """Process events.""" # TODO: check new topic for last recording time update = self.event_subscriber.check_for_update(timeout=0.01) diff --git a/frigate/embeddings/functions/embeddings_mixin.py b/frigate/embeddings/functions/embeddings_mixin.py index 0b446b7e4..db930de2f 100644 --- a/frigate/embeddings/functions/embeddings_mixin.py +++ b/frigate/embeddings/functions/embeddings_mixin.py @@ -361,4 +361,5 @@ class EmbeddingsMixin: "plate": top_plate, "char_confidences": top_char_confidences, "area": top_area, + "frame_time": obj_data["frame_time"], } diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 314ff3646..3eb7b8eed 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -19,6 +19,10 @@ import psutil from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor +from frigate.comms.recordings_updater import ( + RecordingsDataPublisher, + RecordingsDataTypeEnum, +) from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( CACHE_DIR, @@ -69,6 +73,9 @@ class RecordingMaintainer(threading.Thread): self.requestor = InterProcessRequestor() self.config_subscriber = ConfigSubscriber("config/record/") self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) + self.recordings_publisher = RecordingsDataPublisher( + RecordingsDataTypeEnum.recordings_available_through + ) self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) @@ -211,6 +218,11 @@ class RecordingMaintainer(threading.Thread): [self.validate_and_move_segment(camera, reviews, r) for r in recordings] ) + # TODO: this is not correct + self.recordings_publisher.publish( + (camera, recordings[0]["start_time"].timestamp()) + ) + recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) # fire and forget recordings entries @@ -571,4 +583,5 @@ class RecordingMaintainer(threading.Thread): self.requestor.stop() self.config_subscriber.stop() self.detection_subscriber.stop() + self.recordings_publisher.stop() logger.info("Exiting recording maintenance...")