communication_socket_base.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. #include "communication_socket_base.h"
  2. #include "../tool/proto_tool.h"
  3. Communication_socket_base::Communication_socket_base()
  4. {
  5. m_communication_statu = COMMUNICATION_UNKNOW;
  6. mp_receive_data_thread = NULL;
  7. mp_analysis_data_thread = NULL;
  8. mp_send_data_thread = NULL;
  9. mp_encapsulate_data_thread = NULL;
  10. }
  11. Communication_socket_base::~Communication_socket_base()
  12. {
  13. communication_uninit();
  14. }
  15. //初始化 通信 模块。如下三选一
  16. Error_manager Communication_socket_base::communication_init()
  17. {
  18. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  19. return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
  20. }
  21. //初始化 通信 模块。从文件读取
  22. Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
  23. {
  24. Communication_proto::Communication_parameter_all t_communication_parameter_all;
  25. if(! proto_tool::read_proto_param(prototxt_path,t_communication_parameter_all) )
  26. {
  27. return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR,MINOR_ERROR,
  28. "Communication_socket_base read_proto_param failed");
  29. }
  30. return communication_init_from_protobuf(t_communication_parameter_all);
  31. }
  32. //初始化 通信 模块。从protobuf读取
  33. Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
  34. {
  35. LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
  36. Error_manager t_error;
  37. if ( communication_parameter_all.communication_parameters().has_bind_string() )
  38. {
  39. t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
  40. if ( t_error != Error_code::SUCCESS )
  41. {
  42. return t_error;
  43. }
  44. }
  45. std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() " <<
  46. communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;
  47. for(int i=0;i<communication_parameter_all.communication_parameters().connect_string_vector_size();++i)
  48. {
  49. t_error = communication_connect( communication_parameter_all.communication_parameters().connect_string_vector(i) );
  50. if ( t_error != Error_code::SUCCESS )
  51. {
  52. return t_error;
  53. }
  54. }
  55. //启动通信, run thread
  56. communication_run();
  57. return Error_code::SUCCESS;
  58. }
  59. //初始化
  60. Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
  61. {
  62. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  63. Error_manager t_error;
  64. t_error = communication_bind(bind_string);
  65. if ( t_error != Error_code::SUCCESS )
  66. {
  67. return t_error;
  68. }
  69. t_error = communication_connect(connect_string_vector);
  70. if ( t_error != Error_code::SUCCESS )
  71. {
  72. return t_error;
  73. }
  74. //启动通信, run thread
  75. communication_run();
  76. return Error_code::SUCCESS;
  77. }
  78. //bind
  79. Error_manager Communication_socket_base::communication_bind(std::string bind_string)
  80. {
  81. Error_manager t_error;
  82. int t_socket_result;
  83. //m_socket 自己作为一个服务器, 绑定一个端口
  84. t_socket_result = m_socket.bind(bind_string);
  85. if ( t_socket_result <0 )
  86. {
  87. return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
  88. " m_socket.bind error ");
  89. }
  90. LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << " " << this;
  91. return Error_code::SUCCESS;
  92. }
  93. //connect
  94. Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
  95. {
  96. Error_manager t_error;
  97. for (auto iter = connect_string_vector.begin(); iter != connect_string_vector.end(); ++iter)
  98. {
  99. t_error = communication_connect(*iter);
  100. if ( t_error != Error_code::SUCCESS )
  101. {
  102. return t_error;
  103. }
  104. }
  105. return Error_code::SUCCESS;
  106. }
  107. //connect
  108. Error_manager Communication_socket_base::communication_connect(std::string connect_string)
  109. {
  110. Error_manager t_error;
  111. int t_socket_result;
  112. //m_socket 和远端通信, 连接远端服务器的端口
  113. t_socket_result = m_socket.connect(connect_string);
  114. if ( t_socket_result <0 )
  115. {
  116. return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
  117. " m_socket.connect error ");
  118. }
  119. LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << " " << this;
  120. return Error_code::SUCCESS;
  121. }
  122. //启动通信, run thread
  123. Error_manager Communication_socket_base::communication_run()
  124. {
  125. m_communication_statu = COMMUNICATION_READY;
  126. //启动4个线程。
  127. //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
  128. m_receive_condition.reset(false, true, false);
  129. mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
  130. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
  131. m_analysis_data_condition.reset(false, false, false);
  132. mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);
  133. //发送线程默认循环, 内部的wait_and_pop进行等待,
  134. m_send_data_condition.reset(false, true, false);
  135. mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);
  136. //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
  137. m_encapsulate_data_condition.reset(false, false, false);
  138. mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);
  139. return Error_code::SUCCESS;
  140. }
  141. //反初始化 通信 模块。
  142. Error_manager Communication_socket_base::communication_uninit()
  143. {
  144. //终止list,防止 wait_and_pop 阻塞线程。
  145. m_receive_data_list.termination_list();
  146. m_send_data_list.termination_list();
  147. //杀死4个线程,强制退出
  148. if (mp_receive_data_thread)
  149. {
  150. m_receive_condition.kill_all();
  151. }
  152. if (mp_analysis_data_thread)
  153. {
  154. m_analysis_data_condition.kill_all();
  155. }
  156. if (mp_send_data_thread)
  157. {
  158. m_send_data_condition.kill_all();
  159. }
  160. if (mp_encapsulate_data_thread)
  161. {
  162. m_encapsulate_data_condition.kill_all();
  163. }
  164. //回收4个线程的资源
  165. if (mp_receive_data_thread)
  166. {
  167. mp_receive_data_thread->join();
  168. delete mp_receive_data_thread;
  169. mp_receive_data_thread = NULL;
  170. }
  171. if (mp_analysis_data_thread)
  172. {
  173. mp_analysis_data_thread->join();
  174. delete mp_analysis_data_thread;
  175. mp_analysis_data_thread = 0;
  176. }
  177. if (mp_send_data_thread)
  178. {
  179. mp_send_data_thread->join();
  180. delete mp_send_data_thread;
  181. mp_send_data_thread = NULL;
  182. }
  183. if (mp_encapsulate_data_thread)
  184. {
  185. mp_encapsulate_data_thread->join();
  186. delete mp_encapsulate_data_thread;
  187. mp_encapsulate_data_thread = NULL;
  188. }
  189. //清空list
  190. m_receive_data_list.clear_and_delete();
  191. m_send_data_list.clear_and_delete();
  192. m_communication_statu = COMMUNICATION_UNKNOW;
  193. m_socket.close();
  194. return Error_code::SUCCESS;
  195. }
  196. //mp_receive_data_thread 接受线程执行函数,
  197. //receive_data_thread 内部线程负责接受消息
  198. void Communication_socket_base::receive_data_thread(Communication_socket_base* communicator)
  199. {
  200. LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< communicator;
  201. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  202. while (communicator->m_receive_condition.is_alive())
  203. {
  204. communicator->m_receive_condition.wait();
  205. if ( communicator->m_receive_condition.is_alive() )
  206. {
  207. std::this_thread::yield();
  208. std::unique_lock<std::mutex> lk(communicator->m_mutex);
  209. //flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
  210. std::string t_receive_string = communicator->m_socket.recv<std::string>(1);
  211. if ( t_receive_string.length()>0 )
  212. {
  213. //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
  214. std::cout<<" recieve length "<<t_receive_string.length()<<std::endl;
  215. message::Base_msg t_base_msg;
  216. if( t_base_msg.ParseFromString(t_receive_string) )
  217. {
  218. std::cout<<" TYPE ==== "<<t_base_msg.base_info().msg_type()<<std::endl;
  219. message::Measure_response_msg response;
  220. if(response.ParseFromString(t_receive_string))
  221. std::cout<<response.DebugString()<<std::endl;
  222. else
  223. std::cout<<" response EROR************************"<<std::endl;
  224. //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
  225. Communication_message * tp_communication_message = new Communication_message;
  226. tp_communication_message->reset(t_base_msg.base_info(), t_receive_string);
  227. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  228. if ( communicator->check_msg(tp_communication_message) == SUCCESS )
  229. {
  230. bool is_push = communicator->m_receive_data_list.push(tp_communication_message);
  231. //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存
  232. if ( is_push )
  233. {
  234. //唤醒解析线程一次,
  235. communicator->m_analysis_data_condition.notify_all(false, true);
  236. }
  237. else
  238. {
  239. // push失败, 就要回收内存
  240. delete(tp_communication_message);
  241. tp_communication_message = NULL;
  242. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  243. // " m_receive_data_list.push error ");
  244. }
  245. }
  246. else
  247. {
  248. delete(tp_communication_message);
  249. tp_communication_message = NULL;
  250. }
  251. }
  252. //解析失败, 就当做什么也没发生, 认为接收消息无效,
  253. }
  254. //没有接受到消息, 返回空字符串
  255. }
  256. }
  257. LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< communicator;
  258. return;
  259. }
  260. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  261. Error_manager Communication_socket_base::check_msg(Communication_message* p_msg)
  262. {
  263. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
  264. //子类重载时, 增加自己模块的判断逻辑, 以后再写.
  265. if ( p_msg->get_message_type() == Communication_message::Message_type::eBase_msg
  266. && p_msg->get_receiver() == Communication_message::Communicator::eMain )
  267. {
  268. return Error_code::SUCCESS;
  269. }
  270. else
  271. {
  272. //认为接受人
  273. return Error_code::INVALID_MESSAGE;
  274. }
  275. }
  276. //mp_analysis_data_thread 解析线程执行函数,
  277. //analysis_data_thread 内部线程负责解析消息
  278. void Communication_socket_base::analysis_data_thread()
  279. {
  280. LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;
  281. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  282. while (m_analysis_data_condition.is_alive())
  283. {
  284. bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(1000);
  285. if ( m_analysis_data_condition.is_alive() )
  286. {
  287. std::this_thread::yield();
  288. //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
  289. if ( t_pass_flag )
  290. {
  291. analysis_receive_list();
  292. }
  293. //如果解析线程超时通过, 那么就定时处理链表残留的消息,
  294. else
  295. {
  296. analysis_receive_list();
  297. }
  298. }
  299. }
  300. LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
  301. return;
  302. }
  303. //循环接受链表, 解析消息,
  304. Error_manager Communication_socket_base::analysis_receive_list()
  305. {
  306. Error_manager t_error;
  307. if ( m_receive_data_list.m_termination_flag )
  308. {
  309. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  310. " Communication_socket_base::analysis_receive_list error ");
  311. }
  312. else
  313. {
  314. std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
  315. for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
  316. {
  317. Communication_message* tp_msg = **iter;
  318. if ( tp_msg == NULL )
  319. {
  320. iter = m_receive_data_list.m_data_list.erase(iter);
  321. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  322. }
  323. else
  324. {
  325. //检查消息是否可以被处理
  326. t_error = check_executer(tp_msg);
  327. if ( t_error == SUCCESS)
  328. {
  329. //处理消息
  330. t_error = execute_msg(tp_msg);
  331. // if ( t_error )
  332. // {
  333. // //执行结果不管
  334. // }
  335. // else
  336. // {
  337. // //执行结果不管
  338. // }
  339. delete(tp_msg);
  340. tp_msg = NULL;
  341. iter = m_receive_data_list.m_data_list.erase(iter);
  342. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  343. }
  344. else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
  345. {
  346. //处理器正忙, 那就不做处理, 直接处理下一个
  347. //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
  348. iter++;
  349. }
  350. else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
  351. {
  352. //超时了就直接删除
  353. delete(tp_msg);
  354. tp_msg = NULL;
  355. iter = m_receive_data_list.m_data_list.erase(iter);
  356. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  357. //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息
  358. }
  359. }
  360. }
  361. }
  362. return Error_code::SUCCESS;
  363. }
  364. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  365. Error_manager Communication_socket_base::check_executer(Communication_message* p_msg)
  366. {
  367. //检查对应模块的状态, 判断是否可以处理这条消息
  368. //同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
  369. //如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
  370. if ( p_msg->is_over_time() )
  371. {
  372. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  373. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  374. std::cout << "COMMUNICATION_ANALYSIS_TIME_OUT , " << std::endl;
  375. return Error_code::COMMUNICATION_ANALYSIS_TIME_OUT;
  376. }
  377. else
  378. {
  379. bool executer_is_ready = false;
  380. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 找到处理模块的实例对象, 查询执行人是否可以处理这条消息
  381. //这里子类重载时, 增加判断逻辑, 以后再写.
  382. // executer_is_ready = true;
  383. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  384. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  385. if ( executer_is_ready )
  386. {
  387. std::cout << "executer_is_ready , " << std::endl;
  388. return Error_code::SUCCESS;
  389. }
  390. else
  391. {
  392. std::cout << "executer_is_busy , " << std::endl;
  393. return Error_code::COMMUNICATION_EXCUTER_IS_BUSY;
  394. }
  395. }
  396. return Error_code::SUCCESS;
  397. }
  398. //处理消息
  399. Error_manager Communication_socket_base::execute_msg(Communication_message* p_msg)
  400. {
  401. //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析
  402. // 不能一直使用 Communication_message* p_msg, 这个是要销毁的
  403. //然后处理这个消息, 就是调用对应模块的 execute 接口函数
  404. //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
  405. //子类重载, 需要完全重写, 以后再写.
  406. //注注注注注意了, 本模块只是用来做通信,
  407. //在做处理消息的时候, 可能会调用执行者的接口函数,
  408. //这里不应该长时间阻塞或者处理复杂的逻辑,
  409. //请执行者另开线程来处理任务.
  410. std::cout << "Communication_socket_base::excute_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  411. std::cout << "Communication_socket_base::excute_msg size = " << p_msg->get_message_buf().size() << std::endl;
  412. return Error_code::SUCCESS;
  413. }
  414. //mp_send_data_thread 发送线程执行函数,
  415. //send_data_thread 内部线程负责发送消息
  416. void Communication_socket_base::send_data_thread(Communication_socket_base* communicator)
  417. {
  418. LOG(INFO) << " Communication_socket_base::send_data_thread start "<< communicator;
  419. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  420. while (communicator->m_send_data_condition.is_alive())
  421. {
  422. communicator->m_send_data_condition.wait();
  423. if ( communicator->m_send_data_condition.is_alive() )
  424. {
  425. std::this_thread::yield();
  426. Communication_message* tp_msg = NULL;
  427. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  428. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  429. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  430. bool is_pop = communicator->m_send_data_list.wait_and_pop(tp_msg);
  431. if ( is_pop )
  432. {
  433. if ( tp_msg != NULL )
  434. {
  435. std::unique_lock<std::mutex> lk(communicator->m_mutex);
  436. int send_size = communicator->m_socket.send(tp_msg->get_message_buf());
  437. delete(tp_msg);
  438. tp_msg = NULL;
  439. }
  440. }
  441. else
  442. {
  443. //没有取出, 那么应该就是 m_termination_flag 结束了
  444. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  445. // " Communication_socket_base::send_data_thread() error ");
  446. }
  447. }
  448. }
  449. LOG(INFO) << " Communication_socket_base::send_data_thread end "<< communicator;
  450. return;
  451. }
  452. //mp_encapsulate_data_thread 封装线程执行函数,
  453. //encapsulate_data_thread 内部线程负责封装消息
  454. void Communication_socket_base::encapsulate_data_thread()
  455. {
  456. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;
  457. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  458. while (m_encapsulate_data_condition.is_alive())
  459. {
  460. bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(1000);
  461. if ( m_encapsulate_data_condition.is_alive() )
  462. {
  463. std::this_thread::yield();
  464. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  465. if ( t_pass_flag )
  466. {
  467. //主动发送消息,
  468. }
  469. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  470. else
  471. {
  472. encapsulate_send_data();
  473. }
  474. }
  475. }
  476. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
  477. return;
  478. }
  479. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  480. Error_manager Communication_socket_base::encapsulate_send_data()
  481. {
  482. // char buf[256] = {0};
  483. // static unsigned int t_heartbeat = 0;
  484. // sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0, test\0", t_heartbeat);
  485. // t_heartbeat++;
  486. return SUCCESS;
  487. message::Base_info t_base_msg;
  488. t_base_msg.set_msg_type(message::Message_type::eBase_msg);
  489. t_base_msg.set_timeout_ms(5000);
  490. t_base_msg.set_sender(message::Communicator::eMain);
  491. t_base_msg.set_receiver(message::Communicator::eMain);
  492. Communication_message* tp_msg = new Communication_message(t_base_msg.SerializeAsString());
  493. bool is_push = m_send_data_list.push(tp_msg);
  494. if ( is_push == false )
  495. {
  496. delete(tp_msg);
  497. tp_msg = NULL;
  498. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  499. " Communication_socket_base::encapsulate_msg error ");
  500. }
  501. return Error_code::SUCCESS;
  502. }
  503. //封装消息, 需要子类重载
  504. Error_manager Communication_socket_base::encapsulate_msg(std::string message)
  505. {
  506. Communication_message* tp_msg = new Communication_message(message);
  507. bool is_push = m_send_data_list.push(tp_msg);
  508. if ( is_push == false )
  509. {
  510. delete(tp_msg);
  511. tp_msg = NULL;
  512. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  513. " Communication_socket_base::encapsulate_msg error ");
  514. }
  515. return Error_code::SUCCESS;
  516. }
  517. //封装消息, 需要子类重载
  518. Error_manager Communication_socket_base::encapsulate_msg(Communication_message* p_msg)
  519. {
  520. Communication_message* tp_msg = new Communication_message(*p_msg);
  521. bool is_push = m_send_data_list.push(tp_msg);
  522. if ( is_push == false )
  523. {
  524. delete(tp_msg);
  525. tp_msg = NULL;
  526. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  527. " Communication_socket_base::encapsulate_msg error ");
  528. }
  529. return Error_code::SUCCESS;
  530. }