async_client.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. * //启动异步服务
  3. *
  4. * 必须使用如下三步
  5. m_io_service.reset();
  6. //异步事件函数
  7. m_io_service.run();
  8. //run运行之后, 代码会卡在run()这里
  9. //socket.async_connect() socket.async_read_some() boost::asio::async_write()等函数内部才会真正的执行,
  10. // 执行完成后就会自动调用里面的回调函数,
  11. //里面的事件函数执行完之后, 才会结束run(), 此时才会执行run()后面的代码
  12. //异步通信的机制就是, 如上3个函数,在被调用的时候会先跳过,
  13. //并执行后面的其他代码, 只有在run之后, 才会启动如上3个函数,
  14. //由m_io_service.run() m_io_service.stop() m_io_service.reset() m_socket.close() 来控制
  15. //功能有点像线程的条件变量.
  16. //注, run()函数只能启动前面的异步事件,
  17. 在run之后, 如果还有异步事件, 那么需要reset和run再次使用.
  18. //注注注注注意了
  19. //m_io_service.run(), m_io_service.stop() , m_io_service.reset()只是控制异步服务的事件函数
  20. //m_socket.close() 控制通信
  21. * */
  22. #ifndef ASYNC_CLIENT_H
  23. #define ASYNC_CLIENT_H
  24. #include <iostream>
  25. #include <stdio.h>
  26. #include <time.h>
  27. #include <boost/asio.hpp>
  28. #include <boost/bind.hpp>
  29. #include <boost/thread.hpp>
  30. #include <chrono>
  31. #include <mutex>
  32. #include <thread>
  33. #include <atomic>
  34. #include <string>
  35. #include <unistd.h>
  36. #include "glog/logging.h"
  37. #include "error_code/error_code.hpp"
  38. #include "tool/thread_condition.h"
  39. #include "tool/binary_buf.h"
  40. #include "tool/thread_safe_queue.hpp"
  41. /**
  42. * 异步tcp通信客户端
  43. * */
  44. class Async_Client {
  45. public:
  46. #define DATA_LENGTH_MAX 50000 //接受消息的缓存长度50000byte, 实际通信数据不能超过它, 如果超过了, 则需要增大
  47. #define RECONNECTION_WAIT_TIME_MS 1000 //每隔1000ms重连一次
  48. #define CHECK_WAIT_TIME_MS 75 //每隔75ms检查一次通信, 万集雷达15HZ, 必须要大于一个通信周期, 防止重连之后没有及时刷新消息就又超时断连了
  49. #define READ_TIMEOUT_MILLISECONDS 5000 //通信数据更新 的超时时间5000ms, 如果数据一直不更新那么就断开然后重连
  50. //注:隐藏bug:如果通信正常,但是服务端
  51. typedef void (*fundata_t)(const char *data, const int len, const void *p);
  52. typedef void (*Callback_function)(const char *data, const int len, const void *p);
  53. //通信状态
  54. enum Communication_status {//default = 0
  55. E_UNKNOWN = 0, //未知
  56. E_READY = 1, //正常待机
  57. E_DISCONNECT = 2, //断连
  58. E_FAULT = 10, //故障
  59. };
  60. public:
  61. // 构造
  62. Async_Client();
  63. // 析构
  64. ~Async_Client();
  65. //初始化, 默认重连
  66. Error_manager init(std::string ip, int port, Thread_safe_queue<Binary_buf *> *p_communication_data_queue,
  67. bool is_reconnection_flag = true);
  68. //反初始化
  69. Error_manager uninit();
  70. //检查状态
  71. Error_manager check_status();
  72. //判断是否正常
  73. bool is_ready();
  74. //获取状态
  75. Communication_status get_status();
  76. protected:
  77. //线程接受函数,自动接受数据到 mp_data_buf, 然后触发回调函数,
  78. void thread_receive();
  79. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  80. void thread_check_connect();
  81. // 开启连接
  82. void socket_connect();
  83. // 关闭连接
  84. void socket_close();
  85. // 重新连接
  86. void socket_reconnect();
  87. // 检查连接
  88. void socket_check();
  89. // 异步写入
  90. void client_async_write(char *buf, int len);
  91. // 异步读取
  92. void client_async_read();
  93. // 异步连接回调
  94. void handle_connect(const boost::system::error_code &error);
  95. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  96. void handle_read(const boost::system::error_code &error, size_t bytes_transferred);
  97. // 异步写回调
  98. void handle_write(const boost::system::error_code &error);
  99. protected:
  100. //状态
  101. std::atomic<Communication_status> m_communication_status; //通信状态
  102. std::atomic<bool> m_is_reconnection_flag; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  103. //通信
  104. //把m_io_service作为参数, 构造m_socket, 那么m_io_service.run()可以异步启动m_socket的通信
  105. boost::asio::io_service m_io_service; // 异步控制服务, 负责异步操作, run启动异步服务, stop停止异步服务
  106. boost::asio::ip::tcp::socket m_socket; // socket句柄, 负责tcp通信, 主要是connect,read,write,close
  107. //m_socket.async_connect() 传入m_ep, 可以tcp连接 指定的ip和port
  108. boost::asio::ip::tcp::endpoint m_ep; // 连接参数, 主要是ip和port
  109. //数据
  110. char m_data_buf[DATA_LENGTH_MAX]; // 通信消息接受缓存, 默认50000byte
  111. std::chrono::system_clock::time_point m_data_updata_time; // 数据更新时间
  112. Thread_safe_queue<Binary_buf *> *mp_communication_data_queue; //通信数据队列, 内存由上级管理
  113. //接受数据的线程, 自动接受数据到 mp_data_buf, 然后存入队列
  114. std::thread *mp_thread_receive; // 接受线程, 内存由本模块管理
  115. Thread_condition m_condition_receive; //接受线程的条件变量
  116. // 检查连接线程, 进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  117. std::thread *mp_thread_check_connect; // 检查连接线程, 内存由本模块管理
  118. Thread_condition m_condition_check_connect; //检查连接线程的条件变量
  119. int m_check_connect_wait_time_ms; // 检查连接线程 的等待时间
  120. };
  121. #endif // ASYNC_CLIENT_H
  122. /*
  123. * boost::asio::io_service使用时的注意事项:
  124. ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。
  125. ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
  126. ③一般情况下,io_service的成员函数的使用顺序:
  127. boost::asio::io_service构造,
  128. boost::asio::io_service::run(),
  129. boost::asio::io_service::stop(),
  130. boost::asio::io_service::reset(),
  131. boost::asio::io_service::run(),
  132. ......
  133. boost::asio::io_service析构,
  134. ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
  135. ⑤一个新创建的io_service不需要执行reset()函数。
  136. ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
  137. ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
  138. 我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。
  139. ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。
  140. */