#include "mqtt_async.h"

/// Creates an unconnected client: allocates an empty configuration object and
/// clears the Paho handle and every user callback slot.
MqttAsyncClient::MqttAsyncClient(/* args */) {
    m_etc_ = new MqttAsyncConfig();
    m_client_ = nullptr;

    m_message_arrived_callback_ = nullptr;
    m_connect_lost_callback_ = nullptr;

    m_connect_success_ = nullptr;
    m_disconnect_success_ = nullptr;
    m_subscribe_success_ = nullptr;
    m_unsubscribe_success_ = nullptr;
    m_send_success_ = nullptr;

    m_connect_failure_ = nullptr;
    m_disconnect_failure_ = nullptr;
    m_subscribe_failure_ = nullptr;
    m_unsubscribe_failure_ = nullptr;
    m_send_failure_ = nullptr;
}

/// Requests a disconnect, destroys the Paho handle (when one was created) and
/// releases the configuration object allocated by the constructor.
MqttAsyncClient::~MqttAsyncClient() {
    // Without a handle there is nothing to disconnect or destroy.
    if (m_client_ != nullptr) {
        Disconnect();
        MQTTAsync_destroy(&m_client_);
    }
    // The constructor allocates m_etc_ with new; free it here so it is not leaked.
    delete m_etc_;
    m_etc_ = nullptr;
}

/// Loads the client configuration from the protobuf file at `path`, copies the
/// create/connect settings into the Paho option structs and starts a connection.
///
/// A client id of "Auto" is replaced by the current steady-clock tick count.
///
/// @param path Path handed to loadProtobufFile.
/// @return false when the configuration cannot be loaded or Connect fails,
///         true when the connection attempt was started.
bool MqttAsyncClient::init_from_proto(const std::string &path) {
    auto ret = loadProtobufFile(path, *m_etc_);
    LOG(INFO) << m_etc_->DebugString();
    if (ret != SUCCESS) {
        LOG(ERROR) << "init config proto error from " << path;
        return false;
    }

    m_create_opts.maxBufferedMessages = m_etc_->create_opts().maxbufferedmessages();

    m_connect_ops.cleansession = m_etc_->connect_ops().cleansession();
    m_connect_ops.keepAliveInterval = m_etc_->connect_ops().keepaliveinterval();
    m_connect_ops.automaticReconnect = m_etc_->connect_ops().automaticreconnect();
    m_connect_ops.minRetryInterval = m_etc_->connect_ops().minretryinterval();
    m_connect_ops.maxRetryInterval = m_etc_->connect_ops().maxretryinterval();

    if (m_etc_->client_id() == "Auto") {
        m_etc_->set_client_id(std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()));
    }
    LOG(INFO) << m_etc_->client_id();

    // Propagate the outcome instead of reporting success unconditionally.
    return Connect(m_etc_->address(), m_etc_->client_id());
}

/// Creates the Paho client handle, registers the library callbacks and starts
/// an asynchronous connect.
///
/// @param address   Server URI passed to MQTTAsync_createWithOptions.
/// @param client_id Client identifier passed to MQTTAsync_createWithOptions.
/// @return true when the connect request was accepted by the library, false on
///         the first failing step (the failure is logged).
bool MqttAsyncClient::Connect(const std::string &address, const std::string &client_id) {
    // A handle left over from an earlier call would be overwritten and leaked.
    if (m_client_ != nullptr) {
        MQTTAsync_destroy(&m_client_);
    }

    int ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(),
                                          MQTTCLIENT_PERSISTENCE_NONE, nullptr, &m_create_opts);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to create client object, return code " << ret;
        return false;
    }
    DLOG(INFO) << "Success to create client object: " << address << ", client id: " << client_id << ".";

    ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, nullptr);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to set callback, return code " << ret;
        return false;
    }

    ret = MQTTAsync_setConnected(m_client_, this, onConnected);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to set connected callback, return code " << ret;
        return false;
    }

    // Route the connect result to the handlers registered via setCallback(..., "connect").
    m_connect_ops.context = this;
    m_connect_ops.onSuccess = onConnectSuccess;
    m_connect_ops.onFailure = onConnectFailure;

    ret = MQTTAsync_connect(m_client_, &m_connect_ops);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to start connect, return code " << ret;
        return false;
    }
    return true;
}

/// Starts an asynchronous disconnect; the outcome is reported through the
/// "disconnect" success/failure callbacks.
///
/// @return true when the disconnect request was accepted, false otherwise.
bool MqttAsyncClient::Disconnect() {
    DLOG(INFO) << __func__;

    MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
    opts.timeout = 30;  // Paho documents this field as milliseconds.
    opts.context = this;
    opts.onSuccess = onDisconnectSuccess;
    opts.onFailure = onDisconnectFailure;

    const int ret = MQTTAsync_disconnect(m_client_, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to disconnect, return code " << ret;
        return false;
    }
    return true;
}

