123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import json
- import message_pb2 as message
- import google.protobuf.text_format as tf
- import queue
- import time
- import asyncio
- import async_communication as ASC
- import threading
- '''
- 单片机状态,2表示门已经关闭
- '''
- class EntranceIO(threading.Thread):
- def __init__(self,parameter,rabbitmq_parameter):
- threading.Thread.__init__(self)
- self._unit,self._id,self._ip,self._port=parameter
- self._close=False
- self._latest_iomsg=None
- self._send_queue=queue.Queue()
- self._statu_send_freq=50
- self._last_statucall_time=time.time()
- self._rabbit_mq=ASC.RabbitAsyncCommunicator(rabbitmq_parameter["ip"],rabbitmq_parameter["port"],
- rabbitmq_parameter['user'],rabbitmq_parameter['password'])
- self._close_door_cmd_queue=queue.Queue()
- #消费关门消息
- self._rabbit_mq.Init([["close_door_%d_queue"%self._id, self.close_door]], None)
- self._rabbit_mq.start()
- def close(self):
- self._close=True
- self._rabbit_mq.close()
- self.join()
- def close_door(self,msg):
- table = message.park_table()
- try:
- tf.Parse(msg, table)
- except:
- print("收到关门指令格式错误")
- return
- if table.statu.execute_statu == message.eError:
- print("单片机节点:收到消息,指令错误,不关门")
- return
- self._close_door_cmd_queue.put("close")
- async def connect(self):
- print("Trying to connect {}:{}".format(self._ip, self._port))
- while self._close==False:
- try:
- reader, writer = await asyncio.open_connection(
- self._ip, self._port
- )
- break
- except Exception as e:
- print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
- await asyncio.sleep(3)
- print("Connected to {}:{}".format(self._ip, self._port))
- return reader, writer
- async def close_door_loop(self):
- while self._close==False:
- try:
- if self._close_door_cmd_queue.qsize() > 0:
- msg = self._close_door_cmd_queue.get(False)
- print(" 入口:%d 开始关门 " % (self._id))
- closecmd = "{\"TerminalID\": %d, \"DispatchDirection\": 2, \"OutPutDo\": {\"Do0\": 0, \"Do1\": 0, \"Do2\": 0,\
- \"Do3\": 0,\"Do4\": 0, \"Do5\": 0, \"Do6\": 0, \"Do7\": 0}, \"ProcessControl\": 3}" % (
- self._id - 1)
- while self._close == False:
- self._send_queue.put(closecmd)
- # 等待知道门开到位
- if not self._latest_iomsg == None:
- if self._latest_iomsg.door_statu == 2 + 1: # 状态值3表示关门
- break
- await asyncio.sleep(3)
- print(" 入口 %d 关门完成" % (self._id))
- except:
- pass
- await asyncio.sleep(0)
- async def recv_loop(self):
- while self._close==False:
- # print("iiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii")
- try:
- recieve=await self._reader.readuntil(b'$')
- except Exception as e:
- print(" e: {}".format(e))
- self._reader,self._writer=await self.connect()
- continue
- at=recieve.find(b'@')
- if at>=0:
- bytes=recieve[at+4:-1]
- self._latest_iomsg=self.recieve2message(bytes)
- if (not self._rabbit_mq==None) and (not self._latest_iomsg==None):
- ex_name = "statu_ex"
- key = "in_mcpu_%d_statu_port" % self._id
- self._rabbit_mq.publish(ex_name,key,tf.MessageToString(self._latest_iomsg,as_utf8=True))
- else:
- print("%s WARNING : mcpu recieve format error"%time.time())
- print(len(recieve),recieve)
- await asyncio.sleep(0)
- async def send_loop(self):
- while self._close == False:
- try:
- if self._send_queue.qsize() > 0:
- print("发送关门指令 size:%d" % self._send_queue.qsize())
- msg = self._send_queue.get(False)
- if not msg == None:
- self._writer.write(msg.encode())
- await self._writer.drain()
- except Exception as e:
- print(" send error :{}".format(e))
- self._reader, self._writer = await self.connect()
- await asyncio.sleep(0)
- time.sleep(0.001)
- async def main(self):
- self._reader,self._writer=await self.connect()
- await asyncio.gather(self.recv_loop(),self.send_loop(),self.close_door_loop())
- def run(self):
- asyncio.run(self.main())
- def recieve2message(self,bytes):
- try:
- data=json.loads(bytes.decode())
- except:
- return None
- statu_msg=message.in_mcpu_statu()
- if 'OutsideDoorStatus' in data.keys():
- statu_msg.door_statu=data["OutsideDoorStatus"]+1
- if 'InPutDi' in data.keys():
- if "Di1" in data["InPutDi"].keys():
- statu_msg.back_io=data["InPutDi"]['Di1']+1
- if 'InsideExistenceFlag' in data.keys():
- statu_msg.is_occupy=data["InsideExistenceFlag"]+1
- if 'CarHeightStatusCurrent' in data.keys():
- statu_msg.heighth=data["CarHeightStatusCurrent"]+1
- # if self._id==4:
- # #print(bytes)
- # print(time.time(),data["CarHeightStatusCurrent"],statu_msg.heighth)
- return statu_msg
|