# mqtt_client.py (4.7 KB)

import json
import threading
import time

from paho.mqtt import client as mqtt_communication
  5. # 状态消息定时器
  6. class TimeStatu:
  7. def __init__(self, statu=None, timeout=3):
  8. self.statu = statu
  9. self.time = time.time()
  10. self.timeout_ms = timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms or self.statu==None
  14. # mqtt客户端
  15. class matt_client(threading.Thread):
  16. # ip,端口,用户名(可选),密码(可选)
  17. def __init__(self, mqtt_ip, mqtt_port = 1883, mqtt_username=None, mqtt_passwd=None):
  18. threading.Thread.__init__(self)
  19. self.mqtt_ip = mqtt_ip
  20. self.mqtt_port = mqtt_port
  21. self.mqtt_username = mqtt_username
  22. self.mqtt_passwd = mqtt_passwd
  23. self.client = None
  24. self.recv_topic_list = []
  25. self.message_dict = {}
  26. self.message_callback_dict = {}
  27. # 连接mqtt默认回调函数
  28. self.connect_callback = {}
  29. # 绑定话题回调函数 若要绑定必须在run前调用
  30. def bind_topic_callback(self, topic, callback):
  31. self.message_callback_dict[topic] = callback
  32. # 初始化 客户端id,接收状态列表 ps: ["topic1","topic2"] ,连接回调默认为mqtt_connect_callback
  33. def init(self,client_id, recv_topic_list,connect_callback = None):
  34. # 默认连接回调
  35. def on_connect(client, userdata, flags, rc):
  36. if rc == 0:
  37. print("Connected to MQTT Broker!")
  38. else:
  39. print("Failed to connect mqtt, return code %d\n", rc)
  40. self.recv_topic_list = recv_topic_list
  41. self.client = mqtt_communication.Client(client_id)
  42. # 是否自定义连接回调
  43. if connect_callback is not None:
  44. self.client.on_connect = connect_callback
  45. else:
  46. self.client.on_connect = on_connect
  47. # 是否有密码
  48. if self.mqtt_username is not None:
  49. self.client.username_pw_set(self.mqtt_username,self.mqtt_passwd)
  50. # 连接
  51. self.client.connect(self.mqtt_ip, self.mqtt_port)
  52. if self.recv_topic_list is not None:
  53. for topic in self.recv_topic_list:
  54. self.message_dict[topic] = TimeStatu(None, 0.1)
  55. # 发布消息
  56. def publish(self,topic,message):
  57. if self.client is None:
  58. print("ERROR: client is not initialized!")
  59. else:
  60. return self.client.publish(topic, message)
  61. # 获取状态消息
  62. def _get_message_dict(self):
  63. return self.message_dict
  64. def __getitem__(self, topic):
  65. return self.message_dict[topic]
  66. def __setitem__(self, topic, value):
  67. self.message_dict[topic]=value
  68. def run(self):
  69. # 默认消息回调
  70. def on_message(client, userdata, msg):
  71. print("on_message")
  72. self.message_dict[msg.topic] = TimeStatu(msg.payload.decode(), 3)
  73. # 绑定状态消息
  74. if self.recv_topic_list is not None:
  75. for topic in self.recv_topic_list:
  76. print("bind topic:"+topic)
  77. self.client.subscribe(topic)
  78. self.client.on_message = on_message
  79. # 如果自定义了回调函数则调用自定义的 未自定义调用默认的
  80. if topic in self.message_callback_dict.keys():
  81. self.client.message_callback_add(topic,self.message_callback_dict[topic])
  82. self.message_dict[topic] = TimeStatu(None, 0.1)
  83. self.client.loop_forever()
  84. def close(self):
  85. self.client.disconnect()
  86. topic_all = ["wk1","wk2","wk3"]
  87. def callback1(client, userdata, msg):
  88. print(f"callback1 Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  89. def callback2(client, userdata, msg):
  90. print(f"callback2 Received `{msg.payload.decode()}` from `{msg.topic}` topic")
  91. if __name__ == '__main__':
  92. # 初始化
  93. client = matt_client("192.168.2.40")
  94. # for topic in topic_all:
  95. client.init("WK",topic_all)
  96. # client.publish("test","hello")
  97. for topic in topic_all:
  98. if topic.find("1") >= 0:
  99. client.bind_topic_callback(topic, callback1)
  100. if topic.find("2") >= 0:
  101. client.bind_topic_callback(topic, callback2)
  102. # 启动
  103. client.start()
  104. client.join()
  105. # time.sleep(1)
  106. # close = False
  107. # while(close is False):
  108. # for topic in topic_all:
  109. # if client[topic].statu is not None:
  110. # str = json.loads(client[topic].statu)
  111. # if str['msg'] == 'close':
  112. # client.close()
  113. # close = True
  114. # break
  115. # print(str)
  116. # print(client[topic].timeout())
  117. # time.sleep(1)