// terminator_emqx.cpp
//
// Created by zx on 23-4-11.
//
  #include "terminator_emqx.h"

  #include "unistd.h"

  #include <cstdio>
  #include <mutex>
  #include <string>
  #include <utility>
  6. Terminator_emqx::~Terminator_emqx()
  7. {
  8. if(client_!= nullptr){
  9. client_->disconnect();
  10. delete client_;
  11. client_= nullptr;
  12. }
  13. }
  14. bool Terminator_emqx::Connect(std::string ip,int port)
  15. {
  16. if(client_!= nullptr)
  17. {
  18. client_->disconnect();
  19. delete client_;
  20. }
  21. client_=new Paho_client(nodeId_);
  22. connected_= client_->connect(ip,port);
  23. return connected_;
  24. }
  25. void Terminator_emqx::Publish(std::string topic,const MqttMsg& msg)
  26. {
  27. if(client_) {
  28. mtx_.lock();
  29. client_->publish(topic, 1, msg);
  30. mtx_.unlock();
  31. }
  32. else {
  33. printf("publish failed : emqx client maybe disconnected...\n");
  34. }
  35. }
  36. bool Terminator_emqx::AddCallback(std::string topic,Callback callback,void* context)
  37. {
  38. CallBackInfo info;
  39. info.func=callback;
  40. info.contextx=context;
  41. sub_topic_callbacks_[topic]=info;
  42. if(connected_)
  43. {
  44. while(!client_->isconnected()) usleep(1000);
  45. client_->subcribe(topic,1,MessageArrivedCallback,this);
  46. return true;
  47. }
  48. else
  49. {
  50. printf(" emqx has not connected ,subcribe topic:%s failed ...!!\n",topic.c_str());
  51. return false;
  52. }
  53. }
  54. Terminator_emqx::Terminator_emqx(std::string nodeId)
  55. :client_(nullptr),nodeId_(nodeId),connected_(false)
  56. {
  57. }
  58. void Terminator_emqx::MessageArrivedCallback(std::string topic,int QOS,MqttMsg& msg,void* context)
  59. {
  60. Terminator_emqx* terminator=(Terminator_emqx*)context;
  61. if(terminator->sub_topic_callbacks_.find(topic)!=terminator->sub_topic_callbacks_.end()) {
  62. CallBackInfo info = terminator->sub_topic_callbacks_[topic];
  63. if (info.func)
  64. info.func(msg, info.contextx);
  65. }
  66. }