Move to using process for logging

This commit is contained in:
Nicolas Mowen 2025-06-12 10:42:10 -06:00
parent 9414a11df4
commit bd2cc18260
8 changed files with 102 additions and 95 deletions

View File

@ -16,10 +16,11 @@ from frigate.util.config import find_config_file
def main() -> None: def main() -> None:
manager = mp.Manager()
faulthandler.enable() faulthandler.enable()
# Setup the logging thread # Setup the logging thread
setup_logging() setup_logging(manager)
threading.current_thread().name = "frigate" threading.current_thread().name = "frigate"
@ -109,7 +110,7 @@ def main() -> None:
sys.exit(0) sys.exit(0)
# Run the main application. # Run the main application.
FrigateApp(config).start() FrigateApp(config, manager).start()
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -5,7 +5,7 @@ import os
import secrets import secrets
import shutil import shutil
from multiprocessing import Queue from multiprocessing import Queue
from multiprocessing.managers import DictProxy from multiprocessing.managers import DictProxy, SyncManager
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from typing import Optional from typing import Optional
@ -80,15 +80,15 @@ logger = logging.getLogger(__name__)
class FrigateApp: class FrigateApp:
def __init__(self, config: FrigateConfig) -> None: def __init__(self, config: FrigateConfig, manager: SyncManager) -> None:
self.metrics_manager = manager
self.audio_process: Optional[mp.Process] = None self.audio_process: Optional[mp.Process] = None
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
self.detection_queue: Queue = mp.Queue() self.detection_queue: Queue = mp.Queue()
self.detectors: dict[str, ObjectDetectProcess] = {} self.detectors: dict[str, ObjectDetectProcess] = {}
self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue() self.log_queue: Queue = mp.Queue()
self.metrics_manager = mp.Manager() self.camera_metrics: DictProxy = self.metrics_manager.dict()
self.camera_metrics: DictProxy[str, CameraMetrics] = self.metrics_manager.dict()
self.embeddings_metrics: DataProcessorMetrics | None = ( self.embeddings_metrics: DataProcessorMetrics | None = (
DataProcessorMetrics( DataProcessorMetrics(
self.metrics_manager, list(config.classification.custom.keys()) self.metrics_manager, list(config.classification.custom.keys())
@ -658,7 +658,6 @@ class FrigateApp:
self.stats_emitter.join() self.stats_emitter.join()
self.frigate_watchdog.join() self.frigate_watchdog.join()
self.db.stop() self.db.stop()
self.metrics_manager.shutdown()
# Save embeddings stats to disk # Save embeddings stats to disk
if self.embeddings: if self.embeddings:
@ -676,7 +675,6 @@ class FrigateApp:
shm.close() shm.close()
shm.unlink() shm.unlink()
# exit the mp Manager process
_stop_logging() _stop_logging()
self.metrics_manager.shutdown()
os._exit(os.EX_OK) os._exit(os.EX_OK)

View File

@ -33,7 +33,7 @@ class CameraMaintainer(threading.Thread):
config: FrigateConfig, config: FrigateConfig,
detection_queue: Queue, detection_queue: Queue,
detected_frames_queue: Queue, detected_frames_queue: Queue,
camera_metrics: DictProxy[str, CameraMetrics], camera_metrics: DictProxy,
ptz_metrics: dict[str, PTZMetrics], ptz_metrics: dict[str, PTZMetrics],
stop_event: MpEvent, stop_event: MpEvent,
): ):

View File

@ -13,6 +13,7 @@ import numpy as np
import frigate.util as util import frigate.util as util
from frigate.camera import CameraMetrics from frigate.camera import CameraMetrics
from logging.handlers import QueueHandler
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.event_metadata_updater import ( from frigate.comms.event_metadata_updater import (
EventMetadataPublisher, EventMetadataPublisher,
@ -84,7 +85,7 @@ class AudioProcessor(util.Process):
self, self,
config: FrigateConfig, config: FrigateConfig,
cameras: list[CameraConfig], cameras: list[CameraConfig],
camera_metrics: DictProxy[str, CameraMetrics], camera_metrics: DictProxy,
): ):
super().__init__(name="frigate.audio_manager", daemon=True) super().__init__(name="frigate.audio_manager", daemon=True)
@ -106,6 +107,7 @@ class AudioProcessor(util.Process):
self.transcription_model_runner = None self.transcription_model_runner = None
def run(self) -> None: def run(self) -> None:
self.pre_run_setup()
audio_threads: list[AudioEventMaintainer] = [] audio_threads: list[AudioEventMaintainer] = []
threading.current_thread().name = "process:audio_manager" threading.current_thread().name = "process:audio_manager"
@ -147,7 +149,7 @@ class AudioEventMaintainer(threading.Thread):
self, self,
camera: CameraConfig, camera: CameraConfig,
config: FrigateConfig, config: FrigateConfig,
camera_metrics: DictProxy[str, CameraMetrics], camera_metrics: DictProxy,
audio_transcription_model_runner: AudioTranscriptionModelRunner | None, audio_transcription_model_runner: AudioTranscriptionModelRunner | None,
stop_event: threading.Event, stop_event: threading.Event,
) -> None: ) -> None:

