@@ -0,0 +1,205 @@
+import etc_pb2 as etc
+import os, glog, numpy, cv2, SDK.ZX.tool
+from tofDevice import *
+import SDK.Vzense.VzenseDS_api as tof
+class TofManager:
+ def saveIrFrame2Image(self, path, name, frame):
+ frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.dataLen))
+ frametmp.dtype = numpy.uint8
+ frametmp.shape = (frame.height, frame.width)
+ if not os.path.exists(path):
+ os.makedirs(path)
+ filename = path + name
+ cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
+ def saveDepthFrame2Image(self, path, name, frame):
+ frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.width * frame.height * 2))
+ frametmp.dtype = numpy.uint16
+ frametmp.shape = (frame.height, frame.width)
+ # convert ushort value to 0xff is just for display
+ img = numpy.int32(frametmp)
+ img = img * 255 / c_uint16(7495)
+ img = numpy.clip(img, 0, 255)
+ img = numpy.uint8(img)
+ frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW)
+ if not os.path.exists(path):
+ print("not exists")
+ os.makedirs(path)
+ filename = path + name
+ cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
+ def __init__(self):
+ self.etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/device.json"
+ self.tofs_etc = {}
+ self.updateTofsEtc()
+ self.camera = tof.VzenseTofCam()
+ self.camera_list = {}
+ self.search_flag = False
+ def exec(self, req_data):
+ print(req_data.statu)
+ def updateTofsEtc(self):
+ etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig())
+ for tof_etc in etc_proto.devices:
+ self.tofs_etc[tof_etc.id] = tof_etc
+ glog.info(self.tofs_etc)
+ self.search_flag = False
+ def getCameraEtc(self, id):
+ return self.tofs_etc[id]
+ def getAllCameraEtc(self):
+ return self.tofs_etc
+ def getCameraList(self):
+ return self.camera_list
+ def searchAllCamera(self):
+ # 搜索相机,会确保配置中启用的相机全部搜索到
+ camera_count = self.camera.VZ_GetDeviceCount()
+ retry_count = 60
+ device_ip_list = []
+ while camera_count <= len(self.tofs_etc) and retry_count > 0:
+ ret, deviceInfoList = self.camera.VZ_GetDeviceInfoList()
+ retry_count = retry_count - 1
+ camera_count = self.camera.VZ_GetDeviceCount()
+ if camera_count > 0:
+ device_ip_list = []
+ for i in range(camera_count):
+ device_ip_list.append(deviceInfoList[i].ip)
+ find_device_count = 0
+ for tof_etc in self.tofs_etc:
+ if self.tofs_etc[tof_etc].enable is False:
+ find_device_count = find_device_count + 1
+ glog.info(self.tofs_etc[tof_etc].ipv4 + " not enable.")
+ if find_device_count == len(self.tofs_etc):
+ self.search_flag = True
+ return self.search_flag
+ continue
+ if self.tofs_etc[tof_etc].ipv4.encode() not in device_ip_list:
+ glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " fulture.")
+ break
+ else:
+ find_device_count = find_device_count + 1
+ glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " success.")
+ if find_device_count == len(self.tofs_etc):
+ self.search_flag = True
+ return self.search_flag
+ time.sleep(0.5)
+ glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.")
+ return self.search_flag
+ def openCamera(self, id):
+ if id in self.camera_list:
+ glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.")
+ return tof.VzReturnStatus.VzRetOK
+ if self.tofs_etc[id].enable is False:
+ glog.warning(
+ "this camera not enabled in etc fiile, if you want to enable it, please update the file, and then use func updateTofsEtc.")
+ return tof.VzReturnStatus.VzRetOthers
+ cam = tof.VzenseTofCam()
+ ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[id].ipv4.encode())
+ if ret == 0:
+ glog.info(self.tofs_etc[id].ipv4 + " open successful")
+ self.camera_list[id] = cam
+ else:
+ glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
+ def openAllCamera(self):
+ for tof_etc in self.tofs_etc:
+ if self.tofs_etc[tof_etc].enable is False:
+ continue
+ cam = tof.VzenseTofCam()
+ ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[tof_etc].ipv4.encode())
+ if ret == 0:
+ glog.info(self.tofs_etc[tof_etc].ipv4 + " open successful")
+ self.camera_list[tof_etc] = cam
+ else:
+ glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
+ def closeCamera(self, id):
+ if (id not in self.camera_list):
+ glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.")
+ return tof.VzReturnStatus.VzRetOK
+ ret = self.camera_list[id].VZ_CloseDevice()
+ if ret == 0:
+ del self.camera_list[id]
+ glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful")
+ else:
+ glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
+ return ret
+ def closeAllCamera(self):
+ for id in self.camera_list:
+ ret = self.camera_list[id].VZ_CloseDevice()
+ if ret == 0:
+ glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful")
+ else:
+ glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
+ self.camera_list.clear()
+ def startCameraStream(self, id):
+ if id in self.camera_list:
+ ret = self.camera_list[id].VZ_StartStream()
+ if ret == 0:
+ glog.info(self.tofs_etc[id].ipv4 + " start stream successful")
+ else:
+ glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
+ else:
+ glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
+ def startAllCameraStream(self):
+ for id in self.camera_list:
+ ret = self.camera_list[id].VZ_StartStream()
+ if ret == 0:
+ glog.info(self.tofs_etc[id].ipv4 + " start stream successful")
+ else:
+ glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
+ def stopCamera(self, id):
+ if id in self.camera_list:
+ ret = self.camera_list[id].VZ_StopStream()
+ if ret == 0:
+ glog.info(self.tofs_etc[id].ipv4 + " stop stream successful")
+ else:
+ glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
+ else:
+ glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
+ def stopAllCameraStream(self):
+ for id in self.camera_list:
+ ret = self.camera_list[id].VZ_StopStream()
+ if ret == 0:
+ glog.info(self.tofs_etc[id].ipv4 + " stop stream successful")
+ else:
+ glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
+ def getCameraFrame(self, id):
+ if id in self.camera_list:
+ ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000))
+ if ret != 0:
+ glog.error("VZ_GetFrameReady failed: %d", ret)
+ return
+ if frameready.depth:
+ ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
+ if ret == 0:
+ self.saveDepthFrame2Image(os.getcwd() + "/save/", "depthframe.jpg", depthframe)
+ glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex))
+ else:
+ glog.warning("VZ_GetFrame error %d", ret)
+ if frameready.ir:
+ ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame)
+ if ret == 0:
+ self.saveIrFrame2Image(os.getcwd() + "/save/", "irframe.jpg", irframe)
+ glog.info(self.tofs_etc[id].ipv4 + " ir frameindex: " + str(irframe.frameIndex))
+ else:
+ glog.warning("VZ_GetFrame error %d", ret)
+ def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()):
+ glog.info("=======================")