Move manual event handling to camera state / tracked object

This commit is contained in:
Nicolas Mowen 2025-03-11 16:28:52 -06:00
parent e51ba8c9ee
commit 9ab665f2d1
5 changed files with 167 additions and 238 deletions

View File

@ -1207,40 +1207,21 @@ def create_event(
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
(
camera_name,
label,
event_id,
body.include_recording,
body.score,
body.sub_label,
body.duration,
body.source_type,
body.draw,
)
try:
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
external_processor: ExternalEventProcessor = request.app.external_processor
frame = frame_processor.get_current_frame(camera_name)
event_id = external_processor.create_manual_event(
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_create,
(
now,
camera_name,
label,
body.source_type,
body.sub_label,
body.score,
body.duration,
event_id,
body.include_recording,
body.score,
body.sub_label,
body.duration,
body.source_type,
body.draw,
frame,
)
except Exception as e:
logger.error(e)
return JSONResponse(
content=({"success": False, "message": "An unknown error occurred"}),
status_code=500,
)
),
)
return JSONResponse(
content=(
@ -1262,7 +1243,9 @@ def create_event(
def end_event(request: Request, event_id: str, body: EventsEndBody):
try:
end_time = body.end_time or datetime.datetime.now().timestamp()
request.app.external_processor.finish_manual_event(event_id, end_time)
request.app.event_metadata_updater.publish(
EventMetadataTypeEnum.manual_event_end, (event_id, end_time)
)
except Exception:
return JSONResponse(
content=(

View File

@ -43,7 +43,6 @@ from frigate.db.sqlitevecq import SqliteVecQueueDatabase
from frigate.embeddings import EmbeddingsContext, manage_embeddings
from frigate.events.audio import AudioProcessor
from frigate.events.cleanup import EventCleanup
from frigate.events.external import ExternalEventProcessor
from frigate.events.maintainer import EventProcessor
from frigate.models import (
Event,
@ -57,7 +56,6 @@ from frigate.models import (
User,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController
@ -69,6 +67,7 @@ from frigate.stats.emitter import StatsEmitter
from frigate.stats.util import stats_init
from frigate.storage import StorageMaintainer
from frigate.timeline import TimelineProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import empty_and_close_queue
from frigate.util.image import SharedMemoryFrameManager, UntrackedSharedMemory
from frigate.util.object import get_camera_regions_grid

View File

@ -2,6 +2,7 @@
import datetime
import logging
import os
import threading
from collections import defaultdict
from typing import Callable
@ -16,6 +17,7 @@ from frigate.config import (
FrigateConfig,
ZoomingModeEnum,
)
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
@ -406,22 +408,58 @@ class CameraState:
self.previous_frame_id = frame_name
def process_manual_event(self, topic: str, payload: tuple) -> None:
if topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
(
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
duration,
source_type,
draw,
) = payload
def save_manual_event_image(
self, event_id: str, label: str, draw: dict[str, list[dict]]
) -> None:
img_frame = self.get_current_frame()
else:
pass
# write clean snapshot if enabled
if self.camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{self.camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * self.camera_config.detect.width)
y = int(box["box"][1] * self.camera_config.detect.height)
width = int(box["box"][2] * self.camera_config.detect.width)
height = int(box["box"][3] * self.camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{self.camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, self.camera_config.name, f"{event_id}.webp"), thumb
)
def shutdown(self) -> None:
for obj in self.tracked_objects.values():

View File

@ -1,187 +0,0 @@
"""Handle external events created by the user."""
import datetime
import logging
import os
import random
import string
from enum import Enum
from typing import Optional
import cv2
from numpy import ndarray
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.events_updater import EventUpdatePublisher
from frigate.config import CameraConfig, FrigateConfig
from frigate.const import CLIPS_DIR, THUMB_DIR
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.util.image import draw_box_with_label
logger = logging.getLogger(__name__)
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class ExternalEventProcessor:
def __init__(self, config: FrigateConfig) -> None:
self.config = config
self.default_thumbnail = None
self.event_sender = EventUpdatePublisher()
self.detection_updater = DetectionPublisher(DetectionTypeEnum.api)
self.event_camera = {}
def create_manual_event(
self,
camera: str,
label: str,
source_type: str,
sub_label: Optional[str],
score: int,
duration: Optional[int],
include_recording: bool,
draw: dict[str, any],
snapshot_frame: Optional[ndarray],
) -> str:
now = datetime.datetime.now().timestamp()
camera_config = self.config.cameras.get(camera)
# create event id and start frame time
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
self._write_images(camera_config, label, event_id, draw, snapshot_frame)
end = now + duration if duration is not None else None
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera,
"start_time": now - camera_config.record.event_pre_capture,
"end_time": end,
"has_clip": camera_config.record.enabled and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.event_camera[event_id] = camera
self.detection_updater.publish(
(
camera,
now,
{
"state": (
ManualEventState.complete if end else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end,
},
)
)
return event_id
def finish_manual_event(self, event_id: str, end_time: float) -> None:
"""Finish external event with indeterminate duration."""
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.event_camera:
self.detection_updater.publish(
(
self.event_camera[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
)
)
self.event_camera.pop(event_id)
def _write_images(
self,
camera_config: CameraConfig,
label: str,
event_id: str,
draw: dict[str, any],
img_frame: Optional[ndarray],
) -> None:
if img_frame is None:
return
# write clean snapshot if enabled
if camera_config.snapshots.clean_copy:
ret, png = cv2.imencode(".png", img_frame)
if ret:
with open(
os.path.join(
CLIPS_DIR,
f"{camera_config.name}-{event_id}-clean.png",
),
"wb",
) as p:
p.write(png.tobytes())
# write jpg snapshot with optional annotations
if draw.get("boxes") and isinstance(draw.get("boxes"), list):
for box in draw.get("boxes"):
x = int(box["box"][0] * camera_config.detect.width)
y = int(box["box"][1] * camera_config.detect.height)
width = int(box["box"][2] * camera_config.detect.width)
height = int(box["box"][3] * camera_config.detect.height)
draw_box_with_label(
img_frame,
x,
y,
x + width,
y + height,
label,
f"{box.get('score', '-')}% {int(width * height)}",
thickness=2,
color=box.get("color", (255, 0, 0)),
)
ret, jpg = cv2.imencode(".jpg", img_frame)
with open(
os.path.join(CLIPS_DIR, f"{camera_config.name}-{event_id}.jpg"),
"wb",
) as j:
j.write(jpg.tobytes())
# create thumbnail with max height of 175 and save
width = int(175 * img_frame.shape[1] / img_frame.shape[0])
thumb = cv2.resize(img_frame, dsize=(width, 175), interpolation=cv2.INTER_AREA)
cv2.imwrite(
os.path.join(THUMB_DIR, camera_config.name, f"{event_id}.webp"), thumb
)
def stop(self):
self.event_sender.stop()
self.detection_updater.stop()

View File

@ -4,6 +4,7 @@ import logging
import queue
import threading
from collections import defaultdict
from enum import Enum
from multiprocessing.synchronize import Event as MpEvent
import numpy as np
@ -34,6 +35,12 @@ from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
class ManualEventState(str, Enum):
complete = "complete"
start = "start"
end = "end"
class TrackedObjectProcessor(threading.Thread):
def __init__(
self,
@ -62,6 +69,7 @@ class TrackedObjectProcessor(threading.Thread):
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)
self.camera_activity: dict[str, dict[str, any]] = {}
self.ongoing_manual_events: dict[str, str] = {}
# {
# 'zone_name': {
@ -338,6 +346,94 @@ class TrackedObjectProcessor(threading.Thread):
return True
def create_manual_event(self, payload: tuple) -> None:
(
frame_time,
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
duration,
source_type,
draw,
) = payload
# save the snapshot image
self.camera_states[camera_name].save_manual_event_image(event_id, label, draw)
end_time = frame_time + duration if duration is not None else None
# send event to event maintainer
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.start,
camera_name,
"",
{
"id": event_id,
"label": label,
"sub_label": sub_label,
"score": score,
"camera": camera_name,
"start_time": frame_time
- self.config.cameras[camera_name].record.event_pre_capture,
"end_time": end_time,
"has_clip": self.config.cameras[camera_name].record.enabled
and include_recording,
"has_snapshot": True,
"type": source_type,
},
)
)
if source_type == "api":
self.ongoing_manual_events[event_id] = camera_name
self.detection_publisher.publish(
(
camera_name,
frame_time,
{
"state": (
ManualEventState.complete
if end_time
else ManualEventState.start
),
"label": f"{label}: {sub_label}" if sub_label else label,
"event_id": event_id,
"end_time": end_time,
},
)
)
def end_manual_event(self, payload: tuple) -> None:
(event_id, end_time) = payload
self.event_sender.publish(
(
EventTypeEnum.api,
EventStateEnum.end,
None,
"",
{"id": event_id, "end_time": end_time},
)
)
if event_id in self.ongoing_manual_events:
self.detection_publisher.publish(
(
self.ongoing_manual_events[event_id],
end_time,
{
"state": ManualEventState.end,
"event_id": event_id,
"end_time": end_time,
},
)
)
self.ongoing_manual_events.pop(event_id)
def force_end_all_events(self, camera: str, camera_state: CameraState):
"""Ends all active events on camera when disabling."""
last_frame_name = camera_state.previous_frame_id
@ -409,9 +505,9 @@ class TrackedObjectProcessor(threading.Thread):
if topic.endswith(EventMetadataTypeEnum.sub_label.value):
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
elif topic.endswith(
EventMetadataTypeEnum.manual_event_create.value
) or topic.endswith(EventMetadataTypeEnum.manual_event_end.value):
elif topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
self.create_manual_event(payload)
elif topic.endswith(EventMetadataTypeEnum.manual_event_end.value):
camera_name = payload[0]
self.camera_states[camera_name].process_manual_event(topic, payload)