123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- #include "async_client.h"
- // 构造
- Async_Client::Async_Client() : m_socket(m_io_service) {
- m_communication_status = E_UNKNOWN; //通信状态
- m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
- mp_thread_receive = nullptr; // 接受线程, 内存由本模块管理
- mp_thread_check_connect = nullptr; // 检查线程, 内存由本模块管理
- m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间
- m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1);
- memset(m_data_buf, 0, DATA_LENGTH_MAX);
- mp_communication_data_queue = nullptr;
- }
- /**
- * 析构函数
- * */
- Async_Client::~Async_Client() {
- uninit();
- }
- //初始化, 默认重连
- Error_manager Async_Client::init(const std::string& ip, int port, Thread_safe_queue<Binary_buf *> *p_communication_data_queue,
- bool is_reconnection_flag) {
- if (p_communication_data_queue == nullptr) {
- return {Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
- " POINTER IS NULL "};
- }
- m_is_reconnection_flag = is_reconnection_flag;
- m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过
- boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port);
- m_ep = t_ep;
- mp_communication_data_queue = p_communication_data_queue;
- //接受线程, 默认等待
- m_condition_receive.reset(false, false, false);
- mp_thread_receive = new std::thread(&Async_Client::thread_receive, this);
- //检查连接线程, 默认等待, 内部的wait_for会让其通过
- m_condition_check_connect.reset(false, false, false);
- mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this);
- return Error_code::SUCCESS;
- }
- //反初始化
- Error_manager Async_Client::uninit() {
- //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket,
- //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程...
- //关闭通信
- m_socket.close();
- //杀死2个线程,强制退出
- if (mp_thread_receive) {
- m_condition_receive.kill_all();
- }
- if (mp_thread_check_connect) {
- m_condition_check_connect.kill_all();
- }
- //回收2个线程的资源
- if (mp_thread_receive) {
- mp_thread_receive->join();
- delete mp_thread_receive;
- mp_thread_receive = nullptr;
- }
- if (mp_thread_check_connect) {
- mp_thread_check_connect->join();
- delete mp_thread_check_connect;
- mp_thread_check_connect = nullptr;
- }
- m_io_service.stop();
- mp_communication_data_queue = nullptr;
- if (m_communication_status != E_FAULT) {
- m_communication_status = E_UNKNOWN;
- }
- return Error_code::SUCCESS;
- }
- //检查状态
- Error_manager Async_Client::check_status() {
- if (m_communication_status == E_READY) {
- return Error_code::SUCCESS;
- } else if (m_communication_status == E_UNKNOWN) {
- return {Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error "};
- } else if (m_communication_status == E_DISCONNECT) {
- return {Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error "};
- } else {
- return {Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error "};
- }
- }
- //判断是否正常
- bool Async_Client::is_ready() {
- return (m_communication_status == E_READY);
- }
- //获取状态
- Async_Client::Communication_status Async_Client::get_status() {
- return m_communication_status;
- }
- //线程接受函数,包括连接,重连和接受数据
- void Async_Client::thread_receive() {
- LOG(INFO) << " ---Async_Client::thread_receive start ---" << this;
- //接受雷达消息,包括连接,重连和接受数据
- while (m_condition_receive.is_alive()) {
- m_condition_receive.wait();
- if (m_condition_receive.is_alive()) {
- std::this_thread::yield();
- // 连接成功后开启异步读
- client_async_read();
- }
- }
- LOG(INFO) << " Async_Client::thread_receive end :" << this;
- }
- //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
- void Async_Client::thread_check_connect() {
- LOG(INFO) << " ---Async_Client::thread_check_connect start ---" << this;
- //检查连接, 连接, 重连, 判断时间
- while (m_condition_check_connect.is_alive()) {
- m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms);
- if (m_condition_check_connect.is_alive()) {
- std::this_thread::yield();
- switch ((Communication_status) m_communication_status) {
- case E_UNKNOWN: {
- //连接
- socket_connect();
- break;
- }
- case E_DISCONNECT: {
- //重连
- socket_reconnect();
- break;
- }
- case E_READY: {
- //检查
- socket_check();
- break;
- }
- default: {
- break;
- }
- }
- }
- }
- LOG(INFO) << " Async_Client::thread_check_connect end :" << this;
- }
- // 开启连接
- void Async_Client::socket_connect() {
- m_io_service.reset();
- //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接
- // 只有在run()之后, 才会真正的执行连接函数
- m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
- //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码.
- //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码.
- //如果硬件恢复连接, 那么这个函数会立即连上,
- //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........
- m_io_service.run();
- }
- //关闭连接
- void Async_Client::socket_close() {
- m_socket.close();
- }
- // 重新连接
- void Async_Client::socket_reconnect() {
- if (m_is_reconnection_flag) {
- //关闭原有的连接
- socket_close();
- // 开启连接
- socket_connect();
- }
- }
- // 检查连接
- void Async_Client::socket_check() {
- // 判断距上次读取到数据超时时间,过长则主动断开并重连
- int duration = std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now() - m_data_updata_time).count();
- if (duration > READ_TIMEOUT_MILLISECONDS) {
- LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
- // 通信超时后, 修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- }
- // 异步写入
- void Async_Client::client_async_write(char *buf, int len) {
- m_io_service.run();
- boost::asio::async_write(m_socket,
- boost::asio::buffer(buf, len),
- boost::bind(&Async_Client::handle_write, this,
- boost::asio::placeholders::error));
- m_io_service.run();
- }
- // 异步读取
- void Async_Client::client_async_read() {
- //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面,
- // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度
- m_io_service.reset();
- m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX),
- boost::bind(&Async_Client::handle_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error
- m_io_service.run();
- }
- //异步连接回调, 失败后重连
- void Async_Client::handle_connect(const boost::system::error_code &error) {
- if (!error) {
- LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":"
- << m_ep.port();
- // 连接成功后修改状态为正常待机,
- m_communication_status = E_READY;
- //检查连接等待时间变为 CHECK_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS;
- //唤醒接受线程
- m_condition_receive.notify_all(true);
- } else {
- LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection "
- << m_ep.address().to_string() << ":" << m_ep.port();
- // 连接失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- }
- //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
- void Async_Client::handle_read(const boost::system::error_code &error,
- size_t bytes_transferred) {
- if (!error) {
- Binary_buf *tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred);
- if (mp_communication_data_queue != nullptr) {
- //将数据添加到队列中, 然后内存权限转交给队列.
- mp_communication_data_queue->push(tp_binary_buf);
- }
- //更新时间
- m_data_updata_time = std::chrono::system_clock::now();
- } else {
- LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection "
- << m_ep.address().to_string() << ":" << m_ep.port();
- // 读取失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- }
- //异步写入回调, 失败后重连
- void Async_Client::handle_write(const boost::system::error_code &error) {
- if (!error) {
- //发送成功, 暂时不做处理
- // LOG(INFO) << "handle write no error";
- } else {
- LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection "
- << m_ep.address().to_string() << ":" << m_ep.port();
- // 读取失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- }
|