# mcpu_communication.py (5.7 KB)

  1. import json
  2. import queue
  3. import socket
  4. import threading
  5. import time
  6. class TimeStatu:
  7. def __init__(self, statu=None, timeout=3):
  8. self.statu = statu
  9. self.time = time.time()
  10. self.timeout_ms = timeout
  11. def timeout(self):
  12. tm = time.time()
  13. return tm - self.time > self.timeout_ms
  14. class McpuCommunicator:
  15. def __init__(self, mcpu_keys):
  16. self._mcpu_keys = mcpu_keys
  17. self._mcpu_iomsg = {}
  18. self._mcpu_communication_thread = {}
  19. self._mcpu_socket = {}
  20. self._mcpu_connect_status = {}
  21. self._mcpu_statu_lock = threading.Lock()
  22. self._reconnect_thread = threading.Thread(target=self.ReconnectThread)
  23. self._send_thread = threading.Thread(target=self.SendThread)
  24. self._is_close = False
  25. self._publish_msg_queue=queue.Queue()
  26. def McpuCommunicatorInit(self):
  27. if self._mcpu_keys == None:
  28. return
  29. for mouth, id, ip, port in self._mcpu_keys:
  30. key = mouth + ":mcpu_" + str(id)
  31. self._mcpu_socket[key] = {}
  32. self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  33. self._mcpu_socket[key]["socket"].settimeout(1)
  34. self._mcpu_socket[key]["ip"] = ip
  35. self._mcpu_socket[key]["port"] = port
  36. self._mcpu_connect_status[key] = False
  37. self._mcpu_communication_thread[key] = threading.Thread(target=self.McpuCommunicationThread, args=(key,))
  38. self._mcpu_iomsg[key] = TimeStatu(None, 0.1)
  39. self._mcpu_communication_thread[key].start()
  40. self._reconnect_thread.start()
  41. self._send_thread.start()
  42. def publish(self,key,msg):
  43. self._publish_msg_queue.put([key,msg])
  44. def SendThread(self):
  45. while self._is_close is False:
  46. if self._publish_msg_queue.qsize() > 0:
  47. msg_bag = self._publish_msg_queue.get(False)
  48. if not msg_bag is None:
  49. key, msg = msg_bag
  50. try:
  51. self._mcpu_socket[key]["socket"].sendall(msg)
  52. print('\033[0;32m{} send msg success:{}\033[m'.format(key, msg))
  53. except Exception as e:
  54. print('\033[0;31m{} send msg error:{}\033[m'.format(key, str(e)))
  55. time.sleep(0.001)
  56. def ReconnectThread(self):
  57. while self._is_close is False:
  58. for key, mcpu in self._mcpu_socket.items():
  59. if self._is_close is True:
  60. break
  61. if self._mcpu_connect_status[key] == False:
  62. try:
  63. self._mcpu_socket[key]["socket"] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  64. self._mcpu_socket[key]["socket"].settimeout(1)
  65. mcpu["socket"].connect((mcpu["ip"], mcpu["port"]))
  66. print('-----------------------------------------')
  67. print('\033[0;32m{} client start connect to host:{} port:{}\033[m'.format(key, mcpu["ip"],
  68. mcpu["port"]))
  69. with self._mcpu_statu_lock:
  70. self._mcpu_connect_status[key] = True
  71. except ConnectionRefusedError:
  72. print('\033[0;31msocket connect error, trying again .... host:{} port:{}\033[m'.format(
  73. mcpu["ip"], mcpu["port"]))
  74. with self._mcpu_statu_lock:
  75. self._mcpu_connect_status[key] = False
  76. except Exception as e:
  77. print('\033[0;31m{} host:{} port:{} connect error:{}\033[m'.format(key, mcpu["ip"],
  78. mcpu["port"], str(e)))
  79. with self._mcpu_statu_lock:
  80. self._mcpu_connect_status[key] = False
  81. time.sleep(0.1)
  82. def McpuCommunicatorUnInit(self):
  83. self._is_close = True
  84. self._reconnect_thread.join()
  85. for thread in self._mcpu_communication_thread.values():
  86. thread.join()
  87. for mcpu in self._mcpu_socket.values():
  88. mcpu["socket"].close()
  89. def GetMcpuIoMsg(self,key):
  90. return self._mcpu_iomsg[key]
  91. def GetAllMcpuConnectStatus(self):
  92. return self._mcpu_connect_status
  93. def GetMcpuConnectStatus(self,key):
  94. return self._mcpu_connect_status[key]
  95. def McpuCommunicationThread(self, key):
  96. while self._is_close is False:
  97. if self._mcpu_connect_status[key] is True:
  98. length_data = ""
  99. try:
  100. length_data = self._mcpu_socket[key]["socket"].recv(1024)
  101. except Exception as e:
  102. print('\033[0;31m{} recv error:{}\033[m'.format(key, str(e)))
  103. # print(len(length_data))
  104. if len(length_data) == 0:
  105. with self._mcpu_statu_lock:
  106. self._mcpu_connect_status[key] = False
  107. elif len(length_data) > 0:
  108. head = length_data.find(b'@')
  109. tail = length_data.find(b'$')
  110. if head >= 0 and tail >= 0:
  111. bytes = length_data[head + 4:-3]
  112. self._mcpu_iomsg[key] = TimeStatu(self.recieve2message(bytes), 3)
  113. else:
  114. print('\033[0;33m{} data error:{}\033[m'.format(key, length_data))
  115. time.sleep(0.001)
  116. def recieve2message(self, bytes):
  117. try:
  118. data = json.loads(bytes.decode())
  119. except:
  120. return None
  121. return data