From e759fe4a55408ebdbba1c3cdc15da51c051a7040 Mon Sep 17 00:00:00 2001 From: Nick Mowen Date: Thu, 20 Jul 2023 11:09:24 -0600 Subject: [PATCH] Insert recordings as bulk instead of individually. --- frigate/record/maintainer.py | 46 ++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/frigate/record/maintainer.py b/frigate/record/maintainer.py index e1dabdf67..9587d1517 100644 --- a/frigate/record/maintainer.py +++ b/frigate/record/maintainer.py @@ -125,6 +125,7 @@ class RecordingMaintainer(threading.Thread): self.end_time_cache.pop(cache_path, None) grouped_recordings[camera] = grouped_recordings[camera][-keep_count:] + tasks = [] for camera, recordings in grouped_recordings.items(): # clear out all the object recording info for old frames while ( @@ -155,10 +156,13 @@ class RecordingMaintainer(threading.Thread): .order_by(Event.start_time) ) - await asyncio.gather( - *(self.validate_and_move_segment(camera, events, r) for r in recordings) + tasks.extend( + [self.validate_and_move_segment(camera, events, r) for r in recordings] ) + recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) + Recordings.insert_many([r for r in recordings_to_insert if r is not None]) + async def validate_and_move_segment( self, camera: str, events: Event, recording: dict[str, any] ) -> None: @@ -225,7 +229,7 @@ class RecordingMaintainer(threading.Thread): if overlaps: record_mode = self.config.cameras[camera].record.events.retain.mode # move from cache to recordings immediately - self.store_segment( + return self.move_segment( camera, start_time, end_time, @@ -247,7 +251,7 @@ class RecordingMaintainer(threading.Thread): # else retain days includes this segment else: record_mode = self.config.cameras[camera].record.retain.mode - self.store_segment( + return self.move_segment( camera, start_time, end_time, duration, cache_path, record_mode ) @@ -290,7 +294,7 @@ class RecordingMaintainer(threading.Thread): return SegmentInfo(motion_count, active_count, round(average_dBFS)) - def store_segment( + def move_segment( self, camera: str, start_time: datetime.datetime, @@ -298,7 +302,7 @@ class RecordingMaintainer(threading.Thread): duration: float, cache_path: str, store_mode: RetainModeEnum, - ) -> None: + ) -> Optional[Recordings]: segment_info = self.segment_stats(camera, start_time, end_time) # check if the segment shouldn't be stored @@ -348,7 +352,7 @@ class RecordingMaintainer(threading.Thread): if p.returncode != 0: logger.error(f"Unable to convert {cache_path} to {file_path}") logger.error(p.stderr) - return + return None else: logger.debug( f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds." @@ -368,19 +372,20 @@ class RecordingMaintainer(threading.Thread): rand_id = "".join( random.choices(string.ascii_lowercase + string.digits, k=6) ) - Recordings.create( - id=f"{start_time.timestamp()}-{rand_id}", - camera=camera, - path=file_path, - start_time=start_time.timestamp(), - end_time=end_time.timestamp(), - duration=duration, - motion=segment_info.motion_box_count, + + return { + Recordings.id: f"{start_time.timestamp()}-{rand_id}", + Recordings.camera: camera, + Recordings.path: file_path, + Recordings.start_time: start_time.timestamp(), + Recordings.end_time: end_time.timestamp(), + Recordings.duration: duration, + Recordings.motion: segment_info.motion_box_count, # TODO: update this to store list of active objects at some point - objects=segment_info.active_object_count, - dBFS=segment_info.average_dBFS, - segment_size=segment_size, - ) + Recordings.objects: segment_info.active_object_count, + Recordings.dBFS: segment_info.average_dBFS, + Recordings.segment_size: segment_size, + } except Exception as e: logger.error(f"Unable to store recording segment {cache_path}") Path(cache_path).unlink(missing_ok=True) @@ -388,10 +393,11 @@ class RecordingMaintainer(threading.Thread): # clear end_time cache self.end_time_cache.pop(cache_path, None) + return None def run(self) -> None: # Check for new files every 5 seconds - wait_time = 5.0 + wait_time = 0.0 while not self.stop_event.wait(wait_time): run_start = datetime.datetime.now().timestamp()