123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- #ifndef MQTT_ASYNC_CLIENT_H_
- #define MQTT_ASYNC_CLIENT_H_
- #include <MQTTAsync.h>
- #include <mutex>
- #include "tool/load_protobuf.hpp"
- #include "tool/log.hpp"
- #include "mqtt_async.pb.h"
- class MqttAsyncClient;
- typedef int (*MessageArrived)(void *client, char *topicName, int topicLen, MQTTAsync_message *message);
- typedef void (*ConnectionLost)(MqttAsyncClient *client, char *cause);
- typedef void (*OnSuccess)(MqttAsyncClient *client, MQTTAsync_successData *response);
- typedef void (*OnFailure)(MqttAsyncClient *client, MQTTAsync_failureData *response);
- class MqttAsyncClient {
- protected:
- MqttAsyncClient();
- virtual ~MqttAsyncClient();
- public:
- /**
- * @brief 通过json文件初始化配置,请保证配置文件路径及内容正确.
- * @param path json配置文件路径
- * @return 如果获取到配置文件就会解析配置,配置文件没有则按照默认配置,找不到配置文件则会返回false.
- */
- bool init_from_proto(const std::string &path);
- /**
- * @brief 连接指定地址和id的EMQX服务器
- * @param address EMQX服务器地址,不要带端口
- * @param client_id 注册的客户端id,不同客户端id不要重复
- **/
- bool Connect(const std::string &address, const std::string &client_id);
- /**
- * @brief 断开当前已连接的EMQX服务器
- **/
- bool Disconnect();
- /**
- * @brief 当前mqtt连接状态
- **/
- bool isConnected();
- /**
- * @brief 添加对EMQX服务器指定topic的监听
- * @param topic EMQX服务器topic
- * @param qos 通信模式,默认为1
- **/
- bool Subscribe(const std::string &topic, int qos = 1);
- /**
- * @brief 取消对EMQX服务器指定topic的监听
- * @param topic EMQX服务器topic
- * @param qos 通信模式,默认为1
- **/
- bool unSubscribe(const std::string &topic);
- /**
- * @brief 向EMQX服务器指定的topic发布消息
- * @param topic EMQX服务器topic
- * @param message 发布消息的具体内容
- * @param length 发布消息的长度
- * @param qos 通信模式,默认为1
- **/
- bool SendMessage(const std::string& topic, void *message, int length, int qos = 1);
- /**
- * @brief 设置接收消息的回调函数,当本客户端收到消息时会执行该回调函数,如果未设置则按照默认回调函数执行
- * @param callback 回调函数
- **/
- void setCallback(MessageArrived callback);
- /**
- * @brief 设置连接丢失的回调函数,当本客户端与EMQX服务器连接丢失时会执行该回调函数,如果未设置则按照默认回调函数执行
- * @param callback 回调函数
- **/
- void setCallback(ConnectionLost callback);
- /**
- * @brief 设置指定功能执行成功的回调函数,当本客户端执行某功能成功时会执行该回调函数,如果未设置则按照默认回调函数执行
- * @param callback 回调函数
- * @param function 指定功能,目前有subscribe、send、connect、disconnect
- **/
- bool setCallback(OnSuccess callback, const std::string &function);
- /**
- * @brief 设置指定功能执行失败的回调函数,当本客户端执行某功能失败时会执行该回调函数,如果未设置则按照默认回调函数执行
- * @param callback 回调函数
- * @param function 指定功能,目前有subscribe、send、connect、disconnect
- **/
- bool setCallback(OnFailure callback, const std::string &function);
- /**
- * @brief 获取监听话题的配置
- **/
- google::protobuf::RepeatedPtrField<SubscribeEtc> getSubscribeEtc();
- private:
- static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
- static void onMessageLost(void *context, char *cause);
- static void onConnected(void *context, char *cause);
- static void onConnectSuccess(void *context, MQTTAsync_successData *response);
- static void onConnectFailure(void *context, MQTTAsync_failureData *response);
- static void onDisconnectSuccess(void *context, MQTTAsync_successData *response);
- static void onDisconnectFailure(void *context, MQTTAsync_failureData *response);
- static void onSubscribeSuccess(void *context, MQTTAsync_successData *response);
- static void onSubscribeFailure(void *context, MQTTAsync_failureData *response);
- static void onUnSubscribeSuccess(void *context, MQTTAsync_successData *response);
- static void onUnSubscribeFailure(void *context, MQTTAsync_failureData *response);
- static void onSendMessageSuccess(void *context, MQTTAsync_successData *response);
- static void onSendMessageFailure(void *context, MQTTAsync_failureData *response);
- public:
- protected:
- MqttAsyncConfig *m_etc_;
- MQTTAsync m_client_;
- MQTTAsync_createOptions m_create_opts = MQTTAsync_createOptions_initializer;
- MQTTAsync_connectOptions m_connect_ops = MQTTAsync_connectOptions_initializer;
- MessageArrived m_message_arrived_callback_;
- ConnectionLost m_connect_lost_callback_;
- OnSuccess m_connect_success_;
- OnSuccess m_disconnect_success_;
- OnSuccess m_subscribe_success_;
- OnSuccess m_unsubscribe_success_;
- OnSuccess m_send_success_;
- OnFailure m_connect_failure_;
- OnFailure m_disconnect_failure_;
- OnFailure m_subscribe_failure_;
- OnFailure m_unsubscribe_failure_;
- OnFailure m_send_failure_;
- };
- #endif
|