/** * @project ALG_WHeelData * @brief 从mqtt服务器获取点云数据并进行处理,仅给外部一个获取数据的接口。 * @author lz * @data 2023/4/21 **/ #include "rslidar_mqtt_async.h" bool CloudDataMqttAsync::init() { m_message_arrived_callback_ = CloudDataArrived; MqttAsyncClient::init_from_proto(MQTT_CONFIG_PATH); return true; } int CloudDataMqttAsync::CloudDataArrived(MqttAsyncClient *client, char *topicName, int topicLen, MQTTAsync_message *message) { std::string topic = topicName; DLOG(INFO) << "received message from topic : " << topic << "."; auto *m_client = dynamic_cast(client); m_client->insertCloudData(topic, (unsigned char *) message->payload, message->payloadlen); return true; } bool CloudDataMqttAsync::isTopicLoss(const std::string &topic) { // 检测时间戳是否超时 auto iter_time = m_cloud_time_stamp_map.find(topic); if (iter_time == m_cloud_time_stamp_map.end()) { return false; } std::chrono::duration diff = std::chrono::steady_clock::now() - iter_time->second; if (diff.count() > 3) { LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu."; return false; } return true; } bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud::Ptr &cloud) { std::lock_guard lck(m_cloud_mutex); // 检测话题是否长时间未生数据存储 if (!isTopicLoss(topic)) { return false; } // 检测点云数据是否存在 auto iter = m_full_clouds_map.find(topic); if (iter == m_full_clouds_map.end()) { LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code."; return false; } for (auto &point: iter->second->points) { cloud->emplace_back(point.x, point.y, point.z); } return true; } bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud::Ptr &cloud) { std::lock_guard lck(m_cloud_mutex); // 检测话题是否长时间未生数据存储 if (!isTopicLoss(topic)) { return false; } // 检测点云数据是否存在 auto iter = m_full_clouds_map.find(topic); if (iter == m_full_clouds_map.end()) { LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code."; return false; } // 获取点云输据 pcl::PointXYZI pt; for (auto &point: iter->second->points) { pt.x = point.x; pt.y = point.y; pt.z = point.z; pt.intensity = point.intensity; cloud->push_back(pt); } return true; } bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud::Ptr &cloud) { std::lock_guard lck(m_cloud_mutex); // 检测话题是否长时间未生数据存储 if (!isTopicLoss(topic)) { return false; } // 检测点云数据是否存在 auto iter = m_full_clouds_map.find(topic); if (iter == m_full_clouds_map.end()) { LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code."; return false; } pcl::PointRPRY pt{}; for (auto &point: iter->second->points) { pt.pitch = point.pitch; pt.yaw = point.yaw; pt.roll = point.roll; pt.r = point.r; cloud->push_back(pt); } return true; } bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud::Ptr &cloud) { std::lock_guard lck(m_cloud_mutex); // 检测话题是否长时间未生数据存储 if (!isTopicLoss(topic)) { return false; } // 检测点云数据是否存在 auto iter = m_full_clouds_map.find(topic); if (iter == m_full_clouds_map.end()) { LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code."; return false; } cloud->insert(cloud->end(), iter->second->points.begin(), iter->second->points.end()); return true; } bool CloudDataMqttAsync::insertCloudData(std::string &topic, unsigned char *cloud_data, int size) { pcl::PointCloud::Ptr clouds(new pcl::PointCloud); // 解压数据,获得点云 m_unpacket->unpacket((unsigned char *) cloud_data, size, clouds); std::lock_guard lck(m_cloud_mutex); auto iter_full_cloud = m_full_clouds_map.find(topic); if (iter_full_cloud == m_full_clouds_map.end()) { m_full_clouds_map.insert( std::pair::Ptr>(topic, new pcl::PointCloud)); iter_full_cloud = m_full_clouds_map.find(topic); } auto iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic); if (iter_cloud_time_stamp == m_cloud_time_stamp_map.end()) { m_cloud_time_stamp_map.insert( std::pair(topic, std::chrono::steady_clock::now())); iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic); } iter_full_cloud->second->swap(*clouds); iter_cloud_time_stamp->second = std::chrono::steady_clock::now(); return true; }