/// @return true when the library reports the client as connected.
bool MqttAsyncClient::isConnected() {
    return MQTTAsync_isConnected(m_client_) != 0;
}

/// Publishes `length` bytes starting at `message` on `topic` (not retained).
///
/// @param topic   Destination topic.
/// @param message Payload buffer; must not be null.
/// @param length  Payload size in bytes; must not be negative.
/// @param qos     Quality of service used for the publication.
/// @return true when the publish request was accepted, false on invalid
///         arguments or when the library rejects the request.
bool MqttAsyncClient::SendMessage(const std::string& topic, void *message, int length, int qos) {
    if (message == nullptr || length < 0) {
        return false;
    }

    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = this;
    opts.onSuccess = onSendMessageSuccess;
    opts.onFailure = onSendMessageFailure;

    MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
    pubmsg.payload = message;
    pubmsg.payloadlen = length;
    pubmsg.retained = 0;
    // Per-topic QoS lookup from the configuration is not implemented; the
    // caller-supplied value is always used.
    pubmsg.qos = qos;

    const int ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to start sendMessage, return code " << ret
                   << ", connect status " << MQTTAsync_isConnected(m_client_);
        return false;
    }
    return true;
}

/// Registers the handler invoked for every arriving message.
void MqttAsyncClient::setCallback(MessageArrived callback) {
    DLOG(INFO) << __func__;
    m_message_arrived_callback_ = callback;
}

/// Registers the handler invoked when the library reports a lost connection.
void MqttAsyncClient::setCallback(ConnectionLost callback) {
    DLOG(INFO) << __func__;
    m_connect_lost_callback_ = callback;
}

/// Registers a success handler for one operation.
///
/// @param callback Handler to store.
/// @param function One of "subscribe", "unsubscribe", "send", "connect",
///                 "disconnect".
/// @return false when `function` is not one of the names above.
bool MqttAsyncClient::setCallback(OnSuccess callback, const std::string &function) {
    DLOG(INFO) << __func__;
    if (function == "subscribe") {
        m_subscribe_success_ = callback;
    } else if (function == "unsubscribe") {
        m_unsubscribe_success_ = callback;
    } else if (function == "send") {
        m_send_success_ = callback;
    } else if (function == "connect") {
        m_connect_success_ = callback;
    } else if (function == "disconnect") {
        m_disconnect_success_ = callback;
    } else {
        return false;
    }
    return true;
}

/// Registers a failure handler for one operation.
///
/// @param callback Handler to store.
/// @param function One of "subscribe", "unsubscribe", "send", "connect",
///                 "disconnect".
/// @return false when `function` is not one of the names above.
bool MqttAsyncClient::setCallback(OnFailure callback, const std::string &function) {
    DLOG(INFO) << __func__ << function;
    if (function == "subscribe") {
        m_subscribe_failure_ = callback;
    } else if (function == "unsubscribe") {
        m_unsubscribe_failure_ = callback;
    } else if (function == "send") {
        m_send_failure_ = callback;
    } else if (function == "connect") {
        m_connect_failure_ = callback;
    } else if (function == "disconnect") {
        m_disconnect_failure_ = callback;
    } else {
        return false;
    }
    return true;
}

/// Returns the subscribe entries of the loaded configuration.
// NOTE(review): the element-type template argument of RepeatedPtrField is not
// visible in this file; confirm the return type against the header declaration.
google::protobuf::RepeatedPtrField MqttAsyncClient::getSubscribeEtc() {
    return m_etc_->subscribe();
}

/// Starts an asynchronous subscription; the outcome is reported through the
/// "subscribe" success/failure callbacks.
///
/// @param topic Topic filter to subscribe to.
/// @param qos   Requested quality of service.
/// @return true when the subscribe request was accepted, false otherwise.
bool MqttAsyncClient::Subscribe(const std::string &topic, int qos) {
    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = this;
    opts.onSuccess = onSubscribeSuccess;
    opts.onFailure = onSubscribeFailure;

    const int ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to subscribe " << topic << ", return code " << ret;
        return false;
    }
    DLOG(INFO) << "Success to subscribe " << topic;
    return true;
}

/// Starts an asynchronous unsubscribe; the outcome is reported through the
/// "unsubscribe" success/failure callbacks.
///
/// @param topic Topic filter to unsubscribe from.
/// @return true when the unsubscribe request was accepted, false otherwise.
bool MqttAsyncClient::unSubscribe(const std::string &topic) {
    DLOG(INFO) << __func__;

    MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
    opts.context = this;
    opts.onSuccess = onUnSubscribeSuccess;
    opts.onFailure = onUnSubscribeFailure;

    const int ret = MQTTAsync_unsubscribe(m_client_, topic.c_str(), &opts);
    if (ret != MQTTASYNC_SUCCESS) {
        LOG(ERROR) << "Failed to unsubscribe " << topic << ", return code " << ret;
        return false;
    }
    return true;
}

