#ifndef MQTT_ASYNC_CLIENT_H_ #define MQTT_ASYNC_CLIENT_H_ #include #include #include "tool/load_protobuf.hpp" #include "tool/log.hpp" #include "mqtt_async.pb.h" class MqttAsyncClient; typedef int (*MessageArrived)(void *client, char *topicName, int topicLen, MQTTAsync_message *message); typedef void (*ConnectionLost)(MqttAsyncClient *client, char *cause); typedef void (*OnSuccess)(MqttAsyncClient *client, MQTTAsync_successData *response); typedef void (*OnFailure)(MqttAsyncClient *client, MQTTAsync_failureData *response); class MqttAsyncClient { protected: MqttAsyncClient(); virtual ~MqttAsyncClient(); public: /** * @brief 通过json文件初始化配置,请保证配置文件路径及内容正确. * @param path json配置文件路径 * @return 如果获取到配置文件就会解析配置,配置文件没有则按照默认配置,找不到配置文件则会返回false. */ bool init_from_proto(const std::string &path); /** * @brief 连接指定地址和id的EMQX服务器 * @param address EMQX服务器地址,不要带端口 * @param client_id 注册的客户端id,不同客户端id不要重复 **/ bool Connect(const std::string &address, const std::string &client_id); /** * @brief 断开当前已连接的EMQX服务器 **/ bool Disconnect(); /** * @brief 当前mqtt连接状态 **/ bool isConnected(); /** * @brief 添加对EMQX服务器指定topic的监听 * @param topic EMQX服务器topic * @param qos 通信模式,默认为1 **/ bool Subscribe(const std::string &topic, int qos = 1); /** * @brief 取消对EMQX服务器指定topic的监听 * @param topic EMQX服务器topic * @param qos 通信模式,默认为1 **/ bool unSubscribe(const std::string &topic); /** * @brief 向EMQX服务器指定的topic发布消息 * @param topic EMQX服务器topic * @param message 发布消息的具体内容 * @param length 发布消息的长度 * @param qos 通信模式,默认为1 **/ bool SendMessage(const std::string& topic, void *message, int length, int qos = 1); /** * @brief 设置接收消息的回调函数,当本客户端收到消息时会执行该回调函数,如果未设置则按照默认回调函数执行 * @param callback 回调函数 **/ void setCallback(MessageArrived callback); /** * @brief 设置连接丢失的回调函数,当本客户端与EMQX服务器连接丢失时会执行该回调函数,如果未设置则按照默认回调函数执行 * @param callback 回调函数 **/ void setCallback(ConnectionLost callback); /** * @brief 设置指定功能执行成功的回调函数,当本客户端执行某功能成功时会执行该回调函数,如果未设置则按照默认回调函数执行 * @param callback 回调函数 * @param function 指定功能,目前有subscribe、send、connect、disconnect **/ bool setCallback(OnSuccess callback, const std::string &function); /** * @brief 设置指定功能执行失败的回调函数,当本客户端执行某功能失败时会执行该回调函数,如果未设置则按照默认回调函数执行 * @param callback 回调函数 * @param function 指定功能,目前有subscribe、send、connect、disconnect **/ bool setCallback(OnFailure callback, const std::string &function); /** * @brief 获取监听话题的配置 **/ google::protobuf::RepeatedPtrField getSubscribeEtc(); private: static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message); static void onMessageLost(void *context, char *cause); static void onConnected(void *context, char *cause); static void onConnectSuccess(void *context, MQTTAsync_successData *response); static void onConnectFailure(void *context, MQTTAsync_failureData *response); static void onDisconnectSuccess(void *context, MQTTAsync_successData *response); static void onDisconnectFailure(void *context, MQTTAsync_failureData *response); static void onSubscribeSuccess(void *context, MQTTAsync_successData *response); static void onSubscribeFailure(void *context, MQTTAsync_failureData *response); static void onUnSubscribeSuccess(void *context, MQTTAsync_successData *response); static void onUnSubscribeFailure(void *context, MQTTAsync_failureData *response); static void onSendMessageSuccess(void *context, MQTTAsync_successData *response); static void onSendMessageFailure(void *context, MQTTAsync_failureData *response); public: protected: MqttAsyncConfig *m_etc_; MQTTAsync m_client_; MQTTAsync_createOptions m_create_opts = MQTTAsync_createOptions_initializer; MQTTAsync_connectOptions m_connect_ops = MQTTAsync_connectOptions_initializer; MessageArrived m_message_arrived_callback_; ConnectionLost m_connect_lost_callback_; OnSuccess m_connect_success_; OnSuccess m_disconnect_success_; OnSuccess m_subscribe_success_; OnSuccess m_unsubscribe_success_; OnSuccess m_send_success_; OnFailure m_connect_failure_; OnFailure m_disconnect_failure_; OnFailure m_subscribe_failure_; OnFailure m_unsubscribe_failure_; OnFailure m_send_failure_; }; #endif