Make ObjectDetectProcess into an actual process

This commit is contained in:
George Tsiamasiotis 2024-10-02 13:04:37 +03:00
parent 3badc757cc
commit 3987d09550
4 changed files with 97 additions and 126 deletions

View File

@ -311,6 +311,7 @@ class FrigateApp:
}, },
detector_config, detector_config,
) )
self.detectors[name].start()
def start_ptz_autotracker(self) -> None: def start_ptz_autotracker(self) -> None:
self.ptz_autotracker_thread = PtzAutoTrackerThread( self.ptz_autotracker_thread = PtzAutoTrackerThread(
@ -572,7 +573,8 @@ class FrigateApp:
# ensure the detectors are done # ensure the detectors are done
for detector in self.detectors.values(): for detector in self.detectors.values():
detector.stop() detector.terminate()
detector.join()
empty_and_close_queue(self.detection_queue) empty_and_close_queue(self.detection_queue)
logger.info("Detection queue closed") logger.info("Detection queue closed")

View File

@ -1,22 +1,17 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import os
import queue import queue
import signal
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing.synchronize import Event from multiprocessing.synchronize import Event
import numpy as np import numpy as np
from setproctitle import setproctitle
from frigate import util from frigate import util
from frigate.detectors import create_detector from frigate.detectors import create_detector
from frigate.detectors.detector_config import InputTensorEnum from frigate.detectors.detector_config import InputTensorEnum
from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager from frigate.util.image import SharedMemoryFrameManager
from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -77,116 +72,7 @@ class LocalObjectDetector(ObjectDetector):
return self.detect_api.detect_raw(tensor_input=tensor_input) return self.detect_api.detect_raw(tensor_input=tensor_input)
def run_detector( class RemoteObjectDetector(ObjectDetector):
name: str,
detection_queue: mp.Queue,
out_events: dict[str, Event],
avg_speed,
start,
detector_config,
):
threading.current_thread().name = f"detector:{name}"
logger = logging.getLogger(f"detector.{name}")
logger.info(f"Starting detection process: {os.getpid()}")
setproctitle(f"frigate.detector.{name}")
listen()
stop_event = mp.Event()
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=detector_config)
outputs = {}
for name in out_events.keys():
out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np}
while not stop_event.is_set():
try:
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
)
if input_frame is None:
logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
# detect and send the output
start.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
outputs[connection_id]["np"][:] = detections[:]
out_events[connection_id].set()
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10
logger.info("Exited detection process...")
class ObjectDetectProcess:
def __init__(
self,
name,
detection_queue,
out_events,
detector_config,
):
self.name = name
self.out_events = out_events
self.detection_queue = detection_queue
self.avg_inference_speed = mp.Value("d", 0.01)
self.detection_start = mp.Value("d", 0.0)
self.detect_process = None
self.detector_config = detector_config
self.start_or_restart()
def stop(self):
# if the process has already exited on its own, just return
if self.detect_process and self.detect_process.exitcode:
return
self.detect_process.terminate()
logging.info("Waiting for detection process to exit gracefully...")
self.detect_process.join(timeout=30)
if self.detect_process.exitcode is None:
logging.info("Detection process didn't exit. Force killing...")
self.detect_process.kill()
self.detect_process.join()
logging.info("Detection process has exited...")
def start_or_restart(self):
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
self.detect_process = util.Process(
target=run_detector,
name=f"detector:{self.name}",
args=(
self.name,
self.detection_queue,
self.out_events,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
)
self.detect_process.daemon = True
self.detect_process.start()
class RemoteObjectDetector:
def __init__(self, name, labels, detection_queue, event, model_config, stop_event): def __init__(self, name, labels, detection_queue, event, model_config, stop_event):
self.labels = labels self.labels = labels
self.name = name self.name = name
@ -233,3 +119,70 @@ class RemoteObjectDetector:
def cleanup(self): def cleanup(self):
self.shm.unlink() self.shm.unlink()
self.out_shm.unlink() self.out_shm.unlink()
class ObjectDetectProcess(util.Process):
def __init__(
self,
detector_name: str,
detection_queue: mp.Queue,
out_events: dict[str, Event],
detector_config,
):
super().__init__(name=f"frigate.detector:{detector_name}", daemon=True)
self.detector_name = detector_name
self.detection_queue = detection_queue
self.out_events = out_events
self.detector_config = detector_config
self.avg_inference_speed = mp.Value("d", 0.01)
self.detection_start = mp.Value("d", 0.0)
def run(self):
self.logger.info(f"Starting detection process: {self.pid}")
frame_manager = SharedMemoryFrameManager()
object_detector = LocalObjectDetector(detector_config=self.detector_config)
outputs = {}
for event_name in self.out_events.keys():
out_shm = mp.shared_memory.SharedMemory(
name=f"out-{event_name}", create=False
)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[event_name] = {"shm": out_shm, "np": out_np}
while not self.stop_event.is_set():
try:
connection_id = self.detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
3,
),
)
if input_frame is None:
self.logger.warning(f"Failed to get frame {connection_id} from SHM")
continue
# detect and send the output
self.detection_start.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - self.detection_start.value
frame_manager.close(connection_id)
outputs[connection_id]["np"][:] = detections[:]
self.out_events[connection_id].set()
self.detection_start.value = 0.0
self.avg_inference_speed.value = (
self.avg_inference_speed.value * 9 + duration
) / 10
self.logger.info("Exited detection process...")

