# 导入pymysql import socket import time import LedControl as lc import google.protobuf.text_format as tf import pymysql as psql #db参数 db_ip="192.168.1.233" db_port=3306 db_name="ct_project" db_user="zx" db_password="zx123456" #led参数 led_ip="192.168.1.169" led_port=5005 #LED消息发送器 g_lc=lc.LedControl(led_ip,led_port) class DBQuery(): def __init__(self,ip,port,database,user,password): self.ip=ip self.port=port self.database=database self.user=user self.password=password self.conn=psql.connect(host=self.ip,port=self.port,database=self.database,charset="utf8",user=self.user,passwd=self.password) self.unit_cmd_1=None self.unit_cmd_2=None self.unit_cmd_3=None def run(self): while True: #获取一个光标 cursor = self.conn.cursor() #查询指令队列所有信息 SQL1="select * from command_queue where unit = 1 and type = 2 order by queue_id ASC;" SQL2="select * from command_queue where unit = 2 and type = 2 order by queue_id ASC;" SQL3="select * from command_queue where unit = 3 and type = 2 order by queue_id ASC;" cmd_dict1 = {} cmd_dict2 = {} cmd_dict3 = {} #执行语句 返回结果数量 command_count=cursor.execute(SQL1) self.conn.commit() #结果数量大于0 if(command_count > 0): queue_cmd=cursor.fetchall() column=[index[0] for index in cursor.description ]# 列名 cmd_dict1 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组 command_count=cursor.execute(SQL2) self.conn.commit() #结果数量大于0 if(command_count > 0): queue_cmd=cursor.fetchall() column=[index[0] for index in cursor.description ]# 列名 cmd_dict2 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组 command_count=cursor.execute(SQL3) self.conn.commit() #结果数量大于0 if(command_count > 0): queue_cmd=cursor.fetchall() column=[index[0] for index in cursor.description ]# 列名 cmd_dict3 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组 if(cmd_dict1 != self.unit_cmd_1): self.unit_cmd_1 = cmd_dict1.copy() led_show_string = g_lc.getInput(self.unit_cmd_1) g_lc.sendLedMsg(0,led_show_string) if(cmd_dict2 != self.unit_cmd_2): self.unit_cmd_2 = cmd_dict2.copy() led_show_string = g_lc.getInput(self.unit_cmd_2) g_lc.sendLedMsg(1,led_show_string) if(cmd_dict3 != self.unit_cmd_3): self.unit_cmd_3 = cmd_dict3.copy() led_show_string = g_lc.getInput(self.unit_cmd_3) g_lc.sendLedMsg(2,led_show_string) # 关闭光标 cursor.close() time.sleep(1) if __name__=="__main__": db_query = DBQuery(db_ip,db_port,db_name,db_user,db_password) db_query.run()