diff --git a/frigate/app.py b/frigate/app.py index d8f3d8097..05e91324a 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -311,6 +311,7 @@ class FrigateApp: }, detector_config, ) + self.detectors[name].start() def start_ptz_autotracker(self) -> None: self.ptz_autotracker_thread = PtzAutoTrackerThread( @@ -572,7 +573,8 @@ class FrigateApp: # ensure the detectors are done for detector in self.detectors.values(): - detector.stop() + detector.terminate() + detector.join() empty_and_close_queue(self.detection_queue) logger.info("Detection queue closed") diff --git a/frigate/object_detection.py b/frigate/object_detection.py index 0e51bac64..20f19f5d5 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -1,22 +1,17 @@ import datetime import logging import multiprocessing as mp -import os import queue -import signal -import threading from abc import ABC, abstractmethod from multiprocessing.synchronize import Event import numpy as np -from setproctitle import setproctitle from frigate import util from frigate.detectors import create_detector from frigate.detectors.detector_config import InputTensorEnum from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.image import SharedMemoryFrameManager -from frigate.util.services import listen logger = logging.getLogger(__name__) @@ -77,116 +72,7 @@ class LocalObjectDetector(ObjectDetector): return self.detect_api.detect_raw(tensor_input=tensor_input) -def run_detector( - name: str, - detection_queue: mp.Queue, - out_events: dict[str, Event], - avg_speed, - start, - detector_config, -): - threading.current_thread().name = f"detector:{name}" - logger = logging.getLogger(f"detector.{name}") - logger.info(f"Starting detection process: {os.getpid()}") - setproctitle(f"frigate.detector.{name}") - listen() - - stop_event = mp.Event() - - def receiveSignal(signalNumber, frame): - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - frame_manager = SharedMemoryFrameManager() - object_detector = LocalObjectDetector(detector_config=detector_config) - - outputs = {} - for name in out_events.keys(): - out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False) - out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) - outputs[name] = {"shm": out_shm, "np": out_np} - - while not stop_event.is_set(): - try: - connection_id = detection_queue.get(timeout=1) - except queue.Empty: - continue - input_frame = frame_manager.get( - connection_id, - (1, detector_config.model.height, detector_config.model.width, 3), - ) - - if input_frame is None: - logger.warning(f"Failed to get frame {connection_id} from SHM") - continue - - # detect and send the output - start.value = datetime.datetime.now().timestamp() - detections = object_detector.detect_raw(input_frame) - duration = datetime.datetime.now().timestamp() - start.value - frame_manager.close(connection_id) - outputs[connection_id]["np"][:] = detections[:] - out_events[connection_id].set() - start.value = 0.0 - - avg_speed.value = (avg_speed.value * 9 + duration) / 10 - - logger.info("Exited detection process...") - - -class ObjectDetectProcess: - def __init__( - self, - name, - detection_queue, - out_events, - detector_config, - ): - self.name = name - self.out_events = out_events - self.detection_queue = detection_queue - self.avg_inference_speed = mp.Value("d", 0.01) - self.detection_start = mp.Value("d", 0.0) - self.detect_process = None - self.detector_config = detector_config - self.start_or_restart() - - def stop(self): - # if the process has already exited on its own, just return - if self.detect_process and self.detect_process.exitcode: - return - self.detect_process.terminate() - logging.info("Waiting for detection process to exit gracefully...") - self.detect_process.join(timeout=30) - if self.detect_process.exitcode is None: - logging.info("Detection process didn't exit. Force killing...") - self.detect_process.kill() - self.detect_process.join() - logging.info("Detection process has exited...") - - def start_or_restart(self): - self.detection_start.value = 0.0 - if (self.detect_process is not None) and self.detect_process.is_alive(): - self.stop() - self.detect_process = util.Process( - target=run_detector, - name=f"detector:{self.name}", - args=( - self.name, - self.detection_queue, - self.out_events, - self.avg_inference_speed, - self.detection_start, - self.detector_config, - ), - ) - self.detect_process.daemon = True - self.detect_process.start() - - -class RemoteObjectDetector: +class RemoteObjectDetector(ObjectDetector): def __init__(self, name, labels, detection_queue, event, model_config, stop_event): self.labels = labels self.name = name @@ -233,3 +119,70 @@ class RemoteObjectDetector: def cleanup(self): self.shm.unlink() self.out_shm.unlink() + + +class ObjectDetectProcess(util.Process): + def __init__( + self, + detector_name: str, + detection_queue: mp.Queue, + out_events: dict[str, Event], + detector_config, + ): + super().__init__(name=f"frigate.detector:{detector_name}", daemon=True) + + self.detector_name = detector_name + self.detection_queue = detection_queue + self.out_events = out_events + self.detector_config = detector_config + + self.avg_inference_speed = mp.Value("d", 0.01) + self.detection_start = mp.Value("d", 0.0) + + def run(self): + self.logger.info(f"Starting detection process: {self.pid}") + + frame_manager = SharedMemoryFrameManager() + object_detector = LocalObjectDetector(detector_config=self.detector_config) + + outputs = {} + for event_name in self.out_events.keys(): + out_shm = mp.shared_memory.SharedMemory( + name=f"out-{event_name}", create=False + ) + out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) + outputs[event_name] = {"shm": out_shm, "np": out_np} + + while not self.stop_event.is_set(): + try: + connection_id = self.detection_queue.get(timeout=1) + except queue.Empty: + continue + input_frame = frame_manager.get( + connection_id, + ( + 1, + self.detector_config.model.height, + self.detector_config.model.width, + 3, + ), + ) + + if input_frame is None: + self.logger.warning(f"Failed to get frame {connection_id} from SHM") + continue + + # detect and send the output + self.detection_start.value = datetime.datetime.now().timestamp() + detections = object_detector.detect_raw(input_frame) + duration = datetime.datetime.now().timestamp() - self.detection_start.value + frame_manager.close(connection_id) + outputs[connection_id]["np"][:] = detections[:] + self.out_events[connection_id].set() + self.detection_start.value = 0.0 + + self.avg_inference_speed.value = ( + self.avg_inference_speed.value * 9 + duration + ) / 10 + + self.logger.info("Exited detection process...") diff --git a/frigate/stats/util.py b/frigate/stats/util.py index ad2f59f73..34939a268 100644 --- a/frigate/stats/util.py +++ b/frigate/stats/util.py @@ -265,7 +265,6 @@ def stats_snapshot( stats["detectors"] = {} for name, detector in stats_tracking["detectors"].items(): - pid = detector.detect_process.pid if detector.detect_process else None stats["detectors"][name] = { "inference_speed": round(detector.avg_inference_speed.value * 1000, 2), # type: ignore[attr-defined] # issue https://github.com/python/typeshed/issues/8799 @@ -273,7 +272,7 @@ def stats_snapshot( "detection_start": detector.detection_start.value, # type: ignore[attr-defined] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards - "pid": pid, + "pid": detector.pid, } stats["detection_fps"] = round(total_detection_fps, 2) diff --git a/frigate/watchdog.py b/frigate/watchdog.py index c6d55d18c..dae687390 100644 --- a/frigate/watchdog.py +++ b/frigate/watchdog.py @@ -1,7 +1,6 @@ import datetime import logging import threading -import time from multiprocessing.synchronize import Event as MpEvent from frigate.object_detection import ObjectDetectProcess @@ -18,12 +17,13 @@ class FrigateWatchdog(threading.Thread): self.stop_event = stop_event def run(self) -> None: - time.sleep(10) while not self.stop_event.wait(10): now = datetime.datetime.now().timestamp() # check the detection processes - for detector in self.detectors.values(): + for detector_name in list(self.detectors.keys()): + detector = self.detectors[detector_name] + detection_start = detector.detection_start.value # type: ignore[attr-defined] # issue https://github.com/python/typeshed/issues/8799 # from mypy 0.981 onwards @@ -31,12 +31,29 @@ class FrigateWatchdog(threading.Thread): logger.info( "Detection appears to be stuck. Restarting detection process..." ) - detector.start_or_restart() - elif ( - detector.detect_process is not None - and not detector.detect_process.is_alive() - ): + + # Stop the detector + detector.terminate() + logger.info("Waiting for detection process to exit gracefully...") + detector.join(timeout=30) + if detector.exitcode is None: + logger.info("Detection process didn't exit. Force killing...") + detector.kill() + detector.join() + + # Start the detector + detector = ObjectDetectProcess( + detector_name, + detector.detection_queue, + detector.out_events, + detector.detector_config, + ) + detector.start() + + elif not detector.is_alive(): logger.info("Detection appears to have stopped. Exiting Frigate...") restart_frigate() + self.detectors[detector_name] = detector + logger.info("Exiting watchdog...")