import sys
from concurrent.futures import ThreadPoolExecutor

sys.path.append("..")
# Original note here read "import pymysql"; no pymysql import follows it.
import socket
import threading
import time
from datetime import datetime

import google.protobuf.text_format as tf
import message_pb2 as message
from led_protocol import LedProtocol
from led_protocol import font_library


class led_control(threading.Thread):
    """Background thread that keeps one unit's LED sign up to date.

    Once per second the thread polls the database for the unit's waiting
    (status 0) and picking (status 1) commands and pushes a new text to the
    sign whenever either result changes.  It also inspects the latest
    dispatch-node status delivered through ``receive_dispatch_statu`` and
    shows a fault / maintenance banner when the carrier reports one.
    """

    # Unit number -> label shown by ``led_control_init``.
    # NOTE(review): 21 maps to 'BA1' while its neighbours are 'B2'..'B5';
    # this looks like a typo for 'B1' but is kept because it is user-visible
    # output. Confirm against the physical signage before changing it.
    _UNIT_LABELS = {
        1: 'A',
        2: 'B',
        11: 'A1',
        12: 'A2',
        13: 'A3',
        14: 'A4',
        21: 'BA1',
        22: 'B2',
        23: 'B3',
        24: 'B4',
        25: 'B5',
        31: 'C1',
        32: 'C2',
    }

    # Total time (seconds) during which a failed send is retried.
    # Same magnitude as the original 30000 microseconds.
    _RETRY_WINDOW_S = 0.03

    def __init__(self, unit, channel, communication, area_list, db):
        """Store the collaborators; no I/O happens until the thread runs.

        Args:
            unit: Numeric unit id; used for DB queries and the status key.
            channel: Stored as ``self.channel``; not used in this class.
            communication: Mapping with the sign's ``'ip'`` and ``'port'``.
            area_list: Passed to ``LedProtocol``.
            db: Object providing ``query_pick_command_in_unit_and_statu``.
        """
        super().__init__()
        self._dispatch_statu = {}
        self.led_communication = communication
        self.area_list = area_list
        self.wait_command_dict = {}
        self.pick_command_dict = {}
        self.db = db
        self.unit = unit
        self.channel = channel
        self.led_protocol = LedProtocol(area_list)
        self._pool = ThreadPoolExecutor(max_workers=11)

    def led_control_init(self):
        """Show the unit label in area 0 (empty text for unknown units)."""
        unit_char = self._UNIT_LABELS.get(self.unit, "")
        self.send_led_msg(0, unit_char, en_font=font_library["EN32"])

    def receive_dispatch_statu(self, statu, ex, key):
        """Record the latest dispatch status under ``key``; ``ex`` is unused."""
        self._dispatch_statu[key] = statu

    def get_wait_led_string(self, cmd_queue):
        """Return the waiting-queue text, or ``""`` when nothing is queued.

        Each command contributes ``"<position>#<car_number> "`` with
        positions starting at 1; the result is prefixed with ``"候车:"``.
        """
        if not cmd_queue:
            return ""
        entries = "".join(
            "%d#%s " % (position, cmd['car_number'])
            for position, cmd in enumerate(cmd_queue, 1)
        )
        return "候车:" + entries

    def get_pick_led_string(self, cmd_queue):
        """Return the picking text for the first command in ``cmd_queue``.

        An empty queue yields the idle text ``"等待分配!"``.
        """
        if not cmd_queue:
            return "等待分配!"
        return cmd_queue[0]['car_number'] + " 取车中,请稍候!"

    def send_led_msg(self, area_index, input, en_font=font_library["EN16"],
                     cn_font=font_library["CN16"]):
        """Send ``input`` to display area ``area_index`` over a fresh TCP link.

        A failed attempt is logged and retried after 10 ms for as long as the
        retry window (``_RETRY_WINDOW_S``) has not elapsed.  Errors are
        printed, never raised.  The call always ends with a 100 ms pause.

        Args:
            area_index: Target area passed to the protocol encoder.
            input: Text to display.
            en_font: Font entry used for ASCII text.
            cn_font: Font entry used for Chinese text.
        """
        ip = self.led_communication['ip']
        port = self.led_communication['port']
        # Elapsed time is measured on the monotonic clock.  The previous
        # ``timedelta.microseconds`` check only looked at the sub-second
        # component, which wraps every second and made the window erratic.
        deadline = time.monotonic() + self._RETRY_WINDOW_S
        while time.monotonic() < deadline:
            try:
                # The context manager closes the socket on every path, so
                # failed attempts no longer leak descriptors.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    sock.connect((ip, port))
                    # sendall: a plain send() may transmit a partial frame.
                    sock.sendall(self.led_protocol.string2bytes(
                        area_index, input, en_font, cn_font))
                print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\nsend msg area_index:%d msg:%s" % (
                    ip, port, area_index, input))
                break
            except Exception as e:
                print("connected led \033[0:31mERROR\033[m: %s time:%s msg=%s" % (
                    str(e.args), datetime.now(), input))
                time.sleep(0.01)
        time.sleep(0.1)

    def _show_dispatch_state(self, key):
        """Display a fault/maintenance banner based on the status under ``key``.

        Returns:
            True when the carrier reports status 4 (maintenance banner shown),
            which tells ``run`` to leave its loop; False otherwise.
        """
        # Single snapshot: the dict is written by ``receive_dispatch_statu``.
        statu = self._dispatch_statu.get(key)
        if statu is None or statu.timeout():
            print('ERROR --- 调度节点未连接----key:%s---------time:%s' % (key, datetime.now()))
            return False

        dispatch_node_statu = message.dispatch_node_statu()
        try:
            tf.Parse(statu.statu, dispatch_node_statu)
            if dispatch_node_statu.plc_carrier_status == 0:
                self.send_led_msg(2, "故障!!!")
            elif dispatch_node_statu.plc_carrier_status == 4:
                self.send_led_msg(2, "维护中!")
                return True
        except Exception:
            # Best effort: a malformed status message must not stop the thread.
            print('ERROR --- 调度状态消息解析错误 key:%s time:%s message:\n%s' % (
                key, datetime.now(), statu.statu))
        return False

    def run(self):
        """Thread body: show the unit label, then poll and refresh forever."""
        self.led_control_init()
        key = "dispatch_%d_statu_port" % self.unit
        while True:
            wait_cmd_dict = self.db.query_pick_command_in_unit_and_statu(self.unit, 0)
            pick_cmd_dict = self.db.query_pick_command_in_unit_and_statu(self.unit, 1)

            # Area 1: waiting queue, refreshed only when it changed.
            if self.wait_command_dict != wait_cmd_dict:
                self.wait_command_dict = wait_cmd_dict
                led_show_string = self.get_wait_led_string(wait_cmd_dict)
                self._pool.submit(self.send_led_msg, 1, led_show_string)

            # NOTE(review): as in the original, maintenance status ends the
            # loop and therefore the thread; confirm a restart is handled
            # elsewhere or whether ``continue`` was intended.
            if self._show_dispatch_state(key):
                break

            # Area 2: current pick command, refreshed only when it changed.
            if self.pick_command_dict != pick_cmd_dict:
                self.pick_command_dict = pick_cmd_dict
                led_show_string = self.get_pick_led_string(pick_cmd_dict)
                self._pool.submit(self.send_led_msg, 2, led_show_string)

            time.sleep(1)