Browse Source

1、完善消息格式;

LiuZe 1 year ago
parent
commit
a2da78313d

+ 0 - 12
Modules/Tof3D/etc/device.json

@@ -5,18 +5,6 @@
             "enable": true,
             "enable": true,
             "type": "DEVICE_TYPE_VZ_DS_77_LITE",
             "type": "DEVICE_TYPE_VZ_DS_77_LITE",
             "ipv4": "192.168.2.101"
             "ipv4": "192.168.2.101"
-        },
-        {
-            "id": 2,
-            "enable": false,
-            "type": "DEVICE_TYPE_VZ_DS_77_LITE",
-            "ipv4": "192.168.1.101"
-        },
-        {
-            "id": 4,
-            "enable": false,
-            "type": "DEVICE_TYPE_VZ_DS_77_LITE",
-            "ipv4": "192.168.1.102"
         }
         }
     ]
     ]
 }
 }

+ 2 - 0
Modules/Tof3D/etc/example.txt

@@ -0,0 +1,2 @@
+{"func": "getCameraFrame", "params": {"id":0, "path": "/home/zx/", "depth_name": "111"}}
+{"func": "loopGetCameraFrame", "params": {"id":0, "path": "/home/zx/saveImage/", "times":10, "interval":0}}

+ 4 - 4
Modules/Tof3D/etc/rabbitmq.json

@@ -1,12 +1,12 @@
 {
 {
-    "ip": "192.168.9.115",
+    "ip": "127.0.0.1",
     "port": 5672,
     "port": 5672,
-    "user": "wk",
-    "password": "123456",
+    "user": "zx",
+    "password": "zx123456",
     "binds": [
     "binds": [
         {
         {
             "ex": "statu_ex",
             "ex": "statu_ex",
-            "route_key": "user_park_command_3117_response_queue"
+            "route_key": "test_command_queue"
         }
         }
     ]
     ]
 }
 }

+ 8 - 5
Modules/Tof3D/main.py

@@ -13,24 +13,27 @@ if __name__ == '__main__':
                                             rabbitmq_etc.user, rabbitmq_etc.password)
                                             rabbitmq_etc.user, rabbitmq_etc.password)
     statu_ex_keys = []
     statu_ex_keys = []
     for bind in rabbitmq_etc.binds:
     for bind in rabbitmq_etc.binds:
-        key = []
-        key.append(bind.ex)
-        key.append(bind.route_key)
+        key = [bind.ex, bind.route_key]
         statu_ex_keys.append(key)
         statu_ex_keys.append(key)
 
 
+    glog.info(statu_ex_keys)
     # 初始化
     # 初始化
     g_rabbitmq.Init(None, statu_ex_keys)
     g_rabbitmq.Init(None, statu_ex_keys)
     g_rabbitmq.start()
     g_rabbitmq.start()
     tofs = TofManager()
     tofs = TofManager()
     tofs.searchAllCamera()
     tofs.searchAllCamera()
     etc = tofs.getAllCameraEtc()
     etc = tofs.getAllCameraEtc()
-    tofs.openCamera(0)
+    tofs.openAllCamera()
     tofs.startAllCameraStream()
     tofs.startAllCameraStream()
 
 
     # 绑定rabbitmq
     # 绑定rabbitmq
     g_rabbitmq.bind_statu_callback(statu_ex_keys[0][0], statu_ex_keys[0][1], tofs.exec)
     g_rabbitmq.bind_statu_callback(statu_ex_keys[0][0], statu_ex_keys[0][1], tofs.exec)
 
 
-    g_rabbitmq.publish(statu_ex_keys[0][0], statu_ex_keys[0][1], "cao")
+    while True:
+        message = input("input q or Q exit this progress\n")
+        if message == "q" or message == "Q":
+            break
+
     # 释放
     # 释放
     tofs.stopAllCameraStream()
     tofs.stopAllCameraStream()
     tofs.closeAllCamera()
     tofs.closeAllCamera()

+ 32 - 0
Modules/Tof3D/rabbitmqSendMessage.py

