mqtt_async.h 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #ifndef MQTT_ASYNC_CLIENT_H_
  2. #define MQTT_ASYNC_CLIENT_H_
  3. #include <MQTTAsync.h>
  4. #include <mutex>
  5. #include "defines.hpp"
  6. #include "json/json.h"
  7. #include "tool/static_tool.hpp"
  8. #include "google/protobuf/util/json_util.h"
  9. #include "mqtt_async.pb.h"
  10. class MqttAsyncClient;
  11. typedef int (*MessageArrived)(MqttAsyncClient *client, char *topicName, int topicLen, MQTTAsync_message *message);
  12. typedef void (*ConnectionLost)(MqttAsyncClient *client, char *cause);
  13. typedef void (*OnSuccess)(MqttAsyncClient *client, MQTTAsync_successData *response);
  14. typedef void (*OnFailure)(MqttAsyncClient *client, MQTTAsync_failureData *response);
  15. class MqttAsyncClient {
  16. protected:
  17. MqttAsyncClient();
  18. virtual ~MqttAsyncClient();
  19. public:
  20. /**
  21. * @brief 通过json文件初始化配置,请保证配置文件路径及内容正确.
  22. * @param path json配置文件路径
  23. * @return 如果获取到配置文件就会解析配置,配置文件没有则按照默认配置,找不到配置文件则会返回false.
  24. */
  25. bool init_from_proto(const std::string &path);
  26. /**
  27. * @brief 连接指定地址和id的EMQX服务器
  28. * @param address EMQX服务器地址,不要带端口
  29. * @param client_id 注册的客户端id,不同客户端id不要重复
  30. **/
  31. bool Connect(const std::string &address, const std::string &client_id);
  32. /**
  33. * @brief 断开当前已连接的EMQX服务器
  34. **/
  35. bool Disconnect();
  36. /**
  37. * @brief 当前mqtt连接状态
  38. **/
  39. bool isConnected();
  40. /**
  41. * @brief 添加对EMQX服务器指定topic的监听
  42. * @param topic EMQX服务器topic
  43. * @param qos 通信模式,默认为1
  44. **/
  45. bool Subscribe(const std::string &topic, int qos = 1);
  46. /**
  47. * @brief 取消对EMQX服务器指定topic的监听
  48. * @param topic EMQX服务器topic
  49. * @param qos 通信模式,默认为1
  50. **/
  51. bool unSubscribe(const std::string &topic);
  52. /**
  53. * @brief 向EMQX服务器指定的topic发布消息
  54. * @param topic EMQX服务器topic
  55. * @param message 发布消息的具体内容
  56. * @param length 发布消息的长度
  57. * @param qos 通信模式,默认为1
  58. **/
  59. bool SendMessage(std::string topic, void *message, int length, int qos = 1);
  60. /**
  61. * @brief 设置接收消息的回调函数,当本客户端收到消息时会执行该回调函数,如果未设置则按照默认回调函数执行
  62. * @param callback 回调函数
  63. **/
  64. void setCallback(MessageArrived callback);
  65. /**
  66. * @brief 设置连接丢失的回调函数,当本客户端与EMQX服务器连接丢失时会执行该回调函数,如果未设置则按照默认回调函数执行
  67. * @param callback 回调函数
  68. **/
  69. void setCallback(ConnectionLost callback);
  70. /**
  71. * @brief 设置指定功能执行成功的回调函数,当本客户端执行某功能成功时会执行该回调函数,如果未设置则按照默认回调函数执行
  72. * @param callback 回调函数
  73. * @param function 指定功能,目前有subscribe、send、connect、disconnect
  74. **/
  75. bool setCallback(OnSuccess callback, const std::string &function);
  76. /**
  77. * @brief 设置指定功能执行失败的回调函数,当本客户端执行某功能失败时会执行该回调函数,如果未设置则按照默认回调函数执行
  78. * @param callback 回调函数
  79. * @param function 指定功能,目前有subscribe、send、connect、disconnect
  80. **/
  81. bool setCallback(OnFailure callback, const std::string &function);
  82. /**
  83. * @brief 获取监听话题的配置
  84. **/
  85. google::protobuf::RepeatedPtrField<SubscribeEtc> getSubscribeEtc();
  86. private:
  87. static int onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
  88. static void onMessageLost(void *context, char *cause);
  89. static void onConnected(void *context, char *cause);
  90. static void onConnectSuccess(void *context, MQTTAsync_successData *response);
  91. static void onConnectFailure(void *context, MQTTAsync_failureData *response);
  92. static void onDisconnectSuccess(void *context, MQTTAsync_successData *response);
  93. static void onDisconnectFailure(void *context, MQTTAsync_failureData *response);
  94. static void onSubscribeSuccess(void *context, MQTTAsync_successData *response);
  95. static void onSubscribeFailure(void *context, MQTTAsync_failureData *response);
  96. static void onUnSubscribeSuccess(void *context, MQTTAsync_successData *response);
  97. static void onUnSubscribeFailure(void *context, MQTTAsync_failureData *response);
  98. static void onSendMessageSuccess(void *context, MQTTAsync_successData *response);
  99. static void onSendMessageFailure(void *context, MQTTAsync_failureData *response);
  100. public:
  101. protected:
  102. MqttAsyncConfig *m_etc_;
  103. MQTTAsync m_client_;
  104. MQTTAsync_createOptions m_create_opts = MQTTAsync_createOptions_initializer;
  105. MQTTAsync_connectOptions m_connect_ops = MQTTAsync_connectOptions_initializer;
  106. MessageArrived m_message_arrived_callback_;
  107. ConnectionLost m_connect_lost_callback_;
  108. OnSuccess m_connect_success_;
  109. OnSuccess m_disconnect_success_;
  110. OnSuccess m_subscribe_success_;
  111. OnSuccess m_unsubscribe_success_;
  112. OnSuccess m_send_success_;
  113. OnFailure m_connect_failure_;
  114. OnFailure m_disconnect_failure_;
  115. OnFailure m_subscribe_failure_;
  116. OnFailure m_unsubscribe_failure_;
  117. OnFailure m_send_failure_;
  118. };
  119. #endif