frigate/frigate/util/process.py

154 lines
5.2 KiB
Python
Raw Normal View History

import atexit
import faulthandler
import logging
import multiprocessing as mp
import os
import pathlib
import subprocess
import threading
from logging.handlers import QueueHandler
from multiprocessing.synchronize import Event as MpEvent
Use Fork-Server As Spawn Method (#18682) * Set runtime * Use count correctly * Don't assume camera sizes * Use separate zmq proxy for object detection * Correct order * Use forkserver * Only store PID instead of entire process reference * Cleanup * Catch correct errors * Fix typing * Remove before_run from process util The before_run never actually ran because: You're right to suspect an issue with before_run not being called and a potential deadlock. The way you've implemented the run_wrapper using __getattribute__ for the run method of BaseProcess is a common pitfall in Python's multiprocessing, especially when combined with how multiprocessing.Process works internally. Here's a breakdown of why before_run isn't being called and why you might be experiencing a deadlock: The Problem: __getattribute__ and Process Serialization When you create a multiprocessing.Process object and call start(), the multiprocessing module needs to serialize the process object (or at least enough of it to re-create the process in the new interpreter). It then pickles this serialized object and sends it to the newly spawned process. The issue with your __getattribute__ implementation for run is that: run is retrieved during serialization: When multiprocessing tries to pickle your Process object to send to the new process, it will likely access the run attribute. This triggers your __getattribute__ wrapper, which then tries to bind run_wrapper to self. run_wrapper is bound to the parent process's self: The run_wrapper closure, when created in the parent process, captures the self (the Process instance) from the parent's memory space. Deserialization creates a new object: In the child process, a new Process object is created by deserializing the pickled data. However, the run_wrapper method that was pickled still holds a reference to the self from the parent process. This is a subtle but critical distinction. 
The child's run is not your wrapped run: When the child process starts, it internally calls its own run method. Because of the serialization and deserialization process, the run method that's ultimately executed in the child process is the original multiprocessing.Process.run or the Process.run if you had directly overridden it. Your __getattribute__ magic, which wraps run, isn't correctly applied to the Process object within the child's context. * Cleanup * Logging bugfix (#18465) * use mp Manager to handle logging queues A Python bug (https://github.com/python/cpython/issues/91555) was preventing logs from the embeddings maintainer process from printing. The bug is fixed in Python 3.14, but a viable workaround is to use the multiprocessing Manager, which better manages mp queues and causes the logging to work correctly. * consolidate * fix typing * Fix typing * Use global log queue * Move to using process for logging * Convert camera tracking to process * Add more processes * Finalize process * Cleanup * Cleanup typing * Formatting * Remove daemon --------- Co-authored-by: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com>
2025-06-12 21:12:34 +03:00
from typing import Callable, Optional
from setproctitle import setproctitle
import frigate.log
from frigate.config.logger import LoggerConfig
class BaseProcess(mp.Process):
    """A ``multiprocessing.Process`` with start-lifecycle hooks.

    Subclasses override ``before_start`` / ``after_start`` to run setup in
    the *parent* process immediately around the actual ``start()`` call
    (e.g. capturing state that must travel to the child).
    """

    def __init__(
        self,
        stop_event: MpEvent,
        priority: int,
        *,
        name: Optional[str] = None,
        target: Optional[Callable] = None,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        daemon: Optional[bool] = None,
    ):
        """
        Args:
            stop_event: shared event used to signal the process to stop.
            priority: niceness value intended for the child (applied by
                subclasses, e.g. via ``os.nice``).
            name, target, args, kwargs, daemon: forwarded to ``mp.Process``.
        """
        self.priority = priority
        self.stop_event = stop_event
        super().__init__(
            name=name,
            target=target,
            args=args,
            # Use None as the default instead of a literal `{}`: a mutable
            # default would be one shared dict across every instance.
            kwargs={} if kwargs is None else kwargs,
            daemon=daemon,
        )

    def start(self, *args, **kwargs):
        # Both hooks run in the parent process, bracketing the spawn/fork.
        self.before_start()
        super().start(*args, **kwargs)
        self.after_start()

    def before_start(self) -> None:
        """Called in the parent just before the child process is started."""
        pass

    def after_start(self) -> None:
        """Called in the parent right after the child process is started."""
        pass
class FrigateProcess(BaseProcess):
    # Per-process logger; created in pre_run_setup() inside the child.
    logger: logging.Logger

    def before_start(self) -> None:
        """Capture parent-side state that must be carried into the child."""
        # No memray tracker yet; _setup_memray() may create one in the child.
        self.__memray_tracker = None
        # Grab the global log listener's queue while still in the parent so
        # the child can forward its log records through it.
        self.__log_queue = frigate.log.log_listener.queue
def pre_run_setup(self, logConfig: LoggerConfig | None = None) -> None:
    """Initialize this process; call at the top of the child's run.

    Applies the configured niceness, sets the OS-visible process title and
    the current thread's name, enables the faulthandler, routes logging to
    the parent's log queue, and starts optional memray profiling.

    Args:
        logConfig: optional logger configuration whose default level and
            per-logger overrides are applied after logging is wired up.
    """
    os.nice(self.priority)
    setproctitle(self.name)
    threading.current_thread().name = f"process:{self.name}"
    # Dump a traceback on fatal signals (e.g. SIGSEGV) in this process.
    faulthandler.enable()
    # setup logging
    self.logger = logging.getLogger(self.name)
    # force=True strips any handlers inherited from the parent so records
    # flow only through the queue handler added on the next line.
    logging.basicConfig(handlers=[], force=True)
    logging.getLogger().addHandler(QueueHandler(self.__log_queue))
    if logConfig:
        frigate.log.apply_log_levels(
            logConfig.default.value.upper(), logConfig.logs
        )
    self._setup_memray()
def _setup_memray(self) -> None:
    """Setup memray profiling if enabled via environment variable."""
    modules_env = os.environ.get("FRIGATE_MEMRAY_MODULES", "")
    if not modules_env:
        return

    # Extract module name from process name (e.g., "frigate.capture:camera" -> "frigate.capture")
    process_name = self.name
    module_name = process_name.partition(":")[0]

    requested = {entry.strip() for entry in modules_env.split(",")}
    if module_name not in requested and process_name not in requested:
        return

    try:
        import memray

        reports_dir = pathlib.Path("/config/memray_reports")
        reports_dir.mkdir(parents=True, exist_ok=True)

        # Process names can contain path-hostile characters; sanitize them
        # before using the name as a file name.
        safe_name = (
            process_name.replace(":", "_").replace("/", "_").replace("\\", "_")
        )
        capture_path = reports_dir / f"{safe_name}.bin"

        tracker = memray.Tracker(str(capture_path))
        self.__memray_tracker = tracker
        tracker.__enter__()

        # Register cleanup handler to stop tracking and generate HTML report
        # atexit runs on normal exits and most signal-based terminations (SIGTERM, SIGINT)
        # For hard kills (SIGKILL) or segfaults, the binary file is preserved for manual generation
        atexit.register(self._cleanup_memray, safe_name, capture_path)

        self.logger.info(
            f"Memray profiling enabled for module {module_name} (process: {self.name}). "
            f"Binary file (updated continuously): {capture_path}. "
            f"HTML report will be generated on exit: {reports_dir}/{safe_name}.html. "
            f"If process crashes, manually generate with: memray flamegraph {capture_path}"
        )
    except Exception as e:
        self.logger.error(f"Failed to setup memray profiling: {e}", exc_info=True)
def _cleanup_memray(self, safe_name: str, binary_file: pathlib.Path) -> None:
    """Stop memray tracking and generate HTML report."""
    tracker = self.__memray_tracker
    if tracker is None:
        return

    try:
        # Finalize the capture file before handing it to the report tool.
        tracker.__exit__(None, None, None)
        self.__memray_tracker = None

        html_file = pathlib.Path("/config/memray_reports") / f"{safe_name}.html"
        proc = subprocess.run(
            ["memray", "flamegraph", "--output", str(html_file), str(binary_file)],
            capture_output=True,
            text=True,
            timeout=10,
        )
        if proc.returncode != 0:
            self.logger.error(
                f"Failed to generate memray report: {proc.stderr}. "
                f"Binary file preserved at {binary_file} for manual generation."
            )
        else:
            self.logger.info(f"Memray report generated: {html_file}")
        # Keep the binary file for manual report generation if needed
        # Users can run: memray flamegraph {binary_file}
    except subprocess.TimeoutExpired:
        self.logger.error("Memray report generation timed out")
    except Exception as e:
        self.logger.error(f"Failed to cleanup memray profiling: {e}", exc_info=True)