/* * //启动异步服务 * * 必须使用如下三步 m_io_service.reset(); //异步事件函数 m_io_service.run(); //run运行之后, 代码会卡在run()这里 //socket.async_connect() socket.async_read_some() boost::asio::async_write()等函数内部才会真正的执行, // 执行完成后就会自动调用里面的回调函数, //里面的事件函数执行完之后, 才会结束run(), 此时才会执行run()后面的代码 //异步通信的机制就是, 如上3个函数,在被调用的时候会先跳过, //并执行后面的其他代码, 只有在run之后, 才会启动如上3个函数, //由m_io_service.run() m_io_service.stop() m_io_service.reset() m_socket.close() 来控制 //功能有点像线程的条件变量. //注, run()函数只能启动前面的异步事件, 在run之后, 如果还有异步事件, 那么需要reset和run再次使用. //注注注注注意了 //m_io_service.run(), m_io_service.stop() , m_io_service.reset()只是控制异步服务的事件函数 //m_socket.close() 控制通信 * */ #ifndef ASYNC_CLIENT_H #define ASYNC_CLIENT_H #include #include #include #include #include #include #include #include #include #include #include #include #include "glog/logging.h" #include "../error_code/error_code.h" #include "../tool/thread_condition.h" #include "../tool/binary_buf.h" #include "../tool/thread_safe_queue.h" #include "../tool/common_data.h" /** * 异步tcp通信客户端 * */ class Async_Client { public: #define DATA_LENGTH_MAX 50000 //接受消息的缓存长度50000byte, 实际通信数据不能超过它, 如果超过了, 则需要增大 #define RECONNECTION_WAIT_TIME_MS 1000 //每隔1000ms重连一次 #define CHECK_WAIT_TIME_MS WANJI_716_SCAN_CYCLE_MS //每隔66ms检查一次通信, 万集雷达15HZ, 必须要大于一个通信周期, 防止重连之后没有及时刷新消息就又超时断连了 #define READ_TIMEOUT_MILLISECONDS 5000 //通信数据更新 的超时时间5000ms, 如果数据一直不更新那么就断开然后重连 //注:隐藏bug:如果通信正常,但是服务端 typedef void (*fundata_t)(const char *data, const int len, const void *p); typedef void (*Callback_function)(const char *data, const int len, const void *p); //通信状态 enum Communication_status {//default = 0 E_UNKNOWN = 0, //未知 E_READY = 1, //正常待机 E_DISCONNECT = 2, //断连 E_FAULT =10, //故障 }; public: // 构造 Async_Client(); // 析构 ~Async_Client(); //初始化, 默认重连 Error_manager init(std::string ip, int port, Thread_safe_queue* p_communication_data_queue, bool is_reconnection_flag = true); //反初始化 Error_manager uninit(); //检查状态 Error_manager check_status(); //判断是否正常 bool is_ready(); //获取状态 Communication_status get_status(); protected: //线程接受函数,自动接受数据到 mp_data_buf, 然后触发回调函数, void thread_receive(); //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连 void thread_check_connect(); // 开启连接 void socket_connect(); // 关闭连接 void socket_close(); // 重新连接 void socket_reconnect(); // 检查连接 void socket_check(); // 异步写入 void client_async_write(char *buf, int len); // 异步读取 void client_async_read(); // 异步连接回调 void handle_connect(const boost::system::error_code &error); //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度 void handle_read(const boost::system::error_code &error, size_t bytes_transferred); // 异步写回调 void handle_write(const boost::system::error_code &error); protected: //状态 std::atomic m_communication_status; //通信状态 std::atomic m_is_reconnection_flag; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态. //通信 //把m_io_service作为参数, 构造m_socket, 那么m_io_service.run()可以异步启动m_socket的通信 boost::asio::io_service m_io_service; // 异步控制服务, 负责异步操作, run启动异步服务, stop停止异步服务 boost::asio::ip::tcp::socket m_socket; // socket句柄, 负责tcp通信, 主要是connect,read,write,close //m_socket.async_connect() 传入m_ep, 可以tcp连接 指定的ip和port boost::asio::ip::tcp::endpoint m_ep; // 连接参数, 主要是ip和port //数据 char m_data_buf[DATA_LENGTH_MAX]; // 通信消息接受缓存, 默认50000byte std::chrono::system_clock::time_point m_data_updata_time; // 数据更新时间 Thread_safe_queue* mp_communication_data_queue; //通信数据队列, 内存由上级管理 //接受数据的线程, 自动接受数据到 mp_data_buf, 然后存入队列 std::thread * mp_thread_receive; // 接受线程, 内存由本模块管理 Thread_condition m_condition_receive; //接受线程的条件变量 // 检查连接线程, 进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连 std::thread * mp_thread_check_connect; // 检查连接线程, 内存由本模块管理 Thread_condition m_condition_check_connect; //检查连接线程的条件变量 int m_check_connect_wait_time_ms; // 检查连接线程 的等待时间 }; #endif // ASYNC_CLIENT_H /* * boost::asio::io_service使用时的注意事项: ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。 ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。 ③一般情况下,io_service的成员函数的使用顺序: boost::asio::io_service构造, boost::asio::io_service::run(), boost::asio::io_service::stop(), boost::asio::io_service::reset(), boost::asio::io_service::run(), ...... boost::asio::io_service析构, ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。 ⑤一个新创建的io_service不需要执行reset()函数。 ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。 ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。 我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。 ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。 */