frigate/frigate/camera/activity_manager.py
Josh Hawkins 9b02c7318d
Some checks are pending
CI / AMD64 Extra Build (push) Blocked by required conditions
CI / ARM Extra Build (push) Blocked by required conditions
CI / Synaptics Build (push) Blocked by required conditions
CI / Assemble and push default build (push) Blocked by required conditions
CI / AMD64 Build (push) Waiting to run
CI / ARM Build (push) Waiting to run
CI / Jetson Jetpack 6 (push) Waiting to run
Miscellaneous fixes (#23610)
* Handle back seeking going to previous clip

* scope /recordings/unavailable query to the caller's allowed cameras

* listen for config updates in activity manager

* don't set search after awaited request

Intentionally do NOT setSearch() to mark the open event submitted. This runs after the awaited request, by which point the user may have closed the dialog; re-setting the parent's selected event would resurrect it and the force-open effect would reopen it (see #23599). The local "submitted" state covers the open card, and mutate() updates the events cache so the grid and any future open reflect the result.

* fix ruff

#23201 removed pathlib import but for some reason it's just now causing ruff to fail

---------

Co-authored-by: Nicolas Mowen <nickmowen213@gmail.com>
2026-07-01 15:03:34 -05:00

326 lines
12 KiB
Python

"""Manage camera activity and updating listeners."""
import datetime
import json
import logging
import random
import string
from collections import Counter
from typing import Any, Callable
from frigate.comms.event_metadata_updater import (
EventMetadataPublisher,
EventMetadataTypeEnum,
)
from frigate.config import CameraConfig, FrigateConfig
from frigate.config.camera.updater import (
CameraConfigUpdateEnum,
CameraConfigUpdateSubscriber,
)
logger = logging.getLogger(__name__)
class CameraActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.last_camera_activity: dict[str, dict[str, Any]] = {}
self.camera_all_object_counts: dict[str, Counter] = {}
self.camera_active_object_counts: dict[str, Counter] = {}
self.zone_all_object_counts: dict[str, Counter] = {}
self.zone_active_object_counts: dict[str, Counter] = {}
self.all_zone_labels: dict[str, set[str]] = {}
self.config_subscriber = CameraConfigUpdateSubscriber(
config,
config.cameras,
[CameraConfigUpdateEnum.zones, CameraConfigUpdateEnum.objects],
)
for camera_config in config.cameras.values():
if not camera_config.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
if camera_config.name is None:
return
self.last_camera_activity[camera_config.name] = {}
self.camera_all_object_counts[camera_config.name] = Counter()
self.camera_active_object_counts[camera_config.name] = Counter()
for zone, zone_config in camera_config.zones.items():
if zone not in self.all_zone_labels:
self.zone_all_object_counts[zone] = Counter()
self.zone_active_object_counts[zone] = Counter()
self.all_zone_labels[zone] = set()
self.all_zone_labels[zone].update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
def __rebuild_zone_labels(self) -> None:
"""Rebuild zone label tracking after a runtime zones/objects change."""
new_zone_labels: dict[str, set[str]] = {}
for camera_config in self.config.cameras.values():
if not camera_config.enabled_in_config or camera_config.name is None:
continue
for zone, zone_config in camera_config.zones.items():
new_zone_labels.setdefault(zone, set()).update(
zone_config.objects
if zone_config.objects
else camera_config.objects.track
)
# drop counters for zones that no longer exist
for zone in list(self.zone_all_object_counts.keys()):
if zone not in new_zone_labels:
self.zone_all_object_counts.pop(zone, None)
self.zone_active_object_counts.pop(zone, None)
# ensure counters exist for new zones so the first count is published
for zone in new_zone_labels:
self.zone_all_object_counts.setdefault(zone, Counter())
self.zone_active_object_counts.setdefault(zone, Counter())
self.all_zone_labels = new_zone_labels
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
updated_topics = self.config_subscriber.check_for_updates()
if "zones" in updated_topics or "objects" in updated_topics:
self.__rebuild_zone_labels()
all_objects: list[dict[str, Any]] = []
for camera in new_activity.keys():
if camera not in self.config.cameras:
continue
# handle cameras that were added dynamically
if camera not in self.camera_all_object_counts:
self.__init_camera(self.config.cameras[camera])
new_objects = new_activity[camera].get("objects", [])
all_objects.extend(new_objects)
if self.last_camera_activity.get(camera, {}).get("objects") != new_objects:
self.compare_camera_activity(camera, new_objects)
# run through every zone, getting a count of objects in that zone right now
for zone, labels in self.all_zone_labels.items():
all_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"]
)
active_zone_objects = Counter(
obj["label"].replace("-verified", "")
for obj in all_objects
if zone in obj["current_zones"] and not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated for this zone
for label in labels:
new_count = all_zone_objects[label]
new_active_count = active_zone_objects[label]
if (
new_count != self.zone_all_object_counts[zone][label]
or label not in self.zone_all_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}", new_count)
self.zone_all_object_counts[zone][label] = new_count
if (
new_active_count != self.zone_active_object_counts[zone][label]
or label not in self.zone_active_object_counts[zone]
):
any_changed = True
self.publish(f"{zone}/{label}/active", new_active_count)
self.zone_active_object_counts[zone][label] = new_active_count
if any_changed:
self.publish(f"{zone}/all", sum(list(all_zone_objects.values())))
self.publish(
f"{zone}/all/active", sum(list(active_zone_objects.values()))
)
self.last_camera_activity = new_activity
def compare_camera_activity(
self, camera: str, new_activity: list[dict[str, Any]]
) -> None:
all_objects = Counter(
obj["label"].replace("-verified", "") for obj in new_activity
)
active_objects = Counter(
obj["label"].replace("-verified", "")
for obj in new_activity
if not obj["stationary"]
)
any_changed = False
# run through each object and check what topics need to be updated
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return
for label in camera_config.objects.track:
if label in self.config.model.non_logo_attributes:
continue
new_count = all_objects[label]
new_active_count = active_objects[label]
if (
new_count != self.camera_all_object_counts[camera][label]
or label not in self.camera_all_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}", new_count)
self.camera_all_object_counts[camera][label] = new_count
if (
new_active_count != self.camera_active_object_counts[camera][label]
or label not in self.camera_active_object_counts[camera]
):
any_changed = True
self.publish(f"{camera}/{label}/active", new_active_count)
self.camera_active_object_counts[camera][label] = new_active_count
if any_changed:
self.publish(f"{camera}/all", sum(list(all_objects.values())))
self.publish(f"{camera}/all/active", sum(list(active_objects.values())))
def stop(self) -> None:
self.config_subscriber.stop()
class AudioActivityManager:
def __init__(
self, config: FrigateConfig, publish: Callable[[str, Any], None]
) -> None:
self.config = config
self.publish = publish
self.current_audio_detections: dict[str, dict[str, dict[str, Any]]] = {}
self.event_metadata_publisher = EventMetadataPublisher()
for camera_config in config.cameras.values():
if not camera_config.audio.enabled_in_config:
continue
self.__init_camera(camera_config)
def __init_camera(self, camera_config: CameraConfig) -> None:
if camera_config.name is None:
return
self.current_audio_detections[camera_config.name] = {}
def update_activity(self, new_activity: dict[str, dict[str, Any]]) -> None:
now = datetime.datetime.now().timestamp()
for camera in new_activity.keys():
if camera not in self.config.cameras:
continue
# handle cameras that were added dynamically
if camera not in self.current_audio_detections:
self.__init_camera(self.config.cameras[camera])
new_detections = new_activity[camera].get("detections", [])
if self.compare_audio_activity(camera, new_detections, now):
logger.debug(f"Audio detections for {camera}: {new_activity}")
self.publish(
f"{camera}/audio/all",
"ON" if len(self.current_audio_detections[camera]) > 0 else "OFF",
)
self.publish(
"audio_detections",
json.dumps(self.current_audio_detections),
)
def compare_audio_activity(
self, camera: str, new_detections: list[tuple[str, float]], now: float
) -> bool:
camera_config = self.config.cameras.get(camera)
if camera_config is None:
return False
max_not_heard = camera_config.audio.max_not_heard
current = self.current_audio_detections[camera]
any_changed = False
for label, score in new_detections:
any_changed = True
if label in current:
current[label]["last_detection"] = now
current[label]["score"] = score
else:
rand_id = "".join(
random.choices(string.ascii_lowercase + string.digits, k=6)
)
event_id = f"{now}-{rand_id}"
self.publish(f"{camera}/audio/{label}", "ON")
self.event_metadata_publisher.publish(
(
now,
camera,
label,
event_id,
True,
score,
None,
None,
"audio",
{},
None,
),
EventMetadataTypeEnum.manual_event_create.value,
)
current[label] = {
"id": event_id,
"score": score,
"last_detection": now,
}
# expire detections
for label in list(current.keys()):
if now - current[label]["last_detection"] > max_not_heard:
any_changed = True
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]
return any_changed
def expire_all(self, camera: str) -> None:
now = datetime.datetime.now().timestamp()
current = self.current_audio_detections.get(camera, {})
for label in list(current.keys()):
self.publish(f"{camera}/audio/{label}", "OFF")
self.event_metadata_publisher.publish(
(current[label]["id"], now),
EventMetadataTypeEnum.manual_event_end.value,
)
del current[label]