async_client.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. /*
  2. * //启动异步服务
  3. *
  4. * 必须使用如下三步
  5. m_io_service.reset();
  6. //异步事件函数
  7. m_io_service.run();
  8. //run运行之后, 代码会卡在run()这里
  9. //socket.async_connect() socket.async_read_some() boost::asio::async_write()等函数内部才会真正的执行,
  10. // 执行完成后就会自动调用里面的回调函数,
  11. //里面的事件函数执行完之后, 才会结束run(), 此时才会执行run()后面的代码
  12. //异步通信的机制就是, 如上3个函数,在被调用的时候会先跳过,
  13. //并执行后面的其他代码, 只有在run之后, 才会启动如上3个函数,
  14. //由m_io_service.run() m_io_service.stop() m_io_service.reset() m_socket.close() 来控制
  15. //功能有点像线程的条件变量.
  16. //注, run()函数只能启动前面的异步事件,
  17. 在run之后, 如果还有异步事件, 那么需要reset和run再次使用.
  18. //注注注注注意了
  19. //m_io_service.run(), m_io_service.stop() , m_io_service.reset()只是控制异步服务的事件函数
  20. //m_socket.close() 控制通信
  21. * */
  22. #ifndef ASYNC_CLIENT_H
  23. #define ASYNC_CLIENT_H
  24. #include <iostream>
  25. #include <stdio.h>
  26. #include <time.h>
  27. #include <boost/asio.hpp>
  28. #include <boost/bind.hpp>
  29. #include <boost/thread.hpp>
  30. #include <chrono>
  31. #include <mutex>
  32. #include <thread>
  33. #include <atomic>
  34. #include <string>
  35. #include <unistd.h>
  36. #include "glog/logging.h"
  37. #include "../error_code/error_code.h"
  38. #include "../tool/thread_condition.h"
  39. #include "../tool/binary_buf.h"
  40. #include "../tool/thread_safe_queue.h"
  41. #include "../tool/common_data.h"
  42. /**
  43. * 异步tcp通信客户端
  44. * */
  45. class Async_Client
  46. {
  47. public:
  48. #define DATA_LENGTH_MAX 50000 //接受消息的缓存长度50000byte, 实际通信数据不能超过它, 如果超过了, 则需要增大
  49. #define RECONNECTION_WAIT_TIME_MS 1000 //每隔1000ms重连一次
  50. #define CHECK_WAIT_TIME_MS WANJI_716_SCAN_CYCLE_MS //每隔66ms检查一次通信, 万集雷达15HZ, 必须要大于一个通信周期, 防止重连之后没有及时刷新消息就又超时断连了
  51. #define READ_TIMEOUT_MILLISECONDS 5000 //通信数据更新 的超时时间5000ms, 如果数据一直不更新那么就断开然后重连
  52. //注:隐藏bug:如果通信正常,但是服务端
  53. typedef void (*fundata_t)(const char *data, const int len, const void *p);
  54. typedef void (*Callback_function)(const char *data, const int len, const void *p);
  55. //通信状态
  56. enum Communication_status
  57. {//default = 0
  58. E_UNKNOWN = 0, //未知
  59. E_READY = 1, //正常待机
  60. E_DISCONNECT = 2, //断连
  61. E_FAULT =10, //故障
  62. };
  63. public:
  64. // 构造
  65. Async_Client();
  66. // 析构
  67. ~Async_Client();
  68. //初始化, 默认重连
  69. Error_manager init(std::string ip, int port, Thread_safe_queue<Binary_buf*>* p_communication_data_queue,
  70. bool is_reconnection_flag = true);
  71. //反初始化
  72. Error_manager uninit();
  73. //检查状态
  74. Error_manager check_status();
  75. //判断是否正常
  76. bool is_ready();
  77. //获取状态
  78. Communication_status get_status();
  79. protected:
  80. //线程接受函数,自动接受数据到 mp_data_buf, 然后触发回调函数,
  81. void thread_receive();
  82. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  83. void thread_check_connect();
  84. // 开启连接
  85. void socket_connect();
  86. // 关闭连接
  87. void socket_close();
  88. // 重新连接
  89. void socket_reconnect();
  90. // 检查连接
  91. void socket_check();
  92. // 异步写入
  93. void client_async_write(char *buf, int len);
  94. // 异步读取
  95. void client_async_read();
  96. // 异步连接回调
  97. void handle_connect(const boost::system::error_code &error);
  98. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  99. void handle_read(const boost::system::error_code &error, size_t bytes_transferred);
  100. // 异步写回调
  101. void handle_write(const boost::system::error_code &error);
  102. protected:
  103. //状态
  104. std::atomic<Communication_status> m_communication_status; //通信状态
  105. std::atomic<bool> m_is_reconnection_flag; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  106. //通信
  107. //把m_io_service作为参数, 构造m_socket, 那么m_io_service.run()可以异步启动m_socket的通信
  108. boost::asio::io_service m_io_service; // 异步控制服务, 负责异步操作, run启动异步服务, stop停止异步服务
  109. boost::asio::ip::tcp::socket m_socket; // socket句柄, 负责tcp通信, 主要是connect,read,write,close
  110. //m_socket.async_connect() 传入m_ep, 可以tcp连接 指定的ip和port
  111. boost::asio::ip::tcp::endpoint m_ep; // 连接参数, 主要是ip和port
  112. //数据
  113. char m_data_buf[DATA_LENGTH_MAX]; // 通信消息接受缓存, 默认50000byte
  114. std::chrono::system_clock::time_point m_data_updata_time; // 数据更新时间
  115. Thread_safe_queue<Binary_buf*>* mp_communication_data_queue; //通信数据队列, 内存由上级管理
  116. //接受数据的线程, 自动接受数据到 mp_data_buf, 然后存入队列
  117. std::thread * mp_thread_receive; // 接受线程, 内存由本模块管理
  118. Thread_condition m_condition_receive; //接受线程的条件变量
  119. // 检查连接线程, 进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  120. std::thread * mp_thread_check_connect; // 检查连接线程, 内存由本模块管理
  121. Thread_condition m_condition_check_connect; //检查连接线程的条件变量
  122. int m_check_connect_wait_time_ms; // 检查连接线程 的等待时间
  123. };
  124. #endif // ASYNC_CLIENT_H
  125. /*
  126. * boost::asio::io_service使用时的注意事项:
  127. ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。
  128. ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
  129. ③一般情况下,io_service的成员函数的使用顺序:
  130. boost::asio::io_service构造,
  131. boost::asio::io_service::run(),
  132. boost::asio::io_service::stop(),
  133. boost::asio::io_service::reset(),
  134. boost::asio::io_service::run(),
  135. ......
  136. boost::asio::io_service析构,
  137. ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
  138. ⑤一个新创建的io_service不需要执行reset()函数。
  139. ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
  140. ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
  141. 我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。
  142. ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。
  143. */