remove recording from feature metrics

This commit is contained in:
Nicolas Mowen 2024-02-16 06:35:20 -07:00
parent ed1cd052db
commit e3dc23a520
5 changed files with 33 additions and 28 deletions

View File

@ -17,6 +17,7 @@ from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from frigate.comms.config_updater import ConfigPublisher
from frigate.comms.dispatcher import Communicator, Dispatcher from frigate.comms.dispatcher import Communicator, Dispatcher
from frigate.comms.inter_process import InterProcessCommunicator from frigate.comms.inter_process import InterProcessCommunicator
from frigate.comms.mqtt import MqttClient from frigate.comms.mqtt import MqttClient
@ -227,12 +228,6 @@ class FrigateApp:
"i", "i",
self.config.cameras[camera_name].audio.enabled, self.config.cameras[camera_name].audio.enabled,
), ),
"record_enabled": mp.Value( # type: ignore[typeddict-item]
# issue https://github.com/python/typeshed/issues/8799
# from mypy 0.981 onwards
"i",
self.config.cameras[camera_name].record.enabled,
),
} }
def set_log_levels(self) -> None: def set_log_levels(self) -> None:
@ -348,7 +343,6 @@ class FrigateApp:
self.config, self.config,
self.object_recordings_info_queue, self.object_recordings_info_queue,
self.audio_recordings_info_queue, self.audio_recordings_info_queue,
self.feature_metrics,
), ),
) )
recording_process.daemon = True recording_process.daemon = True
@ -386,6 +380,7 @@ class FrigateApp:
def init_inter_process_communicator(self) -> None: def init_inter_process_communicator(self) -> None:
self.inter_process_communicator = InterProcessCommunicator() self.inter_process_communicator = InterProcessCommunicator()
self.inter_config_updater = ConfigPublisher()
def init_web_server(self) -> None: def init_web_server(self) -> None:
self.flask_app = create_app( self.flask_app = create_app(
@ -413,6 +408,7 @@ class FrigateApp:
self.dispatcher = Dispatcher( self.dispatcher = Dispatcher(
self.config, self.config,
self.inter_config_updater,
self.onvif_controller, self.onvif_controller,
self.camera_metrics, self.camera_metrics,
self.feature_metrics, self.feature_metrics,

View File

@ -7,30 +7,26 @@ from typing import Callable, Optional
import zmq import zmq
from frigate.comms.dispatcher import Communicator
from frigate.const import PORT_INTER_PROCESS_CONFIG from frigate.const import PORT_INTER_PROCESS_CONFIG
class ConfigPublisher(Communicator): class ConfigPublisher:
"""Publishes config changes to different processes.""" """Publishes config changes to different processes."""
def __init__(self) -> None: def __init__(self) -> None:
INTER_PROCESS_COMM_PORT = ( INTER_PROCESS_CONFIG_PORT = (
os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG os.environ.get("INTER_PROCESS_CONFIG_PORT") or PORT_INTER_PROCESS_CONFIG
) )
self.context = zmq.Context() self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB) self.socket = self.context.socket(zmq.PUB)
self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_COMM_PORT}") self.socket.bind(f"tcp://127.0.0.1:{INTER_PROCESS_CONFIG_PORT}")
self.stop_event: MpEvent = mp.Event() self.stop_event: MpEvent = mp.Event()
def publish(self, topic: str, payload: str, retain: bool) -> None: def publish(self, topic: str, payload: any) -> None:
"""There is no communication back to the processes.""" """There is no communication back to the processes."""
self.socket.send_string(topic, flags=zmq.SNDMORE) self.socket.send_string(topic, flags=zmq.SNDMORE)
self.socket.send_pyobj(payload) self.socket.send_pyobj(payload)
def subscribe(self, receiver: Callable) -> None:
pass # this class does not subscribe
def stop(self) -> None: def stop(self) -> None:
self.stop_event.set() self.stop_event.set()
self.socket.close() self.socket.close()
@ -47,13 +43,15 @@ class ConfigSubscriber:
self.socket.setsockopt_string(zmq.SUBSCRIBE, topic) self.socket.setsockopt_string(zmq.SUBSCRIBE, topic)
self.socket.connect(f"tcp://127.0.0.1:{port}") self.socket.connect(f"tcp://127.0.0.1:{port}")
def check_for_update(self) -> Optional[any]: def check_for_update(self) -> Optional[tuple[str, any]]:
"""Sends data and then waits for reply.""" """Returns updated config or None if no update."""
try: try:
self.socket.recv_string(flags=zmq.NOBLOCK) # receive the topic string topic = self.socket.recv_string(
return self.socket.recv_pyobj() flags=zmq.NOBLOCK
)
return (topic, self.socket.recv_pyobj())
except zmq.ZMQError: except zmq.ZMQError:
return None return (None, None)
def stop(self) -> None: def stop(self) -> None:
self.socket.close() self.socket.close()

View File

@ -4,6 +4,7 @@ import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Any, Callable, Optional from typing import Any, Callable, Optional
from frigate.comms.config_updater import ConfigPublisher
from frigate.config import BirdseyeModeEnum, FrigateConfig from frigate.config import BirdseyeModeEnum, FrigateConfig
from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID from frigate.const import INSERT_MANY_RECORDINGS, INSERT_PREVIEW, REQUEST_REGION_GRID
from frigate.models import Previews, Recordings from frigate.models import Previews, Recordings
@ -40,6 +41,7 @@ class Dispatcher:
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
config_updater: ConfigPublisher,
onvif: OnvifController, onvif: OnvifController,
camera_metrics: dict[str, CameraMetricsTypes], camera_metrics: dict[str, CameraMetricsTypes],
feature_metrics: dict[str, FeatureMetricsTypes], feature_metrics: dict[str, FeatureMetricsTypes],
@ -47,6 +49,7 @@ class Dispatcher:
communicators: list[Communicator], communicators: list[Communicator],
) -> None: ) -> None:
self.config = config self.config = config
self.config_updater = config_updater
self.onvif = onvif self.onvif = onvif
self.camera_metrics = camera_metrics self.camera_metrics = camera_metrics
self.feature_metrics = feature_metrics self.feature_metrics = feature_metrics
@ -279,6 +282,7 @@ class Dispatcher:
record_settings.enabled = False record_settings.enabled = False
self.feature_metrics[camera_name]["record_enabled"].value = False self.feature_metrics[camera_name]["record_enabled"].value = False
self.config_updater.publish(f"config/record/{camera_name}", self.config.cameras[camera_name].record)
self.publish(f"{camera_name}/recordings/state", payload, retain=True) self.publish(f"{camera_name}/recordings/state", payload, retain=True)
def _on_snapshots_command(self, camera_name: str, payload: str) -> None: def _on_snapshots_command(self, camera_name: str, payload: str) -> None:

