#include "mqtt_async.hpp"

#include <cstdio>
#include <string>

namespace {

/// Returns the token stored in a Paho success/failure response, or 0 when the
/// callback was handed a null response pointer.
template <typename Response>
int tokenOf(const Response* response)
{
    return response != nullptr ? response->token : 0;
}

}  // namespace

/// Prepares the connect/create option structures used later by onConnect().
/// No client object is created and no network activity happens here.
MqttAsyncClient::MqttAsyncClient(/* args */)
{
    // Start from a null handle so that the destructor and every Paho call made
    // before a successful onConnect() see a well-defined value.
    m_client_ = nullptr;

    m_conn_opts_ = MQTTAsync_connectOptions_initializer;
    m_create_opts_ = MQTTAsync_createOptions_initializer;

    // The connect callbacks receive this object as their context pointer.
    m_conn_opts_.context = this;
    m_conn_opts_.onSuccess = onConnectSuccess;
    m_conn_opts_.onFailure = onConnectFailure;
    m_conn_opts_.cleansession = true;
    m_conn_opts_.keepAliveInterval = 20;
    m_conn_opts_.automaticReconnect = true;
    m_conn_opts_.minRetryInterval = 3;
    m_conn_opts_.maxRetryInterval = 30;

    m_message_arrived_callback_ = nullptr;
}

/// Requests a disconnect and destroys the client object, if one was created.
MqttAsyncClient::~MqttAsyncClient()
{
    if (m_client_ != nullptr) {
        onDisconnect();
        MQTTAsync_destroy(&m_client_);
    }
}

/// Creates the Paho client object, registers the message callbacks and starts
/// an asynchronous connect.
///
/// @param address    Server URI passed unchanged to MQTTAsync_createWithOptions.
/// @param client_id  Client identifier passed to MQTTAsync_createWithOptions.
/// @param port       Currently unused; the code that appended it to `address`
///                   is disabled, so `address` is used exactly as given.
/// @return true when the connect request was started, false if any step failed.
///         The connect outcome itself is reported through onConnectSuccess /
///         onConnectFailure.
bool MqttAsyncClient::onConnect(std::string address, std::string client_id, int port)
{
    (void)port;  // see the @param note above
    // address = address + ":" + std::to_string(port);

    // A previous call may have left a client object behind; release it so the
    // handle is not overwritten and leaked.
    if (m_client_ != nullptr) {
        MQTTAsync_destroy(&m_client_);
        m_client_ = nullptr;
    }

    int ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(),
                                          MQTTCLIENT_PERSISTENCE_NONE, nullptr, &m_create_opts_);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to create client object, return code %d\n", ret);
        return false;
    }
    std::printf("Success to create client object: %s, client id: %s.\n", address.c_str(), client_id.c_str());

    ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, nullptr);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to set callback, return code %d\n", ret);
        return false;
    }

    ret = MQTTAsync_connect(m_client_, &m_conn_opts_);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to start connect, return code %d\n", ret);
        return false;
    }

    return true;
}

/// Starts an asynchronous disconnect of the current client.
///
/// No completion callbacks are registered for the disconnect.
///
/// @return true when the disconnect request was accepted, false otherwise.
bool MqttAsyncClient::onDisconnect()
{
    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    // NOTE(review): Paho documents this timeout in milliseconds, which makes
    // this 30 ms -- confirm that is the intended value.
    opts.timeout = 30;
    opts.context = this;
    opts.onSuccess = nullptr;
    opts.onFailure = nullptr;

    const int ret = MQTTAsync_disconnect(m_client_, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to disconnect, return code %d\n", ret);
        return false;
    }

    return true;
}

/// Queues a non-retained message for publication.
///
/// @param topic    Topic to publish to.
/// @param message  Pointer to the payload bytes.
/// @param length   Number of payload bytes.
/// @param qos      Quality-of-service value stored in the outgoing message.
/// @return true when the message was handed to the library, false otherwise.
///         Delivery is reported through onSendMessageSuccess /
///         onSendMessageFailure.
bool MqttAsyncClient::onSendMessage(std::string topic, unsigned char* message, int length, int qos)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = this;
    opts.onSuccess = onSendMessageSuccess;
    opts.onFailure = onSendMessageFailure;

    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    pubmsg.payload = message;
    pubmsg.payloadlen = length;
    pubmsg.qos = qos;
    pubmsg.retained = 0;

    const int ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to start sendMessage, return code %d\n", ret);
        return false;
    }

    return true;
}

