async_communication.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import threading
  2. import time
  3. import asyncio
  4. import aio_pika
  5. import queue
  6. class TimeStatu:
  7. def __init__(self, statu=None, timeout=3):
  8. self.statu = statu
  9. self.time = time.time()
  10. self.timeout_ms = timeout
  11. def timeout(self, timeout=0):
  12. tm = time.time()
  13. if timeout > self.timeout_ms:
  14. return tm - self.time > self.timeout_ms
  15. return tm - self.time > self.timeout_ms
  16. def reset(self, statu, timeout):
  17. self.statu = statu
  18. self.time = time.time()
  19. self.timeout_ms = timeout
  20. class RabbitAsyncCommunicator(threading.Thread):
  21. def __init__(self, host, port, user, password):
  22. threading.Thread.__init__(self)
  23. self._host = host
  24. self._port = port
  25. self._user = user
  26. self._password = password
  27. self._connection = None
  28. self._channel_consumer = None
  29. self._channel_send = None
  30. self._channel_statu = None
  31. self._consumer_callbacks = None
  32. self._recv_status = None
  33. self._queue_callbacks = {}
  34. self._publish_msg_queue = queue.Queue()
  35. self._status = {}
  36. self._statu_callbacks = {}
  37. self._closing = False
  38. def Init(self, consumer_callbacks, recv_status):
  39. self._consumer_callbacks = consumer_callbacks
  40. self._recv_status = recv_status
  41. if self._recv_status == None:
  42. return
  43. for ex_name, key in self._recv_status:
  44. self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)
  45. def publish(self, ex_name, key, msg, timeout=None):
  46. self._publish_msg_queue.put([ex_name, key, msg, timeout])
  47. def bind_statu_callback(self, ex_name, key, callback):
  48. self._statu_callbacks[ex_name + ":" + key] = callback
  49. def close(self):
  50. self._closing = True
  51. async def init(self):
  52. connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
  53. self._connection = await aio_pika.connect_robust(connection_string)
  54. self._channel_consumer = await self._connection.channel()
  55. self._channel_send = await self._connection.channel()
  56. self._channel_statu = await self._connection.channel()
  57. # Will take no more than 10 messages in advance
  58. await self._channel_consumer.set_qos(prefetch_count=1)
  59. if self._consumer_callbacks == None:
  60. return
  61. for queue_name, callback in self._consumer_callbacks:
  62. queue = await self._channel_consumer.get_queue(queue_name)
  63. self._queue_callbacks[queue] = callback
  64. def statu_callback(self, ex, key, msg):
  65. id = ex + ":" + key
  66. self._status[id] = TimeStatu(msg)
  67. callback = self._statu_callbacks.get(id)
  68. if not callback == None:
  69. callback(self._status[id])
  70. def __getitem__(self, key):
  71. return self._status[key]
  72. def __setitem__(self, key, value):
  73. self._status[key] = value
  74. async def recv(self, queue, callback):
  75. async with queue.iterator() as queue_iter:
  76. async for message in queue_iter:
  77. if not callback == None:
  78. callback(message.body.decode())
  79. await message.ack()
  80. if self._closing == True:
  81. return
  82. async def recv_statu(self, ex_name, key, ttl=200):
  83. statu_ex = await self._channel_statu.get_exchange(ex_name)
  84. arg = {}
  85. arg["x-message-ttl"] = ttl
  86. queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg)
  87. await queue.bind(statu_ex, routing_key=key)
  88. async with queue.iterator() as queue_iter:
  89. async for message in queue_iter:
  90. async with message.process():
  91. self.statu_callback(ex_name, key, message.body.decode())
  92. if self._closing == True:
  93. return
  94. async def send(self):
  95. while self._closing == False:
  96. if self._publish_msg_queue.qsize() > 0:
  97. msg_bag = self._publish_msg_queue.get(False)
  98. if not msg_bag == None:
  99. ex_name, key, msg, ttl = msg_bag
  100. ex = await self._channel_send.get_exchange(ex_name)
  101. await ex.publish(aio_pika.Message(body=msg.encode(), expiration=ttl), routing_key=key)
  102. await asyncio.sleep(0.001)
  103. async def main(self):
  104. await self.init()
  105. tasks = []
  106. tasks.append(self.send())
  107. if not self._consumer_callbacks == None:
  108. for queue, callback in self._queue_callbacks.items():
  109. tasks.append(self.recv(queue, callback))
  110. if not self._recv_status == None:
  111. for ex_name, key in self._recv_status:
  112. tasks.append(self.recv_statu(ex_name, key))
  113. await asyncio.gather(*tasks)
  114. def run(self):
  115. asyncio.run(self.main())