"""Entrance checker: polls controller and measurement status and answers
entrance/exit requests.

NOTE(review): this file was re-indented from a copy whose whitespace had been
flattened. Statement order is unchanged, but the nesting of a few ``continue``
statements had to be inferred; the spots are marked with NOTE(review) below.
"""
import datetime
import threading
import time
import async_communication as CM
import message_pb2 as message
import google.protobuf.text_format as tf
import asyncio

# Values that measure_info.ground_status is compared against in this module.
# The keys are runtime lookup strings: ok / no data / noise / out of bounds.
MeasureStatu = {"ok": 0, "无数据": 1, "噪声": 2, "超界": 3}

# Label for each supported value of the low four bits of border_statu.
# Only membership and the key-0 label ("正确图片") are used by the code below.
# NOTE(review): the lookup key is always `border_statu & 0x0f`, so the 0x10
# entry can never match — confirm whether another value (e.g. 0x0a) was meant.
ArrowType = {
    0: "正确图片",
    0x01: "向后调整",
    0x02: "向前调整",
    0x04: "向右调整",
    0x08: "向左调整",
    0x10: "左前调整",
    0x06: "右前调整",
    0x09: "右后调整",
    0x05: "左后调整",
}

# Bit positions inside measure_info.border_statu, as tested in _poll_once().
_MOVING_BIT = 11      # set -> vehicle is treated as moving
_WHEELBASE_BIT = 7    # set -> wheelbase-out-of-spec message
_WIDTH_BIT = 6        # set -> width-out-of-spec message
_ADJUST_BITS = (9, 8, 10)  # any set -> "follow the prompt to adjust" message

# Bits cleared when an out-of-bounds reading follows a valid stationary one.
_STALE_BORDER_MASK = sum(1 << bit for bit in (0, 1, 2, 3, 6, 7, 8, 9))

# in_mcpu_statu.heighth value -> height written to the entrance measure info.
_HEIGHT_BY_LEVEL = {2: 1.50, 3: 1.70, 4: 1.90}


def _bit_is_set(value, bit):
    """Return True when bit number *bit* of integer *value* is 1."""
    return (value >> bit) & 0x01 == 1


