frigate/frigate/camera/__init__.py
Nicolas Mowen 0cf9d7d5b1
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Inverse mypy and more mypy fixes (#22645)
* organize

* Improve storage mypy

* Cleanup timeline mypy

* Cleanup recording mypy

* Improve review mypy

* Add review mypy

* Inverse mypy

* Fix ffmpeg presets

* fix template thing

* Cleanup camera
2026-03-25 19:30:59 -05:00

74 lines
2.4 KiB
Python

import multiprocessing as mp
import queue
from multiprocessing.managers import SyncManager, ValueProxy
from multiprocessing.sharedctypes import Synchronized
from multiprocessing.synchronize import Event
class CameraMetrics:
camera_fps: ValueProxy[float]
detection_fps: ValueProxy[float]
detection_frame: ValueProxy[float]
process_fps: ValueProxy[float]
skipped_fps: ValueProxy[float]
read_start: ValueProxy[float]
audio_rms: ValueProxy[float]
audio_dBFS: ValueProxy[float]
frame_queue: queue.Queue
process_pid: ValueProxy[int]
capture_process_pid: ValueProxy[int]
ffmpeg_pid: ValueProxy[int]
reconnects_last_hour: ValueProxy[int]
stalls_last_hour: ValueProxy[int]
def __init__(self, manager: SyncManager):
self.camera_fps = manager.Value("d", 0)
self.detection_fps = manager.Value("d", 0)
self.detection_frame = manager.Value("d", 0)
self.process_fps = manager.Value("d", 0)
self.skipped_fps = manager.Value("d", 0)
self.read_start = manager.Value("d", 0)
self.audio_rms = manager.Value("d", 0)
self.audio_dBFS = manager.Value("d", 0)
self.frame_queue = manager.Queue(maxsize=2)
self.process_pid = manager.Value("i", 0)
self.capture_process_pid = manager.Value("i", 0)
self.ffmpeg_pid = manager.Value("i", 0)
self.reconnects_last_hour = manager.Value("i", 0)
self.stalls_last_hour = manager.Value("i", 0)
class PTZMetrics:
autotracker_enabled: Synchronized
start_time: Synchronized
stop_time: Synchronized
frame_time: Synchronized
zoom_level: Synchronized
max_zoom: Synchronized
min_zoom: Synchronized
tracking_active: Event
motor_stopped: Event
reset: Event
def __init__(self, *, autotracker_enabled: bool):
self.autotracker_enabled = mp.Value("i", autotracker_enabled) # type: ignore[assignment]
self.start_time = mp.Value("d", 0) # type: ignore[assignment]
self.stop_time = mp.Value("d", 0) # type: ignore[assignment]
self.frame_time = mp.Value("d", 0) # type: ignore[assignment]
self.zoom_level = mp.Value("d", 0) # type: ignore[assignment]
self.max_zoom = mp.Value("d", 0) # type: ignore[assignment]
self.min_zoom = mp.Value("d", 0) # type: ignore[assignment]
self.tracking_active = mp.Event()
self.motor_stopped = mp.Event()
self.reset = mp.Event()
self.motor_stopped.set()