#include "communication_socket_base.h"
#include "../tool/proto_tool.h"

// Builds an idle object: status is COMMUNICATION_UNKNOW and no worker thread exists yet.
Communication_socket_base::Communication_socket_base()
{
    m_communication_statu = COMMUNICATION_UNKNOW;

    mp_receive_data_thread = nullptr;
    mp_analysis_data_thread = nullptr;
    mp_send_data_thread = nullptr;
    mp_encapsulate_data_thread = nullptr;
}

// Stops the worker threads and closes the socket via communication_uninit().
Communication_socket_base::~Communication_socket_base()
{
    communication_uninit();
}

// Initialise the communication module. Use exactly one of the three init entry points.
// This overload reads the parameters from COMMUNICATION_PARAMETER_PATH.
Error_manager Communication_socket_base::communication_init()
{
    LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;

    return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
}

// Initialise the communication module from a protobuf text file.
// Returns COMMUNICATION_READ_PROTOBUF_ERROR when the file cannot be read/parsed,
// otherwise the result of the overload taking the parsed parameters.
Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
{
    Communication_proto::Communication_parameter_all t_communication_parameter_all;

    if ( !proto_tool::read_proto_param(prototxt_path, t_communication_parameter_all) )
    {
        return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR, MINOR_ERROR,
                             "Communication_socket_base read_proto_param failed");
    }

    return communication_init_from_protobuf(t_communication_parameter_all);
}

// Initialise the communication module from already-parsed protobuf parameters:
// bind (only if a bind string is present), connect to every configured peer,
// then start the worker threads. The first failing step's error is returned.
Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
{
    LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
    Error_manager t_error;

    if ( communication_parameter_all.communication_parameters().has_bind_string() )
    {
        t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
        if ( t_error != Error_code::SUCCESS )
        {
            return t_error;
        }
    }

    std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() "
              << communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;

    // NOTE(review): in the reviewed copy the text between this loop's condition and the
    // parameter list of communication_init(bind_string, connect_string_vector) had been
    // stripped (angle-bracket removal). The loop body, the communication_run() call and
    // the return below are reconstructed to mirror communication_init(bind_string, ...)
    // -- confirm against version control before merging.
    for (int i = 0; i < communication_parameter_all.communication_parameters().connect_string_vector_size(); ++i)
    {
        t_error = communication_connect(communication_parameter_all.communication_parameters().connect_string_vector(i));
        if ( t_error != Error_code::SUCCESS )
        {
            return t_error;
        }
    }

    // Start communication: launch the worker threads.
    communication_run();

    return Error_code::SUCCESS;
}

// Initialise the communication module from explicit endpoints:
// bind to bind_string, connect to every entry of connect_string_vector,
// then start the worker threads. The first failing step's error is returned.
Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
{
    LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
    Error_manager t_error;

    t_error = communication_bind(bind_string);
    if ( t_error != Error_code::SUCCESS )
    {
        return t_error;
    }

    t_error = communication_connect(connect_string_vector);
    if ( t_error != Error_code::SUCCESS )
    {
        return t_error;
    }

    // Start communication: launch the worker threads.
    communication_run();

    return Error_code::SUCCESS;
}

// Bind m_socket to bind_string so this object acts as a server on that endpoint.
// Returns COMMUNICATION_BIND_ERROR when the socket reports a negative result.
Error_manager Communication_socket_base::communication_bind(std::string bind_string)
{
    const int t_socket_result = m_socket.bind(bind_string);
    if ( t_socket_result < 0 )
    {
        return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
                             " m_socket.bind error ");
    }
    LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << "  " << this;

    return Error_code::SUCCESS;
}

// Connect m_socket to every endpoint in connect_string_vector, stopping at the first failure.
Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
{
    for (const std::string& t_connect_string : connect_string_vector)
    {
        Error_manager t_error = communication_connect(t_connect_string);
        if ( t_error != Error_code::SUCCESS )
        {
            return t_error;
        }
    }
    return Error_code::SUCCESS;
}

