async_client.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. #include "async_client.h"
  2. // 构造
  3. Async_Client::Async_Client() : m_socket(m_io_service) {
  4. m_communication_status = E_UNKNOWN; //通信状态
  5. m_is_reconnection_flag = false; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  6. mp_thread_receive = nullptr; // 接受线程, 内存由本模块管理
  7. mp_thread_check_connect = nullptr; // 检查线程, 内存由本模块管理
  8. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间
  9. m_data_updata_time = std::chrono::system_clock::now() - std::chrono::hours(1);
  10. memset(m_data_buf, 0, DATA_LENGTH_MAX);
  11. mp_communication_data_queue = nullptr;
  12. }
  13. /**
  14. * 析构函数
  15. * */
  16. Async_Client::~Async_Client() {
  17. uninit();
  18. }
  19. //初始化, 默认重连
  20. Error_manager Async_Client::init(const std::string& ip, int port, Thread_safe_queue<Binary_buf *> *p_communication_data_queue,
  21. bool is_reconnection_flag) {
  22. if (p_communication_data_queue == nullptr) {
  23. return {Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
  24. " POINTER IS NULL "};
  25. }
  26. m_is_reconnection_flag = is_reconnection_flag;
  27. m_check_connect_wait_time_ms = 0; // 检查连接线程 的等待时间, 第一次直接通过
  28. boost::asio::ip::tcp::endpoint t_ep(boost::asio::ip::address_v4::from_string(ip), port);
  29. m_ep = t_ep;
  30. mp_communication_data_queue = p_communication_data_queue;
  31. //接受线程, 默认等待
  32. m_condition_receive.reset(false, false, false);
  33. mp_thread_receive = new std::thread(&Async_Client::thread_receive, this);
  34. //检查连接线程, 默认等待, 内部的wait_for会让其通过
  35. m_condition_check_connect.reset(false, false, false);
  36. mp_thread_check_connect = new std::thread(&Async_Client::thread_check_connect, this);
  37. return Error_code::SUCCESS;
  38. }
  39. //反初始化
  40. Error_manager Async_Client::uninit() {
  41. //注注注注注意了, 这里一定要在关闭通信线程之前, 关闭socket,
  42. //因为异步操作自带一个线程, 这个线程可能会卡住我们的线程...
  43. //关闭通信
  44. m_socket.close();
  45. //杀死2个线程,强制退出
  46. if (mp_thread_receive) {
  47. m_condition_receive.kill_all();
  48. }
  49. if (mp_thread_check_connect) {
  50. m_condition_check_connect.kill_all();
  51. }
  52. //回收2个线程的资源
  53. if (mp_thread_receive) {
  54. mp_thread_receive->join();
  55. delete mp_thread_receive;
  56. mp_thread_receive = nullptr;
  57. }
  58. if (mp_thread_check_connect) {
  59. mp_thread_check_connect->join();
  60. delete mp_thread_check_connect;
  61. mp_thread_check_connect = nullptr;
  62. }
  63. m_io_service.stop();
  64. mp_communication_data_queue = nullptr;
  65. if (m_communication_status != E_FAULT) {
  66. m_communication_status = E_UNKNOWN;
  67. }
  68. return Error_code::SUCCESS;
  69. }
  70. //检查状态
  71. Error_manager Async_Client::check_status() {
  72. if (m_communication_status == E_READY) {
  73. return Error_code::SUCCESS;
  74. } else if (m_communication_status == E_UNKNOWN) {
  75. return {Error_code::WJ_LIDAR_COMMUNICATION_UNINITIALIZED, Error_level::MINOR_ERROR,
  76. " Async_Client::check_status() error "};
  77. } else if (m_communication_status == E_DISCONNECT) {
  78. return {Error_code::WJ_LIDAR_COMMUNICATION_DISCONNECT, Error_level::MINOR_ERROR,
  79. " Async_Client::check_status() error "};
  80. } else {
  81. return {Error_code::WJ_LIDAR_COMMUNICATION_FAULT, Error_level::MINOR_ERROR,
  82. " Async_Client::check_status() error "};
  83. }
  84. }
  85. //判断是否正常
  86. bool Async_Client::is_ready() {
  87. return (m_communication_status == E_READY);
  88. }
  89. //获取状态
  90. Async_Client::Communication_status Async_Client::get_status() {
  91. return m_communication_status;
  92. }
  93. //线程接受函数,包括连接,重连和接受数据
  94. void Async_Client::thread_receive() {
  95. LOG(INFO) << " ---Async_Client::thread_receive start ---" << this;
  96. //接受雷达消息,包括连接,重连和接受数据
  97. while (m_condition_receive.is_alive()) {
  98. m_condition_receive.wait();
  99. if (m_condition_receive.is_alive()) {
  100. std::this_thread::yield();
  101. // 连接成功后开启异步读
  102. client_async_read();
  103. }
  104. }
  105. LOG(INFO) << " Async_Client::thread_receive end :" << this;
  106. }
  107. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  108. void Async_Client::thread_check_connect() {
  109. LOG(INFO) << " ---Async_Client::thread_check_connect start ---" << this;
  110. //检查连接, 连接, 重连, 判断时间
  111. while (m_condition_check_connect.is_alive()) {
  112. m_condition_check_connect.wait_for_millisecond(m_check_connect_wait_time_ms);
  113. if (m_condition_check_connect.is_alive()) {
  114. std::this_thread::yield();
  115. switch ((Communication_status) m_communication_status) {
  116. case E_UNKNOWN: {
  117. //连接
  118. socket_connect();
  119. break;
  120. }
  121. case E_DISCONNECT: {
  122. //重连
  123. socket_reconnect();
  124. break;
  125. }
  126. case E_READY: {
  127. //检查
  128. socket_check();
  129. break;
  130. }
  131. default: {
  132. break;
  133. }
  134. }
  135. }
  136. }
  137. LOG(INFO) << " Async_Client::thread_check_connect end :" << this;
  138. }
  139. // 开启连接
  140. void Async_Client::socket_connect() {
  141. m_io_service.reset();
  142. //开启异步连接, 使用m_ep里面的ip和port进行tcp异步连接
  143. // 只有在run()之后, 才会真正的执行连接函数
  144. m_socket.async_connect(m_ep, boost::bind(&Async_Client::handle_connect, this, boost::asio::placeholders::error));
  145. //注注注注注注意了:async_connect是阻塞函数, 如果连接不上就会一直卡在这, 而不是通过之后, 回调返回错误码.
  146. //这个函数大约1分钟之后就会通过, 并调用里面的回调函数,返回我们错误码.
  147. //如果硬件恢复连接, 那么这个函数会立即连上,
  148. //如果我们中途退出, 一定要 m_socket.close()退出异步通信他自己后台的线程, 防止这里卡死, 影响我们的线程关闭........
  149. m_io_service.run();
  150. }
  151. //关闭连接
  152. void Async_Client::socket_close() {
  153. m_socket.close();
  154. }
  155. // 重新连接
  156. void Async_Client::socket_reconnect() {
  157. if (m_is_reconnection_flag) {
  158. //关闭原有的连接
  159. socket_close();
  160. // 开启连接
  161. socket_connect();
  162. }
  163. }
  164. // 检查连接
  165. void Async_Client::socket_check() {
  166. // 判断距上次读取到数据超时时间,过长则主动断开并重连
  167. int duration = std::chrono::duration_cast<std::chrono::milliseconds>(
  168. std::chrono::system_clock::now() - m_data_updata_time).count();
  169. if (duration > READ_TIMEOUT_MILLISECONDS) {
  170. LOG(WARNING) << "connection check thread found read timeout, close socket and try to reinitialize/reconnect.";
  171. // 通信超时后, 修改状态为断连,
  172. m_communication_status = E_DISCONNECT;
  173. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  174. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  175. //关闭接受线程
  176. m_condition_receive.notify_all(false);
  177. }
  178. }
  179. // 异步写入
  180. void Async_Client::client_async_write(char *buf, int len) {
  181. m_io_service.run();
  182. boost::asio::async_write(m_socket,
  183. boost::asio::buffer(buf, len),
  184. boost::bind(&Async_Client::handle_write, this,
  185. boost::asio::placeholders::error));
  186. m_io_service.run();
  187. }
  188. // 异步读取
  189. void Async_Client::client_async_read() {
  190. //异步读取, 在run()之后, 会自动接受数据, 并存入mp_data_buf里面,
  191. // 然后调用回调函数handle_read(), bytes_transferred是返回的数据有效长度
  192. m_io_service.reset();
  193. m_socket.async_read_some(boost::asio::buffer(m_data_buf, DATA_LENGTH_MAX),
  194. boost::bind(&Async_Client::handle_read, this,
  195. boost::asio::placeholders::error,
  196. boost::asio::placeholders::bytes_transferred));
  197. //注:async_read_some, 如果断连, 那么就会回调handle_read, 并填充错误码 boost::asio::placeholders::error
  198. m_io_service.run();
  199. }
  200. //异步连接回调, 失败后重连
  201. void Async_Client::handle_connect(const boost::system::error_code &error) {
  202. if (!error) {
  203. LOG(INFO) << "Async_Client::handle_connect Connection Successful! " << m_ep.address().to_string() << ":"
  204. << m_ep.port();
  205. // 连接成功后修改状态为正常待机,
  206. m_communication_status = E_READY;
  207. //检查连接等待时间变为 CHECK_WAIT_TIME_MS,
  208. m_check_connect_wait_time_ms = CHECK_WAIT_TIME_MS;
  209. //唤醒接受线程
  210. m_condition_receive.notify_all(true);
  211. } else {
  212. LOG(WARNING) << "connect failed, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  213. << m_ep.address().to_string() << ":" << m_ep.port();
  214. // 连接失败后修改状态为断连,
  215. m_communication_status = E_DISCONNECT;
  216. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  217. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  218. //关闭接受线程
  219. m_condition_receive.notify_all(false);
  220. }
  221. }
  222. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  223. void Async_Client::handle_read(const boost::system::error_code &error,
  224. size_t bytes_transferred) {
  225. if (!error) {
  226. Binary_buf *tp_binary_buf = new Binary_buf(m_data_buf, bytes_transferred);
  227. if (mp_communication_data_queue != nullptr) {
  228. //将数据添加到队列中, 然后内存权限转交给队列.
  229. mp_communication_data_queue->push(tp_binary_buf);
  230. }
  231. //更新时间
  232. m_data_updata_time = std::chrono::system_clock::now();
  233. } else {
  234. LOG(WARNING) << "handle_read, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  235. << m_ep.address().to_string() << ":" << m_ep.port();
  236. // 读取失败后修改状态为断连,
  237. m_communication_status = E_DISCONNECT;
  238. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  239. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  240. //关闭接受线程
  241. m_condition_receive.notify_all(false);
  242. }
  243. }
  244. //异步写入回调, 失败后重连
  245. void Async_Client::handle_write(const boost::system::error_code &error) {
  246. if (!error) {
  247. //发送成功, 暂时不做处理
  248. // LOG(INFO) << "handle write no error";
  249. } else {
  250. LOG(WARNING) << "handle_write, " << boost::system::system_error(error).what() << ", waiting for reconnection "
  251. << m_ep.address().to_string() << ":" << m_ep.port();
  252. // 读取失败后修改状态为断连,
  253. m_communication_status = E_DISCONNECT;
  254. //检查连接等待时间变为 RECONNECTION_WAIT_TIME_MS,
  255. m_check_connect_wait_time_ms = RECONNECTION_WAIT_TIME_MS;
  256. //关闭接受线程
  257. m_condition_receive.notify_all(false);
  258. }
  259. }