123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- /**
- * @project ALG_WHeelData
- * @brief 从mqtt服务器获取点云数据并进行处理,仅给外部一个获取数据的接口。
- * @author lz
- * @data 2023/4/21
- **/
- #include "rslidar_mqtt_async.h"
- bool CloudDataMqttAsync::init() {
- m_message_arrived_callback_ = CloudDataArrived;
- MqttAsyncClient::init_from_proto(MQTT_CONFIG_PATH);
- return true;
- }
- int CloudDataMqttAsync::CloudDataArrived(MqttAsyncClient *client, char *topicName, int topicLen,
- MQTTAsync_message *message) {
- std::string topic = topicName;
- DLOG(INFO) << "received message from topic : " << topic << ".";
- auto *m_client = dynamic_cast<CloudDataMqttAsync *>(client);
- m_client->insertCloudData(topic, (unsigned char *) message->payload, message->payloadlen);
- return true;
- }
- bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointXYZ>::Ptr &cloud) {
- std::lock_guard<std::mutex> lck(m_cloud_mutex);
- // 检测时间戳是否超时
- auto iter_time = m_cloud_time_stamp_map.find(topic);
- if (iter_time == m_cloud_time_stamp_map.end()) {
- return false;
- }
- std::chrono::duration<double> diff = std::chrono::steady_clock::now() - iter_time->second;
- if (diff.count() > 1) {
- LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu.";
- return false;
- }
- // 检测点云数据是否存在
- auto iter = m_full_clouds_map.find(topic);
- if (iter == m_full_clouds_map.end()) {
- LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
- return false;
- }
- for (auto &point: iter->second->points) {
- cloud->emplace_back(point.x, point.y, point.z);
- }
- return true;
- }
- bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointXYZI>::Ptr &cloud) {
- std::lock_guard<std::mutex> lck(m_cloud_mutex);
- // 检测时间戳是否超时
- auto iter_time = m_cloud_time_stamp_map.find(topic);
- if (iter_time == m_cloud_time_stamp_map.end()) {
- return false;
- }
- std::chrono::duration<double> diff = std::chrono::steady_clock::now() - iter_time->second;
- if (diff.count() > 1) {
- LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu.";
- return false;
- }
- // 检测点云数据是否存在
- auto iter = m_full_clouds_map.find(topic);
- if (iter == m_full_clouds_map.end()) {
- LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
- return false;
- }
- // 获取点云输据
- pcl::PointXYZI pt;
- for (auto &point: iter->second->points) {
- pt.x = point.x;
- pt.y = point.y;
- pt.z = point.z;
- pt.intensity = point.intensity;
- cloud->emplace_back(pt);
- }
- return true;
- }
- bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointRPRY>::Ptr &cloud) {
- std::lock_guard<std::mutex> lck(m_cloud_mutex);
- // 检测时间戳是否超时
- auto iter_time = m_cloud_time_stamp_map.find(topic);
- if (iter_time == m_cloud_time_stamp_map.end()) {
- return false;
- }
- std::chrono::duration<double> diff = std::chrono::steady_clock::now() - iter_time->second;
- if (diff.count() > 1) {
- LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu.";
- return false;
- }
- // 检测点云数据是否存在
- auto iter = m_full_clouds_map.find(topic);
- if (iter == m_full_clouds_map.end()) {
- LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
- return false;
- }
- pcl::PointRPRY pt;
- for (auto &point: iter->second->points) {
- pt.pitch = point.pitch;
- pt.yaw = point.yaw;
- pt.roll = point.roll;
- pt.r = point.r;
- cloud->emplace_back(pt);
- }
- return true;
- }
- bool CloudDataMqttAsync::getCloudData(const std::string &topic, pcl::PointCloud<pcl::PointALL>::Ptr &cloud) {
- std::lock_guard<std::mutex> lck(m_cloud_mutex);
- // 检测时间戳是否超时
- auto iter_time = m_cloud_time_stamp_map.find(topic);
- if (iter_time == m_cloud_time_stamp_map.end()) {
- return false;
- }
- std::chrono::duration<double> diff = std::chrono::steady_clock::now() - iter_time->second;
- if (diff.count() > 3) {
- }
- if (diff.count() > 1) {
- LOG(WARNING) << "cloud not update, please re-check lidar mqtt statu.";
- return false;
- }
- // 检测点云数据是否存在
- auto iter = m_full_clouds_map.find(topic);
- if (iter == m_full_clouds_map.end()) {
- LOG(WARNING) << "find topic " << topic << " in m_cloud_map error, please re-check save data code.";
- return false;
- }
- cloud->insert(cloud->end(), iter->second->points.begin(), iter->second->points.end());
- return true;
- }
- bool CloudDataMqttAsync::insertCloudData(std::string &topic, unsigned char *cloud_data, int size) {
- pcl::PointCloud<pcl::PointALL>::Ptr clouds(new pcl::PointCloud<pcl::PointALL>);
- // 解压数据,获得点云
- m_unpacket->unpacket((unsigned char *) cloud_data, size, clouds);
- std::lock_guard<std::mutex> lck(m_cloud_mutex);
- auto iter_full_cloud = m_full_clouds_map.find(topic);
- if (iter_full_cloud == m_full_clouds_map.end()) {
- m_full_clouds_map.insert(
- std::pair<std::string, pcl::PointCloud<pcl::PointALL>::Ptr>(topic, new pcl::PointCloud<pcl::PointALL>));
- iter_full_cloud = m_full_clouds_map.find(topic);
- }
- auto iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic);
- if (iter_cloud_time_stamp == m_cloud_time_stamp_map.end()) {
- m_cloud_time_stamp_map.insert(
- std::pair<std::string, std::chrono::steady_clock::time_point>(topic, std::chrono::steady_clock::now()));
- iter_cloud_time_stamp = m_cloud_time_stamp_map.find(topic);
- }
- iter_full_cloud->second->swap(*clouds);
- iter_cloud_time_stamp->second = std::chrono::steady_clock::now();
- return true;
- }
|