mqtt_async.cpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. #include "mqtt_async.hpp"
  2. MqttAsyncClient::MqttAsyncClient(/* args */)
  3. {
  4. m_conn_opts_ = MQTTAsync_connectOptions_initializer;
  5. m_create_opts_ = MQTTAsync_createOptions_initializer;
  6. m_conn_opts_.context = this;
  7. m_conn_opts_.onSuccess = onConnectSuccess;
  8. m_conn_opts_.onFailure = onConnectFailure;
  9. m_conn_opts_.cleansession = true;
  10. m_conn_opts_.keepAliveInterval = 20;
  11. m_conn_opts_.automaticReconnect = true;
  12. m_conn_opts_.minRetryInterval = 3;
  13. m_conn_opts_.maxRetryInterval = 30;
  14. m_message_arrived_callback_ = nullptr;
  15. }
  16. MqttAsyncClient::~MqttAsyncClient()
  17. {
  18. onDisconnect();
  19. MQTTAsync_destroy(&m_client_);
  20. }
  21. bool MqttAsyncClient::onConnect(std::string address, std::string client_id, int port) {
  22. int ret = MQTTASYNC_FAILURE;
  23. // address = address + ":" + std::to_string(port);
  24. if ((ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL, &m_create_opts_)
  25. != MQTTASYNC_SUCCESS))
  26. {
  27. printf("Failed to create client object, return code %d\n", ret);
  28. return false;
  29. }
  30. else
  31. {
  32. printf("Success to create client object: %s, client id: %s.\n", address.c_str(), client_id.c_str());
  33. }
  34. if ((ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, NULL)) != MQTTASYNC_SUCCESS)
  35. {
  36. printf("Failed to set callback, return code %d\n", ret);
  37. return false;
  38. }
  39. if ((ret = MQTTAsync_connect(m_client_, &m_conn_opts_)) != MQTTASYNC_SUCCESS)
  40. {
  41. printf("Failed to start connect, return code %d\n", ret);
  42. return false;
  43. }
  44. return true;
  45. }
  46. bool MqttAsyncClient::onDisconnect() {
  47. int ret = MQTTASYNC_FAILURE;
  48. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  49. opts.timeout = 30;
  50. opts.context = this;
  51. opts.onSuccess = NULL;
  52. opts.onFailure = NULL;
  53. if ((ret = MQTTAsync_disconnect(m_client_, &opts)) != MQTTASYNC_SUCCESS)
  54. {
  55. printf("Failed to disconnect, return code %d\n", ret);
  56. return false;
  57. }
  58. return true;
  59. }
  60. bool MqttAsyncClient::onSendMessage(std::string topic, unsigned char* message, int length, int qos) {
  61. int ret = MQTTASYNC_FAILURE;
  62. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  63. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  64. opts.context = this;
  65. opts.onSuccess = onSendMessageSuccess;
  66. opts.onFailure = onSendMessageFailure;
  67. pubmsg.payload = message;
  68. pubmsg.payloadlen = length;
  69. pubmsg.qos = qos;
  70. pubmsg.retained = 0;
  71. if ((ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
  72. {
  73. printf("Failed to start sendMessage, return code %d\n", ret);
  74. return false;
  75. }
  76. return true;
  77. }
  78. void MqttAsyncClient::setCallback(MessageArrived callback) {
  79. m_message_arrived_callback_ = callback;
  80. }
  81. bool MqttAsyncClient::onSubscribe(std::string topic, int qos) {
  82. int ret = MQTTASYNC_FAILURE;
  83. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  84. opts.context = this;
  85. opts.onSuccess = onSubscribeSuccess;
  86. opts.onFailure = onSubscribeFailure;
  87. if ((ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts)) != MQTTASYNC_SUCCESS)
  88. {
  89. printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
  90. return false;
  91. }
  92. return true;
  93. }
  94. int MqttAsyncClient::onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) {
  95. printf("---Debug %s %s %s.\n", __func__, topicName, (char*)message->payload);
  96. MqttAsyncClient* client = (MqttAsyncClient*)context;
  97. if (client->m_message_arrived_callback_ != nullptr) {
  98. return client->m_message_arrived_callback_(context, topicName, topicLen, message);
  99. } else {
  100. printf("nullptr");
  101. }
  102. return message->msgid;
  103. }
  104. void MqttAsyncClient::onMessageLost(void *context, char *cause) {
  105. printf("---Debug %s.\n", __func__);
  106. }
  107. void MqttAsyncClient::onConnectSuccess(void* context, MQTTAsync_successData* response) {
  108. printf("---Debug %s %d.\n", __func__, response->token);
  109. }
  110. void MqttAsyncClient::onConnectFailure(void* context, MQTTAsync_failureData* response) {
  111. printf("---Debug %s %d.\n", __func__, response->token);
  112. }
  113. void MqttAsyncClient::onDisconnectSuccess(void* context, MQTTAsync_successData* response) {
  114. printf("---Debug %s %d.\n", __func__, response->token);
  115. }
  116. void MqttAsyncClient::onDisconnectFailure(void* context, MQTTAsync_failureData* response) {
  117. printf("---Debug %s %d.\n", __func__, response->token);
  118. }
  119. void MqttAsyncClient::onSubscribeSuccess(void* context, MQTTAsync_successData* response) {
  120. printf("---Debug %s %d.\n", __func__, response->token);
  121. }
  122. void MqttAsyncClient::onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
  123. printf("---Debug %s %d.\n", __func__, response->token);
  124. }
  125. void MqttAsyncClient::onSendMessageSuccess(void* context, MQTTAsync_successData* response) {
  126. printf("---Debug %s %d.\n", __func__, response->token);
  127. }
  128. void MqttAsyncClient::onSendMessageFailure(void* context, MQTTAsync_failureData* response) {
  129. printf("---Debug %s %d.\n", __func__, response->token);
  130. }