From 219d3f540bf27502923b25f16625f13db82ab57b Mon Sep 17 00:00:00 2001 From: Josh Hawkins <32435876+hawkeye217@users.noreply.github.com> Date: Wed, 25 Jun 2025 08:10:51 -0500 Subject: [PATCH] fix typing --- frigate/log.py | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/frigate/log.py b/frigate/log.py index 79baf14b8..11f2da254 100644 --- a/frigate/log.py +++ b/frigate/log.py @@ -12,7 +12,7 @@ from functools import wraps from logging.handlers import QueueHandler, QueueListener from multiprocessing.managers import SyncManager from queue import Empty, Queue -from typing import Any, Callable, Deque, Optional +from typing import Any, Callable, Deque, Generator, Optional from frigate.util.builtin import clean_camera_user_pass @@ -151,47 +151,49 @@ class LogRedirect(io.StringIO): super().__init__() self.logger = logger_instance self.log_level = level - self.buffer = [] + self._line_buffer: list[str] = [] - def write(self, s): + def write(self, s: Any) -> int: if not isinstance(s, str): s = str(s) - self.buffer.append(s) + self._line_buffer.append(s) # Process output line by line if a newline is present if "\n" in s: - full_output = "".join(self.buffer) + full_output = "".join(self._line_buffer) lines = full_output.splitlines(keepends=True) - self.buffer = [] + self._line_buffer = [] for line in lines: if line.endswith("\n"): self._process_line(line.rstrip("\n")) else: - self.buffer.append(line) + self._line_buffer.append(line) - def _process_line(self, line): + return len(s) + + def _process_line(self, line: str) -> None: self.logger.log(self.log_level, line) - def flush(self): - if self.buffer: - full_output = "".join(self.buffer) - self.buffer = [] + def flush(self) -> None: + if self._line_buffer: + full_output = "".join(self._line_buffer) + self._line_buffer = [] if full_output: # Only process if there's content self._process_line(full_output) - def __enter__(self): + def __enter__(self) -> "LogRedirect": """Context manager entry point.""" return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Context manager exit point. Ensures buffered content is flushed.""" self.flush() @contextmanager -def redirect_fd_to_queue(queue): +def redirect_fd_to_queue(queue: Queue[str]) -> Generator[None, None, None]: """Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue.""" stdout_fd = os.dup(1) read_fd, write_fd = os.pipe() @@ -200,7 +202,7 @@ def redirect_fd_to_queue(queue): stop_event = threading.Event() - def reader(): + def reader() -> None: """Read from pipe and put lines in queue until stop_event is set.""" try: with os.fdopen(read_fd, "r") as pipe: @@ -234,10 +236,10 @@ def redirect_fd_to_queue(queue): def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any: """Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger.""" - def decorator(func: Callable): + def decorator(func: Callable) -> Callable: @wraps(func) - def wrapper(*args, **kwargs): - queue = Queue() + def wrapper(*args: Any, **kwargs: Any) -> Any: + queue: Queue[str] = Queue() log_redirect = LogRedirect(logger, level) old_stdout = sys.stdout