// // Created by zx on 23-4-11. // #include "terminator_emqx.h" #include "unistd.h" Terminator_emqx::~Terminator_emqx() { if(client_!= nullptr){ client_->disconnect(); delete client_; client_= nullptr; } } bool Terminator_emqx::Connect(std::string ip,int port) { if(client_!= nullptr) { client_->disconnect(); delete client_; } client_=new Paho_client(nodeId_); connected_= client_->connect(ip,port); return connected_; } void Terminator_emqx::Publish(std::string topic,const MqttMsg& msg) { if(client_) { mtx_.lock(); client_->publish(topic, 1, msg); mtx_.unlock(); } else { printf("publish failed : emqx client maybe disconnected...\n"); } } bool Terminator_emqx::AddCallback(std::string topic,Callback callback,void* context) { CallBackInfo info; info.func=callback; info.contextx=context; sub_topic_callbacks_[topic]=info; if(connected_) { while(!client_->isconnected()) usleep(1000); client_->subcribe(topic,1,MessageArrivedCallback,this); return true; } else { printf(" emqx has not connected ,subcribe topic:%s failed ...!!\n",topic.c_str()); return false; } } Terminator_emqx::Terminator_emqx(std::string nodeId) :client_(nullptr),nodeId_(nodeId),connected_(false) { } void Terminator_emqx::MessageArrivedCallback(std::string topic,int QOS,MqttMsg& msg,void* context) { Terminator_emqx* terminator=(Terminator_emqx*)context; if(terminator->sub_topic_callbacks_.find(topic)!=terminator->sub_topic_callbacks_.end()) { CallBackInfo info = terminator->sub_topic_callbacks_[topic]; if (info.func) info.func(msg, info.contextx); } }