mqtt_async.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. from paho.mqtt import client as mqtt_client
  2. from threading import Lock
  3. class MqttAsync(object):
  4. def __init__(self):
  5. self.lock_ = Lock()
  6. self.connected = False
  7. def connect(self, client_id, ip, port, user, password):
  8. self.client_id = client_id
  9. def on_connect(client, userdata, flags, rc):
  10. if rc == 0:
  11. print("Connected to MQTT Broker!")
  12. else:
  13. print("Failed to connect, return code %d\n", rc)
  14. self.client = mqtt_client.Client(self.client_id)
  15. self.client.username_pw_set(user, password)
  16. self.client.on_connect = on_connect
  17. self.client.connect(ip, port)
  18. self.connected = True
  19. def publish(self, topic, msg):
  20. if self.connected == False:
  21. print("mqtt is not connected Failed to send message to topic {topic}")
  22. with self.lock_:
  23. result = self.client.publish(topic, msg, qos=1)
  24. status = result[0]
  25. if status == 0:
  26. pass
  27. else:
  28. print(f"Failed to send message to topic {topic}")
  29. def subscribe(self, topic, callback):
  30. with self.lock_:
  31. self.client.subscribe(topic)
  32. self.client.on_message = callback
  33. def loop_start(self):
  34. with self.lock_:
  35. self.client.loop_start()