paho_client.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. //
  2. // Created by zx on 23-2-22.
  3. //
  4. #include "paho_client.h"
  5. Paho_client::Paho_client(std::string clientid){
  6. clientid_=clientid;
  7. context_= nullptr;
  8. connected_=false;
  9. }
  10. Paho_client::~Paho_client(){}
  11. void Paho_client::set_maxbuf(int size)
  12. {
  13. create_opts_.maxBufferedMessages=size+256;
  14. }
  15. bool Paho_client::connect(std::string address,int port){
  16. char connstr[255]={0};
  17. sprintf(connstr,"mqtt://%s:%d",address.c_str(),port);
  18. int rc;
  19. if ((rc=MQTTAsync_createWithOptions(&client_,connstr,clientid_.c_str(),MQTTCLIENT_PERSISTENCE_NONE,NULL,&create_opts_)
  20. != MQTTASYNC_SUCCESS))
  21. {
  22. printf("Failed to create client object, return code %d\n", rc);
  23. return false;
  24. }
  25. if ((rc = MQTTAsync_setCallbacks(client_, this, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
  26. {
  27. printf("Failed to set callback, return code %d\n", rc);
  28. return false;
  29. }
  30. conn_opts_.keepAliveInterval = 500;
  31. conn_opts_.cleansession = 1;
  32. conn_opts_.onSuccess = onConnect;
  33. conn_opts_.onFailure = onConnectFailure;
  34. conn_opts_.context = this;
  35. if (MQTTAsync_connect(client_, &conn_opts_) == MQTTASYNC_SUCCESS)
  36. {
  37. printf("Connected success \n");
  38. }
  39. address_=address;
  40. port_=port;
  41. return true;
  42. }
  43. bool Paho_client::disconnect(){
  44. return true;
  45. }
  46. void Paho_client::publish(const std::string& topic,int QOS,const MqttMsg& msg){
  47. if(connected_==false)
  48. {
  49. printf(" pls connect before publish\n");
  50. return;
  51. }
  52. pub_opts_.onSuccess = onSend;
  53. pub_opts_.onFailure = onSendFailure;
  54. pub_opts_.context = client_;
  55. pubmsg_.payload = msg.data();
  56. pubmsg_.payloadlen = msg.length();
  57. pubmsg_.qos = QOS;
  58. pubmsg_.retained = 0;
  59. int rc=-1;
  60. if ((rc = MQTTAsync_sendMessage(client_, topic.c_str(), &pubmsg_, &pub_opts_)) != MQTTASYNC_SUCCESS)
  61. {
  62. printf("Failed to start sendMessage, return code %d\n", rc);
  63. }
  64. }
  65. void Paho_client::subcribe(std::string topic,int QOS,ArrivedCallback callback,void* context){
  66. arrivedCallback_=callback;
  67. context_=context;
  68. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  69. printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
  70. "\n", topic.c_str(), clientid_.c_str(), QOS);
  71. opts.onSuccess = onSubscribe;
  72. opts.onFailure = onSubscribeFailure;
  73. opts.context = this;
  74. int rc=-1;
  75. if ((rc = MQTTAsync_subscribe(client_, topic.c_str(), QOS, &opts)) != MQTTASYNC_SUCCESS)
  76. {
  77. printf("Failed to start subscribe, return code %d\n", rc);
  78. return;
  79. }
  80. }
  81. int Paho_client::messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  82. {
  83. Paho_client* cl=(Paho_client*)context;
  84. MqttMsg msg((char*)message->payload,message->payloadlen);
  85. if(cl->arrivedCallback_!= nullptr)
  86. cl->arrivedCallback_(topicName,message->qos,msg,cl->context_);
  87. MQTTAsync_freeMessage(&message);
  88. MQTTAsync_free(topicName);
  89. return 1;
  90. }
  91. void Paho_client::connlost(void *context, char *cause)
  92. {
  93. Paho_client* cl=(Paho_client*)context;
  94. MQTTAsync client = cl->client_;
  95. MQTTAsync_connectOptions conn_opts = cl->conn_opts_;
  96. int rc;
  97. printf("\nConnection lost\n");
  98. printf(" cause: %s\n", cause);
  99. printf("Reconnecting\n");
  100. conn_opts.keepAliveInterval = 20;
  101. conn_opts.cleansession = 1;
  102. conn_opts.onSuccess = onConnect;
  103. conn_opts.onFailure = onConnectFailure;
  104. conn_opts.context = context;
  105. cl->connected_=false;
  106. if ((rc = MQTTAsync_connect(client, &conn_opts)) == MQTTASYNC_SUCCESS)
  107. {
  108. printf(" Reconnected success\n");
  109. cl->connected_=true;
  110. }
  111. }
  112. void Paho_client::onConnectFailure(void* context, MQTTAsync_failureData* response)
  113. {
  114. Paho_client* cl=(Paho_client*)context;
  115. printf("clientid:%s,Connect failed, rc %d\n", cl->clientid_.c_str(),response ? response->code : 0);
  116. int rc=-1;
  117. if ((rc = MQTTAsync_connect(cl->client_, &(cl->conn_opts_))) == MQTTASYNC_SUCCESS)
  118. {
  119. printf(" Reconnected success\n");
  120. }
  121. }
  122. void Paho_client::onSubscribe(void* context, MQTTAsync_successData* response)
  123. {
  124. Paho_client* cl=(Paho_client*)context;
  125. printf("clientid:%s Subscribe succeeded\n",cl->clientid_.c_str());
  126. }
  127. void Paho_client::onSubscribeFailure(void* context, MQTTAsync_failureData* response)
  128. {
  129. Paho_client* cl=(Paho_client*)context;
  130. printf("clientid:%s Subscribe failed, rc %d\n",cl->clientid_.c_str(), response->code);
  131. }
  132. void Paho_client::onConnect(void* context, MQTTAsync_successData* response)
  133. {
  134. Paho_client* cl=(Paho_client*)context;
  135. cl->connected_=true;
  136. }
  137. void Paho_client::onSendFailure(void* context, MQTTAsync_failureData* response)
  138. {
  139. MQTTAsync client = (MQTTAsync)context;
  140. MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
  141. int rc;
  142. printf("Message send failed token %d error code %d\n", response->token, response->code);
  143. opts.onSuccess = onDisconnect;
  144. opts.onFailure = onDisconnectFailure;
  145. opts.context = client;
  146. if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
  147. {
  148. printf("Failed to start disconnect, return code %d\n", rc);
  149. }
  150. }
  151. void Paho_client::onSend(void* context, MQTTAsync_successData* response)
  152. {
  153. // This gets called when a message is acknowledged successfully.
  154. }
  155. void Paho_client::onDisconnectFailure(void* context, MQTTAsync_failureData* response)
  156. {
  157. printf("Disconnect failed\n");
  158. }
  159. void Paho_client::onDisconnect(void* context, MQTTAsync_successData* response)
  160. {
  161. printf("Successful disconnection\n");
  162. }