# LedControl.py
  1. import socket
  2. import datetime
  3. import time
  4. string_color = {
  5. "UNKNOWN":'', #
  6. "RED":'\\C1', #红色, 默认
  7. "GREEN":'\\C2', #绿色
  8. "YELLOW":'\\C3', #黄色
  9. "BLUE":'\\C4', #蓝色
  10. "CYAN":'\\C5', #青色
  11. "PURPLE":'\\C6', #紫色
  12. "WHITE":'\\C7' #白色
  13. }
  14. class LedControl:
  15. def __init__(self,ip,port):
  16. self.ip=ip
  17. self.port=port
  18. def sendLedMsg(self,area_index, input, color=string_color["UNKNOWN"]):
  19. t_time = datetime.datetime.now()
  20. while (datetime.datetime.now() - t_time).microseconds < 30000:
  21. self.sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  22. self.sock.settimeout(1)
  23. try:
  24. self.sock.connect((self.ip,self.port))
  25. self.sock.send(self.string2bytes(area_index, input, color))
  26. print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\nsend msg area_index:%d msg:%s" % (self.ip,self.port,area_index,input))
  27. break
  28. except Exception as e:
  29. print("connected led \033[0:31mERROR\033[m: %s" % e.args)
  30. time.sleep(0.01)
  31. self.sock.close()
  32. time.sleep(0.5)
  33. def getInput(self,cmd_queue):
  34. queue_string = ""
  35. work_string = ""
  36. complete_string = ""
  37. for cmd in cmd_queue:
  38. car_number = "\\C7"+cmd["car_number"]
  39. if(cmd["statu"] == 0):#排队
  40. statu = "\\C1排 队"
  41. queue_string = queue_string + car_number + " " + statu + "\\n"
  42. elif(cmd["statu"] == 1):#工作
  43. statu = "\\C3取车中"
  44. work_string = work_string + car_number + " " + statu + "\\n"
  45. elif(cmd["statu"] == 2):#已完成
  46. statu = "\\C2出口"+ str(cmd["export_id"])
  47. complete_string = complete_string + car_number + " " + statu + "\\n"
  48. led_show_string = complete_string + work_string + queue_string
  49. return led_show_string
  50. def string2bytes(self,area_index, input, color):
  51. input = color + input
  52. # area_index = 0 #显示区域编号
  53. width = 133 #显示区域宽,单位像素
  54. height = 60 #显示区域高,单位像素
  55. x = area_index*width + area_index #显示区域左上角x,单位像素
  56. y = 0 #显示区域左上角y,单位像素
  57. #msg = bytearray(input, encoding='utf-8')
  58. msg = bytearray(input, encoding='gbk')
  59. m_frame_head = bytearray(8) #帧头, 默认8个0xA5
  60. m_frame_head[0:8] = b'\xA5\xA5\xA5\xA5\xA5\xA5\xA5\xA5'
  61. m_frame_configuration = bytearray(14) #包头, 配置参数, 14个byte
  62. m_frame_configuration[0:12]=b'\xfe\xff\x00\x80\x00\x00\x00\x00\x00\x00\x63\x02'
  63. # m_frame_configuration 最后2个是 数据域长度,后面在写
  64. m_cmd_head = bytearray(7) #指令头, 默认7个byte
  65. m_cmd_head[0:7] = b'\xA3\x06\x01\x00\x00\x00\x01'
  66. m_area_head = bytearray(29) #区域头, 默认29个byte
  67. # m_area_head 前面2个是 区域的数据长度, 后面在写
  68. m_area_head[2] = 0
  69. t_area_x = 0x8000+x # 区域 X 坐标,默认以字节(8 个像素点)为单位, 高字节最高位为 1 时,表示以像素点为单位
  70. m_area_head[3]=t_area_x%256
  71. m_area_head[4]=int(t_area_x/256)
  72. t_area_y = y #区域 Y 坐标,以像素点为单位
  73. m_area_head[5]=t_area_y%256
  74. m_area_head[6]=int(t_area_y/256)
  75. t_area_width = 0x8000+width #区域宽度,默认以字节(8 个像素点)为单位, 高字节最高位为 1 时,表示以像素点为单位
  76. m_area_head[7]=t_area_width%256
  77. m_area_head[8]=int(t_area_width/256)
  78. t_area_height = height #区域高度,以像素点为单位
  79. m_area_head[9]=t_area_height%256
  80. m_area_head[10]=int(t_area_height/256)
  81. m_area_head[11] = area_index
  82. m_area_head[12:25] = b'\x00\x00\x02\x00\x00\x00\x00\x02\x01\x01\x00\x00\x0a'
  83. #m_area_head[21] = 1 #0x01——静止显示
  84. m_area_head[21] = 5 #0x05——向上移动
  85. #m_area_head后面4个是 显示字符串 的长度, 后续在写.
  86. #长度整合
  87. t_show_string_len = len(msg)
  88. m_area_head[25] = t_show_string_len%256
  89. m_area_head[26]=(int(t_show_string_len/256))%256
  90. m_area_head[27]=(int(t_show_string_len/256/256))%256
  91. m_area_head[28]=(int(t_show_string_len/256/256/256))%256
  92. t_area_data_len = 29-2+ len(msg)
  93. m_area_head[0]=t_area_data_len%256
  94. m_area_head[1]=int(t_area_data_len/256)
  95. t_data_len = 7 + 29 + len(msg)
  96. m_frame_configuration[12] = t_data_len%256
  97. m_frame_configuration[13] = int(t_data_len/256)
  98. #crc校验码
  99. t_result_src = m_frame_configuration + m_cmd_head + m_area_head +msg
  100. crc=self.crc16(t_result_src)
  101. [a1,a2]=self.dec2hex(crc)
  102. m_frame_crc = bytearray(2) #crc校验码, 2个byte, 包校验为包头数据和数据域的校验值
  103. m_frame_crc[0]=a1
  104. m_frame_crc[1]=a2
  105. p_src = t_result_src + m_frame_crc
  106. p_dst = bytearray(1024)
  107. j=0
  108. for i in range(len(p_src)):
  109. if p_src[i] == 0xA5:
  110. p_dst[j] = 0xA6
  111. p_dst[j+1] = 0x02
  112. j+=2
  113. elif p_src[i] == 0xA6:
  114. p_dst[j] = 0xA6
  115. p_dst[j+1] = 0x01
  116. j+=2
  117. elif p_src[i] == 0x5A:
  118. p_dst[j] = 0x5B
  119. p_dst[j+1] = 0x02
  120. j+=2
  121. elif p_src[i] == 0x5B:
  122. p_dst[j] = 0x5B
  123. p_dst[j+1] = 0x01
  124. j+=2
  125. else:
  126. p_dst[j] = p_src[i]
  127. j = j+1
  128. m_frame_tail = bytearray(1)
  129. m_frame_tail[0] = 90 #就是0x5A
  130. output = m_frame_head + p_dst[0:j] + m_frame_tail
  131. # print('--------------------------------')
  132. # print(output)
  133. # print(len(output))
  134. return output
  135. def crc16(self,bytes):
  136. crc=0
  137. for byte in bytes:
  138. crc=byte^crc
  139. for i in range(8):
  140. if crc&0x01==1:
  141. crc=(crc>>1)^0xA001
  142. else:
  143. crc=crc>>1
  144. return crc
  145. def dec2hex(self,num):
  146. a=int(num/256)
  147. b=int(num%256)
  148. return [b,a]
# Manual smoke test, kept commented out for reference.
# NOTE: string2bytes is a method of LedControl, so the bare calls below need
# an instance (e.g. LedControl(ip, port).string2bytes(...)) before this runs.
# if __name__=="__main__":
#     sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     sock.settimeout(1)
#     sock.connect(("192.168.1.163",5005))
#
#     print("connected")
#
#
#     time.sleep(1)
#     bytes=string2bytes(0, '楚天车库', string_color["RED"])
#     sock.send(bytes)
#     time.sleep(1)
#     bytes=string2bytes(1, '停车入口', string_color["GREEN"])
#     sock.send(bytes)
#     time.sleep(1)
#     bytes=string2bytes(2, '暂停服务', string_color["YELLOW"])
#     sock.send(bytes)
#     #bytes=string2bytes(3, '\\C1暂停\\C2服务', string_color["UNKNOWN"])
#     #sock.send(bytes)
#
#
#     sock.close()
#     print("end")