From 486793e454583e18aa68a5c469934cf03500afd8 Mon Sep 17 00:00:00 2001 From: George Tsiamasiotis Date: Tue, 1 Oct 2024 08:34:21 +0300 Subject: [PATCH] Merge ReviewSegmentMaintainer --- frigate/app.py | 9 +- frigate/review/maintainer.py | 355 +++++++++++++++++------------------ frigate/review/review.py | 36 ---- 3 files changed, 176 insertions(+), 224 deletions(-) delete mode 100644 frigate/review/review.py diff --git a/frigate/app.py b/frigate/app.py index 5ef469e39..587f592ae 100644 --- a/frigate/app.py +++ b/frigate/app.py @@ -60,7 +60,7 @@ from frigate.ptz.onvif import OnvifController from frigate.record.cleanup import RecordingCleanup from frigate.record.export import migrate_exports from frigate.record.record import ManageRecordings -from frigate.review.review import manage_review_segments +from frigate.review.maintainer import ReviewSegmentMaintainer from frigate.stats.emitter import StatsEmitter from frigate.stats.util import stats_init from frigate.storage import StorageMaintainer @@ -182,12 +182,7 @@ class FrigateApp: logger.info(f"Recording process started: {recording_process.pid}") def init_review_segment_manager(self) -> None: - review_segment_process = util.Process( - target=manage_review_segments, - name="review_segment_manager", - args=(self.config,), - daemon=True, - ) + review_segment_process = ReviewSegmentMaintainer(self.config) self.review_segment_process = review_segment_process review_segment_process.start() self.processes["review_segment"] = review_segment_process.pid or 0 diff --git a/frigate/review/maintainer.py b/frigate/review/maintainer.py index b92fac99d..eb7ac6829 100644 --- a/frigate/review/maintainer.py +++ b/frigate/review/maintainer.py @@ -1,20 +1,18 @@ """Maintain review segments in db.""" import json -import logging import os import random import string import sys -import threading from enum import Enum -from multiprocessing.synchronize import Event as MpEvent from pathlib import Path from typing import Optional import cv2 import numpy as np +from frigate import util from frigate.comms.config_updater import ConfigSubscriber from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum from frigate.comms.inter_process import InterProcessRequestor @@ -29,9 +27,6 @@ from frigate.models import ReviewSegment from frigate.object_processing import TrackedObject from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop -logger = logging.getLogger(__name__) - - THUMB_HEIGHT = 180 THUMB_WIDTH = 320 @@ -142,13 +137,14 @@ class PendingReviewSegment: }.copy() -class ReviewSegmentMaintainer(threading.Thread): +class ReviewSegmentMaintainer(util.Process): """Maintain review segments.""" - def __init__(self, config: FrigateConfig, stop_event: MpEvent): - threading.Thread.__init__(self) - self.name = "review_segment_maintainer" + def __init__(self, config: FrigateConfig): + super().__init__(name="frigate.review_segment_manager", daemon=True) self.config = config + + def run(self): self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {} self.frame_manager = SharedMemoryFrameManager() @@ -163,11 +159,174 @@ class ReviewSegmentMaintainer(threading.Thread): # ensure dirs Path(os.path.join(CLIPS_DIR, "review")).mkdir(exist_ok=True) - self.stop_event = stop_event - # clear ongoing review segments from last instance self.requestor.send_data(CLEAR_ONGOING_REVIEW_SEGMENTS, "") + while not self.stop_event.is_set(): + # check if there is an updated config + while True: + ( + updated_topic, + updated_record_config, + ) = self.config_subscriber.check_for_update() + + if not updated_topic: + break + + camera_name = updated_topic.rpartition("/")[-1] + self.config.cameras[camera_name].record = updated_record_config + + (topic, data) = self.detection_subscriber.check_for_update(timeout=1) + + if not topic: + continue + + if topic == DetectionTypeEnum.video: + ( + camera, + frame_time, + current_tracked_objects, + motion_boxes, + regions, + ) = data + elif topic == DetectionTypeEnum.audio: + ( + camera, + frame_time, + dBFS, + audio_detections, + ) = data + elif topic == DetectionTypeEnum.api: + ( + camera, + frame_time, + manual_info, + ) = data + + if camera not in self.indefinite_events: + self.indefinite_events[camera] = {} + + current_segment = self.active_review_segments.get(camera) + + if not self.config.cameras[camera].record.enabled: + if current_segment: + self.update_existing_segment(current_segment, frame_time, []) + + continue + + if current_segment is not None: + if topic == DetectionTypeEnum.video: + self.update_existing_segment( + current_segment, + frame_time, + current_tracked_objects, + ) + elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: + camera_config = self.config.cameras[camera] + + if frame_time > current_segment.last_update: + current_segment.last_update = frame_time + + for audio in audio_detections: + if audio in camera_config.review.alerts.labels: + current_segment.audio.add(audio) + current_segment.severity = SeverityEnum.alert + elif ( + camera_config.review.detections.labels is None + or audio in camera_config.review.detections.labels + ): + current_segment.audio.add(audio) + elif topic == DetectionTypeEnum.api: + if manual_info["state"] == ManualEventState.complete: + current_segment.detections[manual_info["event_id"]] = ( + manual_info["label"] + ) + current_segment.severity = SeverityEnum.alert + current_segment.last_update = manual_info["end_time"] + elif manual_info["state"] == ManualEventState.start: + self.indefinite_events[camera][manual_info["event_id"]] = ( + manual_info["label"] + ) + current_segment.detections[manual_info["event_id"]] = ( + manual_info["label"] + ) + current_segment.severity = SeverityEnum.alert + + # temporarily make it so this event can not end + current_segment.last_update = sys.maxsize + elif manual_info["state"] == ManualEventState.end: + event_id = manual_info["event_id"] + + if event_id in self.indefinite_events[camera]: + self.indefinite_events[camera].pop(event_id) + current_segment.last_update = manual_info["end_time"] + else: + self.logger.error( + f"Event with ID {event_id} has a set duration and can not be ended manually." + ) + else: + if topic == DetectionTypeEnum.video: + self.check_if_new_segment( + camera, + frame_time, + current_tracked_objects, + ) + elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: + severity = None + + camera_config = self.config.cameras[camera] + detections = set() + + for audio in audio_detections: + if audio in camera_config.review.alerts.labels: + detections.add(audio) + severity = SeverityEnum.alert + elif ( + camera_config.review.detections.labels is None + or audio in camera_config.review.detections.labels + ): + detections.add(audio) + + if not severity: + severity = SeverityEnum.detection + + if severity: + self.active_review_segments[camera] = PendingReviewSegment( + camera, + frame_time, + severity, + {}, + set(), + [], + detections, + ) + elif topic == DetectionTypeEnum.api: + self.active_review_segments[camera] = PendingReviewSegment( + camera, + frame_time, + SeverityEnum.alert, + {manual_info["event_id"]: manual_info["label"]}, + set(), + [], + set(), + ) + + if manual_info["state"] == ManualEventState.start: + self.indefinite_events[camera][manual_info["event_id"]] = ( + manual_info["label"] + ) + # temporarily make it so this event can not end + self.active_review_segments[camera].last_update = sys.maxsize + elif manual_info["state"] == ManualEventState.complete: + self.active_review_segments[camera].last_update = manual_info[ + "end_time" + ] + + self.config_subscriber.stop() + self.requestor.stop() + self.detection_subscriber.stop() + self.logger.info("Exiting review maintainer...") + def new_segment( self, segment: PendingReviewSegment, @@ -293,7 +452,7 @@ class ReviewSegmentMaintainer(threading.Thread): ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + self.logger.debug(f"Failed to get frame {frame_id} from SHM") return self.update_segment( @@ -311,7 +470,7 @@ class ReviewSegmentMaintainer(threading.Thread): ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + self.logger.debug(f"Failed to get frame {frame_id} from SHM") return segment.save_full_frame(camera_config, yuv_frame) @@ -412,7 +571,7 @@ class ReviewSegmentMaintainer(threading.Thread): ) if yuv_frame is None: - logger.debug(f"Failed to get frame {frame_id} from SHM") + self.logger.debug(f"Failed to get frame {frame_id} from SHM") return self.active_review_segments[camera].update_frame( @@ -423,172 +582,6 @@ class ReviewSegmentMaintainer(threading.Thread): except FileNotFoundError: return - def run(self) -> None: - while not self.stop_event.is_set(): - # check if there is an updated config - while True: - ( - updated_topic, - updated_record_config, - ) = self.config_subscriber.check_for_update() - - if not updated_topic: - break - - camera_name = updated_topic.rpartition("/")[-1] - self.config.cameras[camera_name].record = updated_record_config - - (topic, data) = self.detection_subscriber.check_for_update(timeout=1) - - if not topic: - continue - - if topic == DetectionTypeEnum.video: - ( - camera, - frame_time, - current_tracked_objects, - motion_boxes, - regions, - ) = data - elif topic == DetectionTypeEnum.audio: - ( - camera, - frame_time, - dBFS, - audio_detections, - ) = data - elif topic == DetectionTypeEnum.api: - ( - camera, - frame_time, - manual_info, - ) = data - - if camera not in self.indefinite_events: - self.indefinite_events[camera] = {} - - current_segment = self.active_review_segments.get(camera) - - if not self.config.cameras[camera].record.enabled: - if current_segment: - self.update_existing_segment(current_segment, frame_time, []) - - continue - - if current_segment is not None: - if topic == DetectionTypeEnum.video: - self.update_existing_segment( - current_segment, - frame_time, - current_tracked_objects, - ) - elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: - camera_config = self.config.cameras[camera] - - if frame_time > current_segment.last_update: - current_segment.last_update = frame_time - - for audio in audio_detections: - if audio in camera_config.review.alerts.labels: - current_segment.audio.add(audio) - current_segment.severity = SeverityEnum.alert - elif ( - camera_config.review.detections.labels is None - or audio in camera_config.review.detections.labels - ): - current_segment.audio.add(audio) - elif topic == DetectionTypeEnum.api: - if manual_info["state"] == ManualEventState.complete: - current_segment.detections[manual_info["event_id"]] = ( - manual_info["label"] - ) - current_segment.severity = SeverityEnum.alert - current_segment.last_update = manual_info["end_time"] - elif manual_info["state"] == ManualEventState.start: - self.indefinite_events[camera][manual_info["event_id"]] = ( - manual_info["label"] - ) - current_segment.detections[manual_info["event_id"]] = ( - manual_info["label"] - ) - current_segment.severity = SeverityEnum.alert - - # temporarily make it so this event can not end - current_segment.last_update = sys.maxsize - elif manual_info["state"] == ManualEventState.end: - event_id = manual_info["event_id"] - - if event_id in self.indefinite_events[camera]: - self.indefinite_events[camera].pop(event_id) - current_segment.last_update = manual_info["end_time"] - else: - logger.error( - f"Event with ID {event_id} has a set duration and can not be ended manually." - ) - else: - if topic == DetectionTypeEnum.video: - self.check_if_new_segment( - camera, - frame_time, - current_tracked_objects, - ) - elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0: - severity = None - - camera_config = self.config.cameras[camera] - detections = set() - - for audio in audio_detections: - if audio in camera_config.review.alerts.labels: - detections.add(audio) - severity = SeverityEnum.alert - elif ( - camera_config.review.detections.labels is None - or audio in camera_config.review.detections.labels - ): - detections.add(audio) - - if not severity: - severity = SeverityEnum.detection - - if severity: - self.active_review_segments[camera] = PendingReviewSegment( - camera, - frame_time, - severity, - {}, - set(), - [], - detections, - ) - elif topic == DetectionTypeEnum.api: - self.active_review_segments[camera] = PendingReviewSegment( - camera, - frame_time, - SeverityEnum.alert, - {manual_info["event_id"]: manual_info["label"]}, - set(), - [], - set(), - ) - - if manual_info["state"] == ManualEventState.start: - self.indefinite_events[camera][manual_info["event_id"]] = ( - manual_info["label"] - ) - # temporarily make it so this event can not end - self.active_review_segments[camera].last_update = sys.maxsize - elif manual_info["state"] == ManualEventState.complete: - self.active_review_segments[camera].last_update = manual_info[ - "end_time" - ] - - self.config_subscriber.stop() - self.requestor.stop() - self.detection_subscriber.stop() - logger.info("Exiting review maintainer...") - def get_active_objects( frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject] diff --git a/frigate/review/review.py b/frigate/review/review.py deleted file mode 100644 index dafa6c802..000000000 --- a/frigate/review/review.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Run recording maintainer and cleanup.""" - -import logging -import multiprocessing as mp -import signal -import threading -from types import FrameType -from typing import Optional - -from setproctitle import setproctitle - -from frigate.config import FrigateConfig -from frigate.review.maintainer import ReviewSegmentMaintainer -from frigate.util.services import listen - -logger = logging.getLogger(__name__) - - -def manage_review_segments(config: FrigateConfig) -> None: - stop_event = mp.Event() - - def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None: - stop_event.set() - - signal.signal(signal.SIGTERM, receiveSignal) - signal.signal(signal.SIGINT, receiveSignal) - - threading.current_thread().name = "process:review_segment_manager" - setproctitle("frigate.review_segment_manager") - listen() - - maintainer = ReviewSegmentMaintainer( - config, - stop_event, - ) - maintainer.start()