rename enum

Nick Mowen 2023-04-29 12:50:58 -06:00
parent b46132e6f8
commit dd304b162e
3 changed files with 52 additions and 54 deletions
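
At a glance: the enum that tags messages on the event and timeline queues is renamed from TimelineSourceEnum to DetectionTypeEnum, and its definition moves from the timeline module into the events module (the file paths below are inferred from the import statements in the hunks). The resulting definition, reconstructed from the added lines, looks like this:

    from enum import Enum


    class DetectionTypeEnum(str, Enum):  # previously TimelineSourceEnum in frigate/timeline.py
        # api = "api"
        # audio = "audio"
        tracked_object = "tracked_object"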

frigate/events.py

@@ -4,7 +4,7 @@ import os
 import queue
 import threading
+from enum import Enum
 from pathlib import Path
 from typing import Any
@@ -13,7 +13,6 @@ from peewee import fn
 from frigate.config import EventsConfig, FrigateConfig
 from frigate.const import CLIPS_DIR
 from frigate.models import Event
-from frigate.timeline import TimelineSourceEnum
 from frigate.types import CameraMetricsTypes
 from frigate.util import to_relative_box
@@ -24,6 +23,12 @@ from typing import Dict
 logger = logging.getLogger(__name__)
 
 
+class DetectionTypeEnum(str, Enum):
+    # api = "api"
+    # audio = "audio"
+    tracked_object = "tracked_object"
+
+
 def should_update_db(prev_event: Event, current_event: Event) -> bool:
     """If current_event has updated fields and (clip or snapshot)."""
     if current_event["has_clip"] or current_event["has_snapshot"]:
@@ -61,6 +66,45 @@ class EventProcessor(threading.Thread):
         self.events_in_process: Dict[str, Event] = {}
         self.stop_event = stop_event
 
+    def run(self) -> None:
+        # set an end_time on events without an end_time on startup
+        Event.update(end_time=Event.start_time + 30).where(
+            Event.end_time == None
+        ).execute()
+
+        while not self.stop_event.is_set():
+            try:
+                source_type, event_type, camera, event_data = self.event_queue.get(
+                    timeout=1
+                )
+            except queue.Empty:
+                continue
+
+            logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
+
+            self.timeline_queue.put(
+                (
+                    camera,
+                    source_type,
+                    event_type,
+                    self.events_in_process.get(event_data["id"]),
+                    event_data,
+                )
+            )
+
+            if source_type == DetectionTypeEnum.tracked_object:
+                if event_type == "start":
+                    self.events_in_process[event_data["id"]] = event_data
+                    continue
+
+                self.handle_object_detection(event_type, camera, event_data)
+
+        # set an end_time on events without an end_time before exiting
+        Event.update(end_time=datetime.datetime.now().timestamp()).where(
+            Event.end_time == None
+        ).execute()
+
+        logger.info(f"Exiting event processor...")
+
     def handle_object_detection(
         self, event_type: str, camera: str, event_data: dict[str, Any]
     ) -> None:
@@ -153,45 +197,6 @@ class EventProcessor(threading.Thread):
             del self.events_in_process[event_data["id"]]
             self.event_processed_queue.put((event_data["id"], camera))
 
-    def run(self) -> None:
-        # set an end_time on events without an end_time on startup
-        Event.update(end_time=Event.start_time + 30).where(
-            Event.end_time == None
-        ).execute()
-
-        while not self.stop_event.is_set():
-            try:
-                source_type, event_type, camera, event_data = self.event_queue.get(
-                    timeout=1
-                )
-            except queue.Empty:
-                continue
-
-            logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
-
-            self.timeline_queue.put(
-                (
-                    camera,
-                    source_type,
-                    event_type,
-                    self.events_in_process.get(event_data["id"]),
-                    event_data,
-                )
-            )
-
-            if source_type == TimelineSourceEnum.tracked_object:
-                if event_type == "start":
-                    self.events_in_process[event_data["id"]] = event_data
-                    continue
-
-                self.handle_object_detection(event_type, camera, event_data)
-
-        # set an end_time on events without an end_time before exiting
-        Event.update(end_time=datetime.datetime.now().timestamp()).where(
-            Event.end_time == None
-        ).execute()
-
-        logger.info(f"Exiting event processor...")
-
 
 class EventCleanup(threading.Thread):
     def __init__(self, config: FrigateConfig, stop_event: MpEvent):

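The relocated run() loop above also shows how the enum is consumed: every queue message is a (source_type, event_type, camera, event_data) tuple, and only messages tagged DetectionTypeEnum.tracked_object reach handle_object_detection(). A stripped-down sketch of that dispatch, using a plain queue.Queue instead of Frigate's multiprocessing queues; the drain() helper, camera name, and event id are made up for illustration:

    import queue
    from enum import Enum


    class DetectionTypeEnum(str, Enum):
        tracked_object = "tracked_object"


    def drain(event_queue: queue.Queue) -> None:
        # Mirror of the run() loop's dispatch: anything that is not a
        # tracked-object detection (hypothetical future sources: api, audio)
        # is ignored here.
        while True:
            try:
                source_type, event_type, camera, event_data = event_queue.get(timeout=1)
            except queue.Empty:
                break
            if source_type == DetectionTypeEnum.tracked_object:
                print(f"{event_type} for {event_data['id']} on {camera}")


    q: queue.Queue = queue.Queue()
    q.put((DetectionTypeEnum.tracked_object, "start", "front_door", {"id": "abc123"}))
    drain(q)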
frigate/object_processing.py

@@ -21,7 +21,7 @@ from frigate.config import (
     FrigateConfig,
 )
 from frigate.const import CLIPS_DIR
-from frigate.timeline import TimelineSourceEnum
+from frigate.events import DetectionTypeEnum
 from frigate.util import (
     SharedMemoryFrameManager,
     calculate_region,
@@ -657,7 +657,7 @@ class TrackedObjectProcessor(threading.Thread):
         self.last_motion_detected: dict[str, float] = {}
 
         def start(camera, obj: TrackedObject, current_frame_time):
-            self.event_queue.put((TimelineSourceEnum.tracked_object, "start", camera, obj.to_dict()))
+            self.event_queue.put((DetectionTypeEnum.tracked_object, "start", camera, obj.to_dict()))
 
         def update(camera, obj: TrackedObject, current_frame_time):
             obj.has_snapshot = self.should_save_snapshot(camera, obj)
@@ -671,7 +671,7 @@ class TrackedObjectProcessor(threading.Thread):
             self.dispatcher.publish("events", json.dumps(message), retain=False)
             obj.previous = after
             self.event_queue.put(
-                (TimelineSourceEnum.tracked_object, "update", camera, obj.to_dict(include_thumbnail=True))
+                (DetectionTypeEnum.tracked_object, "update", camera, obj.to_dict(include_thumbnail=True))
             )
 
         def end(camera, obj: TrackedObject, current_frame_time):
@@ -723,7 +723,7 @@ class TrackedObjectProcessor(threading.Thread):
             }
             self.dispatcher.publish("events", json.dumps(message), retain=False)
-            self.event_queue.put((TimelineSourceEnum.tracked_object, "end", camera, obj.to_dict(include_thumbnail=True)))
+            self.event_queue.put((DetectionTypeEnum.tracked_object, "end", camera, obj.to_dict(include_thumbnail=True)))
 
         def snapshot(camera, obj: TrackedObject, current_frame_time):
             mqtt_config: MqttConfig = self.config.cameras[camera].mqtt

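A note on the (str, Enum) base classes visible in both the old and new definitions: because each member is also a plain string, it compares equal to its literal value and serializes through json.dumps without a custom encoder, which is convenient for messages that are published or passed between processes, as in the callbacks above. A small standard-library-only illustration:

    import json
    from enum import Enum


    class DetectionTypeEnum(str, Enum):
        tracked_object = "tracked_object"


    # str subclassing means the member is usable anywhere a plain string is expected.
    assert DetectionTypeEnum.tracked_object == "tracked_object"

    # json.dumps serializes the member as its string value, no custom encoder needed.
    message = {"source": DetectionTypeEnum.tracked_object, "event": "end"}
    assert json.dumps(message) == '{"source": "tracked_object", "event": "end"}'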
frigate/timeline.py

@@ -4,9 +4,8 @@ import logging
 import threading
 import queue
 
-from enum import Enum
-
 from frigate.config import FrigateConfig
+from frigate.events import DetectionTypeEnum
 from frigate.models import Timeline
 from multiprocessing.queues import Queue
@@ -17,12 +16,6 @@ from frigate.util import to_relative_box
 logger = logging.getLogger(__name__)
 
-
-class TimelineSourceEnum(str, Enum):
-    # api = "api"
-    # audio = "audio"
-    tracked_object = "tracked_object"
-
 
 class TimelineProcessor(threading.Thread):
     """Handle timeline queue and update DB."""
@@ -51,7 +44,7 @@
             except queue.Empty:
                 continue
 
-            if input_type == TimelineSourceEnum.tracked_object:
+            if input_type == DetectionTypeEnum.tracked_object:
                 self.handle_object_detection(
                     camera, event_type, prev_event_data, event_data
                 )
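
The final hunk is the consumer side of the rename: TimelineProcessor now compares against DetectionTypeEnum imported from frigate.events, which also reverses the old dependency direction (before this commit, events.py imported TimelineSourceEnum from frigate.timeline). One practical consequence, sketched below with a hypothetical audio member (only hinted at by the commented-out lines in the diff) and an illustrative wants_timeline_entry() stand-in for the guard in TimelineProcessor.run(): new detection types can be added without breaking this check, since anything that is not tracked_object is simply skipped.

    from enum import Enum


    class DetectionTypeEnum(str, Enum):
        # The commented-out members in the diff suggest these future sources;
        # "audio" is uncommented here purely for illustration.
        audio = "audio"
        tracked_object = "tracked_object"


    def wants_timeline_entry(input_type: DetectionTypeEnum) -> bool:
        # Same shape as the guard in TimelineProcessor: only tracked objects are
        # written to the timeline; other source types are ignored until handled.
        return input_type == DetectionTypeEnum.tracked_object


    assert wants_timeline_entry(DetectionTypeEnum.tracked_object) is True
    assert wants_timeline_entry(DetectionTypeEnum.audio) is False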