From 5f4a107e8e4fede5a2971681c3c9564690a3af36 Mon Sep 17 00:00:00 2001 From: Sergey Krashevich Date: Tue, 23 May 2023 19:59:38 +0300 Subject: [PATCH] Add support for S3 storage configuration and enable S3 download and upload in recording clips and cleanup. Add a storage type field to recordings --- frigate/config.py | 18 ++++++++ frigate/http.py | 8 +++- frigate/models.py | 1 + frigate/record/cleanup.py | 24 ++++++++++- frigate/record/util.py | 6 +++ frigate/storage.py | 53 +++++++++++++++++++++++- migrations/017_recording_storage_type.py | 16 +++++++ requirements-wheels.txt | 1 + 8 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 migrations/017_recording_storage_type.py diff --git a/frigate/config.py b/frigate/config.py index 56e4dc496..88d1d6554 100644 --- a/frigate/config.py +++ b/frigate/config.py @@ -521,6 +521,21 @@ class SnapshotsConfig(FrigateBaseModel): ) +class StorageS3Config(FrigateBaseModel): + enabled: bool = Field(default=False, title="S3 enabled.") + access_key_id: str = Field(default="", title="AWS_ACCESS_KEY_ID") + secret_access_key: str = Field(default="", title="AWS_SECRET_ACCESS_KEY") + bucket_name: str = Field(default="", title="Bucket name") + endpoint_url: str = Field(default="", title="Endpoint URL") + path: str = Field(default="", title="Base Path") + + +class StorageConfig(FrigateBaseModel): + s3: StorageS3Config = Field( + default_factory=StorageS3Config, title="S3 configuration" + ) + + class ColorConfig(FrigateBaseModel): red: int = Field(default=255, ge=0, le=255, title="Red") green: int = Field(default=255, ge=0, le=255, title="Green") @@ -880,6 +895,9 @@ class FrigateConfig(FrigateBaseModel): snapshots: SnapshotsConfig = Field( default_factory=SnapshotsConfig, title="Global snapshots configuration." ) + storage: StorageConfig = Field( + default_factory=StorageConfig, title="Global storage configuration." + ) rtmp: RtmpConfig = Field( default_factory=RtmpConfig, title="Global RTMP restreaming configuration." ) diff --git a/frigate/http.py b/frigate/http.py index de9fd033c..6e9b2a82b 100644 --- a/frigate/http.py +++ b/frigate/http.py @@ -46,7 +46,7 @@ from frigate.util import ( vainfo_hwaccel, get_tz_modifiers, ) -from frigate.storage import StorageMaintainer +from frigate.storage import StorageMaintainer, StorageS3 from frigate.version import VERSION logger = logging.getLogger(__name__) @@ -1323,6 +1323,9 @@ def recordings(camera_name): def recording_clip(camera_name, start_ts, end_ts): download = request.args.get("download", type=bool) + if current_app.frigate_config.storage.s3.enabled: + s3 = StorageS3(current_app.frigate_config) + recordings = ( Recordings.select() .where( @@ -1337,6 +1340,9 @@ def recording_clip(camera_name, start_ts, end_ts): playlist_lines = [] clip: Recordings for clip in recordings: + if recordings.storage == "s3": + clip.path = s3.download_file_from_s3(clip.path) + playlist_lines.append(f"file '{clip.path}'") # if this is the starting clip, add an inpoint if clip.start_time < start_ts: diff --git a/frigate/models.py b/frigate/models.py index 0d5f1ab4a..175ef9f14 100644 --- a/frigate/models.py +++ b/frigate/models.py @@ -66,3 +66,4 @@ class Recordings(Model): # type: ignore[misc] motion = IntegerField(null=True) objects = IntegerField(null=True) segment_size = FloatField(default=0) # this should be stored as MB + storage = CharField(max_length=20) diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py index 605979ee4..78e5faba5 100644 --- a/frigate/record/cleanup.py +++ b/frigate/record/cleanup.py @@ -8,12 +8,14 @@ import threading from pathlib import Path from peewee import DoesNotExist +import boto3 from multiprocessing.synchronize import Event as MpEvent from frigate.config import RetainModeEnum, FrigateConfig from frigate.const import RECORD_DIR, SECONDS_IN_DAY from frigate.models import Event, Recordings, Timeline from frigate.record.util import remove_empty_directories +from frigate.storage import StorageS3 logger = logging.getLogger(__name__) @@ -27,6 +29,9 @@ class RecordingCleanup(threading.Thread): self.config = config self.stop_event = stop_event + if self.config.storage.s3.enabled: + self.s3 = StorageS3(config) + def clean_tmp_clips(self) -> None: # delete any clips more than 5 minutes old for p in Path("/tmp/cache").rglob("clip_*.mp4"): @@ -54,6 +59,7 @@ class RecordingCleanup(threading.Thread): ) deleted_recordings = set() + moved_recordings = set() for recording in no_camera_recordings: Path(recording.path).unlink(missing_ok=True) deleted_recordings.add(recording.id) @@ -99,6 +105,7 @@ class RecordingCleanup(threading.Thread): # TODO: expire segments based on segment stats according to config event_start = 0 deleted_recordings = set() + moved_recordings = set() for recording in recordings.objects().iterator(): keep = False # Now look for a reason to keep this recording segment @@ -137,8 +144,16 @@ class RecordingCleanup(threading.Thread): and recording.objects == 0 ) ): - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) + if self.config.storage.s3.enabled: + s3path = self.s3.upload_file_to_s3(recording.path) + if s3path != "": + moved_recordings.add({"id": recording.id, "path": s3path}) + else: + Path(recording.path).unlink(missing_ok=True) + else: + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + # delete timeline entries relevant to this recording segment Timeline.delete().where( @@ -158,6 +173,11 @@ class RecordingCleanup(threading.Thread): Recordings.id << deleted_recordings_list[i : i + max_deletes] ).execute() + for recording in moved_recordings: + Recordings.update({Recordings.storage: "s3", Recordings.path: recording["path"]}).where( + Recordings.id == recording["id"] + ).execute() + logger.debug(f"End camera: {camera}.") logger.debug("End all cameras.") diff --git a/frigate/record/util.py b/frigate/record/util.py index d9692c25e..9737c5968 100644 --- a/frigate/record/util.py +++ b/frigate/record/util.py @@ -1,7 +1,12 @@ """Recordings Utilities.""" import os +import boto3 +import tempfile +import logging +from frigate.config import FrigateConfig +logger = logging.getLogger(__name__) def remove_empty_directories(directory: str) -> None: # list all directories recursively and sort them by path, @@ -17,3 +22,4 @@ def remove_empty_directories(directory: str) -> None: continue if len(os.listdir(path)) == 0: os.rmdir(path) + diff --git a/frigate/storage.py b/frigate/storage.py index 5b66abbdc..eac4c04c1 100644 --- a/frigate/storage.py +++ b/frigate/storage.py @@ -4,8 +4,11 @@ import logging from pathlib import Path import shutil import threading - +import boto3 from peewee import fn +import os +import tempfile + from frigate.config import FrigateConfig from frigate.const import RECORD_DIR @@ -17,6 +20,54 @@ bandwidth_equation = Recordings.segment_size / ( ) +class StorageS3: + def __init__(self, config: FrigateConfig) -> None: + self.config = config + if self.config.storage.s3.enabled: + self.s3_client = boto3.client( + "s3", + aws_access_key_id=self.config.storage.s3.access_key_id, + aws_secret_access_key=self.config.storage.s3.secret_access_key, + endpoint_url=self.config.storage.s3.endpoint_url, + ) + self.s3_bucket = self.config.storage.s3.bucket_name + self.s3_path = self.config.storage.s3.path + + def upload_file_to_s3(self, file_path) -> str: + try: + s3_filename = self.s3_path + "/" + os.path.relpath(file_path, RECORD_DIR) + self.s3_client.upload_file(file_path, self.s3_bucket, s3_filename) + except Exception as e: + logger.debug( + f"Error occurred while uploading {file_path} to S3 {s3_filename}: {e}" + ) + return "" + return s3_filename + + def download_file_from_s3(self, s3_file_name) -> str: + if self.config.storage.s3.enabled: + # Create a temporary directory + temp_dir = tempfile.gettempdir() + + # Create a temporary file name with the same name as the original S3 file + local_file_path = os.path.join(temp_dir, os.path.basename(s3_file_name)) + + try: + # Download the file from S3 + self.s3_client.download_file( + self.s3_bucket, s3_file_name, local_file_path + ) + logger.debug(f"Downloaded {s3_file_name} to {local_file_path}") + return local_file_path + except Exception as e: + logger.debug( + f"Error occurred while downloading {s3_file_name} from S3: {e}" + ) + return None + else: + return False + + class StorageMaintainer(threading.Thread): """Maintain frigates recording storage.""" diff --git a/migrations/017_recording_storage_type.py b/migrations/017_recording_storage_type.py new file mode 100644 index 000000000..2a6062eea --- /dev/null +++ b/migrations/017_recording_storage_type.py @@ -0,0 +1,16 @@ +import datetime as dt +import peewee as pw +from playhouse.sqlite_ext import * +from decimal import ROUND_HALF_EVEN +from frigate.models import Recordings + + +def migrate(migrator, database, fake=False, **kwargs): + migrator.add_fields( + Recordings, + storage=pw.CharField(max_length=20,default="local"), + ) + + +def rollback(migrator, database, fake=False, **kwargs): + migrator.remove_fields(Recordings, ["storage"]) diff --git a/requirements-wheels.txt b/requirements-wheels.txt index d785df5d6..5d193a5d3 100644 --- a/requirements-wheels.txt +++ b/requirements-wheels.txt @@ -1,3 +1,4 @@ +boto3 == 1.26.* click == 8.1.* Flask == 2.3.* imutils == 0.5.*