refactor dispatcher

This commit is contained in:
Josh Hawkins 2024-10-10 13:58:04 -05:00
parent 8ade85edec
commit 4d0f523b5a

View File

@ -15,6 +15,7 @@ from frigate.const import (
INSERT_PREVIEW, INSERT_PREVIEW,
REQUEST_REGION_GRID, REQUEST_REGION_GRID,
UPDATE_CAMERA_ACTIVITY, UPDATE_CAMERA_ACTIVITY,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS,
UPDATE_EVENT_DESCRIPTION, UPDATE_EVENT_DESCRIPTION,
UPDATE_MODEL_STATE, UPDATE_MODEL_STATE,
UPSERT_REVIEW_SEGMENT, UPSERT_REVIEW_SEGMENT,
@ -86,35 +87,27 @@ class Dispatcher:
self.camera_activity = {} self.camera_activity = {}
self.model_state = {} self.model_state = {}
self.embeddings_reindex = {}
def _receive(self, topic: str, payload: str) -> Optional[Any]: def _receive(self, topic: str, payload: str) -> Optional[Any]:
"""Handle receiving of payload from communicators.""" """Handle receiving of payload from communicators."""
if topic.endswith("set"):
def handle_camera_command(command_type, camera_name, payload):
try: try:
# example /cam_name/detect/set payload=ON|OFF if command_type == "set":
if topic.count("/") == 2: self._camera_settings_handlers[camera_name](camera_name, payload)
camera_name = topic.split("/")[-3] elif command_type == "ptz":
command = topic.split("/")[-2] self._on_ptz_command(camera_name, payload)
self._camera_settings_handlers[command](camera_name, payload) except KeyError:
elif topic.count("/") == 1: logger.error(f"Invalid command type: {command_type}")
command = topic.split("/")[-2]
self._global_settings_handlers[command](payload) def handle_restart():
except IndexError:
logger.error(f"Received invalid set command: {topic}")
return
elif topic.endswith("ptz"):
try:
# example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP...
camera_name = topic.split("/")[-2]
self._on_ptz_command(camera_name, payload)
except IndexError:
logger.error(f"Received invalid ptz command: {topic}")
return
elif topic == "restart":
restart_frigate() restart_frigate()
elif topic == INSERT_MANY_RECORDINGS:
def handle_insert_many_recordings():
Recordings.insert_many(payload).execute() Recordings.insert_many(payload).execute()
elif topic == REQUEST_REGION_GRID:
def handle_request_region_grid():
camera = payload camera = payload
grid = get_camera_regions_grid( grid = get_camera_regions_grid(
camera, camera,
@ -122,24 +115,25 @@ class Dispatcher:
max(self.config.model.width, self.config.model.height), max(self.config.model.width, self.config.model.height),
) )
return grid return grid
elif topic == INSERT_PREVIEW:
def handle_insert_preview():
Previews.insert(payload).execute() Previews.insert(payload).execute()
elif topic == UPSERT_REVIEW_SEGMENT:
( def handle_upsert_review_segment():
ReviewSegment.insert(payload) ReviewSegment.insert(payload).on_conflict(
.on_conflict( conflict_target=[ReviewSegment.id],
conflict_target=[ReviewSegment.id], update=payload,
update=payload,
)
.execute()
)
elif topic == CLEAR_ONGOING_REVIEW_SEGMENTS:
ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where(
ReviewSegment.end_time == None
).execute() ).execute()
elif topic == UPDATE_CAMERA_ACTIVITY:
def handle_clear_ongoing_review_segments():
ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where(
ReviewSegment.end_time.is_null(True)
).execute()
def handle_update_camera_activity():
self.camera_activity = payload self.camera_activity = payload
elif topic == UPDATE_EVENT_DESCRIPTION:
def handle_update_event_description():
event: Event = Event.get(Event.id == payload["id"]) event: Event = Event.get(Event.id == payload["id"])
event.data["description"] = payload["description"] event.data["description"] = payload["description"]
event.save() event.save()
@ -147,15 +141,32 @@ class Dispatcher:
"event_update", "event_update",
json.dumps({"id": event.id, "description": event.data["description"]}), json.dumps({"id": event.id, "description": event.data["description"]}),
) )
elif topic == UPDATE_MODEL_STATE:
def handle_update_model_state():
model = payload["model"] model = payload["model"]
state = payload["state"] state = payload["state"]
self.model_state[model] = ModelStatusTypesEnum[state] self.model_state[model] = ModelStatusTypesEnum[state]
self.publish("model_state", json.dumps(self.model_state)) self.publish("model_state", json.dumps(self.model_state))
elif topic == "modelState":
model_state = self.model_state.copy() def handle_model_state():
self.publish("model_state", json.dumps(model_state)) self.publish("model_state", json.dumps(self.model_state.copy()))
elif topic == "onConnect":
def handle_update_embeddings_reindex_progress():
self.embeddings_reindex = payload
logger.info(self.embeddings_reindex)
self.publish(
"embeddings_reindex_progress",
json.dumps(payload),
)
def handle_embeddings_reindex_progress():
logger.info(self.embeddings_reindex)
self.publish(
"embeddings_reindex_progress",
json.dumps(self.embeddings_reindex.copy()),
)
def handle_on_connect():
camera_status = self.camera_activity.copy() camera_status = self.camera_activity.copy()
for camera in camera_status.keys(): for camera in camera_status.keys():
@ -170,6 +181,44 @@ class Dispatcher:
} }
self.publish("camera_activity", json.dumps(camera_status)) self.publish("camera_activity", json.dumps(camera_status))
# Dictionary mapping topic to handlers
topic_handlers = {
INSERT_MANY_RECORDINGS: handle_insert_many_recordings,
REQUEST_REGION_GRID: handle_request_region_grid,
INSERT_PREVIEW: handle_insert_preview,
UPSERT_REVIEW_SEGMENT: handle_upsert_review_segment,
CLEAR_ONGOING_REVIEW_SEGMENTS: handle_clear_ongoing_review_segments,
UPDATE_CAMERA_ACTIVITY: handle_update_camera_activity,
UPDATE_EVENT_DESCRIPTION: handle_update_event_description,
UPDATE_MODEL_STATE: handle_update_model_state,
UPDATE_EMBEDDINGS_REINDEX_PROGRESS: handle_update_embeddings_reindex_progress,
"restart": handle_restart,
"embeddingsReindexProgress": handle_embeddings_reindex_progress,
"modelState": handle_model_state,
"onConnect": handle_on_connect,
}
if topic.endswith("set") or topic.endswith("ptz"):
try:
parts = topic.split("/")
if len(parts) == 3 and topic.endswith("set"):
camera_name = parts[-3]
command = parts[-2]
handle_camera_command("set", camera_name, payload)
elif len(parts) == 2 and topic.endswith("set"):
command = parts[-2]
self._global_settings_handlers[command](payload)
elif len(parts) == 2 and topic.endswith("ptz"):
camera_name = parts[-2]
handle_camera_command("ptz", camera_name, payload)
except IndexError:
logger.error(
f"Received invalid {topic.split('/')[-1]} command: {topic}"
)
return
elif topic in topic_handlers:
return topic_handlers[topic]()
else: else:
self.publish(topic, payload, retain=False) self.publish(topic, payload, retain=False)