communication_socket_base.cpp 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. #include "communication_socket_base.h"
  2. #include "../tool/proto_tool.h"
  3. Communication_socket_base::Communication_socket_base()
  4. {
  5. m_communication_statu = COMMUNICATION_UNKNOW;
  6. mp_receive_data_thread = NULL;
  7. mp_analysis_data_thread = NULL;
  8. mp_send_data_thread = NULL;
  9. mp_encapsulate_data_thread = NULL;
  10. m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
  11. m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
  12. }
  13. Communication_socket_base::~Communication_socket_base()
  14. {
  15. communication_uninit();
  16. }
  17. //初始化 通信 模块。如下三选一
  18. Error_manager Communication_socket_base::communication_init()
  19. {
  20. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  21. return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
  22. }
  23. //初始化 通信 模块。从文件读取
  24. Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
  25. {
  26. Communication_proto::Communication_parameter_all t_communication_parameter_all;
  27. if(! proto_tool::read_proto_param(prototxt_path,t_communication_parameter_all) )
  28. {
  29. return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR,MINOR_ERROR,
  30. "Communication_socket_base read_proto_param failed");
  31. }
  32. return communication_init_from_protobuf(t_communication_parameter_all);
  33. }
  34. //初始化 通信 模块。从protobuf读取
  35. Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
  36. {
  37. LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
  38. Error_manager t_error;
  39. int result = nng_bus0_open(&m_socket_bus);
  40. if ( result != 0 )
  41. {
  42. return Error_manager(Error_code::COMMUNICATION_INIT_ERROR, Error_level::MINOR_ERROR,
  43. "communication_init nng_bus0_open error ");
  44. }
  45. if ( communication_parameter_all.communication_parameters().has_bind_string() )
  46. {
  47. t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
  48. if ( t_error != Error_code::SUCCESS )
  49. {
  50. return t_error;
  51. }
  52. }
  53. // std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() " <<
  54. // communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;
  55. for(int i=0;i<communication_parameter_all.communication_parameters().connect_string_vector_size();++i)
  56. {
  57. t_error = communication_connect( communication_parameter_all.communication_parameters().connect_string_vector(i) );
  58. if ( t_error != Error_code::SUCCESS )
  59. {
  60. return t_error;
  61. }
  62. }
  63. //启动通信, run thread
  64. communication_run();
  65. return Error_code::SUCCESS;
  66. }
  67. //初始化
  68. Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
  69. {
  70. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  71. Error_manager t_error;
  72. int t_result = nng_bus0_open(&m_socket_bus);
  73. if ( t_result != 0 )
  74. {
  75. return Error_manager(Error_code::COMMUNICATION_INIT_ERROR, Error_level::MINOR_ERROR,
  76. "communication_init nng_bus0_open error ");
  77. }
  78. t_error = communication_bind(bind_string);
  79. if ( t_error != Error_code::SUCCESS )
  80. {
  81. return t_error;
  82. }
  83. t_error = communication_connect(connect_string_vector);
  84. if ( t_error != Error_code::SUCCESS )
  85. {
  86. return t_error;
  87. }
  88. //启动通信, run thread
  89. communication_run();
  90. return Error_code::SUCCESS;
  91. }
  92. //bind
  93. Error_manager Communication_socket_base::communication_bind(std::string bind_string)
  94. {
  95. Error_manager t_error;
  96. int t_socket_result;
  97. //m_socket 自己作为一个服务器, 绑定一个端口
  98. // t_socket_result = m_socket.bind(bind_string);
  99. // if ( t_socket_result <0 )
  100. // {
  101. // return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
  102. // " m_socket.bind error ");
  103. // }
  104. //m_socket 自己作为一个服务器, 绑定一个端口
  105. int t_result = nng_listen( m_socket_bus, bind_string.c_str(), nullptr, NNG_FLAG_NONBLOCK );
  106. if ( t_result != 0 )
  107. {
  108. return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
  109. "communication_bind nng_listen error ");
  110. }
  111. LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << " " << this;
  112. return Error_code::SUCCESS;
  113. }
  114. //connect
  115. Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
  116. {
  117. Error_manager t_error;
  118. for (auto iter = connect_string_vector.begin(); iter != connect_string_vector.end(); ++iter)
  119. {
  120. t_error = communication_connect(*iter);
  121. if ( t_error != Error_code::SUCCESS )
  122. {
  123. return t_error;
  124. }
  125. }
  126. return Error_code::SUCCESS;
  127. }
  128. //connect
  129. Error_manager Communication_socket_base::communication_connect(std::string connect_string)
  130. {
  131. Error_manager t_error;
  132. int t_socket_result;
  133. //m_socket 和远端通信, 连接远端服务器的端口
  134. // t_socket_result = m_socket.connect(connect_string);
  135. // if ( t_socket_result <0 )
  136. // {
  137. // return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
  138. // " m_socket.connect error ");
  139. // }
  140. //m_socket 和远端通信, 连接远端服务器的端口
  141. int t_result = nng_dial( m_socket_bus, connect_string.c_str(), nullptr, NNG_FLAG_NONBLOCK );
  142. if ( t_result != 0 )
  143. {
  144. return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
  145. "communication_connect nng_dial error ");
  146. }
  147. LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << " " << this;
  148. return Error_code::SUCCESS;
  149. }
  150. //启动通信, run thread
  151. Error_manager Communication_socket_base::communication_run()
  152. {
  153. m_communication_statu = COMMUNICATION_READY;
  154. //启动4个线程。
  155. //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
  156. m_receive_condition.reset(false, false, false);
  157. mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
  158. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
  159. m_analysis_data_condition.reset(false, false, false);
  160. mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);
  161. //发送线程默认循环, 内部的wait_and_pop进行等待,
  162. m_send_data_condition.reset(false, true, false);
  163. mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);
  164. //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
  165. m_encapsulate_data_condition.reset(false, false, false);
  166. mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);
  167. return Error_code::SUCCESS;
  168. }
  169. //反初始化 通信 模块。
  170. Error_manager Communication_socket_base::communication_uninit()
  171. {
  172. //终止list,防止 wait_and_pop 阻塞线程。
  173. m_receive_data_list.termination_list();
  174. m_send_data_list.termination_list();
  175. //杀死4个线程,强制退出
  176. if (mp_receive_data_thread)
  177. {
  178. m_receive_condition.kill_all();
  179. }
  180. if (mp_analysis_data_thread)
  181. {
  182. m_analysis_data_condition.kill_all();
  183. }
  184. if (mp_send_data_thread)
  185. {
  186. m_send_data_condition.kill_all();
  187. }
  188. if (mp_encapsulate_data_thread)
  189. {
  190. m_encapsulate_data_condition.kill_all();
  191. }
  192. //回收4个线程的资源
  193. if (mp_receive_data_thread)
  194. {
  195. mp_receive_data_thread->join();
  196. delete mp_receive_data_thread;
  197. mp_receive_data_thread = NULL;
  198. }
  199. if (mp_analysis_data_thread)
  200. {
  201. mp_analysis_data_thread->join();
  202. delete mp_analysis_data_thread;
  203. mp_analysis_data_thread = 0;
  204. }
  205. if (mp_send_data_thread)
  206. {
  207. mp_send_data_thread->join();
  208. delete mp_send_data_thread;
  209. mp_send_data_thread = NULL;
  210. }
  211. if (mp_encapsulate_data_thread)
  212. {
  213. mp_encapsulate_data_thread->join();
  214. delete mp_encapsulate_data_thread;
  215. mp_encapsulate_data_thread = NULL;
  216. }
  217. //清空list
  218. m_receive_data_list.clear_and_delete();
  219. m_send_data_list.clear_and_delete();
  220. m_communication_statu = COMMUNICATION_UNKNOW;
  221. //m_socket.close();
  222. nng_close(m_socket_bus);
  223. return Error_code::SUCCESS;
  224. }
  225. void Communication_socket_base::set_analysis_cycle_time(unsigned int analysis_cycle_time)
  226. {
  227. m_analysis_cycle_time = analysis_cycle_time;
  228. }
  229. void Communication_socket_base::set_encapsulate_cycle_time(unsigned int encapsulate_cycle_time)
  230. {
  231. m_encapsulate_cycle_time = encapsulate_cycle_time;
  232. }
  233. //接受string, 非阻塞模式, return 接收数据,如果没有收到数据,长度就为0, 非阻塞模式
  234. std::string Communication_socket_base::receive_string()
  235. {
  236. char* tp_data = NULL;
  237. size_t t_size = 0;
  238. int t_result = nng_recv(m_socket_bus,&tp_data,&t_size,NNG_FLAG_ALLOC | NNG_FLAG_NONBLOCK );
  239. if( t_result != 0 ) {
  240. return std::string();
  241. }
  242. else if ( t_size == 0 )
  243. {
  244. return std::string();
  245. }
  246. else
  247. {
  248. return std::string(tp_data, t_size);
  249. }
  250. }
  251. //发送string
  252. void Communication_socket_base::send_string(std::string str)
  253. {
  254. int t_result = nng_send(m_socket_bus,(void*)(str.c_str()), str.size(), NNG_FLAG_NONBLOCK);
  255. return;
  256. }
  257. //mp_receive_data_thread 接受线程执行函数,
  258. //receive_data_thread 内部线程负责接受消息
  259. void Communication_socket_base::receive_data_thread()
  260. {
  261. LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< this;
  262. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  263. while (m_receive_condition.is_alive())
  264. {
  265. m_receive_condition.wait_for_ex(std::chrono::microseconds(1));
  266. if ( m_receive_condition.is_alive() )
  267. {
  268. std::this_thread::yield();
  269. std::string t_receive_string;
  270. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  271. std::unique_lock<std::mutex> lk(m_mutex);
  272. //flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
  273. // t_receive_string = m_socket.recv<std::string>(1);
  274. t_receive_string = receive_string();
  275. }
  276. if ( t_receive_string.size()>0 )
  277. {
  278. // std::cout << " huli test :::: " << " t_receive_string = " << t_receive_string << std::endl;
  279. //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
  280. message::Base_msg t_base_msg;
  281. if( t_base_msg.ParseFromString(t_receive_string) )
  282. {
  283. //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
  284. Communication_message * tp_communication_message = new Communication_message;
  285. tp_communication_message->reset(t_base_msg.base_info(), t_receive_string);
  286. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  287. if ( check_msg(tp_communication_message) == SUCCESS )
  288. {
  289. bool is_push = m_receive_data_list.push(tp_communication_message);
  290. //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存
  291. if ( is_push )
  292. {
  293. //唤醒解析线程一次,
  294. m_analysis_data_condition.notify_all(false, true);
  295. }
  296. else
  297. {
  298. // push失败, 就要回收内存
  299. delete(tp_communication_message);
  300. tp_communication_message = NULL;
  301. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  302. // " m_receive_data_list.push error ");
  303. }
  304. }
  305. else
  306. {
  307. delete(tp_communication_message);
  308. tp_communication_message = NULL;
  309. }
  310. }
  311. //解析失败, 就当做什么也没发生, 认为接收消息无效,
  312. }
  313. //没有接受到消息, 返回空字符串
  314. }
  315. }
  316. LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< this;
  317. return;
  318. }
  319. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  320. Error_manager Communication_socket_base::check_msg(Communication_message* p_msg)
  321. {
  322. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
  323. //子类重载时, 增加自己模块的判断逻辑, 以后再写.
  324. if ( p_msg->get_message_type() == Communication_message::Message_type::eBase_msg
  325. && p_msg->get_receiver() == Communication_message::Communicator::eMain )
  326. {
  327. return Error_code::SUCCESS;
  328. }
  329. else
  330. {
  331. //无效的消息,
  332. return Error_manager(Error_code::INVALID_MESSAGE, Error_level::NEGLIGIBLE_ERROR,
  333. " INVALID_MESSAGE error "); }
  334. }
  335. //mp_analysis_data_thread 解析线程执行函数,
  336. //analysis_data_thread 内部线程负责解析消息
  337. void Communication_socket_base::analysis_data_thread()
  338. {
  339. LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;
  340. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  341. while (m_analysis_data_condition.is_alive())
  342. {
  343. bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(m_analysis_cycle_time);
  344. if ( m_analysis_data_condition.is_alive() )
  345. {
  346. std::this_thread::yield();
  347. //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
  348. if ( t_pass_flag )
  349. {
  350. analysis_receive_list();
  351. }
  352. //如果解析线程超时通过, 那么就定时处理链表残留的消息,
  353. else
  354. {
  355. analysis_receive_list();
  356. }
  357. }
  358. }
  359. LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
  360. return;
  361. }
  362. //循环接受链表, 解析消息,
  363. Error_manager Communication_socket_base::analysis_receive_list()
  364. {
  365. Error_manager t_error;
  366. if ( m_receive_data_list.m_termination_flag )
  367. {
  368. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  369. " Communication_socket_base::analysis_receive_list error ");
  370. }
  371. else
  372. {
  373. std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
  374. for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
  375. {
  376. Communication_message* tp_msg = **iter;
  377. if ( tp_msg == NULL )
  378. {
  379. iter = m_receive_data_list.m_data_list.erase(iter);
  380. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  381. }
  382. else
  383. {
  384. //检查消息是否可以被处理
  385. t_error = check_executer(tp_msg);
  386. if ( t_error == SUCCESS)
  387. {
  388. //处理消息
  389. t_error = execute_msg(tp_msg);
  390. // if ( t_error )
  391. // {
  392. // //执行结果不管
  393. // }
  394. // else
  395. // {
  396. // //执行结果不管
  397. // }
  398. delete(tp_msg);
  399. tp_msg = NULL;
  400. iter = m_receive_data_list.m_data_list.erase(iter);
  401. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  402. }
  403. else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
  404. {
  405. //处理器正忙, 那就不做处理, 直接处理下一个
  406. //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
  407. iter++;
  408. }
  409. else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
  410. {
  411. //超时了就直接删除
  412. delete(tp_msg);
  413. tp_msg = NULL;
  414. iter = m_receive_data_list.m_data_list.erase(iter);
  415. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  416. //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息
  417. }
  418. }
  419. }
  420. }
  421. return Error_code::SUCCESS;
  422. }
  423. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  424. Error_manager Communication_socket_base::check_executer(Communication_message* p_msg)
  425. {
  426. //检查对应模块的状态, 判断是否可以处理这条消息
  427. //同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
  428. //如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
  429. if ( p_msg->is_over_time() )
  430. {
  431. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  432. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  433. std::cout << "COMMUNICATION_ANALYSIS_TIME_OUT , " << std::endl;
  434. return Error_code::COMMUNICATION_ANALYSIS_TIME_OUT;
  435. }
  436. else
  437. {
  438. bool executer_is_ready = false;
  439. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 找到处理模块的实例对象, 查询执行人是否可以处理这条消息
  440. //这里子类重载时, 增加判断逻辑, 以后再写.
  441. executer_is_ready = true;
  442. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  443. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  444. if ( executer_is_ready )
  445. {
  446. std::cout << "executer_is_ready , " << std::endl;
  447. return Error_code::SUCCESS;
  448. }
  449. else
  450. {
  451. std::cout << "executer_is_busy , " << std::endl;
  452. return Error_code::COMMUNICATION_EXCUTER_IS_BUSY;
  453. }
  454. }
  455. return Error_code::SUCCESS;
  456. }
  457. //处理消息
  458. Error_manager Communication_socket_base::execute_msg(Communication_message* p_msg)
  459. {
  460. //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析
  461. // 不能一直使用 Communication_message* p_msg, 这个是要销毁的
  462. //然后处理这个消息, 就是调用对应模块的 execute 接口函数
  463. //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
  464. //子类重载, 需要完全重写, 以后再写.
  465. //注注注注注意了, 本模块只是用来做通信,
  466. //在做处理消息的时候, 可能会调用执行者的接口函数,
  467. //这里不应该长时间阻塞或者处理复杂的逻辑,
  468. //请执行者另开线程来处理任务.
  469. std::cout << "Communication_socket_base::excute_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  470. std::cout << "Communication_socket_base::excute_msg size = " << p_msg->get_message_buf().size() << std::endl;
  471. return Error_code::SUCCESS;
  472. }
  473. //mp_send_data_thread 发送线程执行函数,
  474. //send_data_thread 内部线程负责发送消息
  475. void Communication_socket_base::send_data_thread()
  476. {
  477. LOG(INFO) << " Communication_socket_base::send_data_thread start "<< this;
  478. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  479. while (m_send_data_condition.is_alive())
  480. {
  481. m_send_data_condition.wait();
  482. if ( m_send_data_condition.is_alive() )
  483. {
  484. std::this_thread::yield();
  485. Communication_message* tp_msg = NULL;
  486. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  487. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  488. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  489. bool is_pop = m_send_data_list.wait_and_pop(tp_msg);
  490. if ( is_pop )
  491. {
  492. if ( tp_msg != NULL )
  493. {
  494. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  495. std::unique_lock<std::mutex> lk(m_mutex);
  496. // m_socket.send(tp_msg->get_message_buf());
  497. send_string(tp_msg->get_message_buf());
  498. }
  499. delete(tp_msg);
  500. tp_msg = NULL;
  501. }
  502. }
  503. else
  504. {
  505. //没有取出, 那么应该就是 m_termination_flag 结束了
  506. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  507. // " Communication_socket_base::send_data_thread() error ");
  508. }
  509. }
  510. }
  511. LOG(INFO) << " Communication_socket_base::send_data_thread end "<< this;
  512. return;
  513. }
  514. //mp_encapsulate_data_thread 封装线程执行函数,
  515. //encapsulate_data_thread 内部线程负责封装消息
  516. void Communication_socket_base::encapsulate_data_thread()
  517. {
  518. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;
  519. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  520. while (m_encapsulate_data_condition.is_alive())
  521. {
  522. bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(m_encapsulate_cycle_time);
  523. if ( m_encapsulate_data_condition.is_alive() )
  524. {
  525. std::this_thread::yield();
  526. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  527. if ( t_pass_flag )
  528. {
  529. //主动发送消息,
  530. }
  531. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  532. else
  533. {
  534. encapsulate_send_data();
  535. }
  536. }
  537. }
  538. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
  539. return;
  540. }
  541. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  542. Error_manager Communication_socket_base::encapsulate_send_data()
  543. {
  544. // char buf[256] = {0};
  545. // static unsigned int t_heartbeat = 0;
  546. // sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0, test\0", t_heartbeat);
  547. // t_heartbeat++;
  548. return SUCCESS;
  549. message::Base_msg t_base_msg;
  550. t_base_msg.mutable_base_info()->set_msg_type(message::Message_type::eBase_msg);
  551. t_base_msg.mutable_base_info()->set_timeout_ms(5000);
  552. t_base_msg.mutable_base_info()->set_sender(message::Communicator::eMain);
  553. t_base_msg.mutable_base_info()->set_receiver(message::Communicator::eMain);
  554. Communication_message* tp_msg = new Communication_message(t_base_msg.SerializeAsString());
  555. bool is_push = m_send_data_list.push(tp_msg);
  556. if ( is_push == false )
  557. {
  558. delete(tp_msg);
  559. tp_msg = NULL;
  560. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  561. " Communication_socket_base::encapsulate_msg error ");
  562. }
  563. return Error_code::SUCCESS;
  564. }
  565. //封装消息, 需要子类重载
  566. Error_manager Communication_socket_base::encapsulate_msg(std::string message)
  567. {
  568. Communication_message* tp_msg = new Communication_message(message);
  569. bool is_push = m_send_data_list.push(tp_msg);
  570. if ( is_push == false )
  571. {
  572. delete(tp_msg);
  573. tp_msg = NULL;
  574. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  575. " Communication_socket_base::encapsulate_msg error ");
  576. }
  577. return Error_code::SUCCESS;
  578. }
  579. //封装消息, 需要子类重载
  580. Error_manager Communication_socket_base::encapsulate_msg(Communication_message* p_msg)
  581. {
  582. Communication_message* tp_msg = new Communication_message(*p_msg);
  583. bool is_push = m_send_data_list.push(tp_msg);
  584. if ( is_push == false )
  585. {
  586. delete(tp_msg);
  587. tp_msg = NULL;
  588. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  589. " Communication_socket_base::encapsulate_msg error ");
  590. }
  591. return Error_code::SUCCESS;
  592. }