123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import json
- import threading
- import time
- from paho.mqtt import client as mqtt_communication
- # 状态消息定时器
class TimeStatu:
    """A status value paired with the moment it was recorded.

    Attributes:
        statu: The stored status value; None means nothing has been received.
        time: ``time.time()`` timestamp captured when the instance was built.
        timeout_ms: Maximum allowed age of the status. Despite the name, it is
            compared against a difference of ``time.time()`` readings, so the
            unit is seconds.
    """

    def __init__(self, statu=None, timeout=3):
        """Record *statu* together with the current time.

        Args:
            statu: Status value to store (None for "no status yet").
            timeout: Age, in seconds, after which the status counts as stale.
        """
        self.statu = statu
        self.time = time.time()
        self.timeout_ms = timeout

    def timeout(self):
        """Return True when the status is stale or no status is stored."""
        age = time.time() - self.time
        # Identity check: a payload whose __eq__ happens to match None must
        # not be mistaken for a missing status.
        return age > self.timeout_ms or self.statu is None
- # mqtt客户端
class matt_client(threading.Thread):
    """Threaded MQTT client that caches the latest payload of each topic.

    Intended call order: construct, ``init()``, optionally
    ``bind_topic_callback()``, then ``start()``. The thread body (``run``)
    subscribes to the configured topics and blocks in the paho network loop
    until ``close()`` disconnects the client.

    Cached statuses are reachable through item access: ``client[topic]``
    returns the ``TimeStatu`` stored for that topic.
    """

    def __init__(self, mqtt_ip, mqtt_port=1883, mqtt_username=None, mqtt_passwd=None):
        """Store the broker settings; no network activity happens here.

        Args:
            mqtt_ip: Broker host name or IP address.
            mqtt_port: Broker TCP port.
            mqtt_username: Optional user name; credentials are only sent
                when this is not None.
            mqtt_passwd: Optional password used together with the user name.
        """
        super().__init__()
        self.mqtt_ip = mqtt_ip
        self.mqtt_port = mqtt_port
        self.mqtt_username = mqtt_username
        self.mqtt_passwd = mqtt_passwd
        self.client = None
        self.recv_topic_list = []
        # topic -> TimeStatu holding the most recent payload
        self.message_dict = {}
        # topic -> user supplied message callback
        self.message_callback_dict = {}
        # Not read anywhere in this class; kept so existing callers that
        # touch the attribute keep working.
        self.connect_callback = {}

    def bind_topic_callback(self, topic, callback):
        """Register *callback* for *topic*.

        Must be called before the thread is started, because the callbacks
        are attached to the paho client inside ``run()``.
        """
        self.message_callback_dict[topic] = callback

    def init(self, client_id, recv_topic_list, connect_callback=None):
        """Create the paho client, connect to the broker and seed the cache.

        Args:
            client_id: MQTT client identifier.
            recv_topic_list: Topics to subscribe to, e.g. ``["topic1",
                "topic2"]``; may be None.
            connect_callback: Optional replacement for the default
                ``on_connect`` handler, called as
                ``(client, userdata, flags, rc)``.
        """

        def on_connect(client, userdata, flags, rc):
            # Default connect handler: only reports the outcome.
            if rc == 0:
                print("Connected to MQTT Broker!")
            else:
                # The return code used to be passed as a second print()
                # argument, so the placeholder was never filled in.
                print(f"Failed to connect mqtt, return code {rc}\n")

        self.recv_topic_list = recv_topic_list

        # paho-mqtt 2.x expects the callback API version as the first
        # positional argument; 1.x has no such enum and takes the client id
        # first. VERSION1 keeps the (client, userdata, flags, rc) signature
        # used by the handlers in this file.
        api_version = getattr(mqtt_communication, "CallbackAPIVersion", None)
        if api_version is not None:
            self.client = mqtt_communication.Client(api_version.VERSION1, client_id)
        else:
            self.client = mqtt_communication.Client(client_id)

        if connect_callback is not None:
            self.client.on_connect = connect_callback
        else:
            self.client.on_connect = on_connect

        # Credentials are optional.
        if self.mqtt_username is not None:
            self.client.username_pw_set(self.mqtt_username, self.mqtt_passwd)

        self.client.connect(self.mqtt_ip, self.mqtt_port)

        # Seed every topic with an empty status so lookups never KeyError.
        if self.recv_topic_list is not None:
            for topic in self.recv_topic_list:
                self.message_dict[topic] = TimeStatu(None, 0.1)

    def publish(self, topic, message):
        """Publish *message* on *topic*.

        Returns:
            Whatever the paho client's ``publish`` returns, or None (after
            printing an error) when ``init()`` has not been called yet.
        """
        if self.client is None:
            print("ERROR: client is not initialized!")
            return None
        return self.client.publish(topic, message)

    def _get_message_dict(self):
        """Return the topic -> TimeStatu cache itself (not a copy)."""
        return self.message_dict

    def __getitem__(self, topic):
        """Return the cached status of *topic*; raises KeyError if unknown."""
        return self.message_dict[topic]

    def __setitem__(self, topic, value):
        """Overwrite the cached status of *topic*."""
        self.message_dict[topic] = value

    def run(self):
        """Thread body: subscribe to the topics and run the network loop.

        Raises:
            RuntimeError: If ``init()`` was not called first.
        """
        if self.client is None:
            raise RuntimeError("matt_client.init() must be called before start()")

        def on_message(client, userdata, msg):
            # Default handler: cache the decoded payload with a 3 s lifetime.
            print("on_message")
            self.message_dict[msg.topic] = TimeStatu(msg.payload.decode(), 3)

        # Installed once, outside the loop, so the default handler is also
        # present when the topic list is empty or None.
        self.client.on_message = on_message

        # NOTE(review): subscriptions are issued only once here; presumably
        # they are not restored if the connection is re-established without
        # a persistent session - consider subscribing from on_connect.
        if self.recv_topic_list is not None:
            for topic in self.recv_topic_list:
                print("bind topic:" + topic)
                self.client.subscribe(topic)
                # A topic-specific callback, when bound, takes precedence
                # over the default on_message handler for that topic.
                if topic in self.message_callback_dict:
                    self.client.message_callback_add(
                        topic, self.message_callback_dict[topic]
                    )
                self.message_dict[topic] = TimeStatu(None, 0.1)

        self.client.loop_forever()

    def close(self):
        """Disconnect from the broker; a no-op when ``init()`` never ran."""
        if self.client is not None:
            self.client.disconnect()
