123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510 |
- import sys
- from itertools import product
- import message.message_pb2 as message
- from PyQt5 import sip
- from PyQt5.QtWidgets import QApplication, QSizePolicy, QLabel, QWidget, QListWidgetItem
- import json
- import threading
- import time
- from functools import partial
- from PyQt5.QtCore import pyqtSignal, Qt, QTimer, QThread, QSize
- from PyQt5.QtGui import QCursor, QFont, QColor, QPixmap
- from PyQt5.QtWidgets import QPushButton, QMainWindow, QMenu, QMessageBox
- import message.message_pb2 as message
- import google.protobuf.text_format as tf
- import re
- import ui.spaceUi as sui
- import db_operation as spmng
- import async_communication as cmt
- import mcpu_communication as mcn
- from led import Led
# RabbitMQ connection parameters.
mq_ip = "192.168.1.233"
# mq_ip = "127.0.0.1"
mq_port = 5672
mq_user = "zx"
mq_password = "zx123456"
# mq_user = "wk"
# mq_password = "123456"

# [exchange, routing key] pairs for the status channels: entrance ("in")
# MCPU ports 1-6 first, then exit ("out") MCPU ports 1-6.
statu_ex_keys = [
    ["statu_ex", "%s_mcpu_%d_statu_port" % (direction, index)]
    for direction in ("in", "out")
    for index in range(1, 7)
]
class QPBtn(QPushButton):
    """Push button that reports which mouse button pressed it.

    ``clickedSignal`` is emitted with ``True`` when the press event reports
    exactly the left button and with ``False`` when it reports exactly the
    right button. Any other button state emits nothing. The base-class
    ``mousePressEvent`` is intentionally not invoked, matching the original
    behaviour.
    """

    clickedSignal = pyqtSignal(bool)  # signal carrying a boolean argument

    def __init__(self, parent=None):
        super(QPBtn, self).__init__(parent)

    def mousePressEvent(self, event):
        """Translate a mouse press into a ``clickedSignal`` emission."""
        pressed = event.buttons()
        if pressed == Qt.LeftButton:
            self.clickedSignal.emit(True)
            return
        if pressed == Qt.RightButton:
            self.clickedSignal.emit(False)
