diff --git a/frigate/__main__.py b/frigate/__main__.py index 4c732be80..747a7f8bd 100644 --- a/frigate/__main__.py +++ b/frigate/__main__.py @@ -1,5 +1,6 @@ import argparse import faulthandler +import multiprocessing as mp import signal import sys import threading @@ -112,4 +113,5 @@ def main() -> None: if __name__ == "__main__": + mp.set_start_method("forkserver", force=True) main() diff --git a/frigate/app.py b/frigate/app.py index 6a10375ef..dc189780a 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -86,8 +86,11 @@ class FrigateApp: self.detection_shms: list[mp.shared_memory.SharedMemory] = [] self.log_queue: Queue = mp.Queue() self.camera_metrics: dict[str, CameraMetrics] = {} + self.metrics_manager = mp.Manager() self.embeddings_metrics: DataProcessorMetrics | None = ( - DataProcessorMetrics(list(config.classification.custom.keys())) + DataProcessorMetrics( + self.metrics_manager, list(config.classification.custom.keys()) + ) if ( config.semantic_search.enabled or config.lpr.enabled @@ -653,6 +656,7 @@ class FrigateApp: self.stats_emitter.join() self.frigate_watchdog.join() self.db.stop() + self.metrics_manager.shutdown() # Save embeddings stats to disk if self.embeddings: diff --git a/frigate/camera/maintainer.py b/frigate/camera/maintainer.py index f5bf657d0..09aad0e9a 100644 --- a/frigate/camera/maintainer.py +++ b/frigate/camera/maintainer.py @@ -1,6 +1,7 @@ """Create and maintain camera processes / management.""" import logging +import multiprocessing as mp import os import shutil import threading @@ -119,7 +120,7 @@ class CameraMaintainer(threading.Thread): def __start_camera_processor( self, name: str, config: CameraConfig, runtime: bool = False - ) -> None: + ) -> mp.Process: if not config.enabled_in_config: logger.info(f"Camera processor not started for disabled camera {name}") return @@ -167,13 +168,13 @@ class CameraMaintainer(threading.Thread): ), daemon=True, ) - self.camera_metrics[config.name].process = camera_process camera_process.start() logger.info(f"Camera processor started for {config.name}: {camera_process.pid}") + return camera_process def __start_camera_capture( self, name: str, config: CameraConfig, runtime: bool = False - ) -> None: + ) -> mp.Process: if not config.enabled_in_config: logger.info(f"Capture process not started for disabled camera {name}") return @@ -190,9 +191,9 @@ class CameraMaintainer(threading.Thread): args=(config, count, self.camera_metrics[name]), ) capture_process.daemon = True - self.camera_metrics[name].capture_process = capture_process capture_process.start() logger.info(f"Capture process started for {name}: {capture_process.pid}") + return capture_process def __stop_camera_capture_process(self, camera: str) -> None: capture_process = self.camera_metrics[camera].capture_process @@ -225,16 +226,20 @@ class CameraMaintainer(threading.Thread): for update_type, updated_cameras in updates.items(): if update_type == CameraConfigUpdateEnum.add.name: for camera in updated_cameras: - self.__start_camera_processor( + camera_process = self.__start_camera_processor( camera, self.update_subscriber.camera_configs[camera], runtime=True, ) - self.__start_camera_capture( + capture_process = self.__start_camera_capture( camera, self.update_subscriber.camera_configs[camera], runtime=True, ) + self.camera_metrics[config.name].process = camera_process + self.camera_metrics[ + config.name + ].capture_process = capture_process elif update_type == CameraConfigUpdateEnum.remove.name: self.__stop_camera_capture_process(camera) self.__stop_camera_process(camera) diff --git a/frigate/data_processing/types.py b/frigate/data_processing/types.py index 50f1ed561..d18a1175a 100644 --- a/frigate/data_processing/types.py +++ b/frigate/data_processing/types.py @@ -1,7 +1,7 @@ """Embeddings types.""" -import multiprocessing as mp from enum import Enum +from multiprocessing.managers import SyncManager from multiprocessing.sharedctypes import Synchronized import sherpa_onnx @@ -20,25 +20,27 @@ class DataProcessorMetrics: alpr_pps: Synchronized yolov9_lpr_speed: Synchronized yolov9_lpr_pps: Synchronized - classification_speeds: dict[str, Synchronized] = {} - classification_cps: dict[str, Synchronized] = {} + classification_speeds: dict[str, Synchronized] + classification_cps: dict[str, Synchronized] - def __init__(self, custom_classification_models: list[str]): - self.image_embeddings_speed = mp.Value("d", 0.0) - self.image_embeddings_eps = mp.Value("d", 0.0) - self.text_embeddings_speed = mp.Value("d", 0.0) - self.text_embeddings_eps = mp.Value("d", 0.0) - self.face_rec_speed = mp.Value("d", 0.0) - self.face_rec_fps = mp.Value("d", 0.0) - self.alpr_speed = mp.Value("d", 0.0) - self.alpr_pps = mp.Value("d", 0.0) - self.yolov9_lpr_speed = mp.Value("d", 0.0) - self.yolov9_lpr_pps = mp.Value("d", 0.0) + def __init__(self, manager: SyncManager, custom_classification_models: list[str]): + self.image_embeddings_speed = manager.Value("d", 0.0) + self.image_embeddings_eps = manager.Value("d", 0.0) + self.text_embeddings_speed = manager.Value("d", 0.0) + self.text_embeddings_eps = manager.Value("d", 0.0) + self.face_rec_speed = manager.Value("d", 0.0) + self.face_rec_fps = manager.Value("d", 0.0) + self.alpr_speed = manager.Value("d", 0.0) + self.alpr_pps = manager.Value("d", 0.0) + self.yolov9_lpr_speed = manager.Value("d", 0.0) + self.yolov9_lpr_pps = manager.Value("d", 0.0) + self.classification_speeds = manager.dict() + self.classification_cps = manager.dict() if custom_classification_models: for key in custom_classification_models: - self.classification_speeds[key] = mp.Value("d", 0.0) - self.classification_cps[key] = mp.Value("d", 0.0) + self.classification_speeds[key] = manager.Value("d", 0.0) + self.classification_cps[key] = manager.Value("d", 0.0) class DataProcessorModelRunner: diff --git a/frigate/embeddings/__init__.py b/frigate/embeddings/__init__.py index 80832369c..432ecbe02 100644 --- a/frigate/embeddings/__init__.py +++ b/frigate/embeddings/__init__.py @@ -19,7 +19,7 @@ from frigate.config import FrigateConfig from frigate.const import CONFIG_DIR, FACE_DIR from frigate.data_processing.types import DataProcessorMetrics from frigate.db.sqlitevecq import SqliteVecQueueDatabase -from frigate.models import Event, Recordings +from frigate.models import Event from frigate.util.builtin import serialize from frigate.util.classification import kickoff_model_training from frigate.util.services import listen @@ -43,22 +43,7 @@ def manage_embeddings(config: FrigateConfig, metrics: DataProcessorMetrics) -> N setproctitle("frigate.embeddings_manager") listen() - # Configure Frigate DB - db = SqliteVecQueueDatabase( - config.database.path, - pragmas={ - "auto_vacuum": "FULL", # Does not defragment database - "cache_size": -512 * 1000, # 512MB of cache - "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous - }, - timeout=max(60, 10 * len([c for c in config.cameras.values() if c.enabled])), - load_vec_extension=True, - ) - models = [Event, Recordings] - db.bind(models) - maintainer = EmbeddingMaintainer( - db, config, metrics, stop_event, diff --git a/frigate/embeddings/maintainer.py b/frigate/embeddings/maintainer.py index 0980a8ae8..4608834cf 100644 --- a/frigate/embeddings/maintainer.py +++ b/frigate/embeddings/maintainer.py @@ -12,7 +12,6 @@ from typing import Any, Optional import cv2 import numpy as np from peewee import DoesNotExist -from playhouse.sqliteq import SqliteQueueDatabase from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.embeddings_updater import EmbeddingsRequestEnum, EmbeddingsResponder @@ -58,9 +57,10 @@ from frigate.data_processing.real_time.license_plate import ( LicensePlateRealTimeProcessor, ) from frigate.data_processing.types import DataProcessorMetrics, PostProcessDataEnum +from frigate.db.sqlitevecq import SqliteVecQueueDatabase from frigate.events.types import EventTypeEnum, RegenerateDescriptionEnum from frigate.genai import get_genai_client -from frigate.models import Event +from frigate.models import Event, Recordings from frigate.types import TrackedObjectUpdateTypesEnum from frigate.util.builtin import serialize from frigate.util.image import ( @@ -82,7 +82,6 @@ class EmbeddingMaintainer(threading.Thread): def __init__( self, - db: SqliteQueueDatabase, config: FrigateConfig, metrics: DataProcessorMetrics, stop_event: MpEvent, @@ -97,6 +96,22 @@ class EmbeddingMaintainer(threading.Thread): [CameraConfigUpdateEnum.add, CameraConfigUpdateEnum.remove], ) + # Configure Frigate DB + db = SqliteVecQueueDatabase( + config.database.path, + pragmas={ + "auto_vacuum": "FULL", # Does not defragment database + "cache_size": -512 * 1000, # 512MB of cache + "synchronous": "NORMAL", # Safe when using WAL https://www.sqlite.org/pragma.html#pragma_synchronous + }, + timeout=max( + 60, 10 * len([c for c in config.cameras.values() if c.enabled]) + ), + load_vec_extension=True, + ) + models = [Event, Recordings] + db.bind(models) + if config.semantic_search.enabled: self.embeddings = Embeddings(config, db, metrics)