mqtt_async.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. #include "mqtt_async.h"
  2. MqttAsyncClient::MqttAsyncClient(/* args */) {
  3. m_etc_ = new MqttAsyncConfig();
  4. m_client_ = nullptr;
  5. m_message_arrived_callback_ = nullptr;
  6. m_connect_lost_callback_ = nullptr;
  7. m_connect_success_ = nullptr;
  8. m_disconnect_success_ = nullptr;
  9. m_subscribe_success_ = nullptr;
  10. m_unsubscribe_success_ = nullptr;
  11. m_send_success_ = nullptr;
  12. m_connect_failure_ = nullptr;
  13. m_disconnect_failure_ = nullptr;
  14. m_subscribe_failure_ = nullptr;
  15. m_unsubscribe_failure_ = nullptr;
  16. m_send_failure_ = nullptr;
  17. }
  18. MqttAsyncClient::~MqttAsyncClient() {
  19. Disconnect();
  20. MQTTAsync_destroy(&m_client_);
  21. }
  22. bool MqttAsyncClient::init_from_proto(const std::string &path) {
  23. auto ret = loadProtobufFile(path, *m_etc_);
  24. LOG(INFO) << m_etc_->DebugString();
  25. if (ret != SUCCESS) {
  26. LOG(ERROR) << "init config proto error from " << path;
  27. return false;
  28. }
  29. m_create_opts.maxBufferedMessages = m_etc_->create_opts().maxbufferedmessages();
  30. m_connect_ops.cleansession = m_etc_->connect_ops().cleansession();
  31. m_connect_ops.keepAliveInterval = m_etc_->connect_ops().keepaliveinterval();
  32. m_connect_ops.automaticReconnect = m_etc_->connect_ops().automaticreconnect();
  33. m_connect_ops.minRetryInterval = m_etc_->connect_ops().minretryinterval();
  34. m_connect_ops.maxRetryInterval = m_etc_->connect_ops().maxretryinterval();
  35. if (m_etc_->client_id() == "Auto") {
  36. m_etc_->set_client_id(std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()));
  37. }
  38. LOG(INFO) << m_etc_->client_id();
  39. Connect(m_etc_->address(), m_etc_->client_id());
  40. return true;
  41. }
  42. bool MqttAsyncClient::Connect(const std::string &address, const std::string &client_id) {
  43. int ret = MQTTASYNC_FAILURE;
  44. if ((ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE,
  45. nullptr, &m_create_opts)
  46. != MQTTASYNC_SUCCESS)) {
  47. LOG(ERROR) << "Failed to create client object, return code " << ret;
  48. return false;
  49. } else {
  50. DLOG(INFO) << "Success to create client object: " << address << ", client id: " << client_id << ".";
  51. }
  52. if ((ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, nullptr)) !=
  53. MQTTASYNC_SUCCESS) {
  54. LOG(ERROR) << "Failed to set callback, return code " << ret;
  55. return false;
  56. }
  57. if ((ret = MQTTAsync_setConnected(m_client_, this, onConnected)) != MQTTASYNC_SUCCESS) {
  58. printf("Failed to set connected callback, return code %d\n", ret);
  59. return false;
  60. }
  61. if ((ret = MQTTAsync_connect(m_client_, &m_connect_ops)) != MQTTASYNC_SUCCESS) {
  62. printf("Failed to start connect, return code %d\n", ret);
  63. return false;
  64. }
  65. return true;
  66. }
  67. bool MqttAsyncClient::Disconnect() {
  68. DLOG(INFO) << __func__;
  69. int ret = MQTTASYNC_FAILURE;
  70. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  71. opts.timeout = 30;
  72. opts.context = this;
  73. opts.onSuccess = onDisconnectSuccess;
  74. opts.onFailure = onDisconnectFailure;
  75. if ((ret = MQTTAsync_disconnect(m_client_, &opts)) != MQTTASYNC_SUCCESS) {
  76. printf("Failed to disconnect, return code %d\n", ret);
  77. return false;
  78. }
  79. return true;
  80. }
  81. bool MqttAsyncClient::isConnected() {
  82. return MQTTAsync_isConnected(m_client_);
  83. }
  84. bool MqttAsyncClient::SendMessage(const std::string& topic, void *message, int length, int qos) {
  85. if (message == nullptr) {
  86. return false;
  87. }
  88. int ret = MQTTASYNC_FAILURE;
  89. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  90. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  91. opts.context = this;
  92. opts.onSuccess = onSendMessageSuccess;
  93. opts.onFailure = onSendMessageFailure;
  94. pubmsg.payload = message;
  95. pubmsg.payloadlen = length;
  96. pubmsg.retained = 0;
  97. m_etc_->sendmessage().size();
  98. // if (m_etc.findsendMessageEtc(topic) != nullptr) {
  99. // pubmsg.qos = m_etc.findsendMessageEtc(topic)->qos;
  100. // } else {
  101. pubmsg.qos = qos;
  102. // }
  103. if ((ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
  104. printf("Failed to start sendMessage, return code %d, connect statu %d\n", ret, MQTTAsync_isConnected(m_client_));
  105. return false;
  106. }
  107. return true;
  108. }
  109. void MqttAsyncClient::setCallback(MessageArrived callback) {
  110. DLOG(INFO) << __func__;
  111. m_message_arrived_callback_ = callback;
  112. }
  113. void MqttAsyncClient::setCallback(ConnectionLost callback) {
  114. DLOG(INFO) << __func__;
  115. m_connect_lost_callback_ = callback;
  116. }
  117. bool MqttAsyncClient::setCallback(OnSuccess callback, const std::string &function) {
  118. DLOG(INFO) << __func__;
  119. bool ret = true;
  120. if (function == "subscribe") {
  121. m_subscribe_success_ = callback;
  122. } else if (function == "unsubscribe") {
  123. m_unsubscribe_success_ = callback;
  124. } else if (function == "send") {
  125. m_send_success_ = callback;
  126. } else if (function == "connect") {
  127. m_connect_success_ = callback;
  128. } else if (function == "disconnect") {
  129. m_disconnect_success_ = callback;
  130. } else {
  131. ret = false;
  132. }
  133. return ret;
  134. }
  135. bool MqttAsyncClient::setCallback(OnFailure callback, const std::string &function) {
  136. DLOG(INFO) << __func__ << function;
  137. bool ret = true;
  138. if (function == "subscribe") {
  139. m_subscribe_failure_ = callback;
  140. } else if (function == "unsubscribe") {
  141. m_unsubscribe_failure_ = callback;
  142. } else if (function == "send") {
  143. m_send_failure_ = callback;
  144. } else if (function == "connect") {
  145. m_connect_failure_ = callback;
  146. } else if (function == "disconnect") {
  147. m_disconnect_failure_ = callback;
  148. } else {
  149. ret = false;
  150. }
  151. return ret;
  152. }
  153. google::protobuf::RepeatedPtrField<SubscribeEtc> MqttAsyncClient::getSubscribeEtc() {
  154. return m_etc_->subscribe();
  155. }
  156. bool MqttAsyncClient::Subscribe(const std::string &topic, int qos) {
  157. int ret = MQTTASYNC_FAILURE;
  158. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  159. opts.context = this;
  160. opts.onSuccess = onSubscribeSuccess;
  161. opts.onFailure = onSubscribeFailure;
  162. if ((ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts)) != MQTTASYNC_SUCCESS) {
  163. DLOG(INFO) << "Failed to subscribe " << topic << ", return code " << ret;
  164. } else {
  165. DLOG(INFO) << "Success to subscribe " << topic;
  166. }
  167. return true;
  168. }
  169. bool MqttAsyncClient::unSubscribe(const std::string &topic) {
  170. DLOG(INFO) << __func__;
  171. int ret = MQTTASYNC_FAILURE;
  172. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  173. opts.context = this;
  174. opts.onSuccess = onUnSubscribeSuccess;
  175. opts.onFailure = onUnSubscribeFailure;
  176. if ((ret = MQTTAsync_unsubscribe(m_client_, topic.c_str(), &opts)) != MQTTASYNC_SUCCESS) {
  177. printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
  178. return false;
  179. }
  180. return true;
  181. }
  182. int MqttAsyncClient::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
  183. bool ret = true;
  184. auto *client = (MqttAsyncClient *) context;
  185. if (client->m_message_arrived_callback_ != nullptr) {
  186. ret = client->m_message_arrived_callback_(client, topicName, topicLen, message);
  187. } else {
  188. }
  189. MQTTAsync_freeMessage(&message);
  190. MQTTAsync_free(topicName);
  191. return ret;
  192. }
  193. void MqttAsyncClient::onMessageLost(void *context, char *cause) {
  194. if (context == nullptr || cause == nullptr) {
  195. DLOG(INFO) << __func__;
  196. }
  197. auto *client = (MqttAsyncClient *) context;
  198. if (client->m_connect_lost_callback_ != nullptr) {
  199. client->m_connect_lost_callback_(client, cause);
  200. } else {
  201. }
  202. }
  203. void MqttAsyncClient::onConnectSuccess(void *context, MQTTAsync_successData *response) {
  204. DLOG(INFO) << __func__;
  205. auto *client = (MqttAsyncClient *) context;
  206. if (client->m_connect_success_ != nullptr) {
  207. DLOG(INFO) << "run m_connect_success_.";
  208. client->m_connect_success_(client, response);
  209. }
  210. DLOG(INFO) << "m_connect_success_ is nullptr.";
  211. }
  212. void MqttAsyncClient::onConnectFailure(void *context, MQTTAsync_failureData *response) {
  213. DLOG(INFO) << __func__;
  214. auto *client = (MqttAsyncClient *) context;
  215. if (client->m_connect_failure_ != nullptr) {
  216. client->m_connect_failure_(client, response);
  217. } else {
  218. }
  219. }
  220. void MqttAsyncClient::onDisconnectSuccess(void *context, MQTTAsync_successData *response) {
  221. DLOG(INFO) << __func__;
  222. auto *client = (MqttAsyncClient *) context;
  223. if (client->m_disconnect_success_ != nullptr) {
  224. client->m_disconnect_success_(client, response);
  225. } else {
  226. }
  227. }
  228. void MqttAsyncClient::onDisconnectFailure(void *context, MQTTAsync_failureData *response) {
  229. DLOG(INFO) << __func__;
  230. auto *client = (MqttAsyncClient *) context;
  231. if (client->m_disconnect_failure_ != nullptr) {
  232. client->m_disconnect_failure_(client, response);
  233. } else {
  234. }
  235. }
  236. void MqttAsyncClient::onSubscribeSuccess(void *context, MQTTAsync_successData *response) {
  237. DLOG(INFO) << __func__;
  238. auto *client = (MqttAsyncClient *) context;
  239. if (client->m_subscribe_success_ != nullptr) {
  240. client->m_subscribe_success_(client, response);
  241. } else {
  242. }
  243. }
  244. void MqttAsyncClient::onSubscribeFailure(void *context, MQTTAsync_failureData *response) {
  245. DLOG(INFO) << __func__;
  246. auto *client = (MqttAsyncClient *) context;
  247. if (client->m_subscribe_failure_ != nullptr) {
  248. client->m_subscribe_failure_(client, response);
  249. } else {
  250. }
  251. }
  252. void MqttAsyncClient::onUnSubscribeSuccess(void *context, MQTTAsync_successData *response) {
  253. DLOG(INFO) << __func__;
  254. auto *client = (MqttAsyncClient *) context;
  255. if (client->m_unsubscribe_success_ != nullptr) {
  256. client->m_unsubscribe_success_(client, response);
  257. } else {
  258. }
  259. }
  260. void MqttAsyncClient::onUnSubscribeFailure(void *context, MQTTAsync_failureData *response) {
  261. DLOG(INFO) << __func__;
  262. auto *client = (MqttAsyncClient *) context;
  263. if (client->m_unsubscribe_failure_ != nullptr) {
  264. client->m_unsubscribe_failure_(client, response);
  265. } else {
  266. }
  267. }
  268. void MqttAsyncClient::onSendMessageSuccess(void *context, MQTTAsync_successData *response) {
  269. DLOG(INFO) << __func__;
  270. auto *client = (MqttAsyncClient *) context;
  271. if (client->m_send_success_ != nullptr) {
  272. client->m_send_success_(client, response);
  273. } else {
  274. }
  275. }
  276. void MqttAsyncClient::onSendMessageFailure(void *context, MQTTAsync_failureData *response) {
  277. DLOG(INFO) << __func__;
  278. auto *client = (MqttAsyncClient *) context;
  279. if (client->m_send_failure_ != nullptr) {
  280. client->m_send_failure_(client, response);
  281. } else {
  282. }
  283. }
  284. void MqttAsyncClient::onConnected(void *context, char *cause) {
  285. DLOG(INFO) << __func__;
  286. auto *client = (MqttAsyncClient *) context;
  287. if (client == nullptr || client->m_etc_->subscribe_size() == 0) {
  288. return;
  289. }
  290. for (auto &sub: client->m_etc_->subscribe()) {
  291. client->Subscribe(sub.topic(), sub.qos());
  292. }
  293. }