//
// Created by zx on 23-2-22.
//
- #include "paho_client.h"
- Paho_client::Paho_client(std::string clientid){
- clientid_=clientid;
- context_= nullptr;
- connected_=false;
- }
- Paho_client::~Paho_client(){}
- void Paho_client::set_maxbuf(int size)
- {
- create_opts_.maxBufferedMessages=size+256;
- }
- bool Paho_client::connect(std::string address,int port){
- char connstr[255]={0};
- sprintf(connstr,"mqtt://%s:%d",address.c_str(),port);
- int rc;
- if ((rc=MQTTAsync_createWithOptions(&client_,connstr,clientid_.c_str(),MQTTCLIENT_PERSISTENCE_NONE,NULL,&create_opts_)
- != MQTTASYNC_SUCCESS))
- {
- printf("Failed to create client object, return code %d\n", rc);
- return false;
- }
- if ((rc = MQTTAsync_setCallbacks(client_, this, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to set callback, return code %d\n", rc);
- return false;
- }
- conn_opts_.keepAliveInterval = 500;
- conn_opts_.cleansession = 1;
- conn_opts_.onSuccess = onConnect;
- conn_opts_.onFailure = onConnectFailure;
- conn_opts_.context = this;
- if (MQTTAsync_connect(client_, &conn_opts_) == MQTTASYNC_SUCCESS)
- {
- printf("Connected success \n");
- }
- address_=address;
- port_=port;
- return true;
- }
- bool Paho_client::disconnect(){
- return true;
- }
- void Paho_client::publish(const std::string& topic,int QOS,const MqttMsg& msg){
- if(connected_==false)
- {
- printf(" pls connect before publish\n");
- return;
- }
- pub_opts_.onSuccess = onSend;
- pub_opts_.onFailure = onSendFailure;
- pub_opts_.context = client_;
- pubmsg_.payload = msg.data();
- pubmsg_.payloadlen = msg.length();
- pubmsg_.qos = QOS;
- pubmsg_.retained = 0;
- int rc=-1;
- if ((rc = MQTTAsync_sendMessage(client_, topic.c_str(), &pubmsg_, &pub_opts_)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start sendMessage, return code %d\n", rc);
- }
- }
- void Paho_client::subcribe(std::string topic,int QOS,ArrivedCallback callback,void* context){
- arrivedCallback_=callback;
- context_=context;
- MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
- printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
- "\n", topic.c_str(), clientid_.c_str(), QOS);
- opts.onSuccess = onSubscribe;
- opts.onFailure = onSubscribeFailure;
- opts.context = this;
- int rc=-1;
- if ((rc = MQTTAsync_subscribe(client_, topic.c_str(), QOS, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start subscribe, return code %d\n", rc);
- return;
- }
- }
- int Paho_client::messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
- {
- Paho_client* cl=(Paho_client*)context;
- MqttMsg msg((char*)message->payload,message->payloadlen);
- if(cl->arrivedCallback_!= nullptr)
- cl->arrivedCallback_(topicName,message->qos,msg,cl->context_);
- MQTTAsync_freeMessage(&message);
- MQTTAsync_free(topicName);
- return 1;
- }
- void Paho_client::connlost(void *context, char *cause)
- {
- Paho_client* cl=(Paho_client*)context;
- MQTTAsync client = cl->client_;
- MQTTAsync_connectOptions conn_opts = cl->conn_opts_;
- int rc;
- printf("\nConnection lost\n");
- printf(" cause: %s\n", cause);
- printf("Reconnecting\n");
- conn_opts.keepAliveInterval = 20;
- conn_opts.cleansession = 1;
- conn_opts.onSuccess = onConnect;
- conn_opts.onFailure = onConnectFailure;
- conn_opts.context = context;
- cl->connected_=false;
- if ((rc = MQTTAsync_connect(client, &conn_opts)) == MQTTASYNC_SUCCESS)
- {
- printf(" Reconnected success\n");
- cl->connected_=true;
- }
- }
- void Paho_client::onConnectFailure(void* context, MQTTAsync_failureData* response)
- {
- Paho_client* cl=(Paho_client*)context;
- printf("clientid:%s,Connect failed, rc %d\n", cl->clientid_.c_str(),response ? response->code : 0);
- int rc=-1;
- if ((rc = MQTTAsync_connect(cl->client_, &(cl->conn_opts_))) == MQTTASYNC_SUCCESS)
- {
- printf(" Reconnected success\n");
- }
- }
- void Paho_client::onSubscribe(void* context, MQTTAsync_successData* response)
- {
- Paho_client* cl=(Paho_client*)context;
- printf("clientid:%s Subscribe succeeded\n",cl->clientid_.c_str());
- }
- void Paho_client::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
- {
- Paho_client* cl=(Paho_client*)context;
- printf("clientid:%s Subscribe failed, rc %d\n",cl->clientid_.c_str(), response->code);
- }
- void Paho_client::onConnect(void* context, MQTTAsync_successData* response)
- {
- Paho_client* cl=(Paho_client*)context;
- cl->connected_=true;
- }
- void Paho_client::onSendFailure(void* context, MQTTAsync_failureData* response)
- {
- MQTTAsync client = (MQTTAsync)context;
- MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
- int rc;
- printf("Message send failed token %d error code %d\n", response->token, response->code);
- opts.onSuccess = onDisconnect;
- opts.onFailure = onDisconnectFailure;
- opts.context = client;
- if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
- {
- printf("Failed to start disconnect, return code %d\n", rc);
- }
- }
- void Paho_client::onSend(void* context, MQTTAsync_successData* response)
- {
- // This gets called when a message is acknowledged successfully.
- }
- void Paho_client::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
- {
- printf("Disconnect failed\n");
- }
- void Paho_client::onDisconnect(void* context, MQTTAsync_successData* response)
- {
- printf("Successful disconnection\n");
- }
|