View File

@ -1,12 +1,12 @@
# In log.py # In log.py
import atexit import atexit
import logging import logging
import multiprocessing as mp
import os import os
import sys import sys
import threading import threading
from collections import deque from collections import deque
from logging.handlers import QueueHandler, QueueListener from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from queue import Queue from queue import Queue
from typing import Deque, Optional from typing import Deque, Optional
@ -35,12 +35,10 @@ LOG_HANDLER.addFilter(
log_listener: Optional[QueueListener] = None log_listener: Optional[QueueListener] = None
log_queue: Optional[Queue] = None log_queue: Optional[Queue] = None
manager = None
def setup_logging() -> None: def setup_logging(manager: SyncManager) -> None:
global log_listener, log_queue, manager global log_listener, log_queue
manager = mp.Manager()
log_queue = manager.Queue() log_queue = manager.Queue()
log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True) log_listener = QueueListener(log_queue, LOG_HANDLER, respect_handler_level=True)
@ -57,13 +55,10 @@ def setup_logging() -> None:
def _stop_logging() -> None: def _stop_logging() -> None:
global log_listener, manager global log_listener
if log_listener is not None: if log_listener is not None:
log_listener.stop() log_listener.stop()
log_listener = None log_listener = None
if manager is not None:
manager.shutdown()
manager = None
# When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the # When a multiprocessing.Process exits, python tries to flush stdout and stderr. However, if the

View File

