import json import threading import time from paho.mqtt import client as mqtt_communication # 状态消息定时器 class TimeStatu: def __init__(self, statu=None, timeout=3): self.statu = statu self.time = time.time() self.timeout_ms = timeout def timeout(self): tm=time.time() return tm-self.time>self.timeout_ms or self.statu==None # mqtt客户端 class matt_client(threading.Thread): # ip,端口,用户名(可选),密码(可选) def __init__(self, mqtt_ip, mqtt_port = 1883, mqtt_username=None, mqtt_passwd=None): threading.Thread.__init__(self) self.mqtt_ip = mqtt_ip self.mqtt_port = mqtt_port self.mqtt_username = mqtt_username self.mqtt_passwd = mqtt_passwd self.client = None self.recv_topic_list = [] self.message_dict = {} self.message_callback_dict = {} # 连接mqtt默认回调函数 self.connect_callback = {} # 绑定话题回调函数 若要绑定必须在run前调用 def bind_topic_callback(self, topic, callback): self.message_callback_dict[topic] = callback # 初始化 客户端id,接收状态列表 ps: ["topic1","topic2"] ,连接回调默认为mqtt_connect_callback def init(self,client_id, recv_topic_list,connect_callback = None): # 默认连接回调 def on_connect(client, userdata, flags, rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect mqtt, return code %d\n", rc) self.recv_topic_list = recv_topic_list self.client = mqtt_communication.Client(client_id) # 是否自定义连接回调 if connect_callback is not None: self.client.on_connect = connect_callback else: self.client.on_connect = on_connect # 是否有密码 if self.mqtt_username is not None: self.client.username_pw_set(self.mqtt_username,self.mqtt_passwd) # 连接 self.client.connect(self.mqtt_ip, self.mqtt_port) if self.recv_topic_list is not None: for topic in self.recv_topic_list: self.message_dict[topic] = TimeStatu(None, 0.1) # 发布消息 def publish(self,topic,message): if self.client is None: print("ERROR: client is not initialized!") else: return self.client.publish(topic, message) # 获取状态消息 def _get_message_dict(self): return self.message_dict def __getitem__(self, topic): return self.message_dict[topic] def __setitem__(self, topic, value): self.message_dict[topic]=value def run(self): # 默认消息回调 def on_message(client, userdata, msg): print("on_message") self.message_dict[msg.topic] = TimeStatu(msg.payload.decode(), 3) # 绑定状态消息 if self.recv_topic_list is not None: for topic in self.recv_topic_list: print("bind topic:"+topic) self.client.subscribe(topic) self.client.on_message = on_message # 如果自定义了回调函数则调用自定义的 未自定义调用默认的 if topic in self.message_callback_dict.keys(): self.client.message_callback_add(topic,self.message_callback_dict[topic]) self.message_dict[topic] = TimeStatu(None, 0.1) self.client.loop_forever() def close(self): self.client.disconnect() topic_all = ["wk1","wk2","wk3"] def callback1(client, userdata, msg): print(f"callback1 Received `{msg.payload.decode()}` from `{msg.topic}` topic") def callback2(client, userdata, msg): print(f"callback2 Received `{msg.payload.decode()}` from `{msg.topic}` topic") if __name__ == '__main__': # 初始化 client = matt_client("192.168.2.40") # for topic in topic_all: client.init("WK",topic_all) # client.publish("test","hello") for topic in topic_all: if topic.find("1") >= 0: client.bind_topic_callback(topic, callback1) if topic.find("2") >= 0: client.bind_topic_callback(topic, callback2) # 启动 client.start() client.join() # time.sleep(1) # close = False # while(close is False): # for topic in topic_all: # if client[topic].statu is not None: # str = json.loads(client[topic].statu) # if str['msg'] == 'close': # client.close() # close = True # break # print(str) # print(client[topic].timeout()) # time.sleep(1)