diff --git a/frigate/comms/dispatcher.py b/frigate/comms/dispatcher.py index 1605d645a..61c82880a 100644 --- a/frigate/comms/dispatcher.py +++ b/frigate/comms/dispatcher.py @@ -15,6 +15,7 @@ from frigate.const import ( INSERT_PREVIEW, REQUEST_REGION_GRID, UPDATE_CAMERA_ACTIVITY, + UPDATE_EMBEDDINGS_REINDEX_PROGRESS, UPDATE_EVENT_DESCRIPTION, UPDATE_MODEL_STATE, UPSERT_REVIEW_SEGMENT, @@ -86,35 +87,27 @@ class Dispatcher: self.camera_activity = {} self.model_state = {} + self.embeddings_reindex = {} def _receive(self, topic: str, payload: str) -> Optional[Any]: """Handle receiving of payload from communicators.""" - if topic.endswith("set"): + + def handle_camera_command(command_type, camera_name, payload): try: - # example /cam_name/detect/set payload=ON|OFF - if topic.count("/") == 2: - camera_name = topic.split("/")[-3] - command = topic.split("/")[-2] - self._camera_settings_handlers[command](camera_name, payload) - elif topic.count("/") == 1: - command = topic.split("/")[-2] - self._global_settings_handlers[command](payload) - except IndexError: - logger.error(f"Received invalid set command: {topic}") - return - elif topic.endswith("ptz"): - try: - # example /cam_name/ptz payload=MOVE_UP|MOVE_DOWN|STOP... - camera_name = topic.split("/")[-2] - self._on_ptz_command(camera_name, payload) - except IndexError: - logger.error(f"Received invalid ptz command: {topic}") - return - elif topic == "restart": + if command_type == "set": + self._camera_settings_handlers[camera_name](camera_name, payload) + elif command_type == "ptz": + self._on_ptz_command(camera_name, payload) + except KeyError: + logger.error(f"Invalid command type: {command_type}") + + def handle_restart(): restart_frigate() - elif topic == INSERT_MANY_RECORDINGS: + + def handle_insert_many_recordings(): Recordings.insert_many(payload).execute() - elif topic == REQUEST_REGION_GRID: + + def handle_request_region_grid(): camera = payload grid = get_camera_regions_grid( camera, @@ -122,24 +115,25 @@ class Dispatcher: max(self.config.model.width, self.config.model.height), ) return grid - elif topic == INSERT_PREVIEW: + + def handle_insert_preview(): Previews.insert(payload).execute() - elif topic == UPSERT_REVIEW_SEGMENT: - ( - ReviewSegment.insert(payload) - .on_conflict( - conflict_target=[ReviewSegment.id], - update=payload, - ) - .execute() - ) - elif topic == CLEAR_ONGOING_REVIEW_SEGMENTS: - ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where( - ReviewSegment.end_time == None + + def handle_upsert_review_segment(): + ReviewSegment.insert(payload).on_conflict( + conflict_target=[ReviewSegment.id], + update=payload, ).execute() - elif topic == UPDATE_CAMERA_ACTIVITY: + + def handle_clear_ongoing_review_segments(): + ReviewSegment.update(end_time=datetime.datetime.now().timestamp()).where( + ReviewSegment.end_time.is_null(True) + ).execute() + + def handle_update_camera_activity(): self.camera_activity = payload - elif topic == UPDATE_EVENT_DESCRIPTION: + + def handle_update_event_description(): event: Event = Event.get(Event.id == payload["id"]) event.data["description"] = payload["description"] event.save() @@ -147,15 +141,32 @@ class Dispatcher: "event_update", json.dumps({"id": event.id, "description": event.data["description"]}), ) - elif topic == UPDATE_MODEL_STATE: + + def handle_update_model_state(): model = payload["model"] state = payload["state"] self.model_state[model] = ModelStatusTypesEnum[state] self.publish("model_state", json.dumps(self.model_state)) - elif topic == "modelState": - model_state = self.model_state.copy() - self.publish("model_state", json.dumps(model_state)) - elif topic == "onConnect": + + def handle_model_state(): + self.publish("model_state", json.dumps(self.model_state.copy())) + + def handle_update_embeddings_reindex_progress(): + self.embeddings_reindex = payload + logger.info(self.embeddings_reindex) + self.publish( + "embeddings_reindex_progress", + json.dumps(payload), + ) + + def handle_embeddings_reindex_progress(): + logger.info(self.embeddings_reindex) + self.publish( + "embeddings_reindex_progress", + json.dumps(self.embeddings_reindex.copy()), + ) + + def handle_on_connect(): camera_status = self.camera_activity.copy() for camera in camera_status.keys(): @@ -170,6 +181,44 @@ class Dispatcher: } self.publish("camera_activity", json.dumps(camera_status)) + + # Dictionary mapping topic to handlers + topic_handlers = { + INSERT_MANY_RECORDINGS: handle_insert_many_recordings, + REQUEST_REGION_GRID: handle_request_region_grid, + INSERT_PREVIEW: handle_insert_preview, + UPSERT_REVIEW_SEGMENT: handle_upsert_review_segment, + CLEAR_ONGOING_REVIEW_SEGMENTS: handle_clear_ongoing_review_segments, + UPDATE_CAMERA_ACTIVITY: handle_update_camera_activity, + UPDATE_EVENT_DESCRIPTION: handle_update_event_description, + UPDATE_MODEL_STATE: handle_update_model_state, + UPDATE_EMBEDDINGS_REINDEX_PROGRESS: handle_update_embeddings_reindex_progress, + "restart": handle_restart, + "embeddingsReindexProgress": handle_embeddings_reindex_progress, + "modelState": handle_model_state, + "onConnect": handle_on_connect, + } + + if topic.endswith("set") or topic.endswith("ptz"): + try: + parts = topic.split("/") + if len(parts) == 3 and topic.endswith("set"): + camera_name = parts[-3] + command = parts[-2] + handle_camera_command("set", camera_name, payload) + elif len(parts) == 2 and topic.endswith("set"): + command = parts[-2] + self._global_settings_handlers[command](payload) + elif len(parts) == 2 and topic.endswith("ptz"): + camera_name = parts[-2] + handle_camera_command("ptz", camera_name, payload) + except IndexError: + logger.error( + f"Received invalid {topic.split('/')[-1]} command: {topic}" + ) + return + elif topic in topic_handlers: + return topic_handlers[topic]() else: self.publish(topic, payload, retain=False)