123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- import etc_pb2 as etc
- import os, glog, numpy, cv2, SDK.ZX.tool, json, time
- import SDK.Vzense.VzenseDS_api as tof
- from ctypes import *
- class TofManager:
- def saveIrFrame2Image(self, path, name, frame):
- frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.dataLen))
- frametmp.dtype = numpy.uint8
- frametmp.shape = (frame.height, frame.width)
- if not os.path.exists(path):
- os.makedirs(path)
- filename = path + name
- cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
- def saveDepthFrame2Image(self, path, name, frame):
- frametmp = numpy.ctypeslib.as_array(frame.pFrameData, (1, frame.width * frame.height * 2))
- frametmp.dtype = numpy.uint16
- frametmp.shape = (frame.height, frame.width)
- # convert ushort value to 0xff is just for display
- img = numpy.int32(frametmp)
- img = img * 255 / c_uint16(7495)
- img = numpy.clip(img, 0, 255)
- img = numpy.uint8(img)
- frametmp = cv2.applyColorMap(img, cv2.COLORMAP_RAINBOW)
- if not os.path.exists(path):
- glog.info(path + " not exists, mkdit it.")
- os.makedirs(path)
- filename = path + name
- cv2.imwrite(filename, frametmp, [cv2.IMWRITE_JPEG_QUALITY, 100])
- def savePointCloud(self, width, height, path, name, pointlist):
- file = open(path + name, "w")
- for i in range(width * height):
- if pointlist[i].z != 0 and pointlist[i].z != 65535:
- file.write("{0},{1},{2}\n".format(pointlist[i].x, pointlist[i].y, pointlist[i].z))
- file.close()
- def __init__(self):
- self.etc_file = os.path.dirname(os.path.abspath(__file__)) + "/etc/device.json"
- self.tofs_etc = {}
- self.updateTofsEtc()
- self.camera = tof.VzenseTofCam()
- self.camera_list = {}
- self.search_flag = False
- self.stepPath = ""
- self.stepIndex = 0
- self.liscenFunc = [["getCameraFrame", self.getCameraFrame],
- ["updateTofsEtc", self.updateTofsEtc],
- ["getCameraEtc", self.getCameraEtc],
- ["getAllCameraEtc", self.getAllCameraEtc],
- ["getCameraList", self.getCameraList],
- ["searchAllCamera", self.searchAllCamera],
- ["openCamera", self.openCamera],
- ["openAllCamera", self.openAllCamera],
- ["closeCamera", self.closeCamera],
- ["closeAllCamera", self.closeAllCamera],
- ["startCameraStream", self.startCameraStream],
- ["startAllCameraStream", self.startAllCameraStream],
- ["stopCamera", self.stopCamera],
- ["stopAllCameraStream", self.stopAllCameraStream],
- ["getCameraFrame", self.getCameraFrame],
- ["loopGetCameraFrame", self.loopGetCameraFrame],
- ["stepGetCameraFrame", self.stepGetCameraFrame]
- ]
- # self.t1 = threading(target=func, args=('第一个线程', 1))
- def test(self, data):
- glog.info(data)
- def exec(self, req_data):
- try:
- json_data = json.loads(req_data.statu)
- glog.info(json_data)
- if type(json_data) != dict:
- glog.warning("receive data not is dict")
- return
- for bind_func in self.liscenFunc:
- if json_data["func"] == bind_func[0] and type(json_data["params"]) == dict:
- bind_func[1](json_data["params"])
- return
- glog.warning(json_data["params"])
- except ValueError as e:
- glog.error(req_data.statu)
- def updateTofsEtc(self, receive_data=""):
- etc_proto = SDK.ZX.tool.getProtobufJsonConfig(self.etc_file, etc.DevicesConfig())
- for tof_etc in etc_proto.devices:
- self.tofs_etc[tof_etc.id] = tof_etc
- glog.info(self.tofs_etc)
- self.search_flag = False
- def getCameraEtc(self, receive_data=""):
- id = receive_data["id"]
- return self.tofs_etc[id]
- def getAllCameraEtc(self, receive_data=""):
- return self.tofs_etc
- def getCameraList(self, receive_data=""):
- return self.camera_list
- def searchAllCamera(self, receive_data=""):
- # 搜索相机,会确保配置中启用的相机全部搜索到
- camera_count = self.camera.VZ_GetDeviceCount()
- retry_count = 60
- device_ip_list = []
- while camera_count <= len(self.tofs_etc) and retry_count > 0:
- ret, deviceInfoList = self.camera.VZ_GetDeviceInfoList()
- retry_count = retry_count - 1
- camera_count = self.camera.VZ_GetDeviceCount()
- if camera_count > 0:
- device_ip_list = []
- for i in range(camera_count):
- device_ip_list.append(deviceInfoList[i].ip)
- find_device_count = 0
- for tof_etc in self.tofs_etc:
- if self.tofs_etc[tof_etc].enable is False:
- find_device_count = find_device_count + 1
- glog.info(self.tofs_etc[tof_etc].ipv4 + " not enable.")
- if find_device_count == len(self.tofs_etc):
- self.search_flag = True
- return self.search_flag
- continue
- if self.tofs_etc[tof_etc].ipv4.encode() not in device_ip_list:
- glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " fulture.")
- break
- else:
- find_device_count = find_device_count + 1
- glog.info("find a device " + self.tofs_etc[tof_etc].ipv4 + " success.")
- if find_device_count == len(self.tofs_etc):
- self.search_flag = True
- return self.search_flag
- time.sleep(0.5)
- glog.info("scaning...... " + str(retry_count) + " already scanned" + str(camera_count) + " devices.")
- return self.search_flag
- def openCamera(self, receive_data=""):
- id = receive_data["id"]
- if id in self.camera_list:
- glog.info("camera " + self.tofs_etc[id].ipv4 + " already opend.")
- return tof.VzReturnStatus.VzRetOK
- if self.tofs_etc[id].enable is False:
- glog.warning(
- "this camera not enabled in etc fiile, if you want to enable it, please update the file, and then use func updateTofsEtc.")
- return tof.VzReturnStatus.VzRetOthers
- cam = tof.VzenseTofCam()
- ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[id].ipv4.encode())
- if ret == 0:
- glog.info(self.tofs_etc[id].ipv4 + " open successful")
- self.camera_list[id] = cam
- else:
- glog.info(self.tofs_etc[id].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
- def openAllCamera(self, receive_data=""):
- for tof_etc in self.tofs_etc:
- if self.tofs_etc[tof_etc].enable is False:
- continue
- cam = tof.VzenseTofCam()
- ret = cam.VZ_OpenDeviceByIP(self.tofs_etc[tof_etc].ipv4.encode())
- if ret == 0:
- glog.info(self.tofs_etc[tof_etc].ipv4 + " open successful")
- self.camera_list[tof_etc] = cam
- else:
- glog.info(self.tofs_etc[tof_etc].ipv4 + ' VZ_OpenDeviceByIP failed: ' + str(ret))
- def closeCamera(self, receive_data=""):
- id = receive_data["id"]
- if (id not in self.camera_list):
- glog.info("camera " + self.tofs_etc[id].ipv4 + " already closed.")
- return tof.VzReturnStatus.VzRetOK
- ret = self.camera_list[id].VZ_CloseDevice()
- if ret == 0:
- del self.camera_list[id]
- glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful")
- else:
- glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
- return ret
- def closeAllCamera(self, receive_data=""):
- for id in self.camera_list:
- ret = self.camera_list[id].VZ_CloseDevice()
- if ret == 0:
- glog.info("close device " + str(self.tofs_etc[id].ipv4) + " successful")
- else:
- glog.warning("VZ_CloseDevice " + str(self.tofs_etc[id].ipv4) + " failed: " + str(ret))
- self.camera_list.clear()
- def startCameraStream(self, receive_data=""):
- id = receive_data["id"]
- if id in self.camera_list:
- ret = self.camera_list[id].VZ_StartStream()
- if ret == 0:
- glog.info(self.tofs_etc[id].ipv4 + " start stream successful")
- else:
- glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
- else:
- glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
- def startAllCameraStream(self, receive_data=""):
- for id in self.camera_list:
- ret = self.camera_list[id].VZ_StartStream()
- if ret == 0:
- glog.info(self.tofs_etc[id].ipv4 + " start stream successful")
- else:
- glog.info(self.tofs_etc[id].ipv4 + ' VZ_StartStream failed: ' + str(ret))
- def stopCamera(self, receive_data=""):
- id = receive_data["id"]
- if id in self.camera_list:
- ret = self.camera_list[id].VZ_StopStream()
- if ret == 0:
- glog.info(self.tofs_etc[id].ipv4 + " stop stream successful")
- else:
- glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
- else:
- glog.warning("camera " + self.tofs_etc[id].ip + "not open, please open this camera")
- def stopAllCameraStream(self, receive_data=""):
- for id in self.camera_list:
- ret = self.camera_list[id].VZ_StopStream()
- if ret == 0:
- glog.info(self.tofs_etc[id].ipv4 + " stop stream successful")
- else:
- glog.info(self.tofs_etc[id].ipv4 + ' VZ_StopStream failed: ' + str(ret))
- def getCameraFrame(self, receive_data=""):
- id, path, depth_name, ir_name, point_cloud_name = 0, "", "", "", ""
- if "id" not in receive_data or "path" not in receive_data:
- return
- id = receive_data["id"]
- path = receive_data["path"]
- if "depth_name" in receive_data:
- depth_name = receive_data["depth_name"]
- if "ir_name" in receive_data:
- ir_name = receive_data["ir_name"]
- if "point_cloud_name" in receive_data:
- point_cloud_name = receive_data["point_cloud_name"]
- if id == "" or path == "":
- return
- if id in self.camera_list:
- ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000))
- if ret != 0:
- glog.error("VZ_GetFrameReady failed: %d", ret)
- return
- if frameready.depth and depth_name != "":
- ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
- if ret == 0:
- self.saveDepthFrame2Image(path, depth_name + ".jpg", depthframe)
- glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex))
- if point_cloud_name != "":
- ret, pointlist = self.camera_list[id].VZ_ConvertDepthFrameToPointCloudVector(depthframe)
- if ret == 0:
- self.savePointCloud(depthframe.width, depthframe.height, path, point_cloud_name + ".txt",
- pointlist)
- else:
- glog.warning("VZ_ConvertDepthFrameToPointCloudVector error %d", ret)
- else:
- glog.warning("VZ_GetFrame error %d", ret)
- if frameready.ir and ir_name != "":
- ret, irframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzIRFrame)
- if ret == 0:
- self.saveIrFrame2Image(path, ir_name + ".jpg", irframe)
- glog.info(self.tofs_etc[id].ipv4 + " ir frameindex: " + str(irframe.frameIndex))
- else:
- glog.warning("VZ_GetFrame error %d", ret)
- def loopGetCameraFrame(self, receive_data=""):
- path, times, interval = 0, 0, 0
- if "path" not in receive_data or "times" not in receive_data or "interval" not in receive_data:
- return
- times = receive_data["times"]
- interval = receive_data["interval"]
- if interval < 0.2:
- interval = 0.2
- while times:
- receive_data["depth_name"] = "Depth_" + str(receive_data["times"] - times + 1)
- receive_data["ir_name"] = "Ir_" + str(receive_data["times"] - times + 1)
- receive_data["point_cloud_name"] = "cloud_" + str(receive_data["times"] - times + 1)
- glog.info(receive_data)
- self.getCameraFrame(receive_data)
- times = times - 1
- time.sleep(interval)
- def stepGetCameraFrame(self, receive_data=""):
- if "path" not in receive_data or "id" not in receive_data:
- return
- if receive_data["path"] != self.stepPath:
- self.stepIndex = 0
- self.stepPath = receive_data["path"]
- receive_data["depth_name"] = "Depth_" + str(self.stepIndex)
- receive_data["ir_name"] = "Ir_" + str(self.stepIndex)
- receive_data["point_cloud_name"] = "cloud_" + str(self.stepIndex)
- self.getCameraFrame(receive_data)
- self.stepIndex = self.stepIndex + 1
- def getCameraFrameCloud(self, receive_data=""):
- id, path, depth_name, point_cloud_name = 0, "", "", ""
- if "id" not in receive_data or "path" not in receive_data:
- return
- id = receive_data["id"]
- path = receive_data["path"]
- if "depth_name" in receive_data:
- depth_name = receive_data["depth_name"]
- if "point_cloud_name" in receive_data:
- point_cloud_name = receive_data["point_cloud_name"]
- if id == "" or path == "":
- return
- if id in self.camera_list:
- ret, frameready = self.camera_list[id].VZ_GetFrameReady(c_uint16(1000))
- if ret != 0:
- glog.error("VZ_GetFrameReady failed: %d", ret)
- return
- if frameready.depth and depth_name != "":
- ret, depthframe = self.camera_list[id].VZ_GetFrame(tof.VzFrameType.VzDepthFrame)
- if ret == 0:
- self.saveDepthFrame2Image(path, depth_name + ".jpg", depthframe)
- glog.info(self.tofs_etc[id].ipv4 + " depth frameindex: " + str(depthframe.frameIndex))
- pointList = []
- if point_cloud_name != "":
- for h in range(190, 290):
- offset = h * depthframe.width
- for w in range(270, 370):
- depth = tof.VzDepthVector3()
- depth.depthX = w
- depth.depthY = h
- depth.depthZ = depthframe.pFrameData[offset + w]
- ret, point = self.camera_list[id].VZ_ConvertDepthToPointCloud(depth)
- pointList.append(point)
- # depthList.append(pt)
- # ret, pointlist = self.camera_list[id].VZ_ConvertDepthFrameToPointCloudVector(depthframe)
- if ret == 0:
- self.savePointCloud(depthframe.width, depthframe.height, path, point_cloud_name + ".txt",
- pointList)
- else:
- glog.warning("VZ_ConvertDepthFrameToPointCloudVector error %d", ret)
- else:
- glog.warning("VZ_GetFrame error %d", ret)
- def setCameraEtc(self, ip, tof_etc=etc.VzenseTofDevices()):
- glog.info("=======================")
|