import json
import message_pb2 as message
import google.protobuf.text_format as tf
import queue
import time
import asyncio
import async_communication as ASC
import threading


class ExportIO(threading.Thread):
    """Worker thread that drives one exit-door MCU over a TCP stream.

    The thread runs its own asyncio event loop with three cooperating
    coroutines:

    * ``recv_loop`` reads ``@...$`` framed JSON status reports, converts
      them to ``out_mcpu_statu`` messages and publishes them to RabbitMQ.
    * ``send_loop`` drains ``_send_queue`` and writes commands to the MCU.
    * ``open_door_loop`` issues the open-door command while an open request
      is pending and fires the user-leave callback once the exit is clear.

    Original file note: "MCU status: 2 means the door is already closed".
    NOTE(review): ``open_door_loop`` sends the open command whenever
    ``door_statu != 2``, i.e. it treats 2 as the state in which no open
    command is needed - confirm which meaning of 2 is correct.
    """

    # Pause between polls of the send queue / open-door flag, in seconds.
    _POLL_INTERVAL = 0.001

    def __init__(self, parameter, rabbitmq_parameter):
        """Create the worker and start its RabbitMQ communicator.

        Args:
            parameter: 4-item sequence unpacked as ``(unit, id, ip, port)``.
            rabbitmq_parameter: mapping with the keys ``"ip"``, ``"port"``,
                ``"user"`` and ``"password"``.
        """
        threading.Thread.__init__(self)
        self._unit, self._id, self._ip, self._port = parameter
        self._close = False
        self._latest_iomsg = None
        self._last_user_table = None
        self._send_queue = queue.Queue()
        self._last_statucall_time = time.time()
        self._user_leave_callback = None
        self._last_table = None
        self._open_door_flag = False

        # Stream endpoints and loop bookkeeping; populated by main().
        self._reader = None
        self._writer = None
        self._loop = None
        self._conn_lock = None

        self._rabbit_mq = ASC.RabbitAsyncCommunicator(
            rabbitmq_parameter["ip"], rabbitmq_parameter["port"],
            rabbitmq_parameter['user'], rabbitmq_parameter['password'])
        # Original note: "consume door-close messages". Init is given
        # (None, None) here.
        self._rabbit_mq.Init(None, None)
        self._rabbit_mq.start()

        # Original note: number of database retries; the database is
        # restarted after more than 3. No such threshold check is visible
        # in this class - the counter is only incremented, reset and printed.
        self._retry_count = 0

    def SetUserLeaveCallback(self, callback):
        """Register ``callback(exit_id)``, invoked when the exit is clear."""
        self._user_leave_callback = callback

    def close(self):
        """Ask the worker to stop, wait for it, then close RabbitMQ.

        The stream is closed from inside the event loop so that a
        ``recv_loop`` parked in ``readuntil`` wakes up instead of blocking
        ``join()`` indefinitely. ``join()`` is skipped when the thread was
        never started or when ``close`` is called from the worker itself,
        because ``Thread.join`` raises ``RuntimeError`` in both cases.
        """
        self._close = True
        try:
            loop = self._loop
            if loop is not None and not loop.is_closed():
                try:
                    loop.call_soon_threadsafe(self._close_writer)
                except RuntimeError:
                    # The loop was closed between the check and the call;
                    # the worker is already finishing.
                    pass
            if self.is_alive() and threading.current_thread() is not self:
                self.join()
        finally:
            self._rabbit_mq.close()

    def export_idle(self):
        """Return True when the latest status reports ``outside_safety == 1``.

        Returns False when no valid status has been received yet.
        """
        # Snapshot: the worker thread may replace the attribute with None
        # between the check and the field access.
        status = self._latest_iomsg
        return status is not None and status.outside_safety == 1

    def open_door(self, table):
        """Flag that the door should be opened.

        Original note: called by the node_py main function once the
        database lookup succeeded; it must stay cheap, so the actual door
        control and database clean-up are left to ``open_door_loop``.

        Args:
            table: unused here; kept for interface compatibility.
        """
        self._open_door_flag = True
        return

    async def open_door_loop(self):
        """Drive a pending open request until the callback has been fired."""
        while not self._close:
            try:
                status = self._latest_iomsg
                if status is not None and self._open_door_flag:
                    if status.door_statu != 2:
                        # Original note: the door is not in the open state,
                        # so send the open command regardless of whether a
                        # car is inside.
                        print(" 出口编号:%d , 执行开门 " % (self._id))
                        opencmd = "{\"TerminalID\": %d, \"DispatchDirection\": 2, \"OutPutDo\": {\"Do0\": 0, \"Do1\": 0, \"Do2\": 0,\"Do3\": 0,\"Do4\": 0, \"Do5\": 0, \"Do6\": 0, \"Do7\": 0}, \"ProcessControl\": 4}" % (
                            self._id - 1)
                        print(opencmd)
                        self._send_queue.put(opencmd)
                        print(" opendoor queue size:%d" % self._send_queue.qsize())
                        # Give the door time to move before re-checking.
                        await asyncio.sleep(3)
                    elif status.outside_safety == 1:
                        # Original note: the door is fully open and the car
                        # has left - clear the record and the flag. The MCU
                        # closes the door by itself 3 seconds after the car
                        # leaves, so nothing is sent here.
                        # NOTE(review): the source formatting was lost; the
                        # flag reset is assumed to sit inside this guard -
                        # confirm against the original file.
                        if self._user_leave_callback is not None:
                            self._user_leave_callback(self._id)
                            self._open_door_flag = False
                            self._retry_count = self._retry_count + 1
                            print(" _retry_count = %d" % self._retry_count)
                    else:
                        # Original note: otherwise do nothing and keep
                        # waiting for the car to leave.
                        self._retry_count = 0
            except Exception as e:
                # Previously swallowed silently; report it and back off so a
                # persistently failing callback cannot flood the log.
                print(" open_door_loop error :{}".format(e))
                await asyncio.sleep(1)
            # A real (non-zero) pause: sleep(0) would spin the event loop at
            # full speed now that send_loop no longer blocks it.
            await asyncio.sleep(self._POLL_INTERVAL)

    async def connect(self):
        """Open a TCP connection to the MCU, retrying every 3 seconds.

        Returns:
            ``(reader, writer)`` on success, or ``(None, None)`` when the
            worker is closed before a connection could be established.
        """
        print("Trying to connect {}:{}".format(self._ip, self._port))
        while not self._close:
            try:
                reader, writer = await asyncio.open_connection(
                    self._ip, self._port
                )
            except Exception as e:
                print("Failed to connect {} {}: {}".format(self._ip, self._port, e))
                await asyncio.sleep(3)
            else:
                print("Connected to {}:{}".format(self._ip, self._port))
                return reader, writer
        return None, None

    def _close_writer(self):
        """Close the current stream writer, if any; never raises."""
        writer = self._writer
        if writer is None:
            return
        try:
            writer.close()
        except Exception as e:
            print(" close writer error :{}".format(e))

    async def _reconnect(self, failed_writer):
        """Replace the connection unless another coroutine already did.

        Args:
            failed_writer: the writer that was current when the caller hit
                its error. If ``self._writer`` has changed since then, the
                connection was already replaced and nothing is done.
        """
        if self._conn_lock is None:
            self._conn_lock = asyncio.Lock()
        async with self._conn_lock:
            if self._writer is not failed_writer:
                return
            self._close_writer()
            self._reader, self._writer = await self.connect()

    async def recv_loop(self):
        """Read framed status reports, store the latest and publish it."""
        while not self._close:
            writer = self._writer
            try:
                recieve = await self._reader.readuntil(b'$')
                if self._id == 5:
                    print(" 00 recieve = " + str(recieve))
            except Exception as e:
                print("self._reader.readuntil(b'$') error")
                print(" e: {}".format(e))
                await self._reconnect(writer)
                await asyncio.sleep(0.001)
                continue

            start_index = recieve.find(b'@')
            end_index = recieve.find(b'$')
            if start_index >= 0 and end_index >= 0 and end_index >= start_index:
                # Skip '@' and the following three bytes (presumably a frame
                # header - confirm against the MCU protocol).
                payload = recieve[start_index + 4:end_index]
                self._latest_iomsg = self.recieve2message(payload)
                status = self._latest_iomsg
                if (self._rabbit_mq is not None) and (status is not None):
                    if status.outside_safety == 0 or status.door_statu == 0:
                        # 0 marks a field that was missing from the report.
                        print("self._latest_iomsg error")
                    else:
                        if self._id == 5:
                            print(" 11 recieve = " + str(recieve))
                        ex_name = "statu_ex"
                        key = "out_mcpu_%d_statu_port" % self._id
                        try:
                            self._rabbit_mq.publish(
                                ex_name, key,
                                tf.MessageToString(status, as_utf8=True))
                        except Exception as e:
                            # A publish failure must not stop door control.
                            print(" publish error :{}".format(e))
                else:
                    print("self._rabbit_mq")
                    print(self._rabbit_mq)
                    print("self._latest_iomsg")
                    print(self._latest_iomsg)
            else:
                print(" start_index = %s" % start_index)
                print("end_index = %s" % end_index)
                print("WARNING : mcpu recieve format error , time = %s" % time.time())
                print(" 22 recieve = " + str(recieve))
            # Yield so buffered frames cannot starve the other coroutines.
            await asyncio.sleep(0)

    async def send_loop(self):
        """Write queued commands to the MCU, reconnecting on failure."""
        while not self._close:
            writer = self._writer
            try:
                if self._send_queue.qsize() > 0:
                    print("给单片机发送指令 size:%d" % self._send_queue.qsize())
                    msg = self._send_queue.get(False)
                    if msg is not None:
                        writer.write(msg.encode())
                        await writer.drain()
            except queue.Empty:
                # Nothing to send after all; not a connection problem.
                pass
            except Exception as e:
                print(" send error :{}".format(e))
                await self._reconnect(writer)
            # Non-blocking pause; time.sleep here would freeze recv_loop and
            # open_door_loop as well.
            await asyncio.sleep(self._POLL_INTERVAL)

    async def main(self):
        """Connect, run the three loops, and close the stream on exit."""
        self._loop = asyncio.get_running_loop()
        self._reader, self._writer = await self.connect()
        try:
            await asyncio.gather(
                self.recv_loop(), self.send_loop(), self.open_door_loop())
        finally:
            self._close_writer()

    def run(self):
        """Thread entry point: run ``main`` on a fresh event loop."""
        asyncio.run(self.main())

    def recieve2message(self, bytes):
        """Convert a JSON status payload into an ``out_mcpu_statu`` message.

        Args:
            bytes: JSON text (``str`` or ``bytes``) taken from one frame.

        Returns:
            The populated message, or ``None`` when the payload is not a
            JSON object or a field value cannot be stored. A field that is
            absent from the payload is set to 0; otherwise the stored value
            is the reported value plus 1.
        """
        try:
            data = json.loads(bytes)
        except Exception:
            print("tf json load error ")
            return None
        if not isinstance(data, dict):
            # Valid JSON, but not an object: there are no fields to read.
            print("tf json load error ")
            return None

        statu_msg = message.out_mcpu_statu()
        try:
            if 'InsideExistenceFlag' in data:
                statu_msg.outside_safety = data['InsideExistenceFlag'] + 1
            else:
                print("statu_msg.outside_safety not live")
                print(bytes)
                statu_msg.outside_safety = 0
            if 'OutsideDoorStatus' in data:
                # Original note: "1 k 2g 3 running" - presumably 1 = open,
                # 2 = closed, 3 = moving; confirm.
                statu_msg.door_statu = data["OutsideDoorStatus"] + 1
            else:
                print("statu_msg.door_statu not live ")
                print(bytes)
                statu_msg.door_statu = 0
        except (TypeError, ValueError) as e:
            # Non-integer field values would otherwise escape recv_loop and
            # stop every coroutine of this worker.
            print("statu_msg field error: {}".format(e))
            print(bytes)
            return None
        return statu_msg