import json
import message_pb2 as message
import google.protobuf.text_format as tf
import queue
import time
import asyncio
import async_communication as ASC
import threading

# MCU door status: a raw value of 2 reported by the controller means the door
# is already closed.  recieve2message() stores every raw value plus one, so
# the rest of this file compares against 2 + 1.
_DOOR_CLOSED_STATU = 2 + 1

# Seconds between two polls of the thread-safe command/send queues.
_POLL_INTERVAL_S = 0.001

# Seconds between two connection attempts.
_RECONNECT_DELAY_S = 3

# Seconds between two re-sends of the close command while the door is not
# yet reported as closed.
_CLOSE_DOOR_RETRY_S = 3

# JSON command asking the controller to close the door; "%d" is the terminal
# id.  The two literals are joined with a single space, which sits between
# JSON tokens.
_CLOSE_DOOR_CMD_TEMPLATE = (
    '{"TerminalID": %d, "DispatchDirection": 2, "OutPutDo": {"Do0": 0, "Do1": 0, "Do2": 0,'
    ' "Do3": 0,"Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0}, "ProcessControl": 3}'
)


class EntranceIO(threading.Thread):
    """Worker thread that talks to one entrance door controller over TCP.

    The thread runs its own asyncio event loop with three cooperating loops:
    ``recv_loop`` parses status frames and publishes them through RabbitMQ,
    ``send_loop`` writes queued commands to the socket, and
    ``close_door_loop`` turns close-door requests received from RabbitMQ into
    repeated close commands until the door is reported closed.
    """

    def __init__(self, parameter, rabbitmq_parameter):
        """Create the worker and start the RabbitMQ communicator.

        Args:
            parameter: 4-item sequence unpacked as ``(unit, id, ip, port)``.
            rabbitmq_parameter: mapping with the keys ``"ip"``, ``"port"``,
                ``"user"`` and ``"password"``.
        """
        threading.Thread.__init__(self)
        self._unit, self._id, self._ip, self._port = parameter
        self._close = False
        self._latest_iomsg = None
        self._reader = None
        self._writer = None
        # Created lazily inside the event loop (see _reconnect).
        self._reconnect_lock = None
        self._send_queue = queue.Queue()
        self._statu_send_freq = 50
        self._last_statucall_time = time.time()
        self._rabbit_mq = ASC.RabbitAsyncCommunicator(
            rabbitmq_parameter["ip"], rabbitmq_parameter["port"],
            rabbitmq_parameter['user'], rabbitmq_parameter['password'])
        self._close_door_cmd_queue = queue.Queue()

        # Consume close-door messages.
        self._rabbit_mq.Init([["close_door_%d_queue" % self._id, self.close_door]], None)
        self._rabbit_mq.start()

    def close(self):
        """Request shutdown, close the RabbitMQ communicator and join the thread.

        NOTE(review): ``recv_loop`` only re-checks the shutdown flag after its
        pending read returns, so ``join()`` may wait for the next frame or a
        connection error - confirm this is acceptable for callers.
        """
        self._close = True
        self._rabbit_mq.close()
        self.join()

    def close_door(self, msg):
        """RabbitMQ callback: queue a close-door request.

        Args:
            msg: text-format ``park_table`` protobuf message.

        The request is dropped when the message cannot be parsed or when its
        ``statu.execute_statu`` equals ``message.eError``.
        """
        table = message.park_table()
        try:
            tf.Parse(msg, table)
        except Exception:
            # Best effort: a malformed command must not break the consumer.
            print("收到关门指令格式错误")
            return
        if table.statu.execute_statu == message.eError:
            print("单片机节点:收到消息,指令错误,不关门")
            return
        self._close_door_cmd_queue.put("close")

    async def connect(self):
        """Open a TCP connection, retrying until success or shutdown.

        Returns:
            ``(reader, writer)`` on success, or ``(None, None)`` when shutdown
            was requested before a connection could be established.
        """
        print("Trying to connect {}:{}".format(self._ip, self._port))
        reader = writer = None
        while not self._close:
            try:
                reader, writer = await asyncio.open_connection(self._ip, self._port)
                break
            except Exception as e:
                print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
                await asyncio.sleep(_RECONNECT_DELAY_S)
        if writer is None:
            # Shutdown requested while still disconnected.
            return None, None
        print("Connected to {}:{}".format(self._ip, self._port))
        return reader, writer

    async def _reconnect(self, stale_writer):
        """Replace the connection that used ``stale_writer``.

        Both I/O loops may notice the same broken connection; the lock plus
        the identity check make sure only one of them reconnects, and the old
        writer is closed so its socket is not leaked.
        """
        if self._reconnect_lock is None:
            self._reconnect_lock = asyncio.Lock()
        async with self._reconnect_lock:
            if self._writer is not stale_writer:
                # The other loop already replaced the connection.
                return
            if stale_writer is not None:
                stale_writer.close()
            self._reader, self._writer = await self.connect()

    async def close_door_loop(self):
        """Serve queued close-door requests.

        For each request the close command is queued for sending every
        ``_CLOSE_DOOR_RETRY_S`` seconds until the latest status message
        reports the door as closed or shutdown is requested.
        """
        while not self._close:
            try:
                self._close_door_cmd_queue.get_nowait()
            except queue.Empty:
                await asyncio.sleep(_POLL_INTERVAL_S)
                continue

            print(" 入口:%d 开始关门 " % (self._id))
            closecmd = _CLOSE_DOOR_CMD_TEMPLATE % (self._id - 1)
            while not self._close:
                self._send_queue.put(closecmd)
                # Wait until the door is reported in position
                # (status value 3 means the door is closed).
                latest = self._latest_iomsg
                if latest is not None and latest.door_statu == _DOOR_CLOSED_STATU:
                    break
                await asyncio.sleep(_CLOSE_DOOR_RETRY_S)
            print(" 入口 %d 关门完成" % (self._id))
            await asyncio.sleep(_POLL_INTERVAL_S)

    async def recv_loop(self):
        """Read ``$``-terminated frames, parse them and publish the status.

        The payload is taken from four bytes after the first ``@`` up to, but
        not including, the trailing ``$``.  Frames without ``@`` or with an
        unparsable payload are reported on stdout.
        """
        while not self._close:
            writer = self._writer
            try:
                recieve = await self._reader.readuntil(b'$')
            except Exception as e:
                print(" e: {}".format(e))
                await self._reconnect(writer)
                continue

            parsed = None
            marker = recieve.find(b'@')
            if marker >= 0:
                payload = recieve[marker + 4:-1]
                parsed = self.recieve2message(payload)
                self._latest_iomsg = parsed

            if parsed is None:
                print("%s WARNING : mcpu recieve format error" % time.time())
                print(len(recieve), recieve)
            elif self._rabbit_mq is not None:
                ex_name = "statu_ex"
                key = "in_mcpu_%d_statu_port" % self._id
                self._rabbit_mq.publish(ex_name, key, tf.MessageToString(parsed, as_utf8=True))
            await asyncio.sleep(0)

    async def send_loop(self):
        """Write queued command strings to the socket, reconnecting on error."""
        while not self._close:
            writer = self._writer
            try:
                if self._send_queue.qsize() > 0:
                    print("发送关门指令 size:%d" % self._send_queue.qsize())
                    msg = self._send_queue.get(False)
                    if msg is not None:
                        writer.write(msg.encode())
                        await writer.drain()
            except queue.Empty:
                pass
            except Exception as e:
                print(" send error :{}".format(e))
                await self._reconnect(writer)
            # Non-blocking pause: time.sleep() here would stall the other loops.
            await asyncio.sleep(_POLL_INTERVAL_S)

    async def main(self):
        """Connect, run the three loops until shutdown, then close the socket."""
        self._reader, self._writer = await self.connect()
        try:
            await asyncio.gather(self.recv_loop(), self.send_loop(), self.close_door_loop())
        finally:
            if self._writer is not None:
                self._writer.close()

    def run(self):
        """Thread entry point: run ``main`` in a fresh asyncio event loop."""
        asyncio.run(self.main())

    def recieve2message(self, bytes):  # parameter name kept for compatibility
        """Convert a JSON status payload into an ``in_mcpu_statu`` message.

        Args:
            bytes: raw JSON payload of one frame.

        Returns:
            The populated message, or ``None`` when the payload is not a JSON
            object or a field has an unusable value.  Every raw value is
            stored incremented by one (presumably so that 0 can mean "not
            set" - TODO confirm).
        """
        try:
            data = json.loads(bytes.decode())
        except (AttributeError, UnicodeDecodeError, ValueError):
            return None
        if not isinstance(data, dict):
            return None

        statu_msg = message.in_mcpu_statu()
        try:
            if 'OutsideDoorStatus' in data:
                statu_msg.door_statu = data["OutsideDoorStatus"] + 1
            inputs = data.get('InPutDi')
            if isinstance(inputs, dict) and "Di1" in inputs:
                statu_msg.back_io = inputs['Di1'] + 1
            if 'InsideExistenceFlag' in data:
                statu_msg.is_occupy = data["InsideExistenceFlag"] + 1
            if 'CarHeightStatusCurrent' in data:
                statu_msg.heighth = data["CarHeightStatusCurrent"] + 1
        except (TypeError, ValueError):
            # Non-numeric or out-of-range field: treat the frame as malformed.
            return None
        return statu_msg