#include "streamServer.h"

// NOTE(review): the original file had a bare `#include` here whose header name
// was lost; the standard headers below are the ones this file actually uses.
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdio>
#include <limits>
#include <string>
#include <thread>
#include <vector>

#include "communication/transitData.h"

namespace {

/// Value passed as the last argument of every TransitData getter. The error
/// text returned to clients refers to it as a 0.3 s wait.
constexpr double kFetchTimeout = 0.3;

/// Number of image / cloud slots exposed by the ResImage and ResCloud messages.
constexpr int kSlotCount = 4;

/// Returns the mutable Image sub-message for a 0-based slot index, or nullptr
/// when the index has no matching field. Calling this marks the slot present.
::JetStream::Image *imageSlot(::JetStream::ResImage *res, int index) {
    switch (index) {
        case 0: return res->mutable_img1();
        case 1: return res->mutable_img2();
        case 2: return res->mutable_img3();
        case 3: return res->mutable_img4();
        default: return nullptr;
    }
}

/// Returns the mutable PointCloud sub-message for a 0-based slot index, or
/// nullptr when the index has no matching field.
::JetStream::PointCloud *cloudSlot(::JetStream::ResCloud *res, int index) {
    switch (index) {
        case 0: return res->mutable_cloud1();
        case 1: return res->mutable_cloud2();
        case 2: return res->mutable_cloud3();
        case 3: return res->mutable_cloud4();
        default: return nullptr;
    }
}

/// Scales one float component by 32750 / 5 and converts it to a short.
/// The result is clamped to the range of short (and NaN maps to 0) because an
/// out-of-range float-to-integer conversion is undefined behaviour.
short quantizeComponent(float value) {
    const double scaled = value / 5. * 32750;
    if (std::isnan(scaled)) {
        return 0;
    }
    constexpr double lowest = std::numeric_limits<short>::min();
    constexpr double highest = std::numeric_limits<short>::max();
    return static_cast<short>(std::clamp(scaled, lowest, highest));
}

}  // namespace

/// Starts with both stream loops disabled.
CustomServer::CustomServer() {
    update_ = false;
    openDataStream_ = false;
    openImageStream_ = false;
}

/// Clears both stream flags so any running stream loop terminates.
/// (The original set openImageStream_ to true here, which kept the image
/// loop alive.)
CustomServer::~CustomServer() {
    openDataStream_ = false;
    openImageStream_ = false;
}