/// Stores the handler that onMessageArrived() forwards incoming messages to.
void MqttAsyncClient::setCallback(MessageArrived callback)
{
    m_message_arrived_callback_ = callback;
}

/// Starts an asynchronous subscription.
///
/// @param topic  Topic filter to subscribe to.
/// @param qos    Requested quality of service.
/// @return true when the subscribe request was accepted, false otherwise.
///         The outcome is reported through onSubscribeSuccess /
///         onSubscribeFailure.
bool MqttAsyncClient::onSubscribe(std::string topic, int qos)
{
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = this;
    opts.onSuccess = onSubscribeSuccess;
    opts.onFailure = onSubscribeFailure;

    const int ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        std::printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
        return false;
    }

    return true;
}

/// Paho "message arrived" callback: logs the message and forwards it to the
/// handler installed with setCallback().
///
/// @param context    The MqttAsyncClient registered in onConnect().
/// @param topicName  Topic the message was published on.
/// @param topicLen   Topic length as reported by the library.
/// @param message    The received message.
/// @return The user handler's return value when a handler is installed;
///         otherwise 1 after the message and topic have been released.
int MqttAsyncClient::onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
    // The payload is a length-delimited byte buffer, not a C string, so bound
    // the print by payloadlen instead of relying on a terminating NUL.
    const char* payload = static_cast<const char*>(message->payload);
    std::printf("---Debug %s %s %.*s.\n", __func__, topicName,
                payload != nullptr ? message->payloadlen : 0,
                payload != nullptr ? payload : "");

    MqttAsyncClient* client = static_cast<MqttAsyncClient*>(context);
    if (client->m_message_arrived_callback_ != nullptr) {
        // Ownership of message/topicName passes to the user handler.
        return client->m_message_arrived_callback_(context, topicName, topicLen, message);
    }

    std::printf("nullptr");

    // Nobody will consume this message. Per the Paho callback contract a zero
    // return makes the library deliver it again, and a non-zero return makes
    // the application responsible for freeing it -- so free it and report it
    // as handled.
    MQTTAsync_freeMessage(&message);
    MQTTAsync_free(topicName);
    return 1;
}

/// Paho "connection lost" callback; only logs that it was invoked.
void MqttAsyncClient::onMessageLost(void* /*context*/, char* /*cause*/)
{
    std::printf("---Debug %s.\n", __func__);
}

/// Logs the token of a successful connect (0 if no response is supplied).
void MqttAsyncClient::onConnectSuccess(void* /*context*/, MQTTAsync_successData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a failed connect (0 if no response is supplied).
void MqttAsyncClient::onConnectFailure(void* /*context*/, MQTTAsync_failureData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a successful disconnect (0 if no response is supplied).
void MqttAsyncClient::onDisconnectSuccess(void* /*context*/, MQTTAsync_successData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a failed disconnect (0 if no response is supplied).
void MqttAsyncClient::onDisconnectFailure(void* /*context*/, MQTTAsync_failureData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a successful subscribe (0 if no response is supplied).
void MqttAsyncClient::onSubscribeSuccess(void* /*context*/, MQTTAsync_successData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a failed subscribe (0 if no response is supplied).
void MqttAsyncClient::onSubscribeFailure(void* /*context*/, MQTTAsync_failureData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a delivered message (0 if no response is supplied).
void MqttAsyncClient::onSendMessageSuccess(void* /*context*/, MQTTAsync_successData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}

/// Logs the token of a message that failed to send (0 if no response is supplied).
void MqttAsyncClient::onSendMessageFailure(void* /*context*/, MQTTAsync_failureData* response)
{
    std::printf("---Debug %s %d.\n", __func__, tokenOf(response));
}