# LED display client: builds display-protocol frames and sends them over TCP.
import datetime
import socket
import struct
import time
# Colour-selection prefixes for LED text. A prefix is prepended to the text
# before it is encoded (see LedControl.string2bytes); "\\Cn" is a literal
# backslash followed by "C" and a digit.
string_color = dict(
    UNKNOWN="",      # no colour prefix
    RED="\\C1",      # red (default)
    GREEN="\\C2",    # green
    YELLOW="\\C3",   # yellow
    BLUE="\\C4",     # blue
    CYAN="\\C5",     # cyan
    PURPLE="\\C6",   # purple
    WHITE="\\C7",    # white
)
class LedControl:
    """Client for a networked LED sign.

    Builds display frames (``string2bytes``) and pushes them to the sign over
    a short-lived TCP connection (``sendLedMsg``).

    Attributes:
        ip: Address of the sign.
        port: TCP port of the sign.
        sock: The socket used by the most recent ``sendLedMsg`` attempt
            (already closed once ``sendLedMsg`` returns).
    """

    # Display-area geometry in pixels. Area N starts at
    # x = N * AREA_WIDTH + N and y = 0.
    AREA_WIDTH = 133
    AREA_HEIGHT = 60

    # Timing of sendLedMsg, in seconds.
    CONNECT_TIMEOUT = 1      # per-attempt socket timeout
    RETRY_WINDOW = 0.03      # keep retrying failed attempts for this long
    RETRY_DELAY = 0.01       # pause between failed attempts
    POST_SEND_DELAY = 0.5    # pause after the last attempt before returning

    _FRAME_HEAD = b"\xA5" * 8    # frame head: eight 0xA5 bytes
    _FRAME_TAIL = b"\x5A"        # frame tail: a single 0x5A byte
    # First 12 bytes of the 14-byte packet header (configuration parameters);
    # the last two header bytes hold the data-field length.
    _PACKET_HEAD = b"\xfe\xff\x00\x80\x00\x00\x00\x00\x00\x00\x63\x02"
    _CMD_HEAD = b"\xA3\x06\x01\x00\x00\x00\x01"    # 7-byte command head
    # Bytes 12..24 of the 29-byte area head. The byte at area-head offset 21
    # (index 9 here) selects the display mode: 0x01 static, 0x05 move up.
    _AREA_OPTIONS = b"\x00\x00\x02\x00\x00\x00\x00\x02\x01\x05\x00\x00\x0a"
    # X and width are in units of 8 pixels by default; setting the top bit of
    # the high byte switches them to pixel units.
    _PIXEL_UNITS = 0x8000
    # Escape table applied to everything between frame head and tail, so the
    # head/tail marker values never appear inside the frame.
    _ESCAPES = {
        0xA5: b"\xA6\x02",
        0xA6: b"\xA6\x01",
        0x5A: b"\x5B\x02",
        0x5B: b"\x5B\x01",
    }

    def __init__(self, ip, port):
        """Remember the sign's address; no connection is opened here."""
        self.ip = ip
        self.port = port

    def sendLedMsg(self, area_index, input, color=None):
        """Send ``input`` to display area ``area_index``, retrying briefly on failure.

        Each attempt opens a fresh TCP connection, sends one frame and closes
        the connection. Failed attempts are reported with ``print`` and
        retried until ``RETRY_WINDOW`` has elapsed; errors are never raised to
        the caller. The method always sleeps ``POST_SEND_DELAY`` before
        returning.

        Args:
            area_index: Index of the display area (see ``string2bytes``).
            input: Text to show.
            color: Colour prefix prepended to the text. ``None`` (the default)
                means ``string_color["UNKNOWN"]``.
        """
        if color is None:
            color = string_color["UNKNOWN"]

        # A monotonic clock measures total elapsed time; the sub-second
        # component of a timedelta wraps every second and cannot be used as a
        # deadline once an attempt has blocked for the full connect timeout.
        deadline = time.monotonic() + self.RETRY_WINDOW
        while True:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            delivered = False
            try:
                self.sock.settimeout(self.CONNECT_TIMEOUT)
                self.sock.connect((self.ip, self.port))
                # sendall: a plain send() may transmit only part of the frame.
                self.sock.sendall(self.string2bytes(area_index, input, color))
                print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\n"
                      "send msg area_index:%d msg:%s"
                      % (self.ip, self.port, area_index, input))
                delivered = True
            except Exception as e:
                # Format the exception itself: e.args may hold several items
                # (e.g. errno and message), which would break a lone "%s".
                print("connected led \033[0:31mERROR\033[m: %s" % (e,))
            finally:
                # Close every attempt's socket, not only the last one.
                self.sock.close()
            if delivered:
                break
            time.sleep(self.RETRY_DELAY)
            if time.monotonic() >= deadline:
                break
        # NOTE(review): the trailing pause is assumed to run once per call,
        # after the retry loop - confirm against the original layout.
        time.sleep(self.POST_SEND_DELAY)

    def getInput(self, cmd_queue):
        """Render a queue of car commands as one LED text string.

        Each command is a mapping with ``"car_number"`` (str) and ``"statu"``;
        finished commands additionally need ``"export_id"``. Lines are grouped
        finished first, then in progress, then queued; commands with any other
        ``"statu"`` value are left out. Every line ends with the two-character
        sequence backslash + ``n``.

        Args:
            cmd_queue: Iterable of command mappings.

        Returns:
            The concatenated display text (empty string for an empty queue).
        """
        queued, working, finished = [], [], []
        for cmd in cmd_queue:
            car_number = "\\C7" + cmd["car_number"]
            status = cmd["statu"]
            if status == 0:      # queued
                queued.append(car_number + " " + "\\C1排 队" + "\\n")
            elif status == 1:    # being retrieved
                working.append(car_number + " " + "\\C3取车中" + "\\n")
            elif status == 2:    # finished: show the exit number
                label = "\\C2出口" + str(cmd["export_id"])
                finished.append(car_number + " " + label + "\\n")
        return "".join(finished + working + queued)

    def string2bytes(self, area_index, input, color):
        """Build the complete frame that shows ``color + input`` in one area.

        Frame layout: 8-byte frame head, then (escaped) 14-byte packet header,
        7-byte command head, 29-byte area head, GBK-encoded text and a 2-byte
        CRC over the header and data, then the 1-byte frame tail. All
        multi-byte fields are little-endian.

        Args:
            area_index: Display-area index, 0..255. It also determines the
                area's x position.
            input: Text to show.
            color: Colour prefix prepended to ``input`` before encoding.

        Returns:
            The frame as a ``bytearray``. There is no fixed size cap on the
            escaped frame.

        Raises:
            ValueError: If ``area_index`` or a 16-bit field (coordinates,
                lengths) is out of range.
            UnicodeEncodeError: If the text cannot be encoded as GBK.
        """
        if not 0 <= area_index <= 0xFF:
            raise ValueError(
                "area_index must be in 0..255, got %r" % (area_index,))
        msg = (color + input).encode("gbk")

        x = area_index * self.AREA_WIDTH + area_index
        y = 0
        # Area head after its own 2-byte length field (27 bytes).
        area_rest = b"".join((
            b"\x00",
            self._u16(self._PIXEL_UNITS + x, "area x"),
            self._u16(y, "area y"),
            self._u16(self._PIXEL_UNITS + self.AREA_WIDTH, "area width"),
            self._u16(self.AREA_HEIGHT, "area height"),
            bytes((area_index,)),
            self._AREA_OPTIONS,
            struct.pack("<I", len(msg)),    # 4-byte text length
        ))
        area_head = self._u16(len(area_rest) + len(msg), "area data length") + area_rest

        data_len = len(self._CMD_HEAD) + len(area_head) + len(msg)
        packet = b"".join((
            self._PACKET_HEAD,
            self._u16(data_len, "data length"),
            self._CMD_HEAD,
            area_head,
            msg,
        ))
        # The CRC covers the packet header and the data field, low byte first.
        payload = packet + bytes(self.dec2hex(self.crc16(packet)))

        escapes = self._ESCAPES
        frame = bytearray(self._FRAME_HEAD)
        for byte in payload:
            if byte in escapes:
                frame += escapes[byte]
            else:
                frame.append(byte)
        frame += self._FRAME_TAIL
        return frame

    @staticmethod
    def _u16(value, what):
        """Pack ``value`` as an unsigned 16-bit little-endian field."""
        if not 0 <= value <= 0xFFFF:
            raise ValueError("%s out of 16-bit range: %r" % (what, value))
        return struct.pack("<H", value)

    def crc16(self, bytes):
        """Return the CRC-16 of ``bytes`` (reflected polynomial 0xA001, initial value 0).

        Args:
            bytes: Iterable of integers in 0..255 (e.g. ``bytes``/``bytearray``).
        """
        crc = 0
        for byte in bytes:
            crc ^= byte
            for _ in range(8):
                if crc & 0x01:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def dec2hex(self, num):
        """Split ``num`` into ``[low_byte, high_part]`` (``num % 256`` and ``num / 256`` truncated)."""
        high = int(num / 256)
        low = int(num % 256)
        return [low, high]
# Manual smoke test, kept commented out for reference.
# NOTE: it calls string2bytes as a free function, whereas in this file it is a
# method of LedControl; use LedControl(ip, port).sendLedMsg(...) or
# LedControl(ip, port).string2bytes(...) if this is revived.
#
# if __name__=="__main__":
#     sock=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#     sock.settimeout(1)
#     sock.connect(("192.168.1.163",5005))
#
#     print("connected")
#
#
#     time.sleep(1)
#     bytes=string2bytes(0, '楚天车库', string_color["RED"])
#     sock.send(bytes)
#     time.sleep(1)
#     bytes=string2bytes(1, '停车入口', string_color["GREEN"])
#     sock.send(bytes)
#     time.sleep(1)
#     bytes=string2bytes(2, '暂停服务', string_color["YELLOW"])
#     sock.send(bytes)
#     #bytes=string2bytes(3, '\\C1暂停\\C2服务', string_color["UNKNOWN"])
#     #sock.send(bytes)
#
#
#     sock.close()
#     print("end")