diff --git a/frigate/events.py b/frigate/events.py index ee44340f9..7cf1d8be0 100644 --- a/frigate/events.py +++ b/frigate/events.py @@ -4,7 +4,7 @@ import os import queue import threading - +from enum import Enum from pathlib import Path from typing import Any @@ -13,7 +13,6 @@ from peewee import fn from frigate.config import EventsConfig, FrigateConfig from frigate.const import CLIPS_DIR from frigate.models import Event -from frigate.timeline import TimelineSourceEnum from frigate.types import CameraMetricsTypes from frigate.util import to_relative_box @@ -24,6 +23,12 @@ from typing import Dict logger = logging.getLogger(__name__) +class DetectionTypeEnum(str, Enum): + # api = "api" + # audio = "audio" + tracked_object = "tracked_object" + + def should_update_db(prev_event: Event, current_event: Event) -> bool: """If current_event has updated fields and (clip or snapshot).""" if current_event["has_clip"] or current_event["has_snapshot"]: @@ -61,6 +66,45 @@ class EventProcessor(threading.Thread): self.events_in_process: Dict[str, Event] = {} self.stop_event = stop_event + def run(self) -> None: + # set an end_time on events without an end_time on startup + Event.update(end_time=Event.start_time + 30).where( + Event.end_time == None + ).execute() + + while not self.stop_event.is_set(): + try: + source_type, event_type, camera, event_data = self.event_queue.get( + timeout=1 + ) + except queue.Empty: + continue + + logger.debug(f"Event received: {event_type} {camera} {event_data['id']}") + + self.timeline_queue.put( + ( + camera, + source_type, + event_type, + self.events_in_process.get(event_data["id"]), + event_data, + ) + ) + + if source_type == DetectionTypeEnum.tracked_object: + if event_type == "start": + self.events_in_process[event_data["id"]] = event_data + continue + + self.handle_object_detection(event_type, camera, event_data) + + # set an end_time on events without an end_time before exiting + Event.update(end_time=datetime.datetime.now().timestamp()).where( + Event.end_time == None + ).execute() + logger.info(f"Exiting event processor...") + def handle_object_detection( self, event_type: str, camera: str, event_data: dict[str, Any] ) -> None: @@ -153,45 +197,6 @@ class EventProcessor(threading.Thread): del self.events_in_process[event_data["id"]] self.event_processed_queue.put((event_data["id"], camera)) - def run(self) -> None: - # set an end_time on events without an end_time on startup - Event.update(end_time=Event.start_time + 30).where( - Event.end_time == None - ).execute() - - while not self.stop_event.is_set(): - try: - source_type, event_type, camera, event_data = self.event_queue.get( - timeout=1 - ) - except queue.Empty: - continue - - logger.debug(f"Event received: {event_type} {camera} {event_data['id']}") - - self.timeline_queue.put( - ( - camera, - source_type, - event_type, - self.events_in_process.get(event_data["id"]), - event_data, - ) - ) - - if source_type == TimelineSourceEnum.tracked_object: - if event_type == "start": - self.events_in_process[event_data["id"]] = event_data - continue - - self.handle_object_detection(event_type, camera, event_data) - - # set an end_time on events without an end_time before exiting - Event.update(end_time=datetime.datetime.now().timestamp()).where( - Event.end_time == None - ).execute() - logger.info(f"Exiting event processor...") - class EventCleanup(threading.Thread): def __init__(self, config: FrigateConfig, stop_event: MpEvent): diff --git a/frigate/object_processing.py b/frigate/object_processing.py index 766054818..e25ebc5fa 100644 --- a/frigate/object_processing.py +++ b/frigate/object_processing.py @@ -21,7 +21,7 @@ from frigate.config import ( FrigateConfig, ) from frigate.const import CLIPS_DIR -from frigate.timeline import TimelineSourceEnum +from frigate.events import DetectionTypeEnum from frigate.util import ( SharedMemoryFrameManager, calculate_region, @@ -657,7 +657,7 @@ class TrackedObjectProcessor(threading.Thread): self.last_motion_detected: dict[str, float] = {} def start(camera, obj: TrackedObject, current_frame_time): - self.event_queue.put((TimelineSourceEnum.tracked_object, "start", camera, obj.to_dict())) + self.event_queue.put((DetectionTypeEnum.tracked_object, "start", camera, obj.to_dict())) def update(camera, obj: TrackedObject, current_frame_time): obj.has_snapshot = self.should_save_snapshot(camera, obj) @@ -671,7 +671,7 @@ class TrackedObjectProcessor(threading.Thread): self.dispatcher.publish("events", json.dumps(message), retain=False) obj.previous = after self.event_queue.put( - (TimelineSourceEnum.tracked_object, "update", camera, obj.to_dict(include_thumbnail=True)) + (DetectionTypeEnum.tracked_object, "update", camera, obj.to_dict(include_thumbnail=True)) ) def end(camera, obj: TrackedObject, current_frame_time): @@ -723,7 +723,7 @@ class TrackedObjectProcessor(threading.Thread): } self.dispatcher.publish("events", json.dumps(message), retain=False) - self.event_queue.put((TimelineSourceEnum.tracked_object, "end", camera, obj.to_dict(include_thumbnail=True))) + self.event_queue.put((DetectionTypeEnum.tracked_object, "end", camera, obj.to_dict(include_thumbnail=True))) def snapshot(camera, obj: TrackedObject, current_frame_time): mqtt_config: MqttConfig = self.config.cameras[camera].mqtt diff --git a/frigate/timeline.py b/frigate/timeline.py index c351e3e68..321879813 100644 --- a/frigate/timeline.py +++ b/frigate/timeline.py @@ -4,9 +4,8 @@ import logging import threading import queue -from enum import Enum - from frigate.config import FrigateConfig +from frigate.events import DetectionTypeEnum from frigate.models import Timeline from multiprocessing.queues import Queue @@ -17,12 +16,6 @@ from frigate.util import to_relative_box logger = logging.getLogger(__name__) -class TimelineSourceEnum(str, Enum): - # api = "api" - # audio = "audio" - tracked_object = "tracked_object" - - class TimelineProcessor(threading.Thread): """Handle timeline queue and update DB.""" @@ -51,7 +44,7 @@ class TimelineProcessor(threading.Thread): except queue.Empty: continue - if input_type == TimelineSourceEnum.tracked_object: + if input_type == DetectionTypeEnum.tracked_object: self.handle_object_detection( camera, event_type, prev_event_data, event_data )