|
@@ -0,0 +1,610 @@
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#include "communication_socket_base.h"
|
|
|
+#include "../tool/proto_tool.h"
|
|
|
+
|
|
|
+Communication_socket_base::Communication_socket_base()
|
|
|
+{
|
|
|
+ m_communication_statu = COMMUNICATION_UNKNOW;
|
|
|
+
|
|
|
+ mp_receive_data_thread = NULL;
|
|
|
+ mp_analysis_data_thread = NULL;
|
|
|
+ mp_send_data_thread = NULL;
|
|
|
+ mp_encapsulate_data_thread = NULL;
|
|
|
+
|
|
|
+ m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
|
|
|
+ m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
|
|
|
+}
|
|
|
+
|
|
|
+Communication_socket_base::~Communication_socket_base()
|
|
|
+{
|
|
|
+ communication_uninit();
|
|
|
+}
|
|
|
+
|
|
|
+//初始化 通信 模块。如下三选一
|
|
|
+Error_manager Communication_socket_base::communication_init()
|
|
|
+{
|
|
|
+ LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
|
|
|
+
|
|
|
+ return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
|
|
|
+}
|
|
|
+
|
|
|
+//初始化 通信 模块。从文件读取
|
|
|
+Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
|
|
|
+{
|
|
|
+ Communication_proto::Communication_parameter_all t_communication_parameter_all;
|
|
|
+ if(! proto_tool::read_proto_param(prototxt_path,t_communication_parameter_all) )
|
|
|
+ {
|
|
|
+ return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR,MINOR_ERROR,
|
|
|
+ "Communication_socket_base read_proto_param failed");
|
|
|
+ }
|
|
|
+
|
|
|
+ return communication_init_from_protobuf(t_communication_parameter_all);
|
|
|
+}
|
|
|
+
|
|
|
+//初始化 通信 模块。从protobuf读取
|
|
|
+Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
|
|
|
+{
|
|
|
+ LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
|
|
|
+ Error_manager t_error;
|
|
|
+
|
|
|
+ if ( communication_parameter_all.communication_parameters().has_bind_string() )
|
|
|
+ {
|
|
|
+ t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
|
|
|
+ if ( t_error != Error_code::SUCCESS )
|
|
|
+ {
|
|
|
+ return t_error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() " <<
|
|
|
+ communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;
|
|
|
+ for(int i=0;i<communication_parameter_all.communication_parameters().connect_string_vector_size();++i)
|
|
|
+ {
|
|
|
+ t_error = communication_connect( communication_parameter_all.communication_parameters().connect_string_vector(i) );
|
|
|
+ if ( t_error != Error_code::SUCCESS )
|
|
|
+ {
|
|
|
+ return t_error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //启动通信, run thread
|
|
|
+ communication_run();
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+//初始化
|
|
|
+Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
|
|
|
+{
|
|
|
+ LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
|
|
|
+ Error_manager t_error;
|
|
|
+
|
|
|
+ t_error = communication_bind(bind_string);
|
|
|
+ if ( t_error != Error_code::SUCCESS )
|
|
|
+ {
|
|
|
+ return t_error;
|
|
|
+ }
|
|
|
+
|
|
|
+ t_error = communication_connect(connect_string_vector);
|
|
|
+ if ( t_error != Error_code::SUCCESS )
|
|
|
+ {
|
|
|
+ return t_error;
|
|
|
+ }
|
|
|
+
|
|
|
+ //启动通信, run thread
|
|
|
+ communication_run();
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+//bind
|
|
|
+Error_manager Communication_socket_base::communication_bind(std::string bind_string)
|
|
|
+{
|
|
|
+ Error_manager t_error;
|
|
|
+ int t_socket_result;
|
|
|
+
|
|
|
+ //m_socket 自己作为一个服务器, 绑定一个端口
|
|
|
+ t_socket_result = m_socket.bind(bind_string);
|
|
|
+ if ( t_socket_result <0 )
|
|
|
+ {
|
|
|
+ return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
|
|
|
+ " m_socket.bind error ");
|
|
|
+ }
|
|
|
+ LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << " " << this;
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+//connect
|
|
|
+Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
|
|
|
+{
|
|
|
+ Error_manager t_error;
|
|
|
+ for (auto iter = connect_string_vector.begin(); iter != connect_string_vector.end(); ++iter)
|
|
|
+ {
|
|
|
+ t_error = communication_connect(*iter);
|
|
|
+ if ( t_error != Error_code::SUCCESS )
|
|
|
+ {
|
|
|
+ return t_error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+//connect
|
|
|
+Error_manager Communication_socket_base::communication_connect(std::string connect_string)
|
|
|
+{
|
|
|
+ Error_manager t_error;
|
|
|
+ int t_socket_result;
|
|
|
+ //m_socket 和远端通信, 连接远端服务器的端口
|
|
|
+ t_socket_result = m_socket.connect(connect_string);
|
|
|
+ if ( t_socket_result <0 )
|
|
|
+ {
|
|
|
+ return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
|
|
|
+ " m_socket.connect error ");
|
|
|
+ }
|
|
|
+ LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << " " << this;
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+//启动通信, run thread
|
|
|
+Error_manager Communication_socket_base::communication_run()
|
|
|
+{
|
|
|
+ m_communication_statu = COMMUNICATION_READY;
|
|
|
+ //启动4个线程。
|
|
|
+ //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
|
|
|
+ m_receive_condition.reset(false, false, false);
|
|
|
+ mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
|
|
|
+ //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
|
|
|
+ m_analysis_data_condition.reset(false, false, false);
|
|
|
+ mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);
|
|
|
+ //发送线程默认循环, 内部的wait_and_pop进行等待,
|
|
|
+ m_send_data_condition.reset(false, true, false);
|
|
|
+ mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);
|
|
|
+ //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
|
|
|
+ m_encapsulate_data_condition.reset(false, false, false);
|
|
|
+ mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+//反初始化 通信 模块。
|
|
|
+Error_manager Communication_socket_base::communication_uninit()
|
|
|
+{
|
|
|
+ //终止list,防止 wait_and_pop 阻塞线程。
|
|
|
+ m_receive_data_list.termination_list();
|
|
|
+ m_send_data_list.termination_list();
|
|
|
+
|
|
|
+ //杀死4个线程,强制退出
|
|
|
+ if (mp_receive_data_thread)
|
|
|
+ {
|
|
|
+ m_receive_condition.kill_all();
|
|
|
+ }
|
|
|
+ if (mp_analysis_data_thread)
|
|
|
+ {
|
|
|
+ m_analysis_data_condition.kill_all();
|
|
|
+ }
|
|
|
+ if (mp_send_data_thread)
|
|
|
+ {
|
|
|
+ m_send_data_condition.kill_all();
|
|
|
+ }
|
|
|
+ if (mp_encapsulate_data_thread)
|
|
|
+ {
|
|
|
+ m_encapsulate_data_condition.kill_all();
|
|
|
+ }
|
|
|
+ //回收4个线程的资源
|
|
|
+ if (mp_receive_data_thread)
|
|
|
+ {
|
|
|
+ mp_receive_data_thread->join();
|
|
|
+ delete mp_receive_data_thread;
|
|
|
+ mp_receive_data_thread = NULL;
|
|
|
+ }
|
|
|
+ if (mp_analysis_data_thread)
|
|
|
+ {
|
|
|
+ mp_analysis_data_thread->join();
|
|
|
+ delete mp_analysis_data_thread;
|
|
|
+ mp_analysis_data_thread = 0;
|
|
|
+ }
|
|
|
+ if (mp_send_data_thread)
|
|
|
+ {
|
|
|
+ mp_send_data_thread->join();
|
|
|
+ delete mp_send_data_thread;
|
|
|
+ mp_send_data_thread = NULL;
|
|
|
+ }
|
|
|
+ if (mp_encapsulate_data_thread)
|
|
|
+ {
|
|
|
+ mp_encapsulate_data_thread->join();
|
|
|
+ delete mp_encapsulate_data_thread;
|
|
|
+ mp_encapsulate_data_thread = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+ //清空list
|
|
|
+ m_receive_data_list.clear_and_delete();
|
|
|
+ m_send_data_list.clear_and_delete();
|
|
|
+
|
|
|
+ m_communication_statu = COMMUNICATION_UNKNOW;
|
|
|
+ m_socket.close();
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+void Communication_socket_base::set_analysis_cycle_time(unsigned int analysis_cycle_time)
|
|
|
+{
|
|
|
+ m_analysis_cycle_time = analysis_cycle_time;
|
|
|
+}
|
|
|
+void Communication_socket_base::set_encapsulate_cycle_time(unsigned int encapsulate_cycle_time)
|
|
|
+{
|
|
|
+ m_encapsulate_cycle_time = encapsulate_cycle_time;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+//mp_receive_data_thread 接受线程执行函数,
|
|
|
+//receive_data_thread 内部线程负责接受消息
|
|
|
+void Communication_socket_base::receive_data_thread()
|
|
|
+{
|
|
|
+ LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< this;
|
|
|
+
|
|
|
+ //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
|
|
|
+ while (m_receive_condition.is_alive())
|
|
|
+ {
|
|
|
+ m_receive_condition.wait_for_ex(std::chrono::microseconds(1));
|
|
|
+ if ( m_receive_condition.is_alive() )
|
|
|
+ {
|
|
|
+ std::this_thread::yield();
|
|
|
+
|
|
|
+ std::string t_receive_string;
|
|
|
+ {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ //flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
|
|
|
+ t_receive_string = m_socket.recv<std::string>(1);
|
|
|
+ }
|
|
|
+ if ( t_receive_string.size()>0 )
|
|
|
+ {
|
|
|
+ //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
|
|
|
+ message::Base_msg t_base_msg;
|
|
|
+ if( t_base_msg.ParseFromString(t_receive_string) )
|
|
|
+ {
|
|
|
+ //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
|
|
|
+ Communication_message * tp_communication_message = new Communication_message;
|
|
|
+ tp_communication_message->reset(t_base_msg.base_info(), t_receive_string);
|
|
|
+ //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
|
|
|
+ if ( check_msg(tp_communication_message) == SUCCESS )
|
|
|
+ {
|
|
|
+ bool is_push = m_receive_data_list.push(tp_communication_message);
|
|
|
+ //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存
|
|
|
+ if ( is_push )
|
|
|
+ {
|
|
|
+ //唤醒解析线程一次,
|
|
|
+ m_analysis_data_condition.notify_all(false, true);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+// push失败, 就要回收内存
|
|
|
+ delete(tp_communication_message);
|
|
|
+ tp_communication_message = NULL;
|
|
|
+// return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+// " m_receive_data_list.push error ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ delete(tp_communication_message);
|
|
|
+ tp_communication_message = NULL;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+ //解析失败, 就当做什么也没发生, 认为接收消息无效,
|
|
|
+ }
|
|
|
+ //没有接受到消息, 返回空字符串
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< this;
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+//检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
|
|
|
+Error_manager Communication_socket_base::check_msg(Communication_message* p_msg)
|
|
|
+{
|
|
|
+ //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
|
|
|
+ //子类重载时, 增加自己模块的判断逻辑, 以后再写.
|
|
|
+ if ( p_msg->get_message_type() == Communication_message::Message_type::eBase_msg
|
|
|
+ && p_msg->get_receiver() == Communication_message::Communicator::eMain )
|
|
|
+ {
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //无效的消息,
|
|
|
+ return Error_manager(Error_code::INVALID_MESSAGE, Error_level::NEGLIGIBLE_ERROR,
|
|
|
+ " INVALID_MESSAGE error "); }
|
|
|
+}
|
|
|
+
|
|
|
+//mp_analysis_data_thread 解析线程执行函数,
|
|
|
+//analysis_data_thread 内部线程负责解析消息
|
|
|
+void Communication_socket_base::analysis_data_thread()
|
|
|
+{
|
|
|
+ LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;
|
|
|
+
|
|
|
+ //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
|
|
|
+ while (m_analysis_data_condition.is_alive())
|
|
|
+ {
|
|
|
+ bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(m_analysis_cycle_time);
|
|
|
+ if ( m_analysis_data_condition.is_alive() )
|
|
|
+ {
|
|
|
+ std::this_thread::yield();
|
|
|
+ //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
|
|
|
+ if ( t_pass_flag )
|
|
|
+ {
|
|
|
+ analysis_receive_list();
|
|
|
+ }
|
|
|
+ //如果解析线程超时通过, 那么就定时处理链表残留的消息,
|
|
|
+ else
|
|
|
+ {
|
|
|
+ analysis_receive_list();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+//循环接受链表, 解析消息,
|
|
|
+Error_manager Communication_socket_base::analysis_receive_list()
|
|
|
+{
|
|
|
+ Error_manager t_error;
|
|
|
+ if ( m_receive_data_list.m_termination_flag )
|
|
|
+ {
|
|
|
+ return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+ " Communication_socket_base::analysis_receive_list error ");
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
|
|
|
+ for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
|
|
|
+ {
|
|
|
+ Communication_message* tp_msg = **iter;
|
|
|
+ if ( tp_msg == NULL )
|
|
|
+ {
|
|
|
+ iter = m_receive_data_list.m_data_list.erase(iter);
|
|
|
+ //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //检查消息是否可以被处理
|
|
|
+ t_error = check_executer(tp_msg);
|
|
|
+ if ( t_error == SUCCESS)
|
|
|
+ {
|
|
|
+ //处理消息
|
|
|
+ t_error = execute_msg(tp_msg);
|
|
|
+// if ( t_error )
|
|
|
+// {
|
|
|
+// //执行结果不管
|
|
|
+// }
|
|
|
+// else
|
|
|
+// {
|
|
|
+// //执行结果不管
|
|
|
+// }
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ iter = m_receive_data_list.m_data_list.erase(iter);
|
|
|
+ //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
|
|
|
+ }
|
|
|
+ else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
|
|
|
+ {
|
|
|
+ //处理器正忙, 那就不做处理, 直接处理下一个
|
|
|
+ //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
|
|
|
+ iter++;
|
|
|
+ }
|
|
|
+ else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
|
|
|
+ {
|
|
|
+ //超时了就直接删除
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ iter = m_receive_data_list.m_data_list.erase(iter);
|
|
|
+ //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
|
|
|
+
|
|
|
+ //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+//检查执行者的状态, 判断能否处理这条消息, 需要子类重载
|
|
|
+Error_manager Communication_socket_base::check_executer(Communication_message* p_msg)
|
|
|
+{
|
|
|
+ //检查对应模块的状态, 判断是否可以处理这条消息
|
|
|
+ //同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
|
|
|
+ //如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
|
|
|
+
|
|
|
+ if ( p_msg->is_over_time() )
|
|
|
+ {
|
|
|
+ std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
|
|
|
+ std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
|
|
|
+ std::cout << "COMMUNICATION_ANALYSIS_TIME_OUT , " << std::endl;
|
|
|
+ return Error_code::COMMUNICATION_ANALYSIS_TIME_OUT;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ bool executer_is_ready = false;
|
|
|
+ //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 找到处理模块的实例对象, 查询执行人是否可以处理这条消息
|
|
|
+ //这里子类重载时, 增加判断逻辑, 以后再写.
|
|
|
+ executer_is_ready = true;
|
|
|
+
|
|
|
+ std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
|
|
|
+ std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
|
|
|
+
|
|
|
+ if ( executer_is_ready )
|
|
|
+ {
|
|
|
+ std::cout << "executer_is_ready , " << std::endl;
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::cout << "executer_is_busy , " << std::endl;
|
|
|
+ return Error_code::COMMUNICATION_EXCUTER_IS_BUSY;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+//处理消息
|
|
|
+Error_manager Communication_socket_base::execute_msg(Communication_message* p_msg)
|
|
|
+{
|
|
|
+ //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析
|
|
|
+ // 不能一直使用 Communication_message* p_msg, 这个是要销毁的
|
|
|
+ //然后处理这个消息, 就是调用对应模块的 execute 接口函数
|
|
|
+ //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
|
|
|
+ //子类重载, 需要完全重写, 以后再写.
|
|
|
+
|
|
|
+ //注注注注注意了, 本模块只是用来做通信,
|
|
|
+ //在做处理消息的时候, 可能会调用执行者的接口函数,
|
|
|
+ //这里不应该长时间阻塞或者处理复杂的逻辑,
|
|
|
+ //请执行者另开线程来处理任务.
|
|
|
+
|
|
|
+ std::cout << "Communication_socket_base::excute_msg p_buf = " << p_msg->get_message_buf() << std::endl;
|
|
|
+ std::cout << "Communication_socket_base::excute_msg size = " << p_msg->get_message_buf().size() << std::endl;
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+//mp_send_data_thread 发送线程执行函数,
|
|
|
+//send_data_thread 内部线程负责发送消息
|
|
|
+void Communication_socket_base::send_data_thread()
|
|
|
+{
|
|
|
+ LOG(INFO) << " Communication_socket_base::send_data_thread start "<< this;
|
|
|
+
|
|
|
+ //通信发送线程, 负责巡检m_send_data_list, 并发送消息
|
|
|
+ while (m_send_data_condition.is_alive())
|
|
|
+ {
|
|
|
+ m_send_data_condition.wait();
|
|
|
+ if ( m_send_data_condition.is_alive() )
|
|
|
+ {
|
|
|
+ std::this_thread::yield();
|
|
|
+
|
|
|
+ Communication_message* tp_msg = NULL;
|
|
|
+ //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
|
|
|
+ //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
|
|
|
+ //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
|
|
|
+ bool is_pop = m_send_data_list.wait_and_pop(tp_msg);
|
|
|
+ if ( is_pop )
|
|
|
+ {
|
|
|
+ if ( tp_msg != NULL )
|
|
|
+ {
|
|
|
+ {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ m_socket.send(tp_msg->get_message_buf());
|
|
|
+ }
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ //没有取出, 那么应该就是 m_termination_flag 结束了
|
|
|
+// return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+// " Communication_socket_base::send_data_thread() error ");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG(INFO) << " Communication_socket_base::send_data_thread end "<< this;
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+//mp_encapsulate_data_thread 封装线程执行函数,
|
|
|
+//encapsulate_data_thread 内部线程负责封装消息
|
|
|
+void Communication_socket_base::encapsulate_data_thread()
|
|
|
+{
|
|
|
+ LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;
|
|
|
+
|
|
|
+ //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
|
|
|
+ while (m_encapsulate_data_condition.is_alive())
|
|
|
+ {
|
|
|
+ bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(m_encapsulate_cycle_time);
|
|
|
+
|
|
|
+ if ( m_encapsulate_data_condition.is_alive() )
|
|
|
+ {
|
|
|
+ std::this_thread::yield();
|
|
|
+ //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
|
|
|
+ if ( t_pass_flag )
|
|
|
+ {
|
|
|
+ //主动发送消息,
|
|
|
+ }
|
|
|
+ //如果封装线程超时通过, 那么就定时封装心跳和状态信息
|
|
|
+ else
|
|
|
+ {
|
|
|
+ encapsulate_send_data();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
|
|
|
+ return;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
|
|
|
+Error_manager Communication_socket_base::encapsulate_send_data()
|
|
|
+{
|
|
|
+// char buf[256] = {0};
|
|
|
+// static unsigned int t_heartbeat = 0;
|
|
|
+// sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0, test\0", t_heartbeat);
|
|
|
+// t_heartbeat++;
|
|
|
+ return SUCCESS;
|
|
|
+ message::Base_msg t_base_msg;
|
|
|
+ t_base_msg.mutable_base_info()->set_msg_type(message::Message_type::eBase_msg);
|
|
|
+ t_base_msg.mutable_base_info()->set_timeout_ms(5000);
|
|
|
+ t_base_msg.mutable_base_info()->set_sender(message::Communicator::eMain);
|
|
|
+ t_base_msg.mutable_base_info()->set_receiver(message::Communicator::eMain);
|
|
|
+
|
|
|
+ Communication_message* tp_msg = new Communication_message(t_base_msg.SerializeAsString());
|
|
|
+ bool is_push = m_send_data_list.push(tp_msg);
|
|
|
+ if ( is_push == false )
|
|
|
+ {
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+ " Communication_socket_base::encapsulate_msg error ");
|
|
|
+ }
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+//封装消息, 需要子类重载
|
|
|
+Error_manager Communication_socket_base::encapsulate_msg(std::string message)
|
|
|
+{
|
|
|
+ Communication_message* tp_msg = new Communication_message(message);
|
|
|
+ bool is_push = m_send_data_list.push(tp_msg);
|
|
|
+ if ( is_push == false )
|
|
|
+ {
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+ " Communication_socket_base::encapsulate_msg error ");
|
|
|
+ }
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|
|
|
+
|
|
|
+//封装消息, 需要子类重载
|
|
|
+Error_manager Communication_socket_base::encapsulate_msg(Communication_message* p_msg)
|
|
|
+{
|
|
|
+ Communication_message* tp_msg = new Communication_message(*p_msg);
|
|
|
+ bool is_push = m_send_data_list.push(tp_msg);
|
|
|
+ if ( is_push == false )
|
|
|
+ {
|
|
|
+ delete(tp_msg);
|
|
|
+ tp_msg = NULL;
|
|
|
+ return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
|
|
|
+ " Communication_socket_base::encapsulate_msg error ");
|
|
|
+ }
|
|
|
+ return Error_code::SUCCESS;
|
|
|
+}
|