mqtt_async.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. #include "mqtt_async.h"
  2. MqttAsyncClient::MqttAsyncClient(/* args */) {
  3. m_etc_ = new MqttAsyncConfig();
  4. m_client_ = nullptr;
  5. m_message_arrived_callback_ = nullptr;
  6. m_connect_lost_callback_ = nullptr;
  7. m_connect_success_ = nullptr;
  8. m_disconnect_success_ = nullptr;
  9. m_subscribe_success_ = nullptr;
  10. m_unsubscribe_success_ = nullptr;
  11. m_send_success_ = nullptr;
  12. m_connect_failure_ = nullptr;
  13. m_disconnect_failure_ = nullptr;
  14. m_subscribe_failure_ = nullptr;
  15. m_unsubscribe_failure_ = nullptr;
  16. m_send_failure_ = nullptr;
  17. }
  18. MqttAsyncClient::~MqttAsyncClient() {
  19. Disconnect();
  20. MQTTAsync_destroy(&m_client_);
  21. }
  22. bool MqttAsyncClient::init_from_proto(const std::string &path) {
  23. Json::Value config;
  24. ReadJsonFile(path, config);
  25. auto ret = google::protobuf::util::JsonStringToMessage(config.toStyledString(), m_etc_);
  26. LOG(INFO) << m_etc_->DebugString();
  27. if (!ret.ok()) {
  28. LOG(ERROR) << "init config proto error from " << path;
  29. return false;
  30. }
  31. m_create_opts.maxBufferedMessages = m_etc_->create_opts().maxbufferedmessages();
  32. m_connect_ops.cleansession = m_etc_->connect_ops().cleansession();
  33. m_connect_ops.keepAliveInterval = m_etc_->connect_ops().keepaliveinterval();
  34. m_connect_ops.automaticReconnect = m_etc_->connect_ops().automaticreconnect();
  35. m_connect_ops.minRetryInterval = m_etc_->connect_ops().minretryinterval();
  36. m_connect_ops.maxRetryInterval = m_etc_->connect_ops().maxretryinterval();
  37. if (m_etc_->client_id() == "Auto") {
  38. m_etc_->set_client_id(std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()));
  39. }
  40. LOG(INFO) << m_etc_->client_id();
  41. Connect(m_etc_->address(), m_etc_->client_id());
  42. return true;
  43. }
  44. bool MqttAsyncClient::Connect(const std::string &address, const std::string &client_id) {
  45. int ret = MQTTASYNC_FAILURE;
  46. if ((ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE,
  47. nullptr, &m_create_opts)
  48. != MQTTASYNC_SUCCESS)) {
  49. LOG(ERROR) << "Failed to create client object, return code " << ret;
  50. return false;
  51. } else {
  52. DLOG(INFO) << "Success to create client object: " << address << ", client id: " << client_id << ".";
  53. }
  54. if ((ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, nullptr)) !=
  55. MQTTASYNC_SUCCESS) {
  56. LOG(ERROR) << "Failed to set callback, return code " << ret;
  57. return false;
  58. }
  59. if ((ret = MQTTAsync_setConnected(m_client_, this, onConnected)) != MQTTASYNC_SUCCESS) {
  60. printf("Failed to set connected callback, return code %d\n", ret);
  61. return false;
  62. }
  63. if ((ret = MQTTAsync_connect(m_client_, &m_connect_ops)) != MQTTASYNC_SUCCESS) {
  64. printf("Failed to start connect, return code %d\n", ret);
  65. return false;
  66. }
  67. return true;
  68. }
  69. bool MqttAsyncClient::Disconnect() {
  70. DLOG(INFO) << __func__;
  71. int ret = MQTTASYNC_FAILURE;
  72. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  73. opts.timeout = 30;
  74. opts.context = this;
  75. opts.onSuccess = onDisconnectSuccess;
  76. opts.onFailure = onDisconnectFailure;
  77. if ((ret = MQTTAsync_disconnect(m_client_, &opts)) != MQTTASYNC_SUCCESS) {
  78. printf("Failed to disconnect, return code %d\n", ret);
  79. return false;
  80. }
  81. return true;
  82. }
  83. bool MqttAsyncClient::isConnected() {
  84. return MQTTAsync_isConnected(m_client_);
  85. }
  86. bool MqttAsyncClient::SendMessage(std::string topic, void *message, int length, int qos) {
  87. if (message == nullptr) {
  88. return false;
  89. }
  90. int ret = MQTTASYNC_FAILURE;
  91. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  92. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  93. opts.context = this;
  94. opts.onSuccess = onSendMessageSuccess;
  95. opts.onFailure = onSendMessageFailure;
  96. pubmsg.payload = message;
  97. pubmsg.payloadlen = length;
  98. pubmsg.retained = 0;
  99. m_etc_->sendmessage().size();
  100. // if (m_etc.findsendMessageEtc(topic) != nullptr) {
  101. // pubmsg.qos = m_etc.findsendMessageEtc(topic)->qos;
  102. // } else {
  103. pubmsg.qos = qos;
  104. // }
  105. if ((ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
  106. printf("Failed to start sendMessage, return code %d, connect statu %d\n", ret, MQTTAsync_isConnected(m_client_));
  107. return false;
  108. }
  109. return true;
  110. }
  111. void MqttAsyncClient::setCallback(MessageArrived callback) {
  112. DLOG(INFO) << __func__;
  113. m_message_arrived_callback_ = callback;
  114. }
  115. void MqttAsyncClient::setCallback(ConnectionLost callback) {
  116. DLOG(INFO) << __func__;
  117. m_connect_lost_callback_ = callback;
  118. }
  119. bool MqttAsyncClient::setCallback(OnSuccess callback, const std::string &function) {
  120. DLOG(INFO) << __func__;
  121. bool ret = true;
  122. if (function == "subscribe") {
  123. m_subscribe_success_ = callback;
  124. } else if (function == "unsubscribe") {
  125. m_unsubscribe_success_ = callback;
  126. } else if (function == "send") {
  127. m_send_success_ = callback;
  128. } else if (function == "connect") {
  129. m_connect_success_ = callback;
  130. } else if (function == "disconnect") {
  131. m_disconnect_success_ = callback;
  132. } else {
  133. ret = false;
  134. }
  135. return ret;
  136. }
  137. bool MqttAsyncClient::setCallback(OnFailure callback, const std::string &function) {
  138. DLOG(INFO) << __func__ << function;
  139. bool ret = true;
  140. if (function == "subscribe") {
  141. m_subscribe_failure_ = callback;
  142. } else if (function == "unsubscribe") {
  143. m_unsubscribe_failure_ = callback;
  144. } else if (function == "send") {
  145. m_send_failure_ = callback;
  146. } else if (function == "connect") {
  147. m_connect_failure_ = callback;
  148. } else if (function == "disconnect") {
  149. m_disconnect_failure_ = callback;
  150. } else {
  151. ret = false;
  152. }
  153. return ret;
  154. }
  155. google::protobuf::RepeatedPtrField<SubscribeEtc> MqttAsyncClient::getSubscribeEtc() {
  156. return m_etc_->subscribe();
  157. }
  158. bool MqttAsyncClient::Subscribe(const std::string &topic, int qos) {
  159. int ret = MQTTASYNC_FAILURE;
  160. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  161. opts.context = this;
  162. opts.onSuccess = onSubscribeSuccess;
  163. opts.onFailure = onSubscribeFailure;
  164. if ((ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts)) != MQTTASYNC_SUCCESS) {
  165. DLOG(INFO) << "Failed to subscribe " << topic << ", return code " << ret;
  166. } else {
  167. DLOG(INFO) << "Success to subscribe " << topic;
  168. }
  169. return true;
  170. }
  171. bool MqttAsyncClient::unSubscribe(const std::string &topic) {
  172. DLOG(INFO) << __func__;
  173. int ret = MQTTASYNC_FAILURE;
  174. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  175. opts.context = this;
  176. opts.onSuccess = onUnSubscribeSuccess;
  177. opts.onFailure = onUnSubscribeFailure;
  178. if ((ret = MQTTAsync_unsubscribe(m_client_, topic.c_str(), &opts)) != MQTTASYNC_SUCCESS) {
  179. printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
  180. return false;
  181. }
  182. return true;
  183. }
  184. int MqttAsyncClient::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
  185. bool ret = true;
  186. auto *client = (MqttAsyncClient *) context;
  187. if (client->m_message_arrived_callback_ != nullptr) {
  188. ret = client->m_message_arrived_callback_(client, topicName, topicLen, message);
  189. } else {
  190. }
  191. MQTTAsync_freeMessage(&message);
  192. MQTTAsync_free(topicName);
  193. return ret;
  194. }
  195. void MqttAsyncClient::onMessageLost(void *context, char *cause) {
  196. if (context == nullptr || cause == nullptr) {
  197. DLOG(INFO) << __func__;
  198. }
  199. auto *client = (MqttAsyncClient *) context;
  200. if (client->m_connect_lost_callback_ != nullptr) {
  201. client->m_connect_lost_callback_(client, cause);
  202. } else {
  203. }
  204. }
  205. void MqttAsyncClient::onConnectSuccess(void *context, MQTTAsync_successData *response) {
  206. DLOG(INFO) << __func__;
  207. auto *client = (MqttAsyncClient *) context;
  208. if (client->m_connect_success_ != nullptr) {
  209. DLOG(INFO) << "run m_connect_success_.";
  210. client->m_connect_success_(client, response);
  211. }
  212. DLOG(INFO) << "m_connect_success_ is nullptr.";
  213. }
  214. void MqttAsyncClient::onConnectFailure(void *context, MQTTAsync_failureData *response) {
  215. DLOG(INFO) << __func__;
  216. auto *client = (MqttAsyncClient *) context;
  217. if (client->m_connect_failure_ != nullptr) {
  218. client->m_connect_failure_(client, response);
  219. } else {
  220. }
  221. }
  222. void MqttAsyncClient::onDisconnectSuccess(void *context, MQTTAsync_successData *response) {
  223. DLOG(INFO) << __func__;
  224. auto *client = (MqttAsyncClient *) context;
  225. if (client->m_disconnect_success_ != nullptr) {
  226. client->m_disconnect_success_(client, response);
  227. } else {
  228. }
  229. }
  230. void MqttAsyncClient::onDisconnectFailure(void *context, MQTTAsync_failureData *response) {
  231. DLOG(INFO) << __func__;
  232. auto *client = (MqttAsyncClient *) context;
  233. if (client->m_disconnect_failure_ != nullptr) {
  234. client->m_disconnect_failure_(client, response);
  235. } else {
  236. }
  237. }
  238. void MqttAsyncClient::onSubscribeSuccess(void *context, MQTTAsync_successData *response) {
  239. DLOG(INFO) << __func__;
  240. auto *client = (MqttAsyncClient *) context;
  241. if (client->m_subscribe_success_ != nullptr) {
  242. client->m_subscribe_success_(client, response);
  243. } else {
  244. }
  245. }
  246. void MqttAsyncClient::onSubscribeFailure(void *context, MQTTAsync_failureData *response) {
  247. DLOG(INFO) << __func__;
  248. auto *client = (MqttAsyncClient *) context;
  249. if (client->m_subscribe_failure_ != nullptr) {
  250. client->m_subscribe_failure_(client, response);
  251. } else {
  252. }
  253. }
  254. void MqttAsyncClient::onUnSubscribeSuccess(void *context, MQTTAsync_successData *response) {
  255. DLOG(INFO) << __func__;
  256. auto *client = (MqttAsyncClient *) context;
  257. if (client->m_unsubscribe_success_ != nullptr) {
  258. client->m_unsubscribe_success_(client, response);
  259. } else {
  260. }
  261. }
  262. void MqttAsyncClient::onUnSubscribeFailure(void *context, MQTTAsync_failureData *response) {
  263. DLOG(INFO) << __func__;
  264. auto *client = (MqttAsyncClient *) context;
  265. if (client->m_unsubscribe_failure_ != nullptr) {
  266. client->m_unsubscribe_failure_(client, response);
  267. } else {
  268. }
  269. }
  270. void MqttAsyncClient::onSendMessageSuccess(void *context, MQTTAsync_successData *response) {
  271. DLOG(INFO) << __func__;
  272. auto *client = (MqttAsyncClient *) context;
  273. if (client->m_send_success_ != nullptr) {
  274. client->m_send_success_(client, response);
  275. } else {
  276. }
  277. }
  278. void MqttAsyncClient::onSendMessageFailure(void *context, MQTTAsync_failureData *response) {
  279. DLOG(INFO) << __func__;
  280. auto *client = (MqttAsyncClient *) context;
  281. if (client->m_send_failure_ != nullptr) {
  282. client->m_send_failure_(client, response);
  283. } else {
  284. }
  285. }
  286. void MqttAsyncClient::onConnected(void *context, char *cause) {
  287. DLOG(INFO) << __func__;
  288. auto *client = (MqttAsyncClient *) context;
  289. if (client == nullptr || client->m_etc_->subscribe_size() == 0) {
  290. return;
  291. }
  292. for (auto &sub: client->m_etc_->subscribe()) {
  293. client->Subscribe(sub.topic(), sub.qos());
  294. }
  295. }