import sys
import threading
import time
from datetime import datetime

from PyQt5 import QtWidgets

# The project-local packages below are resolved relative to the parent
# directory, so the path tweak has to run before they are imported.
sys.path.append("..")
import mytool.db_helper.db_operation as spmng
from PyQt5.QtCore import QSize, QTimer, Qt, pyqtSignal
from PyQt5.QtGui import QFont, QBrush, QColor, QPixmap
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QListWidgetItem, QListWidget, QLabel, QMessageBox, QFrame,
    QAbstractItemView,
)
from ui.ui import Ui_MainWindow


class MainWindow(QMainWindow, Ui_MainWindow):
    """Frameless window showing one auto-scrolling command list per unit.

    A background thread polls the database through ``spmng.DBOperation`` and
    stores the latest results; a 200 ms ``QTimer`` (``Switch``) copies changed
    results into the ``QListWidget`` of each configured unit. One extra thread
    per unit (``Scroll``) steps the current row of that list.

    Args:
        task_type: Command type to display; compared against each command's
            ``'type'`` field (1 and 2 are the values rendered by
            ``get_listWidget_item``).
        show_config: Mapping providing ``top_number``, ``unit_list``, ``unit``,
            ``font_size``, ``row`` and ``column``.
        db_config: Mapping providing ``db_ip``, ``db_port``, ``db_name``,
            ``db_user`` and ``db_password``.
    """

    # Carries (unit, row) from a Scroll worker thread to the GUI thread, which
    # is the only thread allowed to modify widgets.
    _scroll_row_requested = pyqtSignal(int, int)

    # Unit id whose list is filled from the "top N commands" query instead of
    # the per-unit queue query.
    _LATEST_UNIT = 40

    # Unit id -> label shown above the list.
    _UNIT_NAMES = {
        1: 'A', 2: 'B',
        11: 'A1', 12: 'A2', 13: 'A3', 14: 'A4',
        21: 'B1', 22: 'B2', 23: 'B3', 24: 'B4', 25: 'B5',
        31: 'C1', 32: 'C2',
    }

    # (exclusive end hour, weight) bands, ordered by hour.
    _TIME_WEIGHTS = (
        (7, 0.6), (8, 0.9), (9, 1), (11, 0.9), (12, 0.8),
        (14, 0.7), (15, 0.9), (16, 0.8), (21, 0.7), (24, 0.6),
    )

    def __init__(self, task_type, show_config, db_config):
        super().__init__()
        self.setupUi(self)
        self.setWindowFlags(Qt.FramelessWindowHint)

        self.task_type = task_type
        self.top_number = show_config["top_number"]
        self.db = spmng.DBOperation(db_config['db_ip'], db_config['db_port'], db_config['db_name'],
                                    db_config['db_user'], db_config['db_password'])
        self.unit_list = show_config["unit_list"][show_config["unit"]]
        self.font_size = show_config["font_size"]

        self.list_widget_dict = {}
        self.list_widget_thread = {}
        self.scroll_flag_dict = {}
        self.command_dict = {}            # commands currently displayed, per unit
        self.last_command_dict = {}       # latest commands fetched by the poller
        self.pick_wait_weight = {}        # wait-weight rows currently in use, per unit
        self.last_pick_wait_weight = {}   # latest wait-weight rows fetched by the poller
        self.btn_positions = [(row, column)
                              for row in range(0, show_config['row'])
                              for column in range(show_config['column'])]

        # The worker threads started below read this flag, so it must exist
        # before any of them runs.
        self.db_query_isClose = False
        self._scroll_row_requested.connect(self._apply_scroll_row)

        self.main_gridLayout.setVerticalSpacing(40)

        # One label + list widget pair per configured unit, laid out on the grid.
        for unit, position in zip(self.unit_list, self.btn_positions):
            list_widget = QListWidget(self)
            list_widget.setAutoScroll(True)
            list_widget.setSelectionMode(QAbstractItemView.NoSelection)
            list_widget.setStyleSheet("border:5px solid #c0181f;")
            list_widget.setObjectName(('list_widget_%d' % (unit + 1)))
            self.list_widget_dict[unit] = list_widget
            self.scroll_flag_dict[unit] = False
            self.command_dict[unit] = []
            self.pick_wait_weight[unit] = {}

            label = QLabel(self.get_unit(unit), self)
            label.setFont(QFont("黑体", 30, QFont.Bold))
            label.setAlignment(Qt.AlignCenter)

            vboxlayout = QtWidgets.QVBoxLayout()
            vboxlayout.addWidget(label)
            vboxlayout.addWidget(list_widget)
            vboxlayout.setSpacing(0)
            self.main_gridLayout.addLayout(vboxlayout, *position)

            # Daemon thread: it must never keep the process alive after the
            # window has been closed.
            scroll_thread = threading.Thread(target=self.Scroll, args=(unit,), daemon=True)
            self.list_widget_thread[unit] = scroll_thread
            scroll_thread.start()

        # Database polling thread. NOTE: this rebinds ``self.db_query`` from
        # the bound method to the Thread object; the attribute name is kept
        # because closeEvent (and possibly outside code) joins it under it.
        self.db_query = threading.Thread(target=self.db_query, daemon=True)
        self.db_query.start()

        # UI refresh timer.
        self.timer = QTimer()
        self.timer.timeout.connect(self.Switch)
        self.timer.start(200)

        # Pop-up shown while no query result is available; closed by t_timer.
        self.t_timer = QTimer(self)
        self.msg_box_hint = QMessageBox()
        self.msg_box_hint.setIcon(QMessageBox.Critical)
        self.msg_box_hint.setWindowTitle('错误')
        self.msg_box_hint.setText('连接断开,重新连接中...')
        self.t_timer.setSingleShot(True)
        self.t_timer.timeout.connect(self.msg_box_hint.close)

    def closeEvent(self, event):
        """Stop the worker threads, wait for the poller, then accept the close."""
        self.db_query_isClose = True
        self.db_query.join()
        event.accept()

    def show_message_box(self):
        """Show the error pop-up and (re)start the 1 s timer that closes it."""
        self.msg_box_hint.show()
        self.t_timer.start(1000)

    def db_query(self):
        """Poll the database until ``db_query_isClose`` is set.

        For every unit the wait-weight query result and the command query
        result are stored in ``last_pick_wait_weight`` / ``last_command_dict``
        for ``Switch`` to pick up.
        """
        while not self.db_query_isClose:
            for unit in self.unit_list:
                self.last_pick_wait_weight[unit] = self.db.query_pick_wait_weight_in_unit(unit)
                if unit != self._LATEST_UNIT:
                    self.last_command_dict[unit] = self.db.query_queue_condition_in_unit(unit)
                else:
                    self.last_command_dict[unit] = self.db.query_command_top_number_in_type(
                        self.task_type, self.top_number)
            time.sleep(0.2)

    def get_time_weight(self):
        """Return the weight assigned to the current local hour of day."""
        current_hour = datetime.now().hour
        for end_hour, weight in self._TIME_WEIGHTS:
            if current_hour < end_hour:
                return weight
        # Not reachable for a valid hour (0-23); kept as a defensive fallback.
        return self._TIME_WEIGHTS[-1][1]

    def _estimate_wait_minutes(self, command, index):
        """Return the estimated wait in minutes, or None without weight data.

        The estimate is queue position * 5 * time-of-day weight * the unit's
        ``wait_weight`` taken from the first row of ``pick_wait_weight[index]``.
        """
        try:
            wait_weight = self.pick_wait_weight[index][0]['wait_weight']
        except (IndexError, KeyError, TypeError):
            # No weight row for this unit (e.g. the query returned nothing).
            return None
        queue_position = command['row_number() over(order by queue_time)']
        return round(queue_position * 5 * self.get_time_weight() * wait_weight)

    def get_listWidget_item(self, dict, index):
        """Build the list item that displays one command.

        Args:
            dict: Command record; reads ``'type'``, ``'statu'``,
                ``'car_number'`` and, for type 2, ``'unit'``, ``'export_id'``
                and the queue-position column. (The parameter name shadows the
                builtin but is kept so existing callers keep working.)
            index: Unit id of the list the item is created for.

        Returns:
            A ``QListWidgetItem`` with font, colours and text set. The text
            stays empty for a type/status combination that is not handled.
        """
        command = dict
        item = QListWidgetItem()
        item.setFont(QFont('微软雅黑', self.font_size, QFont.Bold))
        show_str = ""
        command_type = command['type']

        if command_type == 1:  # type 1 command
            item.setBackground(QColor(185, 240, 240))
            status = command['statu']
            if status == 0:  # queued
                item.setForeground(QColor(80, 80, 80))
                show_str = "%s 排队中" % (command['car_number'])
            elif status == 1:  # in progress
                item.setForeground(QColor('blue'))
                show_str = "%s 存车中" % (command['car_number'])
            elif status == 2:  # finished
                item.setForeground(QColor('green'))
                show_str = "%s 已完成" % (command['car_number'])
            elif status == 3:  # maintenance
                item.setForeground(QColor('red'))
                show_str = "%s 维护!" % (command['car_number'])
        elif command_type == 2:  # type 2 command
            item.setBackground(QColor(250, 240, 200))
            status = command['statu']
            if status == 0:  # queued
                item.setForeground(QColor(80, 80, 80))
                show_str = "%s %s排队中" % (command['car_number'], self.get_unit(command['unit']))
                if index != self._LATEST_UNIT:
                    minutes = self._estimate_wait_minutes(command, index)
                    # Without weight data keep the plain "queued" text instead
                    # of raising inside the refresh timer.
                    if minutes is not None:
                        show_str = "%s 预计剩%d分钟" % (command['car_number'], minutes)
            elif status == 1:  # in progress
                item.setForeground(QColor('blue'))
                show_str = "%s %s取车中" % (command['car_number'], self.get_unit(command['unit']))
            elif status == 2:  # finished
                item.setForeground(QColor('green'))
                show_str = "%s %s出口%d" % (command['car_number'], self.get_unit(command['unit']),
                                          command['export_id'])
            elif status == 3:  # maintenance
                item.setForeground(QColor(80, 180, 200))
                show_str = "%s %s维护!" % (command['car_number'], self.get_unit(command['unit']))

        item.setText(show_str)
        return item

    def Switch(self):
        """Timer slot: refresh every unit whose polled data has changed.

        Shows the error pop-up for a unit that has no polled result yet (or
        whose last result is None). Otherwise, when the commands or the wait
        weights differ from what is displayed, the list is rebuilt from the
        commands whose status is not 4 and whose type equals ``task_type``.
        """
        for unit in self.unit_list:
            latest_commands = self.last_command_dict.get(unit)
            latest_weights = self.last_pick_wait_weight.get(unit)
            if latest_commands is None or latest_weights is None:
                self.show_message_box()
                continue

            unchanged = (self.command_dict[unit] == latest_commands
                         and self.pick_wait_weight[unit] == latest_weights)
            if unchanged:
                continue

            self.pick_wait_weight[unit] = latest_weights
            self.command_dict[unit] = latest_commands
            list_widget = self.list_widget_dict[unit]
            list_widget.clear()
            for command in latest_commands:
                if command['statu'] != 4 and command['type'] == self.task_type:
                    list_widget.addItem(self.get_listWidget_item(command, unit))

    def _apply_scroll_row(self, unit, row):
        """Slot run on the GUI thread: make ``row`` the current row of ``unit``."""
        list_widget = self.list_widget_dict.get(unit)
        if list_widget is not None:
            list_widget.setCurrentRow(row)

    def Scroll(self, i):
        """Worker loop stepping the current row of unit ``i``'s list.

        Advances one row every 0.7 s and pauses 2 s after each pass. The row
        change itself is delivered through a signal so that the widget is only
        modified from the GUI thread. Returns once ``db_query_isClose`` is set.
        """
        list_widget = self.list_widget_dict[i]
        while not self.db_query_isClose:
            # count() is only read here; every modification of the widget goes
            # through the signal below.
            for index in range(list_widget.count()):
                if self.db_query_isClose:
                    return
                self._scroll_row_requested.emit(i, index)
                time.sleep(0.7)
            time.sleep(2)

    def get_unit(self, unit):
        """Return the display label for a unit id ('未知' when it is not known)."""
        if unit == self._LATEST_UNIT:
            return '最新%d条' % self.top_number
        return self._UNIT_NAMES.get(unit, '未知')