add decorator for redirecting c/c++ level output to logger

This commit is contained in:
Josh Hawkins 2025-06-25 07:40:47 -05:00
parent 4701bedde4
commit 8acbe0bde9
6 changed files with 121 additions and 17 deletions

View File

@ -11,7 +11,7 @@ from scipy import stats
from frigate.config import FrigateConfig
from frigate.const import MODEL_CACHE_DIR
from frigate.embeddings.onnx.face_embedding import ArcfaceEmbedding, FaceNetEmbedding
from frigate.log import redirect_stdout_to_logger
from frigate.log import redirect_output_to_logger
logger = logging.getLogger(__name__)
@ -38,7 +38,7 @@ class FaceRecognizer(ABC):
def classify(self, face_image: np.ndarray) -> tuple[str, float] | None:
pass
@redirect_stdout_to_logger(logger, logging.DEBUG)
@redirect_output_to_logger(logger, logging.DEBUG)
def init_landmark_detector(self) -> None:
landmark_model = os.path.join(MODEL_CACHE_DIR, "facedet/landmarkdet.yaml")

View File

@ -5,7 +5,7 @@ from typing_extensions import Literal
from frigate.detectors.detection_api import DetectionApi
from frigate.detectors.detector_config import BaseDetectorConfig
from frigate.log import redirect_stdout_to_logger
from frigate.log import redirect_output_to_logger
from ..detector_utils import tflite_detect_raw, tflite_init
@ -28,7 +28,7 @@ class CpuDetectorConfig(BaseDetectorConfig):
class CpuTfl(DetectionApi):
type_key = DETECTOR_KEY
@redirect_stdout_to_logger(logger, logging.DEBUG)
@redirect_output_to_logger(logger, logging.DEBUG)
def __init__(self, detector_config: CpuDetectorConfig):
interpreter = Interpreter(
model_path=detector_config.model.path,

View File

@ -6,7 +6,7 @@ import os
import numpy as np
from frigate.const import MODEL_CACHE_DIR
from frigate.log import redirect_stdout_to_logger
from frigate.log import redirect_output_to_logger
from frigate.util.downloader import ModelDownloader
from .base_embedding import BaseEmbedding
@ -54,7 +54,7 @@ class FaceNetEmbedding(BaseEmbedding):
self._load_model_and_utils()
logger.debug(f"models are already downloaded for {self.model_name}")
@redirect_stdout_to_logger(logger, logging.DEBUG)
@redirect_output_to_logger(logger, logging.DEBUG)
def _load_model_and_utils(self):
if self.runner is None:
if self.downloader:

View File

@ -37,7 +37,7 @@ from frigate.data_processing.real_time.audio_transcription import (
AudioTranscriptionRealTimeProcessor,
)
from frigate.ffmpeg_presets import parse_preset_input
from frigate.log import LogPipe, redirect_stdout_to_logger
from frigate.log import LogPipe, redirect_output_to_logger
from frigate.object_detection.base import load_labels
from frigate.util.builtin import get_ffmpeg_arg_list
from frigate.util.process import FrigateProcess
@ -426,7 +426,7 @@ class AudioEventMaintainer(threading.Thread):
class AudioTfl:
@redirect_stdout_to_logger(logger, logging.DEBUG)
@redirect_output_to_logger(logger, logging.DEBUG)
def __init__(self, stop_event: threading.Event, num_threads=2):
self.stop_event = stop_event
self.num_threads = num_threads

View File

@ -6,11 +6,12 @@ import os
import sys
import threading
from collections import deque
from contextlib import contextmanager
from enum import Enum
from functools import wraps
from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from queue import Queue
from queue import Empty, Queue
from typing import Any, Callable, Deque, Optional
from frigate.util.builtin import clean_camera_user_pass
@ -189,26 +190,128 @@ class LogRedirect(io.StringIO):
self.flush()
def redirect_stdout_to_logger(logger: logging.Logger, level: int) -> Any:
@contextmanager
def redirect_fd_to_queue(queue):
"""Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue."""
stdout_fd = os.dup(1)
read_fd, write_fd = os.pipe()
os.dup2(write_fd, 1)
os.close(write_fd)
stop_event = threading.Event()
def reader():
"""Read from pipe and put lines in queue until stop_event is set."""
try:
with os.fdopen(read_fd, "r") as pipe:
while not stop_event.is_set():
line = pipe.readline()
if not line: # EOF
break
queue.put(line.strip())
except OSError as e:
queue.put(f"Reader error: {e}")
finally:
if not stop_event.is_set():
stop_event.set()
reader_thread = threading.Thread(target=reader, daemon=False)
reader_thread.start()
try:
yield
finally:
os.dup2(stdout_fd, 1)
os.close(stdout_fd)
stop_event.set()
reader_thread.join(timeout=1.0)
try:
os.close(read_fd)
except OSError:
pass
def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any:
"""Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger."""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
current_log_pipe = LogRedirect(logger, logging.ERROR)
queue = Queue()
log_redirect = LogRedirect(logger, level)
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = current_log_pipe
sys.stderr = current_log_pipe
sys.stdout = log_redirect
sys.stderr = log_redirect
try:
result = func(*args, **kwargs)
# Redirect C-level stdout
with redirect_fd_to_queue(queue):
result = func(*args, **kwargs)
finally:
# Restore Python stdout/stderr
sys.stdout = old_stdout
sys.stderr = old_stderr
current_log_pipe.flush()
log_redirect.flush()
# Log C-level output from queue
while True:
try:
logger.log(level, queue.get_nowait())
except Empty:
break
return result
return wrapper
return decorator
def suppress_os_output(func: Callable) -> Callable:
"""
A decorator that suppresses all output (stdout and stderr)
at the operating system file descriptor level for the decorated function.
This is useful for silencing noisy C/C++ libraries.
Note: This is a Unix-specific solution using os.dup2 and os.pipe.
It temporarily redirects file descriptors 1 (stdout) and 2 (stderr)
to a non-read pipe, effectively discarding their output.
"""
@wraps(func)
def wrapper(*args: tuple, **kwargs: dict[str, Any]) -> Any:
# Save the original file descriptors for stdout (1) and stderr (2)
original_stdout_fd = os.dup(1)
original_stderr_fd = os.dup(2)
# Create dummy pipes. We only need the write ends to redirect to.
# The data written to these pipes will be discarded as nothing
# will read from the read ends.
devnull_read_fd, devnull_write_fd = os.pipe()
try:
# Redirect stdout (FD 1) and stderr (FD 2) to the write end of our dummy pipe
os.dup2(devnull_write_fd, 1) # Redirect stdout to devnull pipe
os.dup2(devnull_write_fd, 2) # Redirect stderr to devnull pipe
# Execute the original function
result = func(*args, **kwargs)
finally:
# Restore original stdout and stderr file descriptors (1 and 2)
# This is crucial to ensure normal printing resumes after the decorated function.
os.dup2(original_stdout_fd, 1)
os.dup2(original_stderr_fd, 2)
# Close all duplicated and pipe file descriptors to prevent resource leaks.
# It's important to close the read end of the dummy pipe too,
# as nothing is explicitly reading from it.
os.close(original_stdout_fd)
os.close(original_stderr_fd)
os.close(devnull_read_fd)
os.close(devnull_write_fd)
return result
return wrapper

View File

@ -9,7 +9,7 @@ import numpy as np
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsRequestor
from frigate.comms.inter_process import InterProcessRequestor
from frigate.const import CLIPS_DIR, MODEL_CACHE_DIR, UPDATE_MODEL_STATE
from frigate.log import redirect_stdout_to_logger
from frigate.log import redirect_output_to_logger
from frigate.types import ModelStatusTypesEnum
from frigate.util.process import FrigateProcess
@ -39,7 +39,7 @@ def __generate_representative_dataset_factory(dataset_dir: str):
return generate_representative_dataset
@redirect_stdout_to_logger(logger, logging.DEBUG)
@redirect_output_to_logger(logger, logging.DEBUG)
def __train_classification_model(model_name: str) -> bool:
"""Train a classification model."""
@ -138,6 +138,7 @@ def kickoff_model_training(
# tensorflow will free CPU / GPU memory
# upon training completion
training_process = FrigateProcess(
None,
target=__train_classification_model,
name=f"model_training:{model_name}",
args=(model_name,),