123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import json
- import queue
- import socket
- import threading
- import time
- class TimeStatu:
- def __init__(self, statu=None, timeout=3):
- self.statu = statu
- self.time = time.time()
- self.timeout_ms = timeout
- def timeout(self):
- tm = time.time()
- return tm - self.time > self.timeout_ms
- class McpuCommunicator:
- def __init__(self, mcpu_keys):
- self._mcpu_keys = mcpu_keys
- self._mcpu_iomsg = {}
- self._mcpu_communication_thread = {}
- self._mcpu_socket = {}
- self._mcpu_connect_status = {}
- self._mcpu_statu_lock = threading.Lock()
- self._reconnect_thread = threading.Thread(target=self.ReconnectThread)
- self._send_thread = threading.Thread(target=self.SendThread)
- self._is_close = False
- self._publish_msg_queue=queue.Queue()
- def McpuCommunicatorInit(self):
- if self._mcpu_keys == None:
- return
- for mouth, id, ip, port in self._mcpu_keys:
- key = mouth + ":mcpu_" + str(id)
- self._mcpu_socket[key] = {}
- self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._mcpu_socket[key]["socket"].settimeout(1)
- self._mcpu_socket[key]["ip"] = ip
- self._mcpu_socket[key]["port"] = port
- self._mcpu_connect_status[key] = False
- self._mcpu_communication_thread[key] = threading.Thread(target=self.McpuCommunicationThread, args=(key,))
- self._mcpu_iomsg[key] = TimeStatu(None, 0.1)
- self._mcpu_communication_thread[key].start()
- self._reconnect_thread.start()
- self._send_thread.start()
- def publish(self,key,msg):
- self._publish_msg_queue.put([key,msg])
- def SendThread(self):
- while self._is_close is False:
- if self._publish_msg_queue.qsize() > 0:
- msg_bag = self._publish_msg_queue.get(False)
- if not msg_bag is None:
- key, msg = msg_bag
- try:
- self._mcpu_socket[key]["socket"].sendall(msg)
- print('\033[0;32m{} send msg success:{}\033[m'.format(key, msg))
- except Exception as e:
- print('\033[0;31m{} send msg error:{}\033[m'.format(key, str(e)))
- time.sleep(0.001)
- def ReconnectThread(self):
- while self._is_close is False:
- for key, mcpu in self._mcpu_socket.items():
- if self._is_close is True:
- break
- if self._mcpu_connect_status[key] == False:
- try:
- self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._mcpu_socket[key]["socket"].settimeout(1)
- mcpu["socket"].connect((mcpu["ip"], mcpu["port"]))
- print('-----------------------------------------')
- print('\033[0;32m{} client start connect to host:{} port:{}\033[m'.format(key, mcpu["ip"],
- mcpu["port"]))
- with self._mcpu_statu_lock:
- self._mcpu_connect_status[key] = True
- except ConnectionRefusedError:
- print('\033[0;31msocket connect error, trying again .... host:{} port:{}\033[m'.format(
- mcpu["ip"], mcpu["port"]))
- with self._mcpu_statu_lock:
- self._mcpu_connect_status[key] = False
- except Exception as e:
- print('\033[0;31m{} host:{} port:{} connect error:{}\033[m'.format(key, mcpu["ip"],
- mcpu["port"], str(e)))
- with self._mcpu_statu_lock:
- self._mcpu_connect_status[key] = False
- time.sleep(0.1)
- def McpuCommunicatorUnInit(self):
- self._is_close = True
- self._reconnect_thread.join()
- for thread in self._mcpu_communication_thread.values():
- thread.join()
- for mcpu in self._mcpu_socket.values():
- mcpu["socket"].close()
- def GetMcpuIoMsg(self,key):
- return self._mcpu_iomsg[key]
- def GetAllMcpuConnectStatus(self):
- return self._mcpu_connect_status
- def GetMcpuConnectStatus(self,key):
- return self._mcpu_connect_status[key]
- def McpuCommunicationThread(self, key):
- while self._is_close is False:
- if self._mcpu_connect_status[key] is True:
- length_data = ""
- try:
- length_data = self._mcpu_socket[key]["socket"].recv(1024)
- except Exception as e:
- print('\033[0;31m{} recv error:{}\033[m'.format(key, str(e)))
- # print(len(length_data))
- if len(length_data) == 0:
- with self._mcpu_statu_lock:
- self._mcpu_connect_status[key] = False
- elif len(length_data) > 0:
- head = length_data.find(b'@')
- tail = length_data.find(b'$')
- if head >= 0 and tail >= 0:
- bytes = length_data[head + 4:-3]
- self._mcpu_iomsg[key] = TimeStatu(self.recieve2message(bytes), 3)
- else:
- print('\033[0;33m{} data error:{}\033[m'.format(key, length_data))
- time.sleep(0.001)
- def recieve2message(self, bytes):
- try:
- data = json.loads(bytes.decode())
- except:
- return None
- return data
|