diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 66036f807..783b186f7 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -6,24 +6,17 @@ import logging import os import random import string -import threading -import time from collections import defaultdict -from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Any, Optional, Tuple import numpy as np import psutil -from frigate.comms.config_updater import ConfigSubscriber -from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum -from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig, RetainModeEnum from frigate.const import ( CACHE_DIR, CACHE_SEGMENT_FORMAT, - INSERT_MANY_RECORDINGS, MAX_SEGMENT_DURATION, MAX_SEGMENTS_IN_CACHE, RECORD_DIR, @@ -60,29 +53,21 @@ class SegmentInfo: ) -class RecordingMaintainer(threading.Thread): - def __init__(self, config: FrigateConfig, stop_event: MpEvent): - threading.Thread.__init__(self) - self.name = "recording_maintainer" +class RecordingMaintainer: + def __init__(self, config: FrigateConfig): self.config = config - # create communication for retained recordings - self.requestor = InterProcessRequestor() - self.config_subscriber = ConfigSubscriber("config/record/") - self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) - - self.stop_event = stop_event self.object_recordings_info: dict[str, list] = defaultdict(list) self.audio_recordings_info: dict[str, list] = defaultdict(list) self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {} - async def move_files(self) -> None: + async def move_files(self) -> list[Recordings]: cache_files = [ d for d in os.listdir(CACHE_DIR) - if os.path.isfile(os.path.join(CACHE_DIR, d)) - and d.endswith(".mp4") + if d.endswith(".mp4") and not d.startswith("preview_") + and os.path.isfile(os.path.join(CACHE_DIR, d)) ] files_in_use = [] @@ -182,13 +167,8 @@ class RecordingMaintainer(threading.Thread): [self.validate_and_move_segment(camera, reviews, r) for r in recordings] ) - recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) - - # fire and forget recordings entries - self.requestor.send_data( - INSERT_MANY_RECORDINGS, - [r for r in recordings_to_insert if r is not None], - ) + recordings = await asyncio.gather(*tasks) + return [r for r in recordings if r is not None] async def validate_and_move_segment( self, camera: str, reviews: list[ReviewSegment], recording: dict[str, any] @@ -452,94 +432,6 @@ class RecordingMaintainer(threading.Thread): return None def run(self) -> None: - # Check for new files every 5 seconds - wait_time = 0.0 - while not self.stop_event.is_set(): - time.sleep(wait_time) - - if self.stop_event.is_set(): - break - - run_start = datetime.datetime.now().timestamp() - - # check if there is an updated config - while True: - ( - updated_topic, - updated_record_config, - ) = self.config_subscriber.check_for_update() - - if not updated_topic: - break - - camera_name = updated_topic.rpartition("/")[-1] - self.config.cameras[camera_name].record = updated_record_config - - stale_frame_count = 0 - stale_frame_count_threshold = 10 - # empty the object recordings info queue - while True: - (topic, data) = self.detection_subscriber.check_for_update( - timeout=QUEUE_READ_TIMEOUT - ) - - if not topic: - break - - if topic == DetectionTypeEnum.video: - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data - - if self.config.cameras[camera].record.enabled: - self.object_recordings_info[camera].append( - ( - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) - ) - elif topic == DetectionTypeEnum.audio: - ( - camera, - frame_time, - dBFS, - audio_detections, - ) = data - - if self.config.cameras[camera].record.enabled: - self.audio_recordings_info[camera].append( - ( - frame_time, - dBFS, - audio_detections, - ) - ) - elif topic == DetectionTypeEnum.api: - continue - - if frame_time < run_start - stale_frame_count_threshold: - stale_frame_count += 1 - - if stale_frame_count > 0: - logger.debug(f"Found {stale_frame_count} old frames.") - - try: - asyncio.run(self.move_files()) - except Exception as e: - logger.error( - "Error occurred when attempting to maintain recording cache" - ) - logger.error(e) - duration = datetime.datetime.now().timestamp() - run_start - wait_time = max(0, 5 - duration) + # (...) self.requestor.stop() - self.config_subscriber.stop() - self.detection_subscriber.stop() - logger.info("Exiting recording maintenance...") diff --git a/frigate/record/record.py b/frigate/record/record.py index 385f0b0ca..35ec49f2f 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -1,13 +1,19 @@ """Run recording maintainer and cleanup.""" +import asyncio +import datetime import logging from playhouse.sqliteq import SqliteQueueDatabase from frigate import util +from frigate.comms.config_updater import ConfigSubscriber +from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum +from frigate.comms.inter_process import InterProcessRequestor from frigate.config import FrigateConfig +from frigate.const import INSERT_MANY_RECORDINGS from frigate.models import Recordings, ReviewSegment -from frigate.record.maintainer import RecordingMaintainer +from frigate.record.maintainer import QUEUE_READ_TIMEOUT, RecordingMaintainer logger = logging.getLogger(__name__) @@ -30,8 +36,97 @@ class ManageRecordings(util.Process): models = [ReviewSegment, Recordings] db.bind(models) - maintainer = RecordingMaintainer( - self.config, - self.stop_event, - ) - maintainer.start() + requestor = InterProcessRequestor() + config_subscriber = ConfigSubscriber("config/record/") + detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all) + + maintainer = RecordingMaintainer(self.config) + + wait_time = 0 + while not self.stop_event.wait(wait_time): + run_start = datetime.datetime.now().timestamp() + + # Check if there is an updated config + while True: + ( + updated_topic, + updated_record_config, + ) = config_subscriber.check_for_update() + + if not updated_topic: + break + + camera_name = updated_topic.rpartition("/")[-1] + self.config.cameras[camera_name].record = updated_record_config + + stale_frame_count = 0 + stale_frame_count_threshold = 10 + + # Empty the object recordings info queue + while True: + (topic, data) = detection_subscriber.check_for_update( + timeout=QUEUE_READ_TIMEOUT + ) + + if not topic: + break + + if topic == DetectionTypeEnum.video: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + + if self.config.cameras[camera].record.enabled: + maintainer.object_recordings_info[camera].append( + ( + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) + ) + elif topic == DetectionTypeEnum.audio: + ( + camera, + frame_time, + dBFS, + audio_detections, + ) = data + + if self.config.cameras[camera].record.enabled: + maintainer.audio_recordings_info[camera].append( + ( + frame_time, + dBFS, + audio_detections, + ) + ) + elif topic == DetectionTypeEnum.api: + continue + + if frame_time < run_start - stale_frame_count_threshold: + stale_frame_count += 1 + + if stale_frame_count > 0: + logger.debug(f"Found {stale_frame_count} old frames.") + + try: + recordings = asyncio.run(maintainer.move_files()) + + # fire and forget recordings entries + requestor.send_data(INSERT_MANY_RECORDINGS, recordings) + except Exception: + logger.exception( + "Error occurred when attempting to maintain recording cache" + ) + duration = datetime.datetime.now().timestamp() - run_start + wait_time = max(0, 5 - duration) + + requestor.stop() + config_subscriber.stop() + detection_subscriber.stop() + logger.info("Exiting recording maintenance...")