async_communication.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import threading
  2. import time
  3. import asyncio
  4. import aio_pika
  5. import queue
  6. class TimeStatu:
  7. def __init__(self,statu=None,timeout=3):
  8. self.statu=statu
  9. self.time=time.time()
  10. self.timeout_ms=timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms or self.statu==None
  14. class RabbitAsyncCommunicator(threading.Thread):
  15. def __init__(self, host,port, user, password):
  16. threading.Thread.__init__(self)
  17. self._host = host
  18. self._port=port
  19. self._user = user
  20. self._password = password
  21. self._connection = None
  22. self._channel_consumer = None
  23. self._channel_send = None
  24. self._channel_statu = None
  25. self._consumer_callbacks= None
  26. self._recv_status=None
  27. self._queue_callbacks= {}
  28. self._publish_msg_queue=queue.Queue()
  29. self._status={}
  30. self._statu_callbacks={}
  31. self._closing = False
  32. self._receiver = None
  33. def Init(self,consumer_callbacks,recv_status,receiver):
  34. self._consumer_callbacks=consumer_callbacks
  35. self._recv_status=recv_status
  36. self._receiver = receiver
  37. if self._recv_status==None:
  38. return
  39. for ex_name,key in self._recv_status:
  40. self._status[ex_name+":"+key]=TimeStatu(None,0.1)
  41. def publish(self,ex_name,key,msg):
  42. self._publish_msg_queue.put([ex_name,key,msg])
  43. def bind_statu_callback(self,ex_name,key,callback):
  44. self._statu_callbacks[ex_name+":"+key]=callback
  45. def close(self):
  46. self._closing=True
  47. async def init(self):
  48. connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host)
  49. self._connection = await aio_pika.connect_robust(connection_string)
  50. self._channel_consumer = await self._connection.channel()
  51. self._channel_send = await self._connection.channel()
  52. self._channel_statu = await self._connection.channel()
  53. # Will take no more than 10 messages in advance
  54. await self._channel_consumer.set_qos(prefetch_count=1)
  55. if self._consumer_callbacks==None:
  56. return
  57. for queue_name,callback in self._consumer_callbacks:
  58. queue= await self._channel_consumer.get_queue(queue_name)
  59. self._queue_callbacks[queue]=callback
  60. def statu_callback(self,ex,key,msg):
  61. id=ex+":"+key
  62. self._status[id]=TimeStatu(msg)
  63. callback=self._statu_callbacks.get(id)
  64. if not callback==None:
  65. callback(self._status[id],ex,key)
  66. def __getitem__(self, key):
  67. return self._status[key]
  68. def __setitem__(self, key, value):
  69. self._status[key]=value
  70. async def recv(self,queue,callback):
  71. await asyncio.sleep(2)
  72. print(" 订阅队列:%s" % queue)
  73. async with queue.iterator() as queue_iter:
  74. async for message in queue_iter:
  75. if not callback==None:
  76. await callback(message.body.decode())
  77. await message.ack()
  78. if self._closing==True:
  79. return
  80. async def recv_statu(self,ex_name,key,ttl=200):
  81. statu_ex=await self._channel_statu.get_exchange(ex_name)
  82. arg={}
  83. arg["x-message-ttl"]=ttl
  84. queue=await self._channel_statu.declare_queue(key+"_for_"+self._receiver, auto_delete=True,arguments=arg)
  85. await queue.bind(statu_ex,routing_key=key)
  86. print("订阅状态端口:%s" % key)
  87. async with queue.iterator() as queue_iter:
  88. async for message in queue_iter:
  89. async with message.process():
  90. self.statu_callback(ex_name,key,message.body.decode())
  91. if self._closing==True:
  92. return
  93. async def send(self):
  94. while self._closing==False:
  95. if self._publish_msg_queue.qsize()>0:
  96. msg_bag=self._publish_msg_queue.get(False)
  97. if not msg_bag==None:
  98. ex_name,key,msg=msg_bag
  99. ex= await self._channel_send.get_exchange(ex_name)
  100. await ex.publish(aio_pika.Message(body=msg.encode()),routing_key=key)
  101. await asyncio.sleep(0)
  102. #time.sleep(0.001)
  103. async def main(self):
  104. await self.init()
  105. tasks=[]
  106. tasks.append(self.send())
  107. if not self._consumer_callbacks==None:
  108. for queue,callback in self._queue_callbacks.items():
  109. tasks.append(self.recv(queue,callback))
  110. if not self._recv_status==None:
  111. for ex_name,key in self._recv_status:
  112. tasks.append(self.recv_statu(ex_name,key))
  113. await asyncio.gather(*tasks)
  114. def run(self):
  115. asyncio.run(self.main())