123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import json
- import message_pb2 as message
- import google.protobuf.text_format as tf
- import queue
- import time
- import asyncio
- import async_communication as ASC
- import threading
'''
MCU (microcontroller) status: 2 means the door is closed.
'''
class ExportIO(threading.Thread):
    """Worker thread that drives one exit-door microcontroller (MCU) over TCP.

    The thread runs its own asyncio event loop with three cooperating loops:

    * ``recv_loop``      - reads ``@...$`` framed JSON status from the MCU,
      stores the newest status in ``_latest_iomsg`` and publishes it through
      the RabbitMQ communicator.
    * ``send_loop``      - drains ``_send_queue`` and writes commands to the MCU.
    * ``open_door_loop`` - while an open-door request is pending, keeps sending
      the open command and finally notifies the user-leave callback.

    Status encoding (see ``recieve2message``): ``outside_safety`` and
    ``door_statu`` are the raw MCU values plus one, and ``0`` means the field
    was missing from the frame.

    NOTE(review): the original notes say ``door_statu`` is "1 k 2g 3 running"
    (presumably 1 = open, 2 = closed, 3 = moving) and that "2 means the door
    is closed", while ``open_door_loop`` treats ``door_statu == 2`` as "door
    is open". The logic is kept exactly as it was - confirm against the MCU
    protocol which reading is correct.

    Args:
        parameter: 4-item sequence ``(unit, id, ip, port)``.
        rabbitmq_parameter: mapping with keys ``"ip"``, ``"port"``,
            ``"user"`` and ``"password"``.
    """

    # Pause between polling iterations. The original code throttled the whole
    # event loop with a blocking time.sleep(0.001); the same period is kept,
    # but as a cooperative sleep so the other loops keep running.
    _POLL_INTERVAL = 0.001
    # Seconds to wait after queueing an open command before re-checking.
    _OPEN_RETRY_DELAY = 3
    # Seconds to wait between failed TCP connection attempts.
    _CONNECT_RETRY_DELAY = 3
    # Open-door command; %d is the zero-based terminal id (self._id - 1).
    # The text is sent to the MCU verbatim, so its spacing is left untouched.
    _OPEN_CMD_TEMPLATE = (
        '{"TerminalID": %d, "DispatchDirection": 2, "OutPutDo": {"Do0": 0, "Do1": 0, "Do2": 0,'
        '"Do3": 0,"Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0}, "ProcessControl": 4}'
    )

    def __init__(self, parameter, rabbitmq_parameter):
        super().__init__()
        self._unit, self._id, self._ip, self._port = parameter
        self._close = False
        self._latest_iomsg = None
        self._last_user_table = None
        self._send_queue = queue.Queue()
        self._last_statucall_time = time.time()
        self._user_leave_callback = None
        self._last_table = None
        self._open_door_flag = False
        # Connection state; populated by main() once the thread is running.
        self._reader = None
        self._writer = None
        self._reconnect_lock = None
        self._rabbit_mq = ASC.RabbitAsyncCommunicator(
            rabbitmq_parameter["ip"], rabbitmq_parameter["port"],
            rabbitmq_parameter['user'], rabbitmq_parameter['password'])
        # Original note: "consume door-close messages".
        self._rabbit_mq.Init(None, None)
        self._rabbit_mq.start()
        # Original note: "number of database retries; restart the database
        # after more than 3". Nothing in this class reads the counter.
        self._retry_count = 0

    def SetUserLeaveCallback(self, callback):
        """Register ``callback(exit_id)``, invoked by ``open_door_loop``."""
        self._user_leave_callback = callback

    def close(self):
        """Ask all loops to stop, close the RabbitMQ communicator and join.

        The join is skipped when the thread was never started or when this is
        called from the worker thread itself; ``Thread.join`` raises
        ``RuntimeError`` in both situations.

        NOTE: ``recv_loop`` only observes the stop flag between reads, so the
        join can block until the MCU sends a frame or the connection drops.
        """
        self._close = True
        self._rabbit_mq.close()
        if self.is_alive() and threading.current_thread() is not self:
            self.join()

    def export_idle(self):
        """Return True when the latest status reports ``outside_safety == 1``.

        Returns False while no status has been received yet.
        """
        if self._latest_iomsg is None:
            return False
        return self._latest_iomsg.outside_safety == 1

    def open_door(self, table):
        """Flag an open-door request; ``open_door_loop`` does the actual work.

        Per the original note this is called from the node_py main function
        once the database lookup succeeded, so it must stay cheap. ``table``
        is accepted for interface compatibility and is not used.
        """
        self._open_door_flag = True
        return

    async def open_door_loop(self):
        """Serve the pending open-door request until ``close()`` is called."""
        while not self._close:
            try:
                iomsg = self._latest_iomsg
                if iomsg is not None and self._open_door_flag:
                    if iomsg.door_statu != 2:
                        # Original note: the door is not in the open state, so
                        # send "open" regardless of whether a car is inside.
                        print(" 出口编号:%d , 执行开门 " % (self._id))
                        opencmd = self._OPEN_CMD_TEMPLATE % (self._id - 1)
                        print(opencmd)
                        self._send_queue.put(opencmd)
                        print(" opendoor queue size:%d" % self._send_queue.qsize())
                        await asyncio.sleep(self._OPEN_RETRY_DELAY)
                    elif iomsg.outside_safety == 1:
                        # Original note: the door is fully open and the car has
                        # left - let the callback clean up the database record
                        # and clear the flag. The MCU closes the door by itself
                        # 3 seconds after the car leaves.
                        if self._user_leave_callback is not None:
                            self._user_leave_callback(self._id)
                        self._open_door_flag = False
                        self._retry_count = self._retry_count + 1
                        print(" _retry_count = %d" % self._retry_count)
                    else:
                        # Car still present: do nothing and keep waiting.
                        self._retry_count = 0
            except Exception as e:
                # Best effort: a failing callback must not stop door control,
                # but it should not vanish silently either. The flag stays set
                # in that case, so the callback is retried on the next pass.
                print(" open_door_loop error :{}".format(e))
            await asyncio.sleep(self._POLL_INTERVAL)

    async def connect(self):
        """Open the TCP connection to the MCU, retrying until it succeeds.

        Returns:
            ``(reader, writer)`` on success, or ``(None, None)`` when
            ``close()`` was requested before a connection could be made.
        """
        print("Trying to connect {}:{}".format(self._ip, self._port))
        while not self._close:
            try:
                reader, writer = await asyncio.open_connection(
                    self._ip, self._port
                )
            except Exception as e:
                print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
                await asyncio.sleep(self._CONNECT_RETRY_DELAY)
            else:
                print("Connected to {}:{}".format(self._ip, self._port))
                return reader, writer
        return None, None

    @staticmethod
    def _close_writer(writer):
        """Close ``writer`` if there is one, logging instead of raising."""
        if writer is None:
            return
        try:
            writer.close()
        except Exception as e:
            print(" close writer error :{}".format(e))

    async def _reconnect(self, failed_writer):
        """Replace the connection that ``failed_writer`` belonged to.

        recv_loop and send_loop may detect the same failure; the lock plus the
        identity check make sure only one of them reconnects and that the
        broken socket is closed instead of leaked.
        """
        if self._reconnect_lock is None:
            self._reconnect_lock = asyncio.Lock()
        async with self._reconnect_lock:
            if self._writer is not failed_writer:
                # The other loop already replaced the connection.
                return
            self._close_writer(failed_writer)
            self._reader, self._writer = await self.connect()

    async def recv_loop(self):
        """Read status frames from the MCU and publish them until closed."""
        while not self._close:
            # Snapshot the connection so a failure is attributed to the
            # connection that was actually used for this read.
            reader, writer = self._reader, self._writer
            try:
                recieve = await reader.readuntil(b'$')
            except Exception as e:
                print("self._reader.readuntil(b'$') error")
                print(" e: {}".format(e))
                await self._reconnect(writer)
                await asyncio.sleep(0.001)
                continue
            if self._id == 5:
                print(" 00 recieve = " + str(recieve))

            start_index = recieve.find(b'@')
            end_index = recieve.find(b'$')
            if not (start_index >= 0 and end_index >= 0 and end_index >= start_index):
                print(" start_index = %s" % start_index)
                print("end_index = %s" % end_index)
                print("WARNING : mcpu recieve format error , time = %s" % time.time())
                print(" 22 recieve = " + str(recieve))
                await asyncio.sleep(0)
                continue

            # The JSON payload starts 4 bytes after '@' (presumably '@' plus a
            # 3-byte header - confirm against the MCU protocol) and ends
            # right before '$'.
            payload = recieve[start_index + 4:end_index]
            self._latest_iomsg = self.recieve2message(payload)
            iomsg = self._latest_iomsg
            if self._rabbit_mq is None or iomsg is None:
                print("self._rabbit_mq")
                print(self._rabbit_mq)
                print("self._latest_iomsg")
                print(self._latest_iomsg)
            elif iomsg.outside_safety == 0 or iomsg.door_statu == 0:
                # A zero means the field was missing; do not publish it.
                print("self._latest_iomsg error")
                continue
            else:
                if self._id == 5:
                    print(" 11 recieve = " + str(recieve))
                ex_name = "statu_ex"
                key = "out_mcpu_%d_statu_port" % self._id
                try:
                    self._rabbit_mq.publish(
                        ex_name, key, tf.MessageToString(iomsg, as_utf8=True))
                except Exception as e:
                    # A publish failure must not end the receive loop.
                    print(" publish error :{}".format(e))
            await asyncio.sleep(0)

    async def send_loop(self):
        """Write queued commands to the MCU until closed.

        A command that fails to send is dropped; ``open_door_loop`` re-queues
        the open command while the request is still pending.
        """
        while not self._close:
            writer = self._writer
            try:
                if self._send_queue.qsize() > 0:
                    print("给单片机发送指令 size:%d" % self._send_queue.qsize())
                    msg = self._send_queue.get(False)
                    if msg is not None:
                        writer.write(msg.encode())
                        await writer.drain()
            except Exception as e:
                print(" send error :{}".format(e))
                await self._reconnect(writer)
            # Cooperative sleep: a blocking time.sleep() here would freeze
            # recv_loop and open_door_loop as well.
            await asyncio.sleep(self._POLL_INTERVAL)

    async def main(self):
        """Connect, run the three loops, and close the socket on the way out."""
        self._reconnect_lock = asyncio.Lock()
        self._reader, self._writer = await self.connect()
        try:
            await asyncio.gather(self.recv_loop(), self.send_loop(), self.open_door_loop())
        finally:
            self._close_writer(self._writer)

    def run(self):
        """Thread entry point: run ``main`` in a fresh event loop."""
        asyncio.run(self.main())

    def recieve2message(self, bytes):
        """Convert a JSON status payload into an ``out_mcpu_statu`` message.

        Args:
            bytes: JSON text (``bytes`` or ``str``) taken from one MCU frame.

        Returns:
            The populated message, or ``None`` when the payload is not valid
            JSON or is not a JSON object. A field missing from the payload is
            reported as ``0``; present fields are stored as raw value + 1.
        """
        try:
            data = json.loads(bytes)
        except (ValueError, TypeError, RecursionError):
            print("tf json load error ")
            return None
        if not isinstance(data, dict):
            # Valid JSON but not an object (e.g. a list or a bare number).
            print("tf json load error ")
            return None

        statu_msg = message.out_mcpu_statu()
        if 'InsideExistenceFlag' in data:
            statu_msg.outside_safety = data['InsideExistenceFlag'] + 1
        else:
            print("statu_msg.outside_safety not live")
            print(bytes)
            statu_msg.outside_safety = 0
        if 'OutsideDoorStatus' in data:
            # Original note: "1 k 2g 3 running" - presumably 1 = open,
            # 2 = closed, 3 = moving; TODO confirm.
            statu_msg.door_statu = data["OutsideDoorStatus"] + 1
        else:
            print("statu_msg.door_statu not live ")
            print(bytes)
            statu_msg.door_statu = 0
        return statu_msg
|