mirror of
https://github.com/blakeblackshear/frigate.git
synced 2026-05-07 14:05:28 +03:00
extract shared ffmpeg progress helper
This commit is contained in:
parent
8dc68a8abd
commit
5fe58d9aa6
@ -23,13 +23,13 @@ from frigate.const import (
|
|||||||
EXPORT_DIR,
|
EXPORT_DIR,
|
||||||
MAX_PLAYLIST_SECONDS,
|
MAX_PLAYLIST_SECONDS,
|
||||||
PREVIEW_FRAME_TYPE,
|
PREVIEW_FRAME_TYPE,
|
||||||
PROCESS_PRIORITY_LOW,
|
|
||||||
)
|
)
|
||||||
from frigate.ffmpeg_presets import (
|
from frigate.ffmpeg_presets import (
|
||||||
EncodeTypeEnum,
|
EncodeTypeEnum,
|
||||||
parse_preset_hardware_acceleration_encode,
|
parse_preset_hardware_acceleration_encode,
|
||||||
)
|
)
|
||||||
from frigate.models import Export, Previews, Recordings, ReviewSegment
|
from frigate.models import Export, Previews, Recordings, ReviewSegment
|
||||||
|
from frigate.util.ffmpeg import run_ffmpeg_with_progress
|
||||||
from frigate.util.time import is_current_hour
|
from frigate.util.time import is_current_hour
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -243,107 +243,29 @@ class RecordingExporter(threading.Thread):
|
|||||||
|
|
||||||
return total
|
return total
|
||||||
|
|
||||||
def _inject_progress_flags(self, ffmpeg_cmd: list[str]) -> list[str]:
|
|
||||||
"""Insert FFmpeg progress reporting flags before the output path.
|
|
||||||
|
|
||||||
``-progress pipe:2`` writes structured key=value lines to stderr,
|
|
||||||
``-nostats`` suppresses the noisy default stats output.
|
|
||||||
"""
|
|
||||||
if not ffmpeg_cmd:
|
|
||||||
return ffmpeg_cmd
|
|
||||||
return ffmpeg_cmd[:-1] + ["-progress", "pipe:2", "-nostats", ffmpeg_cmd[-1]]
|
|
||||||
|
|
||||||
def _run_ffmpeg_with_progress(
|
def _run_ffmpeg_with_progress(
|
||||||
self,
|
self,
|
||||||
ffmpeg_cmd: list[str],
|
ffmpeg_cmd: list[str],
|
||||||
playlist_lines: str | list[str],
|
playlist_lines: str | list[str],
|
||||||
step: str = "encoding",
|
step: str = "encoding",
|
||||||
) -> tuple[int, str]:
|
) -> tuple[int, str]:
|
||||||
"""Run an FFmpeg export command, parsing progress events from stderr.
|
"""Delegate to the shared helper, mapping percent → (step, percent).
|
||||||
|
|
||||||
Returns ``(returncode, captured_stderr)``. Stdout is left attached to
|
Returns ``(returncode, captured_stderr)``.
|
||||||
the parent process so we don't have to drain it (and risk a deadlock
|
|
||||||
if the buffer fills). Progress percent is computed against the
|
|
||||||
expected output duration; values are clamped to [0, 100] inside
|
|
||||||
:py:meth:`_emit_progress`.
|
|
||||||
"""
|
"""
|
||||||
cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + self._inject_progress_flags(
|
|
||||||
ffmpeg_cmd
|
|
||||||
)
|
|
||||||
|
|
||||||
if isinstance(playlist_lines, list):
|
if isinstance(playlist_lines, list):
|
||||||
stdin_payload = "\n".join(playlist_lines)
|
stdin_payload = "\n".join(playlist_lines)
|
||||||
else:
|
else:
|
||||||
stdin_payload = playlist_lines
|
stdin_payload = playlist_lines
|
||||||
|
|
||||||
expected_duration = self._expected_output_duration_seconds()
|
return run_ffmpeg_with_progress(
|
||||||
|
ffmpeg_cmd,
|
||||||
self._emit_progress(step, 0.0)
|
expected_duration_seconds=self._expected_output_duration_seconds(),
|
||||||
|
on_progress=lambda percent: self._emit_progress(step, percent),
|
||||||
proc = sp.Popen(
|
stdin_payload=stdin_payload,
|
||||||
cmd,
|
use_low_priority=True,
|
||||||
stdin=sp.PIPE,
|
|
||||||
stderr=sp.PIPE,
|
|
||||||
text=True,
|
|
||||||
encoding="ascii",
|
|
||||||
errors="replace",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
assert proc.stdin is not None
|
|
||||||
assert proc.stderr is not None
|
|
||||||
|
|
||||||
try:
|
|
||||||
proc.stdin.write(stdin_payload)
|
|
||||||
except (BrokenPipeError, OSError):
|
|
||||||
# FFmpeg may have rejected the input early; still wait for it
|
|
||||||
# to terminate so the returncode is meaningful.
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
try:
|
|
||||||
proc.stdin.close()
|
|
||||||
except (BrokenPipeError, OSError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
captured: list[str] = []
|
|
||||||
|
|
||||||
try:
|
|
||||||
for raw_line in proc.stderr:
|
|
||||||
captured.append(raw_line)
|
|
||||||
line = raw_line.strip()
|
|
||||||
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if line.startswith("out_time_us="):
|
|
||||||
if expected_duration <= 0:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
out_time_us = int(line.split("=", 1)[1])
|
|
||||||
except (ValueError, IndexError):
|
|
||||||
continue
|
|
||||||
if out_time_us < 0:
|
|
||||||
continue
|
|
||||||
out_seconds = out_time_us / 1_000_000.0
|
|
||||||
percent = (out_seconds / expected_duration) * 100.0
|
|
||||||
self._emit_progress(step, percent)
|
|
||||||
elif line == "progress=end":
|
|
||||||
self._emit_progress(step, 100.0)
|
|
||||||
break
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed reading FFmpeg progress for %s", self.export_id)
|
|
||||||
|
|
||||||
proc.wait()
|
|
||||||
|
|
||||||
# Drain any remaining stderr so callers can log it on failure.
|
|
||||||
try:
|
|
||||||
remaining = proc.stderr.read()
|
|
||||||
if remaining:
|
|
||||||
captured.append(remaining)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return proc.returncode, "".join(captured)
|
|
||||||
|
|
||||||
def get_datetime_from_timestamp(self, timestamp: int) -> str:
|
def get_datetime_from_timestamp(self, timestamp: int) -> str:
|
||||||
# return in iso format using the configured ui.timezone when set,
|
# return in iso format using the configured ui.timezone when set,
|
||||||
# so the auto-generated export name reflects local time rather
|
# so the auto-generated export name reflects local time rather
|
||||||
|
|||||||
@ -14,6 +14,7 @@ from frigate.jobs.export import (
|
|||||||
)
|
)
|
||||||
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
|
from frigate.record.export import PlaybackSourceEnum, RecordingExporter
|
||||||
from frigate.types import JobStatusTypesEnum
|
from frigate.types import JobStatusTypesEnum
|
||||||
|
from frigate.util.ffmpeg import inject_progress_flags
|
||||||
|
|
||||||
|
|
||||||
def _make_exporter(
|
def _make_exporter(
|
||||||
@ -118,10 +119,9 @@ class TestExpectedOutputDuration(unittest.TestCase):
|
|||||||
|
|
||||||
class TestProgressFlagInjection(unittest.TestCase):
|
class TestProgressFlagInjection(unittest.TestCase):
|
||||||
def test_inserts_before_output_path(self) -> None:
|
def test_inserts_before_output_path(self) -> None:
|
||||||
exporter = _make_exporter()
|
|
||||||
cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"]
|
cmd = ["ffmpeg", "-i", "input.m3u8", "-c", "copy", "/tmp/output.mp4"]
|
||||||
|
|
||||||
result = exporter._inject_progress_flags(cmd)
|
result = inject_progress_flags(cmd)
|
||||||
|
|
||||||
assert result == [
|
assert result == [
|
||||||
"ffmpeg",
|
"ffmpeg",
|
||||||
@ -136,8 +136,7 @@ class TestProgressFlagInjection(unittest.TestCase):
|
|||||||
]
|
]
|
||||||
|
|
||||||
def test_handles_empty_cmd(self) -> None:
|
def test_handles_empty_cmd(self) -> None:
|
||||||
exporter = _make_exporter()
|
assert inject_progress_flags([]) == []
|
||||||
assert exporter._inject_progress_flags([]) == []
|
|
||||||
|
|
||||||
|
|
||||||
class TestFfmpegProgressParsing(unittest.TestCase):
|
class TestFfmpegProgressParsing(unittest.TestCase):
|
||||||
@ -167,7 +166,7 @@ class TestFfmpegProgressParsing(unittest.TestCase):
|
|||||||
fake_proc.returncode = 0
|
fake_proc.returncode = 0
|
||||||
fake_proc.wait = MagicMock(return_value=0)
|
fake_proc.wait = MagicMock(return_value=0)
|
||||||
|
|
||||||
with patch("frigate.record.export.sp.Popen", return_value=fake_proc):
|
with patch("frigate.util.ffmpeg.sp.Popen", return_value=fake_proc):
|
||||||
returncode, _stderr = exporter._run_ffmpeg_with_progress(
|
returncode, _stderr = exporter._run_ffmpeg_with_progress(
|
||||||
["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding"
|
["ffmpeg", "-i", "x.m3u8", "/tmp/out.mp4"], "playlist", step="encoding"
|
||||||
)
|
)
|
||||||
|
|||||||
102
frigate/test/test_ffmpeg_progress.py
Normal file
102
frigate/test/test_ffmpeg_progress.py
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
"""Tests for the shared ffmpeg progress helper."""
|
||||||
|
|
||||||
|
import subprocess as sp
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from frigate.util.ffmpeg import inject_progress_flags, run_ffmpeg_with_progress
|
||||||
|
|
||||||
|
|
||||||
|
class TestInjectProgressFlags(unittest.TestCase):
|
||||||
|
def test_inserts_flags_before_output_path(self):
|
||||||
|
cmd = ["ffmpeg", "-i", "in.mp4", "-c", "copy", "out.mp4"]
|
||||||
|
result = inject_progress_flags(cmd)
|
||||||
|
self.assertEqual(
|
||||||
|
result,
|
||||||
|
["ffmpeg", "-i", "in.mp4", "-c", "copy", "-progress", "pipe:2", "-nostats", "out.mp4"],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_empty_cmd_returns_empty(self):
|
||||||
|
self.assertEqual(inject_progress_flags([]), [])
|
||||||
|
|
||||||
|
|
||||||
|
class TestRunFfmpegWithProgress(unittest.TestCase):
|
||||||
|
def _make_fake_proc(self, stderr_lines, returncode=0):
|
||||||
|
proc = MagicMock()
|
||||||
|
proc.stderr = iter(stderr_lines)
|
||||||
|
proc.stdin = MagicMock()
|
||||||
|
proc.returncode = returncode
|
||||||
|
proc.wait = MagicMock()
|
||||||
|
return proc
|
||||||
|
|
||||||
|
def test_emits_percent_from_out_time_us_lines(self):
|
||||||
|
captured: list[float] = []
|
||||||
|
|
||||||
|
def on_progress(percent: float) -> None:
|
||||||
|
captured.append(percent)
|
||||||
|
|
||||||
|
stderr_lines = [
|
||||||
|
"out_time_us=1000000\n",
|
||||||
|
"out_time_us=5000000\n",
|
||||||
|
"progress=end\n",
|
||||||
|
]
|
||||||
|
proc = self._make_fake_proc(stderr_lines)
|
||||||
|
proc.stderr = MagicMock()
|
||||||
|
proc.stderr.__iter__ = lambda self: iter(stderr_lines)
|
||||||
|
proc.stderr.read = MagicMock(return_value="")
|
||||||
|
|
||||||
|
with patch("subprocess.Popen", return_value=proc):
|
||||||
|
returncode, _stderr = run_ffmpeg_with_progress(
|
||||||
|
["ffmpeg", "-i", "in", "out"],
|
||||||
|
expected_duration_seconds=10.0,
|
||||||
|
on_progress=on_progress,
|
||||||
|
use_low_priority=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(returncode, 0)
|
||||||
|
self.assertEqual(len(captured), 4) # initial 0.0 + two parsed + final 100.0
|
||||||
|
self.assertAlmostEqual(captured[0], 0.0)
|
||||||
|
self.assertAlmostEqual(captured[1], 10.0)
|
||||||
|
self.assertAlmostEqual(captured[2], 50.0)
|
||||||
|
self.assertAlmostEqual(captured[3], 100.0)
|
||||||
|
|
||||||
|
def test_passes_started_process_to_callback(self):
|
||||||
|
proc = self._make_fake_proc([])
|
||||||
|
proc.stderr = MagicMock()
|
||||||
|
proc.stderr.__iter__ = lambda self: iter([])
|
||||||
|
proc.stderr.read = MagicMock(return_value="")
|
||||||
|
|
||||||
|
seen: list = []
|
||||||
|
|
||||||
|
with patch("subprocess.Popen", return_value=proc):
|
||||||
|
run_ffmpeg_with_progress(
|
||||||
|
["ffmpeg", "out"],
|
||||||
|
expected_duration_seconds=1.0,
|
||||||
|
process_started=lambda p: seen.append(p),
|
||||||
|
use_low_priority=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(seen, [proc])
|
||||||
|
|
||||||
|
def test_clamps_percent_to_0_100(self):
|
||||||
|
captured: list[float] = []
|
||||||
|
|
||||||
|
def on_progress(percent: float) -> None:
|
||||||
|
captured.append(percent)
|
||||||
|
|
||||||
|
stderr_lines = ["out_time_us=999999999999\n"]
|
||||||
|
proc = self._make_fake_proc(stderr_lines)
|
||||||
|
proc.stderr = MagicMock()
|
||||||
|
proc.stderr.__iter__ = lambda self: iter(stderr_lines)
|
||||||
|
proc.stderr.read = MagicMock(return_value="")
|
||||||
|
|
||||||
|
with patch("subprocess.Popen", return_value=proc):
|
||||||
|
run_ffmpeg_with_progress(
|
||||||
|
["ffmpeg", "out"],
|
||||||
|
expected_duration_seconds=10.0,
|
||||||
|
on_progress=on_progress,
|
||||||
|
use_low_priority=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
# initial 0.0 then a clamped reading
|
||||||
|
self.assertEqual(captured[-1], 100.0)
|
||||||
@ -2,8 +2,9 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import subprocess as sp
|
import subprocess as sp
|
||||||
from typing import Any
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
|
from frigate.const import PROCESS_PRIORITY_LOW
|
||||||
from frigate.log import LogPipe
|
from frigate.log import LogPipe
|
||||||
|
|
||||||
|
|
||||||
@ -46,3 +47,124 @@ def start_or_restart_ffmpeg(
|
|||||||
start_new_session=True,
|
start_new_session=True,
|
||||||
)
|
)
|
||||||
return process
|
return process
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def inject_progress_flags(cmd: list[str]) -> list[str]:
|
||||||
|
"""Insert ``-progress pipe:2 -nostats`` immediately before the output path.
|
||||||
|
|
||||||
|
``-progress pipe:2`` writes structured key=value lines to stderr;
|
||||||
|
``-nostats`` suppresses the noisy default stats output. The output path
|
||||||
|
is conventionally the last token in an FFmpeg argv.
|
||||||
|
"""
|
||||||
|
if not cmd:
|
||||||
|
return cmd
|
||||||
|
return cmd[:-1] + ["-progress", "pipe:2", "-nostats", cmd[-1]]
|
||||||
|
|
||||||
|
|
||||||
|
def run_ffmpeg_with_progress(
|
||||||
|
cmd: list[str],
|
||||||
|
*,
|
||||||
|
expected_duration_seconds: float,
|
||||||
|
on_progress: Optional[Callable[[float], None]] = None,
|
||||||
|
stdin_payload: Optional[str] = None,
|
||||||
|
process_started: Optional[Callable[[sp.Popen], None]] = None,
|
||||||
|
use_low_priority: bool = True,
|
||||||
|
) -> tuple[int, str]:
|
||||||
|
"""Run an ffmpeg command, streaming progress via ``-progress pipe:2``.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
cmd: ffmpeg argv. Output path must be the last token.
|
||||||
|
expected_duration_seconds: Duration of the expected output clip in
|
||||||
|
seconds. Used to convert ffmpeg's ``out_time_us`` into a percent.
|
||||||
|
on_progress: Optional callback invoked with a percent in [0, 100].
|
||||||
|
Called once with 0.0 at start, again on each ``out_time_us=``
|
||||||
|
stderr line, and once with 100.0 on ``progress=end``.
|
||||||
|
stdin_payload: Optional string written to ffmpeg stdin (used by
|
||||||
|
export for concat playlists).
|
||||||
|
process_started: Optional callback invoked with the live ``Popen``
|
||||||
|
once spawned — lets callers store the ref for cancellation.
|
||||||
|
use_low_priority: When True, prepend ``nice -n PROCESS_PRIORITY_LOW``
|
||||||
|
so concat doesn't starve detection.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of ``(returncode, captured_stderr)``. Stdout is left attached
|
||||||
|
to the parent process to avoid buffer-full deadlocks.
|
||||||
|
"""
|
||||||
|
full_cmd = inject_progress_flags(cmd)
|
||||||
|
if use_low_priority:
|
||||||
|
full_cmd = ["nice", "-n", str(PROCESS_PRIORITY_LOW)] + full_cmd
|
||||||
|
|
||||||
|
def emit(percent: float) -> None:
|
||||||
|
if on_progress is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
on_progress(max(0.0, min(100.0, percent)))
|
||||||
|
except Exception:
|
||||||
|
logger.exception("FFmpeg progress callback failed")
|
||||||
|
|
||||||
|
emit(0.0)
|
||||||
|
|
||||||
|
proc = sp.Popen(
|
||||||
|
full_cmd,
|
||||||
|
stdin=sp.PIPE if stdin_payload is not None else None,
|
||||||
|
stderr=sp.PIPE,
|
||||||
|
text=True,
|
||||||
|
encoding="ascii",
|
||||||
|
errors="replace",
|
||||||
|
)
|
||||||
|
if process_started is not None:
|
||||||
|
try:
|
||||||
|
process_started(proc)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("FFmpeg process_started callback failed")
|
||||||
|
|
||||||
|
if stdin_payload is not None and proc.stdin is not None:
|
||||||
|
try:
|
||||||
|
proc.stdin.write(stdin_payload)
|
||||||
|
except (BrokenPipeError, OSError):
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
proc.stdin.close()
|
||||||
|
except (BrokenPipeError, OSError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
captured: list[str] = []
|
||||||
|
if proc.stderr is not None:
|
||||||
|
try:
|
||||||
|
for raw_line in proc.stderr:
|
||||||
|
captured.append(raw_line)
|
||||||
|
line = raw_line.strip()
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
if line.startswith("out_time_us="):
|
||||||
|
if expected_duration_seconds <= 0:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
out_time_us = int(line.split("=", 1)[1])
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
continue
|
||||||
|
if out_time_us < 0:
|
||||||
|
continue
|
||||||
|
out_seconds = out_time_us / 1_000_000.0
|
||||||
|
emit((out_seconds / expected_duration_seconds) * 100.0)
|
||||||
|
elif line == "progress=end":
|
||||||
|
emit(100.0)
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed reading FFmpeg progress stream")
|
||||||
|
|
||||||
|
proc.wait()
|
||||||
|
|
||||||
|
if proc.stderr is not None:
|
||||||
|
try:
|
||||||
|
remaining = proc.stderr.read()
|
||||||
|
if remaining:
|
||||||
|
captured.append(remaining)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return proc.returncode or 0, "".join(captured)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user