Start processes dynamically when needed

This commit is contained in:
Nicolas Mowen 2025-06-10 16:15:27 -06:00
parent f5b3f9ae87
commit c7b4d7791b
2 changed files with 68 additions and 56 deletions

View File

@ -9,6 +9,7 @@ from multiprocessing.synchronize import Event as MpEvent
from frigate.camera import CameraMetrics, PTZMetrics
from frigate.config import FrigateConfig
from frigate.config.camera import CameraConfig
from frigate.config.updater import GlobalConfigUpdateEnum, GlobalConfigUpdateSubscriber
from frigate.const import SHM_FRAMES_VAR
from frigate.models import Regions
@ -49,6 +50,7 @@ class CameraMaintainer(threading.Thread):
GlobalConfigUpdateEnum.remove_camera,
]
)
self.shm_count = self.__calculate_shm_frame_count()
def __init_historical_regions(self) -> None:
# delete region grids for removed or renamed cameras
@ -77,13 +79,23 @@ class CameraMaintainer(threading.Thread):
cam_total_frame_size = 0.0
for camera in self.config.cameras.values():
if camera.enabled and camera.detect.width and camera.detect.height:
if (
camera.enabled_in_config
and camera.detect.width
and camera.detect.height
):
cam_total_frame_size += round(
(camera.detect.width * camera.detect.height * 1.5 + 270480)
/ 1048576,
1,
)
# leave room for 2 cameras that are added dynamically, if a user wants to add more cameras they may need to increase the SHM size and restart after adding them.
cam_total_frame_size += 2 * round(
(camera.detect.width * camera.detect.height * 1.5 + 270480) / 1048576,
1,
)
if cam_total_frame_size == 0.0:
return 0
@ -103,55 +115,52 @@ class CameraMaintainer(threading.Thread):
return shm_frame_count
def __start_camera_processors(self) -> None:
for name, config in self.config.cameras.items():
if not self.config.cameras[name].enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
continue
def __start_camera_processor(self, name: str, config: CameraConfig) -> None:
config = self.config.cameras[name]
if not config.enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
return
camera_process = FrigateProcess(
target=track_camera,
name=f"camera_processor:{name}",
args=(
name,
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
self.camera_metrics[name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {name}: {camera_process.pid}")
camera_process = FrigateProcess(
target=track_camera,
name=f"camera_processor:{name}",
args=(
config.name,
config,
self.config.model,
self.config.model.merged_labelmap,
self.detection_queue,
self.detection_out_events[name],
self.detected_frames_queue,
self.camera_metrics[name],
self.ptz_metrics[name],
self.region_grids[name],
),
daemon=True,
)
self.camera_metrics[config.name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
def __start_camera_capture(self) -> None:
shm_frame_count = self.__calculate_shm_frame_count()
def __start_camera_capture(self, name: str, config: CameraConfig) -> None:
if not self.config.cameras[name].enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
return
for name, config in self.config.cameras.items():
if not self.config.cameras[name].enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
continue
# pre-create shms
for i in range(self.shm_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
# pre-create shms
for i in range(shm_frame_count):
frame_size = config.frame_shape_yuv[0] * config.frame_shape_yuv[1]
self.frame_manager.create(f"{config.name}_frame{i}", frame_size)
capture_process = FrigateProcess(
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, shm_frame_count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
capture_process = FrigateProcess(
target=capture_camera,
name=f"camera_capture:{name}",
args=(config, self.shm_count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.camera_metrics[camera].capture_process
@ -174,19 +183,21 @@ class CameraMaintainer(threading.Thread):
self.__init_historical_regions()
# start camera processes
self.__start_camera_processors()
self.__start_camera_capture()
for camera, config in self.config.cameras.items():
self.__start_camera_processor(camera, config)
self.__start_camera_capture(camera, config)
while not self.stop_event.wait(1):
updates = self.update_subscriber.check_for_updates()
for update_type, update_payload in updates:
for update_type, update_config in updates:
if update_type == GlobalConfigUpdateEnum.add_camera:
pass
self.__start_camera_processor(update_config.name, update_config)
self.__start_camera_capture(update_config.name, update_config)
elif update_type == GlobalConfigUpdateEnum.debug_camera:
pass
elif update_type == GlobalConfigUpdateEnum.remove_camera:
camera = update_payload.get("camera")
camera = update_config.name
if camera:
self.__stop_camera_capture_process(camera)

View File

@ -5,6 +5,7 @@ from enum import Enum
from typing import Any
from frigate.comms.config_updater import ConfigPublisher, ConfigSubscriber
from frigate.config.camera import CameraConfig
class GlobalConfigUpdateEnum(str, Enum):
@ -46,21 +47,21 @@ class GlobalConfigUpdateSubscriber:
exact=False,
)
def check_for_updates(self) -> list[tuple[GlobalConfigUpdateEnum, Any]]:
updated_topics: list[tuple[GlobalConfigUpdateEnum, Any]] = []
def check_for_updates(self) -> list[tuple[GlobalConfigUpdateEnum, CameraConfig]]:
updated_topics: list[tuple[GlobalConfigUpdateEnum, CameraConfig]] = []
# get all updates available
while True:
update_topic, payload = self.subscriber.check_for_update()
update_topic, update_config = self.subscriber.check_for_update()
if update_topic is None or payload is None:
if update_topic is None or update_config is None:
break
_, raw_type = update_topic.split("/")
update_type = GlobalConfigUpdateEnum[raw_type]
if update_type in self.topics:
updated_topics.append((update_type, payload))
updated_topics.append((update_type, update_config))
return updated_topics