paho_client.cpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. //
  2. // Created by zx on 23-2-22.
  3. //
  4. #include "paho_client.h"
  5. Paho_client::Paho_client(std::string clientid){
  6. clientid_=clientid;
  7. context_= nullptr;
  8. }
  9. Paho_client::~Paho_client(){}
  10. void Paho_client::set_maxbuf(int size)
  11. {
  12. create_opts_.maxBufferedMessages=size+256;
  13. }
  14. bool Paho_client::connect(std::string address,int port){
  15. char connstr[255]={0};
  16. sprintf(connstr,"mqtt://%s:%d",address.c_str(),port);
  17. int rc;
  18. if ((rc=MQTTAsync_createWithOptions(&client_,connstr,clientid_.c_str(),MQTTCLIENT_PERSISTENCE_NONE,NULL,&create_opts_)
  19. != MQTTASYNC_SUCCESS))
  20. {
  21. printf("Failed to create client object, return code %d\n", rc);
  22. return false;
  23. }
  24. if ((rc = MQTTAsync_setCallbacks(client_, this, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
  25. {
  26. printf("Failed to set callback, return code %d\n", rc);
  27. return false;
  28. }
  29. conn_opts_.keepAliveInterval = 20;
  30. conn_opts_.cleansession = 1;
  31. conn_opts_.onSuccess = onConnect;
  32. conn_opts_.onFailure = onConnectFailure;
  33. conn_opts_.context = this;
  34. if ((rc = MQTTAsync_connect(client_, &conn_opts_)) == MQTTASYNC_SUCCESS)
  35. {
  36. printf("Connected success \n");
  37. }
  38. address_=address;
  39. port_=port;
  40. return true;
  41. }
  42. bool Paho_client::disconnect(){
  43. return true;
  44. }
  45. void Paho_client::publish(const std::string& topic,int QOS,const MqttMsg& msg){
  46. if(connected_==false)
  47. {
  48. printf(" pls connect before publish\n");
  49. return;
  50. }
  51. pub_opts_.onSuccess = onSend;
  52. pub_opts_.onFailure = onSendFailure;
  53. pub_opts_.context = client_;
  54. pubmsg_.payload = msg.data();
  55. pubmsg_.payloadlen = msg.length();
  56. pubmsg_.qos = QOS;
  57. pubmsg_.retained = 0;
  58. int rc=-1;
  59. if ((rc = MQTTAsync_sendMessage(client_, topic.c_str(), &pubmsg_, &pub_opts_)) != MQTTASYNC_SUCCESS)
  60. {
  61. printf("Failed to start sendMessage, return code %d\n", rc);
  62. }
  63. }
  64. void Paho_client::subcribe(std::string topic,int QOS,ArrivedCallback callback,void* context){
  65. arrivedCallback_=callback;
  66. context_=context;
  67. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  68. printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  69. "\n", topic.c_str(), clientid_.c_str(), QOS);
  70. opts.onSuccess = onSubscribe;
  71. opts.onFailure = onSubscribeFailure;
  72. opts.context = this;
  73. int rc=-1;
  74. if ((rc = MQTTAsync_subscribe(client_, topic.c_str(), QOS, &opts)) != MQTTASYNC_SUCCESS)
  75. {
  76. printf("Failed to start subscribe, return code %d\n", rc);
  77. return;
  78. }
  79. }
  80. int Paho_client::messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  81. {
  82. Paho_client* cl=(Paho_client*)context;
  83. MqttMsg msg((char*)message->payload,message->payloadlen);
  84. if(cl->arrivedCallback_!= nullptr)
  85. cl->arrivedCallback_(topicName,message->qos,msg,cl->context_);
  86. MQTTAsync_freeMessage(&message);
  87. MQTTAsync_free(topicName);
  88. }
  89. void Paho_client::connlost(void *context, char *cause)
  90. {
  91. Paho_client* cl=(Paho_client*)context;
  92. MQTTAsync client = cl->client_;
  93. MQTTAsync_connectOptions conn_opts = cl->conn_opts_;
  94. int rc;
  95. printf("\nConnection lost\n");
  96. printf(" cause: %s\n", cause);
  97. printf("Reconnecting\n");
  98. conn_opts.keepAliveInterval = 20;
  99. conn_opts.cleansession = 1;
  100. cl->connected_=false;
  101. if ((rc = MQTTAsync_connect(client, &conn_opts)) == MQTTASYNC_SUCCESS)
  102. {
  103. printf(" Reconnected success\n");
  104. cl->connected_=true;
  105. }
  106. }
  107. void Paho_client::onConnectFailure(void* context, MQTTAsync_failureData* response)
  108. {
  109. Paho_client* cl=(Paho_client*)context;
  110. printf("clientid:%s,Connect failed, rc %d\n", cl->clientid_.c_str(),response ? response->code : 0);
  111. int rc=-1;
  112. if ((rc = MQTTAsync_connect(cl->client_, &(cl->conn_opts_))) == MQTTASYNC_SUCCESS)
  113. {
  114. printf(" Reconnected success\n");
  115. }
  116. }
  117. void Paho_client::onSubscribe(void* context, MQTTAsync_successData* response)
  118. {
  119. Paho_client* cl=(Paho_client*)context;
  120. printf("clientid:%s Subscribe succeeded\n",cl->clientid_.c_str());
  121. }
  122. void Paho_client::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  123. {
  124. Paho_client* cl=(Paho_client*)context;
  125. printf("clientid:%s Subscribe failed, rc %d\n",cl->clientid_.c_str(), response->code);
  126. }
  127. void Paho_client::onConnect(void* context, MQTTAsync_successData* response)
  128. {
  129. Paho_client* cl=(Paho_client*)context;
  130. cl->connected_=true;
  131. }
  132. void Paho_client::onSendFailure(void* context, MQTTAsync_failureData* response)
  133. {
  134. MQTTAsync client = (MQTTAsync)context;
  135. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  136. int rc;
  137. printf("Message send failed token %d error code %d\n", response->token, response->code);
  138. opts.onSuccess = onDisconnect;
  139. opts.onFailure = onDisconnectFailure;
  140. opts.context = client;
  141. if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
  142. {
  143. printf("Failed to start disconnect, return code %d\n", rc);
  144. }
  145. }
  146. void Paho_client::onSend(void* context, MQTTAsync_successData* response)
  147. {
  148. // This gets called when a message is acknowledged successfully.
  149. }
  150. void Paho_client::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  151. {
  152. printf("Disconnect failed\n");
  153. }
  154. void Paho_client::onDisconnect(void* context, MQTTAsync_successData* response)
  155. {
  156. printf("Successful disconnection\n");
  157. }