@@ -0,0 +1,32 @@
+import SDK.ZX.async_communication as cm
+import SDK.ZX.async_communication_etc_pb2 as rce
+import SDK.ZX.tool as zx_tool
+import os
+
+if __name__ == '__main__':
+    # 加载配置
+    rabbitmq_etc = zx_tool.getProtobufJsonConfig(os.path.dirname(os.path.abspath(__file__)) + "/etc/rabbitmq.json",
+                                                 rce.RabbitmqEtc())
+    print(rabbitmq_etc)
+    g_rabbitmq = cm.RabbitAsyncCommunicator(rabbitmq_etc.ip, rabbitmq_etc.port,
+                                            rabbitmq_etc.user, rabbitmq_etc.password)
+    statu_ex_keys = []
+    for bind in rabbitmq_etc.binds:
+        key = [bind.ex, bind.route_key]
+        statu_ex_keys.append(key)
+
+    # 初始化
+    g_rabbitmq.Init(None, statu_ex_keys)
+    g_rabbitmq.start()
+
+    while True:
+        message = input("Input message: ")
+        if message == "Q" or message == "q":
+            break
+        else:
+            print(message)
+        g_rabbitmq.publish(statu_ex_keys[0][0], statu_ex_keys[0][1], message)
+
+    # 强制结束进程
+    pid = os.getpid()
+    os.kill(pid, 1)

+ 0 - 103
Modules/Tof3D/samples.py

@@ -1,103 +0,0 @@
-from pickle import FALSE, TRUE
-import sys
-
-from SDK.Vzense.VzenseDS_api import *
-import time
-
-camera = VzenseTofCam()
-
-camera_count = camera.VZ_GetDeviceCount()
-retry_count = 100
-while camera_count == 0 and retry_count > 0:
-    retry_count = retry_count - 1
-    camera_count = camera.VZ_GetDeviceCount()
-    time.sleep(1)
-    print("scaning......   ", retry_count)
-
-device_info = VzDeviceInfo()
-
-if camera_count > 1:
-    ret, device_infolist = camera.VZ_GetDeviceInfoList(camera_count)
-    if ret == 0:
-        device_info = device_infolist[0]
-        for info in device_infolist:
-            print('cam uri:  ' + str(info.uri))
-    else:
-        print(' failed:' + ret)
-        exit()
-elif camera_count == 1:
-    ret, device_info = camera.VZ_GetDeviceInfo()
-    if ret == 0:
-        print('cam uri:' + str(device_info.uri))
-    else:
-        print(' failed:' + ret)
-        exit()
-else:
-    print("there are no camera found")
-    exit()
-
-if VzConnectStatus.Connected.value != device_info.status:
-    print("connect statu:", device_info.status)
-    print("Call VZ_OpenDeviceByUri with connect status :", VzConnectStatus.Connected.value)
-    exit()
-else:
-    print("uri: " + str(device_info.uri))
-    print("alias: " + str(device_info.alias))
-    print("connectStatus: " + str(device_info.status))
-
-ret = camera.VZ_OpenDeviceByUri(device_info.uri)
-if ret == 0:
-    print("open device successful")
-else:
-    print('VZ_OpenDeviceByUri failed: ' + str(ret))
-
-ret = camera.VZ_StartStream()
-if ret == 0:
-    print("start stream successful")
-else:
-    print("VZ_StartStream failed:", ret)
-
-while 1:
-    ret, frameready = camera.VZ_GetFrameReady(c_uint16(1000))
-    if ret != 0:
-        print("VZ_GetFrameReady failed:", ret)
-        continue
-
-    if frameready.depth:
-        ret, frame = camera.VZ_GetFrame(VzFrameType.VzDepthFrame)
-        if ret == 0:
-            ret, pointlist = camera.VZ_ConvertDepthFrameToPointCloudVector(frame)
-            if ret == 0:
-                curPath = os.getcwd()
-                print(curPath)
-                folder = curPath + "/save"
-                if not os.path.exists(folder):
-                    print("not exists")
-                    os.makedirs(folder)
-                else:
-                    print("already exists")
-
-                filename = folder + "/point.txt"
-                file = open(filename, "w")
-
-                for i in range(frame.width * frame.height):
-                    if pointlist[i].z != 0 and pointlist[i].z != 65535:
-                        file.write("{0},{1},{2}\n".format(pointlist[i].x, pointlist[i].y, pointlist[i].z))
-
-                file.close()
-                print("save ok")
-            else:
-                print("VZ_ConvertDepthFrameToWorldVector failed:", ret)
-        break
-
-ret = camera.VZ_StopStream()
-if ret == 0:
-    print("stop stream successful")
-else:
-    print('VZ_StopStream failed: ' + str(ret))
-
-ret = camera.VZ_CloseDevice()
-if ret == 0:
-    print("close device successful")
-else:
-    print('VZ_CloseDevice failed: ' + str(ret))

