EntranceIO.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. import json
  2. import message_pb2 as message
  3. import google.protobuf.text_format as tf
  4. import queue
  5. import time
  6. import asyncio
  7. import async_communication as ASC
  8. import threading
  9. '''
  10. 单片机状态,2表示门已经关闭
  11. '''
  12. class EntranceIO(threading.Thread):
  13. def __init__(self,parameter,rabbitmq_parameter):
  14. threading.Thread.__init__(self)
  15. self._unit,self._id,self._ip,self._port=parameter
  16. self._close=False
  17. self._latest_iomsg=None
  18. self._send_queue=queue.Queue()
  19. self._statu_send_freq=50
  20. self._last_statucall_time=time.time()
  21. self._rabbit_mq=ASC.RabbitAsyncCommunicator(rabbitmq_parameter["ip"],rabbitmq_parameter["port"],
  22. rabbitmq_parameter['user'],rabbitmq_parameter['password'])
  23. self._close_door_cmd_queue=queue.Queue()
  24. #消费关门消息
  25. self._rabbit_mq.Init([["close_door_%d_queue"%self._id, self.close_door]], None)
  26. self._rabbit_mq.start()
  27. def close(self):
  28. self._close=True
  29. self._rabbit_mq.close()
  30. self.join()
  31. def close_door(self,msg):
  32. table = message.park_table()
  33. try:
  34. tf.Parse(msg, table)
  35. except:
  36. print("收到关门指令格式错误")
  37. return
  38. if table.statu.execute_statu == message.eError:
  39. print("单片机节点:收到消息,指令错误,不关门")
  40. return
  41. self._close_door_cmd_queue.put("close")
  42. async def connect(self):
  43. print("Trying to connect {}:{}".format(self._ip, self._port))
  44. while self._close==False:
  45. try:
  46. reader, writer = await asyncio.open_connection(
  47. self._ip, self._port
  48. )
  49. break
  50. except Exception as e:
  51. print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
  52. await asyncio.sleep(3)
  53. print("Connected to {}:{}".format(self._ip, self._port))
  54. return reader, writer
  55. async def close_door_loop(self):
  56. while self._close==False:
  57. try:
  58. if self._close_door_cmd_queue.qsize() > 0:
  59. msg = self._close_door_cmd_queue.get(False)
  60. print(" 入口:%d 开始关门 " % (self._id))
  61. closecmd = "{\"TerminalID\": %d, \"DispatchDirection\": 2, \"OutPutDo\": {\"Do0\": 0, \"Do1\": 0, \"Do2\": 0,\
  62. \"Do3\": 0,\"Do4\": 0, \"Do5\": 0, \"Do6\": 0, \"Do7\": 0}, \"ProcessControl\": 3}" % (
  63. self._id - 1)
  64. while self._close == False:
  65. self._send_queue.put(closecmd)
  66. # 等待知道门开到位
  67. if not self._latest_iomsg == None:
  68. if self._latest_iomsg.door_statu == 2 + 1: # 状态值3表示关门
  69. break
  70. await asyncio.sleep(3)
  71. print(" 入口 %d 关门完成" % (self._id))
  72. except:
  73. pass
  74. await asyncio.sleep(0)
  75. async def recv_loop(self):
  76. while self._close==False:
  77. # print("iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
  78. try:
  79. recieve=await self._reader.readuntil(b'$')
  80. except Exception as e:
  81. print(" e: {}".format(e))
  82. self._reader,self._writer=await self.connect()
  83. continue
  84. at=recieve.find(b'@')
  85. if at>=0:
  86. bytes=recieve[at+4:-1]
  87. self._latest_iomsg=self.recieve2message(bytes)
  88. if (not self._rabbit_mq==None) and (not self._latest_iomsg==None):
  89. ex_name = "statu_ex"
  90. key = "in_mcpu_%d_statu_port" % self._id
  91. self._rabbit_mq.publish(ex_name,key,tf.MessageToString(self._latest_iomsg,as_utf8=True))
  92. else:
  93. print("%s WARNING : mcpu recieve format error"%time.time())
  94. print(len(recieve),recieve)
  95. await asyncio.sleep(0)
  96. async def send_loop(self):
  97. while self._close == False:
  98. try:
  99. if self._send_queue.qsize() > 0:
  100. print("发送关门指令 size:%d" % self._send_queue.qsize())
  101. msg = self._send_queue.get(False)
  102. if not msg == None:
  103. self._writer.write(msg.encode())
  104. await self._writer.drain()
  105. except Exception as e:
  106. print(" send error :{}".format(e))
  107. self._reader, self._writer = await self.connect()
  108. await asyncio.sleep(0)
  109. time.sleep(0.001)
  110. async def main(self):
  111. self._reader,self._writer=await self.connect()
  112. await asyncio.gather(self.recv_loop(),self.send_loop(),self.close_door_loop())
  113. def run(self):
  114. asyncio.run(self.main())
  115. def recieve2message(self,bytes):
  116. try:
  117. data=json.loads(bytes.decode())
  118. except:
  119. return None
  120. statu_msg=message.in_mcpu_statu()
  121. if 'OutsideDoorStatus' in data.keys():
  122. statu_msg.door_statu=data["OutsideDoorStatus"]+1
  123. if 'InPutDi' in data.keys():
  124. if "Di1" in data["InPutDi"].keys():
  125. statu_msg.back_io=data["InPutDi"]['Di1']+1
  126. if 'InsideExistenceFlag' in data.keys():
  127. statu_msg.is_occupy=data["InsideExistenceFlag"]+1
  128. if 'CarHeightStatusCurrent' in data.keys():
  129. statu_msg.heighth=data["CarHeightStatusCurrent"]+1
  130. # if self._id==4:
  131. # #print(bytes)
  132. # print(time.time(),data["CarHeightStatusCurrent"],statu_msg.heighth)
  133. return statu_msg