||
- import sys
- from PyQt5.QtWidgets import QApplication, QSizePolicy, QLabel, QWidget
- import json
- import threading
- import time
- from functools import partial
- from PyQt5.QtCore import pyqtSignal, Qt
- from PyQt5.QtGui import QCursor
- from PyQt5.QtWidgets import QPushButton, QMainWindow, QMenu, QMessageBox
- import message.message_pb2 as message
- import google.protobuf.text_format as tf
- import re
- import ui.spaceUi as sui
- import spaceManage as spmng
- import async_communication as cmt
- from led import Led
# Database connection settings (handed to spmng.ParkManage further down).
# A loopback setup was previously kept here as an alternative:
#   db_ip = "127.0.0.1", db_user = "root", db_password = "123456"
# NOTE(review): credentials live in source control - consider moving them to
# external configuration.
db_ip = "192.168.1.233"
db_port = 3306
db_name = "ct_project"
db_user = "zx"
db_password = "zx123456"

# Message-queue connection settings (handed to cmt.RabbitAsyncCommunicator).
# A loopback setup was previously kept here as an alternative:
#   mq_ip = "127.0.0.1", mq_user = "wk", mq_password = "123456"
mq_ip = "192.168.1.233"
mq_port = 5672
mq_user = "zx"
mq_password = "zx123456"

# [exchange name, key] pairs for the status channels that are polled:
# "in" units 1-6 first, followed by "out" units 1-6.
statu_ex_keys = [
    ["statu_ex", "%s_mcpu_%d_statu_port" % (direction, unit_no)]
    for direction in ("in", "out")
    for unit_no in range(1, 7)
]
class QPBtn(QPushButton):
    """Push button that reports which mouse button pressed it.

    ``clickedSignal`` carries ``True`` for a left-button press and ``False``
    for a right-button press. Presses of any other button emit nothing.
    """

    clickedSignal = pyqtSignal(bool)  # signal with one bool argument

    def __init__(self, parent=None):
        super(QPBtn, self).__init__(parent)

    def mousePressEvent(self, event):
        """Emit ``clickedSignal`` according to the button that was pressed.

        The base-class handler is intentionally not invoked (same as the
        original code), so the stock ``pressed``/``clicked`` signals of
        QPushButton are not driven by this event.
        """
        # button() is the button that caused this event. buttons() is the
        # mask of every button currently held, so comparing that mask with a
        # single button fails whenever a second button is down at the same
        # time (e.g. right press while left is held).
        pressed = event.button()
        if pressed == Qt.LeftButton:
            self.clickedSignal.emit(True)
        elif pressed == Qt.RightButton:
            self.clickedSignal.emit(False)
