123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- #include "communication_socket_base.h"
- #include "../tool/proto_tool.h"
- Communication_socket_base::Communication_socket_base()
- {
- m_communication_statu = COMMUNICATION_UNKNOW;
- mp_receive_data_thread = NULL;
- mp_analysis_data_thread = NULL;
- mp_send_data_thread = NULL;
- mp_encapsulate_data_thread = NULL;
- }
- Communication_socket_base::~Communication_socket_base()
- {
- communication_uninit();
- }
- //初始化 通信 模块。如下三选一
- Error_manager Communication_socket_base::communication_init()
- {
- LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
- return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
- }
- //初始化 通信 模块。从文件读取
- Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
- {
- Communication_proto::Communication_parameter_all t_communication_parameter_all;
- if(! proto_tool::read_proto_param(prototxt_path,t_communication_parameter_all) )
- {
- return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR,MINOR_ERROR,
- "Communication_socket_base read_proto_param failed");
- }
- return communication_init_from_protobuf(t_communication_parameter_all);
- }
- //初始化 通信 模块。从protobuf读取
- Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
- {
- LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
- Error_manager t_error;
- if ( communication_parameter_all.communication_parameters().has_bind_string() )
- {
- t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- }
- std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() " <<
- communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;
- for(int i=0;i<communication_parameter_all.communication_parameters().connect_string_vector_size();++i)
- {
- t_error = communication_connect( communication_parameter_all.communication_parameters().connect_string_vector(i) );
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- }
- //启动通信, run thread
- communication_run();
- return Error_code::SUCCESS;
- }
- //初始化
- Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
- {
- LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
- Error_manager t_error;
- t_error = communication_bind(bind_string);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- t_error = communication_connect(connect_string_vector);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- //启动通信, run thread
- communication_run();
- return Error_code::SUCCESS;
- }
- //bind
- Error_manager Communication_socket_base::communication_bind(std::string bind_string)
- {
- Error_manager t_error;
- int t_socket_result;
- //m_socket 自己作为一个服务器, 绑定一个端口
- t_socket_result = m_socket.bind(bind_string);
- if ( t_socket_result <0 )
- {
- return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
- " m_socket.bind error ");
- }
- LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << " " << this;
- return Error_code::SUCCESS;
- }
- //connect
- Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
- {
- Error_manager t_error;
- for (auto iter = connect_string_vector.begin(); iter != connect_string_vector.end(); ++iter)
- {
- t_error = communication_connect(*iter);
- if ( t_error != Error_code::SUCCESS )
- {
- return t_error;
- }
- }
- return Error_code::SUCCESS;
- }
- //connect
- Error_manager Communication_socket_base::communication_connect(std::string connect_string)
- {
- Error_manager t_error;
- int t_socket_result;
- //m_socket 和远端通信, 连接远端服务器的端口
- t_socket_result = m_socket.connect(connect_string);
- if ( t_socket_result <0 )
- {
- return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
- " m_socket.connect error ");
- }
- LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << " " << this;
- return Error_code::SUCCESS;
- }
- //启动通信, run thread
- Error_manager Communication_socket_base::communication_run()
- {
- m_communication_statu = COMMUNICATION_READY;
- //启动4个线程。
- //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
- m_receive_condition.reset(false, true, false);
- mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
- //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
- m_analysis_data_condition.reset(false, false, false);
- mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);
- //发送线程默认循环, 内部的wait_and_pop进行等待,
- m_send_data_condition.reset(false, true, false);
- mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);
- //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
- m_encapsulate_data_condition.reset(false, false, false);
- mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);
- return Error_code::SUCCESS;
- }
- //反初始化 通信 模块。
- Error_manager Communication_socket_base::communication_uninit()
- {
- //终止list,防止 wait_and_pop 阻塞线程。
- m_receive_data_list.termination_list();
- m_send_data_list.termination_list();
- //杀死4个线程,强制退出
- if (mp_receive_data_thread)
- {
- m_receive_condition.kill_all();
- }
- if (mp_analysis_data_thread)
- {
- m_analysis_data_condition.kill_all();
- }
- if (mp_send_data_thread)
- {
- m_send_data_condition.kill_all();
- }
- if (mp_encapsulate_data_thread)
- {
- m_encapsulate_data_condition.kill_all();
- }
- //回收4个线程的资源
- if (mp_receive_data_thread)
- {
- mp_receive_data_thread->join();
- delete mp_receive_data_thread;
- mp_receive_data_thread = NULL;
- }
- if (mp_analysis_data_thread)
- {
- mp_analysis_data_thread->join();
- delete mp_analysis_data_thread;
- mp_analysis_data_thread = 0;
- }
- if (mp_send_data_thread)
- {
- mp_send_data_thread->join();
- delete mp_send_data_thread;
- mp_send_data_thread = NULL;
- }
- if (mp_encapsulate_data_thread)
- {
- mp_encapsulate_data_thread->join();
- delete mp_encapsulate_data_thread;
- mp_encapsulate_data_thread = NULL;
- }
- //清空list
- m_receive_data_list.clear_and_delete();
- m_send_data_list.clear_and_delete();
- m_communication_statu = COMMUNICATION_UNKNOW;
- m_socket.close();
- return Error_code::SUCCESS;
- }
- //mp_receive_data_thread 接受线程执行函数,
- //receive_data_thread 内部线程负责接受消息
- void Communication_socket_base::receive_data_thread()
- {
- LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< this;
- //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
- while (m_receive_condition.is_alive())
- {
- m_receive_condition.wait();
- if ( m_receive_condition.is_alive() )
- {
- std::this_thread::yield();
- std::unique_lock<std::mutex> lk(m_mutex);
- //flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
- nnxx::message t_msg = m_socket.recv(1);
- if ( t_msg.size()>0 )
- {
- Binary_buf * tp_binary_buf = new Binary_buf( (char*)(t_msg.data()), t_msg.size() );
- // std::cout << tp_binary_buf->get_buf() << std::endl;
- bool is_push = m_receive_data_list.push(tp_binary_buf);
- // if ( is_push == false )
- // {
- // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
- // " m_receive_data_list.push error ");
- // }
- //唤醒解析线程一次,
- m_analysis_data_condition.notify_all(false, true);
- }
- }
- }
- LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< this;
- return;
- }
- //mp_analysis_data_thread 解析线程执行函数,
- //analysis_data_thread 内部线程负责解析消息
- void Communication_socket_base::analysis_data_thread()
- {
- LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;
- //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
- while (m_analysis_data_condition.is_alive())
- {
- bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(1000);
- if ( m_analysis_data_condition.is_alive() )
- {
- std::this_thread::yield();
- //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
- if ( t_pass_flag )
- {
- analysis_receive_list();
- }
- //如果解析线程超时通过, 那么就定时处理链表残留的消息,
- else
- {
- analysis_receive_list();
- }
- }
- }
- LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
- return;
- }
- //循环接受链表, 解析消息,
- Error_manager Communication_socket_base::analysis_receive_list()
- {
- Error_manager t_error;
- if ( m_receive_data_list.m_termination_flag )
- {
- return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
- " Communication_socket_base::analysis_receive_list error ");
- }
- else
- {
- std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
- for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
- {
- Binary_buf* tp_buf = **iter;
- if ( tp_buf == NULL )
- {
- iter = m_receive_data_list.m_data_list.erase(iter);
- //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
- }
- else
- {
- //检查消息是否可以被解析
- t_error = check_msg(tp_buf);
- if ( t_error == SUCCESS)
- {
- //处理消息
- t_error = excute_msg(tp_buf);
- // if ( t_error )
- // {
- // //执行结果不管
- // }
- // else
- // {
- // //执行结果不管
- // }
- delete(tp_buf);
- tp_buf = NULL;
- iter = m_receive_data_list.m_data_list.erase(iter);
- //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
- }
- else if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
- {
- //超时了就直接删除
- delete(tp_buf);
- tp_buf = NULL;
- iter = m_receive_data_list.m_data_list.erase(iter);
- //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
- }
- else //if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
- {
- //处理器正忙, 那就不做处理, 直接处理下一个
- //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
- iter++;
- }
- }
- }
- }
- return Error_code::SUCCESS;
- }
- //检查消息是否可以被解析
- Error_manager Communication_socket_base::check_msg(Binary_buf* p_buf)
- {
- //检查对应模块的状态, 判断是否可以处理这条消息
- //同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
- //如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
- //......................
- std::cout << "Communication_socket_base::check_msg p_buf = " << p_buf->get_buf() << std::endl;
- std::cout << "Communication_socket_base::check_msg size = " << p_buf->get_length() << std::endl;
- // return Error_code::COMMUNICATION_ANALYSIS_TIME_OUT;
- // return Error_code::COMMUNICATION_EXCUTER_IS_BUSY;
- return Error_code::SUCCESS;
- }
- //处理消息
- Error_manager Communication_socket_base::excute_msg(Binary_buf* p_buf)
- {
- //先将 p_buf 转化为 对应的格式, 不能一直使用 p_buf, 和这个是要销毁的
- //然后处理这个消息, 就是调用对应模块的 excute 接口函数
- //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
- std::cout << "Communication_socket_base::excute_msg p_buf = " << p_buf->get_buf() << std::endl;
- std::cout << "Communication_socket_base::excute_msg size = " << p_buf->get_length() << std::endl;
- return Error_code::SUCCESS;
- }
- //mp_send_data_thread 发送线程执行函数,
- //send_data_thread 内部线程负责发送消息
- void Communication_socket_base::send_data_thread()
- {
- LOG(INFO) << " Communication_socket_base::send_data_thread start "<< this;
- //通信发送线程, 负责巡检m_send_data_list, 并发送消息
- while (m_send_data_condition.is_alive())
- {
- m_send_data_condition.wait();
- if ( m_send_data_condition.is_alive() )
- {
- std::this_thread::yield();
- Binary_buf* tp_buf = NULL;
- //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
- //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
- //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
- bool is_pop = m_send_data_list.wait_and_pop(tp_buf);
- if ( is_pop )
- {
- if ( tp_buf != NULL )
- {
- std::unique_lock<std::mutex> lk(m_mutex);
- m_socket.send(tp_buf->get_buf(), tp_buf->get_length(), 0);
- delete(tp_buf);
- tp_buf = NULL;
- }
- }
- else
- {
- //没有取出, 那么应该就是 m_termination_flag 结束了
- // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
- // " Communication_socket_base::send_data_thread() error ");
- }
- }
- }
- LOG(INFO) << " Communication_socket_base::send_data_thread end "<< this;
- return;
- }
- //mp_encapsulate_data_thread 封装线程执行函数,
- //encapsulate_data_thread 内部线程负责封装消息
- void Communication_socket_base::encapsulate_data_thread()
- {
- LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;
- //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
- while (m_encapsulate_data_condition.is_alive())
- {
- bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(1000);
- if ( m_encapsulate_data_condition.is_alive() )
- {
- std::this_thread::yield();
- //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
- if ( t_pass_flag )
- {
- //主动发送消息,
- }
- //如果封装线程超时通过, 那么就定时封装心跳和状态信息
- else
- {
- encapsulate_send_data();
- }
- }
- }
- LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
- return;
- }
- //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
- Error_manager Communication_socket_base::encapsulate_send_data()
- {
- char buf[256] = {0};
- static unsigned int t_heartbeat = 0;
- sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0", t_heartbeat);
- t_heartbeat++;
- Binary_buf* tp_buf = new Binary_buf(buf, strlen(buf)+1);//+1是为了保证发送了结束符, 方便打印
- bool is_push = m_send_data_list.push(tp_buf);
- if ( is_push == false )
- {
- return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
- " Communication_socket_base::encapsulate_msg error ");
- }
- return Error_code::SUCCESS;
- }
- //封装消息, 需要子类重载
- Error_manager Communication_socket_base::encapsulate_msg(Binary_buf* p_buf)
- {
- Binary_buf * tp_buf = new Binary_buf(*p_buf);
- bool is_push = m_send_data_list.push(p_buf);
- if ( is_push == false )
- {
- return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
- " Communication_socket_base::encapsulate_msg error ");
- }
- return Error_code::SUCCESS;
- }
|