123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
import os
import time
from ctypes import c_uint16

import cv2
import glog
import numpy

import etc_pb2 as etc
import SDK.ZX.tool
from tofDevice import *
import SDK.Vzense.VzenseDS_api as tof
class TofManager:
    """Discover, open, stream and grab frames from the configured ToF cameras.

    Instance state:
      * ``etc_file``    -- path of the JSON device configuration.
      * ``tofs_etc``    -- device id -> config entry loaded from ``etc_file``.
      * ``camera``      -- ``tof.VzenseTofCam`` handle used for device discovery.
      * ``camera_list`` -- device id -> opened ``tof.VzenseTofCam`` handle.
      * ``search_flag`` -- outcome of the most recent ``searchAllCamera`` run.

    NOTE(review): the listing this class was recovered from had lost its
    indentation; block nesting was reconstructed from the statements
    themselves and should be confirmed against the authoritative copy.
    """

    # Depth reading that maps to 255 when a depth frame is rendered for
    # display; larger readings are clipped.
    # NOTE(review): presumably the sensor's maximum range in mm -- confirm.
    _DEPTH_DISPLAY_MAX = 7495

    def __init__(self):
        """Load the device configuration and create the discovery handle."""
        self.etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/device.json"
        self.tofs_etc = {}
        self.updateTofsEtc()
        self.camera = tof.VzenseTofCam()
        self.camera_list = {}
        self.search_flag = False

    # ------------------------------------------------------------------
    # Frame -> image helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _frameToArray(frame, dtype, element_count):
        """Expose ``frame.pFrameData`` as a ``(height, width)`` array of *dtype*.

        *element_count* is the number of elements behind the ctypes pointer.
        The result is a view on the frame buffer, not a copy.
        """
        raw = numpy.ctypeslib.as_array(frame.pFrameData, (1, element_count))
        # view/reshape instead of assigning .dtype/.shape, which NumPy discourages.
        return raw.view(dtype).reshape(frame.height, frame.width)

    @classmethod
    def _depthToDisplay(cls, depth):
        """Scale raw depth values to ``uint8`` (0..255), clipping out-of-range values."""
        scaled = depth.astype(numpy.int32) * 255 / cls._DEPTH_DISPLAY_MAX
        return numpy.clip(scaled, 0, 255).astype(numpy.uint8)

    @staticmethod
    def _writeJpeg(path, name, image):
        """Write *image* as a quality-100 JPEG to ``path/name``, creating *path* if needed."""
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(path, exist_ok=True)
        filename = os.path.join(path, name)
        if not cv2.imwrite(filename, image, [cv2.IMWRITE_JPEG_QUALITY, 100]):
            glog.warning("cv2.imwrite failed: " + filename)

    def saveIrFrame2Image(self, path, name, frame):
        """Save an IR frame as a grayscale JPEG at ``path/name``.

        The frame buffer is read as ``frame.dataLen`` 8-bit elements and
        reshaped to ``(frame.height, frame.width)``.
        """
        image = self._frameToArray(frame, numpy.uint8, frame.dataLen)
        self._writeJpeg(path, name, image)

    def saveDepthFrame2Image(self, path, name, frame):
        """Save a depth frame as a rainbow color-mapped JPEG at ``path/name``.

        The buffer is read as 16-bit values; the conversion to 8 bit is for
        display only and discards depth precision.
        """
        depth = self._frameToArray(frame, numpy.uint16, frame.width * frame.height * 2)
        colored = cv2.applyColorMap(self._depthToDisplay(depth), cv2.COLORMAP_RAINBOW)
        self._writeJpeg(path, name, colored)

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    def exec(self, req_data):
        """Print the ``statu`` attribute of *req_data*."""
        print(req_data.statu)

    def updateTofsEtc(self):
        """Load ``self.etc_file`` and merge its devices into ``tofs_etc``.

        Entries are keyed by device id. Ids already present are overwritten;
        ids that are no longer in the file are left in place. The previous
        search result is invalidated.
        """
        etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig())
        for device in etc_proto.devices:
            self.tofs_etc[device.id] = device
        glog.info(self.tofs_etc)
        self.search_flag = False

    def getCameraEtc(self, id):
        """Return the config entry for device *id* (``KeyError`` if unknown)."""
        return self.tofs_etc[id]

    def getAllCameraEtc(self):
        """Return the live ``id -> config`` mapping (not a copy)."""
        return self.tofs_etc

    def getCameraList(self):
        """Return the live ``id -> opened camera`` mapping (not a copy)."""
        return self.camera_list

    def setCameraEtc(self, ip, tof_etc=None):
        """Placeholder for updating a camera's configuration; only logs a marker.

        *tof_etc* defaults to a fresh ``etc.VzenseTofDevices()`` per call. It
        used to be built once in the signature, which shared a single mutable
        message between all calls.
        """
        if tof_etc is None:
            tof_etc = etc.VzenseTofDevices()
        glog.info("=======================")

    # ------------------------------------------------------------------
    # Discovery
    # ------------------------------------------------------------------
    def _configuredCamerasFound(self, found_ips):
        """Return True when every configured device is disabled or in *found_ips*.

        Stops at the first enabled device that is missing. An empty
        configuration yields False.
        """
        for device in self.tofs_etc.values():
            if not device.enable:
                glog.info(device.ipv4 + " not enable.")
            elif device.ipv4.encode() in found_ips:
                glog.info("find a device " + device.ipv4 + " success.")
            else:
                glog.info("find a device " + device.ipv4 + " fulture.")
                return False
        return bool(self.tofs_etc)

    def searchAllCamera(self):
        """Scan until every enabled configured camera is found.

        Makes up to 60 attempts, 0.5 s apart. Stores the outcome in
        ``search_flag`` and returns it.

        Differences from the earlier implementation:
          * the flag is reset first, so a failed search can no longer report
            a stale ``True`` from a previous run;
          * unconfigured extra cameras on the network no longer cause the
            scan to be skipped;
          * the device count is read before the device list each attempt, so
            both come from the same pass.
        """
        self.search_flag = False
        retry_count = 60
        while retry_count > 0:
            retry_count -= 1
            camera_count = self.camera.VZ_GetDeviceCount()
            # NOTE(review): called without a count as before; confirm the
            # wrapper sizes the returned list for `camera_count` devices.
            ret, device_infos = self.camera.VZ_GetDeviceInfoList()
            if ret != 0:
                glog.warning("VZ_GetDeviceInfoList failed: " + str(ret))
            found_ips = {device_infos[i].ip for i in range(camera_count)}
            if self._configuredCamerasFound(found_ips):
                self.search_flag = True
                return self.search_flag
            time.sleep(0.5)
            glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.")
        return self.search_flag

    # ------------------------------------------------------------------
    # Open / close
    # ------------------------------------------------------------------
    def _openDevice(self, dev_id):
        """Open device *dev_id* by IP, register it on success, return the SDK code."""
        ipv4 = self.tofs_etc[dev_id].ipv4
        cam = tof.VzenseTofCam()
        ret = cam.VZ_OpenDeviceByIP(ipv4.encode())
        if ret == 0:
            glog.info(ipv4 + " open successful")
            self.camera_list[dev_id] = cam
        else:
            glog.info(ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
        return ret

    def openCamera(self, id):
        """Open camera *id* and return a status code.

        Returns ``VzRetOK`` if it is already open, ``VzRetOthers`` if it is
        disabled in the configuration, otherwise the code returned by
        ``VZ_OpenDeviceByIP`` (this path used to return ``None``).
        """
        if id in self.camera_list:
            glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.")
            return tof.VzReturnStatus.VzRetOK
        if not self.tofs_etc[id].enable:
            glog.warning(
                "this camera not enabled in etc fiile, if you want to enable it, please update the file, and then use func updateTofsEtc.")
            return tof.VzReturnStatus.VzRetOthers
        return self._openDevice(id)

    def openAllCamera(self):
        """Open every enabled camera that is not open yet.

        Already-open cameras are skipped so their handle is not replaced.
        """
        for dev_id, device in self.tofs_etc.items():
            if not device.enable or dev_id in self.camera_list:
                continue
            self._openDevice(dev_id)

    def closeCamera(self, id):
        """Close camera *id*; return ``VzRetOK`` if not open, else the SDK code.

        The handle is dropped from ``camera_list`` only when the close succeeds.
        """
        if id not in self.camera_list:
            glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.")
            return tof.VzReturnStatus.VzRetOK
        ret = self.camera_list[id].VZ_CloseDevice()
        ipv4 = str(self.tofs_etc[id].ipv4)
        if ret == 0:
            del self.camera_list[id]
            glog.info("close device " + ipv4 + " successful")
        else:
            glog.warning("VZ_CloseDevice " + ipv4 + " failed: " + str(ret))
        return ret

    def closeAllCamera(self):
        """Close every open camera, then forget all handles (even failed closes)."""
        for dev_id, cam in self.camera_list.items():
            ret = cam.VZ_CloseDevice()
            ipv4 = str(self.tofs_etc[dev_id].ipv4)
            if ret == 0:
                glog.info("close device " + ipv4 + " successful")
            else:
                glog.warning("VZ_CloseDevice " + ipv4 + " failed: " + str(ret))
        self.camera_list.clear()

    # ------------------------------------------------------------------
    # Streaming
    # ------------------------------------------------------------------
    def _warnNotOpen(self, dev_id):
        """Log that *dev_id* must be opened first.

        Uses ``ipv4`` like every other message; the original read ``.ip``.
        """
        glog.warning("camera " + self.tofs_etc[dev_id].ipv4 + " not open, please open this camera")

    def _startStream(self, dev_id):
        """Start streaming on the open camera *dev_id* and log the result."""
        ret = self.camera_list[dev_id].VZ_StartStream()
        ipv4 = self.tofs_etc[dev_id].ipv4
        if ret == 0:
            glog.info(ipv4 + " start stream successful")
        else:
            glog.info(ipv4 + ' VZ_StartStream failed: ' + str(ret))

    def _stopStream(self, dev_id):
        """Stop streaming on the open camera *dev_id* and log the result."""
        ret = self.camera_list[dev_id].VZ_StopStream()
        ipv4 = self.tofs_etc[dev_id].ipv4
        if ret == 0:
            glog.info(ipv4 + " stop stream successful")
        else:
            glog.info(ipv4 + ' VZ_StopStream failed: ' + str(ret))

    def startCameraStream(self, id):
        """Start streaming on camera *id*; warn if it has not been opened."""
        if id in self.camera_list:
            self._startStream(id)
        else:
            self._warnNotOpen(id)

    def startAllCameraStream(self):
        """Start streaming on every open camera."""
        for dev_id in self.camera_list:
            self._startStream(dev_id)

    def stopCamera(self, id):
        """Stop streaming on camera *id*; warn if it has not been opened."""
        if id in self.camera_list:
            self._stopStream(id)
        else:
            self._warnNotOpen(id)

    def stopAllCameraStream(self):
        """Stop streaming on every open camera."""
        for dev_id in self.camera_list:
            self._stopStream(dev_id)

    # ------------------------------------------------------------------
    # Frame capture
    # ------------------------------------------------------------------
    def getCameraFrame(self, id):
        """Wait up to 1000 (passed to ``VZ_GetFrameReady``) for a frame and save it.

        Depth and IR frames that are ready are written to ``<cwd>/save/`` as
        ``depthframe.jpg`` and ``irframe.jpg``. Returns ``None`` in all cases.
        """
        if id not in self.camera_list:
            # Previously a silent no-op; the id is logged directly so an
            # unknown id cannot raise here.
            glog.warning("camera " + str(id) + " not open, please open this camera")
            return
        cam = self.camera_list[id]
        ret, frameready = cam.VZ_GetFrameReady(c_uint16(1000))
        if ret != 0:
            glog.error("VZ_GetFrameReady failed: %d", ret)
            return
        save_dir = os.getcwd() + "/save/"
        if frameready.depth:
            ret, depthframe = cam.VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
            if ret == 0:
                self.saveDepthFrame2Image(save_dir, "depthframe.jpg", depthframe)
                glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex))
            else:
                glog.warning("VZ_GetFrame error %d", ret)
        if frameready.ir:
            ret, irframe = cam.VZ_GetFrame(tof.VzFrameType.VzIRFrame)
            if ret == 0:
                self.saveIrFrame2Image(save_dir, "irframe.jpg", irframe)
                glog.info(self.tofs_etc[id].ipv4 + " ir frameindex: " + str(irframe.frameIndex))
            else:
                glog.warning("VZ_GetFrame error %d", ret)
|