+ 92 - 21
Modules/Tof3D/tofManager.py

@@ -1,5 +1,5 @@
 import etc_pb2 as etc
 import etc_pb2 as etc
-import os, glog, numpy, cv2, SDK.ZX.tool
+import os, glog, numpy, cv2, SDK.ZX.tool, json, time
 from tofDevice import *
 from tofDevice import *
 import SDK.Vzense.VzenseDS_api as tof
 import SDK.Vzense.VzenseDS_api as tof
 
 
@@ -27,7 +27,7 @@ class TofManager:
         frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW)
         frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW)
 
 
         if not os.path.exists(path):
         if not os.path.exists(path):
-            print("not exists")
+            glog.info(path + " not exists, mkdit it.")
             os.makedirs(path)
             os.makedirs(path)
         filename = path + name
         filename = path + name
         cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
         cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
@@ -39,27 +39,64 @@ class TofManager:
         self.camera = tof.VzenseTofCam()
         self.camera = tof.VzenseTofCam()
         self.camera_list = {}
         self.camera_list = {}
         self.search_flag = False
         self.search_flag = False
+        self.liscenFunc = [["getCameraFrame", self.getCameraFrame],
+                           ["updateTofsEtc", self.updateTofsEtc],
+                           ["getCameraEtc", self.getCameraEtc],
+                           ["getAllCameraEtc", self.getAllCameraEtc],
+                           ["getCameraList", self.getCameraList],
+                           ["searchAllCamera", self.searchAllCamera],
+                           ["openCamera", self.openCamera],
+                           ["openAllCamera", self.openAllCamera],
+                           ["closeCamera", self.closeCamera],
+                           ["closeAllCamera", self.closeAllCamera],
+                           ["startCameraStream", self.startCameraStream],
+                           ["startAllCameraStream", self.startAllCameraStream],
+                           ["stopCamera", self.stopCamera],
+                           ["stopAllCameraStream", self.stopAllCameraStream],
+                           ["getCameraFrame", self.getCameraFrame],
+                           ["test", self.test],
+                           ["loopGetCameraFrame", self.loopGetCameraFrame]
+                           ]
+
+        # self.t1 = threading(target=func, args=('第一个线程', 1))
+
+    def test(self, data):
+        glog.info(data)
 
 
     def exec(self, req_data):
     def exec(self, req_data):
-        print(req_data.statu)
+        try:
+            json_data = json.loads(req_data.statu)
+            glog.info(json_data)
+            if type(json_data) != dict:
+                glog.warning("receive data not is dict")
+                return
+            for bind_func in self.liscenFunc:
+                if json_data["func"] == bind_func[0] and type(json_data["params"]) == dict:
+                    bind_func[1](json_data["params"])
+                    return
+                glog.warning(json_data["params"])
+                    # {"func": "test", "params": {"id": 1}}
+        except ValueError as e:
+            glog.error(req_data.statu)
 
 
-    def updateTofsEtc(self):
+    def updateTofsEtc(self, receive_data=""):
         etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig())
         etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig())
         for tof_etc in etc_proto.devices:
         for tof_etc in etc_proto.devices:
             self.tofs_etc[tof_etc.id] = tof_etc
             self.tofs_etc[tof_etc.id] = tof_etc
         glog.info(self.tofs_etc)
         glog.info(self.tofs_etc)
         self.search_flag = False
         self.search_flag = False
 
 
