async_client.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. /*
  2. * //启动异步服务
  3. *
  4. * 必须使用如下三步
  5. m_io_service.reset();
  6. //异步事件函数
  7. m_io_service.run();
  8. //run运行之后, 代码会卡在run()这里
  9. //socket.async_connect() socket.async_read_some() boost::asio::async_write()等函数内部才会真正的执行,
  10. // 执行完成后就会自动调用里面的回调函数,
  11. //里面的事件函数执行完之后, 才会结束run(), 此时才会执行run()后面的代码
  12. //异步通信的机制就是, 如上3个函数,在被调用的时候会先跳过,
  13. //并执行后面的其他代码, 只有在run之后, 才会启动如上3个函数,
  14. //由m_io_service.run() m_io_service.stop() m_io_service.reset() m_socket.close() 来控制
  15. //功能有点像线程的条件变量.
  16. //注, run()函数只能启动前面的异步事件,
  17. 在run之后, 如果还有异步事件, 那么需要reset和run再次使用.
  18. //注注注注注意了
  19. //m_io_service.run(), m_io_service.stop() , m_io_service.reset()只是控制异步服务的事件函数
  20. //m_socket.close() 控制通信
  21. * */
  22. #ifndef ASYNC_CLIENT_H
  23. #define ASYNC_CLIENT_H
  24. #include <iostream>
  25. #include <stdio.h>
  26. #include <time.h>
  27. #include <boost/asio.hpp>
  28. #include <boost/bind.hpp>
  29. #include <boost/thread.hpp>
  30. #include <chrono>
  31. #include <mutex>
  32. #include <thread>
  33. #include <atomic>
  34. #include <string>
  35. #include <unistd.h>
  36. #include "glog/logging.h"
  37. #include "../error_code/error_code.h"
  38. #include "../tool/thread_condition.h"
  39. #include "../tool/binary_buf.h"
  40. #include "../tool/thread_safe_queue.h"
  41. /**
  42. * 异步tcp通信客户端
  43. * */
  44. class Async_Client
  45. {
  46. public:
  47. #define DATA_LENGTH_MAX 50000 //接受消息的缓存长度50000byte, 实际通信数据不能超过它, 如果超过了, 则需要增大
  48. #define RECONNECTION_WAIT_TIME_MS 1000 //每隔1000ms重连一次
  49. #define CHECK_WAIT_TIME_MS 75 //每隔75ms检查一次通信, 万集雷达15HZ, 必须要大于一个通信周期, 防止重连之后没有及时刷新消息就又超时断连了
  50. #define READ_TIMEOUT_MILLISECONDS 5000 //通信数据更新 的超时时间5000ms, 如果数据一直不更新那么就断开然后重连
  51. //注:隐藏bug:如果通信正常,但是服务端
  52. typedef void (*fundata_t)(const char *data, const int len, const void *p);
  53. typedef void (*Callback_function)(const char *data, const int len, const void *p);
  54. //通信状态
  55. enum Communication_status
  56. {//default = 0
  57. E_UNKNOWN = 0, //未知
  58. E_READY = 1, //正常待机
  59. E_DISCONNECT = 2, //断连
  60. E_FAULT =10, //故障
  61. };
  62. public:
  63. // 构造
  64. Async_Client();
  65. // 析构
  66. ~Async_Client();
  67. //初始化, 默认重连
  68. Error_manager init(std::string ip, int port, Thread_safe_queue<Binary_buf*>* p_communication_data_queue,
  69. bool is_reconnection_flag = true);
  70. //反初始化
  71. Error_manager uninit();
  72. //检查状态
  73. Error_manager check_status();
  74. //判断是否正常
  75. bool is_ready();
  76. //获取状态
  77. Communication_status get_status();
  78. protected:
  79. //线程接受函数,自动接受数据到 mp_data_buf, 然后触发回调函数,
  80. void thread_receive();
  81. //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  82. void thread_check_connect();
  83. // 开启连接
  84. void socket_connect();
  85. // 关闭连接
  86. void socket_close();
  87. // 重新连接
  88. void socket_reconnect();
  89. // 检查连接
  90. void socket_check();
  91. // 异步写入
  92. void client_async_write(char *buf, int len);
  93. // 异步读取
  94. void client_async_read();
  95. // 异步连接回调
  96. void handle_connect(const boost::system::error_code &error);
  97. //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
  98. void handle_read(const boost::system::error_code &error, size_t bytes_transferred);
  99. // 异步写回调
  100. void handle_write(const boost::system::error_code &error);
  101. protected:
  102. //状态
  103. std::atomic<Communication_status> m_communication_status; //通信状态
  104. std::atomic<bool> m_is_reconnection_flag; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
  105. //通信
  106. //把m_io_service作为参数, 构造m_socket, 那么m_io_service.run()可以异步启动m_socket的通信
  107. boost::asio::io_service m_io_service; // 异步控制服务, 负责异步操作, run启动异步服务, stop停止异步服务
  108. boost::asio::ip::tcp::socket m_socket; // socket句柄, 负责tcp通信, 主要是connect,read,write,close
  109. //m_socket.async_connect() 传入m_ep, 可以tcp连接 指定的ip和port
  110. boost::asio::ip::tcp::endpoint m_ep; // 连接参数, 主要是ip和port
  111. //数据
  112. char m_data_buf[DATA_LENGTH_MAX]; // 通信消息接受缓存, 默认50000byte
  113. std::chrono::system_clock::time_point m_data_updata_time; // 数据更新时间
  114. Thread_safe_queue<Binary_buf*>* mp_communication_data_queue; //通信数据队列, 内存由上级管理
  115. //接受数据的线程, 自动接受数据到 mp_data_buf, 然后存入队列
  116. std::thread * mp_thread_receive; // 接受线程, 内存由本模块管理
  117. Thread_condition m_condition_receive; //接受线程的条件变量
  118. // 检查连接线程, 进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
  119. std::thread * mp_thread_check_connect; // 检查连接线程, 内存由本模块管理
  120. Thread_condition m_condition_check_connect; //检查连接线程的条件变量
  121. int m_check_connect_wait_time_ms; // 检查连接线程 的等待时间
  122. };
  123. #endif // ASYNC_CLIENT_H
  124. /*
  125. * boost::asio::io_service使用时的注意事项:
  126. ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。
  127. ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
  128. ③一般情况下,io_service的成员函数的使用顺序:
  129. boost::asio::io_service构造,
  130. boost::asio::io_service::run(),
  131. boost::asio::io_service::stop(),
  132. boost::asio::io_service::reset(),
  133. boost::asio::io_service::run(),
  134. ......
  135. boost::asio::io_service析构,
  136. ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
  137. ⑤一个新创建的io_service不需要执行reset()函数。
  138. ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
  139. ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
  140. 我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。
  141. ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。
  142. */