Use forkserver

This commit is contained in:
Nicolas Mowen 2025-06-12 08:07:19 -06:00
parent acac743dea
commit 4dd5e9c66e
6 changed files with 55 additions and 42 deletions

View File

@ -1,5 +1,6 @@
import argparse
import faulthandler
import multiprocessing as mp
import signal
import sys
import threading
@ -112,4 +113,5 @@ def main() -> None:
if __name__ == "__main__":
mp.set_start_method("forkserver", force=True)
main()

View File

@ -86,8 +86,11 @@ class FrigateApp:
self.detection_shms: list[mp.shared_memory.SharedMemory] = []
self.log_queue: Queue = mp.Queue()
self.camera_metrics: dict[str, CameraMetrics] = {}
self.metrics_manager = mp.Manager()
self.embeddings_metrics: DataProcessorMetrics | None = (
DataProcessorMetrics(list(config.classification.custom.keys()))
DataProcessorMetrics(
self.metrics_manager, list(config.classification.custom.keys())
)
if (
config.semantic_search.enabled
or config.lpr.enabled
@ -653,6 +656,7 @@ class FrigateApp:
self.stats_emitter.join()
self.frigate_watchdog.join()
self.db.stop()
self.metrics_manager.shutdown()
# Save embeddings stats to disk
if self.embeddings:

View File

@ -1,6 +1,7 @@
"""Create and maintain camera processes / management."""
import logging
import multiprocessing as mp
import os
import shutil
import threading
@ -119,7 +120,7 @@ class CameraMaintainer(threading.Thread):
def __start_camera_processor(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
) -> mp.Process:
if not config.enabled_in_config:
logger.info(f"Camera processor not started for disabled camera {name}")
return
@ -167,13 +168,13 @@ class CameraMaintainer(threading.Thread):
),
daemon=True,
)
self.camera_metrics[config.name].process = camera_process
camera_process.start()
logger.info(f"Camera processor started for {config.name}: {camera_process.pid}")
return camera_process
def __start_camera_capture(
self, name: str, config: CameraConfig, runtime: bool = False
) -> None:
) -> mp.Process:
if not config.enabled_in_config:
logger.info(f"Capture process not started for disabled camera {name}")
return
@ -190,9 +191,9 @@ class CameraMaintainer(threading.Thread):
args=(config, count, self.camera_metrics[name]),
)
capture_process.daemon = True
self.camera_metrics[name].capture_process = capture_process
capture_process.start()
logger.info(f"Capture process started for {name}: {capture_process.pid}")
return capture_process
def __stop_camera_capture_process(self, camera: str) -> None:
capture_process = self.camera_metrics[camera].capture_process
@ -225,16 +226,20 @@ class CameraMaintainer(threading.Thread):
for update_type, updated_cameras in updates.items():
if update_type == CameraConfigUpdateEnum.add.name:
for camera in updated_cameras:
self.__start_camera_processor(
camera_process = self.__start_camera_processor(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
self.__start_camera_capture(
capture_process = self.__start_camera_capture(
camera,
self.update_subscriber.camera_configs[camera],
runtime=True,
)
self.camera_metrics[config.name].process = camera_process
self.camera_metrics[
config.name
].capture_process = capture_process
elif update_type == CameraConfigUpdateEnum.remove.name:
self.__stop_camera_capture_process(camera)
self.__stop_camera_process(camera)

View File

@ -1,7 +1,7 @@
"""Embeddings types."""
import multiprocessing as mp
from enum import Enum
from multiprocessing.managers import SyncManager
from multiprocessing.sharedctypes import Synchronized
import sherpa_onnx
@ -20,25 +20,27 @@ class DataProcessorMetrics:
alpr_pps: Synchronized
yolov9_lpr_speed: Synchronized
yolov9_lpr_pps: Synchronized
classification_speeds: dict[str, Synchronized] = {}
classification_cps: dict[str, Synchronized] = {}
classification_speeds: dict[str, Synchronized]
classification_cps: dict[str, Synchronized]
def __init__(self, custom_classification_models: list[str]):
self.image_embeddings_speed = mp.Value("d", 0.0)
self.image_embeddings_eps = mp.Value("d", 0.0)
self.text_embeddings_speed = mp.Value("d", 0.0)
self.text_embeddings_eps = mp.Value("d", 0.0)
self.face_rec_speed = mp.Value("d", 0.0)
self.face_rec_fps = mp.Value("d", 0.0)
self.alpr_speed = mp.Value("d", 0.0)
self.alpr_pps = mp.Value("d", 0.0)
self.yolov9_lpr_speed = mp.Value("d", 0.0)
self.yolov9_lpr_pps = mp.Value("d", 0.0)
def __init__(self, manager: SyncManager, custom_classification_models: list[str]):
self.image_embeddings_speed = manager.Value("d", 0.0)
self.image_embeddings_eps = manager.Value("d", 0.0)
self.text_embeddings_speed = manager.Value("d", 0.0)
self.text_embeddings_eps = manager.Value("d", 0.0)
self.face_rec_speed = manager.Value("d", 0.0)
self.face_rec_fps = manager.Value("d", 0.0)
self.alpr_speed = manager.Value("d", 0.0)
self.alpr_pps = manager.Value("d", 0.0)
self.yolov9_lpr_speed = manager.Value("d", 0.0)
self.yolov9_lpr_pps = manager.Value("d", 0.0)
self.classification_speeds = manager.dict()
self.classification_cps = manager.dict()
if custom_classification_models:
for key in custom_classification_models:
self.classification_speeds[key] = mp.Value("d", 0.0)
self.classification_cps[key] = mp.Value("d", 0.0)
self.classification_speeds[key] = manager.Value("d", 0.0)
self.classification_cps[key] = manager.Value("d", 0.0)
class DataProcessorModelRunner:

View File

@ -19,7 +19,7 @@ from frigate.config import FrigateConfig
from frigate.const import CONFIG_DIR, FACE_DIR
from frigate.data_processing.types import DataProcessorMetrics
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.models import Event, Recordings
from frigate.models import Event
from frigate.util.builtin import serialize
from frigate.util.classification import kickoff_model_training
from frigate.util.services import listen
@ -43,22 +43,7 @@ def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> N
setproctitle("frigate.embeddings_manager")
listen()
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
maintainer = EmbeddingMaintainer(
db,
config,
metrics,
stop_event,

View File

@ -12,7 +12,6 @@ from typing import Any, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder
@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import (
LicensePlateRealTimeProcessor,
)
from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum
from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum
from frigate.genai import get_genai_client
from frigate.models import Event
from frigate.models import Event, Recordings
from frigate.types import TrackedObjectUpdateTypesEnum
from frigate.util.builtin import serialize
from frigate.util.image import (
@ -82,7 +82,6 @@ class EmbeddingMaintainer(threading.Thread):
def __init__(
self,
db: SqliteQueueDatabase,
config: FrigateConfig,
metrics: DataProcessorMetrics,
stop_event: MpEvent,
@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread):
[CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove],
)
# Configure Frigate DB
db = SqliteVecQueueDatabase(
config.database.path,
pragmas={
"auto_vacuum": "FULL", # Does not defragment database
"cache_size": -512 * 1000, # 512MB of cache
"synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous
},
timeout=max(
60, 10 * len([c for c in config.cameras.values() if c.enabled])
),
load_vec_extension=True,
)
models = [Event, Recordings]
db.bind(models)
if config.semantic_search.enabled:
self.embeddings = Embeddings(config, db, metrics)