|
@@ -0,0 +1,135 @@
|
|
|
+import threading
|
|
|
+import time
|
|
|
+import asyncio
|
|
|
+import aio_pika
|
|
|
+import queue
|
|
|
+
|
|
|
+
|
|
|
+class TimeStatu:
|
|
|
+ def __init__(self, statu=None, timeout=3):
|
|
|
+ self.statu = statu
|
|
|
+ self.time = time.time()
|
|
|
+ self.timeout_ms = timeout
|
|
|
+
|
|
|
+ def timeout(self, timeout=0):
|
|
|
+ tm = time.time()
|
|
|
+ if timeout > self.timeout_ms:
|
|
|
+ return tm - self.time > self.timeout_ms
|
|
|
+ return tm - self.time > self.timeout_ms
|
|
|
+
|
|
|
+ def reset(self, statu, timeout):
|
|
|
+ self.statu = statu
|
|
|
+ self.time = time.time()
|
|
|
+ self.timeout_ms = timeout
|
|
|
+
|
|
|
+
|
|
|
+class RabbitAsyncCommunicator(threading.Thread):
|
|
|
+ def __init__(self, host, port, user, password):
|
|
|
+ threading.Thread.__init__(self)
|
|
|
+ self._host = host
|
|
|
+ self._port = port
|
|
|
+ self._user = user
|
|
|
+ self._password = password
|
|
|
+ self._connection = None
|
|
|
+ self._channel_consumer = None
|
|
|
+ self._channel_send = None
|
|
|
+ self._channel_statu = None
|
|
|
+
|
|
|
+ self._consumer_callbacks = None
|
|
|
+ self._recv_status = None
|
|
|
+ self._queue_callbacks = {}
|
|
|
+ self._publish_msg_queue = queue.Queue()
|
|
|
+ self._status = {}
|
|
|
+ self._statu_callbacks = {}
|
|
|
+ self._closing = False
|
|
|
+
|
|
|
+ def Init(self, consumer_callbacks, recv_status):
|
|
|
+ self._consumer_callbacks = consumer_callbacks
|
|
|
+ self._recv_status = recv_status
|
|
|
+ if self._recv_status == None:
|
|
|
+ return
|
|
|
+ for ex_name, key in self._recv_status:
|
|
|
+ self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)
|
|
|
+
|
|
|
+ def publish(self, ex_name, key, msg, timeout=None):
|
|
|
+ self._publish_msg_queue.put([ex_name, key, msg, timeout])
|
|
|
+
|
|
|
+ def bind_statu_callback(self, ex_name, key, callback):
|
|
|
+ self._statu_callbacks[ex_name + ":" + key] = callback
|
|
|
+
|
|
|
+ def close(self):
|
|
|
+ self._closing = True
|
|
|
+
|
|
|
+ async def init(self):
|
|
|
+ connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
|
|
|
+ self._connection = await aio_pika.connect_robust(connection_string)
|
|
|
+ self._channel_consumer = await self._connection.channel()
|
|
|
+ self._channel_send = await self._connection.channel()
|
|
|
+ self._channel_statu = await self._connection.channel()
|
|
|
+ # Will take no more than 10 messages in advance
|
|
|
+ await self._channel_consumer.set_qos(prefetch_count=1)
|
|
|
+ if self._consumer_callbacks == None:
|
|
|
+ return
|
|
|
+ for queue_name, callback in self._consumer_callbacks:
|
|
|
+ queue = await self._channel_consumer.get_queue(queue_name)
|
|
|
+ self._queue_callbacks[queue] = callback
|
|
|
+
|
|
|
+ def statu_callback(self, ex, key, msg):
|
|
|
+ id = ex + ":" + key
|
|
|
+ self._status[id] = TimeStatu(msg)
|
|
|
+ callback = self._statu_callbacks.get(id)
|
|
|
+ if not callback == None:
|
|
|
+ callback(self._status[id])
|
|
|
+
|
|
|
+ def __getitem__(self, key):
|
|
|
+ return self._status[key]
|
|
|
+
|
|
|
+ def __setitem__(self, key, value):
|
|
|
+ self._status[key] = value
|
|
|
+
|
|
|
+ async def recv(self, queue, callback):
|
|
|
+ async with queue.iterator() as queue_iter:
|
|
|
+ async for message in queue_iter:
|
|
|
+ if not callback == None:
|
|
|
+ callback(message.body.decode())
|
|
|
+ await message.ack()
|
|
|
+ if self._closing == True:
|
|
|
+ return
|
|
|
+
|
|
|
+ async def recv_statu(self, ex_name, key, ttl=200):
|
|
|
+ statu_ex = await self._channel_statu.get_exchange(ex_name)
|
|
|
+ arg = {}
|
|
|
+ arg["x-message-ttl"] = ttl
|
|
|
+ queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg)
|
|
|
+ await queue.bind(statu_ex, routing_key=key)
|
|
|
+ async with queue.iterator() as queue_iter:
|
|
|
+ async for message in queue_iter:
|
|
|
+ async with message.process():
|
|
|
+ self.statu_callback(ex_name, key, message.body.decode())
|
|
|
+ if self._closing == True:
|
|
|
+ return
|
|
|
+
|
|
|
+ async def send(self):
|
|
|
+ while self._closing == False:
|
|
|
+ if self._publish_msg_queue.qsize() > 0:
|
|
|
+ msg_bag = self._publish_msg_queue.get(False)
|
|
|
+ if not msg_bag == None:
|
|
|
+ ex_name, key, msg, ttl = msg_bag
|
|
|
+ ex = await self._channel_send.get_exchange(ex_name)
|
|
|
+ await ex.publish(aio_pika.Message(body=msg.encode(), expiration=ttl), routing_key=key)
|
|
|
+ await asyncio.sleep(0.001)
|
|
|
+
|
|
|
+ async def main(self):
|
|
|
+ await self.init()
|
|
|
+ tasks = []
|
|
|
+ tasks.append(self.send())
|
|
|
+ if not self._consumer_callbacks == None:
|
|
|
+ for queue, callback in self._queue_callbacks.items():
|
|
|
+ tasks.append(self.recv(queue, callback))
|
|
|
+ if not self._recv_status == None:
|
|
|
+ for ex_name, key in self._recv_status:
|
|
|
+ tasks.append(self.recv_statu(ex_name, key))
|
|
|
+ await asyncio.gather(*tasks)
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ asyncio.run(self.main())
|