import socket
import datetime
import time

# Inline colour escape codes that can be prefixed to the text sent to the LED
# controller.  The values are literal backslash sequences, not Python escapes.
string_color = {
    "UNKNOWN": '',     # no explicit colour code
    # "RED": '\\C1',   # red, the default
    "GREEN": '\\C2',   # green
    "YELLOW": '\\C3',  # yellow
    "BLUE": '\\C4',    # blue
    "CYAN": '\\C5',    # cyan
    "PURPLE": '\\C6',  # purple
    "WHITE": '\\C7'    # white
}


class LedControl:
    """Client that pushes text to display areas of a networked LED sign.

    Every call to ``sendLedMsg`` opens a short-lived TCP connection to
    ``(ip, port)``, sends one framed message built by ``string2bytes`` and
    closes the connection again.
    """

    # Length, in seconds, of the window during which a failed delivery is
    # retried (30000 microseconds in the original code).
    RETRY_WINDOW = 0.03

    # Byte-stuffing table: the frame delimiters 0xA5 (head) and 0x5A (tail),
    # and the two escape bytes themselves, must not appear inside the frame.
    _ESCAPES = {
        0xA5: b'\xA6\x02',
        0xA6: b'\xA6\x01',
        0x5A: b'\x5B\x02',
        0x5B: b'\x5B\x01',
    }

    def __init__(self, ip, port):
        """Remember the address of the LED controller; no I/O happens here."""
        self.ip = ip
        self.port = port

    def sendLedMsg(self, area_index, input, color=string_color["UNKNOWN"]):
        """Send ``input`` to display area ``area_index``, retrying briefly.

        Delivery is best effort: any error is printed and the attempt is
        repeated until ``RETRY_WINDOW`` seconds have elapsed since the call
        started.  The method always sleeps 0.5 s before returning and never
        raises for connection or encoding problems.

        Args:
            area_index: Zero-based index of the display area.
            input: Text to show (may contain inline colour codes).
            color: Colour code prefixed to the text, see ``string_color``.
        """
        # A monotonic deadline measures the real elapsed time;
        # ``timedelta.microseconds`` is only the sub-second component and
        # wraps around every second.
        deadline = time.monotonic() + self.RETRY_WINDOW
        while time.monotonic() < deadline:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.sock.settimeout(1)
                self.sock.connect((self.ip, self.port))
                # sendall() keeps writing until the whole frame is out;
                # send() may transmit only part of it.
                self.sock.sendall(self.string2bytes(area_index, input, color))
                print("connected led \033[0:32mSUCCEND\033[m ip:%s port:%d\nsend msg area_index:%d msg:%s" % (self.ip, self.port, area_index, input))
                break
            except Exception as e:
                # Format the exception itself: ``e.args`` is a tuple and
                # breaks the single ``%s`` whenever it has more than one item.
                print("connected led \033[0:31mERROR\033[m: %s" % (e,))
                time.sleep(0.01)
            finally:
                # Close the socket of every attempt, not only the last one.
                self.sock.close()
        time.sleep(0.5)

    def getInput(self, cmd_queue):
        """Build the text shown on the sign for a queue of car commands.

        Each command is a mapping with the keys ``"car_number"`` and
        ``"statu"`` (0 = queued, 1 = working, 2 = completed); completed
        commands additionally need ``"export_id"``.  Commands with any other
        status are left out.

        Returns:
            One line per command, each terminated by a literal ``\\n``
            sequence, ordered completed, then working, then queued.
        """
        queued = []
        working = []
        completed = []
        for cmd in cmd_queue:
            car_number = "\\C7" + cmd["car_number"]
            status = cmd["statu"]
            if status == 0:    # queued
                queued.append(car_number + " " + "\\C1排 队" + "\\n")
            elif status == 1:  # working
                working.append(car_number + " " + "\\C3取车中" + "\\n")
            elif status == 2:  # completed
                label = "\\C2出口" + str(cmd["export_id"])
                completed.append(car_number + " " + label + "\\n")
        return "".join(completed + working + queued)

    def string2bytes(self, area_index, input, color):
        """Encode a text into one complete frame for the LED controller.

        Layout: 8 bytes 0xA5 (frame head), then the escaped packet
        (14-byte packet header, 7-byte command head, 29-byte area head,
        GBK encoded text, 2-byte CRC, all little-endian), then 0x5A (tail).

        Args:
            area_index: Zero-based index of the display area; areas are
                133 pixels wide and laid out left to right with a one pixel
                gap.
            input: Text to display.
            color: Colour code prefixed to ``input``.

        Returns:
            The frame as a ``bytearray``.

        Raises:
            ValueError: If ``area_index`` or the text length does not fit
                into the fixed-width header fields.
            UnicodeEncodeError: If the text cannot be encoded as GBK.
        """
        width = 133    # width of a display area in pixels
        height = 60    # height of a display area in pixels
        x = area_index * width + area_index  # left edge of the area, pixels
        y = 0                                # top edge of the area, pixels

        msg = bytearray(color + input, encoding='gbk')

        data_len = 7 + 29 + len(msg)  # command head + area head + text
        if data_len > 0xFFFF:
            raise ValueError("message too long for a single frame")
        if area_index < 0 or x > 0x7FFF:
            raise ValueError("area_index out of range")

        # Packet header: 12 fixed configuration bytes followed by the length
        # of the data field.
        frame_configuration = (
            b'\xfe\xff\x00\x80\x00\x00\x00\x00\x00\x00\x63\x02'
            + data_len.to_bytes(2, 'little')
        )
        cmd_head = b'\xA3\x06\x01\x00\x00\x00\x01'

        area_head = bytearray()
        # Length of the area data: area head without this field, plus text.
        area_head += (29 - 2 + len(msg)).to_bytes(2, 'little')
        area_head.append(0)
        # X and width default to byte units (8 pixels); setting the top bit
        # of the high byte switches them to pixel units.
        area_head += (0x8000 + x).to_bytes(2, 'little')
        area_head += y.to_bytes(2, 'little')
        area_head += (0x8000 + width).to_bytes(2, 'little')
        area_head += height.to_bytes(2, 'little')
        area_head.append(area_index)
        # The tenth byte of this run is the display mode:
        # 0x01 = static display, 0x05 = scroll upwards (used here).
        area_head += b'\x00\x00\x02\x00\x00\x00\x00\x02\x01\x05\x00\x00\x0a'
        area_head += len(msg).to_bytes(4, 'little')

        # The CRC covers the packet header and the data field.
        packet = bytearray(frame_configuration) + cmd_head + area_head + msg
        crc_low, crc_high = self.dec2hex(self.crc16(packet))
        packet.append(crc_low)
        packet.append(crc_high)

        # Escape into a buffer that grows with the message instead of a
        # fixed 1024-byte array, so long texts no longer overflow.
        escaped = bytearray()
        for byte in packet:
            replacement = self._ESCAPES.get(byte)
            if replacement is None:
                escaped.append(byte)
            else:
                escaped += replacement

        return bytearray(b'\xA5' * 8) + escaped + b'\x5A'

    def crc16(self, bytes):
        """Return the 16-bit CRC of ``bytes`` (polynomial 0xA001, start 0)."""
        crc = 0
        for byte in bytes:
            crc ^= byte
            for _ in range(8):
                if crc & 0x01:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    def dec2hex(self, num):
        """Split ``num`` into ``[low_byte, high_byte]`` (little-endian)."""
        high = int(num / 256)
        low = int(num % 256)
        return [low, high]


# Example usage (left commented out; it needs a reachable LED controller):
#
# if __name__ == "__main__":
#     led = LedControl("192.168.1.163", 5005)
#     led.sendLedMsg(0, '楚天车库', string_color["UNKNOWN"])
#     led.sendLedMsg(1, '停车入口', string_color["GREEN"])
#     led.sendLedMsg(2, '暂停服务', string_color["YELLOW"])
#     # led.sendLedMsg(3, '\\C1暂停\\C2服务', string_color["UNKNOWN"])
#     print("end")