// Connect m_socket to one remote endpoint.
// Returns COMMUNICATION_CONNECT_ERROR when the socket reports a negative result.
Error_manager Communication_socket_base::communication_connect(std::string connect_string)
{
    const int t_socket_result = m_socket.connect(connect_string);
    if ( t_socket_result < 0 )
    {
        return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
                             " m_socket.connect error ");
    }
    LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << "  " << this;

    return Error_code::SUCCESS;
}

// Start communication: mark the module READY and launch the four worker threads
// (receive, analysis, send, encapsulate).
// NOTE(review): the reset() arguments are presumably (kill, pass_ever, pass_once);
// per the original comments (false, true, false) lets a thread loop freely and
// (false, false, false) makes it wait until notified or timed out -- confirm
// against the condition class.
Error_manager Communication_socket_base::communication_run()
{
    m_communication_statu = COMMUNICATION_READY;

    // Receive thread: loops by default; the waiting happens in the socket receive call.
    m_receive_condition.reset(false, true, false);
    mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);

    // Analysis thread: waits by default; woken by the receive thread, and on timeout
    // it walks m_receive_data_list on its own.
    m_analysis_data_condition.reset(false, false, false);
    mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);

    // Send thread: loops by default; the waiting happens inside wait_and_pop().
    m_send_data_condition.reset(false, true, false);
    mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);

    // Encapsulate thread: waits by default; on timeout it packs heartbeat/status data.
    m_encapsulate_data_condition.reset(false, false, false);
    mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);

    return Error_code::SUCCESS;
}

// Shut the communication module down: terminate the lists, kill and join the four
// worker threads, free queued buffers, reset the status and close the socket.
// Safe to call when the threads were never started (null thread pointers are skipped).
Error_manager Communication_socket_base::communication_uninit()
{
    // Terminate the lists first so that wait_and_pop() cannot keep a thread blocked.
    m_receive_data_list.termination_list();
    m_send_data_list.termination_list();

    // Ask every running thread to exit.
    if ( mp_receive_data_thread )
    {
        m_receive_condition.kill_all();
    }
    if ( mp_analysis_data_thread )
    {
        m_analysis_data_condition.kill_all();
    }
    if ( mp_send_data_thread )
    {
        m_send_data_condition.kill_all();
    }
    if ( mp_encapsulate_data_thread )
    {
        m_encapsulate_data_condition.kill_all();
    }

    // Join the threads and release their std::thread objects.
    if ( mp_receive_data_thread )
    {
        mp_receive_data_thread->join();
        delete mp_receive_data_thread;
        mp_receive_data_thread = nullptr;
    }
    if ( mp_analysis_data_thread )
    {
        mp_analysis_data_thread->join();
        delete mp_analysis_data_thread;
        mp_analysis_data_thread = nullptr;
    }
    if ( mp_send_data_thread )
    {
        mp_send_data_thread->join();
        delete mp_send_data_thread;
        mp_send_data_thread = nullptr;
    }
    if ( mp_encapsulate_data_thread )
    {
        mp_encapsulate_data_thread->join();
        delete mp_encapsulate_data_thread;
        mp_encapsulate_data_thread = nullptr;
    }

    // Drop whatever is still queued.
    m_receive_data_list.clear_and_delete();
    m_send_data_list.clear_and_delete();

    m_communication_statu = COMMUNICATION_UNKNOW;
    m_socket.close();

    return Error_code::SUCCESS;
}

// Entry function of mp_receive_data_thread.
// Receives messages from m_socket and stores a heap copy of each in m_receive_data_list,
// waking the analysis thread after every received message.
void Communication_socket_base::receive_data_thread()
{
    LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< this;

    while ( m_receive_condition.is_alive() )
    {
        m_receive_condition.wait();
        if ( !m_receive_condition.is_alive() )
        {
            continue;   // killed while waiting; the loop condition ends the thread
        }

        std::this_thread::yield();

        // m_mutex is also taken around m_socket.send(), so socket access is serialised.
        // (Template argument restored; it had been stripped from the reviewed copy.)
        std::unique_lock<std::mutex> lk(m_mutex);

        // flags = 1: non-blocking receive per the original comment; a received
        // message has a size greater than 0.
        nnxx::message t_msg = m_socket.recv(1);
        if ( t_msg.size() > 0 )
        {
            Binary_buf* tp_binary_buf = new Binary_buf( (char*)(t_msg.data()), t_msg.size() );
            const bool is_push = m_receive_data_list.push(tp_binary_buf);
            if ( !is_push )
            {
                // NOTE(review): assumes push() returns false without storing the pointer
                // (terminated container), so the buffer is still ours to free -- confirm.
                delete tp_binary_buf;
                tp_binary_buf = nullptr;
            }

            // Wake the analysis thread once.
            m_analysis_data_condition.notify_all(false, true);
        }
    }

    LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< this;
    return;
}

