import etc_pb2 as etc import os, glog, numpy, cv2, SDK.ZX.tool from tofDevice import * import SDK.Vzense.VzenseDS_api as tof class TofManager: def saveIrFrame2Image(self, path, name, frame): frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.dataLen)) frametmp.dtype = numpy.uint8 frametmp.shape = (frame.height, frame.width) if not os.path.exists(path): os.makedirs(path) filename = path + name cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100]) def saveDepthFrame2Image(self, path, name, frame): frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.width * frame.height * 2)) frametmp.dtype = numpy.uint16 frametmp.shape = (frame.height, frame.width) # convert ushort value to 0xff is just for display img = numpy.int32(frametmp) img = img * 255 / c_uint16(7495) img = numpy.clip(img, 0, 255) img = numpy.uint8(img) frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW) if not os.path.exists(path): print("not exists") os.makedirs(path) filename = path + name cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100]) def __init__(self): self.etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/device.json" self.tofs_etc = {} self.updateTofsEtc() self.camera = tof.VzenseTofCam() self.camera_list = {} self.search_flag = False def exec(self, req_data): print(req_data.statu) def updateTofsEtc(self): etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig()) for tof_etc in etc_proto.devices: self.tofs_etc[tof_etc.id] = tof_etc glog.info(self.tofs_etc) self.search_flag = False def getCameraEtc(self, id): return self.tofs_etc[id] def getAllCameraEtc(self): return self.tofs_etc def getCameraList(self): return self.camera_list def searchAllCamera(self): # 搜索相机,会确保配置中启用的相机全部搜索到 camera_count = self.camera.VZ_GetDeviceCount() retry_count = 60 device_ip_list = [] while camera_count <= len(self.tofs_etc) and retry_count > 0: ret, deviceInfoList = self.camera.VZ_GetDeviceInfoList() retry_count = retry_count - 1 camera_count = self.camera.VZ_GetDeviceCount() if camera_count > 0: device_ip_list = [] for i in range(camera_count): device_ip_list.append(deviceInfoList[i].ip) find_device_count = 0 for tof_etc in self.tofs_etc: if self.tofs_etc[tof_etc].enable is False: find_device_count = find_device_count + 1 glog.info(self.tofs_etc[tof_etc].ipv4 + " not enable.") if find_device_count == len(self.tofs_etc): self.search_flag = True return self.search_flag continue if self.tofs_etc[tof_etc].ipv4.encode() not in device_ip_list: glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " fulture.") break else: find_device_count = find_device_count + 1 glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " success.") if find_device_count == len(self.tofs_etc): self.search_flag = True return self.search_flag time.sleep(0.5) glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.") return self.search_flag def openCamera(self, id): if id in self.camera_list: glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.") return tof.VzReturnStatus.VzRetOK if self.tofs_etc[id].enable is False: glog.warning( "this camera not enabled in etc fiile, if you want to enable it, please update the file, and then use func updateTofsEtc.") return tof.VzReturnStatus.VzRetOthers cam = tof.VzenseTofCam() ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[id].ipv4.encode()) if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " open successful") self.camera_list[id] = cam else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret)) def openAllCamera(self): for tof_etc in self.tofs_etc: if self.tofs_etc[tof_etc].enable is False: continue cam = tof.VzenseTofCam() ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[tof_etc].ipv4.encode()) if ret == 0: glog.info(self.tofs_etc[tof_etc].ipv4 + " open successful") self.camera_list[tof_etc] = cam else: glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret)) def closeCamera(self, id): if (id not in self.camera_list): glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.") return tof.VzReturnStatus.VzRetOK ret = self.camera_list[id].VZ_CloseDevice() if ret == 0: del self.camera_list[id] glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful") else: glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret)) return ret def closeAllCamera(self): for id in self.camera_list: ret = self.camera_list[id].VZ_CloseDevice() if ret == 0: glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful") else: glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret)) self.camera_list.clear() def startCameraStream(self, id): if id in self.camera_list: ret = self.camera_list[id].VZ_StartStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " start stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret)) else: glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera") def startAllCameraStream(self): for id in self.camera_list: ret = self.camera_list[id].VZ_StartStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " start stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret)) def stopCamera(self, id): if id in self.camera_list: ret = self.camera_list[id].VZ_StopStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " stop stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret)) else: glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera") def stopAllCameraStream(self): for id in self.camera_list: ret = self.camera_list[id].VZ_StopStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " stop stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret)) def getCameraFrame(self, id): if id in self.camera_list: ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000)) if ret != 0: glog.error("VZ_GetFrameReady failed: %d", ret) return if frameready.depth: ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame) if ret == 0: self.saveDepthFrame2Image(os.getcwd() + "/save/", "depthframe.jpg", depthframe) glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex)) else: glog.warning("VZ_GetFrame error %d", ret) if frameready.ir: ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame) if ret == 0: self.saveIrFrame2Image(os.getcwd() + "/save/", "irframe.jpg", irframe) glog.info(self.tofs_etc[id].ipv4 + " ir frameindex: " + str(irframe.frameIndex)) else: glog.warning("VZ_GetFrame error %d", ret) def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()): glog.info("=======================")