123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374 |
- #include "async_client.h"
- // 构造
- Async_Client::Async_Client() : m_socket(m_io_service)
- {
- m_communication_status = E_UNKNOWN; //通信状态
- m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
- mp_thread_receive = NULL; // 接受线程, 内存由本模块管理
- mp_thread_check_connect = NULL; // 检查线程, 内存由本模块管理
- m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间
- m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1);
- memset(m_data_buf, 0, DATA_LENGTH_MAX);
- mp_communication_data_queue = NULL;
- }
- /**
- * 析构函数
- * */
- Async_Client::~Async_Client()
- {
- uninit();
- }
- //初始化, 默认重连
- Error_manager Async_Client::init(std::string ip, int port, Thread_safe_queue<Binary_buf*>* p_communication_data_queue,
- bool is_reconnection_flag )
- {
- if ( p_communication_data_queue == NULL )
- {
- return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
- " POINTER IS NULL ");
- }
- m_is_reconnection_flag = is_reconnection_flag;
- m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过
- boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port);
- m_ep = t_ep;
- mp_communication_data_queue = p_communication_data_queue;
- //接受线程, 默认等待
- m_condition_receive.reset(false, false, false);
- mp_thread_receive = new std::thread(&Async_Client::thread_receive, this);
- //检查连接线程, 默认等待, 内部的wait_for会让其通过
- m_condition_check_connect.reset(false, false, false);
- mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this);
- return Error_code::SUCCESS;
- }
- //反初始化
- Error_manager Async_Client::uninit()
- {
- //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket,
- //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程...
- //关闭通信
- m_socket.close();
-
- //杀死2个线程,强制退出
- if (mp_thread_receive)
- {
- m_condition_receive.kill_all();
- }
- if (mp_thread_check_connect)
- {
- m_condition_check_connect.kill_all();
- }
- //回收2个线程的资源
- if (mp_thread_receive)
- {
- mp_thread_receive->join();
- delete mp_thread_receive;
- mp_thread_receive = NULL;
- }
- if (mp_thread_check_connect)
- {
- mp_thread_check_connect->join();
- delete mp_thread_check_connect;
- mp_thread_check_connect = NULL;
- }
-
- m_io_service.stop();
- mp_communication_data_queue = NULL;
- if ( m_communication_status != E_FAULT )
- {
- m_communication_status = E_UNKNOWN;
- }
- return Error_code::SUCCESS;
- }
- //检查状态
- Error_manager Async_Client::check_status()
- {
- if ( m_communication_status == E_READY )
- {
- return Error_code::SUCCESS;
- }
- else if(m_communication_status == E_UNKNOWN)
- {
- return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error ");
- }
- else if(m_communication_status == E_DISCONNECT)
- {
- return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error ");
- }
- else
- {
- return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR,
- " Async_Client::check_status() error ");
- }
- }
- //判断是否正常
- bool Async_Client::is_ready()
- {
- return (m_communication_status == E_READY);
- }
- //获取状态
- Async_Client::Communication_status Async_Client::get_status()
- {
- return m_communication_status;
- }
- //线程接受函数,包括连接,重连和接受数据
- void Async_Client::thread_receive()
- {
- LOG(INFO) << " ---Async_Client::thread_receive start ---"<< this;
- //接受雷达消息,包括连接,重连和接受数据
- while (m_condition_receive.is_alive())
- {
- m_condition_receive.wait();
- if (m_condition_receive.is_alive())
- {
- std::this_thread::yield();
- // 连接成功后开启异步读
- client_async_read();
- }
- }
- LOG(INFO) << " Async_Client::thread_receive end :"<<this;
- return;
- }
- //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
- void Async_Client::thread_check_connect()
- {
- LOG(INFO) << " ---Async_Client::thread_check_connect start ---"<< this;
- //检查连接, 连接, 重连, 判断时间
- while (m_condition_check_connect.is_alive())
- {
- m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms);
- if (m_condition_check_connect.is_alive())
- {
- std::this_thread::yield();
- switch ( (Communication_status)m_communication_status )
- {
- case E_UNKNOWN:
- {
- //连接
- socket_connect();
- break;
- }
- case E_DISCONNECT:
- {
- //重连
- socket_reconnect();
- break;
- }
- case E_READY:
- {
- //检查
- socket_check();
- break;
- }
- default:
- {
- break;
- }
- }
- }
- }
- LOG(INFO) << " Async_Client::thread_check_connect end :"<<this;
- return;
- }
- // 开启连接
- void Async_Client::socket_connect()
- {
- m_io_service.reset();
- //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接
- // 只有在run()之后, 才会真正的执行连接函数
- m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
- //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码.
- //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码.
- //如果硬件恢复连接, 那么这个函数会立即连上,
- //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........
- m_io_service.run();
- return;
- }
- //关闭连接
- void Async_Client::socket_close()
- {
- m_socket.close();
- return;
- }
- // 重新连接
- void Async_Client::socket_reconnect()
- {
- if (m_is_reconnection_flag)
- {
- //关闭原有的连接
- socket_close();
- // 开启连接
- socket_connect();
- }
- //否则就在这无限等待下去
- return;
- }
- // 检查连接
- void Async_Client::socket_check()
- {
- // 判断距上次读取到数据超时时间,过长则主动断开并重连
- int duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - m_data_updata_time).count();
- if (duration > READ_TIMEOUT_MILLISECONDS)
- {
- LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
- // 通信超时后, 修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- //否则不做处理
- return;
- }
- // 异步写入
- void Async_Client::client_async_write(char *buf, int len)
- {
- m_io_service.run();
- boost::asio::async_write(m_socket,
- boost::asio::buffer(buf, len),
- boost::bind(&Async_Client::handle_write, this,
- boost::asio::placeholders::error));
- m_io_service.run();
- return;
- }
- // 异步读取
- void Async_Client::client_async_read()
- {
- //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面,
- // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度
- m_io_service.reset();
- m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX),
- boost::bind(&Async_Client::handle_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error
- m_io_service.run();
- return;
- }
- //异步连接回调, 失败后重连
- void Async_Client::handle_connect(const boost::system::error_code &error)
- {
- if (!error)
- {
- LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port();
- // 连接成功后修改状态为正常待机,
- m_communication_status = E_READY;
- //检查连接等待时间变为 CHECK_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS;
- //唤醒接受线程
- m_condition_receive.notify_all(true);
- }
- else
- {
- LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- // 连接失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- return;
- }
- //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
- void Async_Client::handle_read(const boost::system::error_code &error,
- size_t bytes_transferred)
- {
- if (!error)
- {
- Binary_buf * tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred);
- if ( mp_communication_data_queue != NULL )
- {
- //将数据添加到队列中, 然后内存权限转交给队列.
- mp_communication_data_queue->push(tp_binary_buf);
- }
- //更新时间
- m_data_updata_time = std::chrono::system_clock::now();
- }
- else
- {
- LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- // 读取失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- return;
- }
- //异步写入回调, 失败后重连
- void Async_Client::handle_write(const boost::system::error_code &error)
- {
- if (!error)
- {
- //发送成功, 暂时不做处理
- // LOG(INFO) << "handle write no error";
- }
- else
- {
- LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- // 读取失败后修改状态为断连,
- m_communication_status = E_DISCONNECT;
- //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
- m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
- //关闭接受线程
- m_condition_receive.notify_all(false);
- }
- return;
- }
|