123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import threading
- import time
- import asyncio
- from concurrent.futures import ThreadPoolExecutor
- import aio_pika
- import queue
class TimeStatu:
    """A status payload stamped with its creation time and a freshness window.

    Attributes:
        statu: The stored status payload; ``None`` means "no status yet".
        time: ``time.time()`` value captured when the instance was created.
        timeout_ms: Freshness window. Despite the name, it is compared
            against a ``time.time()`` difference, so the unit is seconds.
    """

    def __init__(self, statu=None, timeout=3):
        """Store *statu* and remember the creation time.

        Args:
            statu: Status payload, or ``None`` when no status is known.
            timeout: How long (in seconds) the payload counts as fresh.
        """
        self.statu = statu
        self.time = time.time()
        self.timeout_ms = timeout

    def timeout(self):
        """Return True when the status is missing or older than the window."""
        # Identity check: a payload with a permissive ``__eq__`` must not be
        # mistaken for "no status".
        if self.statu is None:
            return True
        return time.time() - self.time > self.timeout_ms
class RabbitAsyncCommunicator(threading.Thread):
    """Background thread that talks to RabbitMQ through aio_pika.

    ``run()`` starts an asyncio event loop inside the thread. That loop runs
    one publisher coroutine (``send``), one consumer coroutine per work queue
    (``recv``) and one subscriber coroutine per status source
    (``recv_statu``). The most recent status payloads are reachable through
    item access, keyed by ``"<exchange>:<routing_key>"``.
    """

    def __init__(self, host, port, user, password):
        """Remember the connection parameters; nothing is connected yet.

        Args:
            host: Broker host name.
            port: Broker port, or ``None`` to use the AMQP URL default.
            user: Login name.
            password: Login password.
        """
        super().__init__()
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._connection = None
        self._channel_consumer = None
        self._channel_send = None
        self._channel_statu = None
        self._consumer_callbacks = None
        self._recv_status = None
        self._queue_callbacks = {}
        self._publish_msg_queue = queue.Queue()
        self._status = {}
        self._statu_callbacks = {}
        self._closing = False
        self._receiver = None
        self._pool = ThreadPoolExecutor()

    def Init(self, consumer_callbacks, recv_status, receiver=None):
        """Configure what the thread consumes once it is started.

        Args:
            consumer_callbacks: Iterable of ``(queue_name, callback)`` pairs,
                or ``None`` for no work-queue consumers.
            recv_status: Iterable of ``(exchange_name, routing_key)`` pairs,
                or ``None`` for no status subscriptions. It is iterated again
                in ``main()``, so it must be re-iterable.
            receiver: Optional string appended to the status queue names.
        """
        self._consumer_callbacks = consumer_callbacks
        self._recv_status = recv_status
        self._receiver = receiver
        if self._recv_status is None:
            return
        # Seed every status slot with an empty entry so lookups work before
        # the first message arrives.
        for ex_name, key in self._recv_status:
            self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)

    def publish(self, ex_name, key, msg):
        """Queue *msg* for publishing to exchange *ex_name* with *key*.

        The message is only enqueued here; ``send()`` performs the publish.
        """
        self._publish_msg_queue.put([ex_name, key, msg])

    def bind_statu_callback(self, ex_name, key, callback):
        """Register *callback* to be invoked on each status update.

        The callback is called as ``callback(status, ex_name, key)``.
        """
        self._statu_callbacks[ex_name + ":" + key] = callback

    def close(self):
        """Ask the coroutines to stop and wait for pending callbacks."""
        self._closing = True
        self._pool.shutdown(wait=True)

    def _connection_url(self):
        """Build the AMQP URL from the stored host, port and credentials."""
        # NOTE(review): credentials are inserted verbatim; characters that
        # are special in URLs are not percent-encoded.
        if self._port is None:
            return "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
        return "amqp://%s:%s@%s:%s/" % (
            self._user,
            self._password,
            self._host,
            self._port,
        )

    async def init(self):
        """Open the connection and channels and look up the work queues."""
        self._connection = await aio_pika.connect_robust(self._connection_url())
        self._channel_consumer = await self._connection.channel()
        self._channel_send = await self._connection.channel()
        self._channel_statu = await self._connection.channel()
        # Take no more than one message in advance on the consumer channel.
        await self._channel_consumer.set_qos(prefetch_count=1)
        if self._consumer_callbacks is None:
            return
        for queue_name, callback in self._consumer_callbacks:
            work_queue = await self._channel_consumer.get_queue(queue_name)
            self._queue_callbacks[work_queue] = callback

    def statu_callback(self, ex, key, msg):
        """Store *msg* as the latest status and notify the bound callback."""
        status_key = ex + ":" + key
        self._status[status_key] = TimeStatu(msg)
        callback = self._statu_callbacks.get(status_key)
        if callback is not None:
            callback(self._status[status_key], ex, key)

    def __getitem__(self, key):
        """Return the status stored under ``"<exchange>:<routing_key>"``."""
        return self._status[key]

    def __setitem__(self, key, value):
        """Overwrite the status stored under *key*."""
        self._status[key] = value

    async def recv(self, queue, callback):
        """Consume *queue*, handing each decoded body to *callback*.

        The callback runs on the thread pool; the message is acknowledged as
        soon as the callback has been scheduled, not when it finishes.
        """
        await asyncio.sleep(2)
        print(" 订阅队列:%s" % queue)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                if callback is not None:
                    body = message.body.decode()
                    try:
                        self._pool.submit(callback, body)
                    except RuntimeError:
                        # close() already shut the pool down: hand the
                        # message back to the broker instead of crashing.
                        await message.nack(requeue=True)
                        return
                await message.ack()
                if self._closing:
                    return

    async def recv_statu(self, ex_name, key, ttl=200):
        """Subscribe to status messages for *key* on exchange *ex_name*.

        Args:
            ex_name: Name of an existing exchange.
            key: Routing key to bind.
            ttl: Value passed as the queue's ``x-message-ttl`` argument.
        """
        statu_ex = await self._channel_statu.get_exchange(ex_name)
        arguments = {"x-message-ttl": ttl}
        if self._receiver is not None:
            queue_name = key + "_for_" + self._receiver
        else:
            # Empty name: the broker generates one.
            queue_name = ''
        statu_queue = await self._channel_statu.declare_queue(
            queue_name, auto_delete=True, arguments=arguments
        )
        await statu_queue.bind(statu_ex, routing_key=key)
        print("订阅状态端口:%s" % key)
        async with statu_queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    self.statu_callback(ex_name, key, message.body.decode())
                if self._closing:
                    return

    async def send(self):
        """Publish messages queued by ``publish()`` until closing."""
        while not self._closing:
            try:
                msg_bag = self._publish_msg_queue.get_nowait()
            except queue.Empty:
                msg_bag = None
            if msg_bag is not None:
                ex_name, key, msg = msg_bag
                exchange = await self._channel_send.get_exchange(ex_name)
                await exchange.publish(
                    aio_pika.Message(body=msg.encode()), routing_key=key
                )
            # Yield to the event loop so the consumers can make progress.
            await asyncio.sleep(0.001)

    async def main(self):
        """Connect, then run the publisher and all consumers together."""
        try:
            await self.init()
            tasks = [self.send()]
            if self._consumer_callbacks is not None:
                for work_queue, callback in self._queue_callbacks.items():
                    tasks.append(self.recv(work_queue, callback))
            if self._recv_status is not None:
                for ex_name, key in self._recv_status:
                    tasks.append(self.recv_statu(ex_name, key))
            await asyncio.gather(*tasks)
        finally:
            # Release the broker connection however the coroutines ended.
            if self._connection is not None:
                await self._connection.close()

    def run(self):
        """Thread entry point: run ``main()`` in a fresh event loop."""
        asyncio.run(self.main())
|