/// Library callback for an arriving message; forwards it to the registered
/// MessageArrived handler, or accepts it when no handler is set.
///
/// @return non-zero when the message was handled, 0 to ask the library to
///         deliver it again.
int MqttAsyncClient::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
    auto *client = static_cast<MqttAsyncClient *>(context);

    bool handled = true;
    if (client != nullptr && client->m_message_arrived_callback_ != nullptr) {
        handled = client->m_message_arrived_callback_(client, topicName, topicLen, message);
    }

    // Paho redelivers the same message when 0 is returned, so the storage must
    // stay alive in that case; release it only once the message was handled.
    if (handled) {
        MQTTAsync_freeMessage(&message);
        MQTTAsync_free(topicName);
    }
    return handled ? 1 : 0;
}

/// Library callback registered as the connection-lost handler; forwards
/// `cause` (which may be null) to the registered ConnectionLost handler.
void MqttAsyncClient::onMessageLost(void *context, char *cause) {
    if (context == nullptr || cause == nullptr) {
        DLOG(INFO) << __func__;
    }

    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client == nullptr) {
        return;  // No object to dispatch to.
    }
    if (client->m_connect_lost_callback_ != nullptr) {
        client->m_connect_lost_callback_(client, cause);
    }
}

/// Forwards a successful connect to the registered "connect" success handler.
void MqttAsyncClient::onConnectSuccess(void *context, MQTTAsync_successData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client == nullptr) {
        return;
    }
    if (client->m_connect_success_ != nullptr) {
        DLOG(INFO) << "run m_connect_success_.";
        client->m_connect_success_(client, response);
    } else {
        DLOG(INFO) << "m_connect_success_ is nullptr.";
    }
}

/// Forwards a failed connect to the registered "connect" failure handler.
void MqttAsyncClient::onConnectFailure(void *context, MQTTAsync_failureData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_connect_failure_ != nullptr) {
        client->m_connect_failure_(client, response);
    }
}

/// Forwards a successful disconnect to the registered "disconnect" success handler.
void MqttAsyncClient::onDisconnectSuccess(void *context, MQTTAsync_successData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_disconnect_success_ != nullptr) {
        client->m_disconnect_success_(client, response);
    }
}

/// Forwards a failed disconnect to the registered "disconnect" failure handler.
void MqttAsyncClient::onDisconnectFailure(void *context, MQTTAsync_failureData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_disconnect_failure_ != nullptr) {
        client->m_disconnect_failure_(client, response);
    }
}

/// Forwards a successful subscribe to the registered "subscribe" success handler.
void MqttAsyncClient::onSubscribeSuccess(void *context, MQTTAsync_successData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_subscribe_success_ != nullptr) {
        client->m_subscribe_success_(client, response);
    }
}

/// Forwards a failed subscribe to the registered "subscribe" failure handler.
void MqttAsyncClient::onSubscribeFailure(void *context, MQTTAsync_failureData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_subscribe_failure_ != nullptr) {
        client->m_subscribe_failure_(client, response);
    }
}

/// Forwards a successful unsubscribe to the registered "unsubscribe" success handler.
void MqttAsyncClient::onUnSubscribeSuccess(void *context, MQTTAsync_successData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_unsubscribe_success_ != nullptr) {
        client->m_unsubscribe_success_(client, response);
    }
}

/// Forwards a failed unsubscribe to the registered "unsubscribe" failure handler.
void MqttAsyncClient::onUnSubscribeFailure(void *context, MQTTAsync_failureData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_unsubscribe_failure_ != nullptr) {
        client->m_unsubscribe_failure_(client, response);
    }
}

/// Forwards a successful publish to the registered "send" success handler.
void MqttAsyncClient::onSendMessageSuccess(void *context, MQTTAsync_successData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_send_success_ != nullptr) {
        client->m_send_success_(client, response);
    }
}

/// Forwards a failed publish to the registered "send" failure handler.
void MqttAsyncClient::onSendMessageFailure(void *context, MQTTAsync_failureData *response) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client != nullptr && client->m_send_failure_ != nullptr) {
        client->m_send_failure_(client, response);
    }
}

/// Library callback registered through MQTTAsync_setConnected; subscribes to
/// every topic listed in the configuration's subscribe entries.
void MqttAsyncClient::onConnected(void *context, char *cause) {
    DLOG(INFO) << __func__;
    auto *client = static_cast<MqttAsyncClient *>(context);
    if (client == nullptr) {
        return;
    }
    // An empty subscribe list simply yields no iterations.
    for (const auto &sub : client->m_etc_->subscribe()) {
        client->Subscribe(sub.topic(), sub.qos());
    }
}