/// Server-streaming RPC: pushes measurement records to the client until
/// CloseMeasureDataStream clears the flag, the client cancels, or a write fails.
///
/// NOTE(review): openDataStream_ is written by another RPC handler while this
/// loop reads it; if the header declares it as a plain bool, consider
/// std::atomic<bool>.
::grpc::Status CustomServer::OpenMeasureDataStream(::grpc::ServerContext *context,
                                                   const ::JetStream::RequestCmd *request,
                                                   ::grpc::ServerWriter<::JetStream::MeasureInfo> *writer) {
    openDataStream_ = true;
    printf("打开测量数据流\n");

    while (openDataStream_) {
        bool streamAlive = true;

        TransitData::MeasureInfo info;
        if (TransitData::get_instance_pointer()->getMeasureInfo(info, kFetchTimeout)) {
            ::JetStream::MeasureInfo jetInfo;
            jetInfo.set_x(info.x);
            jetInfo.set_y(info.y);
            jetInfo.set_trans_x(info.transx);
            jetInfo.set_trans_y(info.transy);
            jetInfo.set_theta(info.theta);
            jetInfo.set_width(info.width);
            jetInfo.set_wheelbase(info.wheelbase);
            jetInfo.set_ftheta(info.ftheta);
            jetInfo.set_border_plc(info.border_plc);
            jetInfo.set_border_display(info.border_display);
            jetInfo.set_error(info.error);
            // Write() returns false once the stream can no longer be written to.
            streamAlive = writer->Write(jetInfo);
        }

        if (!streamAlive || context->IsCancelled()) {
            printf("Data Stream canceled\n");
            return ::grpc::Status::OK;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    printf("关闭测量数据流\n");
    return ::grpc::Status::OK;
}

/// Server-streaming RPC: pushes one ResImage per iteration (roughly every
/// 33 ms plus fetch time) until CloseImageStream clears the flag, the client
/// cancels, or a write fails. Slots whose image could not be fetched are left
/// unset in the message.
::grpc::Status CustomServer::OpenImageStream(::grpc::ServerContext *context,
                                             const ::JetStream::RequestCmd *request,
                                             ::grpc::ServerWriter<::JetStream::ResImage> *writer) {
    openImageStream_ = true;
    printf("打开视频流\n");

    while (openImageStream_) {
        cv::Mat frames[kSlotCount];
        bool fetched[kSlotCount] = {false, false, false, false};
        for (int slot = 0; slot < kSlotCount; ++slot) {
            fetched[slot] = TransitData::get_instance_pointer()->getImage(frames[slot], slot, kFetchTimeout);
        }

        ::JetStream::ResImage jetImages;
        for (int slot = 0; slot < kSlotCount; ++slot) {
            if (fetched[slot]) {
                imageSlot(&jetImages, slot)->CopyFrom(cvMat2JetImage(frames[slot]));
            }
        }

        // A failed Write() means the stream is dead; stop instead of spinning.
        const bool streamAlive = writer->Write(jetImages);
        if (!streamAlive || context->IsCancelled()) {
            printf(" canceled\n");
            return ::grpc::Status::OK;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(33));
    }

    printf("关闭视频流\n");
    return ::grpc::Status::OK;
}

/// Unary RPC: clears the flag polled by OpenImageStream.
::grpc::Status CustomServer::CloseImageStream(::grpc::ServerContext *context,
                                              const ::JetStream::RequestCmd *request,
                                              ::JetStream::Response *response) {
    openImageStream_ = false;
    return ::grpc::Status::OK;
}

/// Unary RPC: clears the flag polled by OpenMeasureDataStream.
::grpc::Status CustomServer::CloseMeasureDataStream(::grpc::ServerContext *context,
                                                    const ::JetStream::RequestCmd *request,
                                                    ::JetStream::Response *response) {
    openDataStream_ = false;
    return ::grpc::Status::OK;
}

/// Unary RPC: returns the point cloud for slot `request->id()` packed as
/// four shorts per point (each component scaled by 32750 / 5).
///
/// Points whose first three components are all within 1e-6 of zero are
/// skipped. If a mask is available, the fourth component of every masked
/// pixel is set to 0.95 before packing.
///
/// @return OK with the cloud in the slot matching the id, or DATA_LOSS when
///         no cloud image was obtained from TransitData.
::grpc::Status CustomServer::GetCloud(::grpc::ServerContext *context,
                                      const ::JetStream::RequestCmd *request,
                                      ::JetStream::ResCloud *response) {
    const int id = request->id();

    // NOTE(review): element type inferred from the per-pixel cv::Vec4f reads
    // below; assumes the cloud image is CV_32FC4 — confirm against TransitData.
    cv::Mat imageCL;
    if (!TransitData::get_instance_pointer()->getImageCL(imageCL, id, kFetchTimeout)) {
        return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
    }

    // NOTE(review): element type inferred from the integer x/y members printed
    // below — confirm against TransitData::getMask.
    std::vector<cv::Point> mask;
    if (TransitData::get_instance_pointer()->getMask(mask, id, kFetchTimeout)) {
        if (!mask.empty()) {
            printf("%d %d\n", mask[0].x, mask[0].y);
        }
        for (const cv::Point &pixel : mask) {
            const bool inside = pixel.x >= 0 && pixel.x < imageCL.cols &&
                                pixel.y >= 0 && pixel.y < imageCL.rows;
            if (inside) {
                imageCL.at<cv::Vec4f>(pixel)[3] = 0.95;
            }
        }
    }

    // Four shorts per kept point; reserve the worst case up front.
    std::vector<short> packed;
    packed.reserve(imageCL.total() * 4);

    for (int row = 0; row < imageCL.rows; ++row) {
        const cv::Vec4f *line = imageCL.ptr<cv::Vec4f>(row);
        for (int col = 0; col < imageCL.cols; ++col) {
            const cv::Vec4f &pt = line[col];
            // The original wrote fabs(pt[0] < 1e-6), which dropped points
            // with a negative first component; compare the magnitude instead.
            const bool atOrigin = std::fabs(pt[0]) < 1e-6 &&
                                  std::fabs(pt[1]) < 1e-6 &&
                                  std::fabs(pt[2]) < 1e-6;
            if (atOrigin) {
                continue;
            }
            for (int k = 0; k < 4; ++k) {
                packed.push_back(quantizeComponent(pt[k]));
            }
        }
    }

    response->Clear();
    // An id without a matching slot yields an empty OK response, as before.
    if (::JetStream::PointCloud *target = cloudSlot(response, id)) {
        target->set_size(static_cast<int>(packed.size() / 4));
        if (!packed.empty()) {
            target->set_data(packed.data(), packed.size() * sizeof(short));
        }
    }
    return ::grpc::Status::OK;
}

/// Unary RPC: returns one image (id 0..3) or all four (id == -1).
///
/// For id == -1 every slot is marked present; slots whose fetch failed carry
/// an empty Image and the call still returns OK.
///
/// @return OK on success, or DATA_LOSS when the single requested image was
///         not obtained from TransitData.
::grpc::Status CustomServer::GetImage(::grpc::ServerContext *context,
                                      const ::JetStream::RequestCmd *request,
                                      ::JetStream::ResImage *response) {
    const int id = request->id();
    cv::Mat image;

    if (id == -1) {
        for (int slot = 0; slot < kSlotCount; ++slot) {
            ::JetStream::Image jetImage;
            if (TransitData::get_instance_pointer()->getImage(image, slot, kFetchTimeout)) {
                jetImage.CopyFrom(cvMat2JetImage(image));
            }
            imageSlot(response, slot)->CopyFrom(jetImage);
        }
        return ::grpc::Status::OK;
    }

    if (!TransitData::get_instance_pointer()->getImage(image, id, kFetchTimeout)) {
        return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
    }

    // An id without a matching slot yields an empty OK response, as before.
    if (::JetStream::Image *target = imageSlot(response, id)) {
        target->CopyFrom(cvMat2JetImage(image));
    }
    return ::grpc::Status::OK;
}

/// Converts a cv::Mat into a JetStream::Image (width, height, channel count,
/// raw bytes).
///
/// The payload is cols * rows * channels bytes. The message has no depth
/// field, so this presumably expects 8-bit images — confirm with the client.
/// A non-continuous matrix (e.g. an ROI view) is cloned first so the bytes
/// read are the pixel data and not row padding.
::JetStream::Image CustomServer::cvMat2JetImage(cv::Mat &image) {
    ::JetStream::Image img;
    img.set_width(image.cols);
    img.set_height(image.rows);
    img.set_channel(image.channels());

    if (image.empty()) {
        return img;
    }

    const cv::Mat contiguous = image.isContinuous() ? image : image.clone();
    const std::size_t byteCount = static_cast<std::size_t>(contiguous.cols) *
                                  static_cast<std::size_t>(contiguous.rows) *
                                  static_cast<std::size_t>(contiguous.channels());
    img.set_data(contiguous.data, byteCount);
    return img;
}

/// No server is running until Listenning() is called.
StreamRpcServer::StreamRpcServer() {
    thread_ = nullptr;
}

/// Shuts the server down (if one was started), then joins and frees the
/// waiting thread. The thread object is freed even when it is not joinable.
StreamRpcServer::~StreamRpcServer() {
    if (server_) {
        server_->Shutdown();
    }
    if (thread_ != nullptr) {
        if (thread_->joinable()) {
            thread_->join();
        }
        delete thread_;
        thread_ = nullptr;
    }
}

/// Registers the service on "ip:port" without transport security, starts the
/// server and spawns the thread that blocks in Server::Wait().
///
/// @param ip   address to listen on.
/// @param port TCP port to listen on.
void StreamRpcServer::Listenning(std::string ip, int port) {
    printf("listenning grpc %s ", ip.c_str());

    // std::string instead of a fixed 64-byte buffer: a long `ip` cannot overflow.
    const std::string address = ip + ":" + std::to_string(port);
    builder_.AddListeningPort(address, grpc::InsecureServerCredentials());
    builder_.RegisterService(&service_);
    server_ = builder_.BuildAndStart();

    if (!server_) {
        // BuildAndStart() yields null on failure; do not start a thread that
        // would dereference it.
        printf("failed to start grpc server on %s\n", address.c_str());
        return;
    }
    thread_ = new std::thread(&StreamRpcServer::WaitThread, this);
}

/// Thread entry point: blocks until the server passed in `p` has shut down.
///
/// @param p pointer to the owning StreamRpcServer.
void StreamRpcServer::WaitThread(void *p) {
    StreamRpcServer *server = static_cast<StreamRpcServer *>(p);
    if (server == nullptr || !server->server_) {
        return;
    }
    printf(" listenning ........\n");
    server->server_->Wait();
    printf(" server closesd.....\n");
}