-    def getCameraEtc(self, id):
+    def getCameraEtc(self, receive_data=""):
+        id = receive_data["id"]
         return self.tofs_etc[id]
         return self.tofs_etc[id]
 
 
-    def getAllCameraEtc(self):
+    def getAllCameraEtc(self, receive_data=""):
         return self.tofs_etc
         return self.tofs_etc
 
 
-    def getCameraList(self):
+    def getCameraList(self, receive_data=""):
         return self.camera_list
         return self.camera_list
 
 
-    def searchAllCamera(self):
+    def searchAllCamera(self, receive_data=""):
         # 搜索相机,会确保配置中启用的相机全部搜索到
         # 搜索相机,会确保配置中启用的相机全部搜索到
         camera_count = self.camera.VZ_GetDeviceCount()
         camera_count = self.camera.VZ_GetDeviceCount()
         retry_count = 60
         retry_count = 60
@@ -94,7 +131,8 @@ class TofManager:
             glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.")
             glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.")
         return self.search_flag
         return self.search_flag
 
 
-    def openCamera(self, id):
+    def openCamera(self, receive_data=""):
+        id = receive_data["id"]
         if id in self.camera_list:
         if id in self.camera_list:
             glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.")
             glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.")
             return tof.VzReturnStatus.VzRetOK
             return tof.VzReturnStatus.VzRetOK
@@ -110,7 +148,7 @@ class TofManager:
         else:
         else:
             glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
             glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
 
 
-    def openAllCamera(self):
+    def openAllCamera(self, receive_data=""):
         for tof_etc in self.tofs_etc:
         for tof_etc in self.tofs_etc:
             if self.tofs_etc[tof_etc].enable is False:
             if self.tofs_etc[tof_etc].enable is False:
                 continue
                 continue
@@ -122,7 +160,8 @@ class TofManager:
             else:
             else:
                 glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
                 glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
 
 
-    def closeCamera(self, id):
+    def closeCamera(self, receive_data=""):
+        id = receive_data["id"]
         if (id not in self.camera_list):
         if (id not in self.camera_list):
             glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.")
             glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.")
             return tof.VzReturnStatus.VzRetOK
             return tof.VzReturnStatus.VzRetOK
@@ -134,7 +173,7 @@ class TofManager:
             glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
             glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
         return ret
         return ret
 
 
-    def closeAllCamera(self):
+    def closeAllCamera(self, receive_data=""):
         for id in self.camera_list:
         for id in self.camera_list:
             ret = self.camera_list[id].VZ_CloseDevice()
             ret = self.camera_list[id].VZ_CloseDevice()
             if ret == 0:
             if ret == 0:
@@ -143,7 +182,8 @@ class TofManager:
                 glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
                 glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
         self.camera_list.clear()
         self.camera_list.clear()
 
 
-    def startCameraStream(self, id):
+    def startCameraStream(self, receive_data=""):
+        id = receive_data["id"]
         if id in self.camera_list:
         if id in self.camera_list:
             ret = self.camera_list[id].VZ_StartStream()
             ret = self.camera_list[id].VZ_StartStream()
             if ret == 0:
             if ret == 0:
@@ -153,7 +193,7 @@ class TofManager:
         else:
         else:
             glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
             glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
 
 
-    def startAllCameraStream(self):
+    def startAllCameraStream(self, receive_data=""):
         for id in self.camera_list:
         for id in self.camera_list:
             ret = self.camera_list[id].VZ_StartStream()
             ret = self.camera_list[id].VZ_StartStream()
             if ret == 0:
             if ret == 0:
@@ -161,7 +201,8 @@ class TofManager:
             else:
             else:
                 glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
                 glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
 
 
-    def stopCamera(self, id):
+    def stopCamera(self, receive_data=""):
+        id = receive_data["id"]
         if id in self.camera_list:
         if id in self.camera_list:
             ret = self.camera_list[id].VZ_StopStream()
             ret = self.camera_list[id].VZ_StopStream()
             if ret == 0:
             if ret == 0:
@@ -171,7 +212,7 @@ class TofManager:
         else:
         else:
             glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
             glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
 
 
