async_client.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. #include "async_client.h"
  2. // 构造
  3. Async_Client::Async_Client() : m_socket(m_io_service)
  4. {
  5. m_communication_status = E_UNKNOWN; //通信状态
  6. m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  7. mp_thread_receive = NULL; // 接受线程, 内存由本模块管理
  8. mp_thread_check_connect = NULL; // 检查线程, 内存由本模块管理
  9. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间
  10. m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1);
  11. memset(m_data_buf, 0, DATA_LENGTH_MAX);
  12. mp_communication_data_queue = NULL;
  13. }
  14. /**
  15. * 析构函数
  16. * */
  17. Async_Client::~Async_Client()
  18. {
  19. uninit();
  20. }
  21. //初始化, 默认重连
  22. Error_manager Async_Client::init(std::string ip, int port, Thread_safe_queue<Binary_buf*>* p_communication_data_queue,
  23. bool is_reconnection_flag )
  24. {
  25. if ( p_communication_data_queue == NULL )
  26. {
  27. return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
  28. " POINTER IS NULL ");
  29. }
  30. m_is_reconnection_flag = is_reconnection_flag;
  31. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过
  32. boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port);
  33. m_ep = t_ep;
  34. mp_communication_data_queue = p_communication_data_queue;
  35. //接受线程, 默认等待
  36. m_condition_receive.reset(false, false, false);
  37. mp_thread_receive = new std::thread(&Async_Client::thread_receive, this);
  38. //检查连接线程, 默认等待, 内部的wait_for会让其通过
  39. m_condition_check_connect.reset(false, false, false);
  40. mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this);
  41. return Error_code::SUCCESS;
  42. }
  43. //反初始化
  44. Error_manager Async_Client::uninit()
  45. {
  46. //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket,
  47. //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程...
  48. //关闭通信
  49. m_socket.close();
  50. //杀死2个线程,强制退出
  51. if (mp_thread_receive)
  52. {
  53. m_condition_receive.kill_all();
  54. }
  55. if (mp_thread_check_connect)
  56. {
  57. m_condition_check_connect.kill_all();
  58. }
  59. //回收2个线程的资源
  60. if (mp_thread_receive)
  61. {
  62. mp_thread_receive->join();
  63. delete mp_thread_receive;
  64. mp_thread_receive = NULL;
  65. }
  66. if (mp_thread_check_connect)
  67. {
  68. mp_thread_check_connect->join();
  69. delete mp_thread_check_connect;
  70. mp_thread_check_connect = NULL;
  71. }
  72. m_io_service.stop();
  73. mp_communication_data_queue = NULL;
  74. if ( m_communication_status != E_FAULT )
  75. {
  76. m_communication_status = E_UNKNOWN;
  77. }
  78. return Error_code::SUCCESS;
  79. }
  80. //检查状态
  81. Error_manager Async_Client::check_status()
  82. {
  83. if ( m_communication_status == E_READY )
  84. {
  85. return Error_code::SUCCESS;
  86. }
  87. else if(m_communication_status == E_UNKNOWN)
  88. {
  89. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR,
  90. " Async_Client::check_status() error ");
  91. }
  92. else if(m_communication_status == E_DISCONNECT)
  93. {
  94. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR,
  95. " Async_Client::check_status() error ");
  96. }
  97. else
  98. {
  99. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR,
  100. " Async_Client::check_status() error ");
  101. }
  102. }
  103. //判断是否正常
  104. bool Async_Client::is_ready()
  105. {
  106. return (m_communication_status == E_READY);
  107. }
  108. //获取状态
  109. Async_Client::Communication_status Async_Client::get_status()
  110. {
  111. return m_communication_status;
  112. }
  113. //线程接受函数,包括连接,重连和接受数据
  114. void Async_Client::thread_receive()
  115. {
  116. LOG(INFO) << " ---Async_Client::thread_receive start ---"<< this;
  117. //接受雷达消息,包括连接,重连和接受数据
  118. while (m_condition_receive.is_alive())
  119. {
  120. m_condition_receive.wait();
  121. if (m_condition_receive.is_alive())
  122. {
  123. std::this_thread::yield();
  124. // 连接成功后开启异步读
  125. client_async_read();
  126. }
  127. }
  128. LOG(INFO) << " Async_Client::thread_receive end :"<<this;
  129. return;
  130. }
  131. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  132. void Async_Client::thread_check_connect()
  133. {
  134. LOG(INFO) << " ---Async_Client::thread_check_connect start ---"<< this;
  135. //检查连接, 连接, 重连, 判断时间
  136. while (m_condition_check_connect.is_alive())
  137. {
  138. m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms);
  139. if (m_condition_check_connect.is_alive())
  140. {
  141. std::this_thread::yield();
  142. switch ( (Communication_status)m_communication_status )
  143. {
  144. case E_UNKNOWN:
  145. {
  146. //连接
  147. socket_connect();
  148. break;
  149. }
  150. case E_DISCONNECT:
  151. {
  152. //重连
  153. socket_reconnect();
  154. break;
  155. }
  156. case E_READY:
  157. {
  158. //检查
  159. socket_check();
  160. break;
  161. }
  162. default:
  163. {
  164. break;
  165. }
  166. }
  167. }
  168. }
  169. LOG(INFO) << " Async_Client::thread_check_connect end :"<<this;
  170. return;
  171. }
  172. // 开启连接
  173. void Async_Client::socket_connect()
  174. {
  175. m_io_service.reset();
  176. //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接
  177. // 只有在run()之后, 才会真正的执行连接函数
  178. m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
  179. //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码.
  180. //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码.
  181. //如果硬件恢复连接, 那么这个函数会立即连上,
  182. //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........
  183. m_io_service.run();
  184. return;
  185. }
  186. //关闭连接
  187. void Async_Client::socket_close()
  188. {
  189. m_socket.close();
  190. return;
  191. }
  192. // 重新连接
  193. void Async_Client::socket_reconnect()
  194. {
  195. if (m_is_reconnection_flag)
  196. {
  197. //关闭原有的连接
  198. socket_close();
  199. // 开启连接
  200. socket_connect();
  201. }
  202. //否则就在这无限等待下去
  203. return;
  204. }
  205. // 检查连接
  206. void Async_Client::socket_check()
  207. {
  208. // 判断距上次读取到数据超时时间,过长则主动断开并重连
  209. int duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - m_data_updata_time).count();
  210. if (duration > READ_TIMEOUT_MILLISECONDS)
  211. {
  212. LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
  213. // 通信超时后, 修改状态为断连,
  214. m_communication_status = E_DISCONNECT;
  215. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  216. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  217. //关闭接受线程
  218. m_condition_receive.notify_all(false);
  219. }
  220. //否则不做处理
  221. return;
  222. }
  223. // 异步写入
  224. void Async_Client::client_async_write(char *buf, int len)
  225. {
  226. m_io_service.run();
  227. boost::asio::async_write(m_socket,
  228. boost::asio::buffer(buf, len),
  229. boost::bind(&Async_Client::handle_write, this,
  230. boost::asio::placeholders::error));
  231. m_io_service.run();
  232. return;
  233. }
  234. // 异步读取
  235. void Async_Client::client_async_read()
  236. {
  237. //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面,
  238. // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度
  239. m_io_service.reset();
  240. m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX),
  241. boost::bind(&Async_Client::handle_read, this,
  242. boost::asio::placeholders::error,
  243. boost::asio::placeholders::bytes_transferred));
  244. //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error
  245. m_io_service.run();
  246. return;
  247. }
  248. //异步连接回调, 失败后重连
  249. void Async_Client::handle_connect(const boost::system::error_code &error)
  250. {
  251. if (!error)
  252. {
  253. LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":" << m_ep.port();
  254. // 连接成功后修改状态为正常待机,
  255. m_communication_status = E_READY;
  256. //检查连接等待时间变为 CHECK_WAIT_TIME_MS,
  257. m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS;
  258. //唤醒接受线程
  259. m_condition_receive.notify_all(true);
  260. }
  261. else
  262. {
  263. LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  264. // 连接失败后修改状态为断连,
  265. m_communication_status = E_DISCONNECT;
  266. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  267. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  268. //关闭接受线程
  269. m_condition_receive.notify_all(false);
  270. }
  271. return;
  272. }
  273. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  274. void Async_Client::handle_read(const boost::system::error_code &error,
  275. size_t bytes_transferred)
  276. {
  277. if (!error)
  278. {
  279. Binary_buf * tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred);
  280. if ( mp_communication_data_queue != NULL )
  281. {
  282. //将数据添加到队列中, 然后内存权限转交给队列.
  283. mp_communication_data_queue->push(tp_binary_buf);
  284. }
  285. //更新时间
  286. m_data_updata_time = std::chrono::system_clock::now();
  287. }
  288. else
  289. {
  290. LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  291. // 读取失败后修改状态为断连,
  292. m_communication_status = E_DISCONNECT;
  293. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  294. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  295. //关闭接受线程
  296. m_condition_receive.notify_all(false);
  297. }
  298. return;
  299. }
  300. //异步写入回调, 失败后重连
  301. void Async_Client::handle_write(const boost::system::error_code &error)
  302. {
  303. if (!error)
  304. {
  305. //发送成功, 暂时不做处理
  306. // LOG(INFO) << "handle write no error";
  307. }
  308. else
  309. {
  310. LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection " << m_ep.address().to_string() << ":" << m_ep.port();
  311. // 读取失败后修改状态为断连,
  312. m_communication_status = E_DISCONNECT;
  313. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  314. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  315. //关闭接受线程
  316. m_condition_receive.notify_all(false);
  317. }
  318. return;
  319. }