cancel in-flight ffmpeg when stop is called during preparation

This commit is contained in:
Josh Hawkins 2026-05-02 23:11:26 -05:00
parent 6f43ed41f2
commit aae84546cc
2 changed files with 79 additions and 10 deletions

View File

@ -272,6 +272,9 @@ class DebugReplayManager:
except Exception as exc: except Exception as exc:
logger.exception("Debug replay startup failed") logger.exception("Debug replay startup failed")
with self._lock: with self._lock:
# If stop() already ran while we were preparing, don't overwrite idle state.
if self._state == ReplayState.idle:
return
self._set_state(ReplayState.error, error_message=str(exc)) self._set_state(ReplayState.error, error_message=str(exc))
# Drop session pointers so the next /start is allowed. # Drop session pointers so the next /start is allowed.
self.replay_camera_name = None self.replay_camera_name = None
@ -355,23 +358,33 @@ class DebugReplayManager:
return return
replay_name = self.replay_camera_name replay_name = self.replay_camera_name
was_preparing = self._state == ReplayState.preparing_clip
# Publish remove event so subscribers stop and remove from their config if was_preparing and self._active_process is not None:
if replay_name in frigate_config.cameras: logger.info("Cancelling in-flight replay clip generation")
try:
self._active_process.terminate()
except Exception as exc:
logger.warning("Failed to terminate ffmpeg subprocess: %s", exc)
# Keep a reference so we can join the worker after we've finished cleanup.
worker = self._worker_thread
# Only publish the remove event if the camera was actually published.
if (
not was_preparing
and replay_name is not None
and replay_name in frigate_config.cameras
):
config_publisher.publish_update( config_publisher.publish_update(
CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name), CameraConfigUpdateTopic(CameraConfigUpdateEnum.remove, replay_name),
frigate_config.cameras[replay_name], frigate_config.cameras[replay_name],
) )
# Do NOT pop here — let subscribers handle removal from the shared
# config dict when they process the ZMQ message to avoid race conditions
# Defensive DB cleanup if replay_name is not None:
self._cleanup_db(replay_name) self._cleanup_db(replay_name)
self._cleanup_files(replay_name)
# Remove filesystem artifacts
self._cleanup_files(replay_name)
# Reset state
self.replay_camera_name = None self.replay_camera_name = None
self.source_camera = None self.source_camera = None
self.clip_path = None self.clip_path = None
@ -381,6 +394,10 @@ class DebugReplayManager:
logger.info("Debug replay stopped and cleaned up: %s", replay_name) logger.info("Debug replay stopped and cleaned up: %s", replay_name)
# Bounded worker join so the API never hangs.
if worker is not None and worker.is_alive():
worker.join(timeout=2.0)
def _build_camera_config_dict( def _build_camera_config_dict(
self, self,
source_config, source_config,

View File

@ -198,3 +198,55 @@ class TestDebugReplayManagerAsyncStart(unittest.TestCase):
self.assertEqual(self.manager.state, ReplayState.error) self.assertEqual(self.manager.state, ReplayState.error)
self.assertIsNotNone(self.manager.error_message) self.assertIsNotNone(self.manager.error_message)
self.assertIn("ffmpeg", self.manager.error_message.lower()) self.assertIn("ffmpeg", self.manager.error_message.lower())
class TestDebugReplayManagerCancellation(unittest.TestCase):
    """Tests for calling stop() while the replay clip is still being prepared."""

    def test_stop_during_preparing_clip_terminates_ffmpeg(self):
        """stop() during preparing_clip terminates the registered process and returns to idle."""
        manager = DebugReplayManager()
        frigate_config = MagicMock()
        frigate_config.cameras = {"front": MagicMock()}
        frigate_config.ffmpeg.ffmpeg_path = "/bin/sh"
        publisher = MagicMock()

        recordings_qs = MagicMock()
        recordings_qs.count.return_value = 1
        recordings_qs.__iter__.return_value = iter([MagicMock(path="/tmp/r1.mp4")])

        terminated_event = threading.Event()
        # If an assertion fails before stop() runs, nothing else sets the event;
        # release the worker blocked in fake_helper instead of leaving it waiting.
        self.addCleanup(terminated_event.set)

        fake_proc = MagicMock()
        original_terminate = MagicMock(side_effect=terminated_event.set)
        fake_proc.terminate = original_terminate

        def fake_helper(cmd, *, expected_duration_seconds, on_progress, process_started, **kwargs):
            # Stand-in for run_ffmpeg_with_progress: hand the fake process to the
            # caller-supplied callback, then behave like a long-running ffmpeg.
            if process_started is not None:
                process_started(fake_proc)
            # Block until stop() calls fake_proc.terminate()
            terminated_event.wait(timeout=5)
            return -15, "killed"

        with (
            patch.object(manager, "_query_recordings", return_value=recordings_qs),
            patch("frigate.debug_replay.run_ffmpeg_with_progress", side_effect=fake_helper),
            patch("os.path.exists", return_value=True),
            patch("os.makedirs"),
            patch("os.remove"),
            patch("builtins.open", unittest.mock.mock_open()),
        ):
            manager.start(
                source_camera="front",
                start_ts=100.0,
                end_ts=200.0,
                frigate_config=frigate_config,
                config_publisher=publisher,
            )

            # Wait for the worker to register the active process
            for _ in range(50):
                if manager._active_process is fake_proc:
                    break
                time.sleep(0.02)

            # Fail here, with a precise message, if the worker never got as far as
            # registering the process; otherwise the terminate() check below would
            # fail for a misleading reason.
            self.assertIs(
                manager._active_process,
                fake_proc,
                "worker never registered the ffmpeg process",
            )
            self.assertEqual(manager.state, ReplayState.preparing_clip)

            manager.stop(frigate_config=frigate_config, config_publisher=publisher)

        self.assertTrue(original_terminate.called, "terminate() should have been called")
        self.assertEqual(manager.state, ReplayState.idle)