@ -1,16 +1,11 @@
import datetime import datetime
import logging import logging
import multiprocessing as mp
import os
import queue import queue
import signal
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from multiprocessing import Queue, Value from multiprocessing import Queue, Value
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
import numpy as np import numpy as np
from setproctitle import setproctitle
import frigate.util as util import frigate.util as util
from frigate.comms.object_detector_signaler import ( from frigate.comms.object_detector_signaler import (
@ -25,7 +20,6 @@ from frigate.detectors.detector_config import (
) )
from frigate.util.builtin import EventsPerSecond, load_labels from frigate.util.builtin import EventsPerSecond, load_labels
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.services import listen
from .util import tensor_transform from .util import tensor_transform
@ -90,72 +84,75 @@ class LocalObjectDetector(ObjectDetector):
return self.detect_api.detect_raw(tensor_input=tensor_input) return self.detect_api.detect_raw(tensor_input=tensor_input)
def run_detector( class DetectorRunner(util.Process):
name: str, def __init__(
detection_queue: Queue, self,
cameras: list[str], name,
avg_speed: Value, detection_queue: Queue,
start: Value, cameras: list[str],
detector_config: BaseDetectorConfig, avg_speed: Value,
): start_time: Value,
threading.current_thread().name = f"detector:{name}" detector_config: BaseDetectorConfig,
logger = logging.getLogger(f"detector.{name}") ) -> None:
logger.info(f"Starting detection process: {os.getpid()}") super().__init__(name=name, daemon=True)
setproctitle(f"frigate.detector.{name}") self.detection_queue = detection_queue
listen() self.cameras = cameras
self.avg_speed = avg_speed
self.start_time = start_time
self.detector_config = detector_config
self.outputs: dict = {}
stop_event: MpEvent = mp.Event() def create_output_shm(self, name: str):
def receiveSignal(signalNumber, frame):
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
def create_output_shm(name: str):
out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False) out_shm = UntrackedSharedMemory(name=f"out-{name}", create=False)
out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf) out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
outputs[name] = {"shm": out_shm, "np": out_np} self.outputs[name] = {"shm": out_shm, "np": out_np}
frame_manager = SharedMemoryFrameManager() def run(self) -> None:
object_detector = LocalObjectDetector(detector_config=detector_config) self.pre_run_setup()
detector_publisher = ObjectDetectorPublisher()
outputs = {} frame_manager = SharedMemoryFrameManager()
for name in cameras: object_detector = LocalObjectDetector(detector_config=self.detector_config)
create_output_shm(name) detector_publisher = ObjectDetectorPublisher()
while not stop_event.is_set(): for name in self.cameras:
try: self.create_output_shm(name)
connection_id = detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(1, detector_config.model.height, detector_config.model.width, 3),
)
if input_frame is None: while not self.stop_event.is_set():
logger.warning(f"Failed to get frame {connection_id} from SHM") try:
continue connection_id = self.detection_queue.get(timeout=1)
except queue.Empty:
continue
input_frame = frame_manager.get(
connection_id,
(
1,
self.detector_config.model.height,
self.detector_config.model.width,
3,
),
)
# detect and send the output if input_frame is None:
start.value = datetime.datetime.now().timestamp() logger.warning(f"Failed to get frame {connection_id} from SHM")
detections = object_detector.detect_raw(input_frame) continue
duration = datetime.datetime.now().timestamp() - start.value
frame_manager.close(connection_id)
if connection_id not in outputs: # detect and send the output
create_output_shm(connection_id) self.start_time.value = datetime.datetime.now().timestamp()
detections = object_detector.detect_raw(input_frame)
duration = datetime.datetime.now().timestamp() - self.start_time.value
frame_manager.close(connection_id)
outputs[connection_id]["np"][:] = detections[:] if connection_id not in self.outputs:
detector_publisher.publish(connection_id) self.create_output_shm(connection_id)
start.value = 0.0
avg_speed.value = (avg_speed.value * 9 + duration) / 10 self.outputs[connection_id]["np"][:] = detections[:]
detector_publisher.publish(connection_id)
self.start_time.value = 0.0
detector_publisher.stop() self.avg_speed.value = (self.avg_speed.value * 9 + duration) / 10
logger.info("Exited detection process...")
detector_publisher.stop()
logger.info("Exited detection process...")
class ObjectDetectProcess: class ObjectDetectProcess:
@ -192,19 +189,14 @@ class ObjectDetectProcess:
self.detection_start.value = 0.0 self.detection_start.value = 0.0
if (self.detect_process is not None) and self.detect_process.is_alive(): if (self.detect_process is not None) and self.detect_process.is_alive():
self.stop() self.stop()
self.detect_process = util.Process( self.detect_process = DetectorRunner(
target=run_detector, f"detector:{self.name}",
name=f"detector:{self.name}", self.detection_queue,
args=( self.cameras,
self.name, self.avg_inference_speed,
self.detection_queue, self.detection_start,
self.cameras, self.detector_config,
self.avg_inference_speed,
self.detection_start,
self.detector_config,
),
) )
self.detect_process.daemon = True
self.detect_process.start() self.detect_process.start()

View File

@ -54,7 +54,7 @@ def get_latest_version(config: FrigateConfig) -> str:
def stats_init( def stats_init(
config: FrigateConfig, config: FrigateConfig,
camera_metrics: DictProxy[str, CameraMetrics], camera_metrics: DictProxy,
embeddings_metrics: DataProcessorMetrics | None, embeddings_metrics: DataProcessorMetrics | None,
detectors: dict[str, ObjectDetectProcess], detectors: dict[str, ObjectDetectProcess],
processes: dict[str, int], processes: dict[str, int],

View File

@ -1,5 +1,8 @@
import faulthandler
import logging import logging
import multiprocessing as mp import multiprocessing as mp
import signal
import sys
import threading import threading
from logging.handlers import QueueHandler from logging.handlers import QueueHandler
from typing import Callable, Optional from typing import Callable, Optional
@ -45,7 +48,23 @@ class Process(BaseProcess):
return self.__dict__["stop_event"] return self.__dict__["stop_event"]
def before_start(self) -> None: def before_start(self) -> None:
self.__log_queue = frigate.log.log_queue self.__log_queue = frigate.log.log_listener.queue
def pre_run_setup(self) -> None:
faulthandler.enable()
def receiveSignal(signalNumber, frame):
# Get the stop_event through the dict to bypass lazy initialization.
stop_event = self.__dict__.get("stop_event")
if stop_event is not None:
# Someone is monitoring stop_event. We should set it.
stop_event.set()
else:
# Nobody is monitoring stop_event. We should raise SystemExit.
sys.exit()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
self.logger = logging.getLogger(self.name) self.logger = logging.getLogger(self.name)
logging.basicConfig(handlers=[], force=True) logging.basicConfig(handlers=[], force=True)
logging.getLogger().addHandler(QueueHandler(self.__log_queue)) logging.getLogger().addHandler(QueueHandler(self.__log_queue))