import sys from async_communication import TimeStatu sys.path.append("..") # 导入pymysql import socket import threading import time from datetime import datetime from concurrent.futures import ThreadPoolExecutor import google.protobuf.text_format as tf import message_pb2 as message from led_protocol import LedProtocol from led_protocol import font_library import mytool.db_helper.db_operation as spmng class led_control(threading.Thread): def __init__(self, unit, channel, communication, area_list, db_config): threading.Thread.__init__(self) self._dispatch_statu = {} self.led_communication = communication self.area_list = area_list self.area_1_show_str_timer = TimeStatu('', 0.1) self.area_2_show_str_timer = TimeStatu('', 0.1) self.db = spmng.DBOperation(db_config['db_ip'], db_config['db_port'], db_config['db_name'], db_config['db_user'], db_config['db_password']) self.unit = unit # if self.unit == 31 or self.unit == 32: # self.en_font = font_library["EN32"] # self.cn_font = font_library["CN32"] # else: # self.en_font = font_library["EN16"] # self.cn_font = font_library["CN16"] self.en_font = font_library["EN16"] self.cn_font = font_library["CN16"] self.channel = channel self.led_protocol = LedProtocol(area_list) self._pool = ThreadPoolExecutor() self.led_control_init() def led_control_init(self): font = font_library["EN32"] unit_char = "" if self.unit == 1: unit_char = 'A' elif self.unit == 2: unit_char = 'B' elif self.unit == 11: unit_char = 'A1' elif self.unit == 12: unit_char = 'A2' elif self.unit == 13: unit_char = 'A3' elif self.unit == 14: unit_char = 'A4' elif self.unit == 21: unit_char = 'B1' elif self.unit == 22: unit_char = 'B2' elif self.unit == 23: unit_char = 'B3' elif self.unit == 24: unit_char = 'B4' elif self.unit == 25: unit_char = 'B5' elif self.unit == 31: unit_char = 'C1' # font = font_library["EN48"] elif self.unit == 32: unit_char = 'C2' # font = font_library["EN48"] self.send_led_msg(0, unit_char, en_font=font, cn_font=self.cn_font) def receive_dispatch_statu(self, statu, ex, key): self._dispatch_statu[key] = statu def get_parking_string(self, small_space_count, big_space_count): led_show_string = "剩大车位:%d\\n剩小车位:%d" % (big_space_count, small_space_count) return led_show_string def get_entrance_statu_led_string(self, small_space_count, big_space_count, pick_command_count): key = "dispatch_%d_statu_port" % self.unit led_show_string = "" if (key in self._dispatch_statu) is False or self._dispatch_statu[key].timeout(): print('ERROR --- 调度节点未连接----key:%s---------time:%s' % (key, datetime.now())) led_show_string = "故障!!" else: dispatch_node_statu = message.dispatch_node_statu() try: tf.Parse(self._dispatch_statu[key].statu, dispatch_node_statu) if dispatch_node_statu.plc_carrier_status == 0: led_show_string = "故 障!" elif dispatch_node_statu.plc_carrier_status == 4: led_show_string = "维护中!" else: if big_space_count <= 0 and small_space_count <= 0: led_show_string = "无位 不可进\\n取车候车:%d" % pick_command_count elif big_space_count > 0 and (dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 0 or dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 2): led_show_string = "占用 不可进\\n取车候车:%d" % pick_command_count elif big_space_count > 0 and (dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 1 or dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 3): led_show_string = "空闲 可进入\\n取车候车:%d" % pick_command_count elif small_space_count > 0 and (dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 0 or dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 2): led_show_string = "占用 不可进\\n取车候车:%d" % pick_command_count elif small_space_count > 0 and (dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 1 or dispatch_node_statu.dispatch_plc_passway_status_vector[ self.channel - 1].plc_passway_enable == 3): led_show_string = "空闲 可进入\\n取车候车:%d" % pick_command_count except Exception: print('ERROR --- 调度状态消息解析错误 key:%s time:%s message:\n%s' % ( key, datetime.now(), self._dispatch_statu[key].statu)) led_show_string = "故 障!" return led_show_string def send_led_msg(self, area_index, input, en_font=font_library["EN32"], cn_font=font_library["CN32"]): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) try: sock.connect((self.led_communication['ip'], self.led_communication['port'])) time.sleep(0.1) sock.send(self.led_protocol.string2bytes(area_index, input, en_font=en_font, cn_font=cn_font)) print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\nsend msg area_index:%d msg:%s" % ( self.led_communication['ip'], self.led_communication['port'], area_index, input)) except Exception as e: print("connected led \033[0:31mERROR\033[m: %s time:%s msg=%s" % (str(e.args), datetime.now(), input)) time.sleep(0.1) sock.close() def run(self): i = 0 while True: time.sleep(1) if i == 20: self.led_control_init() time.sleep(0.1) i=0 i = i + 1 small_space_res = self.db.query_space_in_height_unit_and_empty_2(1.48, 1.5, self.unit) big_space_res = self.db.query_space_in_height_unit_and_empty(2, self.unit) command_small_res = self.db.query_command_in_height_minmax_unit_and_statu(1.48, 1.5, self.unit) command_big_res = self.db.query_command_in_height_unit_and_statu(2, self.unit) pick_command_res = self.db.query_sort_pick_command(self.unit) if small_space_res is None or big_space_res is None or command_small_res is None or command_big_res is None or pick_command_res is None: continue small_space_count = len(small_space_res) - len(command_small_res) big_space_count = len(big_space_res) - len(command_big_res) pick_commandcount = len(pick_command_res) statu_string = self.get_entrance_statu_led_string(small_space_count, big_space_count, pick_commandcount) parking_string = self.get_parking_string(small_space_count, big_space_count) if self.area_1_show_str_timer.statu != statu_string or self.area_1_show_str_timer.timeout(): self.area_1_show_str_timer = TimeStatu(statu_string, 20) self._pool.submit(self.send_led_msg, 1, statu_string,self.en_font,self.cn_font) # test # self._pool.submit(self.send_led_msg, 2, str(i),self.en_font,self.cn_font) # i = i + 1 if self.area_2_show_str_timer.statu != parking_string or self.area_2_show_str_timer.timeout(): self.area_2_show_str_timer = TimeStatu(parking_string,20) self._pool.submit(self.send_led_msg,2, parking_string,self.en_font,self.cn_font)