Use inter process communication to write recordings into the DB

This commit is contained in:
Nick Mowen 2023-07-24 14:21:54 -06:00
parent f5af4c2125
commit 0fb32ee2e6
5 changed files with 17 additions and 4 deletions

View File

@ -293,6 +293,7 @@ class FrigateApp:
name="recording_manager", name="recording_manager",
args=( args=(
self.config, self.config,
self.inter_process_queue,
self.object_recordings_info_queue, self.object_recordings_info_queue,
self.audio_recordings_info_queue, self.audio_recordings_info_queue,
self.feature_metrics, self.feature_metrics,

View File

@ -5,6 +5,8 @@ from abc import ABC, abstractmethod
from typing import Any, Callable from typing import Any, Callable
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS
from frigate.models import Recordings
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes from frigate.types import CameraMetricsTypes, FeatureMetricsTypes, PTZMetricsTypes
from frigate.util.services import restart_frigate from frigate.util.services import restart_frigate
@ -86,6 +88,8 @@ class Dispatcher:
return return
elif topic == "restart": elif topic == "restart":
restart_frigate() restart_frigate()
elif topic == INSERT_MANY_RECORDINGS:
Recordings.insert_many(payload).execute()
else: else:
self.publish(topic, payload, retain=False) self.publish(topic, payload, retain=False)

View File

@ -47,3 +47,7 @@ DRIVER_INTEL_iHD = "iHD"
MAX_SEGMENT_DURATION = 600 MAX_SEGMENT_DURATION = 600
MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to account for cameras with inconsistent segment times
# Internal Comms Topics
INSERT_MANY_RECORDINGS = "insert_many_recordings"

View File

@ -18,7 +18,7 @@ import numpy as np
import psutil import psutil
from frigate.config import FrigateConfig, RetainModeEnum from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR from frigate.const import CACHE_DIR, INSERT_MANY_RECORDINGS, MAX_SEGMENT_DURATION, RECORD_DIR
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.types import FeatureMetricsTypes from frigate.types import FeatureMetricsTypes
from frigate.util.image import area from frigate.util.image import area
@ -50,6 +50,7 @@ class RecordingMaintainer(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
inter_process_queue: mp.Queue,
object_recordings_info_queue: mp.Queue, object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: Optional[mp.Queue], audio_recordings_info_queue: Optional[mp.Queue],
process_info: dict[str, FeatureMetricsTypes], process_info: dict[str, FeatureMetricsTypes],
@ -58,6 +59,7 @@ class RecordingMaintainer(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "recording_maintainer" self.name = "recording_maintainer"
self.config = config self.config = config
self.inter_process_queue = inter_process_queue
self.object_recordings_info_queue = object_recordings_info_queue self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue self.audio_recordings_info_queue = audio_recordings_info_queue
self.process_info = process_info self.process_info = process_info
@ -160,9 +162,9 @@ class RecordingMaintainer(threading.Thread):
) )
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks)
Recordings.insert_many(
[r for r in recordings_to_insert if r is not None] # fire and forget recordings entries
).execute() self.inter_process_queue.put((INSERT_MANY_RECORDINGS, [r for r in recordings_to_insert if r is not None]))
async def validate_and_move_segment( async def validate_and_move_segment(
self, camera: str, events: Event, recording: dict[str, any] self, camera: str, events: Event, recording: dict[str, any]

View File

@ -21,6 +21,7 @@ logger = logging.getLogger(__name__)
def manage_recordings( def manage_recordings(
config: FrigateConfig, config: FrigateConfig,
inter_process_queue: mp.Queue,
object_recordings_info_queue: mp.Queue, object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: mp.Queue, audio_recordings_info_queue: mp.Queue,
process_info: dict[str, FeatureMetricsTypes], process_info: dict[str, FeatureMetricsTypes],
@ -51,6 +52,7 @@ def manage_recordings(
maintainer = RecordingMaintainer( maintainer = RecordingMaintainer(
config, config,
inter_process_queue,
object_recordings_info_queue, object_recordings_info_queue,
audio_recordings_info_queue, audio_recordings_info_queue,
process_info, process_info,