diff --git a/frigate/app.py b/frigate/app.py index 54d2825c8..5fd24798f 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -28,7 +28,7 @@ from frigate.object_processing import TrackedObjectProcessor from frigate.output import output_frames from frigate.plus import PlusApi from frigate.ptz import OnvifController -from frigate.record import RecordingCleanup, RecordingMaintainer +from frigate.record.record import manage_recordings from frigate.stats import StatsEmitter, stats_init from frigate.storage import StorageMaintainer from frigate.timeline import TimelineProcessor @@ -158,6 +158,20 @@ class FrigateApp: migrate_db.close() + def init_recording_manager(self) -> None: + recording_process = mp.Process( + target=manage_recordings, + name="recording_manager", + args=(self.config, self.recordings_info_queue), + ) + recording_process.daemon = True + self.recording_process = recording_process + recording_process.start() + logger.info(f"Recording process started: {recording_process.pid}") + + def bind_database(self) -> None: + """Bind db to the main process.""" + # NOTE: all db accessing processes need to be created before the db can be bound to the main process self.db = SqliteQueueDatabase(self.config.database.path) models = [Event, Recordings, Timeline] self.db.bind(models) @@ -318,16 +332,6 @@ class FrigateApp: self.event_cleanup = EventCleanup(self.config, self.stop_event) self.event_cleanup.start() - def start_recording_maintainer(self) -> None: - self.recording_maintainer = RecordingMaintainer( - self.config, self.recordings_info_queue, self.stop_event - ) - self.recording_maintainer.start() - - def start_recording_cleanup(self) -> None: - self.recording_cleanup = RecordingCleanup(self.config, self.stop_event) - self.recording_cleanup.start() - def start_storage_maintainer(self) -> None: self.storage_maintainer = StorageMaintainer(self.config, self.stop_event) self.storage_maintainer.start() @@ -390,6 +394,8 @@ class FrigateApp: self.init_queues() self.init_database() self.init_onvif() + self.init_recording_manager() + self.bind_database() self.init_dispatcher() except Exception as e: print(e) @@ -406,8 +412,6 @@ class FrigateApp: self.start_timeline_processor() self.start_event_processor() self.start_event_cleanup() - self.start_recording_maintainer() - self.start_recording_cleanup() self.start_stats_emitter() self.start_watchdog() self.check_shm() diff --git a/frigate/const.py b/frigate/const.py index f0d76d940..8e1e42bb9 100644 --- a/frigate/const.py +++ b/frigate/const.py @@ -8,7 +8,6 @@ CACHE_DIR = "/tmp/cache" YAML_EXT = (".yaml", ".yml") PLUS_ENV_VAR = "PLUS_API_KEY" PLUS_API_HOST = "https://api.frigate.video" -MAX_SEGMENT_DURATION = 600 BTBN_PATH = "/usr/lib/btbn-ffmpeg" # Regex Consts @@ -23,3 +22,8 @@ DRIVER_ENV_VAR = "LIBVA_DRIVER_NAME" DRIVER_AMD = "radeonsi" DRIVER_INTEL_i965 = "i965" DRIVER_INTEL_iHD = "iHD" + +# Record Values + +MAX_SEGMENT_DURATION = 600 +SECONDS_IN_DAY = 60 * 60 * 24 diff --git a/frigate/record/cleanup.py b/frigate/record/cleanup.py new file mode 100644 index 000000000..4ea3015b2 --- /dev/null +++ b/frigate/record/cleanup.py @@ -0,0 +1,245 @@ +"""Cleanup recordings that are expired based on retention config.""" + +import datetime +import itertools +import logging +import subprocess as sp +import threading +from pathlib import Path + +from peewee import DoesNotExist + +from frigate.config import RetainModeEnum, FrigateConfig +from frigate.const import RECORD_DIR, SECONDS_IN_DAY +from frigate.models import Event, Recordings +from frigate.record.util import remove_empty_directories + +logger = logging.getLogger(__name__) + + +class RecordingCleanup(threading.Thread): + def __init__(self, config: FrigateConfig, stop_event): + threading.Thread.__init__(self) + self.name = "recording_cleanup" + self.config = config + self.stop_event = stop_event + + def clean_tmp_clips(self): + # delete any clips more than 5 minutes old + for p in Path("/tmp/cache").rglob("clip_*.mp4"): + logger.debug(f"Checking tmp clip {p}.") + if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1): + logger.debug("Deleting tmp clip.") + + # empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769 + with open(p, "w"): + pass + p.unlink(missing_ok=True) + + def expire_recordings(self): + logger.debug("Start expire recordings (new).") + + logger.debug("Start deleted cameras.") + # Handle deleted cameras + expire_days = self.config.record.retain.days + expire_before = ( + datetime.datetime.now() - datetime.timedelta(days=expire_days) + ).timestamp() + no_camera_recordings: Recordings = Recordings.select().where( + Recordings.camera.not_in(list(self.config.cameras.keys())), + Recordings.end_time < expire_before, + ) + + deleted_recordings = set() + for recording in no_camera_recordings: + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + Recordings.delete().where(Recordings.id << deleted_recordings).execute() + logger.debug("End deleted cameras.") + + logger.debug("Start all cameras.") + for camera, config in self.config.cameras.items(): + logger.debug(f"Start camera: {camera}.") + # Get the timestamp for cutoff of retained days + expire_days = config.record.retain.days + expire_date = ( + datetime.datetime.now() - datetime.timedelta(days=expire_days) + ).timestamp() + + # Get recordings to check for expiration + recordings: Recordings = ( + Recordings.select() + .where( + Recordings.camera == camera, + Recordings.end_time < expire_date, + ) + .order_by(Recordings.start_time) + ) + + # Get all the events to check against + events: Event = ( + Event.select() + .where( + Event.camera == camera, + # need to ensure segments for all events starting + # before the expire date are included + Event.start_time < expire_date, + Event.has_clip, + ) + .order_by(Event.start_time) + .objects() + ) + + # loop over recordings and see if they overlap with any non-expired events + # TODO: expire segments based on segment stats according to config + event_start = 0 + deleted_recordings = set() + for recording in recordings.objects().iterator(): + keep = False + # Now look for a reason to keep this recording segment + for idx in range(event_start, len(events)): + event = events[idx] + + # if the event starts in the future, stop checking events + # and let this recording segment expire + if event.start_time > recording.end_time: + keep = False + break + + # if the event is in progress or ends after the recording starts, keep it + # and stop looking at events + if event.end_time is None or event.end_time >= recording.start_time: + keep = True + break + + # if the event ends before this recording segment starts, skip + # this event and check the next event for an overlap. + # since the events and recordings are sorted, we can skip events + # that end before the previous recording segment started on future segments + if event.end_time < recording.start_time: + event_start = idx + + # Delete recordings outside of the retention window or based on the retention mode + if ( + not keep + or ( + config.record.events.retain.mode == RetainModeEnum.motion + and recording.motion == 0 + ) + or ( + config.record.events.retain.mode + == RetainModeEnum.active_objects + and recording.objects == 0 + ) + ): + Path(recording.path).unlink(missing_ok=True) + deleted_recordings.add(recording.id) + + logger.debug(f"Expiring {len(deleted_recordings)} recordings") + # delete up to 100,000 at a time + max_deletes = 100000 + deleted_recordings_list = list(deleted_recordings) + for i in range(0, len(deleted_recordings_list), max_deletes): + Recordings.delete().where( + Recordings.id << deleted_recordings_list[i : i + max_deletes] + ).execute() + + logger.debug(f"End camera: {camera}.") + + logger.debug("End all cameras.") + logger.debug("End expire recordings (new).") + + def expire_files(self): + logger.debug("Start expire files (legacy).") + + default_expire = ( + datetime.datetime.now().timestamp() + - SECONDS_IN_DAY * self.config.record.retain.days + ) + delete_before = {} + + for name, camera in self.config.cameras.items(): + delete_before[name] = ( + datetime.datetime.now().timestamp() + - SECONDS_IN_DAY * camera.record.retain.days + ) + + # find all the recordings older than the oldest recording in the db + try: + oldest_recording = Recordings.select().order_by(Recordings.start_time).get() + + p = Path(oldest_recording.path) + oldest_timestamp = p.stat().st_mtime - 1 + except DoesNotExist: + oldest_timestamp = datetime.datetime.now().timestamp() + except FileNotFoundError: + logger.warning(f"Unable to find file from recordings database: {p}") + Recordings.delete().where(Recordings.id == oldest_recording.id).execute() + return + + logger.debug(f"Oldest recording in the db: {oldest_timestamp}") + process = sp.run( + ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"], + capture_output=True, + text=True, + ) + files_to_check = process.stdout.splitlines() + + for f in files_to_check: + p = Path(f) + try: + if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire): + p.unlink(missing_ok=True) + except FileNotFoundError: + logger.warning(f"Attempted to expire missing file: {f}") + + logger.debug("End expire files (legacy).") + + def sync_recordings(self): + logger.debug("Start sync recordings.") + + # get all recordings in the db + recordings: Recordings = Recordings.select() + + # get all recordings files on disk + process = sp.run( + ["find", RECORD_DIR, "-type", "f"], + capture_output=True, + text=True, + ) + files_on_disk = process.stdout.splitlines() + + recordings_to_delete = [] + for recording in recordings.objects().iterator(): + if not recording.path in files_on_disk: + recordings_to_delete.append(recording.id) + + logger.debug( + f"Deleting {len(recordings_to_delete)} recordings with missing files" + ) + # delete up to 100,000 at a time + max_deletes = 100000 + for i in range(0, len(recordings_to_delete), max_deletes): + Recordings.delete().where( + Recordings.id << recordings_to_delete[i : i + max_deletes] + ).execute() + + logger.debug("End sync recordings.") + + def run(self): + # on startup sync recordings with disk (disabled due to too much CPU usage) + # self.sync_recordings() + + # Expire tmp clips every minute, recordings and clean directories every hour. + for counter in itertools.cycle(range(self.config.record.expire_interval)): + if self.stop_event.wait(60): + logger.info(f"Exiting recording cleanup...") + break + self.clean_tmp_clips() + + if counter == 0: + self.expire_recordings() + self.expire_files() + remove_empty_directories(RECORD_DIR) diff --git a/frigate/record.py b/frigate/record/maintainer.py similarity index 60% rename from frigate/record.py rename to frigate/record/maintainer.py index 4ec3ff9a1..fd9846e56 100644 --- a/frigate/record.py +++ b/frigate/record/maintainer.py @@ -1,5 +1,6 @@ +"""Maintain recording segments in cache.""" + import datetime -import itertools import logging import multiprocessing as mp import os @@ -12,7 +13,6 @@ from collections import defaultdict from pathlib import Path import psutil -from peewee import JOIN, DoesNotExist from frigate.config import RetainModeEnum, FrigateConfig from frigate.const import CACHE_DIR, MAX_SEGMENT_DURATION, RECORD_DIR @@ -21,24 +21,6 @@ from frigate.util import area logger = logging.getLogger(__name__) -SECONDS_IN_DAY = 60 * 60 * 24 - - -def remove_empty_directories(directory): - # list all directories recursively and sort them by path, - # longest first - paths = sorted( - [x[0] for x in os.walk(RECORD_DIR)], - key=lambda p: len(str(p)), - reverse=True, - ) - for path in paths: - # don't delete the parent - if path == RECORD_DIR: - continue - if len(os.listdir(path)) == 0: - os.rmdir(path) - class RecordingMaintainer(threading.Thread): def __init__( @@ -403,231 +385,3 @@ class RecordingMaintainer(threading.Thread): wait_time = max(0, 5 - duration) logger.info(f"Exiting recording maintenance...") - - -class RecordingCleanup(threading.Thread): - def __init__(self, config: FrigateConfig, stop_event): - threading.Thread.__init__(self) - self.name = "recording_cleanup" - self.config = config - self.stop_event = stop_event - - def clean_tmp_clips(self): - # delete any clips more than 5 minutes old - for p in Path("/tmp/cache").rglob("clip_*.mp4"): - logger.debug(f"Checking tmp clip {p}.") - if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1): - logger.debug("Deleting tmp clip.") - - # empty contents of file before unlinking https://github.com/blakeblackshear/frigate/issues/4769 - with open(p, "w"): - pass - p.unlink(missing_ok=True) - - def expire_recordings(self): - logger.debug("Start expire recordings (new).") - - logger.debug("Start deleted cameras.") - # Handle deleted cameras - expire_days = self.config.record.retain.days - expire_before = ( - datetime.datetime.now() - datetime.timedelta(days=expire_days) - ).timestamp() - no_camera_recordings: Recordings = Recordings.select().where( - Recordings.camera.not_in(list(self.config.cameras.keys())), - Recordings.end_time < expire_before, - ) - - deleted_recordings = set() - for recording in no_camera_recordings: - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - Recordings.delete().where(Recordings.id << deleted_recordings).execute() - logger.debug("End deleted cameras.") - - logger.debug("Start all cameras.") - for camera, config in self.config.cameras.items(): - logger.debug(f"Start camera: {camera}.") - # Get the timestamp for cutoff of retained days - expire_days = config.record.retain.days - expire_date = ( - datetime.datetime.now() - datetime.timedelta(days=expire_days) - ).timestamp() - - # Get recordings to check for expiration - recordings: Recordings = ( - Recordings.select() - .where( - Recordings.camera == camera, - Recordings.end_time < expire_date, - ) - .order_by(Recordings.start_time) - ) - - # Get all the events to check against - events: Event = ( - Event.select() - .where( - Event.camera == camera, - # need to ensure segments for all events starting - # before the expire date are included - Event.start_time < expire_date, - Event.has_clip, - ) - .order_by(Event.start_time) - .objects() - ) - - # loop over recordings and see if they overlap with any non-expired events - # TODO: expire segments based on segment stats according to config - event_start = 0 - deleted_recordings = set() - for recording in recordings.objects().iterator(): - keep = False - # Now look for a reason to keep this recording segment - for idx in range(event_start, len(events)): - event = events[idx] - - # if the event starts in the future, stop checking events - # and let this recording segment expire - if event.start_time > recording.end_time: - keep = False - break - - # if the event is in progress or ends after the recording starts, keep it - # and stop looking at events - if event.end_time is None or event.end_time >= recording.start_time: - keep = True - break - - # if the event ends before this recording segment starts, skip - # this event and check the next event for an overlap. - # since the events and recordings are sorted, we can skip events - # that end before the previous recording segment started on future segments - if event.end_time < recording.start_time: - event_start = idx - - # Delete recordings outside of the retention window or based on the retention mode - if ( - not keep - or ( - config.record.events.retain.mode == RetainModeEnum.motion - and recording.motion == 0 - ) - or ( - config.record.events.retain.mode - == RetainModeEnum.active_objects - and recording.objects == 0 - ) - ): - Path(recording.path).unlink(missing_ok=True) - deleted_recordings.add(recording.id) - - logger.debug(f"Expiring {len(deleted_recordings)} recordings") - # delete up to 100,000 at a time - max_deletes = 100000 - deleted_recordings_list = list(deleted_recordings) - for i in range(0, len(deleted_recordings_list), max_deletes): - Recordings.delete().where( - Recordings.id << deleted_recordings_list[i : i + max_deletes] - ).execute() - - logger.debug(f"End camera: {camera}.") - - logger.debug("End all cameras.") - logger.debug("End expire recordings (new).") - - def expire_files(self): - logger.debug("Start expire files (legacy).") - - default_expire = ( - datetime.datetime.now().timestamp() - - SECONDS_IN_DAY * self.config.record.retain.days - ) - delete_before = {} - - for name, camera in self.config.cameras.items(): - delete_before[name] = ( - datetime.datetime.now().timestamp() - - SECONDS_IN_DAY * camera.record.retain.days - ) - - # find all the recordings older than the oldest recording in the db - try: - oldest_recording = Recordings.select().order_by(Recordings.start_time).get() - - p = Path(oldest_recording.path) - oldest_timestamp = p.stat().st_mtime - 1 - except DoesNotExist: - oldest_timestamp = datetime.datetime.now().timestamp() - except FileNotFoundError: - logger.warning(f"Unable to find file from recordings database: {p}") - Recordings.delete().where(Recordings.id == oldest_recording.id).execute() - return - - logger.debug(f"Oldest recording in the db: {oldest_timestamp}") - process = sp.run( - ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"], - capture_output=True, - text=True, - ) - files_to_check = process.stdout.splitlines() - - for f in files_to_check: - p = Path(f) - try: - if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire): - p.unlink(missing_ok=True) - except FileNotFoundError: - logger.warning(f"Attempted to expire missing file: {f}") - - logger.debug("End expire files (legacy).") - - def sync_recordings(self): - logger.debug("Start sync recordings.") - - # get all recordings in the db - recordings: Recordings = Recordings.select() - - # get all recordings files on disk - process = sp.run( - ["find", RECORD_DIR, "-type", "f"], - capture_output=True, - text=True, - ) - files_on_disk = process.stdout.splitlines() - - recordings_to_delete = [] - for recording in recordings.objects().iterator(): - if not recording.path in files_on_disk: - recordings_to_delete.append(recording.id) - - logger.debug( - f"Deleting {len(recordings_to_delete)} recordings with missing files" - ) - # delete up to 100,000 at a time - max_deletes = 100000 - for i in range(0, len(recordings_to_delete), max_deletes): - Recordings.delete().where( - Recordings.id << recordings_to_delete[i : i + max_deletes] - ).execute() - - logger.debug("End sync recordings.") - - def run(self): - # on startup sync recordings with disk (disabled due to too much CPU usage) - # self.sync_recordings() - - # Expire tmp clips every minute, recordings and clean directories every hour. - for counter in itertools.cycle(range(self.config.record.expire_interval)): - if self.stop_event.wait(60): - logger.info(f"Exiting recording cleanup...") - break - self.clean_tmp_clips() - - if counter == 0: - self.expire_recordings() - self.expire_files() - remove_empty_directories(RECORD_DIR) diff --git a/frigate/record/record.py b/frigate/record/record.py new file mode 100644 index 000000000..a63a18a7c --- /dev/null +++ b/frigate/record/record.py @@ -0,0 +1,47 @@ +"""Run recording maintainer and cleanup.""" + +import logging +import multiprocessing as mp +import signal +import threading + +from setproctitle import setproctitle + +from playhouse.sqliteq import SqliteQueueDatabase + +from frigate.config import FrigateConfig +from frigate.models import Event, Recordings, Timeline +from frigate.record.cleanup import RecordingCleanup +from frigate.record.maintainer import RecordingMaintainer +from frigate.util import listen + +logger = logging.getLogger(__name__) + + +def manage_recordings( + config: FrigateConfig, + recordings_info_queue: mp.Queue, +): + stop_event = mp.Event() + + def receiveSignal(signalNumber, frame): + stop_event.set() + + signal.signal(signal.SIGTERM, receiveSignal) + signal.signal(signal.SIGINT, receiveSignal) + + threading.current_thread().name = "process:recording_manager" + setproctitle("frigate.recording_manager") + listen() + + db = SqliteQueueDatabase(config.database.path) + models = [Event, Recordings, Timeline] + db.bind(models) + + maintainer = RecordingMaintainer(config, recordings_info_queue, stop_event) + maintainer.start() + + cleanup = RecordingCleanup(config, stop_event) + cleanup.start() + + logger.info("recording_manager: exiting subprocess") diff --git a/frigate/record/util.py b/frigate/record/util.py new file mode 100644 index 000000000..d02f83f55 --- /dev/null +++ b/frigate/record/util.py @@ -0,0 +1,21 @@ +"""Recordings Utilities.""" + +import os + +from frigate.const import RECORD_DIR + + +def remove_empty_directories(directory): + # list all directories recursively and sort them by path, + # longest first + paths = sorted( + [x[0] for x in os.walk(RECORD_DIR)], + key=lambda p: len(str(p)), + reverse=True, + ) + for path in paths: + # don't delete the parent + if path == RECORD_DIR: + continue + if len(os.listdir(path)) == 0: + os.rmdir(path)