class MainWindow(QMainWindow, threading.Thread):
    """Main window that is also the polling thread feeding it.

    ``run()`` executes on the worker thread: it polls the module-level
    ``g_space`` and ``g_rabbitmq`` objects and emits signals. Every widget
    operation happens in the slots connected to those signals, so that
    widgets are only touched from the thread that owns the window.
    """

    drawSpaceBtnSignal = pyqtSignal(dict, int)
    drawMcpuBtnSignal = pyqtSignal(list)
    drawLedSignal = pyqtSignal()
    # Carries the database connection state from run() to the GUI thread.
    connStateSignal = pyqtSignal(bool)

    # Style sheets returned by getBackGroundColor().
    _BACKGROUND_COLORS = {
        "red": "background-color:rgb(255,70,70)",
        "gray": "background-color:rgb(168,168,168)",
        "green": "background-color:rgb(0,180,0)",
        "yellow": "background-color:rgb(248,239,71)",
    }

    # Three-character door label written at the end of a status button's
    # text, keyed by the door_statu value of the parsed status message.
    _DOOR_STATU_TEXT = {
        2: "开到位",
        3: "关到位",
        4: "开关中",
        5: "门故障",
    }

    def __init__(self):
        threading.Thread.__init__(self)
        QMainWindow.__init__(self)
        # Last status text / timeout flag seen per "exchange:key" id. run()
        # uses them to detect changes. A snapshot of the text is stored
        # rather than the live g_rabbitmq entry, so the comparison still
        # works if that entry is one shared, mutated object.
        self.mcpu_statu = {}
        self.mcpu_timeout = {}
        # Widgets this window has placed into layouts, so that a redraw can
        # dispose of the previous widget instead of stacking a new one on it.
        self._led_widgets = {}
        self._space_btns = {}
        self.ui = sui.Ui_MainWindow()
        self.ui.setupUi(self)
        self.isClose = False
        self.drawSpaceBtnSignal.connect(self.drawSpaceBtn)
        self.drawMcpuBtnSignal.connect(self.drawMcpuBtn)
        self.drawLedSignal.connect(self.drawLed)
        self.connStateSignal.connect(self._applyConnState)
        # Grid cells for parking-space buttons: rows 13 down to 0, columns 0-5.
        self.btn_positions = [(i, j) for i in range(13, -1, -1) for j in range(6)]
        self.dispatch_mcpu_positions = [(i, j) for i in range(1) for j in range(3)]

    # ------------------------------------------------------------------
    # helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _queuedCommands():
        """Return g_space.command_queue_dict, or [] when it is None/False."""
        queue = g_space.command_queue_dict
        if queue is None or queue is False:
            return []
        return queue

    @staticmethod
    def _swapWidget(layout, registry, slot, widget, row, column):
        """Put *widget* at (row, column), disposing of the one it replaces.

        *registry* maps *slot* to the widget this window previously placed
        there. Only widgets recorded in the registry are removed, so
        anything else that lives in the layout is left alone.
        """
        stale = registry.pop(slot, None)
        if stale is not None:
            layout.removeWidget(stale)
            stale.deleteLater()
        layout.addWidget(widget, row, column)
        registry[slot] = widget

    def _showOccupancy(self, btn, is_free):
        """Colour *btn* and overwrite characters 9-14 of its text."""
        if is_free:
            color, label = "green", "空闲----"
        else:
            color, label = "yellow", "占用----"
        btn.setStyleSheet(self.getBackGroundColor(color))
        text = btn.text()
        btn.setText(text[:9] + label + text[15:])

    def _showDoorStatu(self, btn, door_statu):
        """Replace the last three characters of *btn*'s text for values 2-5."""
        label = self._DOOR_STATU_TEXT.get(door_statu)
        if label is not None:
            btn.setText(btn.text()[:-3] + label)

    def _dbReady(self, title):
        """Return True when g_space reports a usable connection.

        Otherwise a warning dialog titled *title* is shown and False is
        returned.
        """
        if g_space.statu is False:
            # Arguments: parent, dialog title, dialog body, buttons.
            QMessageBox.warning(self, title, '数据库连接错误,请检查网络状态!',
                                QMessageBox.Ok)
            return False
        return True

    def _applyConnState(self, connected):
        """GUI-thread slot: reflect the connection state on the window."""
        if connected:
            self.setWindowTitle("MainWindow---连接正常!")
        else:
            self.setWindowTitle("MainWindow---连接错误!")
        self.setEnabled(connected)

    # ------------------------------------------------------------------
    # drawing slots (run on the GUI thread)
    # ------------------------------------------------------------------
    def getBackGroundColor(self, color):
        """Return the style sheet for *color*, or None for an unknown name."""
        return self._BACKGROUND_COLORS.get(color)

    def drawLed(self):
        """Rebuild the three LEDs shown in ``led_process_Layout``.

        An export counts as unavailable when its status button's tooltip
        parses to ``outside_safety == 2`` or when a queued command names its
        ``export_id``. Exports are paired (1,2), (3,4), (5,6); a pair's LED
        is unchecked only when both exports of the pair are unavailable.
        """
        available = {export_id: True for export_id in range(1, 7)}

        for export_id in range(1, 7):
            btn = getattr(self.ui, "out_mcpu_%d_statu_btn" % export_id)
            tip = btn.toolTip()
            if tip == "None":  # drawMcpuBtn's marker for a disconnected unit
                continue
            statu = message.out_mcpu_statu()
            tf.Parse(tip, statu)
            if statu.outside_safety == 2:
                available[export_id] = False

        for command in self._queuedCommands():
            if command["export_id"] in available:
                available[command["export_id"]] = False

        pairs = ((1, 2), (3, 4), (5, 6))
        for column, (first, second) in enumerate(pairs, start=1):
            led = Led()
            # Disabled so the user cannot toggle it; enabling it would let a
            # click switch the indicator colour.
            led.setEnabled(False)
            led.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
            led.setChecked(available[first] or available[second])
            self._swapWidget(self.ui.led_process_Layout, self._led_widgets,
                             column, led, 0, column)

    def drawMcpuBtn(self, list):
        """Refresh one status button from its g_rabbitmq entry.

        *list* is an ``[exchange name, key]`` pair from ``statu_ex_keys``.
        The button is looked up by the key with "port" replaced by "btn".
        The parameter name is kept for interface compatibility.
        """
        ex_name, key = list[0], list[1]
        channel_id = ex_name + ":" + key
        btn = self.findChild(QPushButton, key.replace("port", "btn"))
        if btn is None:
            # No matching widget in the UI: nothing to draw.
            return

        channel = g_rabbitmq[channel_id]
        statu_text = channel.statu
        if statu_text is None or channel.timeout() is not False:
            btn.setToolTip("None")
            btn.setStyleSheet(self.getBackGroundColor("gray"))
            # NOTE(review): this truncates the text to 11 characters, while
            # the branches below slice at fixed offsets ([15:], [:-3]) -
            # confirm the text returns to the expected layout on reconnect.
            btn.setText(btn.text()[:9] + "断连")
            return

        btn.setToolTip(statu_text)
        if key.find("in") >= 0:
            statu = message.in_mcpu_statu()
            tf.Parse(statu_text, statu)
            # is_occupy == 1 is the value drawn as free (green).
            self._showOccupancy(btn, statu.is_occupy == 1)
            self._showDoorStatu(btn, statu.door_statu)
        elif btn.text().find("出口") >= 0:
            statu = message.out_mcpu_statu()
            tf.Parse(statu_text, statu)
            self._showOccupancy(btn, statu.outside_safety == 1)
            export_id = int(re.search(r'\d+', btn.objectName()).group())
            # A queued command for this export overrides the drawn state.
            if any(command["export_id"] == export_id
                   for command in self._queuedCommands()):
                self._showOccupancy(btn, False)
            self._showDoorStatu(btn, statu.door_statu)

    def drawSpaceBtn(self, parkspace_dict, unit):
        """Draw one button per parking space into the grid of *unit*.

        Units 1, 2 and 3 map to ``A_gridLayout``, ``B_gridLayout`` and
        ``C_gridLayout``; any other unit draws nothing. Spaces are paired
        with ``self.btn_positions`` in iteration order. Each button's
        tooltip holds the space record as JSON text, which ``btn_click``
        parses back.
        """
        layout = {
            1: self.ui.A_gridLayout,
            2: self.ui.B_gridLayout,
            3: self.ui.C_gridLayout,
        }.get(unit)
        if layout is None:
            return

        for park, position in zip(parkspace_dict.values(), self.btn_positions):
            btn = QPBtn()
            if park["statu"] == 1:
                btn.setStyleSheet("background-color:rgb(255,130,130)")
            elif park["statu"] == 0 and park["car_number"] is not None:
                btn.setStyleSheet("background-color:rgb(248,239,71)")
            btn.setGeometry(0, 0, 50, 50)
            btn.setText("车位:%d\n楼层:%d" % ((park["id"] - 1) % 78 + 1, park["floor"]))
            tool_tip = '{"id":%d,\n"floor":%d,\n "subID":%d, \n"height":%f, \n"car_number":"%s" ,\n "unit":%d, \n"statu":%d}' % (
                park["id"], park["floor"], park["subID"], park["height"], park["car_number"], park["unit"],
                park["statu"])
            btn.setToolTip(tool_tip)
            btn.clickedSignal.connect(self.btn_click)
            # Replace the button drawn here by the previous refresh instead
            # of stacking a new one on top of it.
            self._swapWidget(layout, self._space_btns, (unit, position), btn, *position)

    # ------------------------------------------------------------------
    # parking-space context menu
    # ------------------------------------------------------------------
    def btn_click(self, flag):
        """Open the parking-space menu when *flag* is False (right press)."""
        if flag is not False:
            return
        park = json.loads(self.sender().toolTip())
        menu = QMenu(self)
        entries = (
            ('启用车位', partial(self.btn_opeSpace, park, 0)),
            ('停用车位', partial(self.btn_disableSpace, park, 1)),
            ('清除车位', partial(self.btn_clearSpace, park)),
            ('取出车辆', partial(self.btn_pickUp, park)),
        )
        for title, handler in entries:
            menu.addAction(title).triggered.connect(handler)
        menu.exec_(QCursor.pos())

    def btn_disableSpace(self, dict, statu):
        """Forward *dict* and *statu* to ``g_space.updatePark``."""
        if self._dbReady('警告'):
            g_space.updatePark(dict, statu)

    def btn_opeSpace(self, dict, statu):
        """Forward *dict* and *statu* to ``g_space.updatePark``."""
        if self._dbReady('错误'):
            g_space.updatePark(dict, statu)

    def btn_clearSpace(self, dict):
        """Forward *dict* to ``g_space.clearPark``."""
        if self._dbReady('错误'):
            g_space.clearPark(dict)

    def btn_pickUp(self, dict):
        """Forward *dict* to ``g_space.pickUpPark``."""
        if self._dbReady('错误'):
            g_space.pickUpPark(dict)

    # ------------------------------------------------------------------
    # worker thread
    # ------------------------------------------------------------------
    def run(self):
        """Polling loop executed on the worker thread until ``isClose``.

        Every 0.5 s it publishes the connection state, the parking-space
        dictionaries flagged as updated, every status channel whose text or
        timeout flag changed, and an LED refresh when
        ``g_space.commandIsUpdate`` is set. Nothing here touches a widget
        directly; all output goes through signals.
        """
        unit_sources = (
            ("isUpdate_A", "a_unit_park_dict", 1),
            ("isUpdate_B", "b_unit_park_dict", 2),
            ("isUpdate_C", "c_unit_park_dict", 3),
        )
        last_connected = None  # forces a publish on the first pass
        while self.isClose is not True:
            connected = g_space.statu is not False
            if connected != last_connected:
                self.connStateSignal.emit(connected)
                last_connected = connected

            for flag_name, parks_name, unit in unit_sources:
                if getattr(g_space, flag_name) is True:
                    self.drawSpaceBtnSignal.emit(getattr(g_space, parks_name), unit)
                    setattr(g_space, flag_name, False)

            for pair in statu_ex_keys:
                channel_id = pair[0] + ":" + pair[1]
                channel = g_rabbitmq[channel_id]
                # Read each value once so the comparison and the stored
                # snapshot cannot disagree.
                statu_text = channel.statu
                timed_out = channel.timeout()
                if (channel_id in self.mcpu_statu
                        and self.mcpu_statu[channel_id] == statu_text
                        and self.mcpu_timeout[channel_id] == timed_out):
                    continue
                self.mcpu_timeout[channel_id] = timed_out
                self.mcpu_statu[channel_id] = statu_text
                g_space.commandIsUpdate = True
                print("-----------------------------------------------")
                print("key = " + channel_id + "" + " timeout=" + str(timed_out)
                      + " statu=" + str(statu_text))
                self.drawMcpuBtnSignal.emit(pair)
                time.sleep(0.01)

            if g_space.commandIsUpdate is True:
                self.drawLedSignal.emit()
                g_space.commandIsUpdate = False
            time.sleep(0.5)

    def closeEvent(self, event):
        """Ask for confirmation; on Yes close g_rabbitmq and stop the loops."""
        # Arguments: parent, dialog title, dialog body, buttons, default.
        answer = QMessageBox.question(self, '退出', '你确定要退出吗?',
                                      QMessageBox.Yes | QMessageBox.No,
                                      QMessageBox.No)
        if answer != QMessageBox.Yes:
            event.ignore()  # keep the window open
            return
        g_rabbitmq.close()
        g_space.isClose = True
        self.isClose = True
        event.accept()  # let the window close
# Shared service objects. They are created at import time because the
# MainWindow methods refer to them as module-level globals.
g_rabbitmq = cmt.RabbitAsyncCommunicator(mq_ip, mq_port, mq_user, mq_password)
g_space = spmng.ParkManage(
    db_ip, db_port, db_name, db_user, db_password, g_rabbitmq,
)

if __name__ == '__main__':
    qt_app = QApplication(sys.argv)

    # Start the services before the window's polling thread reads them.
    g_space.start()
    g_rabbitmq.Init(None, statu_ex_keys)
    g_rabbitmq.start()

    window = MainWindow()
    window.start()  # threading.Thread.start -> MainWindow.run
    window.showMaximized()
    time.sleep(0.5)
    window.show()

    # Enter the application's main loop and hand its return code to
    # sys.exit so the process ends cleanly once the loop finishes.
    exit_code = qt_app.exec_()
    sys.exit(exit_code)
|