class MainWindow(QMainWindow, sui.Ui_MainWindow):
    """Main monitoring window for the parking system.

    A 200 ms ``QTimer`` drives :meth:`Switch`, which polls the module-level
    ``g_space`` (database worker) and ``g_rabbitmq`` (status channels) objects
    and refreshes three groups of widgets: the per-unit parking-space buttons,
    the entrance/exit status buttons, and the per-unit command list widgets.
    """

    # Named colours understood by getBackGroundColor().
    _COLOR_STYLES = {
        "red": "background-color:rgb(255,70,70)",
        "gray": "background-color:rgb(168,168,168)",
        "green": "background-color:rgb(0,180,0)",
        "yellow": "background-color:rgb(248,239,71)",
        "pastel": "background-color:rgb(241,158,194)",
    }

    # door_statu code -> 3-character caption suffix shown on a port button.
    _DOOR_STATE_TEXT = {2: "开到位", 3: "关到位", 4: "开关中", 5: "门故障"}

    def __init__(self, parent=None):
        super(MainWindow, self).__init__(parent)
        self.setupUi(self)
        self.isClose = False

        for list_widget in (self.A_listWidget, self.B_listWidget, self.C_listWidget):
            list_widget.setGridSize(QSize(335, 60))
            list_widget.setStyleSheet("border:5px solid #014F84;")

        self.image_label.setPixmap(QPixmap('log.jpg'))
        self.image_label.setScaledContents(True)
        self.image_label.setMaximumHeight(140)
        self.image_label.setMaximumWidth(600)

        # Grid cells for the space buttons: rows 13 down to 0, 6 columns each.
        self.btn_positions = [(i, j) for i in range(13, -1, -1) for j in range(6)]
        # Last drawn snapshot of each unit's parking spaces / commands.
        self.space_dict = {1: [], 2: [], 3: []}
        self.space_is_init = {1: False, 2: False, 3: False}
        self.rmq_statu_dict = {}
        self.command_dict = {1: [], 2: [], 3: []}
        # {} is the "nothing received yet" placeholder for every status key.
        for _exchange, key in statu_ex_keys:
            self.rmq_statu_dict[key] = {}
        self.isInit = False

        self.timer = QTimer()
        self.timer.timeout.connect(self.Switch)
        self.timer.start(200)

    def getBackGroundColor(self, color):
        """Return the stylesheet snippet for a named colour, or None if unknown."""
        return self._COLOR_STYLES.get(color)

    def getListWidgetItem(self, dict):
        """Build the list item describing one command record.

        Only records with ``type == 2`` produce text; the caption and colour
        depend on ``statu`` (0 queued, 1 working, 2 finished). Any other
        record yields an item with empty text.
        """
        item = QListWidgetItem()
        item.setFont(QFont('微软雅黑', 20, QFont.Bold))
        show_str = ""
        if dict["type"] == 2:
            if dict["statu"] == 0:  # queued
                item.setForeground(QColor(80, 80, 80))
                show_str = dict["car_number"] + " 排队中,请稍等片刻"
            elif dict["statu"] == 1:  # working
                item.setForeground(QColor('blue'))
                show_str = dict["car_number"] + " 取车中,等待取车结束"
            elif dict["statu"] == 2:  # finished
                item.setForeground(QColor('green'))
                show_str = dict["car_number"] + " 已完成,请到 %d 号出口取车" % (dict["export_id"])
        item.setText(show_str)
        return item

    def drawLed(self, command_dict):
        """Refresh the three exit-pair indicator LEDs.

        An exit is treated as unavailable when its status message reports
        ``outside_safety == 2`` or when a pending command targets it. Each
        LED covers a pair of exits (1-2, 3-4, 5-6) and is checked when at
        least one exit of the pair is available.

        ``command_dict`` is accepted for caller compatibility but, as in the
        original code, the commands are re-read from ``g_space``.
        """
        export = {i: True for i in range(1, 7)}

        for key, status_text in self.rmq_statu_dict.items():
            # Skip entrances and keys that still hold the empty placeholder
            # (nothing received / timed out): there is nothing to parse.
            if key.find("out") < 0 or not status_text:
                continue
            out_mcpu_statu = message.out_mcpu_statu()
            tf.Parse(status_text, out_mcpu_statu)
            if out_mcpu_statu.outside_safety == 2:
                # Keys look like "out_mcpu_<n>_statu_port"; index 9 is <n>.
                export[int(key[9])] = False

        commands = g_space.get_command_dict()
        # Switch() indexes this container by unit and iterates each entry as
        # a list of command records, so flatten it before reading export_id.
        # NOTE(review): confirm get_command_dict() returns {unit: [records]}.
        if isinstance(commands, dict):
            commands = [record for group in commands.values() for record in group]
        for record in commands:
            export_id = record["export_id"]
            if export_id in export:
                export[export_id] = False

        # Create the LED widgets once and reuse them; adding new widgets on
        # every call would pile them up in the layout.
        leds = getattr(self, "_export_leds", None)
        if leds is None:
            leds = []
            for column in (1, 2, 3):
                led = Led()
                # Disabled so that clicking cannot toggle the indicator.
                led.setEnabled(False)
                led.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
                self.led_process_Layout.addWidget(led, 0, column)
                leds.append(led)
            self._export_leds = leds

        for led, (first, second) in zip(leds, ((1, 2), (3, 4), (5, 6))):
            led.setChecked(export[first] or export[second])

    def drawRmqBtn(self, msg, key):
        """Redraw one entrance/exit status button.

        :param msg: protobuf text-format status message, or None when the
            channel timed out (the button is then shown gray / disconnected).
        :param key: status routing key; the button's object name is derived
            from it by replacing "port" with "btn" and "mcpu" with "rmq".
        """
        btn_name = key.replace("port", "btn").replace("mcpu", "rmq")
        btn = self.findChild(QPushButton, btn_name)
        if btn is None:
            # No such button in the UI: nothing to draw.
            return

        if msg is None:
            btn.setToolTip("None")
            btn.setStyleSheet(self.getBackGroundColor("gray"))
            btn.setText(btn.text()[:9] + "断连")
            return

        btn.setToolTip(msg)
        caption = btn.text()
        if caption.find("入口") >= 0:
            statu = message.in_mcpu_statu()
            tf.Parse(msg, statu)
            is_free = statu.is_occupy == 1
        elif caption.find("出口") >= 0:
            statu = message.out_mcpu_statu()
            tf.Parse(msg, statu)
            is_free = statu.outside_safety == 1
        else:
            return

        if is_free:
            btn.setStyleSheet(self.getBackGroundColor("green"))
            state_text = "空闲----"
        else:
            btn.setStyleSheet(self.getBackGroundColor("yellow"))
            state_text = "占用----"

        # Caption layout: 9-char prefix, 6-char state, then the door suffix.
        tail = caption[15:]
        door_text = self._DOOR_STATE_TEXT.get(statu.door_statu)
        if door_text is not None:
            # Replace the last three characters of the tail only. After a
            # disconnect the tail is empty, and slicing the whole caption
            # (as before) would eat into the state text.
            tail = tail[:-3] + door_text
        btn.setText(caption[:9] + state_text + tail)

    def process_btn_click(self, flag):
        """On right click (flag is False) offer a menu to clear the process."""
        if flag is False:
            record = json.loads(self.sender().toolTip())
            menu = QMenu(self)
            action = menu.addAction('清除')
            action.triggered.connect(partial(self.process_delete, record))
            menu.exec_(QCursor.pos())

    def park_btn_click(self, flag):
        """On right click (flag is False) show the parking-space context menu.

        The space record is recovered from the JSON stored in the sending
        button's tooltip.
        """
        if flag is False:
            park = json.loads(self.sender().toolTip())
            menu = QMenu(self)
            entries = (
                ('启用车位', partial(self.btn_opeSpace, park, 0)),
                ('停用车位', partial(self.btn_disableSpace, park, 1)),
                ('清除车位', partial(self.btn_clearSpace, park)),
                ('取出车辆', partial(self.btn_pickUp, park)),
            )
            for title, handler in entries:
                menu.addAction(title).triggered.connect(handler)
            menu.exec_(QCursor.pos())

    def _show_space_update_failed(self):
        """Show the shared "space update failed" prompt."""
        QMessageBox.question(None, '提示', '更新车位失败!请检查是否点击错误!',
                             QMessageBox.Ok)

    def _update_space_status(self, space_id, statu):
        """Write a space status through g_space; prompt when it returns 0."""
        res = g_space.update_space_status(space_id, statu)
        if res == 0:
            self._show_space_update_failed()

    def btn_disableSpace(self, dict, statu):
        """Menu handler: set the status of space ``dict["id"]`` (disable)."""
        self._update_space_status(dict["id"], statu)

    def btn_opeSpace(self, dict, statu):
        """Menu handler: set the status of space ``dict["id"]`` (enable)."""
        self._update_space_status(dict["id"], statu)

    def btn_clearSpace(self, dict):
        """Menu handler: clear the data stored for space ``dict["id"]``."""
        res = g_space.clear_space_data(dict["id"])
        if res == 0:
            self._show_space_update_failed()

    def btn_pickUp(self, dict):
        """Menu handler: publish a pick-up command for the car in this space.

        The command is sent only when exactly one vehicle record matches the
        space's ``car_number``; otherwise a prompt explains why not.
        """
        res = g_space.query_vehicle_primary_key(dict["car_number"])
        print(res)
        if len(res) < 1:
            QMessageBox.question(None, '提示', dict["car_number"]+' 未查询到车辆!',
                                 QMessageBox.Ok)
        elif len(res) > 1:
            QMessageBox.question(None, '提示', dict["car_number"]+' 查询到多条结果!请检查数据库!',
                                 QMessageBox.Ok)
        elif len(res) == 1:
            table = message.pick_table()
            table.primary_key = res[0]["primary_key"]
            g_rabbitmq.publish("command_ex", "user_command_port", tf.MessageToString(table, as_utf8=True))
            QMessageBox.question(None, '提示', '取车消息发送成功!',
                                 QMessageBox.Ok)

    def process_delete(self, dict):
        """Ask g_space to delete the given process record."""
        g_space.processDelete(dict)

    def getParkData(self, park):
        """Return ``(caption, stylesheet, tooltip)`` for one space record.

        The caption (also used as part of the button's object name) shows the
        space number within its unit, ``(id - 1) % 78 + 1``, and the floor.
        The tooltip is a JSON document that the click handlers parse back.
        """
        text_str = "车位:%d\n楼层:%d" % ((park["id"] - 1) % 78 + 1, park["floor"])
        stylesheet = ""
        if park["statu"] == 1:
            stylesheet = "background-color:rgb(255,130,130)"
        elif park["statu"] == 0 and park["car_number"] is not None:
            stylesheet = "background-color:rgb(248,239,71)"
        tool_tip = '{"id":%d,\n"floor":%d,\n "subID":%d, \n"height":%f, \n"car_number":"%s" ,\n "unit":%d, \n"statu":%d}' % (
            park["id"], park["floor"], park["subID"], park["height"], park["car_number"], park["unit"],
            park["statu"])
        return text_str, stylesheet, tool_tip

    def _refresh_space_buttons(self):
        """Create (first time) or update the parking-space buttons per unit."""
        latest = g_space.get_space_dict()
        layouts = {1: self.A_gridLayout, 2: self.B_gridLayout, 3: self.C_gridLayout}
        for unit in self.space_dict:
            new_parks = latest[unit]
            if self.space_dict[unit] != new_parks:
                if not self.space_is_init[unit]:
                    # First data for this unit: build its buttons once, and
                    # only for this unit, so no duplicates are stacked.
                    for park, position in zip(new_parks, self.btn_positions):
                        text_str, stylesheet, tool_tip = self.getParkData(park)
                        btn = QPBtn()
                        btn.setText(text_str)
                        btn.setStyleSheet(stylesheet)
                        btn.setToolTip(tool_tip)
                        btn.setObjectName(text_str + str(unit))
                        btn.clickedSignal.connect(self.park_btn_click)
                        layouts[unit].addWidget(btn, *position)
                    self.space_is_init[unit] = True
                else:
                    # Redraw only the spaces whose record changed.
                    for old_park, new_park in zip(self.space_dict[unit], new_parks):
                        if old_park != new_park:
                            text_str, stylesheet, tool_tip = self.getParkData(new_park)
                            btn = self.findChild(QPushButton, text_str + str(unit))
                            if btn is None:
                                # No button carries this name (e.g. the
                                # record's id/floor changed); skip it rather
                                # than crash the timer slot.
                                continue
                            btn.setText(text_str)
                            btn.setStyleSheet(stylesheet)
                            btn.setToolTip(tool_tip)
                self.space_dict[unit] = new_parks

    def _refresh_port_buttons(self):
        """Redraw entrance/exit buttons whose status changed or timed out."""
        for key in self.rmq_statu_dict:
            channel = g_rabbitmq["statu_ex:" + key]
            if channel.timeout() is True:
                self.rmq_statu_dict[key] = {}
                self.drawRmqBtn(None, key)
            else:
                statu = channel.statu
                if self.rmq_statu_dict[key] != statu:
                    self.rmq_statu_dict[key] = statu
                    self.drawRmqBtn(statu, key)

    def _refresh_command_lists(self):
        """Rebuild the list widget of every unit whose commands changed."""
        latest = g_space.get_command_dict()
        list_widgets = {1: self.A_listWidget, 2: self.B_listWidget, 3: self.C_listWidget}
        for key in self.command_dict:
            if self.command_dict[key] != latest[key]:
                self.command_dict[key] = latest[key]
                list_widget = list_widgets[key]
                list_widget.clear()
                for record in self.command_dict[key]:
                    list_widget.addItem(self.getListWidgetItem(record))

    def Switch(self):
        """Timer slot: refresh space buttons, port buttons and command lists."""
        self._refresh_space_buttons()
        self._refresh_port_buttons()
        # NOTE: the indicator LEDs (drawLed) are not refreshed here; the call
        # was already disabled in the original code.
        self._refresh_command_lists()

    def closeEvent(self, event):
        """Ask for confirmation, then shut down the workers and close."""
        results = QMessageBox.question(self, '退出', '你确定要退出吗?', QMessageBox.Yes | QMessageBox.No,
                                       QMessageBox.No)
        if results == QMessageBox.Yes:
            g_rabbitmq.close()
            g_space.close()
            self.isClose = True
            event.accept()  # accept the close event
        else:
            event.ignore()  # ignore the close event
