#include "async_client.h" /** * 构造函数 * */ Async_Client::Async_Client(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint endpoint, fundata_t fundata_, void *p) : iosev(io_service), socket(iosev), m_ep(endpoint), mb_connected(0), mb_initialized(0), mb_with_reconnection(0), mb_exit(0), m_fundata(fundata_), mp_handle(p), m_connection_check_thread(0) { memset(m_origin_data_, 0, MAX_LENGTH); } /** * 析构函数 * */ Async_Client::~Async_Client() { mp_handle = 0; } /** * 连接检查线程函数, 读超时则主动断开连接,读回调函数将收到相应boost错误码,从而启动重连 * */ void Async_Client::connection_check(Async_Client *p) { // 1. 检验参数合法性 if (p == 0) return; while (!p->mb_exit) { if (p->client_initialize_status() && p->client_connection_status()) { // 判断距上次读取到数据超时时间,过长则主动断开并重连 int duration = std::chrono::duration_cast(std::chrono::steady_clock::now() - p->m_last_read_time).count(); if (duration > READ_TIMEOUT_MILLISECONDS) { LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect."; p->mb_connected = false; p->socket_close(); } // 以超时时间作为检查连接状态的时间间隔 usleep(1000 * READ_TIMEOUT_MILLISECONDS); } } } /** * 异步写入 * */ void Async_Client::client_async_write(char *buf, int len) { boost::asio::async_write(socket, boost::asio::buffer(buf, len), boost::bind(&Async_Client::handle_write, this, boost::asio::placeholders::error)); } /** * 异步读取 * */ void Async_Client::client_async_read() { socket.async_read_some(boost::asio::buffer(m_origin_data_, MAX_LENGTH), boost::bind(&Async_Client::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } /** * 返回连接状态 * */ bool Async_Client::client_connection_status() { return mb_connected; } /** * 返回初始化状态 * */ bool Async_Client::client_initialize_status() { return mb_initialized; } /** * 异步连接回调, 失败后重连 * */ void Async_Client::handle_connect(const boost::system::error_code &error) { if (mb_exit) return; if (!error) { LOG(INFO) << " Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port(); mb_connected = true; // 连接成功后开启异步读 this->client_async_read(); } else { LOG(WARNING) << "handle_connect, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); socket_reconnect(); } } /** * 异步读取回调, 失败后重连 * */ void Async_Client::handle_read(const boost::system::error_code &error, size_t bytes_transferred) { if (mb_exit) return; m_last_read_time = std::chrono::steady_clock::now(); if (!error) { // LOG(INFO) << "handle read no error"; //printf("From %s Received data Len:%d\n",this->ep.address().to_string().c_str(), // bytes_transferred); if (mp_handle != 0) { // 调用外部回调处理函数后再次进入异步读 // LOG(INFO) << "client handle read call callback func, data length: " << bytes_transferred << ", strlen: " << m_origin_data_; (*m_fundata)(m_origin_data_, bytes_transferred, mp_handle); memset(m_origin_data_, 0, MAX_LENGTH); this->client_async_read(); } else { LOG(ERROR) << "async client, handle null pointer" << m_ep.address().to_string() << ":" << m_ep.port(); } } else { LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); socket_reconnect(); } } /** * 异步写入回调, 失败后重连 * */ void Async_Client::handle_write(const boost::system::error_code &error) { if (mb_exit) return; if (!error) { // LOG(INFO) << "handle write no error"; } else { LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port(); socket_reconnect(); } } /** * 关闭连接 * */ void Async_Client::socket_close() { socket.close(); // LOG(INFO) << "client close"; } /** * 开始连接 * */ void Async_Client::socket_connect() { socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error)); // LOG(INFO) << "client connect"; } void Async_Client::socket_reconnect() { // LOG(INFO) << "trying to reconnect."; mb_connected = false; socket_close(); if (mb_with_reconnection) { socket_connect(); usleep(1000 * 1000); } } /** * 初始化, 参数为是否开启重连功能 * */ bool Async_Client::initialize(bool with_reconnection) { m_last_read_time = std::chrono::steady_clock::now(); mb_with_reconnection = with_reconnection; socket_connect(); m_connection_check_thread = new std::thread(&connection_check, this); mb_initialized = true; LOG(INFO) << "client initialized"; return mb_initialized; } /** * 客户端关闭 * */ bool Async_Client::close() { mb_exit = true; if (m_connection_check_thread != 0) { if (m_connection_check_thread->joinable()) { m_connection_check_thread->join(); } delete m_connection_check_thread; m_connection_check_thread = 0; } socket_close(); return true; }