async_communication.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import threading
  2. import time
  3. import asyncio
  4. from concurrent.futures import ThreadPoolExecutor
  5. import aio_pika
  6. import queue
  7. class TimeStatu:
  8. def __init__(self,statu=None,timeout=3):
  9. self.statu=statu
  10. self.time=time.time()
  11. self.timeout_ms=timeout
  12. def timeout(self):
  13. tm=time.time()
  14. return tm-self.time>self.timeout_ms or self.statu==None
  15. class RabbitAsyncCommunicator(threading.Thread):
  16. def __init__(self, host,port, user, password):
  17. threading.Thread.__init__(self)
  18. self._host = host
  19. self._port=port
  20. self._user = user
  21. self._password = password
  22. self._connection = None
  23. self._channel_consumer = None
  24. self._channel_send = None
  25. self._channel_statu = None
  26. self._consumer_callbacks= None
  27. self._recv_status=None
  28. self._queue_callbacks= {}
  29. self._publish_msg_queue=queue.Queue()
  30. self._status={}
  31. self._statu_callbacks={}
  32. self._closing = False
  33. self._receiver = None
  34. self._pool = ThreadPoolExecutor()
  35. def Init(self,consumer_callbacks,recv_status,receiver=None):
  36. self._consumer_callbacks=consumer_callbacks
  37. self._recv_status=recv_status
  38. self._receiver = receiver
  39. if self._recv_status==None:
  40. return
  41. for ex_name,key in self._recv_status:
  42. self._status[ex_name+":"+key]=TimeStatu(None,0.1)
  43. def publish(self,ex_name,key,msg):
  44. self._publish_msg_queue.put([ex_name,key,msg])
  45. def bind_statu_callback(self,ex_name,key,callback):
  46. self._statu_callbacks[ex_name+":"+key]=callback
  47. def close(self):
  48. self._closing=True
  49. self._pool.shutdown(wait=True)
  50. async def init(self):
  51. connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host)
  52. self._connection = await aio_pika.connect_robust(connection_string)
  53. self._channel_consumer = await self._connection.channel()
  54. self._channel_send = await self._connection.channel()
  55. self._channel_statu = await self._connection.channel()
  56. # Will take no more than 10 messages in advance
  57. await self._channel_consumer.set_qos(prefetch_count=1)
  58. if self._consumer_callbacks==None:
  59. return
  60. for queue_name,callback in self._consumer_callbacks:
  61. queue= await self._channel_consumer.get_queue(queue_name)
  62. self._queue_callbacks[queue]=callback
  63. def statu_callback(self,ex,key,msg):
  64. id=ex+":"+key
  65. self._status[id]=TimeStatu(msg)
  66. callback=self._statu_callbacks.get(id)
  67. if not callback==None:
  68. callback(self._status[id],ex,key)
  69. def __getitem__(self, key):
  70. return self._status[key]
  71. def __setitem__(self, key, value):
  72. self._status[key]=value
  73. async def recv(self,queue,callback):
  74. await asyncio.sleep(2)
  75. print(" 订阅队列:%s" % queue)
  76. async with queue.iterator() as queue_iter:
  77. async for message in queue_iter:
  78. if not callback==None:
  79. self._pool.submit(callback,message.body.decode())
  80. await message.ack()
  81. if self._closing==True:
  82. return
  83. async def recv_statu(self,ex_name,key,ttl=200):
  84. statu_ex=await self._channel_statu.get_exchange(ex_name)
  85. arg={}
  86. arg["x-message-ttl"]=ttl
  87. if self._receiver is not None:
  88. queue=await self._channel_statu.declare_queue(key+"_for_"+self._receiver, auto_delete=True,arguments=arg)
  89. else:
  90. queue=await self._channel_statu.declare_queue('', auto_delete=True,arguments=arg)
  91. await queue.bind(statu_ex,routing_key=key)
  92. print("订阅状态端口:%s" % key)
  93. async with queue.iterator() as queue_iter:
  94. async for message in queue_iter:
  95. async with message.process():
  96. self.statu_callback(ex_name,key,message.body.decode())
  97. if self._closing==True:
  98. return
  99. async def send(self):
  100. while self._closing==False:
  101. if self._publish_msg_queue.qsize()>0:
  102. msg_bag=self._publish_msg_queue.get(False)
  103. if not msg_bag==None:
  104. ex_name,key,msg=msg_bag
  105. ex= await self._channel_send.get_exchange(ex_name)
  106. await ex.publish(aio_pika.Message(body=msg.encode()),routing_key=key)
  107. await asyncio.sleep(0.001)
  108. #time.sleep(0.001)
  109. async def main(self):
  110. await self.init()
  111. tasks=[]
  112. tasks.append(self.send())
  113. if not self._consumer_callbacks==None:
  114. for queue,callback in self._queue_callbacks.items():
  115. tasks.append(self.recv(queue,callback))
  116. if not self._recv_status==None:
  117. for ex_name,key in self._recv_status:
  118. tasks.append(self.recv_statu(ex_name,key))
  119. await asyncio.gather(*tasks)
  120. def run(self):
  121. asyncio.run(self.main())