From 30a3c6dc86d51f0f8936893c965f0d70dbfb0d6c Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Tue, 24 Sep 2024 16:36:41 +0300 Subject: [PATCH] Introduce custom mp.Process subclass In preparation to switch the multiprocessing startup method away from "fork", we cannot rely on os.fork cloning the log state at fork time. Instead, we have to set up logging before we run the business logic of each process. --- benchmark.py | 3 +- frigate/__main__.py | 16 ++--------- frigate/app.py | 15 +++++----- frigate/events/audio.py | 4 +-- frigate/log.py | 52 ++++++++++++----------------------- frigate/object_detection.py | 3 +- frigate/util/__init__.py | 3 ++ frigate/util/process.py | 55 +++++++++++++++++++++++++++++++++++++ 8 files changed, 93 insertions(+), 58 deletions(-) create mode 100644 frigate/util/__init__.py create mode 100644 frigate/util/process.py diff --git a/benchmark.py b/benchmark.py index 8ba22d093..5c0c68419 100755 --- a/benchmark.py +++ b/benchmark.py @@ -4,6 +4,7 @@ from statistics import mean import numpy as np +import frigate.util as util from frigate.config import DetectorTypeEnum from frigate.object_detection import ( ObjectDetectProcess, @@ -90,7 +91,7 @@ edgetpu_process_2 = ObjectDetectProcess( ) for x in range(0, 10): - camera_process = mp.Process( + camera_process = util.Process( target=start, args=(x, 300, detection_queue, events[str(x)]) ) camera_process.daemon = True diff --git a/frigate/__main__.py b/frigate/__main__.py index ccd2594e2..ec82739e6 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,6 +1,5 @@ import argparse import faulthandler -import logging import signal import sys import threading @@ -9,29 +8,20 @@ from pydantic import ValidationError from frigate.app import FrigateApp from frigate.config import FrigateConfig -from frigate.log import log_thread +from frigate.log import setup_logging def main() -> None: faulthandler.enable() - # Clear all existing handlers. - logging.basicConfig( - level=logging.INFO, - handlers=[], - force=True, - ) + # Setup the logging thread + setup_logging() threading.current_thread().name = "frigate" # Make sure we exit cleanly on SIGTERM. signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit()) - run() - - -@log_thread() -def run() -> None: # Parse the cli arguments. parser = argparse.ArgumentParser( prog="Frigate", diff --git a/frigate/app.py b/frigate/app.py index e233820ee..02e3784dc 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -14,6 +14,7 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase +import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.comms.config_updater import ConfigPublisher @@ -251,7 +252,7 @@ class FrigateApp: self.processes["go2rtc"] = proc.info["pid"] def init_recording_manager(self) -> None: - recording_process = mp.Process( + recording_process = util.Process( target=manage_recordings, name="recording_manager", args=(self.config,), @@ -263,7 +264,7 @@ class FrigateApp: logger.info(f"Recording process started: {recording_process.pid}") def init_review_segment_manager(self) -> None: - review_segment_process = mp.Process( + review_segment_process = util.Process( target=manage_review_segments, name="review_segment_manager", args=(self.config,), @@ -281,7 +282,7 @@ class FrigateApp: # Create a client for other processes to use self.embeddings = EmbeddingsContext() - embedding_process = mp.Process( + embedding_process = util.Process( target=manage_embeddings, name="embeddings_manager", args=(self.config,), @@ -422,7 +423,7 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = mp.Process( + output_processor = util.Process( target=output_frames, name="output_processor", args=(self.config,), @@ -451,7 +452,7 @@ class FrigateApp: logger.info(f"Camera processor not started for disabled camera {name}") continue - camera_process = mp.Process( + camera_process = util.Process( target=track_camera, name=f"camera_processor:{name}", args=( @@ -466,8 +467,8 @@ class FrigateApp: self.ptz_metrics[name], self.region_grids[name], ), + daemon=True, ) - camera_process.daemon = True self.camera_metrics[name]["process"] = camera_process camera_process.start() logger.info(f"Camera processor started for {name}: {camera_process.pid}") @@ -478,7 +479,7 @@ class FrigateApp: logger.info(f"Capture process not started for disabled camera {name}") continue - capture_process = mp.Process( + capture_process = util.Process( target=capture_camera, name=f"camera_capture:{name}", args=(name, config, self.shm_frame_count(), self.camera_metrics[name]), diff --git a/frigate/events/audio.py b/frigate/events/audio.py index e617d29c6..207f73767 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -2,7 +2,6 @@ import datetime import logging -import multiprocessing as mp import signal import sys import threading @@ -12,6 +11,7 @@ from typing import Tuple import numpy as np import requests +import frigate.util as util from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor @@ -65,7 +65,7 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ) -class AudioProcessor(mp.Process): +class AudioProcessor(util.Process): def __init__( self, config: FrigateConfig, diff --git a/frigate/log.py b/frigate/log.py index ec60b1b71..a657fbb7d 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -5,13 +5,9 @@ import os import sys import threading from collections import deque -from contextlib import AbstractContextManager, ContextDecorator from logging.handlers import QueueHandler, QueueListener -from types import TracebackType from typing import Deque, Optional -from typing_extensions import Self - from frigate.util.builtin import clean_camera_user_pass LOG_HANDLER = logging.StreamHandler() @@ -28,45 +24,33 @@ LOG_HANDLER.addFilter( ) ) +log_listener: Optional[QueueListener] = None -class log_thread(AbstractContextManager, ContextDecorator): - def __init__(self, *, handler: logging.Handler = LOG_HANDLER): - super().__init__() - self._handler = handler +def setup_logging() -> None: + global log_listener - log_queue: mp.Queue = mp.Queue() - self._queue_handler = QueueHandler(log_queue) + log_queue: mp.Queue = mp.Queue() + log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) - self._log_listener = QueueListener( - log_queue, self._handler, respect_handler_level=True - ) + atexit.register(_stop_logging) + log_listener.start() - @property - def handler(self) -> logging.Handler: - return self._handler + logging.basicConfig( + level=logging.INFO, + handlers=[], + force=True, + ) - def _stop_thread(self) -> None: - self._log_listener.stop() + logging.getLogger().addHandler(QueueHandler(log_listener.queue)) - def __enter__(self) -> Self: - logging.getLogger().addHandler(self._queue_handler) - atexit.register(self._stop_thread) - self._log_listener.start() +def _stop_logging() -> None: + global log_listener - return self - - def __exit__( - self, - exc_type: Optional[type[BaseException]], - exc_info: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - logging.getLogger().removeHandler(self._queue_handler) - - atexit.unregister(self._stop_thread) - self._stop_thread() + if log_listener is not None: + log_listener.stop() + log_listener = None # When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the diff --git a/frigate/object_detection.py b/frigate/object_detection.py index d5b8b0cfe..eac019a7a 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -10,6 +10,7 @@ from abc import ABC, abstractmethod import numpy as np from setproctitle import setproctitle +import frigate.util as util from frigate.detectors import create_detector from frigate.detectors.detector_config import InputTensorEnum from frigate.util.builtin import EventsPerSecond, load_labels @@ -168,7 +169,7 @@ class ObjectDetectProcess: self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() - self.detect_process = mp.Process( + self.detect_process = util.Process( target=run_detector, name=f"detector:{self.name}", args=( diff --git a/frigate/util/__init__.py b/frigate/util/__init__.py new file mode 100644 index 000000000..307bf4f8b --- /dev/null +++ b/frigate/util/__init__.py @@ -0,0 +1,3 @@ +from .process import Process + +__all__ = ["Process"] diff --git a/frigate/util/process.py b/frigate/util/process.py new file mode 100644 index 000000000..1a14cae58 --- /dev/null +++ b/frigate/util/process.py @@ -0,0 +1,55 @@ +import logging +import multiprocessing as mp +from functools import wraps +from logging.handlers import QueueHandler +from typing import Any + +import frigate.log + + +class BaseProcess(mp.Process): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def start(self, *args, **kwargs): + self.before_start() + super().start(*args, **kwargs) + self.after_start() + + def __getattribute__(self, name: str) -> Any: + if name == "run": + run = super().__getattribute__("run") + + @wraps(run) + def run_wrapper(*args, **kwargs): + try: + self.before_run() + return run(*args, **kwargs) + finally: + self.after_run() + + return run_wrapper + + return super().__getattribute__(name) + + def before_start(self) -> None: + pass + + def after_start(self) -> None: + pass + + def before_run(self) -> None: + pass + + def after_run(self) -> None: + pass + + +class Process(BaseProcess): + def before_start(self) -> None: + self.__log_queue = frigate.log.log_listener.queue + + def before_run(self) -> None: + if self.__log_queue: + logging.basicConfig(handlers=[], force=True) + logging.getLogger().addHandler(QueueHandler(self.__log_queue))