LedControl.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import socket
  2. import datetime
  3. import time
  4. string_color = {
  5. "UNKNOWN":'', #
  6. "RED":'\\C1', #红色, 默认
  7. "GREEN":'\\C2', #绿色
  8. "YELLOW":'\\C3', #黄色
  9. "BLUE":'\\C4', #蓝色
  10. "CYAN":'\\C5', #青色
  11. "PURPLE":'\\C6', #紫色
  12. "WHITE":'\\C7' #白色
  13. }
  14. class LedControl:
  15. def __init__(self,ip,port):
  16. self.ip=ip
  17. self.port=port
  18. def sendLedMsg(self,area_index, input, color=string_color["UNKNOWN"]):
  19. t_time = datetime.datetime.now()
  20. while (datetime.datetime.now() - t_time).microseconds < 30000:
  21. self.sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  22. self.sock.settimeout(1)
  23. try:
  24. self.sock.connect((self.ip,self.port))
  25. self.sock.send(self.string2bytes(area_index, input, color))
  26. print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\nsend msg area_index:%d msg:%s" % (self.ip,self.port,area_index,input))
  27. break
  28. except Exception as e:
  29. print("connected led \033[0:31mERROR\033[m: %s" % e.args)
  30. time.sleep(0.01)
  31. self.sock.close()
  32. time.sleep(0.5)
  33. def getInput(self,cmd_queue):
  34. led_show_string = ""
  35. queue_string = ""
  36. work_string = ""
  37. complete_string = ""
  38. for cmd in cmd_queue:
  39. car_number = "\\C7"+cmd["car_number"]
  40. if(cmd["statu"] == 0):#排队
  41. statu = "\\C1排 队"
  42. queue_string = queue_string + car_number + " " + statu + "\\n"
  43. elif(cmd["statu"] == 1):#工作
  44. statu = "\\C3取车中"
  45. work_string = work_string + car_number + " " + statu + "\\n"
  46. elif(cmd["statu"] == 2):#已完成
  47. statu = "\\C2出口"+ str(cmd["export_id"])
  48. complete_string = complete_string + car_number + " " + statu + "\\n"
  49. led_show_string = complete_string + work_string + queue_string
  50. return led_show_string
  51. def string2bytes(self,area_index, input, color):
  52. input = color + input
  53. # area_index = 0 #显示区域编号
  54. width = 133 #显示区域宽,单位像素
  55. height = 60 #显示区域高,单位像素
  56. x = area_index*width + area_index #显示区域左上角x,单位像素
  57. y = 0 #显示区域左上角y,单位像素
  58. #msg = bytearray(input, encoding='utf-8')
  59. msg = bytearray(input, encoding='gbk')
  60. m_frame_head = bytearray(8) #帧头, 默认8个0xA5
  61. m_frame_head[0:8] = b'\xA5\xA5\xA5\xA5\xA5\xA5\xA5\xA5'
  62. m_frame_configuration = bytearray(14) #包头, 配置参数, 14个byte
  63. m_frame_configuration[0:12]=b'\xfe\xff\x00\x80\x00\x00\x00\x00\x00\x00\x63\x02'
  64. # m_frame_configuration 最后2个是 数据域长度,后面在写
  65. m_cmd_head = bytearray(7) #指令头, 默认7个byte
  66. m_cmd_head[0:7] = b'\xA3\x06\x01\x00\x00\x00\x01'
  67. m_area_head = bytearray(29) #区域头, 默认29个byte
  68. # m_area_head 前面2个是 区域的数据长度, 后面在写
  69. m_area_head[2] = 0
  70. t_area_x = 0x8000+x # 区域 X 坐标,默认以字节(8 个像素点)为单位, 高字节最高位为 1 时,表示以像素点为单位
  71. m_area_head[3]=t_area_x%256
  72. m_area_head[4]=int(t_area_x/256)
  73. t_area_y = y #区域 Y 坐标,以像素点为单位
  74. m_area_head[5]=t_area_y%256
  75. m_area_head[6]=int(t_area_y/256)
  76. t_area_width = 0x8000+width #区域宽度,默认以字节(8 个像素点)为单位, 高字节最高位为 1 时,表示以像素点为单位
  77. m_area_head[7]=t_area_width%256
  78. m_area_head[8]=int(t_area_width/256)
  79. t_area_height = height #区域高度,以像素点为单位
  80. m_area_head[9]=t_area_height%256
  81. m_area_head[10]=int(t_area_height/256)
  82. m_area_head[11] = area_index
  83. m_area_head[12:25] = b'\x00\x00\x02\x00\x00\x00\x00\x02\x01\x01\x00\x00\x0a'
  84. #m_area_head[21] = 1 #0x01——静止显示
  85. m_area_head[21] = 5 #0x05——向上移动
  86. #m_area_head后面4个是 显示字符串 的长度, 后续在写.
  87. #长度整合
  88. t_show_string_len = len(msg)
  89. m_area_head[25] = t_show_string_len%256
  90. m_area_head[26]=(int(t_show_string_len/256))%256
  91. m_area_head[27]=(int(t_show_string_len/256/256))%256
  92. m_area_head[28]=(int(t_show_string_len/256/256/256))%256
  93. t_area_data_len = 29-2+ len(msg)
  94. m_area_head[0]=t_area_data_len%256
  95. m_area_head[1]=int(t_area_data_len/256)
  96. t_data_len = 7 + 29 + len(msg)
  97. m_frame_configuration[12] = t_data_len%256
  98. m_frame_configuration[13] = int(t_data_len/256)
  99. #crc校验码
  100. t_result_src = m_frame_configuration + m_cmd_head + m_area_head +msg
  101. crc=self.crc16(t_result_src)
  102. [a1,a2]=self.dec2hex(crc)
  103. m_frame_crc = bytearray(2) #crc校验码, 2个byte, 包校验为包头数据和数据域的校验值
  104. m_frame_crc[0]=a1
  105. m_frame_crc[1]=a2
  106. p_src = t_result_src + m_frame_crc
  107. p_dst = bytearray(1024)
  108. j=0
  109. for i in range(len(p_src)):
  110. if p_src[i] == 0xA5:
  111. p_dst[j] = 0xA6
  112. p_dst[j+1] = 0x02
  113. j+=2
  114. elif p_src[i] == 0xA6:
  115. p_dst[j] = 0xA6
  116. p_dst[j+1] = 0x01
  117. j+=2
  118. elif p_src[i] == 0x5A:
  119. p_dst[j] = 0x5B
  120. p_dst[j+1] = 0x02
  121. j+=2
  122. elif p_src[i] == 0x5B:
  123. p_dst[j] = 0x5B
  124. p_dst[j+1] = 0x01
  125. j+=2
  126. else:
  127. p_dst[j] = p_src[i]
  128. j = j+1
  129. m_frame_tail = bytearray(1)
  130. m_frame_tail[0] = 90 #就是0x5A
  131. output = m_frame_head + p_dst[0:j] + m_frame_tail
  132. # print('--------------------------------')
  133. # print(output)
  134. # print(len(output))
  135. return output
  136. def crc16(self,bytes):
  137. crc=0
  138. for byte in bytes:
  139. crc=byte^crc
  140. for i in range(8):
  141. if crc&0x01==1:
  142. crc=(crc>>1)^0xA001
  143. else:
  144. crc=crc>>1
  145. return crc
  146. def dec2hex(self,num):
  147. a=int(num/256)
  148. b=int(num%256)
  149. return [b,a]
  150. # if __name__=="__main__":
  151. # sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  152. # sock.settimeout(1)
  153. # sock.connect(("192.168.1.163",5005))
  154. #
  155. # print("connected")
  156. #
  157. #
  158. # time.sleep(1)
  159. # bytes=string2bytes(0, '楚天车库', string_color["RED"])
  160. # sock.send(bytes)
  161. # time.sleep(1)
  162. # bytes=string2bytes(1, '停车入口', string_color["GREEN"])
  163. # sock.send(bytes)
  164. # time.sleep(1)
  165. # bytes=string2bytes(2, '暂停服务', string_color["YELLOW"])
  166. # sock.send(bytes)
  167. # #bytes=string2bytes(3, '\\C1暂停\\C2服务', string_color["UNKNOWN"])
  168. # #sock.send(bytes)
  169. #
  170. #
  171. # sock.close()
  172. # print("end")