communication_socket_base.cpp 20 KB


  1. #include "communication_socket_base.h"
  2. #include "../tool/proto_tool.h"
  3. Communication_socket_base::Communication_socket_base()
  4. {
  5. m_communication_statu = COMMUNICATION_UNKNOW;
  6. mp_receive_data_thread = NULL;
  7. mp_analysis_data_thread = NULL;
  8. mp_send_data_thread = NULL;
  9. mp_encapsulate_data_thread = NULL;
  10. m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
  11. m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
  12. }
  13. Communication_socket_base::~Communication_socket_base()
  14. {
  15. communication_uninit();
  16. }
  17. //初始化 通信 模块。如下三选一
  18. Error_manager Communication_socket_base::communication_init()
  19. {
  20. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  21. return communication_init_from_protobuf(COMMUNICATION_PARAMETER_PATH);
  22. }
  23. //初始化 通信 模块。从文件读取
  24. Error_manager Communication_socket_base::communication_init_from_protobuf(std::string prototxt_path)
  25. {
  26. Communication_proto::Communication_parameter_all t_communication_parameter_all;
  27. if(! proto_tool::read_proto_param(prototxt_path,t_communication_parameter_all) )
  28. {
  29. return Error_manager(COMMUNICATION_READ_PROTOBUF_ERROR,MINOR_ERROR,
  30. "Communication_socket_base read_proto_param failed");
  31. }
  32. return communication_init_from_protobuf(t_communication_parameter_all);
  33. }
  34. //初始化 通信 模块。从protobuf读取
  35. Error_manager Communication_socket_base::communication_init_from_protobuf(Communication_proto::Communication_parameter_all& communication_parameter_all)
  36. {
  37. LOG(INFO) << " ---Communication_socket_base::communication_init_from_protobuf() run--- "<< this;
  38. Error_manager t_error;
  39. if ( communication_parameter_all.communication_parameters().has_bind_string() )
  40. {
  41. t_error = communication_bind(communication_parameter_all.communication_parameters().bind_string());
  42. if ( t_error != Error_code::SUCCESS )
  43. {
  44. return t_error;
  45. }
  46. }
  47. // std::cout << "communication_parameter_all.communication_parameters().connect_string_vector_size() " <<
  48. // communication_parameter_all.communication_parameters().connect_string_vector_size()<< std::endl;
  49. for(int i=0;i<communication_parameter_all.communication_parameters().connect_string_vector_size();++i)
  50. {
  51. t_error = communication_connect( communication_parameter_all.communication_parameters().connect_string_vector(i) );
  52. if ( t_error != Error_code::SUCCESS )
  53. {
  54. return t_error;
  55. }
  56. }
  57. //启动通信, run thread
  58. communication_run();
  59. return Error_code::SUCCESS;
  60. }
  61. //初始化
  62. Error_manager Communication_socket_base::communication_init(std::string bind_string, std::vector<std::string>& connect_string_vector)
  63. {
  64. LOG(INFO) << " ---Communication_socket_base::communication_init() run--- "<< this;
  65. Error_manager t_error;
  66. t_error = communication_bind(bind_string);
  67. if ( t_error != Error_code::SUCCESS )
  68. {
  69. return t_error;
  70. }
  71. t_error = communication_connect(connect_string_vector);
  72. if ( t_error != Error_code::SUCCESS )
  73. {
  74. return t_error;
  75. }
  76. //启动通信, run thread
  77. communication_run();
  78. return Error_code::SUCCESS;
  79. }
  80. //bind
  81. Error_manager Communication_socket_base::communication_bind(std::string bind_string)
  82. {
  83. Error_manager t_error;
  84. int t_socket_result;
  85. //m_socket 自己作为一个服务器, 绑定一个端口
  86. t_socket_result = m_socket.bind(bind_string);
  87. if ( t_socket_result <0 )
  88. {
  89. return Error_manager(Error_code::COMMUNICATION_BIND_ERROR, Error_level::MINOR_ERROR,
  90. " m_socket.bind error ");
  91. }
  92. LOG(INFO) << " ---Communication_socket_base::communication_bind() bind:: "<< bind_string << " " << this;
  93. return Error_code::SUCCESS;
  94. }
  95. //connect
  96. Error_manager Communication_socket_base::communication_connect(std::vector<std::string>& connect_string_vector)
  97. {
  98. Error_manager t_error;
  99. for (auto iter = connect_string_vector.begin(); iter != connect_string_vector.end(); ++iter)
  100. {
  101. t_error = communication_connect(*iter);
  102. if ( t_error != Error_code::SUCCESS )
  103. {
  104. return t_error;
  105. }
  106. }
  107. return Error_code::SUCCESS;
  108. }
  109. //connect
  110. Error_manager Communication_socket_base::communication_connect(std::string connect_string)
  111. {
  112. Error_manager t_error;
  113. int t_socket_result;
  114. //m_socket 和远端通信, 连接远端服务器的端口
  115. t_socket_result = m_socket.connect(connect_string);
  116. if ( t_socket_result <0 )
  117. {
  118. return Error_manager(Error_code::COMMUNICATION_CONNECT_ERROR, Error_level::MINOR_ERROR,
  119. " m_socket.connect error ");
  120. }
  121. LOG(INFO) << " ---Communication_socket_base::communication_connect() connect:: "<< connect_string << " " << this;
  122. return Error_code::SUCCESS;
  123. }
  124. //启动通信, run thread
  125. Error_manager Communication_socket_base::communication_run()
  126. {
  127. m_communication_statu = COMMUNICATION_READY;
  128. //启动4个线程。
  129. //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
  130. m_receive_condition.reset(false, false, false);
  131. mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
  132. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
  133. m_analysis_data_condition.reset(false, false, false);
  134. mp_analysis_data_thread = new std::thread(&Communication_socket_base::analysis_data_thread, this);
  135. //发送线程默认循环, 内部的wait_and_pop进行等待,
  136. m_send_data_condition.reset(false, true, false);
  137. mp_send_data_thread = new std::thread(&Communication_socket_base::send_data_thread, this);
  138. //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
  139. m_encapsulate_data_condition.reset(false, false, false);
  140. mp_encapsulate_data_thread = new std::thread(&Communication_socket_base::encapsulate_data_thread, this);
  141. return Error_code::SUCCESS;
  142. }
  143. //反初始化 通信 模块。
  144. Error_manager Communication_socket_base::communication_uninit()
  145. {
  146. //终止list,防止 wait_and_pop 阻塞线程。
  147. m_receive_data_list.termination_list();
  148. m_send_data_list.termination_list();
  149. //杀死4个线程,强制退出
  150. if (mp_receive_data_thread)
  151. {
  152. m_receive_condition.kill_all();
  153. }
  154. if (mp_analysis_data_thread)
  155. {
  156. m_analysis_data_condition.kill_all();
  157. }
  158. if (mp_send_data_thread)
  159. {
  160. m_send_data_condition.kill_all();
  161. }
  162. if (mp_encapsulate_data_thread)
  163. {
  164. m_encapsulate_data_condition.kill_all();
  165. }
  166. //回收4个线程的资源
  167. if (mp_receive_data_thread)
  168. {
  169. mp_receive_data_thread->join();
  170. delete mp_receive_data_thread;
  171. mp_receive_data_thread = NULL;
  172. }
  173. if (mp_analysis_data_thread)
  174. {
  175. mp_analysis_data_thread->join();
  176. delete mp_analysis_data_thread;
  177. mp_analysis_data_thread = 0;
  178. }
  179. if (mp_send_data_thread)
  180. {
  181. mp_send_data_thread->join();
  182. delete mp_send_data_thread;
  183. mp_send_data_thread = NULL;
  184. }
  185. if (mp_encapsulate_data_thread)
  186. {
  187. mp_encapsulate_data_thread->join();
  188. delete mp_encapsulate_data_thread;
  189. mp_encapsulate_data_thread = NULL;
  190. }
  191. //清空list
  192. m_receive_data_list.clear_and_delete();
  193. m_send_data_list.clear_and_delete();
  194. m_communication_statu = COMMUNICATION_UNKNOW;
  195. m_socket.close();
  196. return Error_code::SUCCESS;
  197. }
  198. void Communication_socket_base::set_analysis_cycle_time(unsigned int analysis_cycle_time)
  199. {
  200. m_analysis_cycle_time = analysis_cycle_time;
  201. }
  202. void Communication_socket_base::set_encapsulate_cycle_time(unsigned int encapsulate_cycle_time)
  203. {
  204. m_encapsulate_cycle_time = encapsulate_cycle_time;
  205. }
  206. //mp_receive_data_thread 接受线程执行函数,
  207. //receive_data_thread 内部线程负责接受消息
  208. void Communication_socket_base::receive_data_thread()
  209. {
  210. LOG(INFO) << " Communication_socket_base::receive_data_thread start "<< this;
  211. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  212. while (m_receive_condition.is_alive())
  213. {
  214. m_receive_condition.wait_for_ex(std::chrono::microseconds(1));
  215. if ( m_receive_condition.is_alive() )
  216. {
  217. std::this_thread::yield();
  218. std::string t_receive_string;
  219. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  220. std::unique_lock<std::mutex> lk(m_mutex);
  221. //flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
  222. t_receive_string = m_socket.recv<std::string>(1);
  223. }
  224. if ( t_receive_string.size()>0 )
  225. {
  226. //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
  227. message::Base_msg t_base_msg;
  228. if( t_base_msg.ParseFromString(t_receive_string) )
  229. {
  230. //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
  231. Communication_message * tp_communication_message = new Communication_message;
  232. tp_communication_message->reset(t_base_msg.base_info(), t_receive_string);
  233. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  234. if ( check_msg(tp_communication_message) == SUCCESS )
  235. {
  236. bool is_push = m_receive_data_list.push(tp_communication_message);
  237. //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存
  238. if ( is_push )
  239. {
  240. //唤醒解析线程一次,
  241. m_analysis_data_condition.notify_all(false, true);
  242. }
  243. else
  244. {
  245. // push失败, 就要回收内存
  246. delete(tp_communication_message);
  247. tp_communication_message = NULL;
  248. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  249. // " m_receive_data_list.push error ");
  250. }
  251. }
  252. else
  253. {
  254. delete(tp_communication_message);
  255. tp_communication_message = NULL;
  256. }
  257. }
  258. //解析失败, 就当做什么也没发生, 认为接收消息无效,
  259. }
  260. //没有接受到消息, 返回空字符串
  261. }
  262. }
  263. LOG(INFO) << " Communication_socket_base::receive_data_thread end "<< this;
  264. return;
  265. }
  266. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  267. Error_manager Communication_socket_base::check_msg(Communication_message* p_msg)
  268. {
  269. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
  270. //子类重载时, 增加自己模块的判断逻辑, 以后再写.
  271. if ( p_msg->get_message_type() == Communication_message::Message_type::eBase_msg
  272. && p_msg->get_receiver() == Communication_message::Communicator::eMain )
  273. {
  274. return Error_code::SUCCESS;
  275. }
  276. else
  277. {
  278. //无效的消息,
  279. return Error_manager(Error_code::INVALID_MESSAGE, Error_level::NEGLIGIBLE_ERROR,
  280. " INVALID_MESSAGE error "); }
  281. }
  282. //mp_analysis_data_thread 解析线程执行函数,
  283. //analysis_data_thread 内部线程负责解析消息
  284. void Communication_socket_base::analysis_data_thread()
  285. {
  286. LOG(INFO) << " Communication_socket_base::analysis_data_thread start "<< this;
  287. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  288. while (m_analysis_data_condition.is_alive())
  289. {
  290. bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(m_analysis_cycle_time);
  291. if ( m_analysis_data_condition.is_alive() )
  292. {
  293. std::this_thread::yield();
  294. //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
  295. if ( t_pass_flag )
  296. {
  297. analysis_receive_list();
  298. }
  299. //如果解析线程超时通过, 那么就定时处理链表残留的消息,
  300. else
  301. {
  302. analysis_receive_list();
  303. }
  304. }
  305. }
  306. LOG(INFO) << " Communication_socket_base::analysis_data_thread end "<< this;
  307. return;
  308. }
  309. //循环接受链表, 解析消息,
  310. Error_manager Communication_socket_base::analysis_receive_list()
  311. {
  312. Error_manager t_error;
  313. if ( m_receive_data_list.m_termination_flag )
  314. {
  315. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  316. " Communication_socket_base::analysis_receive_list error ");
  317. }
  318. else
  319. {
  320. std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
  321. for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
  322. {
  323. Communication_message* tp_msg = **iter;
  324. if ( tp_msg == NULL )
  325. {
  326. iter = m_receive_data_list.m_data_list.erase(iter);
  327. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  328. }
  329. else
  330. {
  331. //检查消息是否可以被处理
  332. t_error = check_executer(tp_msg);
  333. if ( t_error == SUCCESS)
  334. {
  335. //处理消息
  336. t_error = execute_msg(tp_msg);
  337. // if ( t_error )
  338. // {
  339. // //执行结果不管
  340. // }
  341. // else
  342. // {
  343. // //执行结果不管
  344. // }
  345. delete(tp_msg);
  346. tp_msg = NULL;
  347. iter = m_receive_data_list.m_data_list.erase(iter);
  348. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  349. }
  350. else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
  351. {
  352. //处理器正忙, 那就不做处理, 直接处理下一个
  353. //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
  354. iter++;
  355. }
  356. else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
  357. {
  358. //超时了就直接删除
  359. delete(tp_msg);
  360. tp_msg = NULL;
  361. iter = m_receive_data_list.m_data_list.erase(iter);
  362. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  363. //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息
  364. }
  365. }
  366. }
  367. }
  368. return Error_code::SUCCESS;
  369. }
  370. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  371. Error_manager Communication_socket_base::check_executer(Communication_message* p_msg)
  372. {
  373. //检查对应模块的状态, 判断是否可以处理这条消息
  374. //同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
  375. //如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
  376. if ( p_msg->is_over_time() )
  377. {
  378. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  379. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  380. std::cout << "COMMUNICATION_ANALYSIS_TIME_OUT , " << std::endl;
  381. return Error_code::COMMUNICATION_ANALYSIS_TIME_OUT;
  382. }
  383. else
  384. {
  385. bool executer_is_ready = false;
  386. //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 找到处理模块的实例对象, 查询执行人是否可以处理这条消息
  387. //这里子类重载时, 增加判断逻辑, 以后再写.
  388. executer_is_ready = true;
  389. std::cout << "Communication_socket_base::check_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  390. std::cout << "Communication_socket_base::check_msg size = " << p_msg->get_message_buf().size() << std::endl;
  391. if ( executer_is_ready )
  392. {
  393. std::cout << "executer_is_ready , " << std::endl;
  394. return Error_code::SUCCESS;
  395. }
  396. else
  397. {
  398. std::cout << "executer_is_busy , " << std::endl;
  399. return Error_code::COMMUNICATION_EXCUTER_IS_BUSY;
  400. }
  401. }
  402. return Error_code::SUCCESS;
  403. }
  404. //处理消息
  405. Error_manager Communication_socket_base::execute_msg(Communication_message* p_msg)
  406. {
  407. //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析
  408. // 不能一直使用 Communication_message* p_msg, 这个是要销毁的
  409. //然后处理这个消息, 就是调用对应模块的 execute 接口函数
  410. //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
  411. //子类重载, 需要完全重写, 以后再写.
  412. //注注注注注意了, 本模块只是用来做通信,
  413. //在做处理消息的时候, 可能会调用执行者的接口函数,
  414. //这里不应该长时间阻塞或者处理复杂的逻辑,
  415. //请执行者另开线程来处理任务.
  416. std::cout << "Communication_socket_base::excute_msg p_buf = " << p_msg->get_message_buf() << std::endl;
  417. std::cout << "Communication_socket_base::excute_msg size = " << p_msg->get_message_buf().size() << std::endl;
  418. return Error_code::SUCCESS;
  419. }
  420. //mp_send_data_thread 发送线程执行函数,
  421. //send_data_thread 内部线程负责发送消息
  422. void Communication_socket_base::send_data_thread()
  423. {
  424. LOG(INFO) << " Communication_socket_base::send_data_thread start "<< this;
  425. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  426. while (m_send_data_condition.is_alive())
  427. {
  428. m_send_data_condition.wait();
  429. if ( m_send_data_condition.is_alive() )
  430. {
  431. std::this_thread::yield();
  432. Communication_message* tp_msg = NULL;
  433. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  434. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  435. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  436. bool is_pop = m_send_data_list.wait_and_pop(tp_msg);
  437. if ( is_pop )
  438. {
  439. if ( tp_msg != NULL )
  440. {
  441. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  442. std::unique_lock<std::mutex> lk(m_mutex);
  443. m_socket.send(tp_msg->get_message_buf());
  444. }
  445. delete(tp_msg);
  446. tp_msg = NULL;
  447. }
  448. }
  449. else
  450. {
  451. //没有取出, 那么应该就是 m_termination_flag 结束了
  452. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  453. // " Communication_socket_base::send_data_thread() error ");
  454. }
  455. }
  456. }
  457. LOG(INFO) << " Communication_socket_base::send_data_thread end "<< this;
  458. return;
  459. }
  460. //mp_encapsulate_data_thread 封装线程执行函数,
  461. //encapsulate_data_thread 内部线程负责封装消息
  462. void Communication_socket_base::encapsulate_data_thread()
  463. {
  464. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread start "<< this;
  465. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  466. while (m_encapsulate_data_condition.is_alive())
  467. {
  468. bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(m_encapsulate_cycle_time);
  469. if ( m_encapsulate_data_condition.is_alive() )
  470. {
  471. std::this_thread::yield();
  472. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  473. if ( t_pass_flag )
  474. {
  475. //主动发送消息,
  476. }
  477. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  478. else
  479. {
  480. encapsulate_send_data();
  481. }
  482. }
  483. }
  484. LOG(INFO) << " Communication_socket_base::encapsulate_data_thread end "<< this;
  485. return;
  486. }
  487. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  488. Error_manager Communication_socket_base::encapsulate_send_data()
  489. {
  490. // char buf[256] = {0};
  491. // static unsigned int t_heartbeat = 0;
  492. // sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0, test\0", t_heartbeat);
  493. // t_heartbeat++;
  494. return SUCCESS;
  495. message::Base_msg t_base_msg;
  496. t_base_msg.mutable_base_info()->set_msg_type(message::Message_type::eBase_msg);
  497. t_base_msg.mutable_base_info()->set_timeout_ms(5000);
  498. t_base_msg.mutable_base_info()->set_sender(message::Communicator::eMain);
  499. t_base_msg.mutable_base_info()->set_receiver(message::Communicator::eMain);
  500. Communication_message* tp_msg = new Communication_message(t_base_msg.SerializeAsString());
  501. bool is_push = m_send_data_list.push(tp_msg);
  502. if ( is_push == false )
  503. {
  504. delete(tp_msg);
  505. tp_msg = NULL;
  506. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  507. " Communication_socket_base::encapsulate_msg error ");
  508. }
  509. return Error_code::SUCCESS;
  510. }
  511. //封装消息, 需要子类重载
  512. Error_manager Communication_socket_base::encapsulate_msg(std::string message)
  513. {
  514. Communication_message* tp_msg = new Communication_message(message);
  515. bool is_push = m_send_data_list.push(tp_msg);
  516. if ( is_push == false )
  517. {
  518. delete(tp_msg);
  519. tp_msg = NULL;
  520. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  521. " Communication_socket_base::encapsulate_msg error ");
  522. }
  523. return Error_code::SUCCESS;
  524. }
  525. //封装消息, 需要子类重载
  526. Error_manager Communication_socket_base::encapsulate_msg(Communication_message* p_msg)
  527. {
  528. Communication_message* tp_msg = new Communication_message(*p_msg);
  529. bool is_push = m_send_data_list.push(tp_msg);
  530. if ( is_push == false )
  531. {
  532. delete(tp_msg);
  533. tp_msg = NULL;
  534. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  535. " Communication_socket_base::encapsulate_msg error ");
  536. }
  537. return Error_code::SUCCESS;
  538. }