123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- #include "mqtt_async.hpp"
- MqttAsyncClient::MqttAsyncClient(/* args */)
- {
- m_conn_opts_ = MQTTAsync_connectOptions_initializer;
- m_create_opts_ = MQTTAsync_createOptions_initializer;
- m_conn_opts_.context = this;
- m_conn_opts_.onSuccess = onConnectSuccess;
- m_conn_opts_.onFailure = onConnectFailure;
- m_conn_opts_.cleansession = true;
- m_conn_opts_.keepAliveInterval = 20;
- m_conn_opts_.automaticReconnect = true;
- m_conn_opts_.minRetryInterval = 3;
- m_conn_opts_.maxRetryInterval = 30;
- m_message_arrived_callback_ = nullptr;
- }
- MqttAsyncClient::~MqttAsyncClient()
- {
- onDisconnect();
- MQTTAsync_destroy(&m_client_);
- }
- bool MqttAsyncClient::onConnect(std::string address, std::string client_id, int port) {
- int ret = MQTTASYNC_FAILURE;
- // address = address + ":" + std::to_string(port);
- if ((ret = MQTTAsync_createWithOptions(&m_client_, address.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL, &m_create_opts_)
- != MQTTASYNC_SUCCESS))
- {
- printf("Failed to create client object, return code %d\n", ret);
- return false;
- }
- else
- {
- printf("Success to create client object: %s, client id: %s.\n", address.c_str(), client_id.c_str());
- }
- if ((ret = MQTTAsync_setCallbacks(m_client_, this, onMessageLost, onMessageArrived, NULL)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to set callback, return code %d\n", ret);
- return false;
- }
-
- if ((ret = MQTTAsync_connect(m_client_, &m_conn_opts_)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start connect, return code %d\n", ret);
- return false;
- }
- return true;
- }
- bool MqttAsyncClient::onDisconnect() {
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
- opts.timeout = 30;
- opts.context = this;
- opts.onSuccess = NULL;
- opts.onFailure = NULL;
- if ((ret = MQTTAsync_disconnect(m_client_, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to disconnect, return code %d\n", ret);
- return false;
- }
- return true;
- }
- bool MqttAsyncClient::onSendMessage(std::string topic, unsigned char* message, int length, int qos) {
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
- opts.context = this;
- opts.onSuccess = onSendMessageSuccess;
- opts.onFailure = onSendMessageFailure;
- pubmsg.payload = message;
- pubmsg.payloadlen = length;
- pubmsg.qos = qos;
- pubmsg.retained = 0;
- if ((ret = MQTTAsync_sendMessage(m_client_, topic.c_str(), &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start sendMessage, return code %d\n", ret);
- return false;
- }
- return true;
- }
- void MqttAsyncClient::setCallback(MessageArrived callback) {
- m_message_arrived_callback_ = callback;
- }
- bool MqttAsyncClient::onSubscribe(std::string topic, int qos) {
- int ret = MQTTASYNC_FAILURE;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- opts.context = this;
- opts.onSuccess = onSubscribeSuccess;
- opts.onFailure = onSubscribeFailure;
- if ((ret = MQTTAsync_subscribe(m_client_, topic.c_str(), qos, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to subscribe %s, return code %d\n", topic.c_str(), ret);
- return false;
- }
- return true;
- }
- int MqttAsyncClient::onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) {
- printf("---Debug %s %s %s.\n", __func__, topicName, (char*)message->payload);
- MqttAsyncClient* client = (MqttAsyncClient*)context;
- if (client->m_message_arrived_callback_ != nullptr) {
- return client->m_message_arrived_callback_(context, topicName, topicLen, message);
- } else {
- printf("nullptr");
- }
- return message->msgid;
- }
- void MqttAsyncClient::onMessageLost(void *context, char *cause) {
- printf("---Debug %s.\n", __func__);
- }
- void MqttAsyncClient::onConnectSuccess(void* context, MQTTAsync_successData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onConnectFailure(void* context, MQTTAsync_failureData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onDisconnectSuccess(void* context, MQTTAsync_successData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onDisconnectFailure(void* context, MQTTAsync_failureData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onSubscribeSuccess(void* context, MQTTAsync_successData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onSendMessageSuccess(void* context, MQTTAsync_successData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
- void MqttAsyncClient::onSendMessageFailure(void* context, MQTTAsync_failureData* response) {
- printf("---Debug %s %d.\n", __func__, response->token);
- }
|