View File

@ -17,6 +17,7 @@ from typing import Any, Optional, Tuple
import numpy as np import numpy as np
import psutil import psutil
from frigate.comms.config_updater import ConfigSubscriber
from frigate.comms.inter_process import InterProcessRequestor from frigate.comms.inter_process import InterProcessRequestor
from frigate.config import FrigateConfig, RetainModeEnum from frigate.config import FrigateConfig, RetainModeEnum
from frigate.const import ( from frigate.const import (
@ -62,7 +63,6 @@ class RecordingMaintainer(threading.Thread):
config: FrigateConfig, config: FrigateConfig,
object_recordings_info_queue: mp.Queue, object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: Optional[mp.Queue], audio_recordings_info_queue: Optional[mp.Queue],
process_info: dict[str, FeatureMetricsTypes],
stop_event: MpEvent, stop_event: MpEvent,
): ):
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -71,10 +71,10 @@ class RecordingMaintainer(threading.Thread):
# create communication for retained recordings # create communication for retained recordings
self.requestor = InterProcessRequestor() self.requestor = InterProcessRequestor()
self.config_subscriber = ConfigSubscriber("config/record/")
self.object_recordings_info_queue = object_recordings_info_queue self.object_recordings_info_queue = object_recordings_info_queue
self.audio_recordings_info_queue = audio_recordings_info_queue self.audio_recordings_info_queue = audio_recordings_info_queue
self.process_info = process_info
self.stop_event = stop_event self.stop_event = stop_event
self.object_recordings_info: dict[str, list] = defaultdict(list) self.object_recordings_info: dict[str, list] = defaultdict(list)
self.audio_recordings_info: dict[str, list] = defaultdict(list) self.audio_recordings_info: dict[str, list] = defaultdict(list)
@ -200,7 +200,7 @@ class RecordingMaintainer(threading.Thread):
# Just delete files if recordings are turned off # Just delete files if recordings are turned off
if ( if (
camera not in self.config.cameras camera not in self.config.cameras
or not self.process_info[camera]["record_enabled"].value or not self.config.cameras[camera].record.enabled
): ):
Path(cache_path).unlink(missing_ok=True) Path(cache_path).unlink(missing_ok=True)
self.end_time_cache.pop(cache_path, None) self.end_time_cache.pop(cache_path, None)
@ -442,6 +442,16 @@ class RecordingMaintainer(threading.Thread):
wait_time = 0.0 wait_time = 0.0
while not self.stop_event.wait(wait_time): while not self.stop_event.wait(wait_time):
run_start = datetime.datetime.now().timestamp() run_start = datetime.datetime.now().timestamp()
# check if there is an updated config
updated_topic, updated_record_config = (
self.config_subscriber.check_for_update()
)
if updated_topic:
camera_name = updated_topic.rpartition("/")[-1]
self.config.cameras[camera_name].record = updated_record_config
stale_frame_count = 0 stale_frame_count = 0
stale_frame_count_threshold = 10 stale_frame_count_threshold = 10
# empty the object recordings info queue # empty the object recordings info queue
@ -460,7 +470,7 @@ class RecordingMaintainer(threading.Thread):
if frame_time < run_start - stale_frame_count_threshold: if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1 stale_frame_count += 1
if self.process_info[camera]["record_enabled"].value: if self.config.cameras[camera].record.enabled:
self.object_recordings_info[camera].append( self.object_recordings_info[camera].append(
( (
frame_time, frame_time,
@ -498,7 +508,7 @@ class RecordingMaintainer(threading.Thread):
if frame_time < run_start - stale_frame_count_threshold: if frame_time < run_start - stale_frame_count_threshold:
stale_frame_count += 1 stale_frame_count += 1
if self.process_info[camera]["record_enabled"].value: if self.config.cameras[camera].record.enabled:
self.audio_recordings_info[camera].append( self.audio_recordings_info[camera].append(
( (
frame_time, frame_time,

View File

@ -13,7 +13,6 @@ from setproctitle import setproctitle
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.record.maintainer import RecordingMaintainer from frigate.record.maintainer import RecordingMaintainer
from frigate.types import FeatureMetricsTypes
from frigate.util.services import listen from frigate.util.services import listen
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -23,7 +22,6 @@ def manage_recordings(
config: FrigateConfig, config: FrigateConfig,
object_recordings_info_queue: mp.Queue, object_recordings_info_queue: mp.Queue,
audio_recordings_info_queue: mp.Queue, audio_recordings_info_queue: mp.Queue,
process_info: dict[str, FeatureMetricsTypes],
) -> None: ) -> None:
stop_event = mp.Event() stop_event = mp.Event()
@ -53,7 +51,6 @@ def manage_recordings(
config, config,
object_recordings_info_queue, object_recordings_info_queue,
audio_recordings_info_queue, audio_recordings_info_queue,
process_info,
stop_event, stop_event,
) )
maintainer.start() maintainer.start()