Introduce custom mp.Process subclass

In preparation to switch the multiprocessing startup method away from
"fork", we cannot rely on os.fork cloning the log state at fork time.
Instead, we have to set up logging before we run the business logic of
each process.
This commit is contained in:
George Tsiamasiotis 2024-09-24 16:36:41 +03:00
parent 5c421375c0
commit 30a3c6dc86
8 changed files with 93 additions and 58 deletions

View File

@ -4,6 +4,7 @@ from statistics import mean
import numpy as np
import frigate.util as util
from frigate.config import DetectorTypeEnum
from frigate.object_detection import (
ObjectDetectProcess,
@ -90,7 +91,7 @@ edgetpu_process_2 = ObjectDetectProcess(
)
for x in range(0, 10):
camera_process = mp.Process(
camera_process = util.Process(
target=start, args=(x, 300, detection_queue, events[str(x)])
)
camera_process.daemon = True

View File

@ -1,6 +1,5 @@
import argparse
import faulthandler
import logging
import signal
import sys
import threading
@ -9,29 +8,20 @@ from pydantic import ValidationError
from frigate.app import FrigateApp
from frigate.config import FrigateConfig
from frigate.log import log_thread
from frigate.log import setup_logging
def main() -> None:
faulthandler.enable()
# Clear all existing handlers.
logging.basicConfig(
level=logging.INFO,
handlers=[],
force=True,
)
# Setup the logging thread
setup_logging()
threading.current_thread().name = "frigate"
# Make sure we exit cleanly on SIGTERM.
signal.signal(signal.SIGTERM, lambda sig, frame: sys.exit())
run()
@log_thread()
def run() -> None:
# Parse the cli arguments.
parser = argparse.ArgumentParser(
prog="Frigate",

View File

@ -14,6 +14,7 @@ from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase
import frigate.util as util
from frigate.api.auth import hash_password
from frigate.api.fastapi_app import create_fastapi_app
from frigate.comms.config_updater import ConfigPublisher
@ -251,7 +252,7 @@ class FrigateApp:
self.processes["go2rtc"] = proc.info["pid"]
def init_recording_manager(self) -> None:
recording_process = mp.Process(
recording_process = util.Process(
target=manage_recordings,
name="recording_manager",
args=(self.config,),
@ -263,7 +264,7 @@ class FrigateApp:
logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None:
review_segment_process = mp.Process(
review_segment_process = util.Process(
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
@ -281,7 +282,7 @@ class FrigateApp:
# Create a client for other processes to use
self.embeddings = EmbeddingsContext()
embedding_process = mp.Process(
embedding_process = util.Process(
target=manage_embeddings,
name="embeddings_manager",
args=(self.config,),
@ -422,7 +423,7 @@ class FrigateApp:
self.detected_frames_processor.start()
def start_video_output_processor(self) -> None:
output_processor = mp.Process(
output_processor = util.Process(
target=output_frames,
name="output_processor",
args=(self.config,),
@ -451,7 +452,7 @@ class FrigateApp:
logger.info(f"Camera processor not started for disabled camera {name}")
continue
camera_process = mp.Process(
camera_process = util.Process(
target=track_camera,
name=f"camera_processor:{name}",
args=(
@ -466,8 +467,8 @@ class FrigateApp:
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
camera_process.daemon = True
self.camera_metrics[name]["process"] = camera_process
camera_process.start()
logger.info(f"Camera processor started for {name}: {camera_process.pid}")
@ -478,7 +479,7 @@ class FrigateApp:
logger.info(f"Capture process not started for disabled camera {name}")
continue
capture_process = mp.Process(
capture_process = util.Process(
target=capture_camera,
name=f"camera_capture:{name}",
args=(name, config, self.shm_frame_count(), self.camera_metrics[name]),

View File

@ -2,7 +2,6 @@
import datetime
import logging
import multiprocessing as mp
import signal
import sys
import threading
@ -12,6 +11,7 @@ from typing import Tuple
import numpy as np
import requests
import frigate.util as util
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
@ -65,7 +65,7 @@ def get_ffmpeg_command(ffmpeg: FfmpegConfig) -> list[str]:
)
class AudioProcessor(mp.Process):
class AudioProcessor(util.Process):
def __init__(
self,
config: FrigateConfig,

View File

@ -5,13 +5,9 @@ import os
import sys
import threading
from collections import deque
from contextlib import AbstractContextManager, ContextDecorator
from logging.handlers import QueueHandler, QueueListener
from types import TracebackType
from typing import Deque, Optional
from typing_extensions import Self
from frigate.util.builtin import clean_camera_user_pass
LOG_HANDLER = logging.StreamHandler()
@ -28,45 +24,33 @@ LOG_HANDLER.addFilter(
)
)
log_listener: Optional[QueueListener] = None
class log_thread(AbstractContextManager, ContextDecorator):
def __init__(self, *, handler: logging.Handler = LOG_HANDLER):
super().__init__()
self._handler = handler
def setup_logging() -> None:
global log_listener
log_queue: mp.Queue = mp.Queue()
self._queue_handler = QueueHandler(log_queue)
log_queue: mp.Queue = mp.Queue()
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
self._log_listener = QueueListener(
log_queue, self._handler, respect_handler_level=True
)
atexit.register(_stop_logging)
log_listener.start()
@property
def handler(self) -> logging.Handler:
return self._handler
logging.basicConfig(
level=logging.INFO,
handlers=[],
force=True,
)
def _stop_thread(self) -> None:
self._log_listener.stop()
logging.getLogger().addHandler(QueueHandler(log_listener.queue))
def __enter__(self) -> Self:
logging.getLogger().addHandler(self._queue_handler)
atexit.register(self._stop_thread)
self._log_listener.start()
def _stop_logging() -> None:
global log_listener
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_info: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
logging.getLogger().removeHandler(self._queue_handler)
atexit.unregister(self._stop_thread)
self._stop_thread()
if log_listener is not None:
log_listener.stop()
log_listener = None
# When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the

View File

@ -10,6 +10,7 @@ from abc import ABC, abstractmethod
import numpy as np
from setproctitle import setproctitle
import frigate.util as util
from frigate.detectors import create_detector
from frigate.detectors.detector_config import InputTensorEnum
from frigate.util.builtin import EventsPerSecond, load_labels
@ -168,7 +169,7 @@ class ObjectDetectProcess:
self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop()
self.detect_process = mp.Process(
self.detect_process = util.Process(
target=run_detector,
name=f"detector:{self.name}",
args=(

3
frigate/util/__init__.py Normal file
View File

@ -0,0 +1,3 @@
"""Frigate utility package: re-exports the logging-aware Process subclass."""
from .process import Process
__all__ = ["Process"]

55
frigate/util/process.py Normal file
View File

@ -0,0 +1,55 @@
import logging
import multiprocessing as mp
from functools import wraps
from logging.handlers import QueueHandler
from typing import Any
import frigate.log
class BaseProcess(mp.Process):
    """multiprocessing.Process with overridable lifecycle hooks.

    Subclasses may override ``before_start``/``after_start`` (executed in
    the parent, bracketing ``start()``) and ``before_run``/``after_run``
    (executed in the child, bracketing ``run()``).
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def start(self, *args, **kwargs):
        # Parent-side hooks bracket the actual process launch.
        self.before_start()
        super().start(*args, **kwargs)
        self.after_start()

    def __getattribute__(self, name: str) -> Any:
        # Intercept every lookup of "run" — including subclass overrides,
        # which a plain base-class run() override could not catch — and
        # hand back a wrapper that brackets it with the child-side hooks.
        attribute = super().__getattribute__(name)
        if name != "run":
            return attribute

        @wraps(attribute)
        def bracketed_run(*args, **kwargs):
            try:
                self.before_run()
                return attribute(*args, **kwargs)
            finally:
                # Always runs, even if before_run() or run() raised.
                self.after_run()

        return bracketed_run

    # Default hooks are deliberate no-ops; subclasses opt in by overriding.
    def before_start(self) -> None:
        pass

    def after_start(self) -> None:
        pass

    def before_run(self) -> None:
        pass

    def after_run(self) -> None:
        pass
class Process(BaseProcess):
    """Process subclass that re-establishes logging in the child.

    In the parent, ``before_start`` captures the queue of the global
    ``QueueListener`` set up by ``frigate.log``; because it is stored on
    the instance, it travels with the process object to the child (needed
    once the start method is no longer "fork"). In the child,
    ``before_run`` routes all log records through that queue back to the
    parent's listener.
    """

    def before_start(self) -> None:
        # Fix: tolerate logging not being set up yet (log_listener is
        # None until setup_logging() runs, e.g. in tests/benchmarks that
        # use util.Process directly) instead of raising AttributeError.
        log_listener = frigate.log.log_listener
        self.__log_queue = log_listener.queue if log_listener is not None else None

    def before_run(self) -> None:
        # Runs in the child before run(): reset the root logger and
        # forward everything to the parent via the queue. Explicit None
        # check — an mp.Queue is always truthy, so truthiness only
        # obscured the intent here.
        if self.__log_queue is not None:
            logging.basicConfig(handlers=[], force=True)
            logging.getLogger().addHandler(QueueHandler(self.__log_queue))