#include "async_client.h" // 构造 Async_Client::Async_Client() : m_socket(m_io_service) { m_communication_status = E_UNKNOWN; //通信状态 m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态. mp_thread_receive = nullptr; // 接受线程, 内存由本模块管理 mp_thread_check_connect = nullptr; // 检查线程, 内存由本模块管理 m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间 m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1); memset(m_data_buf, 0, DATA_LENGTH_MAX); mp_communication_data_queue = nullptr; } /** * 析构函数 * */ Async_Client::~Async_Client() { uninit(); } //初始化, 默认重连 Error_manager Async_Client::init(const std::string& ip, int port, Thread_safe_queue *p_communication_data_queue, bool is_reconnection_flag) { if (p_communication_data_queue == nullptr) { return {Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR, " POINTER IS NULL "}; } m_is_reconnection_flag = is_reconnection_flag; m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过 boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port); m_ep = t_ep; mp_communication_data_queue = p_communication_data_queue; //接受线程, 默认等待 m_condition_receive.reset(false, false, false); mp_thread_receive = new std::thread(&Async_Client::thread_receive, this); //检查连接线程, 默认等待, 内部的wait_for会让其通过 m_condition_check_connect.reset(false, false, false); mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this); return Error_code::SUCCESS; } //反初始化 Error_manager Async_Client::uninit() { //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket, //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程... //关闭通信 m_socket.close(); //杀死2个线程,强制退出 if (mp_thread_receive) { m_condition_receive.kill_all(); } if (mp_thread_check_connect) { m_condition_check_connect.kill_all(); } //回收2个线程的资源 if (mp_thread_receive) { mp_thread_receive->join(); delete mp_thread_receive; mp_thread_receive = nullptr; } if (mp_thread_check_connect) { mp_thread_check_connect->join(); delete mp_thread_check_connect; mp_thread_check_connect = nullptr; } m_io_service.stop(); mp_communication_data_queue = nullptr; if (m_communication_status != E_FAULT) { m_communication_status = E_UNKNOWN; } return Error_code::SUCCESS; } //检查状态 Error_manager Async_Client::check_status() { if (m_communication_status == E_READY) { return Error_code::SUCCESS; } else if (m_communication_status == E_UNKNOWN) { return {Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR, " Async_Client::check_status() error "}; } else if (m_communication_status == E_DISCONNECT) { return {Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR, " Async_Client::check_status() error "}; } else { return {Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR, " Async_Client::check_status() error "}; } } //判断是否正常 bool Async_Client::is_ready() { return (m_communication_status == E_READY); } //获取状态 Async_Client::Communication_status Async_Client::get_status() { return m_communication_status; } //线程接受函数,包括连接,重连和接受数据 void Async_Client::thread_receive() { LOG(INFO) << " ---Async_Client::thread_receive start ---" << this; //接受雷达消息,包括连接,重连和接受数据 while (m_condition_receive.is_alive()) { m_condition_receive.wait(); if (m_condition_receive.is_alive()) { std::this_thread::yield(); // 连接成功后开启异步读 client_async_read(); } } LOG(INFO) << " Async_Client::thread_receive end :" << this; } //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连 void Async_Client::thread_check_connect() { LOG(INFO) << " ---Async_Client::thread_check_connect start ---" << this; //检查连接, 连接, 重连, 判断时间 while (m_condition_check_connect.is_alive()) { m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms); if (m_condition_check_connect.is_alive()) { std::this_thread::yield(); switch ((Communication_status) m_communication_status) { case E_UNKNOWN: { //连接 socket_connect(); break; } case E_DISCONNECT: { //重连 socket_reconnect(); break; } case E_READY: { //检查 socket_check(); break; } default: { break; } } } } LOG(INFO) << " Async_Client::thread_check_connect end :" << this; } // 开启连接 void Async_Client::socket_connect() { m_io_service.reset(); //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接 // 只有在run()之后, 才会真正的执行连接函数 m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error)); //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码. //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码. //如果硬件恢复连接, 那么这个函数会立即连上, //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........ m_io_service.run(); } //关闭连接 void Async_Client::socket_close() { m_socket.close(); } // 重新连接 void Async_Client::socket_reconnect() { if (m_is_reconnection_flag) { //关闭原有的连接 socket_close(); // 开启连接 socket_connect(); } } // 检查连接 void Async_Client::socket_check() { // 判断距上次读取到数据超时时间,过长则主动断开并重连 int duration = std::chrono::duration_cast( std::chrono::system_clock::now() - m_data_updata_time).count(); if (duration > READ_TIMEOUT_MILLISECONDS) { LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect."; // 通信超时后, 修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } } // 异步写入 void Async_Client::client_async_write(char *buf, int len) { m_io_service.run(); boost::asio::async_write(m_socket, boost::asio::buffer(buf, len), boost::bind(&Async_Client::handle_write, this, boost::asio::placeholders::error)); m_io_service.run(); } // 异步读取 void Async_Client::client_async_read() { //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面, // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度 m_io_service.reset(); m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX), boost::bind(&Async_Client::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error m_io_service.run(); } //异步连接回调, 失败后重连 void Async_Client::handle_connect(const boost::system::error_code &error) { if (!error) { LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port(); // 连接成功后修改状态为正常待机, m_communication_status = E_READY; //检查连接等待时间变为 CHECK_WAIT_TIME_MS, m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS; //唤醒接受线程 m_condition_receive.notify_all(true); } else { LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 连接失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } } //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度 void Async_Client::handle_read(const boost::system::error_code &error, size_t bytes_transferred) { if (!error) { Binary_buf *tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred); if (mp_communication_data_queue != nullptr) { //将数据添加到队列中, 然后内存权限转交给队列. mp_communication_data_queue->push(tp_binary_buf); } //更新时间 m_data_updata_time = std::chrono::system_clock::now(); } else { LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 读取失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } } //异步写入回调, 失败后重连 void Async_Client::handle_write(const boost::system::error_code &error) { if (!error) { //发送成功, 暂时不做处理 // LOG(INFO) << "handle write no error"; } else { LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 读取失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } }