terminator_emqx.cpp 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. //
  2. // Created by zx on 23-4-11.
  3. //
  4. #include "terminator_emqx.h"
  5. #include "unistd.h"
  6. Terminator_emqx::~Terminator_emqx()
  7. {
  8. if(client_!= nullptr){
  9. client_->disconnect();
  10. delete client_;
  11. client_= nullptr;
  12. }
  13. }
  14. bool Terminator_emqx::Connect(std::string ip,int port)
  15. {
  16. if(client_!= nullptr)
  17. {
  18. client_->disconnect();
  19. delete client_;
  20. }
  21. client_=new Paho_client(nodeId_);
  22. connected_= client_->connect(ip,port);
  23. return connected_;
  24. }
  25. void Terminator_emqx::Publish(std::string topic,const MqttMsg& msg)
  26. {
  27. if(client_)
  28. client_->publish(topic,1,msg);
  29. else
  30. printf("publish failed : emqx client maybe disconnected...\n");
  31. }
  32. bool Terminator_emqx::AddCallback(std::string topic,Callback callback,void* context)
  33. {
  34. CallBackInfo info;
  35. info.func=callback;
  36. info.contextx=context;
  37. sub_topic_callbacks_[topic]=info;
  38. if(connected_)
  39. {
  40. while(!client_->isconnected()) usleep(1000);
  41. client_->subcribe(topic,1,StatuArrivedCallback,this);
  42. return true;
  43. }
  44. else
  45. {
  46. printf(" emqx has not connected ,subcribe topic:%s failed ...!!\n",topic.c_str());
  47. return false;
  48. }
  49. }
  50. Terminator_emqx::Terminator_emqx(std::string nodeId)
  51. :client_(nullptr),nodeId_(nodeId),connected_(false)
  52. {
  53. }
  54. void Terminator_emqx::StatuArrivedCallback(std::string topic,int QOS,MqttMsg& msg,void* context)
  55. {
  56. Terminator_emqx* terminator=(Terminator_emqx*)context;
  57. if(terminator->sub_topic_callbacks_.find(topic)!=terminator->sub_topic_callbacks_.end()) {
  58. CallBackInfo info = terminator->sub_topic_callbacks_[topic];
  59. if (info.func)
  60. info.func(msg, info.contextx);
  61. }
  62. }