mqtt_async.h 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. #ifndef MQTT_ASYNC_CLIENT_H_
  2. #define MQTT_ASYNC_CLIENT_H_
  3. #include <MQTTAsync.h>
  4. #include <mutex>
  5. #include "tool/load_protobuf.hpp"
  6. #include "tool/log.hpp"
  7. #include "mqtt_async.pb.h"
  8. class MqttAsyncClient;
  9. typedef int (*MessageArrived)(void *client, char *topicName, int topicLen, MQTTAsync_message *message);
  10. typedef void (*ConnectionLost)(MqttAsyncClient *client, char *cause);
  11. typedef void (*OnSuccess)(MqttAsyncClient *client, MQTTAsync_successData *response);
  12. typedef void (*OnFailure)(MqttAsyncClient *client, MQTTAsync_failureData *response);
  13. class MqttAsyncClient {
  14. protected:
  15. MqttAsyncClient();
  16. virtual ~MqttAsyncClient();
  17. public:
  18. /**
  19. * @brief 通过json文件初始化配置,请保证配置文件路径及内容正确.
  20. * @param path json配置文件路径
  21. * @return 如果获取到配置文件就会解析配置,配置文件没有则按照默认配置,找不到配置文件则会返回false.
  22. */
  23. bool init_from_proto(const std::string &path);
  24. /**
  25. * @brief 连接指定地址和id的EMQX服务器
  26. * @param address EMQX服务器地址,不要带端口
  27. * @param client_id 注册的客户端id,不同客户端id不要重复
  28. **/
  29. bool Connect(const std::string &address, const std::string &client_id);
  30. /**
  31. * @brief 断开当前已连接的EMQX服务器
  32. **/
  33. bool Disconnect();
  34. /**
  35. * @brief 当前mqtt连接状态
  36. **/
  37. bool isConnected();
  38. /**
  39. * @brief 添加对EMQX服务器指定topic的监听
  40. * @param topic EMQX服务器topic
  41. * @param qos 通信模式,默认为1
  42. **/
  43. bool Subscribe(const std::string &topic, int qos = 1);
  44. /**
  45. * @brief 取消对EMQX服务器指定topic的监听
  46. * @param topic EMQX服务器topic
  47. * @param qos 通信模式,默认为1
  48. **/
  49. bool unSubscribe(const std::string &topic);
  50. /**
  51. * @brief 向EMQX服务器指定的topic发布消息
  52. * @param topic EMQX服务器topic
  53. * @param message 发布消息的具体内容
  54. * @param length 发布消息的长度
  55. * @param qos 通信模式,默认为1
  56. **/
  57. bool SendMessage(const std::string& topic, void *message, int length, int qos = 1);
  58. /**
  59. * @brief 设置接收消息的回调函数,当本客户端收到消息时会执行该回调函数,如果未设置则按照默认回调函数执行
  60. * @param callback 回调函数
  61. **/
  62. void setCallback(MessageArrived callback);
  63. /**
  64. * @brief 设置连接丢失的回调函数,当本客户端与EMQX服务器连接丢失时会执行该回调函数,如果未设置则按照默认回调函数执行
  65. * @param callback 回调函数
  66. **/
  67. void setCallback(ConnectionLost callback);
  68. /**
  69. * @brief 设置指定功能执行成功的回调函数,当本客户端执行某功能成功时会执行该回调函数,如果未设置则按照默认回调函数执行
  70. * @param callback 回调函数
  71. * @param function 指定功能,目前有subscribe、send、connect、disconnect
  72. **/
  73. bool setCallback(OnSuccess callback, const std::string &function);
  74. /**
  75. * @brief 设置指定功能执行失败的回调函数,当本客户端执行某功能失败时会执行该回调函数,如果未设置则按照默认回调函数执行
  76. * @param callback 回调函数
  77. * @param function 指定功能,目前有subscribe、send、connect、disconnect
  78. **/
  79. bool setCallback(OnFailure callback, const std::string &function);
  80. /**
  81. * @brief 获取监听话题的配置
  82. **/
  83. google::protobuf::RepeatedPtrField<SubscribeEtc> getSubscribeEtc();
  84. private:
  85. static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
  86. static void onMessageLost(void *context, char *cause);
  87. static void onConnected(void *context, char *cause);
  88. static void onConnectSuccess(void *context, MQTTAsync_successData *response);
  89. static void onConnectFailure(void *context, MQTTAsync_failureData *response);
  90. static void onDisconnectSuccess(void *context, MQTTAsync_successData *response);
  91. static void onDisconnectFailure(void *context, MQTTAsync_failureData *response);
  92. static void onSubscribeSuccess(void *context, MQTTAsync_successData *response);
  93. static void onSubscribeFailure(void *context, MQTTAsync_failureData *response);
  94. static void onUnSubscribeSuccess(void *context, MQTTAsync_successData *response);
  95. static void onUnSubscribeFailure(void *context, MQTTAsync_failureData *response);
  96. static void onSendMessageSuccess(void *context, MQTTAsync_successData *response);
  97. static void onSendMessageFailure(void *context, MQTTAsync_failureData *response);
  98. public:
  99. protected:
  100. MqttAsyncConfig *m_etc_;
  101. MQTTAsync m_client_;
  102. MQTTAsync_createOptions m_create_opts = MQTTAsync_createOptions_initializer;
  103. MQTTAsync_connectOptions m_connect_ops = MQTTAsync_connectOptions_initializer;
  104. MessageArrived m_message_arrived_callback_;
  105. ConnectionLost m_connect_lost_callback_;
  106. OnSuccess m_connect_success_;
  107. OnSuccess m_disconnect_success_;
  108. OnSuccess m_subscribe_success_;
  109. OnSuccess m_unsubscribe_success_;
  110. OnSuccess m_send_success_;
  111. OnFailure m_connect_failure_;
  112. OnFailure m_disconnect_failure_;
  113. OnFailure m_subscribe_failure_;
  114. OnFailure m_unsubscribe_failure_;
  115. OnFailure m_send_failure_;
  116. };
  117. #endif