import threading import time import asyncio import aio_pika import queue class TimeStatu: def __init__(self,statu=None,timeout=3): self.statu=statu self.time=time.time() self.timeout_ms=timeout def timeout(self): tm=time.time() return tm-self.time>self.timeout_ms class RabbitAsyncCommunicator(threading.Thread): def __init__(self, host,port, user, password): threading.Thread.__init__(self) self._host = host self._port=port self._user = user self._password = password self._connection = None self._channel_consumer = None self._channel_send = None self._channel_statu = None self._consumer_callbacks= None self._recv_status=None self._queue_callbacks= {} self._publish_msg_queue=queue.Queue() self._status={} self._statu_callbacks={} self._closing = False def Init(self,consumer_callbacks,recv_status): self._consumer_callbacks=consumer_callbacks self._recv_status=recv_status if self._recv_status==None: return for ex_name,key in self._recv_status: self._status[ex_name+":"+key]=TimeStatu(None,0.1) def publish(self,ex_name,key,msg): self._publish_msg_queue.put([ex_name,key,msg]) def bind_statu_callback(self,ex_name,key,callback): self._statu_callbacks[ex_name+":"+key]=callback def close(self): self._closing=True async def init(self): connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host) self._connection = await aio_pika.connect_robust(connection_string) self._channel_consumer = await self._connection.channel() self._channel_send = await self._connection.channel() self._channel_statu = await self._connection.channel() # Will take no more than 10 messages in advance await self._channel_consumer.set_qos(prefetch_count=1) if self._consumer_callbacks==None: return for queue_name,callback in self._consumer_callbacks: queue= await self._channel_consumer.get_queue(queue_name) self._queue_callbacks[queue]=callback def statu_callback(self,ex,key,msg): id=ex+":"+key self._status[id]=TimeStatu(msg) callback=self._statu_callbacks.get(id) if not callback==None: callback(self._status[id]) def __getitem__(self, key): return self._status[key] def __setitem__(self, key, value): self._status[key]=value async def recv(self,queue,callback): async with queue.iterator() as queue_iter: async for message in queue_iter: if not callback==None: callback(message.body.decode()) await message.ack() if self._closing==True: return async def recv_statu(self,ex_name,key,ttl=1000): statu_ex=await self._channel_statu.get_exchange(ex_name) arg={} arg["x-message-ttl"]=ttl queue=await self._channel_statu.declare_queue("", auto_delete=True,arguments=arg) await queue.bind(statu_ex,routing_key=key) async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): self.statu_callback(ex_name,key,message.body.decode()) if self._closing==True: return async def send(self): ex_map={} while self._closing==False: if self._publish_msg_queue.qsize()>0: try: msg_bag = self._publish_msg_queue.get(False) if not msg_bag == None: ex_name, key, msg = msg_bag ex=ex_map.get(ex_name) if ex == None: ex = await self._channel_send.get_exchange(ex_name) ex_map[ex_name]=ex await ex.publish(aio_pika.Message(body=msg.encode()), routing_key=key) except: await asyncio.sleep(1) await asyncio.sleep(0) time.sleep(0.001) async def main(self): await self.init() tasks=[] tasks.append(self.send()) if not self._consumer_callbacks==None: for queue,callback in self._queue_callbacks.items(): tasks.append(self.recv(queue,callback)) if not self._recv_status==None: for ex_name,key in self._recv_status: tasks.append(self.recv_statu(ex_name,key)) await asyncio.gather(*tasks) def run(self): asyncio.run(self.main())