Remove superfluous RecordingMaintainer thread

This commit is contained in:
George Tsiamasiotis 2024-09-30 16:58:35 +03:00
parent e2ad2c43b0
commit 6db9a8c639
2 changed files with 109 additions and 122 deletions

View File

@ -6,24 +6,17 @@ import logging
import os
import random
import string
import threading
import time
from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Optional, Tuple
import numpy as np
import psutil
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import (
CACHE_DIR,
CACHE_SEGMENT_FORMAT,
INSERT_MANY_RECORDINGS,
MAX_SEGMENT_DURATION,
MAX_SEGMENTS_IN_CACHE,
RECORD_DIR,
@ -60,29 +53,21 @@ class SegmentInfo:
)
class RecordingMaintainer(threading.Thread):
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "recording_maintainer"
class RecordingMaintainer:
def __init__(self, config: FrigateConfig):
self.config = config
# create communication for retained recordings
self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/")
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list)
self.audio_recordings_info: dict[str, list] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
async def move_files(self) -> None:
async def move_files(self) -> list[Recordings]:
cache_files = [
d
for d in os.listdir(CACHE_DIR)
if os.path.isfile(os.path.join(CACHE_DIR, d))
and d.endswith(".mp4")
if d.endswith(".mp4")
and not d.startswith("preview_")
and os.path.isfile(os.path.join(CACHE_DIR, d))
]
files_in_use = []
@ -182,13 +167,8 @@ class RecordingMaintainer(threading.Thread):
[self.validate_and_move_segment(camera, reviews, r) for r in recordings]
)
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
# fire and forget recordings entries
self.requestor.send_data(
INSERT_MANY_RECORDINGS,
[r for r in recordings_to_insert if r is not None],
)
recordings = await asyncio.gather(*tasks)
return [r for r in recordings if r is not None]
async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, any]
@ -452,94 +432,6 @@ class RecordingMaintainer(threading.Thread):
return None
def run(self) -> None:
# Check for new files every 5 seconds
wait_time = 0.0
while not self.stop_event.is_set():
time.sleep(wait_time)
if self.stop_event.is_set():
break
run_start = datetime.datetime.now().timestamp()
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
stale_frame_count = 0
stale_frame_count_threshold = 10
# empty the object recordings info queue
while True:
(topic, data) = self.detection_subscriber.check_for_update(
timeout=QUEUE_READ_TIMEOUT
)
if not topic:
break
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
if self.config.cameras[camera].record.enabled:
self.object_recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
if self.config.cameras[camera].record.enabled:
self.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
audio_detections,
)
)
elif topic == DetectionTypeEnum.api:
continue
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if stale_frame_count > 0:
logger.debug(f"Found {stale_frame_count} old frames.")
try:
asyncio.run(self.move_files())
except Exception as e:
logger.error(
"Error occurred when attempting to maintain recording cache"
)
logger.error(e)
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)
# (...)
self.requestor.stop()
self.config_subscriber.stop()
self.detection_subscriber.stop()
logger.info("Exiting recording maintenance...")

View File

@ -1,13 +1,19 @@
"""Run recording maintainer and cleanup."""
import asyncio
import datetime
import logging
from playhouse.sqliteq import SqliteQueueDatabase
from frigate import util
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS
from frigate.models import Recordings, ReviewSegment
from frigate.record.maintainer import RecordingMaintainer
from frigate.record.maintainer import QUEUE_READ_TIMEOUT, RecordingMaintainer
logger = logging.getLogger(__name__)
@ -30,8 +36,97 @@ class ManageRecordings(util.Process):
models = [ReviewSegment, Recordings]
db.bind(models)
maintainer = RecordingMaintainer(
self.config,
self.stop_event,
requestor = InterProcessRequestor()
config_subscriber = ConfigSubscriber("config/record/")
detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
maintainer = RecordingMaintainer(self.config)
wait_time = 0
while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp()
# Check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
stale_frame_count = 0
stale_frame_count_threshold = 10
# Empty the object recordings info queue
while True:
(topic, data) = detection_subscriber.check_for_update(
timeout=QUEUE_READ_TIMEOUT
)
maintainer.start()
if not topic:
break
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
if self.config.cameras[camera].record.enabled:
maintainer.object_recordings_info[camera].append(
(
frame_time,
current_tracked_objects,
motion_boxes,
regions,
)
)
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
if self.config.cameras[camera].record.enabled:
maintainer.audio_recordings_info[camera].append(
(
frame_time,
dBFS,
audio_detections,
)
)
elif topic == DetectionTypeEnum.api:
continue
if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1
if stale_frame_count > 0:
logger.debug(f"Found {stale_frame_count} old frames.")
try:
recordings = asyncio.run(maintainer.move_files())
# fire and forget recordings entries
requestor.send_data(INSERT_MANY_RECORDINGS, recordings)
except Exception:
logger.exception(
"Error occurred when attempting to maintain recording cache"
)
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)
requestor.stop()
config_subscriber.stop()
detection_subscriber.stop()
logger.info("Exiting recording maintenance...")