-    def stopAllCameraStream(self):
+    def stopAllCameraStream(self, receive_data=""):
         for id in self.camera_list:
         for id in self.camera_list:
             ret = self.camera_list[id].VZ_StopStream()
             ret = self.camera_list[id].VZ_StopStream()
             if ret == 0:
             if ret == 0:
@@ -179,27 +220,57 @@ class TofManager:
             else:
             else:
                 glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
                 glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
 
 
-    def getCameraFrame(self, id):
+    def getCameraFrame(self, receive_data=""):
+        id, path, depth_name, ir_name = 0, "", "", ""
+        if "id" not in receive_data or "path" not in receive_data:
+            return
+        id = receive_data["id"]
+        path = receive_data["path"]
+        if "depth_name" in receive_data:
+            depth_name = receive_data["depth_name"]
+        if "ir_name" in receive_data:
+            ir_name = receive_data["ir_name"]
+
+        if id == "" or path == "":
+            return
+
         if id in self.camera_list:
         if id in self.camera_list:
             ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000))
             ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000))
             if ret != 0:
             if ret != 0:
                 glog.error("VZ_GetFrameReady failed: %d", ret)
                 glog.error("VZ_GetFrameReady failed: %d", ret)
                 return
                 return
 
 
-            if frameready.depth:
+            if frameready.depth and depth_name != "":
                 ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
                 ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
                 if ret == 0:
                 if ret == 0:
-                    self.saveDepthFrame2Image(os.getcwd() + "/save/", "depthframe.jpg", depthframe)
+                    self.saveDepthFrame2Image(path, depth_name + ".jpg", depthframe)
                     glog.info(self.tofs_etc[id].ipv4 + "  depth frameindex: " + str(depthframe.frameIndex))
                     glog.info(self.tofs_etc[id].ipv4 + "  depth frameindex: " + str(depthframe.frameIndex))
                 else:
                 else:
                     glog.warning("VZ_GetFrame error %d", ret)
                     glog.warning("VZ_GetFrame error %d", ret)
-            if frameready.ir:
+            if frameready.ir and ir_name != "":
                 ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame)
                 ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame)
                 if ret == 0:
                 if ret == 0:
-                    self.saveIrFrame2Image(os.getcwd() + "/save/", "irframe.jpg", irframe)
+                    self.saveIrFrame2Image(path, ir_name + ".jpg", irframe)
                     glog.info(self.tofs_etc[id].ipv4 + "  ir frameindex: " + str(irframe.frameIndex))
                     glog.info(self.tofs_etc[id].ipv4 + "  ir frameindex: " + str(irframe.frameIndex))
                 else:
                 else:
                     glog.warning("VZ_GetFrame error %d", ret)
                     glog.warning("VZ_GetFrame error %d", ret)
 
 
+    def loopGetCameraFrame(self, receive_data=""):
+        path, times, interval = 0, 0, 0
+        if "path" not in receive_data or "times" not in receive_data or "interval" not in receive_data:
+            return
+        times = receive_data["times"]
+        interval = receive_data["interval"]
+        if interval < 0.2:
+            interval = 0.2
+
+        while times:
+            receive_data["depth_name"] = "Depth_" + str(receive_data["times"] - times + 1)
+            receive_data["ir_name"] = "Ir_" + str(receive_data["times"] - times + 1)
+            glog.info(receive_data)
+            self.getCameraFrame(receive_data)
+            times = times - 1
+            time.sleep(interval)
+
     def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()):
     def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()):
         glog.info("=======================")
         glog.info("=======================")

+ 68 - 65
SDK/ZX/async_communication.py

@@ -4,21 +4,23 @@ import asyncio
 import aio_pika
 import aio_pika
 import queue
 import queue
 
 
+
 class TimeStatu:
 class TimeStatu:
-    def __init__(self,statu=None,timeout=3):
-        self.statu=statu
-        self.time=time.time()
-        self.timeout_ms=timeout
+    def __init__(self, statu=None, timeout=3):
+        self.statu = statu
+        self.time = time.time()
+        self.timeout_ms = timeout
 
 
     def timeout(self):
     def timeout(self):
