Upsert and maintain review segments

This commit is contained in:
Nicolas Mowen 2024-02-17 07:25:24 -07:00
parent 6b551ca073
commit d8ef721c94
3 changed files with 151 additions and 2 deletions

View File

@ -6,8 +6,13 @@ from typing import Any, Callable, Optional
from frigate.comms.config_updater import ConfigPublisher
from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID
from frigate.models import Previews, Recordings
from frigate.const import (
INSERT_MANY_RECORDINGS,
INSERT_PREVIEW,
REQUEST_REGION_GRID,
UPSERT_REVIEW_SEGMENT,
)
from frigate.models import Previews, Recordings, ReviewSegment
from frigate.ptz.onvif import OnvifCommandEnum, OnvifController
from frigate.types import PTZMetricsTypes
from frigate.util.object import get_camera_regions_grid
@ -102,6 +107,15 @@ class Dispatcher:
return grid
elif topic == INSERT_PREVIEW:
Previews.insert(payload).execute()
elif topic == UPSERT_REVIEW_SEGMENT:
(
ReviewSegment.insert(payload)
.on_conflict(
conflict_target=[ReviewSegment.id],
update=payload,
)
.execute()
)
else:
self.publish(topic, payload, retain=False)

View File

@ -70,6 +70,7 @@ MAX_PLAYLIST_SECONDS = 7200 # support 2 hour segments for a single playlist to
INSERT_MANY_RECORDINGS = "insert_many_recordings"
INSERT_PREVIEW = "insert_preview"
REQUEST_REGION_GRID = "request_region_grid"
UPSERT_REVIEW_SEGMENT = "upsert_review_segment"
# Autotracking

View File

@ -1,17 +1,58 @@
"""Maintain review segments in db."""
import logging
import random
import string
import threading
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
from typing import Optional
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionSubscriber, DetectionTypeEnum
from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig
from frigate.const import UPSERT_REVIEW_SEGMENT
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
logger = logging.getLogger(__name__)
class SeverityEnum(str, Enum):
alert = "alert"
detection = "detection"
signification_motion = "significant_motion"
class PendingReviewSegment:
def __init__(self, camera: str, frame_time: float, severity: SeverityEnum):
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
self.id = f"{frame_time}-{rand_id}"
self.camera = camera
self.start_time = frame_time
self.severity = severity
self.data: dict[str, set] = {
"objects": set(),
"zones": set(),
"audio": set(),
"significant_motion_areas": [],
}
self.last_update = frame_time
def end(self) -> dict:
return {
ReviewSegment.id: self.id,
ReviewSegment.camera: self.camera,
ReviewSegment.start_time: self.start_time,
ReviewSegment.end_time: self.last_update,
ReviewSegment.severity: self.severity.value,
ReviewSegment.thumb_path: "somewhere",
ReviewSegment.data: self.data,
}
class ReviewSegmentMaintainer(threading.Thread):
"""Maintain review segments."""
@ -19,6 +60,7 @@ class ReviewSegmentMaintainer(threading.Thread):
threading.Thread.__init__(self)
self.name = "review_segment_maintainer"
self.config = config
self.active_review_segments: dict[str, Optional[PendingReviewSegment]] = {}
# create communication for review segments
self.requestor = InterProcessRequestor()
@ -26,3 +68,95 @@ class ReviewSegmentMaintainer(threading.Thread):
self.detection_subscriber = DetectionSubscriber(DetectionTypeEnum.all)
self.stop_event = stop_event
def end_segment(self, segment: PendingReviewSegment) -> None:
"""End segment."""
self.requestor.send_data(UPSERT_REVIEW_SEGMENT, segment.end())
self.active_review_segments[segment.camera] = None
def update_existing_segment(
self,
segment: PendingReviewSegment,
frame_time: float,
objects: list[TrackedObject],
motion: list,
) -> None:
"""Validate if existing review segment should continue."""
camera_config = self.config.cameras[segment.camera]
active_objects = [
o
for o in objects
if o.obj_data["motionless_count"]
> camera_config.detect.stationary.threshold
]
if len(active_objects) > 0:
if segment.severity == SeverityEnum.signification_motion:
segment.severity = SeverityEnum.detection
for object in active_objects:
segment.data["objects"].add(object.obj_data["label"])
if segment.severity == SeverityEnum.detection and object.has_clip:
segment.severity = SeverityEnum.alert
if object.current_zones:
segment.data["zones"].update(object.current_zones)
elif frame_time > (segment.last_update + camera_config.detect.max_disappeared):
self.end_segment(segment)
def run(self) -> None:
while not self.stop_event.is_set():
# check if there is an updated config
while True:
(
updated_topic,
updated_record_config,
) = self.config_subscriber.check_for_update()
if not updated_topic:
break
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
(topic, data) = self.detection_subscriber.get_data()
if not topic:
continue
if topic == DetectionTypeEnum.video:
(
camera,
frame_time,
current_tracked_objects,
motion_boxes,
regions,
) = data
elif topic == DetectionTypeEnum.audio:
(
camera,
frame_time,
dBFS,
audio_detections,
) = data
if not self.config.cameras[camera].record.enabled:
continue
current_segment = self.active_review_segments.get(camera)
if current_segment is not None:
if topic == DetectionTypeEnum.video:
self.update_existing_segment(
topic,
current_segment,
frame_time,
current_tracked_objects,
motion_boxes,
)
elif topic == DetectionTypeEnum.audio:
pass
else:
pass