streamServer.cpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. #include "streamServer.h"
  2. #include <unistd.h>
  3. CustomServer::CustomServer(){
  4. update_=false;
  5. }
  6. CustomServer::~CustomServer(){
  7. openDataStream_=false;
  8. openImageStream_=true;
  9. }
  10. void CustomServer::update(const ::JetStream::ResFrame& frame){
  11. frame_.CopyFrom(frame);
  12. update_=true;
  13. }
  14. bool CustomServer::waitForUpdata(float tm){
  15. update_=false;
  16. //等待最新数据
  17. auto tp=std::chrono::steady_clock::now();
  18. double time = 0;
  19. while(time<tm && update_==false ){
  20. std::this_thread::sleep_for(std::chrono::milliseconds(10));
  21. auto now=std::chrono::steady_clock::now();
  22. auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - tp);
  23. time = double(duration.count()) * std::chrono::milliseconds::period::num /
  24. std::chrono::milliseconds::period::den;
  25. }
  26. if(update_==false){
  27. printf("在%fs内没有等待最新数据\n",tm);
  28. return false;
  29. }
  30. return true;
  31. }
  32. ::grpc::Status CustomServer::OpenMeasureDataStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
  33. ::grpc::ServerWriter< ::JetStream::MeasureInfo>* writer){
  34. openDataStream_=true;
  35. printf("打开测量数据流\n");
  36. while(openDataStream_){
  37. if(waitForUpdata()){
  38. writer->Write(frame_.measure_info());
  39. }else{
  40. ::JetStream::MeasureInfo info;
  41. info.set_error("没有数据");
  42. writer->Write(info);
  43. }
  44. if(context->IsCancelled()){
  45. printf("Data Stream canceled\n");
  46. return ::grpc::Status::OK;
  47. }
  48. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  49. }
  50. printf("关闭测量数据流\n");
  51. return ::grpc::Status::OK;
  52. }
  53. ::grpc::Status CustomServer::OpenImageStream(::grpc::ServerContext* context,
  54. const ::JetStream::RequestCmd* request, ::grpc::ServerWriter< ::JetStream::ResImage>* writer){
  55. openImageStream_=true;
  56. printf("打开视频流\n");
  57. while(openImageStream_){
  58. if(update_){
  59. writer->Write(frame_.images());
  60. update_=false;
  61. }
  62. if(context->IsCancelled()){
  63. printf(" canceled\n");
  64. return ::grpc::Status::OK;
  65. }
  66. std::this_thread::sleep_for(std::chrono::milliseconds(33));
  67. }
  68. printf("关闭视频流\n");
  69. return ::grpc::Status::OK;
  70. }
  71. ::grpc::Status CustomServer::CloseImageStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
  72. ::JetStream::Response* response){
  73. openImageStream_=false;
  74. return ::grpc::Status::OK;
  75. }
  76. ::grpc::Status CustomServer::CloseMeasureDataStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
  77. ::JetStream::Response* response){
  78. openDataStream_=false;
  79. return ::grpc::Status::OK;
  80. }
  81. ::grpc::Status CustomServer::Detect(::grpc::ServerContext* context,
  82. const ::JetStream::RequestCmd* request, ::JetStream::ResFrame* response){
  83. if(waitForUpdata()){
  84. response->CopyFrom(frame_);
  85. return ::grpc::Status::OK;
  86. }else{
  87. return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
  88. }
  89. }
  90. ::grpc::Status CustomServer::GetCloud(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
  91. ::JetStream::ResCloud* response){
  92. if(waitForUpdata()==false){
  93. return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
  94. }
  95. int id=request->id();
  96. switch (id)
  97. {
  98. case 1:response->mutable_cloud1()->CopyFrom(frame_.clouds().cloud1());break;
  99. case 2: response->mutable_cloud2()->CopyFrom(frame_.clouds().cloud2());break;
  100. case 3: response->mutable_cloud3()->CopyFrom(frame_.clouds().cloud3());break;
  101. case 4: response->mutable_cloud4()->CopyFrom(frame_.clouds().cloud4());break;
  102. default:
  103. response->CopyFrom(frame_.clouds());
  104. break;
  105. }
  106. return ::grpc::Status::OK;
  107. }
  108. ::grpc::Status CustomServer::GetImage(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
  109. ::JetStream::ResImage* response){
  110. if(waitForUpdata()==false){
  111. return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
  112. }
  113. int id=request->id();
  114. switch (id)
  115. {
  116. case 1:response->mutable_img1()->CopyFrom(frame_.images().img1());break;
  117. case 2: response->mutable_img2()->CopyFrom(frame_.images().img2());break;
  118. case 3: response->mutable_img3()->CopyFrom(frame_.images().img3());break;
  119. case 4: response->mutable_img4()->CopyFrom(frame_.images().img4());break;
  120. default:
  121. response->CopyFrom(frame_.images());
  122. break;
  123. }
  124. return ::grpc::Status::OK;
  125. }
  126. StreamRpcServer::StreamRpcServer(){
  127. thread_=nullptr;
  128. }
  129. StreamRpcServer::~StreamRpcServer(){
  130. if(thread_!= nullptr){
  131. server_->Shutdown();
  132. if(thread_->joinable()){
  133. thread_->join();
  134. delete thread_;
  135. }
  136. }
  137. }
  138. void StreamRpcServer::Listenning(std::string ip,int port){
  139. char ipport[64]={0};
  140. sprintf(ipport,"%s:%d",ip.c_str(),port);
  141. builder_.AddListeningPort(ipport, grpc::InsecureServerCredentials());
  142. builder_.RegisterService(&service_);
  143. server_=builder_.BuildAndStart();
  144. thread_=new std::thread(&StreamRpcServer::WaitThread,this);
  145. }
  146. void StreamRpcServer::WaitThread(void* p){
  147. StreamRpcServer* server=(StreamRpcServer*)p;
  148. server->server_->Wait();
  149. }
  150. void StreamRpcServer::ResetData(const ::JetStream::ResFrame& frame){
  151. service_.update(frame);
  152. }
  153. ::JetStream::Image StreamRpcServer::cvMat2JetImage(cv::Mat& image){
  154. ::JetStream::Image img;
  155. img.set_width(image.cols);
  156. img.set_height(image.rows);
  157. img.set_channel(image.channels());
  158. img.set_data(image.data,image.cols*image.rows*image.channels());
  159. return img;
  160. }
  161. ::JetStream::PointCloud StreamRpcServer::pcl2JetPointCloud(pcl::PointCloud<pcl::PointXYZI>::Ptr pcl){
  162. int size=pcl->size();
  163. char* data=(char*)malloc(size*4*sizeof(short));
  164. memset(data,0,size*4*sizeof(short));
  165. for(int i=0;i<size;++i){
  166. if(fabs(pcl->points[i].x)>=5 || fabs(pcl->points[i].y)>=5
  167. ||fabs(pcl->points[i].z)>=5)
  168. continue;
  169. short xyz[4]={0};
  170. xyz[0]=short(pcl->points[i].x/5.*32750);
  171. xyz[1]=short(pcl->points[i].y/5.*32750);
  172. xyz[2]=short(pcl->points[i].z/5.*32750);
  173. xyz[3]=short(pcl->points[i].intensity/5.*32750.);
  174. memcpy(data+i*4*sizeof(short),xyz,4*sizeof(short));
  175. }
  176. ::JetStream::PointCloud cloud;
  177. cloud.set_size(pcl->size());
  178. cloud.set_data(data,size*4*sizeof(short));
  179. free(data);
  180. return cloud;
  181. }
  182. ::JetStream::PointCloud StreamRpcServer::cvPointMat2JetPointCloud(cv::Mat &image) {
  183. std::vector<short> point_data;
  184. for (int rows = 0; rows < image.rows; rows++) {
  185. for (int cols = 0; cols < image.cols; cols++) {
  186. if (fabs(image.at<cv::Vec4f>(rows, cols)[0]) > 5 ||
  187. fabs(image.at<cv::Vec4f>(rows, cols)[1]) > 5 ||
  188. fabs(image.at<cv::Vec4f>(rows, cols)[2]) > 5
  189. ) { continue;}
  190. if (fabs(image.at<cv::Vec4f>(rows, cols)[0]) == 0 &&
  191. fabs(image.at<cv::Vec4f>(rows, cols)[1]) == 0 &&
  192. fabs(image.at<cv::Vec4f>(rows, cols)[2]) == 0
  193. ) {continue;}
  194. point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[0]/5.*32750));
  195. point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[1]/5.*32750));
  196. point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[2]/5.*32750));
  197. point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[3]/5.*32750));
  198. }
  199. }
  200. int size=point_data.size()/4;
  201. char* data=(char*)malloc(size*4*sizeof(short));
  202. memset(data,0,size*4*sizeof(short));
  203. memcpy(data, point_data.data(), size*4*sizeof(short));
  204. ::JetStream::PointCloud cloud;
  205. cloud.set_size(size);
  206. cloud.set_data(data,size*4*sizeof(short));
  207. free(data);
  208. return cloud;
  209. }