import threading import time import asyncio import aio_pika import queue class TimeStatu: def __init__(self,statu=None,timeout=3): self.statu=statu self.time=time.time() self.timeout_ms=timeout def timeout(self): tm=time.time() return tm-self.time>self.timeout_ms or self.statu==None class RabbitAsyncCommunicator(threading.Thread): def __init__(self, host,port, user, password): threading.Thread.__init__(self) self._host = host self._port=port self._user = user self._password = password self._connection = None self._channel_consumer = None self._channel_send = None self._channel_statu = None self._consumer_callbacks= None self._recv_status=None self._queue_callbacks= {} self._publish_msg_queue=queue.Queue() self._status={} self._statu_callbacks={} self._closing = False self._receiver = None def Init(self,consumer_callbacks,recv_status,receiver): self._consumer_callbacks=consumer_callbacks self._recv_status=recv_status self._receiver = receiver if self._recv_status==None: return for ex_name,key in self._recv_status: self._status[ex_name+":"+key]=TimeStatu(None,0.1) def publish(self,ex_name,key,msg): self._publish_msg_queue.put([ex_name,key,msg]) def bind_statu_callback(self,ex_name,key,callback): self._statu_callbacks[ex_name+":"+key]=callback def close(self): self._closing=True async def init(self): connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host) self._connection = await aio_pika.connect_robust(connection_string) self._channel_consumer = await self._connection.channel() self._channel_send = await self._connection.channel() self._channel_statu = await self._connection.channel() # Will take no more than 10 messages in advance await self._channel_consumer.set_qos(prefetch_count=1) if self._consumer_callbacks==None: return for queue_name,callback in self._consumer_callbacks: queue= await self._channel_consumer.get_queue(queue_name) self._queue_callbacks[queue]=callback def statu_callback(self,ex,key,msg): id=ex+":"+key self._status[id]=TimeStatu(msg) callback=self._statu_callbacks.get(id) if not callback==None: callback(self._status[id],ex,key) def __getitem__(self, key): return self._status[key] def __setitem__(self, key, value): self._status[key]=value async def recv(self,queue,callback): await asyncio.sleep(2) print(" 订阅队列:%s" % queue) async with queue.iterator() as queue_iter: async for message in queue_iter: if not callback==None: await callback(message.body.decode()) await message.ack() if self._closing==True: return async def recv_statu(self,ex_name,key,ttl=200): statu_ex=await self._channel_statu.get_exchange(ex_name) arg={} arg["x-message-ttl"]=ttl queue=await self._channel_statu.declare_queue(key+"_for_"+self._receiver, auto_delete=True,arguments=arg) await queue.bind(statu_ex,routing_key=key) print("订阅状态端口:%s" % key) async with queue.iterator() as queue_iter: async for message in queue_iter: async with message.process(): self.statu_callback(ex_name,key,message.body.decode()) if self._closing==True: return async def send(self): while self._closing==False: if self._publish_msg_queue.qsize()>0: msg_bag=self._publish_msg_queue.get(False) if not msg_bag==None: ex_name,key,msg=msg_bag ex= await self._channel_send.get_exchange(ex_name) await ex.publish(aio_pika.Message(body=msg.encode()),routing_key=key) await asyncio.sleep(0) #time.sleep(0.001) async def main(self): await self.init() tasks=[] tasks.append(self.send()) if not self._consumer_callbacks==None: for queue,callback in self._queue_callbacks.items(): tasks.append(self.recv(queue,callback)) if not self._recv_status==None: for ex_name,key in self._recv_status: tasks.append(self.recv_statu(ex_name,key)) await asyncio.gather(*tasks) def run(self): asyncio.run(self.main())