123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- /*
- * //启动异步服务
- *
- * 必须使用如下三步
- m_io_service.reset();
- //异步事件函数
- m_io_service.run();
- //run运行之后, 代码会卡在run()这里
- //socket.async_connect() socket.async_read_some() boost::asio::async_write()等函数内部才会真正的执行,
- // 执行完成后就会自动调用里面的回调函数,
- //里面的事件函数执行完之后, 才会结束run(), 此时才会执行run()后面的代码
- //异步通信的机制就是, 如上3个函数,在被调用的时候会先跳过,
- //并执行后面的其他代码, 只有在run之后, 才会启动如上3个函数,
- //由m_io_service.run() m_io_service.stop() m_io_service.reset() m_socket.close() 来控制
- //功能有点像线程的条件变量.
- //注, run()函数只能启动前面的异步事件,
- 在run之后, 如果还有异步事件, 那么需要reset和run再次使用.
- //注注注注注意了
- //m_io_service.run(), m_io_service.stop() , m_io_service.reset()只是控制异步服务的事件函数
- //m_socket.close() 控制通信
- * */
- #ifndef ASYNC_CLIENT_H
- #define ASYNC_CLIENT_H
- #include <iostream>
- #include <stdio.h>
- #include <time.h>
- #include <boost/asio.hpp>
- #include <boost/bind.hpp>
- #include <boost/thread.hpp>
- #include <chrono>
- #include <mutex>
- #include <thread>
- #include <atomic>
- #include <string>
- #include <unistd.h>
- #include "glog/logging.h"
- #include "../error_code/error_code.h"
- #include "../tool/thread_condition.h"
- #include "../tool/binary_buf.h"
- #include "../tool/thread_safe_queue.h"
- #include "../tool/common_data.h"
- /**
- * 异步tcp通信客户端
- * */
- class Async_Client
- {
- public:
- #define DATA_LENGTH_MAX 50000 //接受消息的缓存长度50000byte, 实际通信数据不能超过它, 如果超过了, 则需要增大
- #define RECONNECTION_WAIT_TIME_MS 1000 //每隔1000ms重连一次
- #define CHECK_WAIT_TIME_MS WANJI_716_SCAN_CYCLE_MS //每隔66ms检查一次通信, 万集雷达15HZ, 必须要大于一个通信周期, 防止重连之后没有及时刷新消息就又超时断连了
- #define READ_TIMEOUT_MILLISECONDS 5000 //通信数据更新 的超时时间5000ms, 如果数据一直不更新那么就断开然后重连
- //注:隐藏bug:如果通信正常,但是服务端
- typedef void (*fundata_t)(const char *data, const int len, const void *p);
- typedef void (*Callback_function)(const char *data, const int len, const void *p);
- //通信状态
- enum Communication_status
- {//default = 0
- E_UNKNOWN = 0, //未知
- E_READY = 1, //正常待机
- E_DISCONNECT = 2, //断连
- E_FAULT =10, //故障
- };
- public:
- // 构造
- Async_Client();
- // 析构
- ~Async_Client();
- //初始化, 默认重连
- Error_manager init(std::string ip, int port, Thread_safe_queue<Binary_buf*>* p_communication_data_queue,
- bool is_reconnection_flag = true);
- //反初始化
- Error_manager uninit();
- //检查状态
- Error_manager check_status();
- //判断是否正常
- bool is_ready();
- //获取状态
- Communication_status get_status();
- protected:
- //线程接受函数,自动接受数据到 mp_data_buf, 然后触发回调函数,
- void thread_receive();
- //线程检查连接函数,进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
- void thread_check_connect();
- // 开启连接
- void socket_connect();
- // 关闭连接
- void socket_close();
- // 重新连接
- void socket_reconnect();
- // 检查连接
- void socket_check();
- // 异步写入
- void client_async_write(char *buf, int len);
- // 异步读取
- void client_async_read();
- // 异步连接回调
- void handle_connect(const boost::system::error_code &error);
- //异步读取回调, 失败后重连. bytes_transferred是返回的数据有效长度
- void handle_read(const boost::system::error_code &error, size_t bytes_transferred);
- // 异步写回调
- void handle_write(const boost::system::error_code &error);
- protected:
- //状态
- std::atomic<Communication_status> m_communication_status; //通信状态
- std::atomic<bool> m_is_reconnection_flag; // 断线是否重连的标志位, true:断线自动重连, false:断线不重连, 保持断线状态.
- //通信
- //把m_io_service作为参数, 构造m_socket, 那么m_io_service.run()可以异步启动m_socket的通信
- boost::asio::io_service m_io_service; // 异步控制服务, 负责异步操作, run启动异步服务, stop停止异步服务
- boost::asio::ip::tcp::socket m_socket; // socket句柄, 负责tcp通信, 主要是connect,read,write,close
- //m_socket.async_connect() 传入m_ep, 可以tcp连接 指定的ip和port
- boost::asio::ip::tcp::endpoint m_ep; // 连接参数, 主要是ip和port
- //数据
- char m_data_buf[DATA_LENGTH_MAX]; // 通信消息接受缓存, 默认50000byte
- std::chrono::system_clock::time_point m_data_updata_time; // 数据更新时间
- Thread_safe_queue<Binary_buf*>* mp_communication_data_queue; //通信数据队列, 内存由上级管理
- //接受数据的线程, 自动接受数据到 mp_data_buf, 然后存入队列
- std::thread * mp_thread_receive; // 接受线程, 内存由本模块管理
- Thread_condition m_condition_receive; //接受线程的条件变量
- // 检查连接线程, 进行连接, 并检查消息是否按时更新, 如果超时, 那么触发重连
- std::thread * mp_thread_check_connect; // 检查连接线程, 内存由本模块管理
- Thread_condition m_condition_check_connect; //检查连接线程的条件变量
- int m_check_connect_wait_time_ms; // 检查连接线程 的等待时间
- };
- #endif // ASYNC_CLIENT_H
- /*
- * boost::asio::io_service使用时的注意事项:
- ①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。
- ②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
- ③一般情况下,io_service的成员函数的使用顺序:
- boost::asio::io_service构造,
- boost::asio::io_service::run(),
- boost::asio::io_service::stop(),
- boost::asio::io_service::reset(),
- boost::asio::io_service::run(),
- ......
- boost::asio::io_service析构,
- ④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
- ⑤一个新创建的io_service不需要执行reset()函数。
- ⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
- ⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
- 我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。
- ⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。
- */
|