|
@@ -86,7 +86,7 @@ class RabbitAsyncCommunicator(threading.Thread):
|
|
|
await message.ack()
|
|
|
if self._closing==True:
|
|
|
return
|
|
|
- async def recv_statu(self,ex_name,key,ttl=1000):
|
|
|
+ async def recv_statu(self,ex_name,key,ttl=200):
|
|
|
statu_ex=await self._channel_statu.get_exchange(ex_name)
|
|
|
arg={}
|
|
|
arg["x-message-ttl"]=ttl
|
|
@@ -100,22 +100,18 @@ class RabbitAsyncCommunicator(threading.Thread):
|
|
|
return
|
|
|
|
|
|
async def send(self):
|
|
|
- ex_map={}
|
|
|
while self._closing==False:
|
|
|
if self._publish_msg_queue.qsize()>0:
|
|
|
try:
|
|
|
msg_bag = self._publish_msg_queue.get(False)
|
|
|
if not msg_bag == None:
|
|
|
ex_name, key, msg = msg_bag
|
|
|
- ex=ex_map.get(ex_name)
|
|
|
- if ex == None:
|
|
|
- ex = await self._channel_send.get_exchange(ex_name)
|
|
|
- ex_map[ex_name]=ex
|
|
|
+ ex = await self._channel_send.get_exchange(ex_name)
|
|
|
await ex.publish(aio_pika.Message(body=msg.encode()), routing_key=key)
|
|
|
except:
|
|
|
await asyncio.sleep(1)
|
|
|
- await asyncio.sleep(0)
|
|
|
- time.sleep(0.001)
|
|
|
+ await asyncio.sleep(0.001)
|
|
|
+ #time.sleep(0.001)
|
|
|
|
|
|
|
|
|
async def main(self):
|