123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- import threading
- import time
- import asyncio
- import aio_pika
- import queue
class TimeStatu:
    """A status value stamped with its creation time and an allowed age.

    Attributes set by the constructor:
        statu: the stored status payload (``None`` by default).
        time: the ``time.time()`` reading taken when the object was built.
        timeout_ms: the allowed age. NOTE(review): despite the ``_ms``
            suffix this is compared directly with a ``time.time()``
            difference, so it is effectively a number of seconds.
    """

    def __init__(self, statu=None, timeout=3):
        self.timeout_ms = timeout
        self.time = time.time()
        self.statu = statu

    def timeout(self):
        """Return True once more than ``timeout_ms`` has passed since creation."""
        age = time.time() - self.time
        return age > self.timeout_ms
class RabbitAsyncCommunicator(threading.Thread):
    """Background thread that talks to RabbitMQ through ``aio_pika``.

    The thread runs its own asyncio event loop (see ``run``). Inside that
    loop it:

    * publishes messages handed over from other threads via ``publish``;
    * consumes the queues given to ``Init`` and passes each decoded body to
      the matching callback;
    * subscribes to the "status" exchange/routing-key pairs given to
      ``Init`` and keeps the latest value of each in an internal dict that
      is readable with ``communicator["exchange:routing_key"]``.

    Call ``Init`` (optional), then ``start()``; call ``close()`` to ask the
    thread to shut down.
    """

    def __init__(self, host, port, user, password):
        """Store the broker address and credentials; no I/O happens here.

        Args:
            host: broker host name or address.
            port: broker port; ``None`` leaves the port out of the URL so
                the client library's default is used.
            user: login name.
            password: login password.
        """
        super().__init__()
        self._host = host
        self._port = port
        self._user = user
        self._password = password
        self._connection = None
        self._channel_consumer = None
        self._channel_send = None
        self._channel_statu = None
        self._consumer_callbacks = None
        self._recv_status = None
        # Maps each resolved queue object to its consumer callback.
        self._queue_callbacks = {}
        # Thread-safe hand-off from publish() callers to the send() loop.
        self._publish_msg_queue = queue.Queue()
        # Latest status per "exchange:routing_key" id.
        self._status = {}
        # Optional user callback per "exchange:routing_key" id.
        self._statu_callbacks = {}
        self._closing = False

    def Init(self, consumer_callbacks, recv_status):
        """Register what the thread should consume once it is started.

        Args:
            consumer_callbacks: iterable of ``(queue_name, callback)`` pairs,
                or ``None``. Each callback receives the decoded message body.
            recv_status: iterable of ``(exchange_name, routing_key)`` pairs
                to track as status values, or ``None``.
        """
        self._consumer_callbacks = consumer_callbacks
        self._recv_status = recv_status
        if self._recv_status is None:
            return
        # Seed every tracked status with an empty placeholder so lookups
        # succeed before the first message arrives.
        for ex_name, key in self._recv_status:
            self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)

    def publish(self, ex_name, key, msg):
        """Queue ``msg`` (a ``str``) for publishing to ``ex_name`` with routing key ``key``."""
        self._publish_msg_queue.put([ex_name, key, msg])

    def bind_statu_callback(self, ex_name, key, callback):
        """Register ``callback`` to be called with each new status of ``ex_name``/``key``."""
        self._statu_callbacks[ex_name + ":" + key] = callback

    def close(self):
        """Request shutdown; the event loop notices the flag and winds down."""
        self._closing = True

    def _connection_url(self):
        """Build the AMQP URL, including the port when one was supplied."""
        if self._port is None:
            return f"amqp://{self._user}:{self._password}@{self._host}/"
        return f"amqp://{self._user}:{self._password}@{self._host}:{self._port}/"

    async def init(self):
        """Connect to the broker, open the channels and resolve consumer queues."""
        self._connection = await aio_pika.connect_robust(self._connection_url())
        self._channel_consumer = await self._connection.channel()
        self._channel_send = await self._connection.channel()
        self._channel_statu = await self._connection.channel()
        # Take at most one unacknowledged message in advance.
        await self._channel_consumer.set_qos(prefetch_count=1)
        if self._consumer_callbacks is None:
            return
        for queue_name, callback in self._consumer_callbacks:
            amqp_queue = await self._channel_consumer.get_queue(queue_name)
            self._queue_callbacks[amqp_queue] = callback

    def statu_callback(self, ex, key, msg):
        """Record ``msg`` as the latest status for ``ex``/``key`` and notify any bound callback."""
        status_id = ex + ":" + key
        self._status[status_id] = TimeStatu(msg)
        callback = self._statu_callbacks.get(status_id)
        if callback is not None:
            callback(self._status[status_id])

    def __getitem__(self, key):
        """Return the stored status for ``key`` (an ``"exchange:routing_key"`` id)."""
        return self._status[key]

    def __setitem__(self, key, value):
        """Overwrite the stored status for ``key``."""
        self._status[key] = value

    async def recv(self, queue, callback):
        """Consume ``queue``, passing each decoded body to ``callback`` and acking it.

        Returns after the first message handled once ``close()`` was called.
        """
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                if callback is not None:
                    callback(message.body.decode())
                await message.ack()
                if self._closing:
                    return

    async def recv_statu(self, ex_name, key, ttl=200):
        """Track status messages published to ``ex_name`` with routing key ``key``.

        Declares an auto-delete queue with an empty name, bound to the
        exchange, whose ``x-message-ttl`` argument is ``ttl`` (milliseconds
        per RabbitMQ), and feeds every message to ``statu_callback``.
        """
        statu_ex = await self._channel_statu.get_exchange(ex_name)
        status_queue = await self._channel_statu.declare_queue(
            "", auto_delete=True, arguments={"x-message-ttl": ttl}
        )
        await status_queue.bind(statu_ex, routing_key=key)
        async with status_queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    self.statu_callback(ex_name, key, message.body.decode())
                if self._closing:
                    return

    async def send(self):
        """Publish messages queued by ``publish`` until ``close()`` is called."""
        while not self._closing:
            try:
                ex_name, key, msg = self._publish_msg_queue.get_nowait()
            except queue.Empty:
                pass
            else:
                exchange = await self._channel_send.get_exchange(ex_name)
                await exchange.publish(
                    aio_pika.Message(body=msg.encode()), routing_key=key
                )
            # Yield to the event loop so the receivers get to run.
            await asyncio.sleep(0.001)

    async def main(self):
        """Run the sender and all receivers until shutdown, then clean up.

        ``send`` only returns once ``close()`` was called, so its completion
        is the shutdown signal: receivers that are still waiting for a
        message are cancelled and the connection is closed. An exception in
        any task is re-raised after the same cleanup.
        """
        try:
            await self.init()
            sender = asyncio.ensure_future(self.send())
            tasks = {sender}
            for amqp_queue, callback in self._queue_callbacks.items():
                tasks.add(asyncio.ensure_future(self.recv(amqp_queue, callback)))
            if self._recv_status is not None:
                for ex_name, key in self._recv_status:
                    tasks.add(asyncio.ensure_future(self.recv_statu(ex_name, key)))
            try:
                pending = tasks
                while sender in pending:
                    done, pending = await asyncio.wait(
                        pending, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in done:
                        task.result()  # re-raise a failure from any task
            finally:
                # Receivers may be parked on an idle queue; do not wait for
                # another message to arrive before shutting down.
                leftover = [task for task in tasks if not task.done()]
                for task in leftover:
                    task.cancel()
                await asyncio.gather(*leftover, return_exceptions=True)
        finally:
            if self._connection is not None:
                await self._connection.close()

    def run(self):
        """Thread entry point: run ``main`` in a fresh asyncio event loop."""
        asyncio.run(self.main())
|