123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #include "mqtt_async.h"
- MqttAsyncClient::MqttAsyncClient(/* args */) {
- m_etc_ = new MqttAsyncConfig();
- m_client_ = nullptr;
- m_message_arrived_callback_ = nullptr;
- m_connect_lost_callback_ = nullptr;
- m_connect_success_ = nullptr;
- m_disconnect_success_ = nullptr;
- m_subscribe_success_ = nullptr;
- m_unsubscribe_success_ = nullptr;
- m_send_success_ = nullptr;
- m_connect_failure_ = nullptr;
- m_disconnect_failure_ = nullptr;
- m_subscribe_failure_ = nullptr;
- m_unsubscribe_failure_ = nullptr;
- m_send_failure_ = nullptr;
- }
- MqttAsyncClient::~MqttAsyncClient() {
- Disconnect();
- MQTTAsync_destroy(&m_client_);
- }
- bool MqttAsyncClient::init_from_proto(const std::string &path) {
- auto ret = loadProtobufFile(path, *m_etc_);
- LOG(INFO) << m_etc_->DebugString();
- if (ret != SUCCESS) {
- LOG(ERROR) << "init config proto error from " << path;
- return false;
- }
- m_create_opts.maxBufferedMessages = m_etc_->create_opts().maxbufferedmessages();
- m_connect_ops.cleansession = m_etc_->connect_ops().cleansession();
- m_connect_ops.keepAliveInterval = m_etc_->connect_ops().keepaliveinterval();
- m_connect_ops.automaticReconnect = m_etc_->connect_ops().automaticreconnect();
- m_connect_ops.minRetryInterval = m_etc_->connect_ops().minretryinterval();
- m_connect_ops.maxRetryInterval = m_etc_->connect_ops().maxretryinterval();
- if (m_etc_->client_id() == "Auto") {
- m_etc_->set_client_id(std::to_string(std::chrono::steady_clock::now().time_since_epoch().count()));
- }
- LOG(INFO) << m_etc_->client_id();
- Connect(m_etc_->address(), m_etc_->client_id());
- return true;
- }
- bool MqttAsyncClient::Connect(const std::string &address, const std::string &client_id) {
- int ret = MQTTASYNC_FAILURE;
- if ((ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE,
- nullptr, &m_create_opts)
- != MQTTASYNC_SUCCESS)) {
- LOG(ERROR) << "Failed to create client object, return code " << ret;
- return false;
- } else {
- DLOG(INFO) << "Success to create client object: " << address << ", client id: " << client_id << ".";
- }
- if ((ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, nullptr)) !=
- MQTTASYNC_SUCCESS) {
- LOG(ERROR) << "Failed to set callback, return code " << ret;
- return false;
- }
- if ((ret = MQTTAsync_setConnected(m_client_, this, onConnected)) != MQTTASYNC_SUCCESS) {
- printf("Failed to set connected callback, return code %d\n", ret);
- return false;
- }
- if ((ret = MQTTAsync_connect(m_client_, &m_connect_ops)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start connect, return code %d\n", ret);
- return false;
- }
- return true;
- }
- bool MqttAsyncClient::Disconnect() {
- DLOG(INFO) << __func__;
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
- opts.timeout = 30;
- opts.context = this;
- opts.onSuccess = onDisconnectSuccess;
- opts.onFailure = onDisconnectFailure;
- if ((ret = MQTTAsync_disconnect(m_client_, &opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to disconnect, return code %d\n", ret);
- return false;
- }
- return true;
- }
- bool MqttAsyncClient::isConnected() {
- return MQTTAsync_isConnected(m_client_);
- }
- bool MqttAsyncClient::SendMessage(const std::string& topic, void *message, int length, int qos) {
- if (message == nullptr) {
- return false;
- }
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
- opts.context = this;
- opts.onSuccess = onSendMessageSuccess;
- opts.onFailure = onSendMessageFailure;
- pubmsg.payload = message;
- pubmsg.payloadlen = length;
- pubmsg.retained = 0;
- m_etc_->sendmessage().size();
- // if (m_etc.findsendMessageEtc(topic) != nullptr) {
- // pubmsg.qos = m_etc.findsendMessageEtc(topic)->qos;
- // } else {
- pubmsg.qos = qos;
- // }
- if ((ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to start sendMessage, return code %d, connect statu %d\n", ret, MQTTAsync_isConnected(m_client_));
- return false;
- }
- return true;
- }
- void MqttAsyncClient::setCallback(MessageArrived callback) {
- DLOG(INFO) << __func__;
- m_message_arrived_callback_ = callback;
- }
- void MqttAsyncClient::setCallback(ConnectionLost callback) {
- DLOG(INFO) << __func__;
- m_connect_lost_callback_ = callback;
- }
- bool MqttAsyncClient::setCallback(OnSuccess callback, const std::string &function) {
- DLOG(INFO) << __func__;
- bool ret = true;
- if (function == "subscribe") {
- m_subscribe_success_ = callback;
- } else if (function == "unsubscribe") {
- m_unsubscribe_success_ = callback;
- } else if (function == "send") {
- m_send_success_ = callback;
- } else if (function == "connect") {
- m_connect_success_ = callback;
- } else if (function == "disconnect") {
- m_disconnect_success_ = callback;
- } else {
- ret = false;
- }
- return ret;
- }
- bool MqttAsyncClient::setCallback(OnFailure callback, const std::string &function) {
- DLOG(INFO) << __func__ << function;
- bool ret = true;
- if (function == "subscribe") {
- m_subscribe_failure_ = callback;
- } else if (function == "unsubscribe") {
- m_unsubscribe_failure_ = callback;
- } else if (function == "send") {
- m_send_failure_ = callback;
- } else if (function == "connect") {
- m_connect_failure_ = callback;
- } else if (function == "disconnect") {
- m_disconnect_failure_ = callback;
- } else {
- ret = false;
- }
- return ret;
- }
- google::protobuf::RepeatedPtrField<SubscribeEtc> MqttAsyncClient::getSubscribeEtc() {
- return m_etc_->subscribe();
- }
- bool MqttAsyncClient::Subscribe(const std::string &topic, int qos) {
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.context = this;
- opts.onSuccess = onSubscribeSuccess;
- opts.onFailure = onSubscribeFailure;
- if ((ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts)) != MQTTASYNC_SUCCESS) {
- DLOG(INFO) << "Failed to subscribe " << topic << ", return code " << ret;
- } else {
- DLOG(INFO) << "Success to subscribe " << topic;
- }
- return true;
- }
- bool MqttAsyncClient::unSubscribe(const std::string &topic) {
- DLOG(INFO) << __func__;
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.context = this;
- opts.onSuccess = onUnSubscribeSuccess;
- opts.onFailure = onUnSubscribeFailure;
- if ((ret = MQTTAsync_unsubscribe(m_client_, topic.c_str(), &opts)) != MQTTASYNC_SUCCESS) {
- printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
- return false;
- }
- return true;
- }
- int MqttAsyncClient::onMessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) {
- bool ret = true;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_message_arrived_callback_ != nullptr) {
- ret = client->m_message_arrived_callback_(client, topicName, topicLen, message);
- } else {
- }
- MQTTAsync_freeMessage(&message);
- MQTTAsync_free(topicName);
- return ret;
- }
- void MqttAsyncClient::onMessageLost(void *context, char *cause) {
- if (context == nullptr || cause == nullptr) {
- DLOG(INFO) << __func__;
- }
- auto *client = (MqttAsyncClient *) context;
- if (client->m_connect_lost_callback_ != nullptr) {
- client->m_connect_lost_callback_(client, cause);
- } else {
- }
- }
- void MqttAsyncClient::onConnectSuccess(void *context, MQTTAsync_successData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_connect_success_ != nullptr) {
- DLOG(INFO) << "run m_connect_success_.";
- client->m_connect_success_(client, response);
- }
- DLOG(INFO) << "m_connect_success_ is nullptr.";
- }
- void MqttAsyncClient::onConnectFailure(void *context, MQTTAsync_failureData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_connect_failure_ != nullptr) {
- client->m_connect_failure_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onDisconnectSuccess(void *context, MQTTAsync_successData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_disconnect_success_ != nullptr) {
- client->m_disconnect_success_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onDisconnectFailure(void *context, MQTTAsync_failureData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_disconnect_failure_ != nullptr) {
- client->m_disconnect_failure_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onSubscribeSuccess(void *context, MQTTAsync_successData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_subscribe_success_ != nullptr) {
- client->m_subscribe_success_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onSubscribeFailure(void *context, MQTTAsync_failureData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_subscribe_failure_ != nullptr) {
- client->m_subscribe_failure_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onUnSubscribeSuccess(void *context, MQTTAsync_successData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_unsubscribe_success_ != nullptr) {
- client->m_unsubscribe_success_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onUnSubscribeFailure(void *context, MQTTAsync_failureData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_unsubscribe_failure_ != nullptr) {
- client->m_unsubscribe_failure_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onSendMessageSuccess(void *context, MQTTAsync_successData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_send_success_ != nullptr) {
- client->m_send_success_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onSendMessageFailure(void *context, MQTTAsync_failureData *response) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client->m_send_failure_ != nullptr) {
- client->m_send_failure_(client, response);
- } else {
- }
- }
- void MqttAsyncClient::onConnected(void *context, char *cause) {
- DLOG(INFO) << __func__;
- auto *client = (MqttAsyncClient *) context;
- if (client == nullptr || client->m_etc_->subscribe_size() == 0) {
- return;
- }
- for (auto &sub: client->m_etc_->subscribe()) {
- client->Subscribe(sub.topic(), sub.qos());
- }
- }
|