-        tm=time.time()
-        return tm-self.time>self.timeout_ms
+        tm = time.time()
+        return tm - self.time > self.timeout_ms
+
 
 
 class RabbitAsyncCommunicator(threading.Thread):
 class RabbitAsyncCommunicator(threading.Thread):
-    def __init__(self, host,port, user, password):
+    def __init__(self, host, port, user, password):
         threading.Thread.__init__(self)
         threading.Thread.__init__(self)
         self._host = host
         self._host = host
-        self._port=port
+        self._port = port
         self._user = user
         self._user = user
         self._password = password
         self._password = password
         self._connection = None
         self._connection = None
@@ -26,101 +28,102 @@ class RabbitAsyncCommunicator(threading.Thread):
         self._channel_send = None
         self._channel_send = None
         self._channel_statu = None
         self._channel_statu = None
 
 
-        self._consumer_callbacks= None
-        self._recv_status=None
-        self._queue_callbacks= {}
-        self._publish_msg_queue=queue.Queue()
-        self._status={}
-        self._statu_callbacks={}
+        self._consumer_callbacks = None
+        self._recv_status = None
+        self._queue_callbacks = {}
+        self._publish_msg_queue = queue.Queue()
+        self._status = {}
+        self._statu_callbacks = {}
         self._closing = False
         self._closing = False
 
 
-    def Init(self,consumer_callbacks,recv_status):
-        self._consumer_callbacks=consumer_callbacks
-        self._recv_status=recv_status
-        if self._recv_status==None:
+    def Init(self, consumer_callbacks, recv_status):
+        self._consumer_callbacks = consumer_callbacks
+        self._recv_status = recv_status
+        if self._recv_status == None:
             return
             return
-        for ex_name,key in self._recv_status:
-            self._status[ex_name+":"+key]=TimeStatu(None,0.1)
+        for ex_name, key in self._recv_status:
+            self._status[ex_name + ":" + key] = TimeStatu(None, 0.1)
 
 
-    def publish(self,ex_name,key,msg):
-        self._publish_msg_queue.put([ex_name,key,msg])
+    def publish(self, ex_name, key, msg):
+        self._publish_msg_queue.put([ex_name, key, msg])
 
 
-    def bind_statu_callback(self,ex_name,key,callback):
-        self._statu_callbacks[ex_name+":"+key]=callback
+    def bind_statu_callback(self, ex_name, key, callback):
+        self._statu_callbacks[ex_name + ":" + key] = callback
 
 
     def close(self):
     def close(self):
-        self._closing=True
+        self._closing = True
 
 
     async def init(self):
     async def init(self):
-        connection_string="amqp://%s:%s@%s/"%(self._user,self._password,self._host)
+        connection_string = "amqp://%s:%s@%s/" % (self._user, self._password, self._host)
         self._connection = await aio_pika.connect_robust(connection_string)
         self._connection = await aio_pika.connect_robust(connection_string)
         self._channel_consumer = await self._connection.channel()
         self._channel_consumer = await self._connection.channel()
         self._channel_send = await self._connection.channel()
         self._channel_send = await self._connection.channel()
         self._channel_statu = await self._connection.channel()
         self._channel_statu = await self._connection.channel()
         # Will take no more than 10 messages in advance
         # Will take no more than 10 messages in advance
         await self._channel_consumer.set_qos(prefetch_count=1)
         await self._channel_consumer.set_qos(prefetch_count=1)
-        if self._consumer_callbacks==None:
+        if self._consumer_callbacks == None:
             return
             return
-        for queue_name,callback in self._consumer_callbacks:
-            queue= await self._channel_consumer.get_queue(queue_name)
-            self._queue_callbacks[queue]=callback
-
-
-    def statu_callback(self,ex,key,msg):
-        id=ex+":"+key
-        self._status[id]=TimeStatu(msg)
-        callback=self._statu_callbacks.get(id)
-        if not callback==None:
+        for queue_name, callback in self._consumer_callbacks:
+            queue = await self._channel_consumer.get_queue(queue_name)
+            self._queue_callbacks[queue] = callback
+
+    def statu_callback(self, ex, key, msg):
+        id = ex + ":" + key
+        self._status[id] = TimeStatu(msg)
+        callback = self._statu_callbacks.get(id)
+        if not callback == None:
             callback(self._status[id])
             callback(self._status[id])
 
 
     def __getitem__(self, key):
     def __getitem__(self, key):
         return self._status[key]
         return self._status[key]
