rslidar_mqtt_async.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. /**
  2. * @project ALG_WHeelData
  3. * @brief 从mqtt服务器获取点云数据并进行处理,仅给外部一个获取数据的接口。
  4. * @author lz
  5. * @data 2023/4/21
  6. **/
  7. #include "rslidar_mqtt_async.h"
  8. bool CloudDataMqttAsync::init() {
  9. m_message_arrived_callback_ = CloudDataArrived;
  10. MqttAsyncClient::init_from_proto(MQTT_CONFIG_PATH);
  11. return true;
  12. }
  13. int CloudDataMqttAsync::CloudDataArrived(MqttAsyncClient *client, char *topicName, int topicLen,
  14. MQTTAsync_message *message) {
  15. std::string topic = topicName;
  16. DLOG(INFO) << "received message from topic : " << topic << ".";
  17. auto *m_client = dynamic_cast<CloudDataMqttAsync *>(client);
  18. m_client->insertCloudData(topic, (unsigned char *) message->payload, message->payloadlen);
  19. return true;
  20. }
  21. bool CloudDataMqttAsync::isTopicLoss(const std::string &topic) {
  22. // 检测时间戳是否超时
  23. auto iter_time = m_cloud_time_stamp_map.find(topic);
  24. if (iter_time == m_cloud_time_stamp_map.end()) {
  25. return false;
  26. }
  27. std::chrono::duration<double> diff = std::chrono::steady_clock::now() - iter_time->second;
  28. if (diff.count() > 3) {
  29. LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu.";
  30. return false;
  31. }
  32. return true;
  33. }
  34. bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointXYZ>::Ptr &cloud) {
  35. std::lock_guard<std::mutex> lck(m_cloud_mutex);
  36. // 检测话题是否长时间未生数据存储
  37. if (!isTopicLoss(topic)) {
  38. return false;
  39. }
  40. // 检测点云数据是否存在
  41. auto iter = m_full_clouds_map.find(topic);
  42. if (iter == m_full_clouds_map.end()) {
  43. LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
  44. return false;
  45. }
  46. for (auto &point: iter->second->points) {
  47. cloud->emplace_back(point.x, point.y, point.z);
  48. }
  49. return true;
  50. }
  51. bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointXYZI>::Ptr &cloud) {
  52. std::lock_guard<std::mutex> lck(m_cloud_mutex);
  53. // 检测话题是否长时间未生数据存储
  54. if (!isTopicLoss(topic)) {
  55. return false;
  56. }
  57. // 检测点云数据是否存在
  58. auto iter = m_full_clouds_map.find(topic);
  59. if (iter == m_full_clouds_map.end()) {
  60. LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
  61. return false;
  62. }
  63. // 获取点云输据
  64. pcl::PointXYZI pt;
  65. for (auto &point: iter->second->points) {
  66. pt.x = point.x;
  67. pt.y = point.y;
  68. pt.z = point.z;
  69. pt.intensity = point.intensity;
  70. cloud->push_back(pt);
  71. }
  72. return true;
  73. }
  74. bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointRPRY>::Ptr &cloud) {
  75. std::lock_guard<std::mutex> lck(m_cloud_mutex);
  76. // 检测话题是否长时间未生数据存储
  77. if (!isTopicLoss(topic)) {
  78. return false;
  79. }
  80. // 检测点云数据是否存在
  81. auto iter = m_full_clouds_map.find(topic);
  82. if (iter == m_full_clouds_map.end()) {
  83. LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
  84. return false;
  85. }
  86. pcl::PointRPRY pt{};
  87. for (auto &point: iter->second->points) {
  88. pt.pitch = point.pitch;
  89. pt.yaw = point.yaw;
  90. pt.roll = point.roll;
  91. pt.r = point.r;
  92. cloud->push_back(pt);
  93. }
  94. return true;
  95. }
  96. bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointALL>::Ptr &cloud) {
  97. std::lock_guard<std::mutex> lck(m_cloud_mutex);
  98. // 检测话题是否长时间未生数据存储
  99. if (!isTopicLoss(topic)) {
  100. return false;
  101. }
  102. // 检测点云数据是否存在
  103. auto iter = m_full_clouds_map.find(topic);
  104. if (iter == m_full_clouds_map.end()) {
  105. LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
  106. return false;
  107. }
  108. cloud->insert(cloud->end(), iter->second->points.begin(), iter->second->points.end());
  109. return true;
  110. }
  111. bool CloudDataMqttAsync::insertCloudData(std::string &topic, unsigned char *cloud_data, int size) {
  112. pcl::PointCloud<pcl::PointALL>::Ptr clouds(new pcl::PointCloud<pcl::PointALL>);
  113. // 解压数据,获得点云
  114. m_unpacket->unpacket((unsigned char *) cloud_data, size, clouds);
  115. std::lock_guard<std::mutex> lck(m_cloud_mutex);
  116. auto iter_full_cloud = m_full_clouds_map.find(topic);
  117. if (iter_full_cloud == m_full_clouds_map.end()) {
  118. m_full_clouds_map.insert(
  119. std::pair<std::string, pcl::PointCloud<pcl::PointALL>::Ptr>(topic, new pcl::PointCloud<pcl::PointALL>));
  120. iter_full_cloud = m_full_clouds_map.find(topic);
  121. }
  122. auto iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic);
  123. if (iter_cloud_time_stamp == m_cloud_time_stamp_map.end()) {
  124. m_cloud_time_stamp_map.insert(
  125. std::pair<std::string, std::chrono::steady_clock::time_point>(topic, std::chrono::steady_clock::now()));
  126. iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic);
  127. }
  128. iter_full_cloud->second->swap(*clouds);
  129. iter_cloud_time_stamp->second = std::chrono::steady_clock::now();
  130. return true;
  131. }