async_client.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. #include "async_client.h"
  2. /**
  3. * 构造函数
  4. * */
  5. Async_Client::Async_Client(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint endpoint, fundata_t fundata_, void *p)
  6. : iosev(io_service),
  7. socket(iosev),
  8. m_ep(endpoint),
  9. mb_connected(0),
  10. mb_initialized(0),
  11. mb_with_reconnection(0),
  12. mb_exit(0),
  13. m_fundata(fundata_),
  14. mp_handle(p),
  15. m_connection_check_thread(0)
  16. {
  17. memset(m_origin_data_, 0, MAX_LENGTH);
  18. }
  19. /**
  20. * 析构函数
  21. * */
  22. Async_Client::~Async_Client()
  23. {
  24. mp_handle = 0;
  25. }
  26. /**
  27. * 连接检查线程函数, 读超时则主动断开连接,读回调函数将收到相应boost错误码,从而启动重连
  28. * */
  29. void Async_Client::connection_check(Async_Client *p)
  30. {
  31. // 1. 检验参数合法性
  32. if (p == 0)
  33. return;
  34. while (!p->mb_exit)
  35. {
  36. if (p->client_initialize_status() && p->client_connection_status())
  37. {
  38. // 判断距上次读取到数据超时时间,过长则主动断开并重连
  39. int duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - p->m_last_read_time).count();
  40. if (duration > READ_TIMEOUT_MILLISECONDS)
  41. {
  42. LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
  43. p->mb_connected = false;
  44. p->socket_close();
  45. }
  46. // 以超时时间作为检查连接状态的时间间隔
  47. usleep(1000 * READ_TIMEOUT_MILLISECONDS);
  48. }
  49. }
  50. }
  51. /**
  52. * 异步写入
  53. * */
  54. void Async_Client::client_async_write(char *buf, int len)
  55. {
  56. boost::asio::async_write(socket,
  57. boost::asio::buffer(buf, len),
  58. boost::bind(&Async_Client::handle_write, this,
  59. boost::asio::placeholders::error));
  60. }
  61. /**
  62. * 异步读取
  63. * */
  64. void Async_Client::client_async_read()
  65. {
  66. socket.async_read_some(boost::asio::buffer(m_origin_data_, MAX_LENGTH),
  67. boost::bind(&Async_Client::handle_read, this,
  68. boost::asio::placeholders::error,
  69. boost::asio::placeholders::bytes_transferred));
  70. }
  71. /**
  72. * 返回连接状态
  73. * */
  74. bool Async_Client::client_connection_status()
  75. {
  76. return mb_connected;
  77. }
  78. /**
  79. * 返回初始化状态
  80. * */
  81. bool Async_Client::client_initialize_status()
  82. {
  83. return mb_initialized;
  84. }
  85. /**
  86. * 异步连接回调, 失败后重连
  87. * */
  88. void Async_Client::handle_connect(const boost::system::error_code &error)
  89. {
  90. if (mb_exit)
  91. return;
  92. if (!error)
  93. {
  94. LOG(INFO) << " Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port();
  95. mb_connected = true;
  96. // 连接成功后开启异步读
  97. this->client_async_read();
  98. }
  99. else
  100. {
  101. LOG(WARNING) << "handle_connect, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  102. socket_reconnect();
  103. }
  104. }
  105. /**
  106. * 异步读取回调, 失败后重连
  107. * */
  108. void Async_Client::handle_read(const boost::system::error_code &error,
  109. size_t bytes_transferred)
  110. {
  111. if (mb_exit)
  112. return;
  113. m_last_read_time = std::chrono::steady_clock::now();
  114. if (!error)
  115. {
  116. // LOG(INFO) << "handle read no error";
  117. //printf("From %s Received data Len:%d\n",this->ep.address().to_string().c_str(),
  118. // bytes_transferred);
  119. if (mp_handle != 0)
  120. {
  121. // 调用外部回调处理函数后再次进入异步读
  122. // LOG(INFO) << "client handle read call callback func, data length: " << bytes_transferred << ", strlen: " << m_origin_data_;
  123. (*m_fundata)(m_origin_data_, bytes_transferred, mp_handle);
  124. memset(m_origin_data_, 0, MAX_LENGTH);
  125. this->client_async_read();
  126. }
  127. else
  128. {
  129. LOG(ERROR) << "async client, handle null pointer" << m_ep.address().to_string() << ":" << m_ep.port();
  130. }
  131. }
  132. else
  133. {
  134. LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  135. socket_reconnect();
  136. }
  137. }
  138. /**
  139. * 异步写入回调, 失败后重连
  140. * */
  141. void Async_Client::handle_write(const boost::system::error_code &error)
  142. {
  143. if (mb_exit)
  144. return;
  145. if (!error)
  146. {
  147. // LOG(INFO) << "handle write no error";
  148. }
  149. else
  150. {
  151. LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  152. socket_reconnect();
  153. }
  154. }
  155. /**
  156. * 关闭连接
  157. * */
  158. void Async_Client::socket_close()
  159. {
  160. socket.close();
  161. // LOG(INFO) << "client close";
  162. }
  163. /**
  164. * 开始连接
  165. * */
  166. void Async_Client::socket_connect()
  167. {
  168. socket.async_connect(m_ep,
  169. boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
  170. // LOG(INFO) << "client connect";
  171. }
  172. void Async_Client::socket_reconnect()
  173. {
  174. // LOG(INFO) << "trying to reconnect.";
  175. mb_connected = false;
  176. socket_close();
  177. if (mb_with_reconnection)
  178. {
  179. socket_connect();
  180. usleep(1000 * 1000);
  181. }
  182. }
  183. /**
  184. * 初始化, 参数为是否开启重连功能
  185. * */
  186. bool Async_Client::initialize(bool with_reconnection)
  187. {
  188. m_last_read_time = std::chrono::steady_clock::now();
  189. mb_with_reconnection = with_reconnection;
  190. socket_connect();
  191. m_connection_check_thread = new std::thread(&connection_check, this);
  192. mb_initialized = true;
  193. LOG(INFO) << "client initialized";
  194. return mb_initialized;
  195. }
  196. /**
  197. * 客户端关闭
  198. * */
  199. bool Async_Client::close()
  200. {
  201. mb_exit = true;
  202. if (m_connection_check_thread != 0)
  203. {
  204. if (m_connection_check_thread->joinable())
  205. {
  206. m_connection_check_thread->join();
  207. }
  208. delete m_connection_check_thread;
  209. m_connection_check_thread = 0;
  210. }
  211. socket_close();
  212. return true;
  213. }