ExportIO.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import json
  2. import message_pb2 as message
  3. import google.protobuf.text_format as tf
  4. import queue
  5. import time
  6. import asyncio
  7. import async_communication as ASC
  8. import threading
'''
MCU (microcontroller) status: a value of 2 means the door is already closed.
'''
  12. class ExportIO(threading.Thread):
  13. def __init__(self,parameter,rabbitmq_parameter):
  14. threading.Thread.__init__(self)
  15. self._unit,self._id,self._ip,self._port=parameter
  16. self._close=False
  17. self._latest_iomsg=None
  18. self._last_user_table = None
  19. self._send_queue=queue.Queue()
  20. self._last_statucall_time=time.time()
  21. self._user_leave_callback=None
  22. self._last_table=None
  23. self._open_door_flag = False
  24. self._rabbit_mq=ASC.RabbitAsyncCommunicator(rabbitmq_parameter["ip"],rabbitmq_parameter["port"],
  25. rabbitmq_parameter['user'],rabbitmq_parameter['password'])
  26. #消费关门消息
  27. self._rabbit_mq.Init(None, None)
  28. self._rabbit_mq.start()
  29. def SetUserLeaveCallback(self,callback):
  30. self._user_leave_callback=callback
  31. def close(self):
  32. self._close=True
  33. self._rabbit_mq.close()
  34. self.join()
  35. def export_idle(self):
  36. if self._latest_iomsg==None:
  37. return False
  38. else:
  39. return self._latest_iomsg.outside_safety==1
  40. #open_door 函数,负责修改开门任务标记,只要数据库查到了,就把_open_door_flag修改为true
  41. #open_door 函数被node_py的主函数调用,不要做太多的事,剩余的开门控制和数据库清理,交给open_door_loop
  42. def open_door(self,table):
  43. self._open_door_flag = True
  44. return
  45. async def open_door_loop(self):
  46. while self._close==False:
  47. try:
  48. if (self._latest_iomsg is not None) and self._open_door_flag == True:
  49. #门不是开状态,那么就发送开门,无论里面是否有车
  50. if self._latest_iomsg.door_statu != 2:
  51. print(" 出口编号:%d , 执行开门 " % (self._id))
  52. opencmd = "{\"TerminalID\": %d, \"DispatchDirection\": 2, \"OutPutDo\": {\"Do0\": 0, \"Do1\": 0, \"Do2\": 0,\"Do3\": 0,\"Do4\": 0, \"Do5\": 0, \"Do6\": 0, \"Do7\": 0}, \"ProcessControl\": 4}" % (
  53. self._id - 1)
  54. print(opencmd)
  55. self._send_queue.put(opencmd)
  56. print(" opendoor queue size:%d"%self._send_queue.qsize())
  57. await asyncio.sleep(3)
  58. #如果门开到位,则检查汽车是否离开,如果没车就删库,清除flag, 注意了, 在汽车离开3秒后,由单片机底层自动关门,我们不用管
  59. elif self._latest_iomsg.outside_safety == 1:
  60. if not self._user_leave_callback == None:
  61. self._user_leave_callback(self._id)
  62. self._open_door_flag = False
  63. #else 无限等待,什么也不做, 等待汽车离开
  64. except:
  65. pass
  66. await asyncio.sleep(0)
  67. async def connect(self):
  68. print("Trying to connect {}:{}".format(self._ip, self._port))
  69. while self._close == False:
  70. try:
  71. reader, writer = await asyncio.open_connection(
  72. self._ip, self._port
  73. )
  74. break
  75. except Exception as e:
  76. print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
  77. await asyncio.sleep(3)
  78. print("Connected to {}:{}".format(self._ip, self._port))
  79. return reader, writer
  80. async def recv_loop(self):
  81. while self._close==False:
  82. try:
  83. recieve=await self._reader.readuntil(b'$')
  84. except Exception as e:
  85. print(" e: {}".format(e))
  86. self._reader,self._writer=await self.connect()
  87. await asyncio.sleep(0.001)
  88. continue
  89. #socket.ConnectionResetError:
  90. # print(recieve)
  91. start_index = recieve.find(b'@')
  92. end_index = recieve.find(b'$')
  93. if start_index >= 0 and end_index >= 0 and end_index >=start_index:
  94. bytes = recieve[start_index + 4:end_index]
  95. self._latest_iomsg =self.recieve2message(bytes)
  96. if (not self._rabbit_mq == None) and (not self._latest_iomsg == None):
  97. if self._latest_iomsg.outside_safety == 0 or self._latest_iomsg.door_statu == 0:
  98. continue
  99. else:
  100. ex_name = "statu_ex"
  101. key = "out_mcpu_%d_statu_port" % self._id
  102. self._rabbit_mq.publish(ex_name, key, tf.MessageToString(self._latest_iomsg, as_utf8=True))
  103. else:
  104. print(" start_index = %s"%start_index)
  105. print("end_index = %s"%end_index)
  106. print("WARNING : mcpu recieve format error , time = %s"%time.time())
  107. print(" recieve = " + str(recieve))
  108. await asyncio.sleep(0)
  109. continue
  110. await asyncio.sleep(0)
  111. async def send_loop(self):
  112. while self._close == False:
  113. try:
  114. if self._send_queue.qsize() > 0:
  115. print("给单片机发送指令 size:%d" % self._send_queue.qsize())
  116. msg = self._send_queue.get(False)
  117. if not msg == None:
  118. self._writer.write(msg.encode())
  119. await self._writer.drain()
  120. except Exception as e:
  121. print(" send error :{}".format(e))
  122. self._reader, self._writer = await self.connect()
  123. await asyncio.sleep(0)
  124. time.sleep(0.001)
  125. async def main(self):
  126. self._reader,self._writer=await self.connect()
  127. await asyncio.gather(self.recv_loop(),self.send_loop(),self.open_door_loop())
  128. def run(self):
  129. asyncio.run(self.main())
  130. def recieve2message(self,bytes):
  131. try:
  132. data=json.loads(bytes)
  133. except:
  134. print("tf json load error ")
  135. return None
  136. statu_msg=message.out_mcpu_statu()
  137. if 'InsideExistenceFlag' in data.keys():
  138. statu_msg.outside_safety = data['InsideExistenceFlag'] + 1
  139. else:
  140. statu_msg.outside_safety = 0
  141. if 'OutsideDoorStatus' in data.keys():
  142. statu_msg.door_statu = data["OutsideDoorStatus"] + 1 # 1 k 2g 3 running
  143. else:
  144. statu_msg.door_statu = 0
  145. return statu_msg