Make camera recordings mover asynchronous

This commit is contained in:
Nick Mowen 2023-06-14 16:21:17 -06:00
parent ff90db30e6
commit c217c34e3d

View File

@ -1,5 +1,6 @@
"""Maintain recording segments in cache."""
import asyncio
import datetime
import logging
import multiprocessing as mp
@ -13,6 +14,7 @@ from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Any, Tuple
import traceback
import psutil
@ -42,7 +44,7 @@ class RecordingMaintainer(threading.Thread):
self.recordings_info: dict[str, Any] = defaultdict(list)
self.end_time_cache: dict[str, Tuple[datetime.datetime, float]] = {}
def move_files(self) -> None:
async def move_files(self) -> None:
cache_files = sorted(
[
d
@ -121,115 +123,111 @@ class RecordingMaintainer(threading.Thread):
)
.order_by(Event.start_time)
)
for r in recordings:
cache_path = r["cache_path"]
start_time = r["start_time"]
# Just delete files if recordings are turned off
if (
camera not in self.config.cameras
or not self.process_info[camera]["record_enabled"].value
):
await asyncio.gather(
*(self.validate_segment(camera, events, r) for r in recordings)
)
async def validate_segment(
self, camera: str, events: Event, recording: dict[str, any]
) -> None:
cache_path = recording["cache_path"]
start_time = recording["start_time"]
# Just delete files if recordings are turned off
if (
camera not in self.config.cameras
or not self.process_info[camera]["record_enabled"].value
):
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
return
if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path]
else:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
# ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION:
end_time = start_time + datetime.timedelta(seconds=duration)
self.end_time_cache[cache_path] = (end_time, duration)
else:
if duration == -1:
logger.warning(
f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}"
)
logger.warning(f"Discarding a corrupt recording segment: {cache_path}")
Path(cache_path).unlink(missing_ok=True)
return
# if cached file's start_time is earlier than the retain days for the camera
if start_time <= (
(
datetime.datetime.now()
- datetime.timedelta(
days=self.config.cameras[camera].record.retain.days
)
)
):
# if the cached segment overlaps with the events:
overlaps = False
for event in events:
# if the event starts in the future, stop checking events
# and remove this segment
if event.start_time > end_time.timestamp():
overlaps = False
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
continue
break
if cache_path in self.end_time_cache:
end_time, duration = self.end_time_cache[cache_path]
else:
ffprobe_cmd = [
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
f"{cache_path}",
]
p = sp.run(ffprobe_cmd, capture_output=True)
if p.returncode == 0 and p.stdout.decode():
duration = float(p.stdout.decode().strip())
else:
duration = -1
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if event.end_time is None or event.end_time >= start_time.timestamp():
overlaps = True
break
# ensure duration is within expected length
if 0 < duration < MAX_SEGMENT_DURATION:
end_time = start_time + datetime.timedelta(seconds=duration)
self.end_time_cache[cache_path] = (end_time, duration)
else:
if duration == -1:
logger.warning(
f"Failed to probe corrupt segment {cache_path} : {p.returncode} - {str(p.stderr)}"
)
logger.warning(
f"Discarding a corrupt recording segment: {cache_path}"
)
Path(cache_path).unlink(missing_ok=True)
continue
# if cached file's start_time is earlier than the retain days for the camera
if start_time <= (
(
datetime.datetime.now()
- datetime.timedelta(
days=self.config.cameras[camera].record.retain.days
)
)
):
# if the cached segment overlaps with the events:
overlaps = False
for event in events:
# if the event starts in the future, stop checking events
# and remove this segment
if event.start_time > end_time.timestamp():
overlaps = False
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
break
# if the event is in progress or ends after the recording starts, keep it
# and stop looking at events
if (
event.end_time is None
or event.end_time >= start_time.timestamp()
):
overlaps = True
break
if overlaps:
record_mode = self.config.cameras[
camera
].record.events.retain.mode
# move from cache to recordings immediately
self.store_segment(
camera,
start_time,
end_time,
duration,
cache_path,
record_mode,
)
# if it doesn't overlap with an event, go ahead and drop the segment
# if it ends more than the configured pre_capture for the camera
else:
pre_capture = self.config.cameras[
camera
].record.events.pre_capture
most_recently_processed_frame_time = self.recordings_info[
camera
][-1][0]
retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time.timestamp() < retain_cutoff:
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
# else retain days includes this segment
else:
record_mode = self.config.cameras[camera].record.retain.mode
self.store_segment(
camera, start_time, end_time, duration, cache_path, record_mode
)
if overlaps:
record_mode = self.config.cameras[camera].record.events.retain.mode
# move from cache to recordings immediately
self.store_segment(
camera,
start_time,
end_time,
duration,
cache_path,
record_mode,
)
# if it doesn't overlap with an event, go ahead and drop the segment
# if it ends more than the configured pre_capture for the camera
else:
pre_capture = self.config.cameras[camera].record.events.pre_capture
most_recently_processed_frame_time = self.recordings_info[camera][-1][0]
retain_cutoff = most_recently_processed_frame_time - pre_capture
if end_time.timestamp() < retain_cutoff:
Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None)
# else retain days includes this segment
else:
record_mode = self.config.cameras[camera].record.retain.mode
self.store_segment(
camera, start_time, end_time, duration, cache_path, record_mode
)
def segment_stats(
self, camera: str, start_time: datetime.datetime, end_time: datetime.datetime
@ -386,12 +384,13 @@ class RecordingMaintainer(threading.Thread):
break
try:
self.move_files()
asyncio.run(self.move_files())
except Exception as e:
logger.error(
"Error occurred when attempting to maintain recording cache"
)
logger.error(e)
logger.error(str(traceback.print_exc()))
duration = datetime.datetime.now().timestamp() - run_start
wait_time = max(0, 5 - duration)