if __name__ == '__main__':
    # Start the RabbitMQ communicator; it is told about the status keys
    # declared in statu_ex_keys. MainWindow reads it as a module global.
    g_rabbitmq = cmt.RabbitAsyncCommunicator(mq_ip, mq_port, mq_user, mq_password)
    g_rabbitmq.Init(None, statu_ex_keys)
    g_rabbitmq.start()

    # Start the database worker, also read by MainWindow as a module global.
    g_space = spmng.DBOperation()
    g_space.start()

    # Build the GUI and run the Qt event loop until the window is closed.
    app = QApplication(sys.argv)
    mainWindow = MainWindow()
    mainWindow.showMaximized()
    exit_code = app.exec_()
    sys.exit(exit_code)
- # self.drawLedSignal.emit()
- # self.drawUnitProcessSignal.emit()
- # return
- # def run(self):
- # while self.isClose is not True:
- # if g_space.statu is False:
- # self.setWindowTitle("MainWindow---连接错误!")
- # self.setEnabled(False)
- # else:
- # self.setWindowTitle("MainWindow---连接正常!")
- # self.setEnabled(True)
- #
- # for list in statu_ex_keys:
- # ex_name = list[0]
- # key = list[1]
- # id = ex_name + ":" + key
- # if (id in self.rmq_statu.keys()) is False or self.rmq_statu[id].statu != g_rabbitmq[id].statu or \
- # self.rmq_timeout[id] != g_rabbitmq[id].timeout():
- # self.rmq_timeout[id] = g_rabbitmq[id].timeout()
- # self.rmq_statu[id] = g_rabbitmq[id]
- # self.drawLedSignal.emit()
- # self.drawUnitProcessSignal.emit()
- # self.drawRmqBtnSignal.emit(list)
- # time.sleep(0.01)
- # for key, statu in g_mcpu.GetAllMcpuConnectStatus().items():
- # self.drawMcpuBtnSignal.emit(key)
- # time.sleep(0.01)
- # time.sleep(0.5)
- # 进入程序的主循环,并通过exit函数确保主循环安全结束(该释放资源的一定要释放)
- # def mcpu_btn_click(self):
- # sender = self.sender()
- # mcpu_key_list = sender.objectName().split('_')
- # menu = QMenu(self)
- # action = menu.addAction('手动开门')
- # action.triggered.connect(partial(self.manual_open_door, mcpu_key_list))
- # action = menu.addAction('手动关门')
- # action.triggered.connect(partial(self.manual_close_door, mcpu_key_list))
- # action = menu.addAction('恢复半自动')
- # action.triggered.connect(partial(self.automatic_door, mcpu_key_list))
- # action = menu.addAction('恢复全自动')
- # action.triggered.connect(partial(self.fully_automatic_door, mcpu_key_list))
- # menu.exec_(QCursor.pos())
- #
- # def manual_open_door(self, mcpu_key_list):
- # dispatch_direction = 0
- # if mcpu_key_list[0] == 'in':
- # dispatch_direction = 1
- # elif mcpu_key_list[0] == 'out':
- # dispatch_direction = 2
- # msg = b'{ "TerminalID": %d, "DispatchDirection": %d, "ProcessControl": 2, "OutPutDo": { "Do0": 1, "Do1": 0, "Do2": 0, "Do3": 0, "Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0 }}' % (
- # int(mcpu_key_list[2]) - 1, dispatch_direction)
- # key = mcpu_key_list[0] + ":mcpu_" + mcpu_key_list[2]
- # g_mcpu.publish(key, msg)
- # QMessageBox.question(None, '提示', '指令发送成功!',
- # QMessageBox.Ok) # "退出"代表的是弹出框的标题,"你确认退出.."表示弹出框的内容
- #
- # def manual_close_door(self, mcpu_key_list):
- # dispatch_direction = 0
- # if mcpu_key_list[0] == 'in':
- # dispatch_direction = 1
- # elif mcpu_key_list[0] == 'out':
- # dispatch_direction = 2
- # msg = b'{ "TerminalID": %d, "DispatchDirection": %d, "ProcessControl": 2, "OutPutDo": { "Do0": 0, "Do1": 1, "Do2": 0, "Do3": 0, "Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0 }}' % (
- # int(mcpu_key_list[2]) - 1, dispatch_direction)
- # key = mcpu_key_list[0] + ":mcpu_" + mcpu_key_list[2]
- # g_mcpu.publish(key, msg)
- # QMessageBox.question(None, '提示', '指令发送成功!',
- # QMessageBox.Ok) # "退出"代表的是弹出框的标题,"你确认退出.."表示弹出框的内容
- #
- # def automatic_door(self, mcpu_key_list):
- # dispatch_direction = 0
- # ProcessControl = 0
- # if mcpu_key_list[0] == 'in':
- # ProcessControl = 3
- # dispatch_direction = 1
- # elif mcpu_key_list[0] == 'out':
- # ProcessControl = 4
- # dispatch_direction = 2
- # msg = b'{ "TerminalID": %d, "DispatchDirection": %d, "ProcessControl": %d, "OutPutDo": { "Do0": 0, "Do1": 0, "Do2": 0, "Do3": 0, "Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0 }}' % (
- # int(mcpu_key_list[2]) - 1, dispatch_direction, ProcessControl)
- # key = mcpu_key_list[0] + ":mcpu_" + mcpu_key_list[2]
- # g_mcpu.publish(key, msg)
- # QMessageBox.question(None, '提示', '指令发送成功!',
- # QMessageBox.Ok) # "退出"代表的是弹出框的标题,"你确认退出.."表示弹出框的内容
- #
- # def fully_automatic_door(self, mcpu_key_list):
- # dispatch_direction = 0
- # if mcpu_key_list[0] == 'in':
- # dispatch_direction = 1
- # elif mcpu_key_list[0] == 'out':
- # dispatch_direction = 2
- # msg = b'{ "TerminalID": %d, "DispatchDirection": %d, "ProcessControl": 1, "OutPutDo": { "Do0": 0, "Do1": 0, "Do2": 0, "Do3": 0, "Do4": 0, "Do5": 0, "Do6": 0, "Do7": 0 }}' % (
- # int(mcpu_key_list[2]) - 1, dispatch_direction)
- # key = mcpu_key_list[0] + ":mcpu_" + mcpu_key_list[2]
- # g_mcpu.publish(key, msg)
- # QMessageBox.question(None, '提示', '指令发送成功!',
- # QMessageBox.Ok) # "退出"代表的是弹出框的标题,"你确认退出.."表示弹出框的内容
|