// // Created by huli on 2022/12/30. // #include "network_base.h" #include "../tool/proto_tool.h" #include "../tool/time_tool.h" Network_base::Network_base() { m_network_status = NETWORK_STATUS_UNKNOW; mp_receive_data_thread = NULL; mp_analysis_data_thread = NULL; mp_send_data_thread = NULL; mp_encapsulate_data_thread = NULL; m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list) m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息 } Network_base::~Network_base() { network_uninit(); } //初始化 通信 模块。如下三选一 Error_manager Network_base::network_init() { LOG(INFO) << " ---Network_base::network_init() run--- "<< this; return network_init_from_protobuf(NETKORK_PARAMETER_PATH); } //初始化 通信 模块。从文件读取 Error_manager Network_base::network_init_from_protobuf(std::string prototxt_path) { Network_proto::Network_parameter_all t_network_parameter_all; if(! proto_tool::read_proto_param(prototxt_path,t_network_parameter_all) ) { return Error_manager(NETWORK_READ_PROTOBUF_ERROR,MINOR_ERROR, "network_init_from_protobuf read_proto_param failed"); } return network_init_from_protobuf(t_network_parameter_all); } //初始化 通信 模块。从protobuf读取 Error_manager Network_base::network_init_from_protobuf(Network_proto::Network_parameter_all & network_parameter_all) { LOG(INFO) << " ---Rabbitmq_base::network_init_from_protobuf() run--- "<< this; m_network_parameter_all = network_parameter_all; //根据参数创建socket连接 for (int i = 0; i < m_network_parameter_all.network_parameters().network_information_vector_size(); ++i) { //导入参数 int t_socket_id = m_network_parameter_all.network_parameters().network_information_vector(i).socket_id(); Network_socket t_network_socket; t_network_socket.m_network_information = m_network_parameter_all.network_parameters().network_information_vector(i); //tcp 客户端 if ( t_network_socket.m_network_information.network_mode() == Network_proto::TCP_CLIENT ) { //创建socket t_network_socket.m_socket_fd = socket(AF_INET, SOCK_STREAM, 0); //直接创建socket返回给Communication_tcp的成员 if (t_network_socket.m_socket_fd == -1) { printf(" create tcp socket failed "); return Error_manager(Error_code::NETWORK_CREATE_SOCKET_ERROR, Error_level::MINOR_ERROR, " fun error "); } //配置ip地址 sockaddr_in saddr; //设置连接对象的结构体 saddr.sin_family = AF_INET; saddr.sin_port = htons(t_network_socket.m_network_information.port()); saddr.sin_addr.s_addr = inet_addr(t_network_socket.m_network_information.ip().c_str()); //字符串转整型 set_block(t_network_socket.m_socket_fd, false); //将socket改成非阻塞模式,此时它会立即返回 所以通过fd_set fd_set rfds, wfds; //文件句柄数组,在这个数组中,存放当前每个文件句柄的状态 if (connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)) != 0) //此时connect马上返回,状态为未成功连接 { FD_ZERO(&rfds); //首先把文件句柄的数组置空 FD_ZERO(&wfds); FD_SET(t_network_socket.m_socket_fd, &rfds); //把sock的网络句柄加入到该句柄数组中 FD_SET(t_network_socket.m_socket_fd, &wfds); timeval tm; //超时参数的结构体 tm.tv_sec = NETKORK_CONNECT_TIME;//默认5秒 tm.tv_usec = 0; int selres = select(t_network_socket.m_socket_fd + 1, &rfds, &wfds, NULL, &tm); //(阻塞函数)(监听的文件句柄的最大值加1,可读序列文件列表,可写的序列文件列表,错误处理,超时)使用select监听文件序列set是否有可读可写,这里监听set数组(里面只有sock),只要其中的句柄有一个变得可写(在这里是sock连接成功了以后就会变得可写,就返回),就返回 switch (selres) { case -1: printf("select error\n"); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); case 0: printf("select time out\n"); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); default: if (FD_ISSET(t_network_socket.m_socket_fd, &rfds) || FD_ISSET(t_network_socket.m_socket_fd, &wfds)) { connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)); //再次连接一次进行确认 int err = errno; if (err == EISCONN||err == EINPROGRESS) //已经连接到该套接字 或 套接字为非阻塞套接字,且连接请求没有立即完成 { //在这里, 就表示连接正常 printf("connect finished(success).\n"); // set_block(t_network_socket.m_socket_fd,true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据 t_network_socket.m_network_status = NETWORK_STATUS_READY; m_network_socket_map[t_socket_id] = t_network_socket; m_network_socket_map[t_socket_id].m_updata_time = std::chrono::system_clock::now(); } else { printf("connect %s : %d finished(failed). errno = %d\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port(),errno); // printf("FD_ISSET(sock_fd, &rfds): %d\n FD_ISSET(sock_fd, &wfds): %d\n", FD_ISSET(sock_fd, &rfds) , FD_ISSET(sock_fd, &wfds)); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); } } else { printf("connect %s : %d finished(failed).",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port()); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); } } } else //连接正常 { set_block(t_network_socket.m_socket_fd, true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据 printf("connect %s : %d finished(success).\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port()); } } else { t_network_socket.m_network_status = NETWORK_STATUS_FAULT; t_network_socket.m_socket_fd = 0; m_network_socket_map[t_socket_id] = t_network_socket; } } network_run(); return Error_code::SUCCESS; } //启动通信, run thread Error_manager Network_base::network_run() { m_network_status = NETWORK_STATUS_UNKNOW; //启动4个线程。 //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms m_receive_condition.reset(false, false, false); mp_receive_data_thread = new std::thread(&Network_base::receive_data_thread, this); //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list m_analysis_data_condition.reset(false, false, false); mp_analysis_data_thread = new std::thread(&Network_base::analysis_data_thread, this); //发送线程默认循环, 内部的wait_and_pop进行等待, m_send_data_condition.reset(false, true, false); mp_send_data_thread = new std::thread(&Network_base::send_data_thread, this); //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息, m_encapsulate_data_condition.reset(false, false, false); mp_encapsulate_data_thread = new std::thread(&Network_base::encapsulate_data_thread, this); return Error_code::SUCCESS; } //反初始化 通信 模块。 Error_manager Network_base::network_uninit() { for (auto iter = m_network_socket_map.begin(); iter != m_network_socket_map.end(); ++iter) { int t_socket_fd = iter->second.m_socket_fd; if (t_socket_fd <= 0) { printf("socket %d error \n", t_socket_fd); //打印ip和端口 } else { close(t_socket_fd); } } return Error_code::SUCCESS; } //重连, 快速uninit, init Error_manager Network_base::network_reconnnect() { return Error_code::SUCCESS; } void Network_base::set_analysis_cycle_time(unsigned int analysis_cycle_time) { m_analysis_cycle_time = analysis_cycle_time; } void Network_base::set_encapsulate_cycle_time(unsigned int encapsulate_cycle_time) { m_encapsulate_cycle_time = encapsulate_cycle_time; } bool Network_base::set_block(int socket_fd, bool isblock) //设置阻塞模式 (希望只有在connect的时候是非阻塞的,而接收数据时候是阻塞的) { if (socket_fd <= 0) { printf(" set tcp socket block failed\n "); return false; } int flags = fcntl(socket_fd, F_GETFL, 0); //获取socket的属性 if (flags < 0) return false; //获取属性出错 if (isblock) { flags = flags&~O_NONBLOCK; //把非阻塞这位设为0 } else { flags = flags | O_NONBLOCK; //把非阻塞这位设为1 } if (fcntl(socket_fd, F_SETFL, flags)) return false; //把标准位设回去 // if (!isblock) // printf("set tcp socket not block success\n"); // if (isblock) // printf("set tcp socket block success\n"); return true; } int Network_base::network_recv(int socket_fd, char *buf, int size) //接收数据 { return recv(socket_fd, buf, size, 0); } int Network_base::network_send(int socket_fd, const char *buf, int size) //发送数据 { int sendedSize = 0; //已发送成功的长度 while (sendedSize != size) //若没发送完成,则从断点开始继续发送 直到完成 { try { int len = send(socket_fd, buf + sendedSize, size - sendedSize, 0); if (len <= 0) break; sendedSize += len; } catch (char *str) { std::cout << " 断线---" << str<< std::endl; break; } } return sendedSize; } bool Network_base::is_connected(int socket_fd) { struct tcp_info info; int len=sizeof(info); getsockopt(socket_fd, IPPROTO_TCP, TCP_INFO, &info, (socklen_t *)&len); if((info.tcpi_state==TCP_ESTABLISHED)) { return true; } return false; } //检查网络,如果断连, 就立刻重连, socket_fd会重新分配新的 Error_manager Network_base::check_and_reconnect(int socket_id) { int t_socket_fd = m_network_socket_map[socket_id].m_socket_fd; // if ( is_connected(t_socket_fd) == false ) if ( true ) { //关闭连接 close(t_socket_fd); //导入参数 Network_socket t_network_socket; t_network_socket.m_network_information = m_network_socket_map[socket_id].m_network_information; //tcp 客户端 if ( t_network_socket.m_network_information.network_mode() == Network_proto::TCP_CLIENT ) { //创建socket t_network_socket.m_socket_fd = socket(AF_INET, SOCK_STREAM, 0); //直接创建socket返回给Communication_tcp的成员 if (t_network_socket.m_socket_fd == -1) { printf(" create tcp socket failed "); return Error_manager(Error_code::NETWORK_CREATE_SOCKET_ERROR, Error_level::MINOR_ERROR, " fun error "); } //配置ip地址 sockaddr_in saddr; //设置连接对象的结构体 saddr.sin_family = AF_INET; saddr.sin_port = htons(t_network_socket.m_network_information.port()); saddr.sin_addr.s_addr = inet_addr(t_network_socket.m_network_information.ip().c_str()); //字符串转整型 set_block(t_network_socket.m_socket_fd, false); //将socket改成非阻塞模式,此时它会立即返回 所以通过fd_set fd_set rfds, wfds; //文件句柄数组,在这个数组中,存放当前每个文件句柄的状态 if (connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)) != 0) //此时connect马上返回,状态为未成功连接 { FD_ZERO(&rfds); //首先把文件句柄的数组置空 FD_ZERO(&wfds); FD_SET(t_network_socket.m_socket_fd, &rfds); //把sock的网络句柄加入到该句柄数组中 FD_SET(t_network_socket.m_socket_fd, &wfds); timeval tm; //超时参数的结构体 tm.tv_sec = NETKORK_CONNECT_TIME;//默认5秒 tm.tv_usec = 0; int selres = select(t_network_socket.m_socket_fd + 1, &rfds, &wfds, NULL, &tm); //(阻塞函数)(监听的文件句柄的最大值加1,可读序列文件列表,可写的序列文件列表,错误处理,超时)使用select监听文件序列set是否有可读可写,这里监听set数组(里面只有sock),只要其中的句柄有一个变得可写(在这里是sock连接成功了以后就会变得可写,就返回),就返回 switch (selres) { case -1: printf("select error\n"); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); case 0: printf("select time out\n"); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); default: if (FD_ISSET(t_network_socket.m_socket_fd, &rfds) || FD_ISSET(t_network_socket.m_socket_fd, &wfds)) { connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)); //再次连接一次进行确认 int err = errno; if (err == EISCONN||err == EINPROGRESS) //已经连接到该套接字 或 套接字为非阻塞套接字,且连接请求没有立即完成 { //在这里, 就表示连接正常 printf("connect finished(success).\n"); // set_block(t_network_socket.m_socket_fd,true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据 t_network_socket.m_network_status = NETWORK_STATUS_READY; m_network_socket_map[socket_id] = t_network_socket; m_network_socket_map[socket_id].m_updata_time = std::chrono::system_clock::now(); } else { printf("connect %s : %d finished(failed). errno = %d\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port(),errno); // printf("FD_ISSET(sock_fd, &rfds): %d\n FD_ISSET(sock_fd, &wfds): %d\n", FD_ISSET(sock_fd, &rfds) , FD_ISSET(sock_fd, &wfds)); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); } } else { printf("connect %s : %d finished(failed).",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port()); return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " fun error "); } } } else //连接正常 { set_block(t_network_socket.m_socket_fd, true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据 printf("connect %s : %d finished(success).\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port()); } } else { t_network_socket.m_network_status = NETWORK_STATUS_FAULT; t_network_socket.m_socket_fd = 0; m_network_socket_map[socket_id] = t_network_socket; } } else { m_network_socket_map[socket_id].m_updata_time = std::chrono::system_clock::now(); return Error_code::SUCCESS; } return Error_code::SUCCESS; } //mp_receive_data_thread 接受线程执行函数, //receive_data_thread 内部线程负责接受消息 void Network_base::receive_data_thread() { LOG(INFO) << " Network_base::receive_data_thread start "<< this; //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list while (m_receive_condition.is_alive()) { m_receive_condition.wait_for_ex(std::chrono::microseconds(1)); if ( m_receive_condition.is_alive() ) { std::this_thread::yield(); for (auto iter = m_network_socket_map.begin(); iter != m_network_socket_map.end(); ++iter) { //如果超时2秒就检查重连 if ( std::chrono::system_clock::now() - iter->second.m_updata_time > std::chrono::seconds(NETKORK_RECONNECT_OVER_TIME) ) { Time_tool::get_instance_references().time_start(123); check_and_reconnect(iter->first); Time_tool::get_instance_references().time_end(123); Time_tool::get_instance_references().cout_time_microsecond(123); } int recv_len = 0; char recv_buf[NETWORK_BUFFER_SIZE] = {0}; {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率 std::unique_lock lk(m_mutex); //接受数据. 必须非阻塞 recv_len = network_recv(iter->second.m_socket_fd, recv_buf, NETWORK_BUFFER_SIZE); // if ( iter->second.m_network_information.socket_id() == 2 && recv_len !=-1) // { // std::cout << " huli test :::: " << " ----------------------------------------------- = " << std::endl; // std::cout << " huli test :::: " << " socket_id = " << iter->second.m_network_information.socket_id() << std::endl; // std::cout << " huli test :::: " << " ip = " << iter->second.m_network_information.ip() << std::endl; // std::cout << " huli test :::: " << " recv_len = " << recv_len << std::endl; // } } if ( recv_len>0 ) { Network_message * tp_network_message = new Network_message; tp_network_message->reset(std::string(recv_buf, recv_len), iter->second.m_network_information.socket_id(), iter->second.m_network_information.ip(), iter->second.m_network_information.port()); //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的. if ( check_msg(tp_network_message) == SUCCESS ) { //接受信息成功, 刷新时间 iter->second.m_updata_time = std::chrono::system_clock::now(); bool is_push = m_receive_data_list.push(tp_network_message); //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存 if ( is_push ) { //唤醒解析线程一次, m_analysis_data_condition.notify_all(false, true); } else { // push失败, 就要回收内存 delete(tp_network_message); tp_network_message = NULL; // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, // " m_receive_data_list.push error "); } } else { delete(tp_network_message); tp_network_message = NULL; } } //没有接受到消息, 返回空字符串 } } } LOG(INFO) << " Network_base::receive_data_thread end "<< this; return; } //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的. Error_manager Network_base::check_msg(Network_message* p_msg) { return Error_code::SUCCESS; } //mp_analysis_data_thread 解析线程执行函数, //analysis_data_thread 内部线程负责解析消息 void Network_base::analysis_data_thread() { LOG(INFO) << " Network_base::analysis_data_thread start "<< this; //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息 while (m_analysis_data_condition.is_alive()) { bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(m_analysis_cycle_time); if ( m_analysis_data_condition.is_alive() ) { std::this_thread::yield(); //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表 if ( t_pass_flag ) { analysis_receive_list(); } //如果解析线程超时通过, 那么就定时处理链表残留的消息, else { analysis_receive_list(); } } } LOG(INFO) << " Network_base::analysis_data_thread end "<< this; return; } //循环接受链表, 解析消息, Error_manager Network_base::analysis_receive_list() { Error_manager t_error; if ( m_receive_data_list.m_termination_flag ) { return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Network_base::analysis_receive_list error "); } else { std::unique_lock lk(m_receive_data_list.m_mutex); for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); ) { Network_message* tp_msg = **iter; if ( tp_msg == NULL ) { iter = m_receive_data_list.m_data_list.erase(iter); //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化, } else { //检查消息是否可以被处理 t_error = check_executer(tp_msg); if ( t_error == SUCCESS) { //处理消息 t_error = execute_msg(tp_msg); // if ( t_error ) // { // //执行结果不管 // } // else // { // //执行结果不管 // } delete(tp_msg); tp_msg = NULL; iter = m_receive_data_list.m_data_list.erase(iter); //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化, } else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY) { //处理器正忙, 那就不做处理, 直接处理下一个 //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息. iter++; } else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT ) { //超时了就直接删除 delete(tp_msg); tp_msg = NULL; iter = m_receive_data_list.m_data_list.erase(iter); //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化, //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息 } } } } return Error_code::SUCCESS; } //检查执行者的状态, 判断能否处理这条消息, 需要子类重载 Error_manager Network_base::check_executer(Network_message* p_msg) { return Error_code::SUCCESS; } //处理消息 Error_manager Network_base::execute_msg(Network_message* p_msg) { //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析 // 不能一直使用 Network_message* p_msg, 这个是要销毁的 //然后处理这个消息, 就是调用对应模块的 execute 接口函数 //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可. //子类重载, 需要完全重写, 以后再写. //注注注注注意了, 本模块只是用来做通信, //在做处理消息的时候, 可能会调用执行者的接口函数, //这里不应该长时间阻塞或者处理复杂的逻辑, //请执行者另开线程来处理任务. std::cout << "Network_base::excute_msg p_buf = " << p_msg->m_message_buf << std::endl; std::cout << "Network_base::excute_msg size = " << p_msg->m_message_buf.size() << std::endl; return Error_code::SUCCESS; } //mp_send_data_thread 发送线程执行函数, //send_data_thread 内部线程负责发送消息 void Network_base::send_data_thread() { LOG(INFO) << " Network_base::send_data_thread start "<< this; //通信发送线程, 负责巡检m_send_data_list, 并发送消息 while (m_send_data_condition.is_alive()) { m_send_data_condition.wait(); if ( m_send_data_condition.is_alive() ) { std::this_thread::yield(); Network_message* tp_msg = NULL; //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待, //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all(); bool is_pop = m_send_data_list.wait_and_pop(tp_msg); if ( is_pop ) { if ( tp_msg != NULL ) { {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率 std::unique_lock lk(m_mutex); network_send(m_network_socket_map[tp_msg->m_socket_id].m_socket_fd, tp_msg->m_message_buf.c_str(), tp_msg->m_message_buf.size()); } delete(tp_msg); tp_msg = NULL; } } else { //没有取出, 那么应该就是 m_termination_flag 结束了 // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, // " Network_base::send_data_thread() error "); } } } LOG(INFO) << " Network_base::send_data_thread end "<< this; return; } //mp_encapsulate_data_thread 封装线程执行函数, //encapsulate_data_thread 内部线程负责封装消息 void Network_base::encapsulate_data_thread() { LOG(INFO) << " Network_base::encapsulate_data_thread start "<< this; //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list while (m_encapsulate_data_condition.is_alive()) { bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(m_encapsulate_cycle_time); if ( m_encapsulate_data_condition.is_alive() ) { std::this_thread::yield(); //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息, if ( t_pass_flag ) { //主动发送消息, } //如果封装线程超时通过, 那么就定时封装心跳和状态信息 else { auto_encapsulate_status(); } } } LOG(INFO) << " Network_base::encapsulate_data_thread end "<< this; return; } //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载 Error_manager Network_base::auto_encapsulate_status() { return Error_code::SUCCESS; } //封装消息, 需要子类重载 Error_manager Network_base::encapsulate_msg(std::string message, int socket_id) { Network_message* tp_msg = new Network_message(message, socket_id); bool is_push = m_send_data_list.push(tp_msg); if ( is_push == false ) { delete(tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Network_base::encapsulate_msg error "); } return Error_code::SUCCESS; } //封装消息, 需要子类重载 Error_manager Network_base::encapsulate_msg(Network_message* p_msg) { Network_message* tp_msg = new Network_message(*p_msg); bool is_push = m_send_data_list.push(tp_msg); if ( is_push == false ) { delete(tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Network_base::encapsulate_msg error "); } return Error_code::SUCCESS; }