frigate/frigate/timeline.py
Josh Hawkins c61bb8f8ae
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Fix audio transcription (#20395)
* camera level config

* set up model runner on thread start to avoid unpickling error

* ensure feature is enabled globally

* suppress info logs from faster_whisper

* fix incorrect event_type for api and audio timeline entries

* docs

* fix

* clean up
2025-10-08 18:06:03 -05:00

189 lines
6.6 KiB
Python

"""Record events for object, audio, etc. detections."""
import logging
import queue
import threading
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MpEvent
from typing import Any
from frigate.config import FrigateConfig
from frigate.events.maintainer import EventStateEnum, EventTypeEnum
from frigate.models import Timeline
from frigate.util.builtin import to_relative_box
logger = logging.getLogger(__name__)
class TimelineProcessor(threading.Thread):
    """Consume timeline updates from a queue and write them to the database.

    Every queue item is a 5-tuple ``(camera, input_type, event_type,
    prev_event_data, event_data)``. Tracked-object items become timeline
    entries when the object appears, enters a zone, toggles between
    stationary and active, gains its first attribute, or disappears. API
    items become a single entry when they start.
    """

    def __init__(
        self,
        config: FrigateConfig,
        queue: Queue,
        stop_event: MpEvent,
    ) -> None:
        """Store the collaborators and start with an empty pre-event cache.

        Args:
            config: Configuration; ``config.cameras[camera].detect`` is read
                to obtain the width and height used for relative boxes.
            queue: Queue the timeline 5-tuples are read from.
            stop_event: Event that ends the ``run`` loop once it is set.
        """
        super().__init__(name="timeline_processor")
        self.config = config
        self.queue = queue
        self.stop_event = stop_event
        # Entries for events that have neither a clip nor a snapshot yet,
        # keyed by the event id, waiting to be flushed by insert_or_save.
        self.pre_event_cache: dict[str, list[dict[str, Any]]] = {}

    def run(self) -> None:
        """Dispatch queue items to their handlers until the stop event is set."""
        while not self.stop_event.is_set():
            try:
                (
                    camera,
                    input_type,
                    event_type,
                    prev_event_data,
                    event_data,
                ) = self.queue.get(timeout=1)
            except queue.Empty:
                # Timed out: loop around so the stop event is checked again.
                continue

            if input_type == EventTypeEnum.tracked_object:
                # None prev_event_data is only allowed for the start of an event
                if event_type != EventStateEnum.start and prev_event_data is None:
                    continue

                self.handle_object_detection(
                    camera, event_type, prev_event_data, event_data
                )
            elif input_type == EventTypeEnum.api:
                self.handle_api_entry(camera, event_type, event_data)

    @staticmethod
    def _event_persisted(event_data: dict[Any, Any]) -> bool:
        """Return True when the event carries a clip or a snapshot."""
        return bool(event_data["has_clip"] or event_data["has_snapshot"])

    @staticmethod
    def _update_transition(
        prev_event_data: dict[Any, Any],
        event_data: dict[Any, Any],
    ) -> tuple[Any, dict[str, Any]]:
        """Classify what changed between two states of a tracked object.

        Checks are made in priority order and the first match wins: a zone
        was entered while not stationary, the stationary flag flipped, the
        first attribute appeared.

        Returns:
            ``(class_type, extra_data)`` where ``extra_data`` holds the keys
            to merge into the entry's data. ``class_type`` is ``None`` when
            none of the checks matched.
        """
        if (
            len(prev_event_data["current_zones"]) < len(event_data["current_zones"])
            and not event_data["stationary"]
        ):
            return "entered_zone", {"zones": event_data["current_zones"]}

        if prev_event_data["stationary"] != event_data["stationary"]:
            return ("stationary" if event_data["stationary"] else "active"), {}

        if prev_event_data["attributes"] == {} and event_data["attributes"] != {}:
            # Only the first attribute key is recorded.
            return "attribute", {"attribute": next(iter(event_data["attributes"]))}

        return None, {}

    def insert_or_save(
        self,
        entry: dict[str, Any],
        prev_event_data: dict[Any, Any],
        event_data: dict[Any, Any],
    ) -> None:
        """Insert the entry into the database, or cache it for later.

        While the event has neither a clip nor a snapshot the entry is only
        cached. Once either flag is set, the entries cached for that event
        are inserted first, followed by ``entry`` itself.

        Args:
            entry: Timeline row keyed by ``Timeline`` fields.
            prev_event_data: Unused; kept so the signature stays unchanged.
            event_data: Current event state; ``has_clip`` and
                ``has_snapshot`` are read from it.
        """
        source_id = entry[Timeline.source_id]

        if not self._event_persisted(event_data):
            # the related event has not been saved yet, should be added to cache
            self.pre_event_cache.setdefault(source_id, []).append(entry)
            return

        # the event is saved, insert cached entries first, then this one
        for cached_entry in self.pre_event_cache.pop(source_id, []):
            Timeline.insert(cached_entry).execute()

        Timeline.insert(entry).execute()

    def handle_object_detection(
        self,
        camera: str,
        event_type: str,
        prev_event_data: dict[Any, Any],
        event_data: dict[Any, Any],
    ) -> bool:
        """Record a timeline entry for a tracked-object state change.

        Args:
            camera: Key into ``self.config.cameras``.
            event_type: Compared against ``EventStateEnum`` start, update
                and end.
            prev_event_data: Previous state of the object. May be ``None``
                for a start event; an update event requires it.
            event_data: Current state of the object.

        Returns:
            True when an entry was inserted or cached, False otherwise.
        """
        event_id = event_data["id"]
        sub_label = event_data.get("sub_label")

        # update sub labels for existing entries that haven't been added yet
        if (
            prev_event_data is not None
            and prev_event_data.get("sub_label") != sub_label
            and event_id in self.pre_event_cache
        ):
            for cached_entry in self.pre_event_cache[event_id]:
                cached_entry[Timeline.data]["sub_label"] = sub_label

        class_type = None
        extra_data: dict[str, Any] = {}

        if event_type == EventStateEnum.start:
            class_type = "visible"
        elif event_type == EventStateEnum.update:
            class_type, extra_data = self._update_transition(
                prev_event_data, event_data
            )
        elif event_type == EventStateEnum.end:
            if not self._event_persisted(event_data):
                # The event ended without a clip or snapshot, so nothing
                # will ever flush its cached entries; drop them instead of
                # letting the cache grow without bound.
                self.pre_event_cache.pop(event_id, None)
                return False

            class_type = "gone"

        if class_type is None:
            return False

        detect = self.config.cameras[camera].detect
        timeline_entry = {
            Timeline.timestamp: event_data["frame_time"],
            Timeline.camera: camera,
            Timeline.source: "tracked_object",
            Timeline.source_id: event_id,
            Timeline.class_type: class_type,
            Timeline.data: {
                "box": to_relative_box(
                    detect.width,
                    detect.height,
                    event_data["box"],
                ),
                "label": event_data["label"],
                "sub_label": sub_label,
                "region": to_relative_box(
                    detect.width,
                    detect.height,
                    event_data["region"],
                ),
                "attribute": "",
                **extra_data,
            },
        }

        self.insert_or_save(timeline_entry, prev_event_data, event_data)
        return True

    def handle_api_entry(
        self,
        camera: str,
        event_type: str,
        event_data: dict[Any, Any],
    ) -> bool:
        """Insert a timeline entry for the start of an API-sourced event.

        Events whose ``type`` is ``"audio"`` are stored with class type
        ``"heard"`` and source ``"audio"``; everything else is stored with
        class type ``"external"`` and source ``"api"``.

        Returns:
            True when an entry was inserted, False when ``event_type`` is
            not ``"start"``.
        """
        if event_type != "start":
            return False

        is_audio = event_data.get("type", "api") == "audio"
        timeline_entry = {
            Timeline.class_type: "heard" if is_audio else "external",
            Timeline.timestamp: event_data["start_time"],
            Timeline.camera: camera,
            Timeline.source: "audio" if is_audio else "api",
            Timeline.source_id: event_data["id"],
            Timeline.data: {
                "label": event_data["label"],
                "sub_label": event_data.get("sub_label"),
            },
        }
        Timeline.insert(timeline_entry).execute()
        return True