import sys sys.path.append("..") import threading import time import async_communication as CM import uuid import message_pb2 as message import google.protobuf.text_format as tf MeasureStatu={"ok":0,"无数据":1,"噪声":2,"超界":3} ArrowType={0:"正确图片",0x01:"向后调整",0x02:"向前调整",0x04:"向右调整",0x08:"向左调整" ,0x10:"左前调整",0x06:"右前调整",0x09:"右后调整",0x05:"左后调整"} class CommandChecker(threading.Thread): def __init__(self,db): threading.Thread.__init__(self) self._db = db self._dispatch_statu = CM.TimeStatu(timeout=0.1) self._measure_statu = CM.TimeStatu(timeout=0.1) self._plate_color = {'蓝色':'1','黑色':'2','黄色':'3','白色':'4','绿色':'5','其他':'6'} self.is_close = False def receive_dispatch_statu(self, statu, ex, key): self._dispatch_statu = statu def receive_measure_info(self,statu, ex, key): print(111) self._measure_statu = statu def stop(self): self.is_close = True def exit_isOK(self, pick_body): pick_table = message.pick_table() try: tf.Parse(pick_body, pick_table) except Exception as e: pick_table.statu.execute_statu = message.eError pick_table.statu.statu_description = "设备故障,请联系管理员!" + "(取车表单错误)" + str(e.args) return pick_table if pick_table.primary_key is None or pick_table.primary_key == '': pick_table.statu.execute_statu = message.eWarning pick_table.statu.statu_description = " 唯一码不能为空!" else: # 查询车位表和指令表是否有该车 space_res = self._db.query_space_in_primary_key(pick_table.primary_key) command_res = self._db.query_command_in_primary_key(pick_table.primary_key) if len(command_res) > 0: pick_table.statu.execute_statu = message.eWarning pick_table.statu.statu_description = "指令正在执行中!请稍等!" elif len(space_res) <= 0: pick_table.statu.execute_statu = message.eWarning pick_table.statu.statu_description = "车辆信息不存在!" return pick_table def entrance_isOK(self, park_body): # 停车表单 park_table = message.park_table() try: tf.Parse(park_body, park_table) except Exception as e: park_table.statu.execute_statu = message.eError park_table.statu.statu_description = "设备故障,请联系管理员!" + "(停车表单错误)" + str(e.args) return park_table # 检查调度状态 key = "dispatch_%d_statu_port" % park_table.unit_id if (key in self._dispatch_statu) is False or self._dispatch_statu[key].timeout(): park_table.statu.execute_statu = message.eError park_table.statu.statu_description = "设备故障,请联系管理员!" + "(调度)" return park_table key = "measure_%d_statu_port" % park_table.unit_id if (key in self._measure_statu) is False or self._measure_statu[key].timeout(): park_table.statu.execute_statu = message.eError park_table.statu.statu_description = "设备故障,请联系管理员!" + "(雷达)" return park_table # 获取车高信息 dispatch_node_statu = message.dispatch_node_statu() try: tf.Parse(self._dispatch_statu[key].statu, dispatch_node_statu) except Exception: park_table.statu.execute_statu = message.eError park_table.statu.statu_description = "设备故障,请联系管理员!" + "(调度状态消息错误)" return park_table if dispatch_node_statu.plc_carrier_status == 0: park_table.statu.execute_statu = message.eError park_table.statu.statu_description = "设备故障,请联系管理员!" + "(搬运器)" return park_table elif dispatch_node_statu.plc_carrier_status == 4: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "该单元设备维护中!请前往其他入口!" return park_table car_height = dispatch_node_statu.dispatch_plc_passway_status_vector[park_table.import_id - 1].car_height if car_height == 0: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "未检测到车辆!" return park_table elif car_height == 4: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "车辆超高,请退出车库!" return park_table # 填写车高信息 measure_info = message.measure_info() measure_info.height = car_height park_table.entrance_measure_info.CopyFrom(measure_info) # 赋值唯一码 uid = str(uuid.uuid1()) park_table.primary_key = uid # 查询车位表和指令表是否有该车 space_res = self._db.query_space_in_car_number(park_table.car_number) command_res = self._db.query_command_in_car_number(park_table.car_number) if len(command_res) > 0: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "指令正在执行中!请稍等!" elif len(space_res) > 0: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "车辆 :%s 已在库内!" % park_table.car_number else: # 车辆不在库内 查询是否有对应大小空车位 space_res = self._db.query_space_in_height_unit_and_empty(measure_info.height, park_table.unit_id) command_res = self._db.query_command_in_height_unit_and_statu(measure_info.height, park_table.unit_id) if len(space_res) - len(command_res) <= 0: park_table.statu.execute_statu = message.eWarning park_table.statu.statu_description = "没有空余车位,车牌号:%s 高度:%.3fm!" % (park_table.car_number, measure_info.height) return park_table def run(self): while self.is_close is False: # print(self.error_str) time.sleep(0.05) # 先判断连接状态 if self._dispatch_statu.timeout(): self.error_str = "服务器超时,请联系管理员!" continue if self.measure_statu.timeout(): self.error_str = '服务器超时,请联系管理员!' continue # 有车才显示 icpu_statu = message.in_mcpu_statu() try: tf.Parse(self.icpu_statu.statu, icpu_statu) except: print(self.icpu_statu.statu) continue if icpu_statu.heighth == 1: self.error_str = '未知车高!' continue if icpu_statu.heighth == 5: self.error_str = '车辆超高!' continue # 以下是有车的处理 # 1,显示测量数据(实时和静态) measure_info = message.measure_info() tf.Parse(self.measure_statu.statu, measure_info) # 2,获取车辆运动状态(静止or动态) border_statu = measure_info.border_statu is_moving = ((border_statu >> 11) & 0x01) == 1 lidar_statu = measure_info.ground_status # 测量状态(正常,无数据、噪声、超界) if is_moving: self.last_moving_statu = None else: # 当前静止 if lidar_statu == MeasureStatu["ok"]: self.last_moving_statu = measure_info elif lidar_statu == MeasureStatu["超界"]: if not self.last_moving_statu == None: # 上一刻静止且数据正确,当前静止,不可能出现超界,将超界清除 new_border = border_statu new_border = (new_border & (~(0x01 << 0))) new_border = (new_border & (~(0x01 << 1))) new_border = (new_border & (~(0x01 << 2))) new_border = (new_border & (~(0x01 << 3))) new_border = (new_border & (~(0x01 << 6))) new_border = (new_border & (~(0x01 << 7))) new_border = (new_border & (~(0x01 << 8))) new_border = (new_border & (~(0x01 << 9))) border_statu = new_border elif lidar_statu == MeasureStatu["无数据"]: # 当前静止无车,清除上一时刻数据 self.last_moving_statu = None elif lidar_statu == MeasureStatu["噪声"]: if not self.last_moving_statu == None: # 上一时刻静止且正确,当前噪声,不显示当前数据,显示上一正确数据 measure_info.CopyFrom(self.last_moving_statu) # 先判断光电 if is_moving: if icpu_statu.back_io == 1: self.error_str = '请按提示调整!' self.last_show = "调整" continue # 光电正常 if lidar_statu == MeasureStatu["无数据"] and icpu_statu.is_occupy != 2: self.error_str = '无车!' self.last_show = "空闲" continue elif lidar_statu == MeasureStatu["噪声"]: if self.last_show == "超时": self.error_str = '雷达噪声' self.last_show = "空闲" continue elif lidar_statu == MeasureStatu["ok"]: self.error_str = 'OK' continue elif lidar_statu == MeasureStatu["超界"]: if (border_statu >> 7) & 0x01 == 1: self.error_str = '轴距不满足规格,请驶出!' self.last_show = "轴距超差" continue if (border_statu >> 9) & 0x01 == 1: self.error_str = '请按提示调整!' self.last_show = "请调整" continue if (border_statu >> 8) & 0x01 == 1: self.error_str = '请按提示调整!' self.last_show = "请调整" continue if (border_statu >> 10) & 0x01 == 1: self.error_str = '请按提示调整!' self.last_show = "请调整" continue if (border_statu >> 6) & 0x01 == 1: self.error_str = '车宽不满足规格,请驶出!' self.last_show = "超宽" continue border = border_statu & 0x0f if (border in ArrowType.keys()) is False: self.error_str = '服务器超时,请联系管理员!' self.last_show = "超时" else: if ArrowType[border] == "正确图片": self.error_str = "OK" self.last_show = "正确" else: self.error_str = "请按提示调整" self.last_show = "请调整"