From d1b8fa18a99a6ba0c45b97b18338ccd02403dbab Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Mon, 5 May 2025 14:10:41 -0500 Subject: [PATCH] use async/await instead of asyncio.run() --- frigate/app.py | 4 + frigate/ptz/autotrack.py | 60 +++++++-------- frigate/ptz/onvif.py | 153 ++++++++++++++++++++++++++------------- 3 files changed, 138 insertions(+), 79 deletions(-) diff --git a/frigate/app.py b/frigate/app.py index 683ff7ab5..ac3e6d7da 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -699,6 +699,10 @@ class FrigateApp: self.audio_process.terminate() self.audio_process.join() + # stop the onvif controller + if self.onvif_controller: + self.onvif_controller.close() + # ensure the capture processes are done for camera, metrics in self.camera_metrics.items(): capture_process = metrics.capture_process diff --git a/frigate/ptz/autotrack.py b/frigate/ptz/autotrack.py index c6db85a6c..27a8abf1c 100644 --- a/frigate/ptz/autotrack.py +++ b/frigate/ptz/autotrack.py @@ -169,7 +169,7 @@ class PtzAutoTrackerThread(threading.Thread): continue if camera_config.onvif.autotracking.enabled: - self.ptz_autotracker.camera_maintenance(camera) + asyncio.run(self.ptz_autotracker.camera_maintenance(camera)) else: # disabled dynamically by mqtt if self.ptz_autotracker.tracked_object.get(camera): @@ -219,9 +219,9 @@ class PtzAutoTracker: camera_config.onvif.autotracking.enabled and camera_config.onvif.autotracking.enabled_in_config ): - self._autotracker_setup(camera_config, camera) + asyncio.run(self._autotracker_setup(camera_config, camera)) - def _autotracker_setup(self, camera_config: CameraConfig, camera: str): + async def _autotracker_setup(self, camera_config: CameraConfig, camera: str): logger.debug(f"{camera}: Autotracker init") self.object_types[camera] = camera_config.onvif.autotracking.track @@ -255,7 +255,7 @@ class PtzAutoTracker: return if not self.onvif.cams[camera]["init"]: - if not asyncio.run(self.onvif._init_onvif(camera)): + if not await self.onvif._init_onvif(camera): logger.warning( f"Disabling autotracking for {camera}: Unable to initialize onvif" ) @@ -271,7 +271,7 @@ class PtzAutoTracker: self.ptz_metrics[camera].autotracker_enabled.value = False return - move_status_supported = self.onvif.get_service_capabilities(camera) + move_status_supported = await self.onvif.get_service_capabilities(camera) if not ( isinstance(move_status_supported, bool) and move_status_supported @@ -287,7 +287,7 @@ class PtzAutoTracker: return if self.onvif.cams[camera]["init"]: - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) # movement thread per camera self.move_threads[camera] = threading.Thread( @@ -349,7 +349,7 @@ class PtzAutoTracker: self.config.cameras[camera].onvif.autotracking.movement_weights, ) - def _calibrate_camera(self, camera): + async def _calibrate_camera(self, camera): # move the camera from the preset in steps and measure the time it takes to move that amount # this will allow us to predict movement times with a simple linear regression # start with 0 so we can determine a baseline (to be used as the intercept in the regression calc) @@ -380,7 +380,7 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) @@ -391,7 +391,7 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_in_values.append(self.ptz_metrics[camera].zoom_level.value) @@ -400,7 +400,7 @@ class PtzAutoTracker: == ZoomingModeEnum.relative ): # relative move to -0.01 - self.onvif._move_relative( + await self.onvif._move_relative( camera, 0, 0, @@ -409,13 +409,13 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_out_values.append(self.ptz_metrics[camera].zoom_level.value) zoom_start_time = time.time() # relative move to 0.01 - self.onvif._move_relative( + await self.onvif._move_relative( camera, 0, 0, @@ -424,13 +424,13 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) zoom_stop_time = time.time() full_relative_start_time = time.time() - self.onvif._move_relative( + await self.onvif._move_relative( camera, -1, -1, @@ -439,11 +439,11 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) full_relative_stop_time = time.time() - self.onvif._move_relative( + await self.onvif._move_relative( camera, 1, 1, @@ -452,7 +452,7 @@ class PtzAutoTracker: ) while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) self.zoom_time[camera] = ( full_relative_stop_time - full_relative_start_time @@ -480,18 +480,18 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) for step in range(num_steps): pan = step_sizes[step] tilt = step_sizes[step] start_time = time.time() - self.onvif._move_relative(camera, pan, tilt, 0, 1) + await self.onvif._move_relative(camera, pan, tilt, 0, 1) # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) stop_time = time.time() self.move_metrics[camera].append( @@ -503,7 +503,7 @@ class PtzAutoTracker: } ) - self.onvif._move_to_preset( + await self.onvif._move_to_preset( camera, self.config.cameras[camera].onvif.autotracking.return_preset.lower(), ) @@ -512,7 +512,7 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) logger.info( f"Calibration for {camera} in progress: {round((step / num_steps) * 100)}% complete" @@ -741,7 +741,9 @@ class PtzAutoTracker: self.config.cameras[camera].onvif.autotracking.zooming == ZoomingModeEnum.relative ): - self.onvif._move_relative(camera, pan, tilt, zoom, 1) + asyncio.run( + self.onvif._move_relative(camera, pan, tilt, zoom, 1) + ) else: if pan != 0 or tilt != 0: @@ -749,7 +751,7 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + asyncio.run(self.onvif.get_camera_status(camera)) if ( zoom > 0 @@ -759,7 +761,7 @@ class PtzAutoTracker: # Wait until the camera finishes moving while not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + asyncio.run(self.onvif.get_camera_status(camera)) if self.config.cameras[camera].onvif.autotracking.movement_weights: logger.debug( @@ -1420,7 +1422,7 @@ class PtzAutoTracker: ** (1 / self.zoom_factor[camera]) } - def camera_maintenance(self, camera): + async def camera_maintenance(self, camera): # bail and don't check anything if we're calibrating or tracking an object if ( not self.autotracker_init[camera] @@ -1437,7 +1439,7 @@ class PtzAutoTracker: self._autotracker_setup(self.config.cameras[camera], camera) # regularly update camera status if not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) # return to preset if tracking is over if ( @@ -1463,14 +1465,14 @@ class PtzAutoTracker: logger.debug( f"{camera}: Time is {self.ptz_metrics[camera].frame_time.value}, returning to preset: {autotracker_config.return_preset}" ) - self.onvif._move_to_preset( + await self.onvif._move_to_preset( camera, autotracker_config.return_preset.lower(), ) # update stored zoom level from preset if not self.ptz_metrics[camera].motor_stopped.is_set(): - self.onvif.get_camera_status(camera) + await self.onvif.get_camera_status(camera) self.ptz_metrics[camera].tracking_active.clear() self.dispatcher.publish( diff --git a/frigate/ptz/onvif.py b/frigate/ptz/onvif.py index 8b133469c..676384e36 100644 --- a/frigate/ptz/onvif.py +++ b/frigate/ptz/onvif.py @@ -2,6 +2,7 @@ import asyncio import logging +import threading import time from enum import Enum from importlib.util import find_spec @@ -39,23 +40,34 @@ class OnvifController: def __init__( self, config: FrigateConfig, ptz_metrics: dict[str, PTZMetrics] ) -> None: - self.cams: dict[str, ONVIFCamera] = {} + self.cams: dict[str, dict] = {} self.failed_cams: dict[str, dict] = {} self.max_retries = 5 self.reset_timeout = 900 # 15 minutes - self.config = config self.ptz_metrics = ptz_metrics + # Create a dedicated event loop and run it in a separate thread + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._run_event_loop, daemon=True) + self.loop_thread.start() + for cam_name, cam in config.cameras.items(): if not cam.enabled: continue - if cam.onvif.host: result = self._create_onvif_camera(cam_name, cam) if result: self.cams[cam_name] = result + def _run_event_loop(self) -> None: + """Run the event loop in a separate thread.""" + asyncio.set_event_loop(self.loop) + try: + self.loop.run_forever() + except Exception as e: + logger.error(f"Onvif event loop terminated unexpectedly: {e}") + def _create_onvif_camera(self, cam_name: str, cam) -> dict | None: """Create an ONVIF camera instance and handle failures.""" try: @@ -344,25 +356,23 @@ class OnvifController: self.cams[camera_name]["init"] = True return True - def _stop(self, camera_name: str) -> None: + async def _stop(self, camera_name: str) -> None: move_request = self.cams[camera_name]["move_request"] - asyncio.run( - self.cams[camera_name]["ptz"].Stop( - { - "ProfileToken": move_request.ProfileToken, - "PanTilt": True, - "Zoom": True, - } - ) + await self.cams[camera_name]["ptz"].Stop( + { + "ProfileToken": move_request.ProfileToken, + "PanTilt": True, + "Zoom": True, + } ) self.cams[camera_name]["active"] = False - def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: + async def _move(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) - self._stop(camera_name) + await self._stop(camera_name) if "pt" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF pan/tilt movement.") @@ -391,11 +401,11 @@ class OnvifController: } try: - asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) + await self.cams[camera_name]["ptz"].ContinuousMove(move_request) except (Fault, ONVIFError, TransportError, Exception) as e: logger.warning(f"Onvif sending move request to {camera_name} failed: {e}") - def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: + async def _move_relative(self, camera_name: str, pan, tilt, zoom, speed) -> None: if "pt-r-fov" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF RelativeMove (FOV).") return @@ -464,7 +474,7 @@ class OnvifController: } move_request.Translation.Zoom.x = zoom - asyncio.run(self.cams[camera_name]["ptz"].RelativeMove(move_request)) + await self.cams[camera_name]["ptz"].RelativeMove(move_request) # reset after the move request move_request.Translation.PanTilt.x = 0 @@ -479,7 +489,7 @@ class OnvifController: self.cams[camera_name]["active"] = False - def _move_to_preset(self, camera_name: str, preset: str) -> None: + async def _move_to_preset(self, camera_name: str, preset: str) -> None: if preset not in self.cams[camera_name]["presets"]: logger.error(f"{preset} is not a valid preset for {camera_name}") return @@ -489,23 +499,22 @@ class OnvifController: self.ptz_metrics[camera_name].stop_time.value = 0 move_request = self.cams[camera_name]["move_request"] preset_token = self.cams[camera_name]["presets"][preset] - asyncio.run( - self.cams[camera_name]["ptz"].GotoPreset( - { - "ProfileToken": move_request.ProfileToken, - "PresetToken": preset_token, - } - ) + + await self.cams[camera_name]["ptz"].GotoPreset( + { + "ProfileToken": move_request.ProfileToken, + "PresetToken": preset_token, + } ) self.cams[camera_name]["active"] = False - def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: + async def _zoom(self, camera_name: str, command: OnvifCommandEnum) -> None: if self.cams[camera_name]["active"]: logger.warning( f"{camera_name} is already performing an action, stopping..." ) - self._stop(camera_name) + await self._stop(camera_name) if "zoom" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF zooming.") @@ -519,9 +528,9 @@ class OnvifController: elif command == OnvifCommandEnum.zoom_out: move_request.Velocity = {"Zoom": {"x": -0.5}} - asyncio.run(self.cams[camera_name]["ptz"].ContinuousMove(move_request)) + await self.cams[camera_name]["ptz"].ContinuousMove(move_request) - def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: + async def _zoom_absolute(self, camera_name: str, zoom, speed) -> None: if "zoom-a" not in self.cams[camera_name]["features"]: logger.error(f"{camera_name} does not support ONVIF AbsoluteMove zooming.") return @@ -560,19 +569,21 @@ class OnvifController: logger.debug(f"{camera_name}: Absolute zoom: {zoom}") - asyncio.run(self.cams[camera_name]["ptz"].AbsoluteMove(move_request)) + await self.cams[camera_name]["ptz"].AbsoluteMove(move_request) self.cams[camera_name]["active"] = False - def handle_command( + async def handle_command_async( self, camera_name: str, command: OnvifCommandEnum, param: str = "" ) -> None: + """Handle ONVIF commands asynchronously""" + # logger.debug(f"handling async: {camera_name}, {command}, {param}") if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") return if not self.cams[camera_name]["init"]: - if not asyncio.run(self._init_onvif(camera_name)): + if not await self._init_onvif(camera_name): return try: @@ -580,22 +591,48 @@ class OnvifController: # already init return elif command == OnvifCommandEnum.stop: - self._stop(camera_name) + await self._stop(camera_name) elif command == OnvifCommandEnum.preset: - self._move_to_preset(camera_name, param) + await self._move_to_preset(camera_name, param) elif command == OnvifCommandEnum.move_relative: _, pan, tilt = param.split("_") - self._move_relative(camera_name, float(pan), float(tilt), 0, 1) + await self._move_relative(camera_name, float(pan), float(tilt), 0, 1) elif ( command == OnvifCommandEnum.zoom_in or command == OnvifCommandEnum.zoom_out ): - self._zoom(camera_name, command) + await self._zoom(camera_name, command) else: - self._move(camera_name, command) + await self._move(camera_name, command) except (Fault, ONVIFError, TransportError, Exception) as e: logger.error(f"Unable to handle onvif command: {e}") + def handle_command( + self, camera_name: str, command: OnvifCommandEnum, param: str = "" + ) -> None: + """ + Handle ONVIF commands by scheduling them in the event loop. + This is the synchronous interface that schedules async work. + """ + # Run the async command in the event loop + # logger.debug( + # f"Scheduling handle_command_async for {camera_name} with command {command}. {self.loop}" + # ) + future = asyncio.run_coroutine_threadsafe( + self.handle_command_async(camera_name, command, param), self.loop + ) + # logger.debug(f"Scheduled handle_command_async for {camera_name}") + + try: + # Wait with a timeout to prevent blocking indefinitely + future.result(timeout=10) + except asyncio.TimeoutError: + logger.error(f"Command {command} timed out for camera {camera_name}") + except Exception as e: + logger.error( + f"Error executing command {command} for camera {camera_name}: {e}" + ) + async def get_camera_info(self, camera_name: str) -> dict[str, any]: """ Get ptz capabilities and presets, attempting to reconnect if ONVIF is configured @@ -681,23 +718,21 @@ class OnvifController: logger.debug(f"Could not initialize ONVIF for {camera_name}") return {} - def get_service_capabilities(self, camera_name: str) -> None: + async def get_service_capabilities(self, camera_name: str) -> None: if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") return {} if not self.cams[camera_name]["init"]: - asyncio.run(self._init_onvif(camera_name)) + await self._init_onvif(camera_name) service_capabilities_request = self.cams[camera_name][ "service_capabilities_request" ] try: - service_capabilities = asyncio.run( - self.cams[camera_name]["ptz"].GetServiceCapabilities( - service_capabilities_request - ) - ) + service_capabilities = await self.cams[camera_name][ + "ptz" + ].GetServiceCapabilities(service_capabilities_request) logger.debug( f"Onvif service capabilities for {camera_name}: {service_capabilities}" @@ -711,19 +746,18 @@ class OnvifController: ) return False - def get_camera_status(self, camera_name: str) -> None: + async def get_camera_status(self, camera_name: str) -> None: if camera_name not in self.cams.keys(): logger.error(f"ONVIF is not configured for {camera_name}") - return {} + return if not self.cams[camera_name]["init"]: - asyncio.run(self._init_onvif(camera_name)) + if not await self._init_onvif(camera_name): + return status_request = self.cams[camera_name]["status_request"] try: - status = asyncio.run( - self.cams[camera_name]["ptz"].GetStatus(status_request) - ) + status = await self.cams[camera_name]["ptz"].GetStatus(status_request) except Exception: pass # We're unsupported, that'll be reported in the next check. @@ -807,3 +841,22 @@ class OnvifController: camera_name ].frame_time.value logger.warning(f"Camera {camera_name} is still in ONVIF 'MOVING' status.") + + def close(self) -> None: + """Gracefully shut down the ONVIF controller.""" + if not hasattr(self, "loop") or self.loop.is_closed(): + logger.debug("ONVIF controller already closed") + return + + logger.info("Exiting ONVIF controller...") + + def stop_and_cleanup(): + try: + self.loop.stop() + except Exception as e: + logger.error(f"Error during loop cleanup: {e}") + + # Schedule stop and cleanup in the loop thread + self.loop.call_soon_threadsafe(stop_and_cleanup) + + self.loop_thread.join()