frigate/frigate/camera/activity_manager.py
Josh Hawkins 95956a690b
Some checks are pending
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
Debug replay (#22212)
* debug replay implementation

* fix masks after dev rebase

* fix squash merge issues

* fix

* fix

* fix

* no need to write debug replay camera to config

* camera and filter button and dropdown

* add filters

* add ability to edit motion and object config for debug replay

* add debug draw overlay to debug replay

* add guard to prevent crash when camera is no longer in camera_states

* fix overflow due to radix absolutely positioned elements

* increase number of messages

* ensure deep_merge replaces existing list values when override is true

* add back button

* add debug replay to explore and review menus

* clean up

* clean up

* update instructions to prevent exposing exception info

* fix typing

* refactor output logic

* refactor with helper function

* move init to function for consistency
2026-03-04 10:07:34 -06:00

274 lines
10 KiB
Python

"""Manage camera activity and updating listeners."""
import datetime
import json
import logging
import random
import string
from collections import Counter
from typing import Any, Callable
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.config import CameraConfig, FrigateConfig
logger = logging.getLogger(__name__)
class CameraActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.last_camera_activity: dict[str, dict[str, Any]] = {}
self.camera_all_object_counts: dict[str, Counter] = {}
self.camera_active_object_counts: dict[str, Counter] = {}
self.zone_all_object_counts: dict[str, Counter] = {}
self.zone_active_object_counts: dict[str, Counter] = {}
self.all_zone_labels: dict[str, set[str]] = {}
for camera_config in config.cameras.values():
if not camera_config.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
if camera not in self.config.cameras:
continue
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)
if self.last_camera_activity.get(camera, {}).get("objects") != new_objects:
self.compare_camera_activity(camera, new_objects)
# run through every zone, getting a count of objects in that zone right now
for zone, labels in self.all_zone_labels.items():
all_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"]
)
active_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"] and not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated for this zone
for label in labels:
new_count = all_zone_objects[label]
new_active_count = active_zone_objects[label]
if (
new_count != self.zone_all_object_counts[zone][label]
or label not in self.zone_all_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}", new_count)
self.zone_all_object_counts[zone][label] = new_count
if (
new_active_count != self.zone_active_object_counts[zone][label]
or label not in self.zone_active_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}/active", new_active_count)
self.zone_active_object_counts[zone][label] = new_active_count
if any_changed:
self.publish(f"{zone}/all", sum(list(all_zone_objects.values())))
self.publish(
f"{zone}/all/active", sum(list(active_zone_objects.values()))
)
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: dict[str, Any]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity
)
active_objects = Counter(
obj["label"].replace("-verified", "")
for obj in new_activity
if not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return
for label in camera_config.objects.track:
if label in self.config.model.non_logo_attributes:
continue
new_count = all_objects[label]
new_active_count = active_objects[label]
if (
new_count != self.camera_all_object_counts[camera][label]
or label not in self.camera_all_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}", new_count)
self.camera_all_object_counts[camera][label] = new_count
if (
new_active_count != self.camera_active_object_counts[camera][label]
or label not in self.camera_active_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}/active", new_active_count)
self.camera_active_object_counts[camera][label] = new_active_count
if any_changed:
self.publish(f"{camera}/all", sum(list(all_objects.values())))
self.publish(f"{camera}/all/active", sum(list(active_objects.values())))
class AudioActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.current_audio_detections: dict[str, dict[str, dict[str, Any]]] = {}
self.event_metadata_publisher = EventMetadataPublisher()
for camera_config in config.cameras.values():
if not camera_config.audio.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
self.current_audio_detections[camera_config.name] = {}
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
now = datetime.datetime.now().timestamp()
for camera in new_activity.keys():
if camera not in self.config.cameras:
continue
# handle cameras that were added dynamically
if camera not in self.current_audio_detections:
self.__init_camera(self.config.cameras[camera])
new_detections = new_activity[camera].get("detections", [])
if self.compare_audio_activity(camera, new_detections, now):
logger.debug(f"Audio detections for {camera}: {new_activity}")
self.publish(
f"{camera}/audio/all",
"ON" if len(self.current_audio_detections[camera]) > 0 else "OFF",
)
self.publish(
"audio_detections",
json.dumps(self.current_audio_detections),
)
def compare_audio_activity(
self, camera: str, new_detections: list[tuple[str, float]], now: float
) -> None:
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return False
max_not_heard = camera_config.audio.max_not_heard
current = self.current_audio_detections[camera]
any_changed = False
for label, score in new_detections:
any_changed = True
if label in current:
current[label]["last_detection"] = now
current[label]["score"] = score
else:
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.publish(f"{camera}/audio/{label}", "ON")
self.event_metadata_publisher.publish(
(
now,
camera,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
),
EventMetadataTypeEnum.manual_event_create.value,
)
current[label] = {
"id": event_id,
"score": score,
"last_detection": now,
}
# expire detections
for label in list(current.keys()):
if now - current[label]["last_detection"] > max_not_heard:
any_changed = True
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]
return any_changed
def expire_all(self, camera: str) -> None:
now = datetime.datetime.now().timestamp()
current = self.current_audio_detections.get(camera, {})
for label in list(current.keys()):
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]