fix typing

This commit is contained in:
Josh Hawkins 2025-06-25 08:10:51 -05:00
parent 8acbe0bde9
commit 219d3f540b

View File

@ -12,7 +12,7 @@ from functools import wraps
from logging.handlers import QueueHandler, QueueListener from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager from multiprocessing.managers import SyncManager
from queue import Empty, Queue from queue import Empty, Queue
from typing import Any, Callable, Deque, Optional from typing import Any, Callable, Deque, Generator, Optional
from frigate.util.builtin import clean_camera_user_pass from frigate.util.builtin import clean_camera_user_pass
@ -151,47 +151,49 @@ class LogRedirect(io.StringIO):
super().__init__() super().__init__()
self.logger = logger_instance self.logger = logger_instance
self.log_level = level self.log_level = level
self.buffer = [] self._line_buffer: list[str] = []
def write(self, s): def write(self, s: Any) -> int:
if not isinstance(s, str): if not isinstance(s, str):
s = str(s) s = str(s)
self.buffer.append(s) self._line_buffer.append(s)
# Process output line by line if a newline is present # Process output line by line if a newline is present
if "\n" in s: if "\n" in s:
full_output = "".join(self.buffer) full_output = "".join(self._line_buffer)
lines = full_output.splitlines(keepends=True) lines = full_output.splitlines(keepends=True)
self.buffer = [] self._line_buffer = []
for line in lines: for line in lines:
if line.endswith("\n"): if line.endswith("\n"):
self._process_line(line.rstrip("\n")) self._process_line(line.rstrip("\n"))
else: else:
self.buffer.append(line) self._line_buffer.append(line)
def _process_line(self, line): return len(s)
def _process_line(self, line: str) -> None:
self.logger.log(self.log_level, line) self.logger.log(self.log_level, line)
def flush(self): def flush(self) -> None:
if self.buffer: if self._line_buffer:
full_output = "".join(self.buffer) full_output = "".join(self._line_buffer)
self.buffer = [] self._line_buffer = []
if full_output: # Only process if there's content if full_output: # Only process if there's content
self._process_line(full_output) self._process_line(full_output)
def __enter__(self): def __enter__(self) -> "LogRedirect":
"""Context manager entry point.""" """Context manager entry point."""
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Context manager exit point. Ensures buffered content is flushed.""" """Context manager exit point. Ensures buffered content is flushed."""
self.flush() self.flush()
@contextmanager @contextmanager
def redirect_fd_to_queue(queue): def redirect_fd_to_queue(queue: Queue[str]) -> Generator[None, None, None]:
"""Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue.""" """Redirect file descriptor 1 (stdout) to a pipe and capture output in a queue."""
stdout_fd = os.dup(1) stdout_fd = os.dup(1)
read_fd, write_fd = os.pipe() read_fd, write_fd = os.pipe()
@ -200,7 +202,7 @@ def redirect_fd_to_queue(queue):
stop_event = threading.Event() stop_event = threading.Event()
def reader(): def reader() -> None:
"""Read from pipe and put lines in queue until stop_event is set.""" """Read from pipe and put lines in queue until stop_event is set."""
try: try:
with os.fdopen(read_fd, "r") as pipe: with os.fdopen(read_fd, "r") as pipe:
@ -234,10 +236,10 @@ def redirect_fd_to_queue(queue):
def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any: def redirect_output_to_logger(logger: logging.Logger, level: int) -> Any:
"""Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger.""" """Decorator to redirect both Python sys.stdout/stderr and C-level stdout to logger."""
def decorator(func: Callable): def decorator(func: Callable) -> Callable:
@wraps(func) @wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args: Any, **kwargs: Any) -> Any:
queue = Queue() queue: Queue[str] = Queue()
log_redirect = LogRedirect(logger, level) log_redirect = LogRedirect(logger, level)
old_stdout = sys.stdout old_stdout = sys.stdout