diff --git a/frigate/record/export.py b/frigate/record/export.py index 32df8be75..e1dfff683 100644 --- a/frigate/record/export.py +++ b/frigate/record/export.py @@ -23,13 +23,13 @@ from frigate.const import ( EXPORT_DIR, MAX_PLAYLIST_SECONDS, PREVIEW_FRAME_TYPE, - PROCESS_PRIORITY_LOW, ) from frigate.ffmpeg_presets import ( EncodeTypeEnum, parse_preset_hardware_acceleration_encode, ) from frigate.models import Export, Previews, Recordings, ReviewSegment +from frigate.util.ffmpeg import run_ffmpeg_with_progress from frigate.util.time import is_current_hour logger = logging.getLogger(__name__) @@ -243,107 +243,29 @@ class RecordingExporter(threading.Thread): return total - def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]: - """Insert FFmpeg progress reporting flags before the output path. - - ``-progress pipe:2`` writes structured key=value lines to stderr, - ``-nostats`` suppresses the noisy default stats output. - """ - if not ffmpeg_cmd: - return ffmpeg_cmd - return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]] - def _run_ffmpeg_with_progress( self, ffmpeg_cmd: list[str], playlist_lines: str | list[str], step: str = "encoding", ) -> tuple[int, str]: - """Run an FFmpeg export command, parsing progress events from stderr. + """Delegate to the shared helper, mapping percent → (step, percent). - Returns ``(returncode, captured_stderr)``. Stdout is left attached to - the parent process so we don't have to drain it (and risk a deadlock - if the buffer fills). Progress percent is computed against the - expected output duration; values are clamped to [0, 100] inside - :py:meth:`_emit_progress`. + Returns ``(returncode, captured_stderr)``. """ - cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags( - ffmpeg_cmd - ) - if isinstance(playlist_lines, list): stdin_payload = "\n".join(playlist_lines) else: stdin_payload = playlist_lines - expected_duration = self._expected_output_duration_seconds() - - self._emit_progress(step, 0.0) - - proc = sp.Popen( - cmd, - stdin=sp.PIPE, - stderr=sp.PIPE, - text=True, - encoding="ascii", - errors="replace", + return run_ffmpeg_with_progress( + ffmpeg_cmd, + expected_duration_seconds=self._expected_output_duration_seconds(), + on_progress=lambda percent: self._emit_progress(step, percent), + stdin_payload=stdin_payload, + use_low_priority=True, ) - assert proc.stdin is not None - assert proc.stderr is not None - - try: - proc.stdin.write(stdin_payload) - except (BrokenPipeError, OSError): - # FFmpeg may have rejected the input early; still wait for it - # to terminate so the returncode is meaningful. - pass - finally: - try: - proc.stdin.close() - except (BrokenPipeError, OSError): - pass - - captured: list[str] = [] - - try: - for raw_line in proc.stderr: - captured.append(raw_line) - line = raw_line.strip() - - if not line: - continue - - if line.startswith("out_time_us="): - if expected_duration <= 0: - continue - try: - out_time_us = int(line.split("=", 1)[1]) - except (ValueError, IndexError): - continue - if out_time_us < 0: - continue - out_seconds = out_time_us / 1_000_000.0 - percent = (out_seconds / expected_duration) * 100.0 - self._emit_progress(step, percent) - elif line == "progress=end": - self._emit_progress(step, 100.0) - break - except Exception: - logger.exception("Failed reading FFmpeg progress for %s", self.export_id) - - proc.wait() - - # Drain any remaining stderr so callers can log it on failure. - try: - remaining = proc.stderr.read() - if remaining: - captured.append(remaining) - except Exception: - pass - - return proc.returncode, "".join(captured) - def get_datetime_from_timestamp(self, timestamp: int) -> str: # return in iso format using the configured ui.timezone when set, # so the auto-generated export name reflects local time rather diff --git a/frigate/test/test_export_progress.py b/frigate/test/test_export_progress.py index 903914815..dc6c16f03 100644 --- a/frigate/test/test_export_progress.py +++ b/frigate/test/test_export_progress.py @@ -14,6 +14,7 @@ from frigate.jobs.export import ( ) from frigate.record.export import PlaybackSourceEnum, RecordingExporter from frigate.types import JobStatusTypesEnum +from frigate.util.ffmpeg import inject_progress_flags def _make_exporter( @@ -118,10 +119,9 @@ class TestExpectedOutputDuration(unittest.TestCase): class TestProgressFlagInjection(unittest.TestCase): def test_inserts_before_output_path(self) -> None: - exporter = _make_exporter() cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"] - result = exporter._inject_progress_flags(cmd) + result = inject_progress_flags(cmd) assert result == [ "ffmpeg", @@ -136,8 +136,7 @@ class TestProgressFlagInjection(unittest.TestCase): ] def test_handles_empty_cmd(self) -> None: - exporter = _make_exporter() - assert exporter._inject_progress_flags([]) == [] + assert inject_progress_flags([]) == [] class TestFfmpegProgressParsing(unittest.TestCase): @@ -167,7 +166,7 @@ class TestFfmpegProgressParsing(unittest.TestCase): fake_proc.returncode = 0 fake_proc.wait = MagicMock(return_value=0) - with patch("frigate.record.export.sp.Popen", return_value=fake_proc): + with patch("frigate.util.ffmpeg.sp.Popen", return_value=fake_proc): returncode, _stderr = exporter._run_ffmpeg_with_progress( ["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding" ) diff --git a/frigate/test/test_ffmpeg_progress.py b/frigate/test/test_ffmpeg_progress.py new file mode 100644 index 000000000..ae9c4b94f --- /dev/null +++ b/frigate/test/test_ffmpeg_progress.py @@ -0,0 +1,102 @@ +"""Tests for the shared ffmpeg progress helper.""" + +import subprocess as sp +import unittest +from unittest.mock import MagicMock, patch + +from frigate.util.ffmpeg import inject_progress_flags, run_ffmpeg_with_progress + + +class TestInjectProgressFlags(unittest.TestCase): + def test_inserts_flags_before_output_path(self): + cmd = ["ffmpeg", "-i", "in.mp4", "-c", "copy", "out.mp4"] + result = inject_progress_flags(cmd) + self.assertEqual( + result, + ["ffmpeg", "-i", "in.mp4", "-c", "copy", "-progress", "pipe:2", "-nostats", "out.mp4"], + ) + + def test_empty_cmd_returns_empty(self): + self.assertEqual(inject_progress_flags([]), []) + + +class TestRunFfmpegWithProgress(unittest.TestCase): + def _make_fake_proc(self, stderr_lines, returncode=0): + proc = MagicMock() + proc.stderr = iter(stderr_lines) + proc.stdin = MagicMock() + proc.returncode = returncode + proc.wait = MagicMock() + return proc + + def test_emits_percent_from_out_time_us_lines(self): + captured: list[float] = [] + + def on_progress(percent: float) -> None: + captured.append(percent) + + stderr_lines = [ + "out_time_us=1000000\n", + "out_time_us=5000000\n", + "progress=end\n", + ] + proc = self._make_fake_proc(stderr_lines) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter(stderr_lines) + proc.stderr.read = MagicMock(return_value="") + + with patch("subprocess.Popen", return_value=proc): + returncode, _stderr = run_ffmpeg_with_progress( + ["ffmpeg", "-i", "in", "out"], + expected_duration_seconds=10.0, + on_progress=on_progress, + use_low_priority=False, + ) + + self.assertEqual(returncode, 0) + self.assertEqual(len(captured), 4) # initial 0.0 + two parsed + final 100.0 + self.assertAlmostEqual(captured[0], 0.0) + self.assertAlmostEqual(captured[1], 10.0) + self.assertAlmostEqual(captured[2], 50.0) + self.assertAlmostEqual(captured[3], 100.0) + + def test_passes_started_process_to_callback(self): + proc = self._make_fake_proc([]) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter([]) + proc.stderr.read = MagicMock(return_value="") + + seen: list = [] + + with patch("subprocess.Popen", return_value=proc): + run_ffmpeg_with_progress( + ["ffmpeg", "out"], + expected_duration_seconds=1.0, + process_started=lambda p: seen.append(p), + use_low_priority=False, + ) + + self.assertEqual(seen, [proc]) + + def test_clamps_percent_to_0_100(self): + captured: list[float] = [] + + def on_progress(percent: float) -> None: + captured.append(percent) + + stderr_lines = ["out_time_us=999999999999\n"] + proc = self._make_fake_proc(stderr_lines) + proc.stderr = MagicMock() + proc.stderr.__iter__ = lambda self: iter(stderr_lines) + proc.stderr.read = MagicMock(return_value="") + + with patch("subprocess.Popen", return_value=proc): + run_ffmpeg_with_progress( + ["ffmpeg", "out"], + expected_duration_seconds=10.0, + on_progress=on_progress, + use_low_priority=False, + ) + + # initial 0.0 then a clamped reading + self.assertEqual(captured[-1], 100.0) diff --git a/frigate/util/ffmpeg.py b/frigate/util/ffmpeg.py index 9abacd4ed..b5c51d779 100644 --- a/frigate/util/ffmpeg.py +++ b/frigate/util/ffmpeg.py @@ -2,8 +2,9 @@ import logging import subprocess as sp -from typing import Any +from typing import Any, Callable, Optional +from frigate.const import PROCESS_PRIORITY_LOW from frigate.log import LogPipe @@ -46,3 +47,124 @@ def start_or_restart_ffmpeg( start_new_session=True, ) return process + + +logger = logging.getLogger(__name__) + + +def inject_progress_flags(cmd: list[str]) -> list[str]: + """Insert ``-progress pipe:2 -nostats`` immediately before the output path. + + ``-progress pipe:2`` writes structured key=value lines to stderr; + ``-nostats`` suppresses the noisy default stats output. The output path + is conventionally the last token in an FFmpeg argv. + """ + if not cmd: + return cmd + return cmd[:-1] + ["-progress", "pipe:2", "-nostats", cmd[-1]] + + +def run_ffmpeg_with_progress( + cmd: list[str], + *, + expected_duration_seconds: float, + on_progress: Optional[Callable[[float], None]] = None, + stdin_payload: Optional[str] = None, + process_started: Optional[Callable[[sp.Popen], None]] = None, + use_low_priority: bool = True, +) -> tuple[int, str]: + """Run an ffmpeg command, streaming progress via ``-progress pipe:2``. + + Args: + cmd: ffmpeg argv. Output path must be the last token. + expected_duration_seconds: Duration of the expected output clip in + seconds. Used to convert ffmpeg's ``out_time_us`` into a percent. + on_progress: Optional callback invoked with a percent in [0, 100]. + Called once with 0.0 at start, again on each ``out_time_us=`` + stderr line, and once with 100.0 on ``progress=end``. + stdin_payload: Optional string written to ffmpeg stdin (used by + export for concat playlists). + process_started: Optional callback invoked with the live ``Popen`` + once spawned — lets callers store the ref for cancellation. + use_low_priority: When True, prepend ``nice -n PROCESS_PRIORITY_LOW`` + so concat doesn't starve detection. + + Returns: + Tuple of ``(returncode, captured_stderr)``. Stdout is left attached + to the parent process to avoid buffer-full deadlocks. + """ + full_cmd = inject_progress_flags(cmd) + if use_low_priority: + full_cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + full_cmd + + def emit(percent: float) -> None: + if on_progress is None: + return + try: + on_progress(max(0.0, min(100.0, percent))) + except Exception: + logger.exception("FFmpeg progress callback failed") + + emit(0.0) + + proc = sp.Popen( + full_cmd, + stdin=sp.PIPE if stdin_payload is not None else None, + stderr=sp.PIPE, + text=True, + encoding="ascii", + errors="replace", + ) + if process_started is not None: + try: + process_started(proc) + except Exception: + logger.exception("FFmpeg process_started callback failed") + + if stdin_payload is not None and proc.stdin is not None: + try: + proc.stdin.write(stdin_payload) + except (BrokenPipeError, OSError): + pass + finally: + try: + proc.stdin.close() + except (BrokenPipeError, OSError): + pass + + captured: list[str] = [] + if proc.stderr is not None: + try: + for raw_line in proc.stderr: + captured.append(raw_line) + line = raw_line.strip() + if not line: + continue + if line.startswith("out_time_us="): + if expected_duration_seconds <= 0: + continue + try: + out_time_us = int(line.split("=", 1)[1]) + except (ValueError, IndexError): + continue + if out_time_us < 0: + continue + out_seconds = out_time_us / 1_000_000.0 + emit((out_seconds / expected_duration_seconds) * 100.0) + elif line == "progress=end": + emit(100.0) + break + except Exception: + logger.exception("Failed reading FFmpeg progress stream") + + proc.wait() + + if proc.stderr is not None: + try: + remaining = proc.stderr.read() + if remaining: + captured.append(remaining) + except Exception: + pass + + return proc.returncode or 0, "".join(captured)