Migrate to new dispatcher

This commit is contained in:
Nick Mowen 2022-11-22 13:40:31 -07:00
parent 5e1cf474d6
commit 5cf3dc4798
5 changed files with 41 additions and 40 deletions

View File

@ -13,14 +13,16 @@ from peewee_migrate import Router
from playhouse.sqlite_ext import SqliteExtDatabase from playhouse.sqlite_ext import SqliteExtDatabase
from playhouse.sqliteq import SqliteQueueDatabase from playhouse.sqliteq import SqliteQueueDatabase
from frigate.config import DetectorTypeEnum, FrigateConfig from frigate.communication.dispatcher import Communicator, Dispatcher
from frigate.communication.mqtt import MqttClient
from frigate.communication.ws import WebSocketClient
from frigate.config import FrigateConfig
from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
from frigate.object_detection import ObjectDetectProcess from frigate.object_detection import ObjectDetectProcess
from frigate.events import EventCleanup, EventProcessor from frigate.events import EventCleanup, EventProcessor
from frigate.http import create_app from frigate.http import create_app
from frigate.log import log_process, root_configurer from frigate.log import log_process, root_configurer
from frigate.models import Event, Recordings from frigate.models import Event, Recordings
from frigate.mqtt import FrigateMqttClient, MqttSocketRelay
from frigate.object_processing import TrackedObjectProcessor from frigate.object_processing import TrackedObjectProcessor
from frigate.output import output_frames from frigate.output import output_frames
from frigate.plus import PlusApi from frigate.plus import PlusApi
@ -168,14 +170,16 @@ class FrigateApp:
self.restream = RestreamApi(self.config) self.restream = RestreamApi(self.config)
self.restream.add_cameras() self.restream.add_cameras()
def init_mqtt(self) -> None: def init_dispatcher(self) -> None:
self.mqtt_client = FrigateMqttClient(self.config, self.camera_metrics) comms: list[Communicator] = []
def start_mqtt_relay(self) -> None: if self.config.mqtt.enabled:
self.mqtt_relay = MqttSocketRelay( comms.append(MqttClient(self.config))
self.mqtt_client, self.config.mqtt.topic_prefix
) self.ws_client = WebSocketClient(self.config)
self.mqtt_relay.start() self.ws_client.start()
comms.append(self.ws_client)
self.dispatcher = Dispatcher(self.config, self.camera_metrics, comms)
def start_detectors(self) -> None: def start_detectors(self) -> None:
for name in self.config.cameras.keys(): for name in self.config.cameras.keys():
@ -214,7 +218,7 @@ class FrigateApp:
def start_detected_frames_processor(self) -> None: def start_detected_frames_processor(self) -> None:
self.detected_frames_processor = TrackedObjectProcessor( self.detected_frames_processor = TrackedObjectProcessor(
self.config, self.config,
self.mqtt_client, self.dispatcher,
self.config.mqtt.topic_prefix, self.config.mqtt.topic_prefix,
self.detected_frames_queue, self.detected_frames_queue,
self.event_queue, self.event_queue,
@ -312,7 +316,7 @@ class FrigateApp:
self.stats_emitter = StatsEmitter( self.stats_emitter = StatsEmitter(
self.config, self.config,
self.stats_tracking, self.stats_tracking,
self.mqtt_client, self.dispatcher,
self.config.mqtt.topic_prefix, self.config.mqtt.topic_prefix,
self.stop_event, self.stop_event,
) )
@ -350,7 +354,7 @@ class FrigateApp:
self.set_log_levels() self.set_log_levels()
self.init_queues() self.init_queues()
self.init_database() self.init_database()
self.init_mqtt() self.init_dispatcher
except Exception as e: except Exception as e:
print(e) print(e)
self.log_process.terminate() self.log_process.terminate()
@ -363,7 +367,6 @@ class FrigateApp:
self.start_camera_capture_processes() self.start_camera_capture_processes()
self.init_stats() self.init_stats()
self.init_web_server() self.init_web_server()
self.start_mqtt_relay()
self.start_event_processor() self.start_event_processor()
self.start_event_cleanup() self.start_event_cleanup()
self.start_recording_maintainer() self.start_recording_maintainer()
@ -390,7 +393,7 @@ class FrigateApp:
logger.info(f"Stopping...") logger.info(f"Stopping...")
self.stop_event.set() self.stop_event.set()
self.mqtt_relay.stop() self.ws_client.stop()
self.detected_frames_processor.join() self.detected_frames_processor.join()
self.event_processor.join() self.event_processor.join()
self.event_cleanup.join() self.event_cleanup.join()

View File

@ -11,15 +11,12 @@ from frigate.types import CameraMetricsTypes
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class FrigateMqttClient(Communicator): class MqttClient(Communicator):
"""Frigate wrapper for mqtt client.""" """Frigate wrapper for mqtt client."""
def __init__( def __init__(self, config: FrigateConfig) -> None:
self, config: FrigateConfig, camera_metrics: dict[str, CameraMetricsTypes]
) -> None:
self.config = config self.config = config
self.mqtt_config = config.mqtt self.mqtt_config = config.mqtt
self.camera_metrics = camera_metrics
self.connected: bool = False self.connected: bool = False
self._start() self._start()

View File

@ -14,11 +14,12 @@ from ws4py.server.wsgiutils import WebSocketWSGIApplication
from ws4py.websocket import WebSocket from ws4py.websocket import WebSocket
from frigate.communication.dispatcher import Communicator from frigate.communication.dispatcher import Communicator
from frigate.config import FrigateConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class WebSocketHandler(WebSocket): class _WebSocketHandler(WebSocket):
def received_message(self, message): def received_message(self, message):
try: try:
json_message = json.loads(message.data.decode("utf-8")) json_message = json.loads(message.data.decode("utf-8"))
@ -44,8 +45,8 @@ class WebSocketHandler(WebSocket):
class WebSocketClient(Communicator): class WebSocketClient(Communicator):
"""Frigate wrapper for ws client.""" """Frigate wrapper for ws client."""
def __init__(self) -> None: def __init__(self, config: FrigateConfig) -> None:
pass self.config = config
def start(self): def start(self):
@ -56,7 +57,7 @@ class WebSocketClient(Communicator):
5002, 5002,
server_class=WSGIServer, server_class=WSGIServer,
handler_class=WebSocketWSGIRequestHandler, handler_class=WebSocketWSGIRequestHandler,
app=WebSocketWSGIApplication(handler_cls=WebSocketHandler), app=WebSocketWSGIApplication(handler_cls=_WebSocketHandler),
) )
self.websocket_server.initialize_websockets_manager() self.websocket_server.initialize_websockets_manager()
self.websocket_thread = threading.Thread( self.websocket_thread = threading.Thread(

View File

@ -12,6 +12,7 @@ from typing import Callable
import cv2 import cv2
import numpy as np import numpy as np
from frigate.communication.dispatcher import Dispatcher
from frigate.config import ( from frigate.config import (
CameraConfig, CameraConfig,
MqttConfig, MqttConfig,
@ -20,7 +21,6 @@ from frigate.config import (
FrigateConfig, FrigateConfig,
) )
from frigate.const import CLIPS_DIR from frigate.const import CLIPS_DIR
from frigate.mqtt import FrigateMqttClient
from frigate.util import ( from frigate.util import (
SharedMemoryFrameManager, SharedMemoryFrameManager,
calculate_region, calculate_region,
@ -633,7 +633,7 @@ class TrackedObjectProcessor(threading.Thread):
def __init__( def __init__(
self, self,
config: FrigateConfig, config: FrigateConfig,
client: FrigateMqttClient, dispatcher: Dispatcher,
topic_prefix, topic_prefix,
tracked_objects_queue, tracked_objects_queue,
event_queue, event_queue,
@ -645,7 +645,7 @@ class TrackedObjectProcessor(threading.Thread):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.name = "detected_frames_processor" self.name = "detected_frames_processor"
self.config = config self.config = config
self.client = client self.dispatcher = dispatcher
self.topic_prefix = topic_prefix self.topic_prefix = topic_prefix
self.tracked_objects_queue = tracked_objects_queue self.tracked_objects_queue = tracked_objects_queue
self.event_queue = event_queue self.event_queue = event_queue
@ -669,7 +669,7 @@ class TrackedObjectProcessor(threading.Thread):
"after": after, "after": after,
"type": "new" if obj.previous["false_positive"] else "update", "type": "new" if obj.previous["false_positive"] else "update",
} }
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/events", json.dumps(message), retain=False f"{self.topic_prefix}/events", json.dumps(message), retain=False
) )
obj.previous = after obj.previous = after
@ -724,7 +724,7 @@ class TrackedObjectProcessor(threading.Thread):
"after": obj.to_dict(), "after": obj.to_dict(),
"type": "end", "type": "end",
} }
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/events", json.dumps(message), retain=False f"{self.topic_prefix}/events", json.dumps(message), retain=False
) )
@ -746,14 +746,14 @@ class TrackedObjectProcessor(threading.Thread):
f"Unable to send mqtt snapshot for {obj.obj_data['id']}." f"Unable to send mqtt snapshot for {obj.obj_data['id']}."
) )
else: else:
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/{obj.obj_data['label']}/snapshot", f"{self.topic_prefix}/{camera}/{obj.obj_data['label']}/snapshot",
jpg_bytes, jpg_bytes,
retain=True, retain=True,
) )
def object_status(camera, object_name, status): def object_status(camera, object_name, status):
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False f"{self.topic_prefix}/{camera}/{object_name}", status, retain=False
) )
@ -853,7 +853,7 @@ class TrackedObjectProcessor(threading.Thread):
if motion_boxes: if motion_boxes:
# only send ON if motion isn't already active # only send ON if motion isn't already active
if self.last_motion_detected.get(camera, 0) == 0: if self.last_motion_detected.get(camera, 0) == 0:
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/motion", f"{self.topic_prefix}/{camera}/motion",
"ON", "ON",
retain=False, retain=False,
@ -866,7 +866,7 @@ class TrackedObjectProcessor(threading.Thread):
# If no motion, make sure the off_delay has passed # If no motion, make sure the off_delay has passed
if frame_time - self.last_motion_detected.get(camera, 0) >= mqtt_delay: if frame_time - self.last_motion_detected.get(camera, 0) >= mqtt_delay:
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{camera}/motion", f"{self.topic_prefix}/{camera}/motion",
"OFF", "OFF",
retain=False, retain=False,
@ -962,7 +962,7 @@ class TrackedObjectProcessor(threading.Thread):
) )
new_count = sum(zone_label.values()) new_count = sum(zone_label.values())
if new_count != current_count: if new_count != current_count:
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/{label}", f"{self.topic_prefix}/{zone}/{label}",
new_count, new_count,
retain=False, retain=False,
@ -975,7 +975,7 @@ class TrackedObjectProcessor(threading.Thread):
else: else:
if label in obj_counter: if label in obj_counter:
zone_label[camera] = obj_counter[label] zone_label[camera] = obj_counter[label]
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/{label}", f"{self.topic_prefix}/{zone}/{label}",
obj_counter[label], obj_counter[label],
retain=False, retain=False,
@ -992,7 +992,7 @@ class TrackedObjectProcessor(threading.Thread):
new_count = sum(zone_label.values()) new_count = sum(zone_label.values())
if new_count != current_count: if new_count != current_count:
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/all", f"{self.topic_prefix}/{zone}/all",
new_count, new_count,
retain=False, retain=False,
@ -1000,7 +1000,7 @@ class TrackedObjectProcessor(threading.Thread):
# if this is a new zone all label for this camera # if this is a new zone all label for this camera
else: else:
zone_label[camera] = total_label_count zone_label[camera] = total_label_count
self.client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/{zone}/all", f"{self.topic_prefix}/{zone}/all",
total_label_count, total_label_count,
retain=False, retain=False,

