12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- from paho.mqtt import client as mqtt_client
- from threading import Lock
- class MqttAsync(object):
- def __init__(self):
- self.lock_ = Lock()
- self.connected = False
- def connect(self, client_id, ip, port, user, password):
- self.client_id = client_id
- def on_connect(client, userdata, flags, rc):
- if rc == 0:
- print("Connected to MQTT Broker!")
- else:
- print("Failed to connect, return code %d\n", rc)
- self.client = mqtt_client.Client(self.client_id)
- self.client.username_pw_set(user, password)
- self.client.on_connect = on_connect
- self.client.connect(ip, port)
- self.connected = True
- def publish(self, topic, msg):
- if self.connected == False:
- print("mqtt is not connected Failed to send message to topic {topic}")
- with self.lock_:
- result = self.client.publish(topic, msg, qos=1)
- status = result[0]
- if status == 0:
- pass
- else:
- print(f"Failed to send message to topic {topic}")
- def subscribe(self, topic, callback):
- with self.lock_:
- self.client.subscribe(topic)
- self.client.on_message = callback
- def loop_start(self):
- with self.lock_:
- self.client.loop_start()
|