recordings data pub/sub

This commit is contained in:
Josh Hawkins 2024-11-03 19:08:01 -06:00
parent 7048769dc1
commit 6c56cc3a54
4 changed files with 82 additions and 2 deletions

View File

@ -0,0 +1,36 @@
"""Facilitates communication between processes."""
import logging
from enum import Enum
from .zmq_proxy import Publisher, Subscriber
logger = logging.getLogger(__name__)
class RecordingsDataTypeEnum(str, Enum):
all = ""
recordings_available_through = "recordings_available_through"
class RecordingsDataPublisher(Publisher):
"""Publishes latest recording data."""
topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
topic = topic.value
super().__init__(topic)
def publish(self, payload: tuple[str, float]) -> None:
super().publish(payload)
class RecordingsDataSubscriber(Subscriber):
"""Receives latest recording data."""
topic_base = "recordings/"
def __init__(self, topic: RecordingsDataTypeEnum) -> None:
topic = topic.value
super().__init__(topic)

View File

@ -9,6 +9,10 @@ from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber from frigate.comms.events_updater import EventEndSubscriber, EventUpdateSubscriber
from frigate.comms.recordings_updater import (
RecordingsDataSubscriber,
RecordingsDataTypeEnum,
)
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.embeddings.functions.embeddings_mixin import ( from frigate.embeddings.functions.embeddings_mixin import (
EmbeddingsMixin, EmbeddingsMixin,
@ -40,6 +44,12 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin):
self.stop_event = stop_event self.stop_event = stop_event
self.event_subscriber = EventUpdateSubscriber() self.event_subscriber = EventUpdateSubscriber()
self.event_end_subscriber = EventEndSubscriber() self.event_end_subscriber = EventEndSubscriber()
self.recordings_subscriber = RecordingsDataSubscriber(
RecordingsDataTypeEnum.recordings_available_through
)
# recordings data
self.recordings_available_through: dict[str, float] = {}
# Share required attributes and objects # Share required attributes and objects
self.face_detector = face_detector self.face_detector = face_detector
@ -58,10 +68,12 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin):
def run(self) -> None: def run(self) -> None:
"""Run classification for finalized events.""" """Run classification for finalized events."""
while not self.stop_event.is_set(): while not self.stop_event.is_set():
self._process_updates() self._process_recordings_updates()
self._process_event_updates()
self.event_subscriber.stop() self.event_subscriber.stop()
self.event_end_subscriber.stop() self.event_end_subscriber.stop()
self.recordings_subscriber.stop()
logger.info("Exiting classification maintainer...") logger.info("Exiting classification maintainer...")
def _fetch_cropped_recording_snapshot( def _fetch_cropped_recording_snapshot(
@ -127,7 +139,25 @@ class ClassificationMaintainer(threading.Thread, EmbeddingsMixin):
return yuv_image.tobytes() return yuv_image.tobytes()
def _process_updates(self) -> None: def _process_recordings_updates(self) -> None:
"""Process recordings updates."""
while True:
recordings_data = self.recordings_subscriber.check_for_update(timeout=0.01)
if recordings_data == None:
break
camera, recordings_available_through_timestamp = recordings_data
self.recordings_available_through[camera] = (
recordings_available_through_timestamp
)
logger.debug(
f"{camera} now has recordings available through {recordings_available_through_timestamp}"
)
def _process_event_updates(self) -> None:
"""Process events.""" """Process events."""
# TODO: check new topic for last recording time # TODO: check new topic for last recording time
update = self.event_subscriber.check_for_update(timeout=0.01) update = self.event_subscriber.check_for_update(timeout=0.01)

View File

@ -361,4 +361,5 @@ class EmbeddingsMixin:
"plate": top_plate, "plate": top_plate,
"char_confidences": top_char_confidences, "char_confidences": top_char_confidences,
"area": top_area, "area": top_area,
"frame_time": obj_data["frame_time"],
} }

View File

@ -19,6 +19,10 @@ import psutil
from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.comms.recordings_updater import (
RecordingsDataPublisher,
RecordingsDataTypeEnum,
)
from frigate.config import FrigateConfig, RetainModeEnum from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import ( from frigate.const import (
CACHE_DIR, CACHE_DIR,
@ -69,6 +73,9 @@ class RecordingMaintainer(threading.Thread):
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/") self.config_subscriber = ConfigSubscriber("config/record/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.recordings_publisher = RecordingsDataPublisher(
RecordingsDataTypeEnum.recordings_available_through
)
self.stop_event = stop_event self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list) self.object_recordings_info: dict[str, list] = defaultdict(list)
@ -211,6 +218,11 @@ class RecordingMaintainer(threading.Thread):
[self.validate_and_move_segment(camera, reviews, r) for r in recordings] [self.validate_and_move_segment(camera, reviews, r) for r in recordings]
) )
# TODO: this is not correct
self.recordings_publisher.publish(
(camera, recordings[0]["start_time"].timestamp())
)
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
# fire and forget recordings entries # fire and forget recordings entries
@ -571,4 +583,5 @@ class RecordingMaintainer(threading.Thread):
self.requestor.stop() self.requestor.stop()
self.config_subscriber.stop() self.config_subscriber.stop()
self.detection_subscriber.stop() self.detection_subscriber.stop()
self.recordings_publisher.stop()
logger.info("Exiting recording maintenance...") logger.info("Exiting recording maintenance...")