# async_communication.py

import asyncio
import queue
import threading
import time

import aio_pika
  6. class TimeStatu:
  7. def __init__(self,statu=None,timeout=3):
  8. self.statu=statu
  9. self.time=time.time()
  10. self.timeout_ms=timeout
  11. def timeout(self):
  12. tm=time.time()
  13. return tm-self.time>self.timeout_ms or self.statu==None
  14. class RabbitAsyncCommunicator(threading.Thread):
  15. def __init__(self, host,port, user, password):
  16. threading.Thread.__init__(self)
  17. self._host = host
  18. self._port=port
  19. self._user = user
  20. self._password = password
  21. self._connection = None
  22. self._channel_consumer = None
  23. self._channel_send = None
  24. self._channel_statu = None
  25. self._consumer_callbacks= None
  26. self._recv_status=None
  27. self._queue_callbacks= {}
  28. self._publish_msg_queue=queue.Queue()
  29. self._status={}
  30. self._statu_callbacks={}
  31. self._closing = False
  32. def Init(self,consumer_callbacks,recv_status):
  33. self._consumer_callbacks=consumer_callbacks
  34. self._recv_status=recv_status
  35. if self._recv_status==None:
  36. return
  37. for ex_name,key in self._recv_status:
  38. self._status[ex_name+":"+key]=TimeStatu(None,0.1)
  39. def publish(self,ex_name,key,msg):
  40. self._publish_msg_queue.put([ex_name,key,msg])
  41. def bind_statu_callback(self,ex_name,key,callback):
  42. self._statu_callbacks[ex_name+":"+key]=callback
  43. def close(self):
  44. self._closing=True
  45. async def init(self):
  46. connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host)
  47. self._connection = await aio_pika.connect_robust(connection_string)
  48. self._channel_consumer = await self._connection.channel()
  49. self._channel_send = await self._connection.channel()
  50. self._channel_statu = await self._connection.channel()
  51. # Will take no more than 10 messages in advance
  52. await self._channel_consumer.set_qos(prefetch_count=1)
  53. if self._consumer_callbacks==None:
  54. return
  55. for queue_name,callback in self._consumer_callbacks:
  56. queue= await self._channel_consumer.get_queue(queue_name)
  57. self._queue_callbacks[queue]=callback
  58. def statu_callback(self,ex,key,msg):
  59. id=ex+":"+key
  60. self._status[id]=TimeStatu(msg)
  61. callback=self._statu_callbacks.get(id)
  62. if not callback==None:
  63. callback(self._status[id])
  64. def __getitem__(self, key):
  65. return self._status[key]
  66. def __setitem__(self, key, value):
  67. self._status[key]=value
  68. async def recv(self,queue,callback):
  69. await asyncio.sleep(2)
  70. print(" 订阅队列:%s" % queue)
  71. async with queue.iterator() as queue_iter:
  72. async for message in queue_iter:
  73. if not callback==None:
  74. await callback(message.body.decode())
  75. await message.ack()
  76. if self._closing==True:
  77. return
  78. async def recv_statu(self,ex_name,key,ttl=200):
  79. statu_ex=await self._channel_statu.get_exchange(ex_name)
  80. arg={}
  81. arg["x-message-ttl"]=ttl
  82. queue=await self._channel_statu.declare_queue("", auto_delete=True,arguments=arg)
  83. await queue.bind(statu_ex,routing_key=key)
  84. print("订阅状态端口:%s" % key)
  85. async with queue.iterator() as queue_iter:
  86. async for message in queue_iter:
  87. async with message.process():
  88. self.statu_callback(ex_name,key,message.body.decode())
  89. if self._closing==True:
  90. return
  91. async def send(self):
  92. while self._closing==False:
  93. if self._publish_msg_queue.qsize()>0:
  94. msg_bag=self._publish_msg_queue.get(False)
  95. if not msg_bag==None:
  96. ex_name,key,msg=msg_bag
  97. ex= await self._channel_send.get_exchange(ex_name)
  98. await ex.publish(aio_pika.Message(body=msg.encode()),routing_key=key)
  99. await asyncio.sleep(0)
  100. #time.sleep(0.001)
  101. async def main(self):
  102. await self.init()
  103. tasks=[]
  104. tasks.append(self.send())
  105. if not self._consumer_callbacks==None:
  106. for queue,callback in self._queue_callbacks.items():
  107. tasks.append(self.recv(queue,callback))
  108. if not self._recv_status==None:
  109. for ex_name,key in self._recv_status:
  110. tasks.append(self.recv_statu(ex_name,key))
  111. await asyncio.gather(*tasks)
  112. def run(self):
  113. asyncio.run(self.main())