streamServer.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. #include "streamServer.h"
  2. #include <unistd.h>
  3. #include "communication/transitData.h"
  4. CustomServer::CustomServer() {
  5. update_ = false;
  6. }
  7. CustomServer::~CustomServer() {
  8. openDataStream_ = false;
  9. openImageStream_ = true;
  10. }
  11. ::grpc::Status
  12. CustomServer::OpenMeasureDataStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  13. ::grpc::ServerWriter<::JetStream::MeasureInfo> *writer) {
  14. openDataStream_ = true;
  15. printf("打开测量数据流\n");
  16. while (openDataStream_) {
  17. TransitData::MeasureInfo info;
  18. if (TransitData::get_instance_pointer()->getMeasureInfo(info, 0.3)) {
  19. ::JetStream::MeasureInfo jetInfo;
  20. jetInfo.set_x(info.x);
  21. jetInfo.set_y(info.y);
  22. jetInfo.set_trans_x(info.transx);
  23. jetInfo.set_trans_y(info.transy);
  24. jetInfo.set_theta(info.theta);
  25. jetInfo.set_width(info.width);
  26. jetInfo.set_wheelbase(info.wheelbase);
  27. jetInfo.set_ftheta(info.ftheta);
  28. jetInfo.set_border_plc(info.border_plc);
  29. jetInfo.set_border_display(info.border_display);
  30. jetInfo.set_error(info.error);
  31. writer->Write(jetInfo);
  32. }
  33. if (context->IsCancelled()) {
  34. printf("Data Stream canceled\n");
  35. return ::grpc::Status::OK;
  36. }
  37. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  38. }
  39. printf("关闭测量数据流\n");
  40. return ::grpc::Status::OK;
  41. }
  42. ::grpc::Status CustomServer::OpenImageStream(::grpc::ServerContext *context,
  43. const ::JetStream::RequestCmd *request,
  44. ::grpc::ServerWriter<::JetStream::ResImage> *writer) {
  45. openImageStream_ = true;
  46. printf("打开视频流\n");
  47. while (openImageStream_) {
  48. ::JetStream::ResImage jetIamages;
  49. bool ret[4] = {0};
  50. cv::Mat image[4];
  51. for (int i = 0; i < 4; i++) {
  52. ret[i] = TransitData::get_instance_pointer()->getImage(image[i], i, 0.3);
  53. }
  54. if (ret[0]) jetIamages.mutable_img1()->CopyFrom(cvMat2JetImage(image[0]));
  55. if (ret[1]) jetIamages.mutable_img2()->CopyFrom(cvMat2JetImage(image[1]));
  56. if (ret[2]) jetIamages.mutable_img3()->CopyFrom(cvMat2JetImage(image[2]));
  57. if (ret[3]) jetIamages.mutable_img4()->CopyFrom(cvMat2JetImage(image[3]));
  58. writer->Write(jetIamages);
  59. if (context->IsCancelled()) {
  60. printf(" canceled\n");
  61. return ::grpc::Status::OK;
  62. }
  63. std::this_thread::sleep_for(std::chrono::milliseconds(33));
  64. }
  65. printf("关闭视频流\n");
  66. return ::grpc::Status::OK;
  67. }
  68. ::grpc::Status CustomServer::CloseImageStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  69. ::JetStream::Response *response) {
  70. openImageStream_ = false;
  71. return ::grpc::Status::OK;
  72. }
  73. ::grpc::Status
  74. CustomServer::CloseMeasureDataStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  75. ::JetStream::Response *response) {
  76. openDataStream_ = false;
  77. return ::grpc::Status::OK;
  78. }
  79. ::grpc::Status CustomServer::GetCloud(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  80. ::JetStream::ResCloud *response) {
  81. int id = request->id();
  82. std::vector<cv::Point> mask;
  83. cv::Mat imageCL;
  84. ::JetStream::ResCloud jetClouds;
  85. ::JetStream::PointCloud cloud;
  86. if (TransitData::get_instance_pointer()->getImageCL(imageCL, id, 0.3)) {
  87. // for (int cols = 0; cols < 640 - 0; cols++) {
  88. // for (int rows = 0; rows < 480 - 0; rows++) {
  89. // imageCL.at<cv::Vec4f>(rows, cols)[0] > 5. ? printf("%.f\n", imageCL.at<cv::Vec4f>(rows, cols)[0]) : false;
  90. // }
  91. // }
  92. int size = imageCL.rows * imageCL.cols;
  93. char *data = (char *) malloc(size * 4 * sizeof(short));
  94. memset(data, 0, size * 4 * sizeof(short));
  95. if (TransitData::get_instance_pointer()->getMask(mask, id, 0.3)) {
  96. printf("%d %d\n", mask[0].x, mask[0].y);
  97. for (auto i : mask) {
  98. imageCL.at<cv::Vec4f>(i)[3] = 0.95;
  99. }
  100. }
  101. int count = 0;
  102. cv::Mat mat = cv::Mat::zeros(480, 640, CV_8U);
  103. for (int i = 0; i < imageCL.rows; ++i) {
  104. for (int j = 0; j < imageCL.cols; ++j) {
  105. cv::Vec4f pt = imageCL.at<cv::Vec4f>(i, j);
  106. if (fabs(pt[0] < 1e-6) && fabs(pt[1]) < 1e-6 && fabs(pt[2]) < 1e-6)
  107. continue;
  108. else {
  109. short xyz[4] = {0};
  110. xyz[0] = short(pt[0] / 5. * 32750);
  111. xyz[1] = short(pt[1] / 5. * 32750);
  112. xyz[2] = short(pt[2] / 5. * 32750);
  113. xyz[3] = short(pt[3] / 5. * 32750.);
  114. if (pt[3] == 1.0) {
  115. mat.at<uchar>(i ,j) = 255;
  116. // printf("%d %d %d %d %d %d\n", xyz[0], xyz[1], xyz[2], xyz[3], i, j);
  117. }
  118. // if (fabs(pt[0]) > 5 || fabs(pt[1]) > 5 || fabs(pt[2]) > 5 || fabs(pt[3]) > 5) {
  119. // std::cout << pt[0] << " " << pt[1] << " " << pt[2] << " " << pt[3] << "\n";
  120. // }
  121. memcpy(data + count * 4 * sizeof(short), xyz, 4 * sizeof(short));
  122. count++;
  123. }
  124. }
  125. }
  126. // cv::imwrite("/home/zx/sss.jpeg", mat);
  127. // cv::imshow("22222", mat);
  128. // cv::waitKey(0);
  129. cloud.set_size(count);
  130. cloud.set_data(data, count * 4 * sizeof(short));
  131. free(data);
  132. switch (id + 1) {
  133. case 1:
  134. jetClouds.mutable_cloud1()->CopyFrom(cloud);
  135. break;
  136. case 2:
  137. jetClouds.mutable_cloud2()->CopyFrom(cloud);
  138. break;
  139. case 3:
  140. jetClouds.mutable_cloud3()->CopyFrom(cloud);
  141. break;
  142. case 4:
  143. jetClouds.mutable_cloud4()->CopyFrom(cloud);
  144. break;
  145. default:
  146. break;
  147. }
  148. response->CopyFrom(jetClouds);
  149. return ::grpc::Status::OK;
  150. } else {
  151. return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
  152. }
  153. }
  154. ::grpc::Status CustomServer::GetImage(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  155. ::JetStream::ResImage *response) {
  156. int id = request->id();
  157. cv::Mat image;
  158. ::JetStream::Image jetImgs[4];
  159. if (id == -1) {
  160. for (int i = 0; i < 4; ++i) {
  161. if (TransitData::get_instance_pointer()->getImage(image, i, 0.3)) {
  162. jetImgs[i].CopyFrom(cvMat2JetImage(image));
  163. }
  164. }
  165. response->mutable_img1()->CopyFrom(jetImgs[0]);
  166. response->mutable_img2()->CopyFrom(jetImgs[1]);
  167. response->mutable_img3()->CopyFrom(jetImgs[2]);
  168. response->mutable_img4()->CopyFrom(jetImgs[3]);
  169. return ::grpc::Status::OK;
  170. }
  171. ::JetStream::Image jetImage;
  172. if (TransitData::get_instance_pointer()->getImage(image, id, 0.3)) {
  173. jetImage.CopyFrom(cvMat2JetImage(image));
  174. switch (id) {
  175. case 0:
  176. response->mutable_img1()->CopyFrom(jetImage);
  177. break;
  178. case 1:
  179. response->mutable_img2()->CopyFrom(jetImage);
  180. break;
  181. case 2:
  182. response->mutable_img3()->CopyFrom(jetImage);
  183. break;
  184. case 3:
  185. response->mutable_img4()->CopyFrom(jetImage);
  186. break;
  187. default:
  188. break;
  189. }
  190. return ::grpc::Status::OK;
  191. } else {
  192. return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
  193. }
  194. }
  195. ::JetStream::Image CustomServer::cvMat2JetImage(cv::Mat &image) {
  196. ::JetStream::Image img;
  197. img.set_width(image.cols);
  198. img.set_height(image.rows);
  199. img.set_channel(image.channels());
  200. img.set_data(image.data, image.cols * image.rows * image.channels());
  201. return img;
  202. }
  203. grpc::Status CustomServer::setTofTransformation(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  204. ::JetStream::Response *response) {
  205. CoordinateTransformation3D params;
  206. TofTransformationManager::iter()->get((DeviceAzimuth)request->id(), params);
  207. if (request->params().has_x()) {
  208. params.set_x(request->params().x());
  209. }
  210. if (request->params().has_y()) {
  211. params.set_y(request->params().y());
  212. }
  213. if (request->params().has_z()) {
  214. params.set_z(request->params().z());
  215. }
  216. if (request->params().has_pitch()) {
  217. params.set_pitch(request->params().pitch());
  218. }
  219. if (request->params().has_roll()) {
  220. params.set_roll(request->params().roll());
  221. }
  222. if (request->params().has_yaw()) {
  223. params.set_yaw(request->params().yaw());
  224. }
  225. TofTransformationManager::iter()->set((DeviceAzimuth)request->id(), params);
  226. return grpc::Status();
  227. }
  228. grpc::Status CustomServer::saveTofTransformation(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
  229. ::JetStream::Response *response) {
  230. TofTransformationManager::iter()->save();
  231. return grpc::Status();
  232. }
  233. StreamRpcServer::StreamRpcServer() {
  234. thread_ = nullptr;
  235. }
  236. StreamRpcServer::~StreamRpcServer() {
  237. if (thread_ != nullptr) {
  238. server_->Shutdown();
  239. if (thread_->joinable()) {
  240. thread_->join();
  241. delete thread_;
  242. }
  243. }
  244. }
  245. void StreamRpcServer::Listenning(std::string ip, int port) {
  246. char ipport[64] = {0};
  247. printf("listenning grpc %s ", ip.c_str());
  248. sprintf(ipport, "%s:%d", ip.c_str(), port);
  249. builder_.AddListeningPort(ipport, grpc::InsecureServerCredentials());
  250. builder_.RegisterService(&service_);
  251. server_ = builder_.BuildAndStart();
  252. thread_ = new std::thread(&StreamRpcServer::WaitThread, this);
  253. }
  254. void StreamRpcServer::WaitThread(void *p) {
  255. StreamRpcServer *server = (StreamRpcServer *) p;
  256. printf(" listenning ........\n");
  257. server->server_->Wait();
  258. printf(" server closesd.....\n");
  259. }