// Entry function of mp_analysis_data_thread.
// Walks m_receive_data_list whenever it is woken by the receive thread, and also after
// every wait timeout so that messages left over from a busy executor get retried.
void Communication_socket_base::analysis_data_thread()
{
    LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;

    while ( m_analysis_data_condition.is_alive() )
    {
        // Woken or timed out: both cases process the list, so the result is not needed.
        m_analysis_data_condition.wait_for_millisecond(1000);
        if ( m_analysis_data_condition.is_alive() )
        {
            std::this_thread::yield();
            analysis_receive_list();
        }
    }

    LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
    return;
}

// Walk the receive list and handle every message that can be handled now.
// - check_msg() == SUCCESS: excute_msg() is called, then the buffer is freed and removed.
// - check_msg() == COMMUNICATION_ANALYSIS_TIME_OUT: the buffer is freed and removed.
// - anything else (e.g. executor busy): the message stays queued for a later pass.
// Returns CONTAINER_IS_TERMINATE when the list has been terminated, otherwise SUCCESS.
Error_manager Communication_socket_base::analysis_receive_list()
{
    if ( m_receive_data_list.m_termination_flag )
    {
        return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
                             " Communication_socket_base::analysis_receive_list error ");
    }

    Error_manager t_error;

    // (Template argument restored; it had been stripped from the reviewed copy.)
    std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
    for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
    {
        Binary_buf* tp_buf = **iter;
        if ( tp_buf == nullptr )
        {
            // erase() invalidates iter and returns the next node.
            iter = m_receive_data_list.m_data_list.erase(iter);
            continue;
        }

        // Can this message be handled right now?
        t_error = check_msg(tp_buf);
        if ( t_error == SUCCESS )
        {
            // Handle the message; its result is deliberately ignored.
            t_error = excute_msg(tp_buf);
            delete tp_buf;
            tp_buf = nullptr;
            iter = m_receive_data_list.m_data_list.erase(iter);
        }
        else if ( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
        {
            // Timed-out messages are simply discarded.
            delete tp_buf;
            tp_buf = nullptr;
            iter = m_receive_data_list.m_data_list.erase(iter);
        }
        else // e.g. COMMUNICATION_EXCUTER_IS_BUSY
        {
            // Executor busy: keep the message; the next pass (wake-up or
            // wait_for_millisecond timeout) retries it.
            ++iter;
        }
    }

    return Error_code::SUCCESS;
}

// Check whether a message can be handled now.
// Intended contract (per the original notes): return COMMUNICATION_ANALYSIS_TIME_OUT for
// an expired message and COMMUNICATION_EXCUTER_IS_BUSY when the target module is busy.
// This base implementation only prints the buffer and its length and returns SUCCESS.
Error_manager Communication_socket_base::check_msg(Binary_buf* p_buf)
{
    std::cout << "Communication_socket_base::check_msg  p_buf =  " << p_buf->get_buf() << std::endl;
    std::cout << "Communication_socket_base::check_msg   size =  " << p_buf->get_length() << std::endl;

    return Error_code::SUCCESS;
}

// Handle a message.
// Implementations should first convert p_buf to their own format and must not keep the
// pointer: analysis_receive_list() deletes the buffer right after this call. Replies,
// if any, are sent by the handling module itself.
// This base implementation only prints the buffer and its length and returns SUCCESS.
Error_manager Communication_socket_base::excute_msg(Binary_buf* p_buf)
{
    std::cout << "Communication_socket_base::excute_msg  p_buf =  " << p_buf->get_buf() << std::endl;
    std::cout << "Communication_socket_base::excute_msg   size =  " << p_buf->get_length() << std::endl;

    return Error_code::SUCCESS;
}

