// // Created by zx on 23-2-22. // #include "paho_client.h" Paho_client::Paho_client(std::string clientid){ clientid_=clientid; context_= nullptr; connected_=false; } Paho_client::~Paho_client(){} void Paho_client::set_maxbuf(int size) { create_opts_.maxBufferedMessages=size+256; } bool Paho_client::connect(std::string address,int port){ char connstr[255]={0}; sprintf(connstr,"mqtt://%s:%d",address.c_str(),port); int rc; if ((rc=MQTTAsync_createWithOptions(&client_,connstr,clientid_.c_str(),MQTTCLIENT_PERSISTENCE_NONE,NULL,&create_opts_) != MQTTASYNC_SUCCESS)) { printf("Failed to create client object, return code %d\n", rc); return false; } if ((rc = MQTTAsync_setCallbacks(client_, this, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS) { printf("Failed to set callback, return code %d\n", rc); return false; } conn_opts_.keepAliveInterval = 20; conn_opts_.cleansession = 1; conn_opts_.onSuccess = onConnect; conn_opts_.onFailure = onConnectFailure; conn_opts_.context = this; if ((rc = MQTTAsync_connect(client_, &conn_opts_)) == MQTTASYNC_SUCCESS) { printf("Connected success \n"); } address_=address; port_=port; return true; } bool Paho_client::disconnect(){ return true; } void Paho_client::publish(const std::string& topic,int QOS,const MqttMsg& msg){ if(connected_==false) { printf(" pls connect before publish\n"); return; } pub_opts_.onSuccess = onSend; pub_opts_.onFailure = onSendFailure; pub_opts_.context = client_; pubmsg_.payload = msg.data(); pubmsg_.payloadlen = msg.length(); pubmsg_.qos = QOS; pubmsg_.retained = 0; int rc=-1; if ((rc = MQTTAsync_sendMessage(client_, topic.c_str(), &pubmsg_, &pub_opts_)) != MQTTASYNC_SUCCESS) { printf("Failed to start sendMessage, return code %d\n", rc); } } void Paho_client::subcribe(std::string topic,int QOS,ArrivedCallback callback,void* context){ arrivedCallback_=callback; context_=context; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "\n", topic.c_str(), clientid_.c_str(), QOS); opts.onSuccess = onSubscribe; opts.onFailure = onSubscribeFailure; opts.context = this; int rc=-1; if ((rc = MQTTAsync_subscribe(client_, topic.c_str(), QOS, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start subscribe, return code %d\n", rc); return; } } int Paho_client::messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message) { Paho_client* cl=(Paho_client*)context; MqttMsg msg((char*)message->payload,message->payloadlen); if(cl->arrivedCallback_!= nullptr) cl->arrivedCallback_(topicName,message->qos,msg,cl->context_); MQTTAsync_freeMessage(&message); MQTTAsync_free(topicName); return 1; } void Paho_client::connlost(void *context, char *cause) { Paho_client* cl=(Paho_client*)context; MQTTAsync client = cl->client_; MQTTAsync_connectOptions conn_opts = cl->conn_opts_; int rc; printf("\nConnection lost\n"); printf(" cause: %s\n", cause); printf("Reconnecting\n"); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.context = context; cl->connected_=false; if ((rc = MQTTAsync_connect(client, &conn_opts)) == MQTTASYNC_SUCCESS) { printf(" Reconnected success\n"); cl->connected_=true; } } void Paho_client::onConnectFailure(void* context, MQTTAsync_failureData* response) { Paho_client* cl=(Paho_client*)context; printf("clientid:%s,Connect failed, rc %d\n", cl->clientid_.c_str(),response ? response->code : 0); int rc=-1; if ((rc = MQTTAsync_connect(cl->client_, &(cl->conn_opts_))) == MQTTASYNC_SUCCESS) { printf(" Reconnected success\n"); } } void Paho_client::onSubscribe(void* context, MQTTAsync_successData* response) { Paho_client* cl=(Paho_client*)context; printf("clientid:%s Subscribe succeeded\n",cl->clientid_.c_str()); } void Paho_client::onSubscribeFailure(void* context, MQTTAsync_failureData* response) { Paho_client* cl=(Paho_client*)context; printf("clientid:%s Subscribe failed, rc %d\n",cl->clientid_.c_str(), response->code); } void Paho_client::onConnect(void* context, MQTTAsync_successData* response) { Paho_client* cl=(Paho_client*)context; cl->connected_=true; } void Paho_client::onSendFailure(void* context, MQTTAsync_failureData* response) { MQTTAsync client = (MQTTAsync)context; MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer; int rc; printf("Message send failed token %d error code %d\n", response->token, response->code); opts.onSuccess = onDisconnect; opts.onFailure = onDisconnectFailure; opts.context = client; if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS) { printf("Failed to start disconnect, return code %d\n", rc); } } void Paho_client::onSend(void* context, MQTTAsync_successData* response) { // This gets called when a message is acknowledged successfully. } void Paho_client::onDisconnectFailure(void* context, MQTTAsync_failureData* response) { printf("Disconnect failed\n"); } void Paho_client::onDisconnect(void* context, MQTTAsync_successData* response) { printf("Successful disconnection\n"); }