|
#include "streamServer.h"

#include <unistd.h>

#include <cmath>
#include <cstddef>
#include <cstdio>
#include <limits>
#include <string>

#include "communication/transitData.h"
- CustomServer::CustomServer() {
- update_ = false;
- }
- CustomServer::~CustomServer() {
- openDataStream_ = false;
- openImageStream_ = true;
- }
- ::grpc::Status
- CustomServer::OpenMeasureDataStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
- ::grpc::ServerWriter<::JetStream::MeasureInfo> *writer) {
- openDataStream_ = true;
- printf("打开测量数据流\n");
- while (openDataStream_) {
- TransitData::MeasureInfo info;
- if (TransitData::get_instance_pointer()->getMeasureInfo(info, 0.3)) {
- ::JetStream::MeasureInfo jetInfo;
- jetInfo.set_x(info.x);
- jetInfo.set_y(info.y);
- jetInfo.set_trans_x(info.transx);
- jetInfo.set_trans_y(info.transy);
- jetInfo.set_theta(info.theta);
- jetInfo.set_width(info.width);
- jetInfo.set_wheelbase(info.wheelbase);
- jetInfo.set_ftheta(info.ftheta);
- jetInfo.set_border_plc(info.border_plc);
- jetInfo.set_border_display(info.border_display);
- jetInfo.set_error(info.error);
- writer->Write(jetInfo);
- }
- if (context->IsCancelled()) {
- printf("Data Stream canceled\n");
- return ::grpc::Status::OK;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- }
- printf("关闭测量数据流\n");
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::OpenImageStream(::grpc::ServerContext *context,
- const ::JetStream::RequestCmd *request,
- ::grpc::ServerWriter<::JetStream::ResImage> *writer) {
- openImageStream_ = true;
- printf("打开视频流\n");
- while (openImageStream_) {
- ::JetStream::ResImage jetIamages;
- bool ret[4] = {0};
- cv::Mat image[4];
- for (int i = 0; i < 4; i++) {
- ret[i] = TransitData::get_instance_pointer()->getImage(image[i], i, 0.3);
- }
- if (ret[0]) jetIamages.mutable_img1()->CopyFrom(cvMat2JetImage(image[0]));
- if (ret[1]) jetIamages.mutable_img2()->CopyFrom(cvMat2JetImage(image[1]));
- if (ret[2]) jetIamages.mutable_img3()->CopyFrom(cvMat2JetImage(image[2]));
- if (ret[3]) jetIamages.mutable_img4()->CopyFrom(cvMat2JetImage(image[3]));
- writer->Write(jetIamages);
- if (context->IsCancelled()) {
- printf(" canceled\n");
- return ::grpc::Status::OK;
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(33));
- }
- printf("关闭视频流\n");
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::CloseImageStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
- ::JetStream::Response *response) {
- openImageStream_ = false;
- return ::grpc::Status::OK;
- }
- ::grpc::Status
- CustomServer::CloseMeasureDataStream(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
- ::JetStream::Response *response) {
- openDataStream_ = false;
- return ::grpc::Status::OK;
- }
- ::grpc::Status CustomServer::GetCloud(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
- ::JetStream::ResCloud *response) {
- int id = request->id();
- std::vector<cv::Point> mask;
- cv::Mat imageCL;
- ::JetStream::ResCloud jetClouds;
- ::JetStream::PointCloud cloud;
- if (TransitData::get_instance_pointer()->getImageCL(imageCL, id, 0.3)) {
- // for (int cols = 0; cols < 640 - 0; cols++) {
- // for (int rows = 0; rows < 480 - 0; rows++) {
- // imageCL.at<cv::Vec4f>(rows, cols)[0] > 5. ? printf("%.f\n", imageCL.at<cv::Vec4f>(rows, cols)[0]) : false;
- // }
- // }
- int size = imageCL.rows * imageCL.cols;
- char *data = (char *) malloc(size * 4 * sizeof(short));
- memset(data, 0, size * 4 * sizeof(short));
- if (TransitData::get_instance_pointer()->getMask(mask, id, 0.3)) {
- printf("%d %d\n", mask[0].x, mask[0].y);
- for (auto i : mask) {
- imageCL.at<cv::Vec4f>(i)[3] = 0.95;
- }
- }
- int count = 0;
- cv::Mat mat = cv::Mat::zeros(480, 640, CV_8U);
- for (int i = 0; i < imageCL.rows; ++i) {
- for (int j = 0; j < imageCL.cols; ++j) {
- cv::Vec4f pt = imageCL.at<cv::Vec4f>(i, j);
- if (fabs(pt[0] < 1e-6) && fabs(pt[1]) < 1e-6 && fabs(pt[2]) < 1e-6)
- continue;
- else {
- short xyz[4] = {0};
- xyz[0] = short(pt[0] / 5. * 32750);
- xyz[1] = short(pt[1] / 5. * 32750);
- xyz[2] = short(pt[2] / 5. * 32750);
- xyz[3] = short(pt[3] / 5. * 32750.);
- if (pt[3] == 1.0) {
- mat.at<uchar>(i ,j) = 255;
- // printf("%d %d %d %d %d %d\n", xyz[0], xyz[1], xyz[2], xyz[3], i, j);
- }
- // if (fabs(pt[0]) > 5 || fabs(pt[1]) > 5 || fabs(pt[2]) > 5 || fabs(pt[3]) > 5) {
- // std::cout << pt[0] << " " << pt[1] << " " << pt[2] << " " << pt[3] << "\n";
- // }
- memcpy(data + count * 4 * sizeof(short), xyz, 4 * sizeof(short));
- count++;
- }
- }
- }
- // cv::imwrite("/home/zx/sss.jpeg", mat);
- // cv::imshow("22222", mat);
- // cv::waitKey(0);
- cloud.set_size(count);
- cloud.set_data(data, count * 4 * sizeof(short));
- free(data);
- switch (id + 1) {
- case 1:
- jetClouds.mutable_cloud1()->CopyFrom(cloud);
- break;
- case 2:
- jetClouds.mutable_cloud2()->CopyFrom(cloud);
- break;
- case 3:
- jetClouds.mutable_cloud3()->CopyFrom(cloud);
- break;
- case 4:
- jetClouds.mutable_cloud4()->CopyFrom(cloud);
- break;
- default:
- break;
- }
- response->CopyFrom(jetClouds);
- return ::grpc::Status::OK;
- } else {
- return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
- }
- }
- ::grpc::Status CustomServer::GetImage(::grpc::ServerContext *context, const ::JetStream::RequestCmd *request,
- ::JetStream::ResImage *response) {
- int id = request->id();
- cv::Mat image;
- ::JetStream::Image jetImgs[4];
- if (id == -1) {
- for (int i = 0; i < 4; ++i) {
- if (TransitData::get_instance_pointer()->getImage(image, i, 0.3)) {
- jetImgs[i].CopyFrom(cvMat2JetImage(image));
- }
- }
- response->mutable_img1()->CopyFrom(jetImgs[0]);
- response->mutable_img2()->CopyFrom(jetImgs[1]);
- response->mutable_img3()->CopyFrom(jetImgs[2]);
- response->mutable_img4()->CopyFrom(jetImgs[3]);
- return ::grpc::Status::OK;
- }
- ::JetStream::Image jetImage;
- if (TransitData::get_instance_pointer()->getImage(image, id, 0.3)) {
- jetImage.CopyFrom(cvMat2JetImage(image));
- switch (id) {
- case 0:
- response->mutable_img1()->CopyFrom(jetImage);
- break;
- case 1:
- response->mutable_img2()->CopyFrom(jetImage);
- break;
- case 2:
- response->mutable_img3()->CopyFrom(jetImage);
- break;
- case 3:
- response->mutable_img4()->CopyFrom(jetImage);
- break;
- default:
- break;
- }
- return ::grpc::Status::OK;
- } else {
- return ::grpc::Status(::grpc::StatusCode::DATA_LOSS, "0.3s内没有等到最新数据");
- }
- }
- ::JetStream::Image CustomServer::cvMat2JetImage(cv::Mat &image) {
- ::JetStream::Image img;
- img.set_width(image.cols);
- img.set_height(image.rows);
- img.set_channel(image.channels());
- img.set_data(image.data, image.cols * image.rows * image.channels());
- return img;
- }
- StreamRpcServer::StreamRpcServer() {
- thread_ = nullptr;
- }
- StreamRpcServer::~StreamRpcServer() {
- if (thread_ != nullptr) {
- server_->Shutdown();
- if (thread_->joinable()) {
- thread_->join();
- delete thread_;
- }
- }
- }
- void StreamRpcServer::Listenning(std::string ip, int port) {
- char ipport[64] = {0};
- printf("listenning grpc %s ", ip.c_str());
- sprintf(ipport, "%s:%d", ip.c_str(), port);
- builder_.AddListeningPort(ipport, grpc::InsecureServerCredentials());
- builder_.RegisterService(&service_);
- server_ = builder_.BuildAndStart();
- thread_ = new std::thread(&StreamRpcServer::WaitThread, this);
- }
- void StreamRpcServer::WaitThread(void *p) {
- StreamRpcServer *server = (StreamRpcServer *) p;
- printf(" listenning ........\n");
- server->server_->Wait();
- printf(" server closesd.....\n");
- }
|