Refactor object processing and camera state

This commit is contained in:
Nicolas Mowen 2025-03-11 16:10:15 -06:00
parent ba57e67043
commit e51ba8c9ee
9 changed files with 473 additions and 410 deletions

View File

@ -3,6 +3,8 @@
import datetime
import logging
import os
import random
import string
from functools import reduce
from pathlib import Path
from urllib.parse import unquote
@ -43,9 +45,8 @@ from frigate.api.defs.tags import Tags
from frigate.comms.event_metadata_updater import EventMetadataTypeEnum
from frigate.const import CLIPS_DIR
from frigate.embeddings import EmbeddingsContext
from frigate.events.external import ExternalEventProcessor
from frigate.models import Event, ReviewSegment, Timeline
from frigate.object_processing import TrackedObject, TrackedObjectProcessor
from frigate.track.object_processing import TrackedObject, TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
logger = logging.getLogger(__name__)
@ -1202,6 +1203,22 @@ def create_event(
status_code=404,
)
now = datetime.datetime.now().timestamp()
rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
event_id = f"{now}-{rand_id}"
(
camera_name,
label,
event_id,
body.include_recording,
body.score,
body.sub_label,
body.duration,
body.source_type,
body.draw,
)
try:
frame_processor: TrackedObjectProcessor = request.app.detected_frames_processor
external_processor: ExternalEventProcessor = request.app.external_processor

View File

@ -37,7 +37,7 @@ from frigate.const import (
RECORD_DIR,
)
from frigate.models import Event, Previews, Recordings, Regions, ReviewSegment
from frigate.object_processing import TrackedObjectProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.util.builtin import get_tz_modifiers
from frigate.util.image import get_image_from_recording
from frigate.util.path import get_event_thumbnail_bytes

View File

@ -57,7 +57,7 @@ from frigate.models import (
User,
)
from frigate.object_detection import ObjectDetectProcess
from frigate.object_processing import TrackedObjectProcessor
from frigate.track.object_processing import TrackedObjectProcessor
from frigate.output.output import output_frames
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.ptz.onvif import OnvifController

429
frigate/camera/state.py Normal file
View File

@ -0,0 +1,429 @@
"""Maintains state of camera."""
import datetime
import logging
import threading
from collections import defaultdict
from typing import Callable
import cv2
import numpy as np
from frigate.comms.event_metadata_updater import (
EventMetadataTypeEnum,
)
from frigate.config import (
FrigateConfig,
ZoomingModeEnum,
)
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
logger = logging.getLogger(__name__)
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def process_manual_event(self, topic: str, payload: tuple) -> None:
if topic.endswith(EventMetadataTypeEnum.manual_event_create.value):
(
camera_name,
label,
event_id,
include_recording,
score,
sub_label,
duration,
source_type,
draw,
) = payload
else:
pass
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()

View File

@ -10,6 +10,8 @@ logger = logging.getLogger(__name__)
class EventMetadataTypeEnum(str, Enum):
all = ""
manual_event_create = "manual_event_create"
manual_event_end = "manual_event_end"
regenerate_description = "regenerate_description"
sub_label = "sub_label"

View File

@ -22,7 +22,7 @@ from frigate.ffmpeg_presets import (
parse_preset_hardware_acceleration_encode,
)
from frigate.models import Previews
from frigate.object_processing import TrackedObject
from frigate.track.object_processing import TrackedObject
from frigate.util.image import copy_yuv_to_position, get_blank_yuv_frame, get_yuv_crop
logger = logging.getLogger(__name__)

View File

@ -25,7 +25,7 @@ from frigate.const import (
)
from frigate.events.external import ManualEventState
from frigate.models import ReviewSegment
from frigate.object_processing import TrackedObject
from frigate.track.object_processing import TrackedObject
from frigate.review.types import SeverityEnum
from frigate.util.image import SharedMemoryFrameManager, calculate_16_9_crop

View File