class EntranceChecker(threading.Thread):
    """Background thread that keeps ``error_str`` in sync with device status.

    ``icpu_statu`` and ``measure_statu`` are objects exposing a ``statu``
    attribute (protobuf text) and a ``timeout()`` method; they are replaced
    wholesale through ``receive_icpu`` / ``receive_measureInfo``.
    """

    def __init__(self):
        super().__init__()
        self.icpu_statu = CM.TimeStatu(timeout=0.1)
        self.measure_statu = CM.TimeStatu(timeout=0.1)
        self.last_door_statu = None
        # Last valid measurement taken while stationary; None when unknown.
        self.last_moving_statu = None
        self.last_show = None
        self.lock = threading.Lock()
        # Human-readable state; the exact value 'OK' enables entrance_isOK.
        self.error_str = ""
        # Set to True to make run() leave its loop.
        self.is_close = False

    def receive_icpu(self, statu):
        """Store the latest controller status object."""
        self.icpu_statu = statu

    def receive_measureInfo(self, statu):
        """Store the latest measurement status object."""
        self.measure_statu = statu

    async def exit_isOK(self, pick_table):
        """Validate a pick request and return it as protobuf text.

        When ``pick_table.primary_key`` is None the message's status is set
        to ``message.eError`` with a description; the (possibly modified)
        message is serialized and returned in either case.
        """
        # NOTE(review): protobuf scalar fields are normally never None —
        # confirm whether an empty-value check was intended here.
        if pick_table.primary_key is None:
            pick_table.statu.execute_statu = message.eError
            pick_table.statu.statu_description = " 唯一码不能为空!"
        return tf.MessageToString(pick_table, as_utf8=True)

    async def entrance_isOK(self, park_table):
        """Decide whether a park request may proceed; return protobuf text.

        Works on a copy of *park_table*. If ``error_str`` is not 'OK' the
        copy is returned immediately with ``eError`` and ``error_str`` as the
        description. Otherwise the measurement is polled for up to 2 seconds;
        on a clean reading the copy is returned with ``eNormal`` and the
        measurement attached, else with ``eError``.

        Raises whatever ``tf.Parse`` raises for the initial measurement text
        or for the controller status text.
        """
        park = message.park_table()
        park.CopyFrom(park_table)
        measure_info = message.measure_info()
        tf.Parse(self.measure_statu.statu, measure_info)
        tm = time.time()

        if self.error_str != 'OK':
            print(measure_info)
            park.statu.execute_statu = message.eError
            park.statu.statu_description = self.error_str
            return tf.MessageToString(park, as_utf8=True)

        while time.time() - tm < 2:
            if (measure_info.border_statu == MeasureStatu["ok"]
                    and measure_info.ground_status == 0):
                park.statu.execute_statu = message.eNormal
                park.statu.statu_description = self.error_str
                park.entrance_measure_info.CopyFrom(measure_info)
                im_mcpu_statu = message.in_mcpu_statu()
                tf.Parse(self.icpu_statu.statu, im_mcpu_statu)
                # Other heighth values leave the copied height untouched.
                height = _HEIGHT_BY_LEVEL.get(im_mcpu_statu.heighth)
                if height is not None:
                    park.entrance_measure_info.height = height
                return tf.MessageToString(park, as_utf8=True)

            # asyncio.sleep (not time.sleep) so the event loop keeps running.
            await asyncio.sleep(0.1)
            if (self.measure_statu.statu is not None
                    and self.measure_statu.timeout() is False):
                # Parse into a scratch message: a failed parse must not
                # replace measure_info with a default-valued message, whose
                # zero fields would satisfy the OK test above.
                refreshed = message.measure_info()
                try:
                    tf.Parse(self.measure_statu.statu, refreshed)
                except Exception:
                    print("parse exception:\n" + str(self.measure_statu.statu))
                else:
                    measure_info = refreshed
                    print("check measure info again:", measure_info)

        # No clean reading within the time window.
        print(measure_info)
        park.statu.execute_statu = message.eError
        park.statu.statu_description = "请检查入口处是否有人员逗留!"
        return tf.MessageToString(park, as_utf8=True)

    def run(self):
        """Poll device status every 50 ms until ``is_close`` is set."""
        while self.is_close is False:
            time.sleep(0.05)
            self._poll_once()

    def _poll_once(self):
        """Evaluate the current status once, updating error_str/last_show."""
        # Check the connection state first.
        if self.icpu_statu.timeout():
            self.error_str = "服务器超时,请联系管理员!"
            return
        if self.measure_statu.timeout():
            self.error_str = '服务器超时,请联系管理员!'
            return

        # Only continue when the controller reports a usable height class.
        icpu_statu = message.in_mcpu_statu()
        try:
            tf.Parse(self.icpu_statu.statu, icpu_statu)
        except Exception:
            print(self.icpu_statu.statu)
            return
        if icpu_statu.heighth == 1:
            self.error_str = '未知车高!'
            return
        if icpu_statu.heighth == 5:
            self.error_str = '车辆超高!'
            return

        # Vehicle handling from here on.
        # 1. Read the measurement data. A malformed message skips this cycle
        #    (same handling as the controller status above) instead of
        #    raising out of run() and ending the thread.
        measure_info = message.measure_info()
        try:
            tf.Parse(self.measure_statu.statu, measure_info)
        except Exception:
            print(self.measure_statu.statu)
            return

        # 2. Determine the motion state (stationary or moving).
        border_statu = measure_info.border_statu
        is_moving = _bit_is_set(border_statu, _MOVING_BIT)
        # Measurement status: ok / no data / noise / out of bounds.
        lidar_statu = measure_info.ground_status

        if is_moving:
            self.last_moving_statu = None
        elif lidar_statu == MeasureStatu["ok"]:
            # Stationary with valid data: remember it.
            self.last_moving_statu = measure_info
        elif lidar_statu == MeasureStatu["超界"]:
            if self.last_moving_statu is not None:
                # Previously stationary with valid data and still stationary:
                # an out-of-bounds result cannot be real, so clear the flags.
                border_statu &= ~_STALE_BORDER_MASK
        elif lidar_statu == MeasureStatu["无数据"]:
            # Stationary with no vehicle: drop the remembered measurement.
            self.last_moving_statu = None
        elif lidar_statu == MeasureStatu["噪声"]:
            if self.last_moving_statu is not None:
                # Previously valid, now noisy: fall back to the last valid
                # data. NOTE(review): measure_info is not read again below.
                measure_info.CopyFrom(self.last_moving_statu)

        # Check the photoelectric sensor first.
        if is_moving and icpu_statu.back_io == 1:
            self.error_str = '请按提示调整!'
            self.last_show = "调整"
            return

        # Photoelectric sensor is fine: classify by measurement status.
        if (lidar_statu == MeasureStatu["无数据"]
                and icpu_statu.is_occupy != 2):
            self.error_str = '无车!'
            self.last_show = "空闲"
            return
        if lidar_statu == MeasureStatu["噪声"]:
            # NOTE(review): assumes the original `continue` applied to the
            # whole noise branch, not only to the inner condition — confirm.
            if self.last_show == "超时":
                self.error_str = '雷达噪声'
                self.last_show = "空闲"
            return
        if lidar_statu == MeasureStatu["ok"]:
            self.error_str = 'OK'
            return
        if lidar_statu != MeasureStatu["超界"]:
            # NOTE(review): assumes the arrow handling below belongs to the
            # out-of-bounds branch only — confirm against the original layout.
            return

        # Out of bounds: specific flags take priority over arrow hints.
        if _bit_is_set(border_statu, _WHEELBASE_BIT):
            self.error_str = '轴距不满足规格,请驶出!'
            self.last_show = "轴距超差"
            return
        if any(_bit_is_set(border_statu, bit) for bit in _ADJUST_BITS):
            self.error_str = '请按提示调整!'
            self.last_show = "请调整"
            return
        if _bit_is_set(border_statu, _WIDTH_BIT):
            self.error_str = '车宽不满足规格,请驶出!'
            self.last_show = "超宽"
            return

        border = border_statu & 0x0f
        if border not in ArrowType:
            self.error_str = '服务器超时,请联系管理员!'
            self.last_show = "超时"
        elif ArrowType[border] == "正确图片":
            self.error_str = "OK"
            self.last_show = "正确"
        else:
            self.error_str = "请按提示调整"
            self.last_show = "请调整"