# async_communication.py (4.5 KB)

import asyncio
import logging
import queue
import threading
import time

import aio_pika
  6. class TimeStatu:
  7. def __init__(self,statu=None,timeout=3):
  8. self.statu=statu
  9. self.time=time.time()
  10. self.timeout_ms=timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms
  14. class RabbitAsyncCommunicator(threading.Thread):
  15. def __init__(self, host,port, user, password):
  16. threading.Thread.__init__(self)
  17. self._host = host
  18. self._port=port
  19. self._user = user
  20. self._password = password
  21. self._connection = None
  22. self._channel_consumer = None
  23. self._channel_send = None
  24. self._channel_statu = None
  25. self._consumer_callbacks= None
  26. self._recv_status=None
  27. self._queue_callbacks= {}
  28. self._publish_msg_queue=queue.Queue()
  29. self._status={}
  30. self._statu_callbacks={}
  31. self._closing = False
  32. def Init(self,consumer_callbacks,recv_status):
  33. self._consumer_callbacks=consumer_callbacks
  34. self._recv_status=recv_status
  35. if self._recv_status==None:
  36. return
  37. for ex_name,key in self._recv_status:
  38. self._status[ex_name+":"+key]=TimeStatu(None,0.1)
  39. def publish(self,ex_name,key,msg):
  40. self._publish_msg_queue.put([ex_name,key,msg])
  41. def bind_statu_callback(self,ex_name,key,callback):
  42. self._statu_callbacks[ex_name+":"+key]=callback
  43. def close(self):
  44. self._closing=True
  45. async def init(self):
  46. connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host)
  47. self._connection = await aio_pika.connect_robust(connection_string)
  48. self._channel_consumer = await self._connection.channel()
  49. self._channel_send = await self._connection.channel()
  50. self._channel_statu = await self._connection.channel()
  51. # Will take no more than 10 messages in advance
  52. await self._channel_consumer.set_qos(prefetch_count=1)
  53. if self._consumer_callbacks==None:
  54. return
  55. for queue_name,callback in self._consumer_callbacks:
  56. queue= await self._channel_consumer.get_queue(queue_name)
  57. self._queue_callbacks[queue]=callback
  58. def statu_callback(self,ex,key,msg):
  59. id=ex+":"+key
  60. self._status[id]=TimeStatu(msg)
  61. callback=self._statu_callbacks.get(id)
  62. if not callback==None:
  63. callback(self._status[id])
  64. def __getitem__(self, key):
  65. return self._status[key]
  66. def __setitem__(self, key, value):
  67. self._status[key]=value
  68. async def recv(self,queue,callback):
  69. async with queue.iterator() as queue_iter:
  70. async for message in queue_iter:
  71. if not callback==None:
  72. callback(message.body.decode())
  73. await message.ack()
  74. if self._closing==True:
  75. return
  76. async def recv_statu(self,ex_name,key,ttl=1000):
  77. statu_ex=await self._channel_statu.get_exchange(ex_name)
  78. arg={}
  79. arg["x-message-ttl"]=ttl
  80. queue=await self._channel_statu.declare_queue("", auto_delete=True,arguments=arg)
  81. await queue.bind(statu_ex,routing_key=key)
  82. async with queue.iterator() as queue_iter:
  83. async for message in queue_iter:
  84. async with message.process():
  85. self.statu_callback(ex_name,key,message.body.decode())
  86. if self._closing==True:
  87. return
  88. async def send(self):
  89. while self._closing==False:
  90. if self._publish_msg_queue.qsize()>0:
  91. try:
  92. msg_bag = self._publish_msg_queue.get(False)
  93. if not msg_bag == None:
  94. ex_name, key, msg = msg_bag
  95. ex = await self._channel_send.get_exchange(ex_name)
  96. await ex.publish(aio_pika.Message(body=msg.encode()), routing_key=key)
  97. except:
  98. await asyncio.sleep(1)
  99. await asyncio.sleep(0.001)
  100. #time.sleep(0.001)
  101. async def main(self):
  102. await self.init()
  103. tasks=[]
  104. tasks.append(self.send())
  105. if not self._consumer_callbacks==None:
  106. for queue,callback in self._queue_callbacks.items():
  107. tasks.append(self.recv(queue,callback))
  108. if not self._recv_status==None:
  109. for ex_name,key in self._recv_status:
  110. tasks.append(self.recv_statu(ex_name,key))
  111. await asyncio.gather(*tasks)
  112. def run(self):
  113. asyncio.run(self.main())