Add support for S3 storage configuration and enable S3 download and upload in recording clips and cleanup. Add a storage type field to recordings

This commit is contained in:
Sergey Krashevich 2023-05-23 19:59:38 +03:00
parent f66ccb04f6
commit 5f4a107e8e
No known key found for this signature in database
GPG Key ID: 625171324E7D3856
8 changed files with 123 additions and 4 deletions

View File

@ -521,6 +521,21 @@ class SnapshotsConfig(FrigateBaseModel):
)
class StorageS3Config(FrigateBaseModel):
enabled: bool = Field(default=False, title="S3 enabled.")
access_key_id: str = Field(default="", title="AWS_ACCESS_KEY_ID")
secret_access_key: str = Field(default="", title="AWS_SECRET_ACCESS_KEY")
bucket_name: str = Field(default="", title="Bucket name")
endpoint_url: str = Field(default="", title="Endpoint URL")
path: str = Field(default="", title="Base Path")
class StorageConfig(FrigateBaseModel):
s3: StorageS3Config = Field(
default_factory=StorageS3Config, title="S3 configuration"
)
class ColorConfig(FrigateBaseModel):
red: int = Field(default=255, ge=0, le=255, title="Red")
green: int = Field(default=255, ge=0, le=255, title="Green")
@ -880,6 +895,9 @@ class FrigateConfig(FrigateBaseModel):
snapshots: SnapshotsConfig = Field(
default_factory=SnapshotsConfig, title="Global snapshots configuration."
)
storage: StorageConfig = Field(
default_factory=StorageConfig, title="Global storage configuration."
)
rtmp: RtmpConfig = Field(
default_factory=RtmpConfig, title="Global RTMP restreaming configuration."
)

View File

@ -46,7 +46,7 @@ from frigate.util import (
vainfo_hwaccel,
get_tz_modifiers,
)
from frigate.storage import StorageMaintainer
from frigate.storage import StorageMaintainer, StorageS3
from frigate.version import VERSION
logger = logging.getLogger(__name__)
@ -1323,6 +1323,9 @@ def recordings(camera_name):
def recording_clip(camera_name, start_ts, end_ts):
download = request.args.get("download", type=bool)
if current_app.frigate_config.storage.s3.enabled:
s3 = StorageS3(current_app.frigate_config)
recordings = (
Recordings.select()
.where(
@ -1337,6 +1340,9 @@ def recording_clip(camera_name, start_ts, end_ts):
playlist_lines = []
clip: Recordings
for clip in recordings:
if recordings.storage == "s3":
clip.path = s3.download_file_from_s3(clip.path)
playlist_lines.append(f"file '{clip.path}'")
# if this is the starting clip, add an inpoint
if clip.start_time < start_ts:

View File

@ -66,3 +66,4 @@ class Recordings(Model): # type: ignore[misc]
motion = IntegerField(null=True)
objects = IntegerField(null=True)
segment_size = FloatField(default=0) # this should be stored as MB
storage = CharField(max_length=20)

View File

@ -8,12 +8,14 @@ import threading
from pathlib import Path
from peewee import DoesNotExist
import boto3
from multiprocessing.synchronize import Event as MpEvent
from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import RECORD_DIR, SECONDS_IN_DAY
from frigate.models import Event, Recordings, Timeline
from frigate.record.util import remove_empty_directories
from frigate.storage import StorageS3
logger = logging.getLogger(__name__)
@ -27,6 +29,9 @@ class RecordingCleanup(threading.Thread):
self.config = config
self.stop_event = stop_event
if self.config.storage.s3.enabled:
self.s3 = StorageS3(config)
def clean_tmp_clips(self) -> None:
# delete any clips more than 5 minutes old
for p in Path("/tmp/cache").rglob("clip_*.mp4"):
@ -54,6 +59,7 @@ class RecordingCleanup(threading.Thread):
)
deleted_recordings = set()
moved_recordings = set()
for recording in no_camera_recordings:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
@ -99,6 +105,7 @@ class RecordingCleanup(threading.Thread):
# TODO: expire segments based on segment stats according to config
event_start = 0
deleted_recordings = set()
moved_recordings = set()
for recording in recordings.objects().iterator():
keep = False
# Now look for a reason to keep this recording segment
@ -137,8 +144,16 @@ class RecordingCleanup(threading.Thread):
and recording.objects == 0
)
):
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
if self.config.storage.s3.enabled:
s3path = self.s3.upload_file_to_s3(recording.path)
if s3path != "":
moved_recordings.add({"id": recording.id, "path": s3path})
else:
Path(recording.path).unlink(missing_ok=True)
else:
Path(recording.path).unlink(missing_ok=True)
deleted_recordings.add(recording.id)
# delete timeline entries relevant to this recording segment
Timeline.delete().where(
@ -158,6 +173,11 @@ class RecordingCleanup(threading.Thread):
Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute()
for recording in moved_recordings:
Recordings.update({Recordings.storage: "s3", Recordings.path: recording["path"]}).where(
Recordings.id == recording["id"]
).execute()
logger.debug(f"End camera: {camera}.")
logger.debug("End all cameras.")

View File

@ -1,7 +1,12 @@
"""Recordings Utilities."""
import os
import boto3
import tempfile
import logging
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__)
def remove_empty_directories(directory: str) -> None:
# list all directories recursively and sort them by path,
@ -17,3 +22,4 @@ def remove_empty_directories(directory: str) -> None:
continue
if len(os.listdir(path)) == 0:
os.rmdir(path)

