import threading import time import asyncio import aio_pika import queue class TimeStatu: def __init__(self, statu=None, timeout=3): self.statu = statu self.time = time.time() self.timeout_ms = timeout def timeout(self): tm = time.time() return tm - self.time > self.timeout_ms class RabbitAsyncCommunicator(threading.Thread): def __init__(self, host, port, user, password): threading.Thread.__init__(self) self._host = host self._port = port self._user = user self._password = password self._connection = None self._channel_consumer = None self._channel_send = None self._channel_statu = None self._consumer_callbacks = None self._recv_status = None self._queue_callbacks = {} self._publish_msg_queue = queue.Queue() self._status = {} self._statu_callbacks = {} self._closing = False def Init(self, consumer_callbacks, recv_status): self._consumer_callbacks = consumer_callbacks self._recv_status = recv_status if self._recv_status == None: return for ex_name, key in self._recv_status: self._status[ex_name + ":" + key] = TimeStatu(None, 0.1) def publish(self, ex_name, key, msg): self._publish_msg_queue.put([ex_name, key, msg]) def bind_statu_callback(self, ex_name, key, callback): self._statu_callbacks[ex_name + ":" + key] = callback def close(self): self._closing = True async def init(self): connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host) self._connection = await aio_pika.connect_robust(connection_string) self._channel_consumer = await self._connection.channel() self._channel_send = await self._connection.channel() self._channel_statu = await self._connection.channel() # Will take no more than 10 messages in advance await self._channel_consumer.set_qos(prefetch_count=1) if self._consumer_callbacks == None: return for queue_name, callback in self._consumer_callbacks: queue = await self._channel_consumer.get_queue(queue_name) self._queue_callbacks[queue] = callback def statu_callback(self, ex, key, msg): id = ex + ":" + key self._status[id] = TimeStatu(msg) callback = self._statu_callbacks.get(id) if not callback == None: callback(self._status[id]) def __getitem__(self, key): return self._status[key] def __setitem__(self, key, value): self._status[key] = value async def recv(self, queue, callback): async with queue.iterator() as queue_iter: async for message in queue_iter: if not callback == None: callback(message.body.decode()) await message.ack() if self._closing == True: return async def recv_statu(self, ex_name, key, ttl=200): statu_ex = await self._channel_statu.get_exchange(ex_name) arg = {} arg["x-message-ttl"] = ttl queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg) await queue.bind(statu_ex, routing_key=key) async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): self.statu_callback(ex_name, key, message.body.decode()) if self._closing == True: return async def send(self): while self._closing == False: if self._publish_msg_queue.qsize() > 0: msg_bag = self._publish_msg_queue.get(False) if not msg_bag == None: ex_name, key, msg = msg_bag ex = await self._channel_send.get_exchange(ex_name) await ex.publish(aio_pika.Message(body=msg.encode()), routing_key=key) await asyncio.sleep(0.001) # time.sleep(0.001) async def main(self): await self.init() tasks = [] tasks.append(self.send()) if not self._consumer_callbacks == None: for queue, callback in self._queue_callbacks.items(): tasks.append(self.recv(queue, callback)) if not self._recv_status == None: for ex_name, key in self._recv_status: tasks.append(self.recv_statu(ex_name, key)) await asyncio.gather(*tasks) def run(self): asyncio.run(self.main())