View File

@ -265,7 +265,6 @@ def stats_snapshot(
stats["detectors"] = {} stats["detectors"] = {}
for name, detector in stats_tracking["detectors"].items(): for name, detector in stats_tracking["detectors"].items():
pid = detector.detect_process.pid if detector.detect_process else None
stats["detectors"][name] = { stats["detectors"][name] = {
"inference_speed": round(detector.avg_inference_speed.value * 1000, 2), # type: ignore[attr-defined] "inference_speed": round(detector.avg_inference_speed.value * 1000, 2), # type: ignore[attr-defined]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
@ -273,7 +272,7 @@ def stats_snapshot(
"detection_start": detector.detection_start.value, # type: ignore[attr-defined] "detection_start": detector.detection_start.value, # type: ignore[attr-defined]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards
"pid": pid, "pid": detector.pid,
} }
stats["detection_fps"] = round(total_detection_fps, 2) stats["detection_fps"] = round(total_detection_fps, 2)

View File

@ -1,7 +1,6 @@
import datetime import datetime
import logging import logging
import threading import threading
import time
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
@ -18,12 +17,13 @@ class FrigateWatchdog(threading.Thread):
self.stop_event = stop_event self.stop_event = stop_event
def run(self) -> None: def run(self) -> None:
time.sleep(10)
while not self.stop_event.wait(10): while not self.stop_event.wait(10):
now = datetime.datetime.now().timestamp() now = datetime.datetime.now().timestamp()
# check the detection processes # check the detection processes
for detector in self.detectors.values(): for detector_name in list(self.detectors.keys()):
detector = self.detectors[detector_name]
detection_start = detector.detection_start.value # type: ignore[attr-defined] detection_start = detector.detection_start.value # type: ignore[attr-defined]
# issue https://github.com/python/typeshed/issues/8799 # issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards # from mypy 0.981 onwards
@ -31,12 +31,29 @@ class FrigateWatchdog(threading.Thread):
logger.info( logger.info(
"Detection appears to be stuck. Restarting detection process..." "Detection appears to be stuck. Restarting detection process..."
) )
detector.start_or_restart()
elif ( # Stop the detector
detector.detect_process is not None detector.terminate()
and not detector.detect_process.is_alive() logger.info("Waiting for detection process to exit gracefully...")
): detector.join(timeout=30)
if detector.exitcode is None:
logger.info("Detection process didn't exit. Force killing...")
detector.kill()
detector.join()
# Start the detector
detector = ObjectDetectProcess(
detector_name,
detector.detection_queue,
detector.out_events,
detector.detector_config,
)
detector.start()
elif not detector.is_alive():
logger.info("Detection appears to have stopped. Exiting Frigate...") logger.info("Detection appears to have stopped. Exiting Frigate...")
restart_frigate() restart_frigate()
self.detectors[detector_name] = detector
logger.info("Exiting watchdog...") logger.info("Exiting watchdog...")