View File

@ -4,8 +4,11 @@ import logging
from pathlib import Path
import shutil
import threading
import boto3
from peewee import fn
import os
import tempfile
from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR
@ -17,6 +20,54 @@ bandwidth_equation = Recordings.segment_size / (
)
class StorageS3:
def __init__(self, config: FrigateConfig) -> None:
self.config = config
if self.config.storage.s3.enabled:
self.s3_client = boto3.client(
"s3",
aws_access_key_id=self.config.storage.s3.access_key_id,
aws_secret_access_key=self.config.storage.s3.secret_access_key,
endpoint_url=self.config.storage.s3.endpoint_url,
)
self.s3_bucket = self.config.storage.s3.bucket_name
self.s3_path = self.config.storage.s3.path
def upload_file_to_s3(self, file_path) -> str:
try:
s3_filename = self.s3_path + "/" + os.path.relpath(file_path, RECORD_DIR)
self.s3_client.upload_file(file_path, self.s3_bucket, s3_filename)
except Exception as e:
logger.debug(
f"Error occurred while uploading {file_path} to S3 {s3_filename}: {e}"
)
return ""
return s3_filename
def download_file_from_s3(self, s3_file_name) -> str:
if self.config.storage.s3.enabled:
# Create a temporary directory
temp_dir = tempfile.gettempdir()
# Create a temporary file name with the same name as the original S3 file
local_file_path = os.path.join(temp_dir, os.path.basename(s3_file_name))
try:
# Download the file from S3
self.s3_client.download_file(
self.s3_bucket, s3_file_name, local_file_path
)
logger.debug(f"Downloaded {s3_file_name} to {local_file_path}")
return local_file_path
except Exception as e:
logger.debug(
f"Error occurred while downloading {s3_file_name} from S3: {e}"
)
return None
else:
return False
class StorageMaintainer(threading.Thread):
"""Maintain frigates recording storage."""

View File

@ -0,0 +1,16 @@
import datetime as dt
import peewee as pw
from playhouse.sqlite_ext import *
from decimal import ROUND_HALF_EVEN
from frigate.models import Recordings
def migrate(migrator, database, fake=False, **kwargs):
migrator.add_fields(
Recordings,
storage=pw.CharField(max_length=20,default="local"),
)
def rollback(migrator, database, fake=False, **kwargs):
migrator.remove_fields(Recordings, ["storage"])

View File

@ -1,3 +1,4 @@
boto3 == 1.26.*
click == 8.1.*
Flask == 2.3.*
imutils == 0.5.*