// Entry function of mp_send_data_thread.
// Pops buffers from m_send_data_list, sends them through m_socket and frees them.
void Communication_socket_base::send_data_thread()
{
    LOG(INFO) << " Communication_socket_base::send_data_thread start "<< this;

    while ( m_send_data_condition.is_alive() )
    {
        m_send_data_condition.wait();
        if ( !m_send_data_condition.is_alive() )
        {
            continue;   // killed while waiting; the loop condition ends the thread
        }

        std::this_thread::yield();

        // wait_and_pop() blocks on the list's own condition variable until something is
        // pushed. To stop this thread both m_send_data_list.termination_list() and
        // m_send_data_condition.kill_all() are required (see communication_uninit()).
        Binary_buf* tp_buf = nullptr;
        const bool is_pop = m_send_data_list.wait_and_pop(tp_buf);

        // Nothing popped presumably means the list was terminated; just loop again.
        if ( is_pop && tp_buf != nullptr )
        {
            // (Template argument restored; it had been stripped from the reviewed copy.)
            std::unique_lock<std::mutex> lk(m_mutex);
            m_socket.send(tp_buf->get_buf(), tp_buf->get_length(), 0);
            delete tp_buf;
            tp_buf = nullptr;
        }
    }

    LOG(INFO) << " Communication_socket_base::send_data_thread end "<< this;
    return;
}

// Entry function of mp_encapsulate_data_thread.
// On every wait timeout it calls encapsulate_send_data() to queue the periodic message;
// an explicit wake-up currently does nothing (placeholder for on-demand sending).
void Communication_socket_base::encapsulate_data_thread()
{
    LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;

    while ( m_encapsulate_data_condition.is_alive() )
    {
        const bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(1000);
        if ( m_encapsulate_data_condition.is_alive() )
        {
            std::this_thread::yield();
            if ( !t_pass_flag )
            {
                // Timed out: pack the periodic heartbeat/status message.
                encapsulate_send_data();
            }
            // else: woken explicitly -- reserved for on-demand messages, nothing to do yet.
        }
    }

    LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
    return;
}

// Periodically queued message, normally heartbeat/status; meant to be overridden.
// This base implementation queues a text heartbeat with an increasing counter.
// The counter is a function-local static, so it is shared by every instance.
// Returns CONTAINER_IS_TERMINATE when the send list rejects the buffer.
Error_manager Communication_socket_base::encapsulate_send_data()
{
    char buf[256] = {0};
    static unsigned int t_heartbeat = 0;

    // %u matches the unsigned counter; snprintf bounds the write to the buffer.
    snprintf(buf, sizeof(buf), "Communication_socket_base, heartbeat = %u", t_heartbeat);
    t_heartbeat++;

    // +1 so the terminating '\0' is sent too, which makes the payload printable.
    Binary_buf* tp_buf = new Binary_buf(buf, strlen(buf)+1);
    const bool is_push = m_send_data_list.push(tp_buf);
    if ( !is_push )
    {
        // NOTE(review): assumes push() returns false without storing the pointer
        // (terminated container), so the buffer is still ours to free -- confirm.
        delete tp_buf;
        tp_buf = nullptr;
        return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
                             " Communication_socket_base::encapsulate_send_data error ");
    }

    return Error_code::SUCCESS;
}

// Queue a message for sending; meant to be overridden.
// A heap copy of *p_buf is queued, so the caller keeps ownership of p_buf. The copy is
// freed by the send thread after transmission.
// Returns CONTAINER_IS_TERMINATE when the send list rejects the buffer.
Error_manager Communication_socket_base::encapsulate_msg(Binary_buf* p_buf)
{
    Binary_buf* tp_buf = new Binary_buf(*p_buf);

    // Queue the copy, not the caller's buffer: the send thread deletes what it pops.
    const bool is_push = m_send_data_list.push(tp_buf);
    if ( !is_push )
    {
        // NOTE(review): assumes push() returns false without storing the pointer
        // (terminated container), so the copy is still ours to free -- confirm.
        delete tp_buf;
        tp_buf = nullptr;
        return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
                             " Communication_socket_base::encapsulate_msg error ");
    }

    return Error_code::SUCCESS;
}