#include "async_client.h" // 构造 Async_Client::Async_Client() : m_socket(m_io_service) { m_communication_status = E_UNKNOWN; //通信状态 m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态. mp_thread_receive = NULL; // 接受线程, 内存由本模块管理 mp_thread_check_connect = NULL; // 检查线程, 内存由本模块管理 m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间 m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1); memset(m_data_buf, 0, DATA_LENGTH_MAX); mp_communication_data_queue = NULL; } /** * 析构函数 * */ Async_Client::~Async_Client() { uninit(); } //初始化, 默认重连 Error_manager Async_Client::init(std::string ip, int port, Thread_safe_queue* p_communication_data_queue, bool is_reconnection_flag ) { if ( p_communication_data_queue == NULL ) { return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR, " POINTER IS NULL "); } m_is_reconnection_flag = is_reconnection_flag; m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过 boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port); m_ep = t_ep; mp_communication_data_queue = p_communication_data_queue; //接受线程, 默认等待 m_condition_receive.reset(false, false, false); mp_thread_receive = new std::thread(&Async_Client::thread_receive, this); //检查连接线程, 默认等待, 内部的wait_for会让其通过 m_condition_check_connect.reset(false, false, false); mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this); return Error_code::SUCCESS; } //反初始化 Error_manager Async_Client::uninit() { //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket, //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程... //关闭通信 m_socket.close(); //杀死2个线程,强制退出 if (mp_thread_receive) { m_condition_receive.kill_all(); } if (mp_thread_check_connect) { m_condition_check_connect.kill_all(); } //回收2个线程的资源 if (mp_thread_receive) { mp_thread_receive->join(); delete mp_thread_receive; mp_thread_receive = NULL; } if (mp_thread_check_connect) { mp_thread_check_connect->join(); delete mp_thread_check_connect; mp_thread_check_connect = NULL; } m_io_service.stop(); mp_communication_data_queue = NULL; if ( m_communication_status != E_FAULT ) { m_communication_status = E_UNKNOWN; } return Error_code::SUCCESS; } //检查状态 Error_manager Async_Client::check_status() { if ( m_communication_status == E_READY ) { return Error_code::SUCCESS; } else if(m_communication_status == E_UNKNOWN) { return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR, " Async_Client::check_status() error "); } else if(m_communication_status == E_DISCONNECT) { return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR, " Async_Client::check_status() error "); } else { return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR, " Async_Client::check_status() error "); } } //判断是否正常 bool Async_Client::is_ready() { return (m_communication_status == E_READY); } //获取状态 Async_Client::Communication_status Async_Client::get_status() { return m_communication_status; } //线程接受函数,包括连接,重连和接受数据 void Async_Client::thread_receive() { LOG(INFO) << " ---Async_Client::thread_receive start ---"<< this; //接受雷达消息,包括连接,重连和接受数据 while (m_condition_receive.is_alive()) { m_condition_receive.wait(); if (m_condition_receive.is_alive()) { std::this_thread::yield(); // 连接成功后开启异步读 client_async_read(); } } LOG(INFO) << " Async_Client::thread_receive end :"<(std::chrono::system_clock::now() - m_data_updata_time).count(); if (duration > READ_TIMEOUT_MILLISECONDS) { LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect."; // 通信超时后, 修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } //否则不做处理 return; } // 异步写入 void Async_Client::client_async_write(char *buf, int len) { m_io_service.run(); boost::asio::async_write(m_socket, boost::asio::buffer(buf, len), boost::bind(&Async_Client::handle_write, this, boost::asio::placeholders::error)); m_io_service.run(); return; } // 异步读取 void Async_Client::client_async_read() { //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面, // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度 m_io_service.reset(); m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX), boost::bind(&Async_Client::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error m_io_service.run(); return; } //异步连接回调, 失败后重连 void Async_Client::handle_connect(const boost::system::error_code &error) { if (!error) { LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port(); // 连接成功后修改状态为正常待机, m_communication_status = E_READY; //检查连接等待时间变为 CHECK_WAIT_TIME_MS, m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS; //唤醒接受线程 m_condition_receive.notify_all(true); } else { LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 连接失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } return; } //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度 void Async_Client::handle_read(const boost::system::error_code &error, size_t bytes_transferred) { if (!error) { Binary_buf * tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred); if ( mp_communication_data_queue != NULL ) { //将数据添加到队列中, 然后内存权限转交给队列. mp_communication_data_queue->push(tp_binary_buf); } //更新时间 m_data_updata_time = std::chrono::system_clock::now(); } else { LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 读取失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } return; } //异步写入回调, 失败后重连 void Async_Client::handle_write(const boost::system::error_code &error) { if (!error) { //发送成功, 暂时不做处理 // LOG(INFO) << "handle write no error"; } else { LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); // 读取失败后修改状态为断连, m_communication_status = E_DISCONNECT; //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS, m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS; //关闭接受线程 m_condition_receive.notify_all(false); } return; }