Merge ReviewSegmentMaintainer

This commit is contained in:
George Tsiamasiotis 2024-10-01 08:34:21 +03:00
parent 6db9a8c639
commit 486793e454
3 changed files with 176 additions and 224 deletions

View File

@ -60,7 +60,7 @@ from frigate.ptz.onvif import OnvifController
from frigate.record.cleanup import RecordingCleanup
from frigate.record.export import migrate_exports
from frigate.record.record import ManageRecordings
from frigate.review.review import manage_review_segments
from frigate.review.maintainer import ReviewSegmentMaintainer
from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
@ -182,12 +182,7 @@ class FrigateApp:
logger.info(f"Recording process started: {recording_process.pid}")
def init_review_segment_manager(self) -> None:
review_segment_process = util.Process(
target=manage_review_segments,
name="review_segment_manager",
args=(self.config,),
daemon=True,
)
review_segment_process = ReviewSegmentMaintainer(self.config)
self.review_segment_process = review_segment_process
review_segment_process.start()
self.processes["review_segment"] = review_segment_process.pid or 0

View File

@ -1,20 +1,18 @@
"""Maintain review segments in db."""
import json
import logging
import os
import random
import string
import sys
import threading
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from pathlib import Path
from typing import Optional
import cv2
import numpy as np
from frigate import util
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
@ -29,9 +27,6 @@ from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop
logger = logging.getLogger(__name__)
THUMB_HEIGHT = 180
THUMB_WIDTH = 320
@ -142,13 +137,14 @@ class PendingReviewSegment:
}.copy()
class ReviewSegmentMaintainer(threading.Thread):
class ReviewSegmentMaintainer(util.Process):
"""Maintain review segments."""
def __init__(self, config: FrigateConfig, stop_event: MpEvent):
threading.Thread.__init__(self)
self.name = "review_segment_maintainer"
def __init__(self, config: FrigateConfig):
super().__init__(name="frigate.review_segment_manager", daemon=True)
self.config = config
def run(self):
self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {}
self.frame_manager = SharedMemoryFrameManager()
@ -163,11 +159,174 @@ class ReviewSegmentMaintainer(threading.Thread):
# ensure dirs
Path(os.path.join(CLIPS_DIR, "review")).mkdir(exist_ok=True)
self.stop_event = stop_event
# clear ongoing review segments from last instance
self.requestor.send_data(CLEAR_ONGOING_REVIEW_SEGMENTS, "")
while not self.stop_event.is_set():
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
(topic, data) = self.detection_subscriber.check_for_update(timeout=1)
if not topic:
continue
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
elif topic == DetectionTypeEnum.api:
(
camera,
frame_time,
manual_info,
) = data
if camera not in self.indefinite_events:
self.indefinite_events[camera] = {}
current_segment = self.active_review_segments.get(camera)
if not self.config.cameras[camera].record.enabled:
if current_segment:
self.update_existing_segment(current_segment, frame_time, [])
continue
if current_segment is not None:
if topic == DetectionTypeEnum.video:
self.update_existing_segment(
current_segment,
frame_time,
current_tracked_objects,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
camera_config = self.config.cameras[camera]
if frame_time > current_segment.last_update:
current_segment.last_update = frame_time
for audio in audio_detections:
if audio in camera_config.review.alerts.labels:
current_segment.audio.add(audio)
current_segment.severity = SeverityEnum.alert
elif (
camera_config.review.detections.labels is None
or audio in camera_config.review.detections.labels
):
current_segment.audio.add(audio)
elif topic == DetectionTypeEnum.api:
if manual_info["state"] == ManualEventState.complete:
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
current_segment.last_update = manual_info["end_time"]
elif manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
# temporarily make it so this event can not end
current_segment.last_update = sys.maxsize
elif manual_info["state"] == ManualEventState.end:
event_id = manual_info["event_id"]
if event_id in self.indefinite_events[camera]:
self.indefinite_events[camera].pop(event_id)
current_segment.last_update = manual_info["end_time"]
else:
self.logger.error(
f"Event with ID {event_id} has a set duration and can not be ended manually."
)
else:
if topic == DetectionTypeEnum.video:
self.check_if_new_segment(
camera,
frame_time,
current_tracked_objects,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
severity = None
camera_config = self.config.cameras[camera]
detections = set()
for audio in audio_detections:
if audio in camera_config.review.alerts.labels:
detections.add(audio)
severity = SeverityEnum.alert
elif (
camera_config.review.detections.labels is None
or audio in camera_config.review.detections.labels
):
detections.add(audio)
if not severity:
severity = SeverityEnum.detection
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
severity,
{},
set(),
[],
detections,
)
elif topic == DetectionTypeEnum.api:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.alert,
{manual_info["event_id"]: manual_info["label"]},
set(),
[],
set(),
)
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[camera].last_update = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[camera].last_update = manual_info[
"end_time"
]
self.config_subscriber.stop()
self.requestor.stop()
self.detection_subscriber.stop()
self.logger.info("Exiting review maintainer...")
def new_segment(
self,
segment: PendingReviewSegment,
@ -293,7 +452,7 @@ class ReviewSegmentMaintainer(threading.Thread):
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
self.logger.debug(f"Failed to get frame {frame_id} from SHM")
return
self.update_segment(
@ -311,7 +470,7 @@ class ReviewSegmentMaintainer(threading.Thread):
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
self.logger.debug(f"Failed to get frame {frame_id} from SHM")
return
segment.save_full_frame(camera_config, yuv_frame)
@ -412,7 +571,7 @@ class ReviewSegmentMaintainer(threading.Thread):
)
if yuv_frame is None:
logger.debug(f"Failed to get frame {frame_id} from SHM")
self.logger.debug(f"Failed to get frame {frame_id} from SHM")
return
self.active_review_segments[camera].update_frame(
@ -423,172 +582,6 @@ class ReviewSegmentMaintainer(threading.Thread):
except FileNotFoundError:
return
def run(self) -> None:
while not self.stop_event.is_set():
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
(topic, data) = self.detection_subscriber.check_for_update(timeout=1)
if not topic:
continue
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
elif topic == DetectionTypeEnum.api:
(
camera,
frame_time,
manual_info,
) = data
if camera not in self.indefinite_events:
self.indefinite_events[camera] = {}
current_segment = self.active_review_segments.get(camera)
if not self.config.cameras[camera].record.enabled:
if current_segment:
self.update_existing_segment(current_segment, frame_time, [])
continue
if current_segment is not None:
if topic == DetectionTypeEnum.video:
self.update_existing_segment(
current_segment,
frame_time,
current_tracked_objects,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
camera_config = self.config.cameras[camera]
if frame_time > current_segment.last_update:
current_segment.last_update = frame_time
for audio in audio_detections:
if audio in camera_config.review.alerts.labels:
current_segment.audio.add(audio)
current_segment.severity = SeverityEnum.alert
elif (
camera_config.review.detections.labels is None
or audio in camera_config.review.detections.labels
):
current_segment.audio.add(audio)
elif topic == DetectionTypeEnum.api:
if manual_info["state"] == ManualEventState.complete:
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
current_segment.last_update = manual_info["end_time"]
elif manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.detections[manual_info["event_id"]] = (
manual_info["label"]
)
current_segment.severity = SeverityEnum.alert
# temporarily make it so this event can not end
current_segment.last_update = sys.maxsize
elif manual_info["state"] == ManualEventState.end:
event_id = manual_info["event_id"]
if event_id in self.indefinite_events[camera]:
self.indefinite_events[camera].pop(event_id)
current_segment.last_update = manual_info["end_time"]
else:
logger.error(
f"Event with ID {event_id} has a set duration and can not be ended manually."
)
else:
if topic == DetectionTypeEnum.video:
self.check_if_new_segment(
camera,
frame_time,
current_tracked_objects,
)
elif topic == DetectionTypeEnum.audio and len(audio_detections) > 0:
severity = None
camera_config = self.config.cameras[camera]
detections = set()
for audio in audio_detections:
if audio in camera_config.review.alerts.labels:
detections.add(audio)
severity = SeverityEnum.alert
elif (
camera_config.review.detections.labels is None
or audio in camera_config.review.detections.labels
):
detections.add(audio)
if not severity:
severity = SeverityEnum.detection
if severity:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
severity,
{},
set(),
[],
detections,
)
elif topic == DetectionTypeEnum.api:
self.active_review_segments[camera] = PendingReviewSegment(
camera,
frame_time,
SeverityEnum.alert,
{manual_info["event_id"]: manual_info["label"]},
set(),
[],
set(),
)
if manual_info["state"] == ManualEventState.start:
self.indefinite_events[camera][manual_info["event_id"]] = (
manual_info["label"]
)
# temporarily make it so this event can not end
self.active_review_segments[camera].last_update = sys.maxsize
elif manual_info["state"] == ManualEventState.complete:
self.active_review_segments[camera].last_update = manual_info[
"end_time"
]
self.config_subscriber.stop()
self.requestor.stop()
self.detection_subscriber.stop()
logger.info("Exiting review maintainer...")
def get_active_objects(
frame_time: float, camera_config: CameraConfig, all_objects: list[TrackedObject]

View File

@ -1,36 +0,0 @@
"""Run recording maintainer and cleanup."""
import logging
import multiprocessing as mp
import signal
import threading
from types import FrameType
from typing import Optional
from setproctitle import setproctitle
from frigate.config import FrigateConfig
from frigate.review.maintainer import ReviewSegmentMaintainer
from frigate.util.services import listen
logger = logging.getLogger(__name__)
def manage_review_segments(config: FrigateConfig) -> None:
stop_event = mp.Event()
def receiveSignal(signalNumber: int, frame: Optional[FrameType]) -> None:
stop_event.set()
signal.signal(signal.SIGTERM, receiveSignal)
signal.signal(signal.SIGINT, receiveSignal)
threading.current_thread().name = "process:review_segment_manager"
setproctitle("frigate.review_segment_manager")
listen()
maintainer = ReviewSegmentMaintainer(
config,
stop_event,
)
maintainer.start()