# Topics the demo client below subscribes to.
topic_all = [
    "wk1",
    "wk2",
    "wk3",
]
def callback1(client, userdata, msg):
    """Demo message handler: print the decoded payload and its topic."""
    text = msg.payload.decode()
    source = msg.topic
    print(f"callback1 Received `{text}` from `{source}` topic")
def callback2(client, userdata, msg):
    """Second demo message handler: print the decoded payload and its topic."""
    text = msg.payload.decode()
    source = msg.topic
    print(f"callback2 Received `{text}` from `{source}` topic")
if __name__ == '__main__':
    # Demo: connect to the broker, subscribe to every topic in topic_all and
    # route the topics containing "1" / "2" to dedicated callbacks.
    client = matt_client("192.168.2.40")
    client.init("WK", topic_all)
    # client.publish("test", "hello")

    # Callbacks have to be bound before start(): run() is where they are
    # attached to the underlying paho client.
    for topic in topic_all:
        if "1" in topic:
            client.bind_topic_callback(topic, callback1)
        if "2" in topic:
            client.bind_topic_callback(topic, callback2)

    client.start()
    try:
        client.join()
    except KeyboardInterrupt:
        # Without an explicit disconnect the non-daemon network thread keeps
        # running after Ctrl-C and the process never exits.
        client.close()
        client.join()

    # Alternative to join(): poll the cached statuses from the main thread
    # and shut down when a topic delivers {"msg": "close"}.
    #
    # time.sleep(1)
    # closed = False
    # while not closed:
    #     for topic in topic_all:
    #         if client[topic].statu is not None:
    #             payload = json.loads(client[topic].statu)
    #             if payload['msg'] == 'close':
    #                 client.close()
    #                 closed = True
    #                 break
    #             print(payload)
    #         print(client[topic].timeout())
    #     time.sleep(1)
|