import etc_pb2 as etc import os, glog, numpy, cv2, SDK.ZX.tool, json, time from tofDevice import * import SDK.Vzense.VzenseDS_api as tof class TofManager: def saveIrFrame2Image(self, path, name, frame): frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.dataLen)) frametmp.dtype = numpy.uint8 frametmp.shape = (frame.height, frame.width) if not os.path.exists(path): os.makedirs(path) filename = path + name cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100]) def saveDepthFrame2Image(self, path, name, frame): frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.width * frame.height * 2)) frametmp.dtype = numpy.uint16 frametmp.shape = (frame.height, frame.width) # convert ushort value to 0xff is just for display img = numpy.int32(frametmp) img = img * 255 / c_uint16(7495) img = numpy.clip(img, 0, 255) img = numpy.uint8(img) frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW) if not os.path.exists(path): glog.info(path + " not exists, mkdit it.") os.makedirs(path) filename = path + name cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100]) def __init__(self): self.etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/device.json" self.tofs_etc = {} self.updateTofsEtc() self.camera = tof.VzenseTofCam() self.camera_list = {} self.search_flag = False self.liscenFunc = [["getCameraFrame", self.getCameraFrame], ["updateTofsEtc", self.updateTofsEtc], ["getCameraEtc", self.getCameraEtc], ["getAllCameraEtc", self.getAllCameraEtc], ["getCameraList", self.getCameraList], ["searchAllCamera", self.searchAllCamera], ["openCamera", self.openCamera], ["openAllCamera", self.openAllCamera], ["closeCamera", self.closeCamera], ["closeAllCamera", self.closeAllCamera], ["startCameraStream", self.startCameraStream], ["startAllCameraStream", self.startAllCameraStream], ["stopCamera", self.stopCamera], ["stopAllCameraStream", self.stopAllCameraStream], ["getCameraFrame", self.getCameraFrame], ["test", self.test], ["loopGetCameraFrame", self.loopGetCameraFrame] ] # self.t1 = threading(target=func, args=('第一个线程', 1)) def test(self, data): glog.info(data) def exec(self, req_data): try: json_data = json.loads(req_data.statu) glog.info(json_data) if type(json_data) != dict: glog.warning("receive data not is dict") return for bind_func in self.liscenFunc: if json_data["func"] == bind_func[0] and type(json_data["params"]) == dict: bind_func[1](json_data["params"]) return glog.warning(json_data["params"]) # {"func": "test", "params": {"id": 1}} except ValueError as e: glog.error(req_data.statu) def updateTofsEtc(self, receive_data=""): etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig()) for tof_etc in etc_proto.devices: self.tofs_etc[tof_etc.id] = tof_etc glog.info(self.tofs_etc) self.search_flag = False def getCameraEtc(self, receive_data=""): id = receive_data["id"] return self.tofs_etc[id] def getAllCameraEtc(self, receive_data=""): return self.tofs_etc def getCameraList(self, receive_data=""): return self.camera_list def searchAllCamera(self, receive_data=""): # 搜索相机,会确保配置中启用的相机全部搜索到 camera_count = self.camera.VZ_GetDeviceCount() retry_count = 60 device_ip_list = [] while camera_count <= len(self.tofs_etc) and retry_count > 0: ret, deviceInfoList = self.camera.VZ_GetDeviceInfoList() retry_count = retry_count - 1 camera_count = self.camera.VZ_GetDeviceCount() if camera_count > 0: device_ip_list = [] for i in range(camera_count): device_ip_list.append(deviceInfoList[i].ip) find_device_count = 0 for tof_etc in self.tofs_etc: if self.tofs_etc[tof_etc].enable is False: find_device_count = find_device_count + 1 glog.info(self.tofs_etc[tof_etc].ipv4 + " not enable.") if find_device_count == len(self.tofs_etc): self.search_flag = True return self.search_flag continue if self.tofs_etc[tof_etc].ipv4.encode() not in device_ip_list: glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " fulture.") break else: find_device_count = find_device_count + 1 glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " success.") if find_device_count == len(self.tofs_etc): self.search_flag = True return self.search_flag time.sleep(0.5) glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.") return self.search_flag def openCamera(self, receive_data=""): id = receive_data["id"] if id in self.camera_list: glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.") return tof.VzReturnStatus.VzRetOK if self.tofs_etc[id].enable is False: glog.warning( "this camera not enabled in etc fiile, if you want to enable it, please update the file, and then use func updateTofsEtc.") return tof.VzReturnStatus.VzRetOthers cam = tof.VzenseTofCam() ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[id].ipv4.encode()) if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " open successful") self.camera_list[id] = cam else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret)) def openAllCamera(self, receive_data=""): for tof_etc in self.tofs_etc: if self.tofs_etc[tof_etc].enable is False: continue cam = tof.VzenseTofCam() ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[tof_etc].ipv4.encode()) if ret == 0: glog.info(self.tofs_etc[tof_etc].ipv4 + " open successful") self.camera_list[tof_etc] = cam else: glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret)) def closeCamera(self, receive_data=""): id = receive_data["id"] if (id not in self.camera_list): glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.") return tof.VzReturnStatus.VzRetOK ret = self.camera_list[id].VZ_CloseDevice() if ret == 0: del self.camera_list[id] glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful") else: glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret)) return ret def closeAllCamera(self, receive_data=""): for id in self.camera_list: ret = self.camera_list[id].VZ_CloseDevice() if ret == 0: glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful") else: glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret)) self.camera_list.clear() def startCameraStream(self, receive_data=""): id = receive_data["id"] if id in self.camera_list: ret = self.camera_list[id].VZ_StartStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " start stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret)) else: glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera") def startAllCameraStream(self, receive_data=""): for id in self.camera_list: ret = self.camera_list[id].VZ_StartStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " start stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret)) def stopCamera(self, receive_data=""): id = receive_data["id"] if id in self.camera_list: ret = self.camera_list[id].VZ_StopStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " stop stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret)) else: glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera") def stopAllCameraStream(self, receive_data=""): for id in self.camera_list: ret = self.camera_list[id].VZ_StopStream() if ret == 0: glog.info(self.tofs_etc[id].ipv4 + " stop stream successful") else: glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret)) def getCameraFrame(self, receive_data=""): id, path, depth_name, ir_name = 0, "", "", "" if "id" not in receive_data or "path" not in receive_data: return id = receive_data["id"] path = receive_data["path"] if "depth_name" in receive_data: depth_name = receive_data["depth_name"] if "ir_name" in receive_data: ir_name = receive_data["ir_name"] if id == "" or path == "": return if id in self.camera_list: ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000)) if ret != 0: glog.error("VZ_GetFrameReady failed: %d", ret) return if frameready.depth and depth_name != "": ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame) if ret == 0: self.saveDepthFrame2Image(path, depth_name + ".jpg", depthframe) glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex)) else: glog.warning("VZ_GetFrame error %d", ret) if frameready.ir and ir_name != "": ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame) if ret == 0: self.saveIrFrame2Image(path, ir_name + ".jpg", irframe) glog.info(self.tofs_etc[id].ipv4 + " ir frameindex: " + str(irframe.frameIndex)) else: glog.warning("VZ_GetFrame error %d", ret) def loopGetCameraFrame(self, receive_data=""): path, times, interval = 0, 0, 0 if "path" not in receive_data or "times" not in receive_data or "interval" not in receive_data: return times = receive_data["times"] interval = receive_data["interval"] if interval < 0.2: interval = 0.2 while times: receive_data["depth_name"] = "Depth_" + str(receive_data["times"] - times + 1) receive_data["ir_name"] = "Ir_" + str(receive_data["times"] - times + 1) glog.info(receive_data) self.getCameraFrame(receive_data) times = times - 1 time.sleep(interval) def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()): glog.info("=======================")