View File

@ -9,9 +9,9 @@ import requests
from typing import Optional, Any from typing import Optional, Any
from multiprocessing.synchronize import Event as MpEvent from multiprocessing.synchronize import Event as MpEvent
from frigate.communication.dispatcher import Dispatcher
from frigate.config import FrigateConfig from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
from frigate.mqtt import FrigateMqttClient
from frigate.types import StatsTrackingTypes, CameraMetricsTypes from frigate.types import StatsTrackingTypes, CameraMetricsTypes
from frigate.version import VERSION from frigate.version import VERSION
from frigate.util import get_cpu_stats from frigate.util import get_cpu_stats
@ -146,7 +146,7 @@ class StatsEmitter(threading.Thread):
self, self,
config: FrigateConfig, config: FrigateConfig,
stats_tracking: StatsTrackingTypes, stats_tracking: StatsTrackingTypes,
mqtt_client: FrigateMqttClient, dispatcher: Dispatcher,
topic_prefix: str, topic_prefix: str,
stop_event: MpEvent, stop_event: MpEvent,
): ):
@ -154,7 +154,7 @@ class StatsEmitter(threading.Thread):
self.name = "frigate_stats_emitter" self.name = "frigate_stats_emitter"
self.config = config self.config = config
self.stats_tracking = stats_tracking self.stats_tracking = stats_tracking
self.mqtt_client = mqtt_client self.dispatcher = dispatcher
self.topic_prefix = topic_prefix self.topic_prefix = topic_prefix
self.stop_event = stop_event self.stop_event = stop_event
@ -162,7 +162,7 @@ class StatsEmitter(threading.Thread):
time.sleep(10) time.sleep(10)
while not self.stop_event.wait(self.config.mqtt.stats_interval): while not self.stop_event.wait(self.config.mqtt.stats_interval):
stats = stats_snapshot(self.stats_tracking) stats = stats_snapshot(self.stats_tracking)
self.mqtt_client.publish( self.dispatcher.publish(
f"{self.topic_prefix}/stats", json.dumps(stats), retain=False f"{self.topic_prefix}/stats", json.dumps(stats), retain=False
) )
logger.info(f"Exiting watchdog...") logger.info(f"Exiting watchdog...")