Clean up recording mypy

This commit is contained in:
Nicolas Mowen 2026-03-25 13:37:45 -06:00
parent 3f04b98e93
commit b41054eaee
4 changed files with 46 additions and 34 deletions

View File

@ -68,6 +68,12 @@ ignore_errors = false
[mypy-frigate.ptz] [mypy-frigate.ptz]
ignore_errors = false ignore_errors = false
[mypy-frigate.record.*]
ignore_errors = false
[mypy-frigate.service_manager.*]
ignore_errors = false
[mypy-frigate.stats] [mypy-frigate.stats]
ignore_errors = false ignore_errors = false
@ -89,7 +95,3 @@ ignore_errors = false
[mypy-frigate.watchdog] [mypy-frigate.watchdog]
ignore_errors = false ignore_errors = false
disallow_untyped_calls = false disallow_untyped_calls = false
[mypy-frigate.service_manager.*]
ignore_errors = false

View File

@ -7,6 +7,7 @@ import os
import threading import threading
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path from pathlib import Path
from typing import Any
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
@ -60,7 +61,9 @@ class RecordingCleanup(threading.Thread):
db.execute_sql("PRAGMA wal_checkpoint(TRUNCATE);") db.execute_sql("PRAGMA wal_checkpoint(TRUNCATE);")
db.close() db.close()
def expire_review_segments(self, config: CameraConfig, now: datetime) -> set[Path]: def expire_review_segments(
self, config: CameraConfig, now: datetime.datetime
) -> set[Path]:
"""Delete review segments that are expired""" """Delete review segments that are expired"""
alert_expire_date = ( alert_expire_date = (
now - datetime.timedelta(days=config.record.alerts.retain.days) now - datetime.timedelta(days=config.record.alerts.retain.days)
@ -68,7 +71,7 @@ class RecordingCleanup(threading.Thread):
detection_expire_date = ( detection_expire_date = (
now - datetime.timedelta(days=config.record.detections.retain.days) now - datetime.timedelta(days=config.record.detections.retain.days)
).timestamp() ).timestamp()
expired_reviews: ReviewSegment = ( expired_reviews = (
ReviewSegment.select(ReviewSegment.id, ReviewSegment.thumb_path) ReviewSegment.select(ReviewSegment.id, ReviewSegment.thumb_path)
.where(ReviewSegment.camera == config.name) .where(ReviewSegment.camera == config.name)
.where( .where(
@ -109,13 +112,13 @@ class RecordingCleanup(threading.Thread):
continuous_expire_date: float, continuous_expire_date: float,
motion_expire_date: float, motion_expire_date: float,
config: CameraConfig, config: CameraConfig,
reviews: ReviewSegment, reviews: list[Any],
) -> set[Path]: ) -> set[Path]:
"""Delete recordings for existing camera based on retention config.""" """Delete recordings for existing camera based on retention config."""
# Get the timestamp for cutoff of retained days # Get the timestamp for cutoff of retained days
# Get recordings to check for expiration # Get recordings to check for expiration
recordings: Recordings = ( recordings = (
Recordings.select( Recordings.select(
Recordings.id, Recordings.id,
Recordings.start_time, Recordings.start_time,
@ -148,13 +151,12 @@ class RecordingCleanup(threading.Thread):
review_start = 0 review_start = 0
deleted_recordings = set() deleted_recordings = set()
kept_recordings: list[tuple[float, float]] = [] kept_recordings: list[tuple[float, float]] = []
recording: Recordings
for recording in recordings: for recording in recordings:
keep = False keep = False
mode = None mode = None
# Now look for a reason to keep this recording segment # Now look for a reason to keep this recording segment
for idx in range(review_start, len(reviews)): for idx in range(review_start, len(reviews)):
review: ReviewSegment = reviews[idx] review = reviews[idx]
severity = review.severity severity = review.severity
pre_capture = config.record.get_review_pre_capture(severity) pre_capture = config.record.get_review_pre_capture(severity)
post_capture = config.record.get_review_post_capture(severity) post_capture = config.record.get_review_post_capture(severity)
@ -214,7 +216,7 @@ class RecordingCleanup(threading.Thread):
Recordings.id << deleted_recordings_list[i : i + max_deletes] Recordings.id << deleted_recordings_list[i : i + max_deletes]
).execute() ).execute()
previews: list[Previews] = ( previews = (
Previews.select( Previews.select(
Previews.id, Previews.id,
Previews.start_time, Previews.start_time,
@ -290,13 +292,13 @@ class RecordingCleanup(threading.Thread):
expire_before = ( expire_before = (
datetime.datetime.now() - datetime.timedelta(days=expire_days) datetime.datetime.now() - datetime.timedelta(days=expire_days)
).timestamp() ).timestamp()
no_camera_recordings: Recordings = ( no_camera_recordings = (
Recordings.select( Recordings.select(
Recordings.id, Recordings.id,
Recordings.path, Recordings.path,
) )
.where( .where(
Recordings.camera.not_in(list(self.config.cameras.keys())), Recordings.camera.not_in(list(self.config.cameras.keys())), # type: ignore[call-arg, arg-type]
Recordings.end_time < expire_before, Recordings.end_time < expire_before,
) )
.namedtuples() .namedtuples()
@ -341,7 +343,7 @@ class RecordingCleanup(threading.Thread):
).timestamp() ).timestamp()
# Get all the reviews to check against # Get all the reviews to check against
reviews: ReviewSegment = ( reviews = (
ReviewSegment.select( ReviewSegment.select(
ReviewSegment.start_time, ReviewSegment.start_time,
ReviewSegment.end_time, ReviewSegment.end_time,

View File

@ -85,7 +85,7 @@ def validate_ffmpeg_args(args: str) -> tuple[bool, str]:
return True, "" return True, ""
def lower_priority(): def lower_priority() -> None:
os.nice(PROCESS_PRIORITY_LOW) os.nice(PROCESS_PRIORITY_LOW)
@ -150,7 +150,7 @@ class RecordingExporter(threading.Thread):
): ):
# has preview mp4 # has preview mp4
try: try:
preview: Previews = ( preview = (
Previews.select( Previews.select(
Previews.camera, Previews.camera,
Previews.path, Previews.path,
@ -231,20 +231,19 @@ class RecordingExporter(threading.Thread):
def get_record_export_command( def get_record_export_command(
self, video_path: str, use_hwaccel: bool = True self, video_path: str, use_hwaccel: bool = True
) -> list[str]: ) -> tuple[list[str], str | list[str]]:
# handle case where internal port is a string with ip:port # handle case where internal port is a string with ip:port
internal_port = self.config.networking.listen.internal internal_port = self.config.networking.listen.internal
if type(internal_port) is str: if type(internal_port) is str:
internal_port = int(internal_port.split(":")[-1]) internal_port = int(internal_port.split(":")[-1])
playlist_lines: list[str] = []
if (self.end_time - self.start_time) <= MAX_PLAYLIST_SECONDS: if (self.end_time - self.start_time) <= MAX_PLAYLIST_SECONDS:
playlist_lines = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8" playlist_url = f"http://127.0.0.1:{internal_port}/vod/{self.camera}/start/{self.start_time}/end/{self.end_time}/index.m3u8"
ffmpeg_input = ( ffmpeg_input = (
f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_lines}" f"-y -protocol_whitelist pipe,file,http,tcp -i {playlist_url}"
) )
else: else:
playlist_lines = []
# get full set of recordings # get full set of recordings
export_recordings = ( export_recordings = (
Recordings.select( Recordings.select(
@ -305,7 +304,7 @@ class RecordingExporter(threading.Thread):
def get_preview_export_command( def get_preview_export_command(
self, video_path: str, use_hwaccel: bool = True self, video_path: str, use_hwaccel: bool = True
) -> list[str]: ) -> tuple[list[str], list[str]]:
playlist_lines = [] playlist_lines = []
codec = "-c copy" codec = "-c copy"
@ -355,7 +354,6 @@ class RecordingExporter(threading.Thread):
.iterator() .iterator()
) )
preview: Previews
for preview in export_previews: for preview in export_previews:
playlist_lines.append(f"file '{preview.path}'") playlist_lines.append(f"file '{preview.path}'")
@ -493,7 +491,7 @@ class RecordingExporter(threading.Thread):
logger.debug(f"Finished exporting {video_path}") logger.debug(f"Finished exporting {video_path}")
def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]): def migrate_exports(ffmpeg: FfmpegConfig, camera_names: list[str]) -> None:
Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True) Path(os.path.join(CLIPS_DIR, "export")).mkdir(exist_ok=True)
exports = [] exports = []

View File

@ -266,7 +266,7 @@ class RecordingMaintainer(threading.Thread):
# get all reviews with the end time after the start of the oldest cache file # get all reviews with the end time after the start of the oldest cache file
# or with end_time None # or with end_time None
reviews: ReviewSegment = ( reviews = (
ReviewSegment.select( ReviewSegment.select(
ReviewSegment.start_time, ReviewSegment.start_time,
ReviewSegment.end_time, ReviewSegment.end_time,
@ -301,7 +301,9 @@ class RecordingMaintainer(threading.Thread):
RecordingsDataTypeEnum.saved.value, RecordingsDataTypeEnum.saved.value,
) )
recordings_to_insert: list[Optional[Recordings]] = await asyncio.gather(*tasks) recordings_to_insert: list[Optional[dict[str, Any]]] = await asyncio.gather(
*tasks
)
# fire and forget recordings entries # fire and forget recordings entries
self.requestor.send_data( self.requestor.send_data(
@ -314,8 +316,8 @@ class RecordingMaintainer(threading.Thread):
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
async def validate_and_move_segment( async def validate_and_move_segment(
self, camera: str, reviews: list[ReviewSegment], recording: dict[str, Any] self, camera: str, reviews: Any, recording: dict[str, Any]
) -> Optional[Recordings]: ) -> Optional[dict[str, Any]]:
cache_path: str = recording["cache_path"] cache_path: str = recording["cache_path"]
start_time: datetime.datetime = recording["start_time"] start_time: datetime.datetime = recording["start_time"]
@ -456,6 +458,8 @@ class RecordingMaintainer(threading.Thread):
if end_time < retain_cutoff: if end_time < retain_cutoff:
self.drop_segment(cache_path) self.drop_segment(cache_path)
return None
def _compute_motion_heatmap( def _compute_motion_heatmap(
self, camera: str, motion_boxes: list[tuple[int, int, int, int]] self, camera: str, motion_boxes: list[tuple[int, int, int, int]]
) -> dict[str, int] | None: ) -> dict[str, int] | None:
@ -481,7 +485,7 @@ class RecordingMaintainer(threading.Thread):
frame_width = camera_config.detect.width frame_width = camera_config.detect.width
frame_height = camera_config.detect.height frame_height = camera_config.detect.height
if frame_width <= 0 or frame_height <= 0: if not frame_width or frame_width <= 0 or not frame_height or frame_height <= 0:
return None return None
GRID_SIZE = 16 GRID_SIZE = 16
@ -575,13 +579,13 @@ class RecordingMaintainer(threading.Thread):
duration: float, duration: float,
cache_path: str, cache_path: str,
store_mode: RetainModeEnum, store_mode: RetainModeEnum,
) -> Optional[Recordings]: ) -> Optional[dict[str, Any]]:
segment_info = self.segment_stats(camera, start_time, end_time) segment_info = self.segment_stats(camera, start_time, end_time)
# check if the segment shouldn't be stored # check if the segment shouldn't be stored
if segment_info.should_discard_segment(store_mode): if segment_info.should_discard_segment(store_mode):
self.drop_segment(cache_path) self.drop_segment(cache_path)
return return None
# directory will be in utc due to start_time being in utc # directory will be in utc due to start_time being in utc
directory = os.path.join( directory = os.path.join(
@ -620,7 +624,8 @@ class RecordingMaintainer(threading.Thread):
if p.returncode != 0: if p.returncode != 0:
logger.error(f"Unable to convert {cache_path} to {file_path}") logger.error(f"Unable to convert {cache_path} to {file_path}")
logger.error((await p.stderr.read()).decode("ascii")) if p.stderr:
logger.error((await p.stderr.read()).decode("ascii"))
return None return None
else: else:
logger.debug( logger.debug(
@ -684,11 +689,16 @@ class RecordingMaintainer(threading.Thread):
stale_frame_count_threshold = 10 stale_frame_count_threshold = 10
# empty the object recordings info queue # empty the object recordings info queue
while True: while True:
(topic, data) = self.detection_subscriber.check_for_update( result = self.detection_subscriber.check_for_update(
timeout=FAST_QUEUE_TIMEOUT timeout=FAST_QUEUE_TIMEOUT
) )
if not topic: if not result:
break
topic, data = result
if not topic or not data:
break break
if topic == DetectionTypeEnum.video.value: if topic == DetectionTypeEnum.video.value: