from threading import Lock

from paho.mqtt import client as mqtt_client


class MqttAsync(object):
    """Small wrapper around a paho-mqtt client.

    Calls to ``publish``, ``subscribe`` and ``loop_start`` made through this
    wrapper are serialised with an internal ``threading.Lock``.

    Attributes:
        lock_: Lock guarding calls into the underlying paho client.
        connected: ``True`` once ``connect`` has returned; afterwards kept in
            sync with the result code reported to ``on_connect``.
        client: The underlying paho client, or ``None`` before ``connect``.
        client_id: Client id passed to ``connect``, or ``None`` before it.
    """

    def __init__(self):
        self.lock_ = Lock()
        self.connected = False
        # Declared up front so that using the wrapper before connect() fails
        # in a predictable way instead of on a missing attribute.
        self.client = None
        self.client_id = None

    def connect(self, client_id, ip, port, user, password):
        """Create the paho client and open a connection to the broker.

        Args:
            client_id: MQTT client identifier.
            ip: Broker host name or address.
            port: Broker port.
            user: User name for authentication.
            password: Password for authentication.

        Any exception raised by the paho ``connect`` call propagates to the
        caller; in that case ``connected`` stays ``False``.
        """
        self.client_id = client_id

        def on_connect(client, userdata, flags, rc):
            # Track the broker's verdict so publish() stops sending when the
            # session was refused, and resumes after a later successful
            # (re)connect.
            self.connected = rc == 0
            if rc == 0:
                print("Connected to MQTT Broker!")
            else:
                print(f"Failed to connect, return code {rc}\n")

        # NOTE(review): paho-mqtt 2.x changed the Client constructor
        # signature (callback API version argument) -- confirm the pinned
        # paho version before upgrading.
        self.client = mqtt_client.Client(self.client_id)
        self.client.username_pw_set(user, password)
        self.client.on_connect = on_connect
        self.client.connect(ip, port)
        self.connected = True

    def publish(self, topic, msg):
        """Publish ``msg`` on ``topic`` with QoS 1.

        If the wrapper is not connected, a diagnostic is printed and nothing
        is sent. A non-zero status from the paho ``publish`` call is reported
        with a printed message. Returns ``None`` in every case.
        """
        if not self.connected:
            print(f"mqtt is not connected Failed to send message to topic {topic}")
            return

        with self.lock_:
            result = self.client.publish(topic, msg, qos=1)

        # First element of the publish result is the status code; 0 = success.
        status = result[0]
        if status != 0:
            print(f"Failed to send message to topic {topic}")

    def subscribe(self, topic, callback):
        """Subscribe to ``topic`` and install ``callback`` as message handler.

        ``callback`` is assigned to the client's single ``on_message`` slot,
        so each call replaces the handler installed by any previous call.
        """
        with self.lock_:
            # Install the handler before subscribing so that no message can
            # arrive while the handler is still unset.
            self.client.on_message = callback
            self.client.subscribe(topic)

    def loop_start(self):
        """Start the paho client's network loop via ``client.loop_start()``."""
        with self.lock_:
            self.client.loop_start()