frigate/frigate/camera/maintainer.py
Josh Hawkins 95956a690b
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Debug replay (#22212)
* debug replay implementation

* fix masks after dev rebase

* fix squash merge issues

* fix

* fix

* fix

* no need to write debug replay camera to config

* camera and filter button and dropdown

* add filters

* add ability to edit motion and object config for debug replay

* add debug draw overlay to debug replay

* add guard to prevent crash when camera is no longer in camera_states

* fix overflow due to radix absolutely positioned elements

* increase number of messages

* ensure deep_merge replaces existing list values when override is true

* add back button

* add debug replay to explore and review menus

* clean up

* clean up

* update instructions to prevent exposing exception info

* fix typing

* refactor output logic

* refactor with helper function

* move init to function for consistency
2026-03-04 10:07:34 -06:00

273 lines
10 KiB
Python

"""Create and maintain camera processes / management."""
import logging
import multiprocessing as mp
import threading
from multiprocessing import Queue
from multiprocessing.managers import DictProxy, SyncManager
from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig
from frigate.config.camera import CameraConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
from frigate.models import Regions
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid
from frigate.util.services import calculate_shm_requirements
from frigate.video import CameraCapture, CameraTracker
logger = logging.getLogger(__name__)
class CameraMaintainer(threading.Thread):
def __init__(
self,
config: FrigateConfig,
detection_queue: Queue,
detected_frames_queue: Queue,
camera_metrics: DictProxy,
ptz_metrics: dict[str, PTZMetrics],
stop_event: MpEvent,
metrics_manager: SyncManager,
):
super().__init__(name="camera_processor")
self.config = config
self.detection_queue = detection_queue
self.detected_frames_queue = detected_frames_queue
self.stop_event = stop_event
self.camera_metrics = camera_metrics
self.ptz_metrics = ptz_metrics
self.frame_manager = SharedMemoryFrameManager()
self.region_grids: dict[str, list[list[dict[str, int]]]] = {}
self.update_subscriber = CameraConfigUpdateSubscriber(
self.config,
{},
[
CameraConfigUpdateEnum.add,
CameraConfigUpdateEnum.remove,
],
)
self.shm_count = self.__calculate_shm_frame_count()
self.camera_processes: dict[str, mp.Process] = {}
self.capture_processes: dict[str, mp.Process] = {}
self.camera_stop_events: dict[str, MpEvent] = {}
self.metrics_manager = metrics_manager
def __ensure_camera_stop_event(self, camera: str) -> MpEvent:
camera_stop_event = self.camera_stop_events.get(camera)
if camera_stop_event is None:
camera_stop_event = mp.Event()
self.camera_stop_events[camera] = camera_stop_event
else:
camera_stop_event.clear()
return camera_stop_event
def __init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras
cameras = list(self.config.cameras.keys())
Regions.delete().where(~(Regions.camera << cameras)).execute()
# create or update region grids for each camera
for camera in self.config.cameras.values():
assert camera.name is not None
self.region_grids[camera.name] = get_camera_regions_grid(
camera.name,
camera.detect,
max(self.config.model.width, self.config.model.height),
)
def __calculate_shm_frame_count(self) -> int:
shm_stats = calculate_shm_requirements(self.config)
if not shm_stats:
# /dev/shm not available
return 0
logger.debug(
f"Calculated total camera size {shm_stats['available']} / "
f"{shm_stats['camera_frame_size']} :: {shm_stats['shm_frame_count']} "
f"frames for each camera in SHM"
)
if shm_stats["shm_frame_count"] < 20:
logger.warning(
f"The current SHM size of {shm_stats['total']}MB is too small, "
f"recommend increasing it to at least {shm_stats['min_shm']}MB."
)
return shm_stats["shm_frame_count"]
def __start_camera_processor(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
return
camera_stop_event = self.__ensure_camera_stop_event(name)
if runtime:
self.camera_metrics[name] = CameraMetrics(self.metrics_manager)
self.ptz_metrics[name] = PTZMetrics(autotracker_enabled=False)
self.region_grids[name] = get_camera_regions_grid(
name,
config.detect,
max(self.config.model.width, self.config.model.height),
)
try:
largest_frame = max(
[
det.model.height * det.model.width * 3
if det.model is not None
else 320
for det in self.config.detectors.values()
]
)
UntrackedSharedMemory(name=f"out-{name}", create=True, size=20 * 6 * 4)
UntrackedSharedMemory(
name=name,
create=True,
size=largest_frame,
)
except FileExistsError:
pass
camera_process = CameraTracker(
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
camera_stop_event,
self.config.logger,
)
self.camera_processes[config.name] = camera_process
camera_process.start()
self.camera_metrics[config.name].process_pid.value = camera_process.pid
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
if not config.enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
return
camera_stop_event = self.__ensure_camera_stop_event(name)
# pre-create shms
count = 10 if runtime else self.shm_count
for i in range(count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = CameraCapture(
config,
count,
self.camera_metrics[name],
camera_stop_event,
self.config.logger,
)
capture_process.daemon = True
self.capture_processes[name] = capture_process
capture_process.start()
self.camera_metrics[name].capture_process_pid.value = capture_process.pid
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.capture_processes.get(camera)
if capture_process is not None:
logger.info(f"Waiting for capture process for {camera} to stop")
camera_stop_event = self.camera_stop_events.get(camera)
if camera_stop_event is not None:
camera_stop_event.set()
capture_process.join(timeout=10)
if capture_process.is_alive():
logger.warning(
f"Capture process for {camera} didn't exit, forcing termination"
)
capture_process.terminate()
capture_process.join()
def __stop_camera_process(self, camera: str) -> None:
camera_process = self.camera_processes.get(camera)
if camera_process is not None:
logger.info(f"Waiting for process for {camera} to stop")
camera_stop_event = self.camera_stop_events.get(camera)
if camera_stop_event is not None:
camera_stop_event.set()
camera_process.join(timeout=10)
if camera_process.is_alive():
logger.warning(f"Process for {camera} didn't exit, forcing termination")
camera_process.terminate()
camera_process.join()
logger.info(f"Closing frame queue for {camera}")
empty_and_close_queue(self.camera_metrics[camera].frame_queue)
def run(self):
self.__init_historical_regions()
# start camera processes
for camera, config in self.config.cameras.items():
self.__start_camera_processor(camera, config)
self.__start_camera_capture(camera, config)
while not self.stop_event.wait(1):
updates = self.update_subscriber.check_for_updates()
for update_type, updated_cameras in updates.items():
if update_type == CameraConfigUpdateEnum.add.name:
for camera in updated_cameras:
if (
camera in self.camera_processes
or camera in self.capture_processes
):
continue
self.__start_camera_processor(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
self.__start_camera_capture(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
elif update_type == CameraConfigUpdateEnum.remove.name:
for camera in updated_cameras:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)
self.capture_processes.pop(camera, None)
self.camera_processes.pop(camera, None)
self.camera_stop_events.pop(camera, None)
self.region_grids.pop(camera, None)
self.camera_metrics.pop(camera, None)
self.ptz_metrics.pop(camera, None)
# ensure the capture processes are done
for camera in self.capture_processes.keys():
self.__stop_camera_capture_process(camera)
# ensure the camera processors are done
for camera in self.camera_processes.keys():
self.__stop_camera_process(camera)
self.update_subscriber.stop()
self.frame_manager.cleanup()