import json import queue import socket import threading import time class TimeStatu: def __init__(self, statu=None, timeout=3): self.statu = statu self.time = time.time() self.timeout_ms = timeout def timeout(self): tm = time.time() return tm - self.time > self.timeout_ms class McpuCommunicator: def __init__(self, mcpu_keys): self._mcpu_keys = mcpu_keys self._mcpu_iomsg = {} self._mcpu_communication_thread = {} self._mcpu_socket = {} self._mcpu_connect_status = {} self._mcpu_statu_lock = threading.Lock() self._reconnect_thread = threading.Thread(target=self.ReconnectThread) self._send_thread = threading.Thread(target=self.SendThread) self._is_close = False self._publish_msg_queue=queue.Queue() def McpuCommunicatorInit(self): if self._mcpu_keys == None: return for mouth, id, ip, port in self._mcpu_keys: key = mouth + ":mcpu_" + str(id) self._mcpu_socket[key] = {} self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._mcpu_socket[key]["socket"].settimeout(1) self._mcpu_socket[key]["ip"] = ip self._mcpu_socket[key]["port"] = port self._mcpu_connect_status[key] = False self._mcpu_communication_thread[key] = threading.Thread(target=self.McpuCommunicationThread, args=(key,)) self._mcpu_iomsg[key] = TimeStatu(None, 0.1) self._mcpu_communication_thread[key].start() self._reconnect_thread.start() self._send_thread.start() def publish(self,key,msg): self._publish_msg_queue.put([key,msg]) def SendThread(self): while self._is_close is False: if self._publish_msg_queue.qsize() > 0: msg_bag = self._publish_msg_queue.get(False) if not msg_bag is None: key, msg = msg_bag try: self._mcpu_socket[key]["socket"].sendall(msg) print('\033[0;32m{} send msg success:{}\033[m'.format(key, msg)) except Exception as e: print('\033[0;31m{} send msg error:{}\033[m'.format(key, str(e))) time.sleep(0.001) def ReconnectThread(self): while self._is_close is False: for key, mcpu in self._mcpu_socket.items(): if self._is_close is True: break if self._mcpu_connect_status[key] == False: try: self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._mcpu_socket[key]["socket"].settimeout(1) mcpu["socket"].connect((mcpu["ip"], mcpu["port"])) print('-----------------------------------------') print('\033[0;32m{} client start connect to host:{} port:{}\033[m'.format(key, mcpu["ip"], mcpu["port"])) with self._mcpu_statu_lock: self._mcpu_connect_status[key] = True except ConnectionRefusedError: print('\033[0;31msocket connect error, trying again .... host:{} port:{}\033[m'.format( mcpu["ip"], mcpu["port"])) with self._mcpu_statu_lock: self._mcpu_connect_status[key] = False except Exception as e: print('\033[0;31m{} host:{} port:{} connect error:{}\033[m'.format(key, mcpu["ip"], mcpu["port"], str(e))) with self._mcpu_statu_lock: self._mcpu_connect_status[key] = False time.sleep(0.1) def McpuCommunicatorUnInit(self): self._is_close = True self._reconnect_thread.join() for thread in self._mcpu_communication_thread.values(): thread.join() for mcpu in self._mcpu_socket.values(): mcpu["socket"].close() def GetMcpuIoMsg(self,key): return self._mcpu_iomsg[key] def GetAllMcpuConnectStatus(self): return self._mcpu_connect_status def GetMcpuConnectStatus(self,key): return self._mcpu_connect_status[key] def McpuCommunicationThread(self, key): while self._is_close is False: if self._mcpu_connect_status[key] is True: length_data = "" try: length_data = self._mcpu_socket[key]["socket"].recv(1024) except Exception as e: print('\033[0;31m{} recv error:{}\033[m'.format(key, str(e))) # print(len(length_data)) if len(length_data) == 0: with self._mcpu_statu_lock: self._mcpu_connect_status[key] = False elif len(length_data) > 0: head = length_data.find(b'@') tail = length_data.find(b'$') if head >= 0 and tail >= 0: bytes = length_data[head + 4:-3] self._mcpu_iomsg[key] = TimeStatu(self.recieve2message(bytes), 3) else: print('\033[0;33m{} data error:{}\033[m'.format(key, length_data)) time.sleep(0.001) def recieve2message(self, bytes): try: data = json.loads(bytes.decode()) except: return None return data