123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
#include "streamServer.h"

#include <unistd.h>

#include <string>
#include <vector>
- CustomServer::CustomServer(){
- update_=false;
- }
- CustomServer::~CustomServer(){
- openDataStream_=false;
- openImageStream_=true;
- }
- void CustomServer::update(const ::JetStream::ResFrame& frame){
- frame_.CopyFrom(frame);
- update_=true;
- }
- bool CustomServer::waitForUpdata(float tm){
- update_=false;
- //等待最新数据
- auto tp=std::chrono::steady_clock::now();
- double time = 0;
- while(time<tm && update_==false ){
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
- auto now=std::chrono::steady_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - tp);
- time = double(duration.count()) * std::chrono::milliseconds::period::num /
- std::chrono::milliseconds::period::den;
- }
- if(update_==false){
- printf("在%fs内没有等待最新数据\n",tm);
- return false;
- }
- return true;
- }
- ::grpc::Status CustomServer::OpenMeasureDataStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
- ::grpc::ServerWriter< ::JetStream::MeasureInfo>* writer){
- openDataStream_=true;
- printf("打开测量数据流\n");
- while(openDataStream_){
- if(waitForUpdata()){
- writer->Write(frame_.measure_info());
- }else{
- ::JetStream::MeasureInfo info;
- info.set_error("没有数据");
- writer->Write(info);
- }
- if(context->IsCancelled()){
- printf("Data Stream canceled\n");
- return ::grpc::Status::OK;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- printf("关闭测量数据流\n");
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::OpenImageStream(::grpc::ServerContext* context,
- const ::JetStream::RequestCmd* request, ::grpc::ServerWriter< ::JetStream::ResImage>* writer){
- openImageStream_=true;
- printf("打开视频流\n");
- while(openImageStream_){
- if(update_){
-
- writer->Write(frame_.images());
- update_=false;
- }
- if(context->IsCancelled()){
- printf(" canceled\n");
- return ::grpc::Status::OK;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(33));
-
- }
- printf("关闭视频流\n");
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::CloseImageStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
- ::JetStream::Response* response){
- openImageStream_=false;
- return ::grpc::Status::OK;
-
- }
- ::grpc::Status CustomServer::CloseMeasureDataStream(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
- ::JetStream::Response* response){
- openDataStream_=false;
- return ::grpc::Status::OK;
-
- }
- ::grpc::Status CustomServer::Detect(::grpc::ServerContext* context,
- const ::JetStream::RequestCmd* request, ::JetStream::ResFrame* response){
- if(waitForUpdata()){
- response->CopyFrom(frame_);
- return ::grpc::Status::OK;
- }else{
- return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
- }
-
- }
- ::grpc::Status CustomServer::GetCloud(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
- ::JetStream::ResCloud* response){
- if(waitForUpdata()==false){
- return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
- }
- int id=request->id();
- switch (id)
- {
- case 1:response->mutable_cloud1()->CopyFrom(frame_.clouds().cloud1());break;
- case 2: response->mutable_cloud2()->CopyFrom(frame_.clouds().cloud2());break;
- case 3: response->mutable_cloud3()->CopyFrom(frame_.clouds().cloud3());break;
- case 4: response->mutable_cloud4()->CopyFrom(frame_.clouds().cloud4());break;
- default:
- response->CopyFrom(frame_.clouds());
- break;
- }
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::GetImage(::grpc::ServerContext* context, const ::JetStream::RequestCmd* request,
- ::JetStream::ResImage* response){
- if(waitForUpdata()==false){
- return ::grpc::Status(::grpc::StatusCode::DATA_LOSS,"0.3s内没有等到最新数据");
- }
- int id=request->id();
- switch (id)
- {
- case 1:response->mutable_img1()->CopyFrom(frame_.images().img1());break;
- case 2: response->mutable_img2()->CopyFrom(frame_.images().img2());break;
- case 3: response->mutable_img3()->CopyFrom(frame_.images().img3());break;
- case 4: response->mutable_img4()->CopyFrom(frame_.images().img4());break;
- default:
- response->CopyFrom(frame_.images());
- break;
- }
- return ::grpc::Status::OK;
- }
- StreamRpcServer::StreamRpcServer(){
- thread_=nullptr;
- }
- StreamRpcServer::~StreamRpcServer(){
-
- if(thread_!= nullptr){
- server_->Shutdown();
- if(thread_->joinable()){
- thread_->join();
- delete thread_;
- }
- }
- }
- void StreamRpcServer::Listenning(std::string ip,int port){
- char ipport[64]={0};
- sprintf(ipport,"%s:%d",ip.c_str(),port);
- builder_.AddListeningPort(ipport, grpc::InsecureServerCredentials());
- builder_.RegisterService(&service_);
- server_=builder_.BuildAndStart();
- thread_=new std::thread(&StreamRpcServer::WaitThread,this);
- }
- void StreamRpcServer::WaitThread(void* p){
- StreamRpcServer* server=(StreamRpcServer*)p;
- server->server_->Wait();
- }
- void StreamRpcServer::ResetData(const ::JetStream::ResFrame& frame){
- service_.update(frame);
- }
- ::JetStream::Image StreamRpcServer::cvMat2JetImage(cv::Mat& image){
- ::JetStream::Image img;
- img.set_width(image.cols);
- img.set_height(image.rows);
- img.set_channel(image.channels());
- img.set_data(image.data,image.cols*image.rows*image.channels());
- return img;
- }
- ::JetStream::PointCloud StreamRpcServer::pcl2JetPointCloud(pcl::PointCloud<pcl::PointXYZI>::Ptr pcl){
- int size=pcl->size();
- char* data=(char*)malloc(size*4*sizeof(short));
- memset(data,0,size*4*sizeof(short));
- for(int i=0;i<size;++i){
- if(fabs(pcl->points[i].x)>=5 || fabs(pcl->points[i].y)>=5
- ||fabs(pcl->points[i].z)>=5)
- continue;
- short xyz[4]={0};
- xyz[0]=short(pcl->points[i].x/5.*32750);
- xyz[1]=short(pcl->points[i].y/5.*32750);
- xyz[2]=short(pcl->points[i].z/5.*32750);
- xyz[3]=short(pcl->points[i].intensity/5.*32750.);
- memcpy(data+i*4*sizeof(short),xyz,4*sizeof(short));
- }
- ::JetStream::PointCloud cloud;
- cloud.set_size(pcl->size());
- cloud.set_data(data,size*4*sizeof(short));
- free(data);
- return cloud;
- }
- ::JetStream::PointCloud StreamRpcServer::cvPointMat2JetPointCloud(cv::Mat &image) {
- std::vector<short> point_data;
- for (int rows = 0; rows < image.rows; rows++) {
- for (int cols = 0; cols < image.cols; cols++) {
- if (fabs(image.at<cv::Vec4f>(rows, cols)[0]) > 5 ||
- fabs(image.at<cv::Vec4f>(rows, cols)[1]) > 5 ||
- fabs(image.at<cv::Vec4f>(rows, cols)[2]) > 5
- ) { continue;}
- if (fabs(image.at<cv::Vec4f>(rows, cols)[0]) == 0 &&
- fabs(image.at<cv::Vec4f>(rows, cols)[1]) == 0 &&
- fabs(image.at<cv::Vec4f>(rows, cols)[2]) == 0
- ) {continue;}
- point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[0]/5.*32750));
- point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[1]/5.*32750));
- point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[2]/5.*32750));
- point_data.emplace_back(short(image.at<cv::Vec4f>(rows, cols)[3]/5.*32750));
- }
- }
- int size=point_data.size()/4;
- char* data=(char*)malloc(size*4*sizeof(short));
- memset(data,0,size*4*sizeof(short));
- memcpy(data, point_data.data(), size*4*sizeof(short));
- ::JetStream::PointCloud cloud;
- cloud.set_size(size);
- cloud.set_data(data,size*4*sizeof(short));
- free(data);
- return cloud;
- }
|