diff --git a/frigate/app.py b/frigate/app.py index eca56f49f..89f3e2221 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -36,13 +36,14 @@ from frigate.events.external import ExternalEventProcessor from frigate.events.maintainer import EventProcessor from frigate.http import create_app from frigate.log import log_process, root_configurer -from frigate.models import Event, Recordings, Timeline +from frigate.models import Event, Recordings, RecordingsToDelete, Timeline from frigate.object_detection import ObjectDetectProcess from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi from frigate.ptz.autotrack import PtzAutoTrackerThread from frigate.ptz.onvif import OnvifController +from frigate.record.cleanup import RecordingCleanup from frigate.record.record import manage_recordings from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer @@ -292,6 +293,7 @@ class FrigateApp: name="recording_manager", args=( self.config, + self.inter_process_queue, self.object_recordings_info_queue, self.audio_recordings_info_queue, self.feature_metrics, @@ -317,7 +319,7 @@ class FrigateApp: 60, 10 * len([c for c in self.config.cameras.values() if c.enabled]) ), ) - models = [Event, Recordings, Timeline] + models = [Event, Recordings, RecordingsToDelete, Timeline] self.db.bind(models) def init_stats(self) -> None: @@ -522,6 +524,10 @@ class FrigateApp: self.event_cleanup = EventCleanup(self.config, self.stop_event) self.event_cleanup.start() + def start_record_cleanup(self) -> None: + self.record_cleanup = RecordingCleanup(self.config, self.stop_event) + self.record_cleanup.start() + def start_storage_maintainer(self) -> None: self.storage_maintainer = StorageMaintainer(self.config, self.stop_event) self.storage_maintainer.start() @@ -607,6 +613,7 @@ class FrigateApp: self.start_timeline_processor() self.start_event_processor() self.start_event_cleanup() + self.start_record_cleanup() self.start_stats_emitter() self.start_watchdog() self.check_shm() @@ -643,6 +650,7 @@ class FrigateApp: self.ptz_autotracker_thread.join() self.event_processor.join() self.event_cleanup.join() + self.record_cleanup.join() self.stats_emitter.join() self.frigate_watchdog.join() self.db.stop() diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index f2fe40c5d..56a2c5c9b 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -5,6 +5,8 @@ from abc import ABC, abstractmethod from typing import Any, Callable from frigate.config import FrigateConfig +from frigate.const import INSERT_MANY_RECORDINGS +from frigate.models import Recordings from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes from frigate.util.services import restart_frigate @@ -86,6 +88,8 @@ class Dispatcher: return elif topic == "restart": restart_frigate() + elif topic == INSERT_MANY_RECORDINGS: + Recordings.insert_many(payload).execute() else: self.publish(topic, payload, retain=False) diff --git a/frigate/config.py b/frigate/config.py index 101775f6d..7f1624ed4 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json import logging import os @@ -1059,7 +1060,7 @@ class FrigateConfig(FrigateBaseModel): if "detect" in input.roles: stream_info = {"width": 0, "height": 0} try: - stream_info = get_video_properties(input.path) + stream_info = asyncio.run(get_video_properties(input.path)) except Exception: logger.warn( f"Error detecting stream resolution automatically for {input.path} Applying default values." diff --git a/frigate/const.py b/frigate/const.py index 4dec7f366..c6912471b 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -47,3 +47,7 @@ DRIVER_INTEL_iHD = "iHD" MAX_SEGMENT_DURATION = 600 MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times + +# Internal Comms Topics + +INSERT_MANY_RECORDINGS = "insert_many_recordings" diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index 372e6a4fd..c67f07c80 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -8,7 +8,6 @@ import os import queue import random import string -import subprocess as sp import threading from collections import defaultdict from multiprocessing.synchronize import Event as MpEvent @@ -19,7 +18,12 @@ import numpy as np import psutil from frigate.config import FrigateConfig, RetainModeEnum -from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR +from frigate.const import ( + CACHE_DIR, + INSERT_MANY_RECORDINGS, + MAX_SEGMENT_DURATION, + RECORD_DIR, +) from frigate.models import Event, Recordings from frigate.types import FeatureMetricsTypes from frigate.util.image import area @@ -51,6 +55,7 @@ class RecordingMaintainer(threading.Thread): def __init__( self, config: FrigateConfig, + inter_process_queue: mp.Queue, object_recordings_info_queue: mp.Queue, audio_recordings_info_queue: Optional[mp.Queue], process_info: dict[str, FeatureMetricsTypes], @@ -59,6 +64,7 @@ class RecordingMaintainer(threading.Thread): threading.Thread.__init__(self) self.name = "recording_maintainer" self.config = config + self.inter_process_queue = inter_process_queue self.object_recordings_info_queue = object_recordings_info_queue self.audio_recordings_info_queue = audio_recordings_info_queue self.process_info = process_info @@ -161,9 +167,11 @@ class RecordingMaintainer(threading.Thread): ) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) - Recordings.insert_many( - [r for r in recordings_to_insert if r is not None] - ).execute() + + # fire and forget recordings entries + self.inter_process_queue.put( + (INSERT_MANY_RECORDINGS, [r for r in recordings_to_insert if r is not None]) + ) async def validate_and_move_segment( self, camera: str, events: Event, recording: dict[str, any] @@ -183,7 +191,7 @@ class RecordingMaintainer(threading.Thread): if cache_path in self.end_time_cache: end_time, duration = self.end_time_cache[cache_path] else: - segment_info = get_video_properties(cache_path, get_duration=True) + segment_info = await get_video_properties(cache_path, get_duration=True) if segment_info["duration"]: duration = float(segment_info["duration"]) @@ -231,7 +239,7 @@ class RecordingMaintainer(threading.Thread): if overlaps: record_mode = self.config.cameras[camera].record.events.retain.mode # move from cache to recordings immediately - return self.move_segment( + return await self.move_segment( camera, start_time, end_time, @@ -253,7 +261,7 @@ class RecordingMaintainer(threading.Thread): # else retain days includes this segment else: record_mode = self.config.cameras[camera].record.retain.mode - return self.move_segment( + return await self.move_segment( camera, start_time, end_time, duration, cache_path, record_mode ) @@ -296,7 +304,7 @@ class RecordingMaintainer(threading.Thread): return SegmentInfo(motion_count, active_count, round(average_dBFS)) - def move_segment( + async def move_segment( self, camera: str, start_time: datetime.datetime, @@ -332,7 +340,7 @@ class RecordingMaintainer(threading.Thread): start_frame = datetime.datetime.now().timestamp() # add faststart to kept segments to improve metadata reading - ffmpeg_cmd = [ + p = await asyncio.create_subprocess_exec( "ffmpeg", "-hide_banner", "-y", @@ -343,17 +351,13 @@ class RecordingMaintainer(threading.Thread): "-movflags", "+faststart", file_path, - ] - - p = sp.run( - ffmpeg_cmd, - encoding="ascii", - capture_output=True, + stderr=asyncio.subprocess.PIPE, ) + await p.wait() if p.returncode != 0: logger.error(f"Unable to convert {cache_path} to {file_path}") - logger.error(p.stderr) + logger.error((await p.stderr.read()).decode("ascii")) return None else: logger.debug( diff --git a/frigate/record/record.py b/frigate/record/record.py index 0c76f33cb..ca4400e57 100644 --- a/frigate/record/record.py +++ b/frigate/record/record.py @@ -11,8 +11,7 @@ from playhouse.sqliteq import SqliteQueueDatabase from setproctitle import setproctitle from frigate.config import FrigateConfig -from frigate.models import Event, Recordings, RecordingsToDelete, Timeline -from frigate.record.cleanup import RecordingCleanup +from frigate.models import Event, Recordings from frigate.record.maintainer import RecordingMaintainer from frigate.types import FeatureMetricsTypes from frigate.util.services import listen @@ -22,6 +21,7 @@ logger = logging.getLogger(__name__) def manage_recordings( config: FrigateConfig, + inter_process_queue: mp.Queue, object_recordings_info_queue: mp.Queue, audio_recordings_info_queue: mp.Queue, process_info: dict[str, FeatureMetricsTypes], @@ -47,17 +47,15 @@ def manage_recordings( }, timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), ) - models = [Event, Recordings, Timeline, RecordingsToDelete] + models = [Event, Recordings] db.bind(models) maintainer = RecordingMaintainer( config, + inter_process_queue, object_recordings_info_queue, audio_recordings_info_queue, process_info, stop_event, ) maintainer.start() - - cleanup = RecordingCleanup(config, stop_event) - cleanup.start() diff --git a/frigate/util/services.py b/frigate/util/services.py index 3c591feb9..0d5de327d 100644 --- a/frigate/util/services.py +++ b/frigate/util/services.py @@ -1,5 +1,6 @@ """Utilities for services.""" +import asyncio import json import logging import os @@ -352,8 +353,8 @@ def vainfo_hwaccel(device_name: Optional[str] = None) -> sp.CompletedProcess: return sp.run(ffprobe_cmd, capture_output=True) -def get_video_properties(url, get_duration=False): - def calculate_duration(video: Optional[any]) -> float: +async def get_video_properties(url, get_duration=False): + async def calculate_duration(video: Optional[any]) -> float: duration = None if video is not None: @@ -366,7 +367,7 @@ def get_video_properties(url, get_duration=False): # if cv2 failed need to use ffprobe if duration is None: - ffprobe_cmd = [ + p = await asyncio.create_subprocess_exec( "ffprobe", "-v", "error", @@ -375,11 +376,18 @@ def get_video_properties(url, get_duration=False): "-of", "default=noprint_wrappers=1:nokey=1", f"{url}", - ] - p = sp.run(ffprobe_cmd, capture_output=True) + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + await p.wait() - if p.returncode == 0 and p.stdout.decode(): - duration = float(p.stdout.decode().strip()) + if p.returncode == 0: + result = (await p.stdout.read()).decode() + else: + result = None + + if result: + duration = float(result.strip()) else: duration = -1 @@ -400,7 +408,7 @@ def get_video_properties(url, get_duration=False): result = {} if get_duration: - result["duration"] = calculate_duration(video) + result["duration"] = await calculate_duration(video) if video is not None: # Get the width of frames in the video stream