diff --git a/benchmark.py b/benchmark.py index 8ba22d093..5c0c68419 100755 --- a/benchmark.py +++ b/benchmark.py @@ -4,6 +4,7 @@ from statistics import mean import numpy as np +import frigate.util as util from frigate.config import DetectorTypeEnum from frigate.object_detection import ( ObjectDetectProcess, @@ -90,7 +91,7 @@ edgetpu_process_2 = ObjectDetectProcess( ) for x in range(0, 10): - camera_process = mp.Process( + camera_process = util.Process( target=start, args=(x, 300, detection_queue, events[str(x)]) ) camera_process.daemon = True diff --git a/frigate/__main__.py b/frigate/__main__.py index ccd2594e2..ec82739e6 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,6 +1,5 @@ import argparse import faulthandler -import logging import signal import sys import threading @@ -9,29 +8,20 @@ from pydantic import ValidationError from frigate.app import FrigateApp from frigate.config import FrigateConfig -from frigate.log import log_thread +from frigate.log import setup_logging def main() -> None: faulthandler.enable() - # Clear all existing handlers. - logging.basicConfig( - level=logging.INFO, - handlers=[], - force=True, - ) + # Setup the logging thread + setup_logging() threading.current_thread().name = "frigate" # Make sure we exit cleanly on SIGTERM. signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit()) - run() - - -@log_thread() -def run() -> None: # Parse the cli arguments. parser = argparse.ArgumentParser( prog="Frigate", diff --git a/frigate/app.py b/frigate/app.py index e233820ee..02e3784dc 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -14,6 +14,7 @@ from peewee_migrate import Router from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqliteq import SqliteQueueDatabase +import frigate.util as util from frigate.api.auth import hash_password from frigate.api.fastapi_app import create_fastapi_app from frigate.comms.config_updater import ConfigPublisher @@ -251,7 +252,7 @@ class FrigateApp: self.processes["go2rtc"] = proc.info["pid"] def init_recording_manager(self) -> None: - recording_process = mp.Process( + recording_process = util.Process( target=manage_recordings, name="recording_manager", args=(self.config,), @@ -263,7 +264,7 @@ class FrigateApp: logger.info(f"Recording process started: {recording_process.pid}") def init_review_segment_manager(self) -> None: - review_segment_process = mp.Process( + review_segment_process = util.Process( target=manage_review_segments, name="review_segment_manager", args=(self.config,), @@ -281,7 +282,7 @@ class FrigateApp: # Create a client for other processes to use self.embeddings = EmbeddingsContext() - embedding_process = mp.Process( + embedding_process = util.Process( target=manage_embeddings, name="embeddings_manager", args=(self.config,), @@ -422,7 +423,7 @@ class FrigateApp: self.detected_frames_processor.start() def start_video_output_processor(self) -> None: - output_processor = mp.Process( + output_processor = util.Process( target=output_frames, name="output_processor", args=(self.config,), @@ -451,7 +452,7 @@ class FrigateApp: logger.info(f"Camera processor not started for disabled camera {name}") continue - camera_process = mp.Process( + camera_process = util.Process( target=track_camera, name=f"camera_processor:{name}", args=( @@ -466,8 +467,8 @@ class FrigateApp: self.ptz_metrics[name], self.region_grids[name], ), + daemon=True, ) - camera_process.daemon = True self.camera_metrics[name]["process"] = camera_process camera_process.start() logger.info(f"Camera processor started for {name}: {camera_process.pid}") @@ -478,7 +479,7 @@ class FrigateApp: logger.info(f"Capture process not started for disabled camera {name}") continue - capture_process = mp.Process( + capture_process = util.Process( target=capture_camera, name=f"camera_capture:{name}", args=(name, config, self.shm_frame_count(), self.camera_metrics[name]), diff --git a/frigate/events/audio.py b/frigate/events/audio.py index e617d29c6..207f73767 100644 --- a/frigate/events/audio.py +++ b/frigate/events/audio.py @@ -2,7 +2,6 @@ import datetime import logging -import multiprocessing as mp import signal import sys import threading @@ -12,6 +11,7 @@ from typing import Tuple import numpy as np import requests +import frigate.util as util from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor @@ -65,7 +65,7 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]: ) -class AudioProcessor(mp.Process): +class AudioProcessor(util.Process): def __init__( self, config: FrigateConfig, diff --git a/frigate/log.py b/frigate/log.py index ec60b1b71..a657fbb7d 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -5,13 +5,9 @@ import os import sys import threading from collections import deque -from contextlib import AbstractContextManager, ContextDecorator from logging.handlers import QueueHandler, QueueListener -from types import TracebackType from typing import Deque, Optional -from typing_extensions import Self - from frigate.util.builtin import clean_camera_user_pass LOG_HANDLER = logging.StreamHandler() @@ -28,45 +24,33 @@ LOG_HANDLER.addFilter( ) ) +log_listener: Optional[QueueListener] = None -class log_thread(AbstractContextManager, ContextDecorator): - def __init__(self, *, handler: logging.Handler = LOG_HANDLER): - super().__init__() - self._handler = handler +def setup_logging() -> None: + global log_listener - log_queue: mp.Queue = mp.Queue() - self._queue_handler = QueueHandler(log_queue) + log_queue: mp.Queue = mp.Queue() + log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) - self._log_listener = QueueListener( - log_queue, self._handler, respect_handler_level=True - ) + atexit.register(_stop_logging) + log_listener.start() - @property - def handler(self) -> logging.Handler: - return self._handler + logging.basicConfig( + level=logging.INFO, + handlers=[], + force=True, + ) - def _stop_thread(self) -> None: - self._log_listener.stop() + logging.getLogger().addHandler(QueueHandler(log_listener.queue)) - def __enter__(self) -> Self: - logging.getLogger().addHandler(self._queue_handler) - atexit.register(self._stop_thread) - self._log_listener.start() +def _stop_logging() -> None: + global log_listener - return self - - def __exit__( - self, - exc_type: Optional[type[BaseException]], - exc_info: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - logging.getLogger().removeHandler(self._queue_handler) - - atexit.unregister(self._stop_thread) - self._stop_thread() + if log_listener is not None: + log_listener.stop() + log_listener = None # When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the diff --git a/frigate/object_detection.py b/frigate/object_detection.py index d5b8b0cfe..eac019a7a 100644 --- a/frigate/object_detection.py +++ b/frigate/object_detection.py @@ -10,6 +10,7 @@ from abc import ABC, abstractmethod import numpy as np from setproctitle import setproctitle +import frigate.util as util from frigate.detectors import create_detector from frigate.detectors.detector_config import InputTensorEnum from frigate.util.builtin import EventsPerSecond, load_labels @@ -168,7 +169,7 @@ class ObjectDetectProcess: self.detection_start.value = 0.0 if (self.detect_process is not None) and self.detect_process.is_alive(): self.stop() - self.detect_process = mp.Process( + self.detect_process = util.Process( target=run_detector, name=f"detector:{self.name}", args=( diff --git a/frigate/util/__init__.py b/frigate/util/__init__.py new file mode 100644 index 000000000..307bf4f8b --- /dev/null +++ b/frigate/util/__init__.py @@ -0,0 +1,3 @@ +from .process import Process + +__all__ = ["Process"] diff --git a/frigate/util/process.py b/frigate/util/process.py new file mode 100644 index 000000000..1a14cae58 --- /dev/null +++ b/frigate/util/process.py @@ -0,0 +1,55 @@ +import logging +import multiprocessing as mp +from functools import wraps +from logging.handlers import QueueHandler +from typing import Any + +import frigate.log + + +class BaseProcess(mp.Process): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def start(self, *args, **kwargs): + self.before_start() + super().start(*args, **kwargs) + self.after_start() + + def __getattribute__(self, name: str) -> Any: + if name == "run": + run = super().__getattribute__("run") + + @wraps(run) + def run_wrapper(*args, **kwargs): + try: + self.before_run() + return run(*args, **kwargs) + finally: + self.after_run() + + return run_wrapper + + return super().__getattribute__(name) + + def before_start(self) -> None: + pass + + def after_start(self) -> None: + pass + + def before_run(self) -> None: + pass + + def after_run(self) -> None: + pass + + +class Process(BaseProcess): + def before_start(self) -> None: + self.__log_queue = frigate.log.log_listener.queue + + def before_run(self) -> None: + if self.__log_queue: + logging.basicConfig(handlers=[], force=True) + logging.getLogger().addHandler(QueueHandler(self.__log_queue))