db_query.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import time
  2. import threading
  3. import pymysql as psql
  4. class DBQuery(threading.Thread):
  5. def __init__(self,ip,port,database,user,password):
  6. threading.Thread.__init__(self)
  7. self.ip=ip
  8. self.port=port
  9. self.database=database
  10. self.user=user
  11. self.password=password
  12. self.conn=psql.connect(host=self.ip,port=self.port,database=self.database,charset="utf8",user=self.user,passwd=self.password)
  13. self.unit_cmd_1=None
  14. self.unit_cmd_2=None
  15. self.unit_cmd_3=None
  16. self.isClose = False
  17. self.callback = None
  18. def Close(self):
  19. self.isClose = True
  20. def setCallback(self,callback):
  21. self.callback = callback
  22. def connect(self):
  23. try:
  24. self.conn = psql.connect(host=self.ip, port=self.port, database=self.database, charset="utf8",
  25. user=self.user, passwd=self.password)
  26. return True
  27. except:
  28. return False
  29. def run(self):
  30. while self.isClose is False:
  31. try:
  32. self.conn.ping() # 采用连接对象的ping()函数检测连接状态
  33. except:
  34. self.connect()
  35. time.sleep(0.5)
  36. continue
  37. #获取一个光标
  38. cursor = self.conn.cursor()
  39. #查询指令队列所有信息
  40. SQL1="select * from command_queue where unit = 1 and type = 2 order by queue_id ASC;"
  41. SQL2="select * from command_queue where unit = 2 and type = 2 order by queue_id ASC;"
  42. SQL3="select * from command_queue where unit = 3 and type = 2 order by queue_id ASC;"
  43. cmd_dict1 = {}
  44. cmd_dict2 = {}
  45. cmd_dict3 = {}
  46. #执行语句 返回结果数量
  47. command_count=cursor.execute(SQL1)
  48. self.conn.commit()
  49. #结果数量大于0
  50. if(command_count > 0):
  51. queue_cmd=cursor.fetchall()
  52. column=[index[0] for index in cursor.description ]# 列名
  53. cmd_dict1 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组
  54. command_count=cursor.execute(SQL2)
  55. self.conn.commit()
  56. #结果数量大于0
  57. if(command_count > 0):
  58. queue_cmd=cursor.fetchall()
  59. column=[index[0] for index in cursor.description ]# 列名
  60. cmd_dict2 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组
  61. command_count=cursor.execute(SQL3)
  62. self.conn.commit()
  63. #结果数量大于0
  64. if(command_count > 0):
  65. queue_cmd=cursor.fetchall()
  66. column=[index[0] for index in cursor.description ]# 列名
  67. cmd_dict3 = [dict(zip(column, row)) for row in queue_cmd] # row是数据库返回的一条一条记录,其中的每一天和column写成字典,最后就是字典数组
  68. if(cmd_dict1 != self.unit_cmd_1):
  69. self.unit_cmd_1 = cmd_dict1.copy()
  70. if self.callback is not None and len(self.unit_cmd_1) != 0:
  71. self.callback(self.unit_cmd_1,1)
  72. if(cmd_dict2 != self.unit_cmd_2):
  73. self.unit_cmd_2 = cmd_dict2.copy()
  74. if self.callback is not None and len(self.unit_cmd_2) != 0:
  75. self.callback(self.unit_cmd_2,2)
  76. if(cmd_dict3 != self.unit_cmd_3):
  77. self.unit_cmd_3 = cmd_dict3.copy()
  78. if self.callback is not None and len(self.unit_cmd_3) != 0:
  79. self.callback(self.unit_cmd_3,3)
  80. # 关闭光标
  81. cursor.close()
  82. time.sleep(1)