@ -5,12 +5,11 @@ import queue
import threading
from collections import defaultdict
from multiprocessing.synchronize import Event as MpEvent
from typing import Callable, Optional
import cv2
import numpy as np
from peewee import DoesNotExist
from frigate.camera.state import CameraState
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.detections_updater import DetectionPublisher, DetectionTypeEnum
from frigate.comms.dispatcher import Dispatcher
@ -25,408 +24,16 @@ from frigate.config import (
FrigateConfig,
RecordConfig,
SnapshotsConfig,
ZoomingModeEnum,
)
from frigate.const import UPDATE_CAMERA_ACTIVITY
from frigate.events.types import EventStateEnum, EventTypeEnum
from frigate.models import Event, Timeline
from frigate.ptz.autotrack import PtzAutoTrackerThread
from frigate.track.tracked_object import TrackedObject
from frigate.util.image import (
SharedMemoryFrameManager,
draw_box_with_label,
draw_timestamp,
is_better_thumbnail,
is_label_printable,
)
from frigate.util.image import SharedMemoryFrameManager
logger = logging.getLogger(__name__)
# Maintains the state of a camera
class CameraState:
def __init__(
self,
name,
config: FrigateConfig,
frame_manager: SharedMemoryFrameManager,
ptz_autotracker_thread: PtzAutoTrackerThread,
):
self.name = name
self.config = config
self.camera_config = config.cameras[name]
self.frame_manager = frame_manager
self.best_objects: dict[str, TrackedObject] = {}
self.tracked_objects: dict[str, TrackedObject] = {}
self.frame_cache = {}
self.zone_objects = defaultdict(list)
self._current_frame = np.zeros(self.camera_config.frame_shape_yuv, np.uint8)
self.current_frame_lock = threading.Lock()
self.current_frame_time = 0.0
self.motion_boxes = []
self.regions = []
self.previous_frame_id = None
self.callbacks = defaultdict(list)
self.ptz_autotracker_thread = ptz_autotracker_thread
self.prev_enabled = self.camera_config.enabled
def get_current_frame(self, draw_options={}):
with self.current_frame_lock:
frame_copy = np.copy(self._current_frame)
frame_time = self.current_frame_time
tracked_objects = {k: v.to_dict() for k, v in self.tracked_objects.items()}
motion_boxes = self.motion_boxes.copy()
regions = self.regions.copy()
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_YUV2BGR_I420)
# draw on the frame
if draw_options.get("mask"):
mask_overlay = np.where(self.camera_config.motion.mask == [0])
frame_copy[mask_overlay] = [0, 0, 0]
if draw_options.get("bounding_boxes"):
# draw the bounding boxes on the frame
for obj in tracked_objects.values():
if obj["frame_time"] == frame_time:
if obj["stationary"]:
color = (220, 220, 220)
thickness = 1
else:
thickness = 2
color = self.config.model.colormap[obj["label"]]
else:
thickness = 1
color = (255, 0, 0)
# draw thicker box around ptz autotracked object
if (
self.camera_config.onvif.autotracking.enabled
and self.ptz_autotracker_thread.ptz_autotracker.autotracker_init[
self.name
]
and self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
]
is not None
and obj["id"]
== self.ptz_autotracker_thread.ptz_autotracker.tracked_object[
self.name
].obj_data["id"]
and obj["frame_time"] == frame_time
):
thickness = 5
color = self.config.model.colormap[obj["label"]]
# debug autotracking zooming - show the zoom factor box
if (
self.camera_config.onvif.autotracking.zooming
!= ZoomingModeEnum.disabled
):
max_target_box = self.ptz_autotracker_thread.ptz_autotracker.tracked_object_metrics[
self.name
]["max_target_box"]
side_length = max_target_box * (
max(
self.camera_config.detect.width,
self.camera_config.detect.height,
)
)
centroid_x = (obj["box"][0] + obj["box"][2]) // 2
centroid_y = (obj["box"][1] + obj["box"][3]) // 2
top_left = (
int(centroid_x - side_length // 2),
int(centroid_y - side_length // 2),
)
bottom_right = (
int(centroid_x + side_length // 2),
int(centroid_y + side_length // 2),
)
cv2.rectangle(
frame_copy,
top_left,
bottom_right,
(255, 255, 0),
2,
)
# draw the bounding boxes on the frame
box = obj["box"]
text = (
obj["label"]
if (
not obj.get("sub_label")
or not is_label_printable(obj["sub_label"][0])
)
else obj["sub_label"][0]
)
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
text,
f"{obj['score']:.0%} {int(obj['area'])}"
+ (
f" {float(obj['current_estimated_speed']):.1f}"
if obj["current_estimated_speed"] != 0
else ""
),
thickness=thickness,
color=color,
)
# draw any attributes
for attribute in obj["current_attributes"]:
box = attribute["box"]
draw_box_with_label(
frame_copy,
box[0],
box[1],
box[2],
box[3],
attribute["label"],
f"{attribute['score']:.0%}",
thickness=thickness,
color=color,
)
if draw_options.get("regions"):
for region in regions:
cv2.rectangle(
frame_copy,
(region[0], region[1]),
(region[2], region[3]),
(0, 255, 0),
2,
)
if draw_options.get("zones"):
for name, zone in self.camera_config.zones.items():
thickness = (
8
if any(
name in obj["current_zones"] for obj in tracked_objects.values()
)
else 2
)
cv2.drawContours(frame_copy, [zone.contour], -1, zone.color, thickness)
if draw_options.get("motion_boxes"):
for m_box in motion_boxes:
cv2.rectangle(
frame_copy,
(m_box[0], m_box[1]),
(m_box[2], m_box[3]),
(0, 0, 255),
2,
)
if draw_options.get("timestamp"):
color = self.camera_config.timestamp_style.color
draw_timestamp(
frame_copy,
frame_time,
self.camera_config.timestamp_style.format,
font_effect=self.camera_config.timestamp_style.effect,
font_thickness=self.camera_config.timestamp_style.thickness,
font_color=(color.blue, color.green, color.red),
position=self.camera_config.timestamp_style.position,
)
return frame_copy
def finished(self, obj_id):
del self.tracked_objects[obj_id]
def on(self, event_type: str, callback: Callable[[dict], None]):
self.callbacks[event_type].append(callback)
def update(
self,
frame_name: str,
frame_time: float,
current_detections: dict[str, dict[str, any]],
motion_boxes: list[tuple[int, int, int, int]],
regions: list[tuple[int, int, int, int]],
):
current_frame = self.frame_manager.get(
frame_name, self.camera_config.frame_shape_yuv
)
tracked_objects = self.tracked_objects.copy()
current_ids = set(current_detections.keys())
previous_ids = set(tracked_objects.keys())
removed_ids = previous_ids.difference(current_ids)
new_ids = current_ids.difference(previous_ids)
updated_ids = current_ids.intersection(previous_ids)
for id in new_ids:
new_obj = tracked_objects[id] = TrackedObject(
self.config.model,
self.camera_config,
self.config.ui,
self.frame_cache,
current_detections[id],
)
# call event handlers
for c in self.callbacks["start"]:
c(self.name, new_obj, frame_name)
for id in updated_ids:
updated_obj = tracked_objects[id]
thumb_update, significant_update, autotracker_update = updated_obj.update(
frame_time, current_detections[id], current_frame is not None
)
if autotracker_update or significant_update:
for c in self.callbacks["autotrack"]:
c(self.name, updated_obj, frame_name)
if thumb_update and current_frame is not None:
# ensure this frame is stored in the cache
if (
updated_obj.thumbnail_data["frame_time"] == frame_time
and frame_time not in self.frame_cache
):
self.frame_cache[frame_time] = np.copy(current_frame)
updated_obj.last_updated = frame_time
# if it has been more than 5 seconds since the last thumb update
# and the last update is greater than the last publish or
# the object has changed significantly
if (
frame_time - updated_obj.last_published > 5
and updated_obj.last_updated > updated_obj.last_published
) or significant_update:
# call event handlers
for c in self.callbacks["update"]:
c(self.name, updated_obj, frame_name)
updated_obj.last_published = frame_time
for id in removed_ids:
# publish events to mqtt
removed_obj = tracked_objects[id]
if "end_time" not in removed_obj.obj_data:
removed_obj.obj_data["end_time"] = frame_time
for c in self.callbacks["end"]:
c(self.name, removed_obj, frame_name)
# TODO: can i switch to looking this up and only changing when an event ends?
# maintain best objects
camera_activity: dict[str, list[any]] = {
"enabled": True,
"motion": len(motion_boxes) > 0,
"objects": [],
}
for obj in tracked_objects.values():
object_type = obj.obj_data["label"]
active = obj.is_active()
if not obj.false_positive:
label = object_type
sub_label = None
if obj.obj_data.get("sub_label"):
if (
obj.obj_data.get("sub_label")[0]
in self.config.model.all_attributes
):
label = obj.obj_data["sub_label"][0]
else:
label = f"{object_type}-verified"
sub_label = obj.obj_data["sub_label"][0]
camera_activity["objects"].append(
{
"id": obj.obj_data["id"],
"label": label,
"stationary": not active,
"area": obj.obj_data["area"],
"ratio": obj.obj_data["ratio"],
"score": obj.obj_data["score"],
"sub_label": sub_label,
"current_zones": obj.current_zones,
}
)
# if we don't have access to the current frame or
# if the object's thumbnail is not from the current frame, skip
if (
current_frame is None
or obj.thumbnail_data is None
or obj.false_positive
or obj.thumbnail_data["frame_time"] != frame_time
):
continue
if object_type in self.best_objects:
current_best = self.best_objects[object_type]
now = datetime.datetime.now().timestamp()
# if the object is a higher score than the current best score
# or the current object is older than desired, use the new object
if (
is_better_thumbnail(
object_type,
current_best.thumbnail_data,
obj.thumbnail_data,
self.camera_config.frame_shape,
)
or (now - current_best.thumbnail_data["frame_time"])
> self.camera_config.best_image_timeout
):
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
else:
self.best_objects[object_type] = obj
for c in self.callbacks["snapshot"]:
c(self.name, self.best_objects[object_type], frame_name)
for c in self.callbacks["camera_activity"]:
c(self.name, camera_activity)
# cleanup thumbnail frame cache
current_thumb_frames = {
obj.thumbnail_data["frame_time"]
for obj in tracked_objects.values()
if not obj.false_positive and obj.thumbnail_data is not None
}
current_best_frames = {
obj.thumbnail_data["frame_time"] for obj in self.best_objects.values()
}
thumb_frames_to_delete = [
t
for t in self.frame_cache.keys()
if t not in current_thumb_frames and t not in current_best_frames
]
for t in thumb_frames_to_delete:
del self.frame_cache[t]
with self.current_frame_lock:
self.tracked_objects = tracked_objects
self.motion_boxes = motion_boxes
self.regions = regions
if current_frame is not None:
self.current_frame_time = frame_time
self._current_frame = np.copy(current_frame)
if self.previous_frame_id is not None:
self.frame_manager.close(self.previous_frame_id)
self.previous_frame_id = frame_name
def shutdown(self) -> None:
for obj in self.tracked_objects.values():
if not obj.obj_data.get("end_time"):
obj.write_thumbnail_to_disk()
class TrackedObjectProcessor(threading.Thread):
def __init__(
self,
@ -452,9 +59,7 @@ class TrackedObjectProcessor(threading.Thread):
self.detection_publisher = DetectionPublisher(DetectionTypeEnum.video)
self.event_sender = EventUpdatePublisher()
self.event_end_subscriber = EventEndSubscriber()
self.sub_label_subscriber = EventMetadataSubscriber(
EventMetadataTypeEnum.sub_label
)
self.sub_label_subscriber = EventMetadataSubscriber(EventMetadataTypeEnum.all)
self.camera_activity: dict[str, dict[str, any]] = {}
@ -677,7 +282,7 @@ class TrackedObjectProcessor(threading.Thread):
def get_current_frame(
self, camera: str, draw_options: dict[str, any] = {}
) -> Optional[np.ndarray]:
) -> np.ndarray | None:
if camera == "birdseye":
return self.frame_manager.get(
"birdseye",
@ -792,13 +397,23 @@ class TrackedObjectProcessor(threading.Thread):
# check for sub label updates
while True:
(topic, payload) = self.sub_label_subscriber.check_for_update(timeout=0)
(raw_topic, payload) = self.sub_label_subscriber.check_for_update(
timeout=0
)
if not topic:
if not raw_topic:
break
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
topic = str(raw_topic)
if topic.endswith(EventMetadataTypeEnum.sub_label.value):
(event_id, sub_label, score) = payload
self.set_sub_label(event_id, sub_label, score)
elif topic.endswith(
EventMetadataTypeEnum.manual_event_create.value
) or topic.endswith(EventMetadataTypeEnum.manual_event_end.value):
camera_name = payload[0]
self.camera_states[camera_name].process_manual_event(topic, payload)
try:
(

View File

@ -15,7 +15,7 @@ sys.path.append("/workspace/frigate")
from frigate.config import FrigateConfig # noqa: E402
from frigate.motion import MotionDetector # noqa: E402
from frigate.object_detection import LocalObjectDetector # noqa: E402
from frigate.object_processing import CameraState # noqa: E402
from frigate.track.object_processing import CameraState # noqa: E402
from frigate.track.centroid_tracker import CentroidTracker # noqa: E402
from frigate.util import ( # noqa: E402
EventsPerSecond,