123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- #include "async_client.h"
- /**
- * 构造函数
- * */
- Async_Client::Async_Client(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint endpoint, fundata_t fundata_, void *p)
- : iosev(io_service),
- socket(iosev),
- m_ep(endpoint),
- mb_connected(0),
- mb_initialized(0),
- mb_with_reconnection(0),
- mb_exit(0),
- m_fundata(fundata_),
- mp_handle(p),
- m_connection_check_thread(0)
- {
- memset(m_origin_data_, 0, MAX_LENGTH);
- }
- /**
- * 析构函数
- * */
- Async_Client::~Async_Client()
- {
- mp_handle = 0;
- }
- /**
- * 连接检查线程函数, 读超时则主动断开连接,读回调函数将收到相应boost错误码,从而启动重连
- * */
- void Async_Client::connection_check(Async_Client *p)
- {
- // 1. 检验参数合法性
- if (p == 0)
- return;
- while (!p->mb_exit)
- {
- if (p->client_initialize_status() && p->client_connection_status())
- {
- // 判断距上次读取到数据超时时间,过长则主动断开并重连
- int duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - p->m_last_read_time).count();
- if (duration > READ_TIMEOUT_MILLISECONDS)
- {
- LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
- p->mb_connected = false;
- p->socket_close();
- }
- // 以超时时间作为检查连接状态的时间间隔
- usleep(1000 * READ_TIMEOUT_MILLISECONDS);
- }
- }
- }
- /**
- * 异步写入
- * */
- void Async_Client::client_async_write(char *buf, int len)
- {
- boost::asio::async_write(socket,
- boost::asio::buffer(buf, len),
- boost::bind(&Async_Client::handle_write, this,
- boost::asio::placeholders::error));
- }
- /**
- * 异步读取
- * */
- void Async_Client::client_async_read()
- {
- socket.async_read_some(boost::asio::buffer(m_origin_data_, MAX_LENGTH),
- boost::bind(&Async_Client::handle_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
- /**
- * 返回连接状态
- * */
- bool Async_Client::client_connection_status()
- {
- return mb_connected;
- }
- /**
- * 返回初始化状态
- * */
- bool Async_Client::client_initialize_status()
- {
- return mb_initialized;
- }
- /**
- * 异步连接回调, 失败后重连
- * */
- void Async_Client::handle_connect(const boost::system::error_code &error)
- {
- if (mb_exit)
- return;
- if (!error)
- {
- LOG(INFO) << " Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port();
- mb_connected = true;
- // 连接成功后开启异步读
- this->client_async_read();
- }
- else
- {
- LOG(WARNING) << "handle_connect, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- socket_reconnect();
- }
- }
- /**
- * 异步读取回调, 失败后重连
- * */
- void Async_Client::handle_read(const boost::system::error_code &error,
- size_t bytes_transferred)
- {
- if (mb_exit)
- return;
- m_last_read_time = std::chrono::steady_clock::now();
- if (!error)
- {
- // LOG(INFO) << "handle read no error";
- //printf("From %s Received data Len:%d\n",this->ep.address().to_string().c_str(),
- // bytes_transferred);
- if (mp_handle != 0)
- {
- // 调用外部回调处理函数后再次进入异步读
- // LOG(INFO) << "client handle read call callback func, data length: " << bytes_transferred << ", strlen: " << m_origin_data_;
- (*m_fundata)(m_origin_data_, bytes_transferred, mp_handle);
- memset(m_origin_data_, 0, MAX_LENGTH);
- this->client_async_read();
- }
- else
- {
- LOG(ERROR) << "async client, handle null pointer" << m_ep.address().to_string() << ":" << m_ep.port();
- }
- }
- else
- {
- LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- socket_reconnect();
- }
- }
- /**
- * 异步写入回调, 失败后重连
- * */
- void Async_Client::handle_write(const boost::system::error_code &error)
- {
- if (mb_exit)
- return;
- if (!error)
- {
- // LOG(INFO) << "handle write no error";
- }
- else
- {
- LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
- socket_reconnect();
- }
- }
- /**
- * 关闭连接
- * */
- void Async_Client::socket_close()
- {
- socket.close();
- // LOG(INFO) << "client close";
- }
- /**
- * 开始连接
- * */
- void Async_Client::socket_connect()
- {
- socket.async_connect(m_ep,
- boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
- // LOG(INFO) << "client connect";
- }
- void Async_Client::socket_reconnect()
- {
- // LOG(INFO) << "trying to reconnect.";
- mb_connected = false;
- socket_close();
- if (mb_with_reconnection)
- {
- socket_connect();
- usleep(1000 * 1000);
- }
- }
- /**
- * 初始化, 参数为是否开启重连功能
- * */
- bool Async_Client::initialize(bool with_reconnection)
- {
- m_last_read_time = std::chrono::steady_clock::now();
- mb_with_reconnection = with_reconnection;
- socket_connect();
- m_connection_check_thread = new std::thread(&connection_check, this);
- mb_initialized = true;
- LOG(INFO) << "client initialized";
- return mb_initialized;
- }
- /**
- * 客户端关闭
- * */
- bool Async_Client::close()
- {
- mb_exit = true;
- if (m_connection_check_thread != 0)
- {
- if (m_connection_check_thread->joinable())
- {
- m_connection_check_thread->join();
- }
- delete m_connection_check_thread;
- m_connection_check_thread = 0;
- }
- socket_close();
- return true;
- }
|