async_client.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. #include "async_client.h"
  2. // 构造
  3. Async_Client::Async_Client() : m_socket(m_io_service) {
  4. m_communication_status = E_UNKNOWN; //通信状态
  5. m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  6. mp_thread_receive = NULL; // 接受线程, 内存由本模块管理
  7. mp_thread_check_connect = NULL; // 检查线程, 内存由本模块管理
  8. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间
  9. m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1);
  10. memset(m_data_buf, 0, DATA_LENGTH_MAX);
  11. mp_communication_data_queue = NULL;
  12. }
  13. /**
  14. * 析构函数
  15. * */
  16. Async_Client::~Async_Client() {
  17. uninit();
  18. }
  19. //初始化, 默认重连
  20. Error_manager Async_Client::init(std::string ip, int port, Thread_safe_queue<Binary_buf *> *p_communication_data_queue,
  21. bool is_reconnection_flag) {
  22. if (p_communication_data_queue == NULL) {
  23. return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
  24. " POINTER IS NULL ");
  25. }
  26. m_is_reconnection_flag = is_reconnection_flag;
  27. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过
  28. boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port);
  29. m_ep = t_ep;
  30. mp_communication_data_queue = p_communication_data_queue;
  31. //接受线程, 默认等待
  32. m_condition_receive.reset(false, false, false);
  33. mp_thread_receive = new std::thread(&Async_Client::thread_receive, this);
  34. //检查连接线程, 默认等待, 内部的wait_for会让其通过
  35. m_condition_check_connect.reset(false, false, false);
  36. mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this);
  37. return Error_code::SUCCESS;
  38. }
  39. //反初始化
  40. Error_manager Async_Client::uninit() {
  41. //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket,
  42. //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程...
  43. //关闭通信
  44. m_socket.close();
  45. //杀死2个线程,强制退出
  46. if (mp_thread_receive) {
  47. m_condition_receive.kill_all();
  48. }
  49. if (mp_thread_check_connect) {
  50. m_condition_check_connect.kill_all();
  51. }
  52. //回收2个线程的资源
  53. if (mp_thread_receive) {
  54. mp_thread_receive->join();
  55. delete mp_thread_receive;
  56. mp_thread_receive = NULL;
  57. }
  58. if (mp_thread_check_connect) {
  59. mp_thread_check_connect->join();
  60. delete mp_thread_check_connect;
  61. mp_thread_check_connect = NULL;
  62. }
  63. m_io_service.stop();
  64. mp_communication_data_queue = NULL;
  65. if (m_communication_status != E_FAULT) {
  66. m_communication_status = E_UNKNOWN;
  67. }
  68. return Error_code::SUCCESS;
  69. }
  70. //检查状态
  71. Error_manager Async_Client::check_status() {
  72. if (m_communication_status == E_READY) {
  73. return Error_code::SUCCESS;
  74. } else if (m_communication_status == E_UNKNOWN) {
  75. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR,
  76. " Async_Client::check_status() error ");
  77. } else if (m_communication_status == E_DISCONNECT) {
  78. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR,
  79. " Async_Client::check_status() error ");
  80. } else {
  81. return Error_manager(Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR,
  82. " Async_Client::check_status() error ");
  83. }
  84. }
  85. //判断是否正常
  86. bool Async_Client::is_ready() {
  87. return (m_communication_status == E_READY);
  88. }
  89. //获取状态
  90. Async_Client::Communication_status Async_Client::get_status() {
  91. return m_communication_status;
  92. }
  93. //线程接受函数,包括连接,重连和接受数据
  94. void Async_Client::thread_receive() {
  95. LOG(INFO) << " ---Async_Client::thread_receive start ---" << this;
  96. //接受雷达消息,包括连接,重连和接受数据
  97. while (m_condition_receive.is_alive()) {
  98. m_condition_receive.wait();
  99. if (m_condition_receive.is_alive()) {
  100. std::this_thread::yield();
  101. // 连接成功后开启异步读
  102. client_async_read();
  103. }
  104. }
  105. LOG(INFO) << " Async_Client::thread_receive end :" << this;
  106. return;
  107. }
  108. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  109. void Async_Client::thread_check_connect() {
  110. LOG(INFO) << " ---Async_Client::thread_check_connect start ---" << this;
  111. //检查连接, 连接, 重连, 判断时间
  112. while (m_condition_check_connect.is_alive()) {
  113. m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms);
  114. if (m_condition_check_connect.is_alive()) {
  115. std::this_thread::yield();
  116. switch ((Communication_status) m_communication_status) {
  117. case E_UNKNOWN: {
  118. //连接
  119. socket_connect();
  120. break;
  121. }
  122. case E_DISCONNECT: {
  123. //重连
  124. socket_reconnect();
  125. break;
  126. }
  127. case E_READY: {
  128. //检查
  129. socket_check();
  130. break;
  131. }
  132. default: {
  133. break;
  134. }
  135. }
  136. }
  137. }
  138. LOG(INFO) << " Async_Client::thread_check_connect end :" << this;
  139. }
  140. // 开启连接
  141. void Async_Client::socket_connect() {
  142. m_io_service.reset();
  143. //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接
  144. // 只有在run()之后, 才会真正的执行连接函数
  145. m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
  146. //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码.
  147. //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码.
  148. //如果硬件恢复连接, 那么这个函数会立即连上,
  149. //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........
  150. m_io_service.run();
  151. }
  152. //关闭连接
  153. void Async_Client::socket_close() {
  154. m_socket.close();
  155. }
  156. // 重新连接
  157. void Async_Client::socket_reconnect() {
  158. if (m_is_reconnection_flag) {
  159. //关闭原有的连接
  160. socket_close();
  161. // 开启连接
  162. socket_connect();
  163. }
  164. }
  165. // 检查连接
  166. void Async_Client::socket_check() {
  167. // 判断距上次读取到数据超时时间,过长则主动断开并重连
  168. int duration = std::chrono::duration_cast<std::chrono::milliseconds>(
  169. std::chrono::system_clock::now() - m_data_updata_time).count();
  170. if (duration > READ_TIMEOUT_MILLISECONDS) {
  171. LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
  172. // 通信超时后, 修改状态为断连,
  173. m_communication_status = E_DISCONNECT;
  174. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  175. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  176. //关闭接受线程
  177. m_condition_receive.notify_all(false);
  178. }
  179. }
  180. // 异步写入
  181. void Async_Client::client_async_write(char *buf, int len) {
  182. m_io_service.run();
  183. boost::asio::async_write(m_socket,
  184. boost::asio::buffer(buf, len),
  185. boost::bind(&Async_Client::handle_write, this,
  186. boost::asio::placeholders::error));
  187. m_io_service.run();
  188. }
  189. // 异步读取
  190. void Async_Client::client_async_read() {
  191. //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面,
  192. // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度
  193. m_io_service.reset();
  194. m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX),
  195. boost::bind(&Async_Client::handle_read, this,
  196. boost::asio::placeholders::error,
  197. boost::asio::placeholders::bytes_transferred));
  198. //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error
  199. m_io_service.run();
  200. }
  201. //异步连接回调, 失败后重连
  202. void Async_Client::handle_connect(const boost::system::error_code &error) {
  203. if (!error) {
  204. LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":"
  205. << m_ep.port();
  206. // 连接成功后修改状态为正常待机,
  207. m_communication_status = E_READY;
  208. //检查连接等待时间变为 CHECK_WAIT_TIME_MS,
  209. m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS;
  210. //唤醒接受线程
  211. m_condition_receive.notify_all(true);
  212. } else {
  213. LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  214. << m_ep.address().to_string() << ":" << m_ep.port();
  215. // 连接失败后修改状态为断连,
  216. m_communication_status = E_DISCONNECT;
  217. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  218. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  219. //关闭接受线程
  220. m_condition_receive.notify_all(false);
  221. }
  222. }
  223. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  224. void Async_Client::handle_read(const boost::system::error_code &error,
  225. size_t bytes_transferred) {
  226. if (!error) {
  227. Binary_buf *tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred);
  228. if (mp_communication_data_queue != NULL) {
  229. //将数据添加到队列中, 然后内存权限转交给队列.
  230. mp_communication_data_queue->push(tp_binary_buf);
  231. }
  232. //更新时间
  233. m_data_updata_time = std::chrono::system_clock::now();
  234. } else {
  235. LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  236. << m_ep.address().to_string() << ":" << m_ep.port();
  237. // 读取失败后修改状态为断连,
  238. m_communication_status = E_DISCONNECT;
  239. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  240. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  241. //关闭接受线程
  242. m_condition_receive.notify_all(false);
  243. }
  244. }
  245. //异步写入回调, 失败后重连
  246. void Async_Client::handle_write(const boost::system::error_code &error) {
  247. if (!error) {
  248. //发送成功, 暂时不做处理
  249. // LOG(INFO) << "handle write no error";
  250. } else {
  251. LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  252. << m_ep.address().to_string() << ":" << m_ep.port();
  253. // 读取失败后修改状态为断连,
  254. m_communication_status = E_DISCONNECT;
  255. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  256. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  257. //关闭接受线程
  258. m_condition_receive.notify_all(false);
  259. }
  260. }