+
     def __setitem__(self, key, value):
     def __setitem__(self, key, value):
-        self._status[key]=value
+        self._status[key] = value
 
 
-    async def recv(self,queue,callback):
+    async def recv(self, queue, callback):
         async with queue.iterator() as queue_iter:
         async with queue.iterator() as queue_iter:
             async for message in queue_iter:
             async for message in queue_iter:
-                if not callback==None:
+                if not callback == None:
                     callback(message.body.decode())
                     callback(message.body.decode())
                     await message.ack()
                     await message.ack()
-                if self._closing==True:
+                if self._closing == True:
                     return
                     return
-    async def recv_statu(self,ex_name,key,ttl=200):
-        statu_ex=await self._channel_statu.get_exchange(ex_name)
-        arg={}
-        arg["x-message-ttl"]=ttl
-        queue=await self._channel_statu.declare_queue("", auto_delete=True,arguments=arg)
-        await queue.bind(statu_ex,routing_key=key)
+
+    async def recv_statu(self, ex_name, key, ttl=200):
+        statu_ex = await self._channel_statu.get_exchange(ex_name)
+        arg = {}
+        arg["x-message-ttl"] = ttl
+        queue = await self._channel_statu.declare_queue("", auto_delete=True, arguments=arg)
+        await queue.bind(statu_ex, routing_key=key)
         async with queue.iterator() as queue_iter:
         async with queue.iterator() as queue_iter:
             async for message in queue_iter:
             async for message in queue_iter:
                 async with message.process():
                 async with message.process():
-                    self.statu_callback(ex_name,key,message.body.decode())
-                if self._closing==True:
+                    self.statu_callback(ex_name, key, message.body.decode())
+                if self._closing == True:
                     return
                     return
 
 
     async def send(self):
     async def send(self):
-        while self._closing==False:
-            if self._publish_msg_queue.qsize()>0:
-                msg_bag=self._publish_msg_queue.get(False)
-                if not msg_bag==None:
-                    ex_name,key,msg=msg_bag
-                    ex= await self._channel_send.get_exchange(ex_name)
-                    await ex.publish(aio_pika.Message(body=msg.encode()),routing_key=key)
+        while self._closing == False:
+            if self._publish_msg_queue.qsize() > 0:
+                msg_bag = self._publish_msg_queue.get(False)
+                if not msg_bag == None:
+                    ex_name, key, msg = msg_bag
+                    ex = await self._channel_send.get_exchange(ex_name)
+                    await ex.publish(aio_pika.Message(body=msg.encode()), routing_key=key)
             await asyncio.sleep(0.001)
             await asyncio.sleep(0.001)
-            #time.sleep(0.001)
-
+            # time.sleep(0.001)
 
 
     async def main(self):
     async def main(self):
         await self.init()
         await self.init()
-        tasks=[]
+        tasks = []
         tasks.append(self.send())
         tasks.append(self.send())
-        if not self._consumer_callbacks==None:
-            for queue,callback in self._queue_callbacks.items():
-                tasks.append(self.recv(queue,callback))
-        if not self._recv_status==None:
-            for ex_name,key in self._recv_status:
-                tasks.append(self.recv_statu(ex_name,key))
+        if not self._consumer_callbacks == None:
+            for queue, callback in self._queue_callbacks.items():
+                tasks.append(self.recv(queue, callback))
+        if not self._recv_status == None:
+            for ex_name, key in self._recv_status:
+                tasks.append(self.recv_statu(ex_name, key))
         await asyncio.gather(*tasks)
         await asyncio.gather(*tasks)
+
     def run(self):
     def run(self):
         asyncio.run(self.main())
         asyncio.run(self.main())

+ 1 - 1
SDK/ZX/tool.py

@@ -1,4 +1,4 @@
-import json
+import json, os
 from google.protobuf import json_format
 from google.protobuf import json_format