network_base.cpp 25 KB


  1. //
  2. // Created by huli on 2022/12/30.
  3. //
  4. #include "network_base.h"
  5. #include "../tool/proto_tool.h"
  6. #include "../tool/time_tool.h"
  7. Network_base::Network_base()
  8. {
  9. m_network_status = NETWORK_STATUS_UNKNOW;
  10. mp_receive_data_thread = NULL;
  11. mp_analysis_data_thread = NULL;
  12. mp_send_data_thread = NULL;
  13. mp_encapsulate_data_thread = NULL;
  14. m_analysis_cycle_time = 1000;//默认1000ms,就自动解析(接受list)
  15. m_encapsulate_cycle_time = 1000;//默认1000ms,就自动发送一次状态信息
  16. }
  17. Network_base::~Network_base()
  18. {
  19. network_uninit();
  20. }
  21. //初始化 通信 模块。如下三选一
  22. Error_manager Network_base::network_init()
  23. {
  24. LOG(INFO) << " ---Network_base::network_init() run--- "<< this;
  25. return network_init_from_protobuf(NETKORK_PARAMETER_PATH);
  26. }
  27. //初始化 通信 模块。从文件读取
  28. Error_manager Network_base::network_init_from_protobuf(std::string prototxt_path)
  29. {
  30. Network_proto::Network_parameter_all t_network_parameter_all;
  31. if(! proto_tool::read_proto_param(prototxt_path,t_network_parameter_all) )
  32. {
  33. return Error_manager(NETWORK_READ_PROTOBUF_ERROR,MINOR_ERROR,
  34. "network_init_from_protobuf read_proto_param failed");
  35. }
  36. return network_init_from_protobuf(t_network_parameter_all);
  37. }
  38. //初始化 通信 模块。从protobuf读取
  39. Error_manager Network_base::network_init_from_protobuf(Network_proto::Network_parameter_all & network_parameter_all)
  40. {
  41. LOG(INFO) << " ---Rabbitmq_base::network_init_from_protobuf() run--- "<< this;
  42. m_network_parameter_all = network_parameter_all;
  43. //根据参数创建socket连接
  44. for (int i = 0; i < m_network_parameter_all.network_parameters().network_information_vector_size(); ++i)
  45. {
  46. //导入参数
  47. int t_socket_id = m_network_parameter_all.network_parameters().network_information_vector(i).socket_id();
  48. Network_socket t_network_socket;
  49. t_network_socket.m_network_information = m_network_parameter_all.network_parameters().network_information_vector(i);
  50. //tcp 客户端
  51. if ( t_network_socket.m_network_information.network_mode() == Network_proto::TCP_CLIENT )
  52. {
  53. //创建socket
  54. t_network_socket.m_socket_fd = socket(AF_INET, SOCK_STREAM, 0); //直接创建socket返回给Communication_tcp的成员
  55. if (t_network_socket.m_socket_fd == -1)
  56. {
  57. printf(" create tcp socket failed ");
  58. return Error_manager(Error_code::NETWORK_CREATE_SOCKET_ERROR, Error_level::MINOR_ERROR,
  59. " fun error ");
  60. }
  61. //配置ip地址
  62. sockaddr_in saddr; //设置连接对象的结构体
  63. saddr.sin_family = AF_INET;
  64. saddr.sin_port = htons(t_network_socket.m_network_information.port());
  65. saddr.sin_addr.s_addr = inet_addr(t_network_socket.m_network_information.ip().c_str()); //字符串转整型
  66. set_block(t_network_socket.m_socket_fd, false); //将socket改成非阻塞模式,此时它会立即返回 所以通过fd_set
  67. fd_set rfds, wfds; //文件句柄数组,在这个数组中,存放当前每个文件句柄的状态
  68. if (connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)) != 0) //此时connect马上返回,状态为未成功连接
  69. {
  70. FD_ZERO(&rfds); //首先把文件句柄的数组置空
  71. FD_ZERO(&wfds);
  72. FD_SET(t_network_socket.m_socket_fd, &rfds); //把sock的网络句柄加入到该句柄数组中
  73. FD_SET(t_network_socket.m_socket_fd, &wfds);
  74. timeval tm; //超时参数的结构体
  75. tm.tv_sec = NETKORK_CONNECT_TIME;//默认5秒
  76. tm.tv_usec = 0;
  77. int selres = select(t_network_socket.m_socket_fd + 1, &rfds, &wfds, NULL, &tm); //(阻塞函数)(监听的文件句柄的最大值加1,可读序列文件列表,可写的序列文件列表,错误处理,超时)使用select监听文件序列set是否有可读可写,这里监听set数组(里面只有sock),只要其中的句柄有一个变得可写(在这里是sock连接成功了以后就会变得可写,就返回),就返回
  78. switch (selres)
  79. {
  80. case -1:
  81. printf("select error\n");
  82. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  83. " fun error ");
  84. case 0:
  85. printf("select time out\n");
  86. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  87. " fun error ");
  88. default:
  89. if (FD_ISSET(t_network_socket.m_socket_fd, &rfds) || FD_ISSET(t_network_socket.m_socket_fd, &wfds))
  90. {
  91. connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)); //再次连接一次进行确认
  92. int err = errno;
  93. if (err == EISCONN||err == EINPROGRESS) //已经连接到该套接字 或 套接字为非阻塞套接字,且连接请求没有立即完成
  94. {
  95. //在这里, 就表示连接正常
  96. printf("connect finished(success).\n");
  97. // set_block(t_network_socket.m_socket_fd,true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据
  98. t_network_socket.m_network_status = NETWORK_STATUS_READY;
  99. m_network_socket_map[t_socket_id] = t_network_socket;
  100. m_network_socket_map[t_socket_id].m_updata_time = std::chrono::system_clock::now();
  101. }
  102. else
  103. {
  104. printf("connect %s : %d finished(failed). errno = %d\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port(),errno);
  105. // printf("FD_ISSET(sock_fd, &rfds): %d\n FD_ISSET(sock_fd, &wfds): %d\n", FD_ISSET(sock_fd, &rfds) , FD_ISSET(sock_fd, &wfds));
  106. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  107. " fun error ");
  108. }
  109. }
  110. else
  111. {
  112. printf("connect %s : %d finished(failed).",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port());
  113. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  114. " fun error ");
  115. }
  116. }
  117. }
  118. else //连接正常
  119. {
  120. set_block(t_network_socket.m_socket_fd, true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据
  121. printf("connect %s : %d finished(success).\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port());
  122. }
  123. }
  124. else
  125. {
  126. t_network_socket.m_network_status = NETWORK_STATUS_FAULT;
  127. t_network_socket.m_socket_fd = 0;
  128. m_network_socket_map[t_socket_id] = t_network_socket;
  129. }
  130. }
  131. network_run();
  132. return Error_code::SUCCESS;
  133. }
  134. //启动通信, run thread
  135. Error_manager Network_base::network_run()
  136. {
  137. m_network_status = NETWORK_STATUS_UNKNOW;
  138. //启动4个线程。
  139. //接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
  140. m_receive_condition.reset(false, false, false);
  141. mp_receive_data_thread = new std::thread(&Network_base::receive_data_thread, this);
  142. //解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
  143. m_analysis_data_condition.reset(false, false, false);
  144. mp_analysis_data_thread = new std::thread(&Network_base::analysis_data_thread, this);
  145. //发送线程默认循环, 内部的wait_and_pop进行等待,
  146. m_send_data_condition.reset(false, true, false);
  147. mp_send_data_thread = new std::thread(&Network_base::send_data_thread, this);
  148. //封装线程默认等待, ...., 超时1ms, 超时后主动 封装心跳和状态信息,
  149. m_encapsulate_data_condition.reset(false, false, false);
  150. mp_encapsulate_data_thread = new std::thread(&Network_base::encapsulate_data_thread, this);
  151. return Error_code::SUCCESS;
  152. }
  153. //反初始化 通信 模块。
  154. Error_manager Network_base::network_uninit()
  155. {
  156. for (auto iter = m_network_socket_map.begin(); iter != m_network_socket_map.end(); ++iter)
  157. {
  158. int t_socket_fd = iter->second.m_socket_fd;
  159. if (t_socket_fd <= 0)
  160. {
  161. printf("socket %d error \n", t_socket_fd); //打印ip和端口
  162. }
  163. else
  164. {
  165. close(t_socket_fd);
  166. }
  167. }
  168. return Error_code::SUCCESS;
  169. }
  170. //重连, 快速uninit, init
  171. Error_manager Network_base::network_reconnnect()
  172. {
  173. return Error_code::SUCCESS;
  174. }
  175. void Network_base::set_analysis_cycle_time(unsigned int analysis_cycle_time)
  176. {
  177. m_analysis_cycle_time = analysis_cycle_time;
  178. }
  179. void Network_base::set_encapsulate_cycle_time(unsigned int encapsulate_cycle_time)
  180. {
  181. m_encapsulate_cycle_time = encapsulate_cycle_time;
  182. }
  183. bool Network_base::set_block(int socket_fd, bool isblock) //设置阻塞模式 (希望只有在connect的时候是非阻塞的,而接收数据时候是阻塞的)
  184. {
  185. if (socket_fd <= 0)
  186. {
  187. printf(" set tcp socket block failed\n ");
  188. return false;
  189. }
  190. int flags = fcntl(socket_fd, F_GETFL, 0); //获取socket的属性
  191. if (flags < 0)
  192. return false; //获取属性出错
  193. if (isblock)
  194. {
  195. flags = flags&~O_NONBLOCK; //把非阻塞这位设为0
  196. }
  197. else
  198. {
  199. flags = flags | O_NONBLOCK; //把非阻塞这位设为1
  200. }
  201. if (fcntl(socket_fd, F_SETFL, flags))
  202. return false; //把标准位设回去
  203. // if (!isblock)
  204. // printf("set tcp socket not block success\n");
  205. // if (isblock)
  206. // printf("set tcp socket block success\n");
  207. return true;
  208. }
  209. int Network_base::network_recv(int socket_fd, char *buf, int size) //接收数据
  210. {
  211. return recv(socket_fd, buf, size, 0);
  212. }
  213. int Network_base::network_send(int socket_fd, const char *buf, int size) //发送数据
  214. {
  215. int sendedSize = 0; //已发送成功的长度
  216. while (sendedSize != size) //若没发送完成,则从断点开始继续发送 直到完成
  217. {
  218. try
  219. {
  220. int len = send(socket_fd, buf + sendedSize, size - sendedSize, 0);
  221. if (len <= 0)
  222. break;
  223. sendedSize += len;
  224. }
  225. catch (char *str)
  226. {
  227. std::cout << " 断线---" << str<< std::endl;
  228. break;
  229. }
  230. }
  231. return sendedSize;
  232. }
  233. bool Network_base::is_connected(int socket_fd)
  234. {
  235. struct tcp_info info;
  236. int len=sizeof(info);
  237. getsockopt(socket_fd, IPPROTO_TCP, TCP_INFO, &info, (socklen_t *)&len);
  238. if((info.tcpi_state==TCP_ESTABLISHED))
  239. {
  240. return true;
  241. }
  242. return false;
  243. }
  244. //检查网络,如果断连, 就立刻重连, socket_fd会重新分配新的
  245. Error_manager Network_base::check_and_reconnect(int socket_id)
  246. {
  247. int t_socket_fd = m_network_socket_map[socket_id].m_socket_fd;
  248. // if ( is_connected(t_socket_fd) == false )
  249. if ( true )
  250. {
  251. //关闭连接
  252. close(t_socket_fd);
  253. //导入参数
  254. Network_socket t_network_socket;
  255. t_network_socket.m_network_information = m_network_socket_map[socket_id].m_network_information;
  256. //tcp 客户端
  257. if ( t_network_socket.m_network_information.network_mode() == Network_proto::TCP_CLIENT )
  258. {
  259. //创建socket
  260. t_network_socket.m_socket_fd = socket(AF_INET, SOCK_STREAM, 0); //直接创建socket返回给Communication_tcp的成员
  261. if (t_network_socket.m_socket_fd == -1)
  262. {
  263. printf(" create tcp socket failed ");
  264. return Error_manager(Error_code::NETWORK_CREATE_SOCKET_ERROR, Error_level::MINOR_ERROR,
  265. " fun error ");
  266. }
  267. //配置ip地址
  268. sockaddr_in saddr; //设置连接对象的结构体
  269. saddr.sin_family = AF_INET;
  270. saddr.sin_port = htons(t_network_socket.m_network_information.port());
  271. saddr.sin_addr.s_addr = inet_addr(t_network_socket.m_network_information.ip().c_str()); //字符串转整型
  272. set_block(t_network_socket.m_socket_fd, false); //将socket改成非阻塞模式,此时它会立即返回 所以通过fd_set
  273. fd_set rfds, wfds; //文件句柄数组,在这个数组中,存放当前每个文件句柄的状态
  274. if (connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)) != 0) //此时connect马上返回,状态为未成功连接
  275. {
  276. FD_ZERO(&rfds); //首先把文件句柄的数组置空
  277. FD_ZERO(&wfds);
  278. FD_SET(t_network_socket.m_socket_fd, &rfds); //把sock的网络句柄加入到该句柄数组中
  279. FD_SET(t_network_socket.m_socket_fd, &wfds);
  280. timeval tm; //超时参数的结构体
  281. tm.tv_sec = NETKORK_CONNECT_TIME;//默认5秒
  282. tm.tv_usec = 0;
  283. int selres = select(t_network_socket.m_socket_fd + 1, &rfds, &wfds, NULL, &tm); //(阻塞函数)(监听的文件句柄的最大值加1,可读序列文件列表,可写的序列文件列表,错误处理,超时)使用select监听文件序列set是否有可读可写,这里监听set数组(里面只有sock),只要其中的句柄有一个变得可写(在这里是sock连接成功了以后就会变得可写,就返回),就返回
  284. switch (selres)
  285. {
  286. case -1:
  287. printf("select error\n");
  288. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  289. " fun error ");
  290. case 0:
  291. printf("select time out\n");
  292. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  293. " fun error ");
  294. default:
  295. if (FD_ISSET(t_network_socket.m_socket_fd, &rfds) || FD_ISSET(t_network_socket.m_socket_fd, &wfds))
  296. {
  297. connect(t_network_socket.m_socket_fd, (sockaddr*)&saddr, sizeof(saddr)); //再次连接一次进行确认
  298. int err = errno;
  299. if (err == EISCONN||err == EINPROGRESS) //已经连接到该套接字 或 套接字为非阻塞套接字,且连接请求没有立即完成
  300. {
  301. //在这里, 就表示连接正常
  302. printf("connect finished(success).\n");
  303. // set_block(t_network_socket.m_socket_fd,true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据
  304. t_network_socket.m_network_status = NETWORK_STATUS_READY;
  305. m_network_socket_map[socket_id] = t_network_socket;
  306. m_network_socket_map[socket_id].m_updata_time = std::chrono::system_clock::now();
  307. }
  308. else
  309. {
  310. printf("connect %s : %d finished(failed). errno = %d\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port(),errno);
  311. // printf("FD_ISSET(sock_fd, &rfds): %d\n FD_ISSET(sock_fd, &wfds): %d\n", FD_ISSET(sock_fd, &rfds) , FD_ISSET(sock_fd, &wfds));
  312. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  313. " fun error ");
  314. }
  315. }
  316. else
  317. {
  318. printf("connect %s : %d finished(failed).",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port());
  319. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  320. " fun error ");
  321. }
  322. }
  323. }
  324. else //连接正常
  325. {
  326. set_block(t_network_socket.m_socket_fd, true); //成功之后重新把sock改成阻塞模式,以便后面发送/接收数据
  327. printf("connect %s : %d finished(success).\n",t_network_socket.m_network_information.ip().c_str(),t_network_socket.m_network_information.port());
  328. }
  329. }
  330. else
  331. {
  332. t_network_socket.m_network_status = NETWORK_STATUS_FAULT;
  333. t_network_socket.m_socket_fd = 0;
  334. m_network_socket_map[socket_id] = t_network_socket;
  335. }
  336. }
  337. else
  338. {
  339. m_network_socket_map[socket_id].m_updata_time = std::chrono::system_clock::now();
  340. return Error_code::SUCCESS;
  341. }
  342. return Error_code::SUCCESS;
  343. }
  344. //mp_receive_data_thread 接受线程执行函数,
  345. //receive_data_thread 内部线程负责接受消息
  346. void Network_base::receive_data_thread()
  347. {
  348. LOG(INFO) << " Network_base::receive_data_thread start "<< this;
  349. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  350. while (m_receive_condition.is_alive())
  351. {
  352. m_receive_condition.wait_for_ex(std::chrono::microseconds(1));
  353. if ( m_receive_condition.is_alive() )
  354. {
  355. std::this_thread::yield();
  356. for (auto iter = m_network_socket_map.begin(); iter != m_network_socket_map.end(); ++iter)
  357. {
  358. //如果超时2秒就检查重连
  359. if ( std::chrono::system_clock::now() - iter->second.m_updata_time > std::chrono::seconds(NETKORK_RECONNECT_OVER_TIME) )
  360. {
  361. Time_tool::get_instance_references().time_start(123);
  362. check_and_reconnect(iter->first);
  363. Time_tool::get_instance_references().time_end(123);
  364. Time_tool::get_instance_references().cout_time_microsecond(123);
  365. }
  366. int recv_len = 0;
  367. char recv_buf[NETWORK_BUFFER_SIZE] = {0};
  368. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  369. std::unique_lock<std::mutex> lk(m_mutex);
  370. //接受数据. 必须非阻塞
  371. recv_len = network_recv(iter->second.m_socket_fd, recv_buf, NETWORK_BUFFER_SIZE);
  372. // if ( iter->second.m_network_information.socket_id() == 2 && recv_len !=-1)
  373. // {
  374. // std::cout << " huli test :::: " << " ----------------------------------------------- = " << std::endl;
  375. // std::cout << " huli test :::: " << " socket_id = " << iter->second.m_network_information.socket_id() << std::endl;
  376. // std::cout << " huli test :::: " << " ip = " << iter->second.m_network_information.ip() << std::endl;
  377. // std::cout << " huli test :::: " << " recv_len = " << recv_len << std::endl;
  378. // }
  379. }
  380. if ( recv_len>0 )
  381. {
  382. Network_message * tp_network_message = new Network_message;
  383. tp_network_message->reset(std::string(recv_buf, recv_len),
  384. iter->second.m_network_information.socket_id(),
  385. iter->second.m_network_information.ip(),
  386. iter->second.m_network_information.port());
  387. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  388. if ( check_msg(tp_network_message) == SUCCESS )
  389. {
  390. //接受信息成功, 刷新时间
  391. iter->second.m_updata_time = std::chrono::system_clock::now();
  392. bool is_push = m_receive_data_list.push(tp_network_message);
  393. //push成功之后, tp_communication_message内存的管理权限交给链表, 如果失败就要回收内存
  394. if ( is_push )
  395. {
  396. //唤醒解析线程一次,
  397. m_analysis_data_condition.notify_all(false, true);
  398. }
  399. else
  400. {
  401. // push失败, 就要回收内存
  402. delete(tp_network_message);
  403. tp_network_message = NULL;
  404. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  405. // " m_receive_data_list.push error ");
  406. }
  407. }
  408. else
  409. {
  410. delete(tp_network_message);
  411. tp_network_message = NULL;
  412. }
  413. }
  414. //没有接受到消息, 返回空字符串
  415. }
  416. }
  417. }
  418. LOG(INFO) << " Network_base::receive_data_thread end "<< this;
  419. return;
  420. }
  421. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  422. Error_manager Network_base::check_msg(Network_message* p_msg)
  423. {
  424. return Error_code::SUCCESS;
  425. }
  426. //mp_analysis_data_thread 解析线程执行函数,
  427. //analysis_data_thread 内部线程负责解析消息
  428. void Network_base::analysis_data_thread()
  429. {
  430. LOG(INFO) << " Network_base::analysis_data_thread start "<< this;
  431. //通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
  432. while (m_analysis_data_condition.is_alive())
  433. {
  434. bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(m_analysis_cycle_time);
  435. if ( m_analysis_data_condition.is_alive() )
  436. {
  437. std::this_thread::yield();
  438. //如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
  439. if ( t_pass_flag )
  440. {
  441. analysis_receive_list();
  442. }
  443. //如果解析线程超时通过, 那么就定时处理链表残留的消息,
  444. else
  445. {
  446. analysis_receive_list();
  447. }
  448. }
  449. }
  450. LOG(INFO) << " Network_base::analysis_data_thread end "<< this;
  451. return;
  452. }
  453. //循环接受链表, 解析消息,
  454. Error_manager Network_base::analysis_receive_list()
  455. {
  456. Error_manager t_error;
  457. if ( m_receive_data_list.m_termination_flag )
  458. {
  459. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  460. " Network_base::analysis_receive_list error ");
  461. }
  462. else
  463. {
  464. std::unique_lock<std::mutex> lk(m_receive_data_list.m_mutex);
  465. for (auto iter = m_receive_data_list.m_data_list.begin(); iter != m_receive_data_list.m_data_list.end(); )
  466. {
  467. Network_message* tp_msg = **iter;
  468. if ( tp_msg == NULL )
  469. {
  470. iter = m_receive_data_list.m_data_list.erase(iter);
  471. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  472. }
  473. else
  474. {
  475. //检查消息是否可以被处理
  476. t_error = check_executer(tp_msg);
  477. if ( t_error == SUCCESS)
  478. {
  479. //处理消息
  480. t_error = execute_msg(tp_msg);
  481. // if ( t_error )
  482. // {
  483. // //执行结果不管
  484. // }
  485. // else
  486. // {
  487. // //执行结果不管
  488. // }
  489. delete(tp_msg);
  490. tp_msg = NULL;
  491. iter = m_receive_data_list.m_data_list.erase(iter);
  492. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  493. }
  494. else if( t_error == COMMUNICATION_EXCUTER_IS_BUSY)
  495. {
  496. //处理器正忙, 那就不做处理, 直接处理下一个
  497. //注:这条消息就被保留了下来, wait_for_millisecond 超时通过之后, 会循环检查残留的消息.
  498. iter++;
  499. }
  500. else //if( t_error == COMMUNICATION_ANALYSIS_TIME_OUT )
  501. {
  502. //超时了就直接删除
  503. delete(tp_msg);
  504. tp_msg = NULL;
  505. iter = m_receive_data_list.m_data_list.erase(iter);
  506. //注:erase 删除当前 iter 之后返回下一个节点,当前的 iter 无效化,
  507. //注:消息删除之后, 不需要发送答复消息, 发送方也会有超时处理的, 只有 execute_msg 里面可以答复消息
  508. }
  509. }
  510. }
  511. }
  512. return Error_code::SUCCESS;
  513. }
  514. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  515. Error_manager Network_base::check_executer(Network_message* p_msg)
  516. {
  517. return Error_code::SUCCESS;
  518. }
  519. //处理消息
  520. Error_manager Network_base::execute_msg(Network_message* p_msg)
  521. {
  522. //先将 p_msg 转化为 对应的格式, 使用对应模块的protobuf来二次解析
  523. // 不能一直使用 Network_message* p_msg, 这个是要销毁的
  524. //然后处理这个消息, 就是调用对应模块的 execute 接口函数
  525. //执行结果不管, 如果需要答复, 那么对应模块 在自己内部 封装一条消息发送即可.
  526. //子类重载, 需要完全重写, 以后再写.
  527. //注注注注注意了, 本模块只是用来做通信,
  528. //在做处理消息的时候, 可能会调用执行者的接口函数,
  529. //这里不应该长时间阻塞或者处理复杂的逻辑,
  530. //请执行者另开线程来处理任务.
  531. std::cout << "Network_base::excute_msg p_buf = " << p_msg->m_message_buf << std::endl;
  532. std::cout << "Network_base::excute_msg size = " << p_msg->m_message_buf.size() << std::endl;
  533. return Error_code::SUCCESS;
  534. }
  535. //mp_send_data_thread 发送线程执行函数,
  536. //send_data_thread 内部线程负责发送消息
  537. void Network_base::send_data_thread()
  538. {
  539. LOG(INFO) << " Network_base::send_data_thread start "<< this;
  540. //通信发送线程, 负责巡检m_send_data_list, 并发送消息
  541. while (m_send_data_condition.is_alive())
  542. {
  543. m_send_data_condition.wait();
  544. if ( m_send_data_condition.is_alive() )
  545. {
  546. std::this_thread::yield();
  547. Network_message* tp_msg = NULL;
  548. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  549. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  550. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  551. bool is_pop = m_send_data_list.wait_and_pop(tp_msg);
  552. if ( is_pop )
  553. {
  554. if ( tp_msg != NULL )
  555. {
  556. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  557. std::unique_lock<std::mutex> lk(m_mutex);
  558. network_send(m_network_socket_map[tp_msg->m_socket_id].m_socket_fd,
  559. tp_msg->m_message_buf.c_str(),
  560. tp_msg->m_message_buf.size());
  561. }
  562. delete(tp_msg);
  563. tp_msg = NULL;
  564. }
  565. }
  566. else
  567. {
  568. //没有取出, 那么应该就是 m_termination_flag 结束了
  569. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  570. // " Network_base::send_data_thread() error ");
  571. }
  572. }
  573. }
  574. LOG(INFO) << " Network_base::send_data_thread end "<< this;
  575. return;
  576. }
  577. //mp_encapsulate_data_thread 封装线程执行函数,
  578. //encapsulate_data_thread 内部线程负责封装消息
  579. void Network_base::encapsulate_data_thread()
  580. {
  581. LOG(INFO) << " Network_base::encapsulate_data_thread start "<< this;
  582. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  583. while (m_encapsulate_data_condition.is_alive())
  584. {
  585. bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(m_encapsulate_cycle_time);
  586. if ( m_encapsulate_data_condition.is_alive() )
  587. {
  588. std::this_thread::yield();
  589. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  590. if ( t_pass_flag )
  591. {
  592. //主动发送消息,
  593. }
  594. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  595. else
  596. {
  597. auto_encapsulate_status();
  598. }
  599. }
  600. }
  601. LOG(INFO) << " Network_base::encapsulate_data_thread end "<< this;
  602. return;
  603. }
  604. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  605. Error_manager Network_base::auto_encapsulate_status()
  606. {
  607. return Error_code::SUCCESS;
  608. }
  609. //封装消息, 需要子类重载
  610. Error_manager Network_base::encapsulate_msg(std::string message, int socket_id)
  611. {
  612. Network_message* tp_msg = new Network_message(message, socket_id);
  613. bool is_push = m_send_data_list.push(tp_msg);
  614. if ( is_push == false )
  615. {
  616. delete(tp_msg);
  617. tp_msg = NULL;
  618. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  619. " Network_base::encapsulate_msg error ");
  620. }
  621. return Error_code::SUCCESS;
  622. }
  623. //封装消息, 需要子类重载
  624. Error_manager Network_base::encapsulate_msg(Network_message* p_msg)
  625. {
  626. Network_message* tp_msg = new Network_message(*p_msg);
  627. bool is_push = m_send_data_list.push(tp_msg);
  628. if ( is_push == false )
  629. {
  630. delete(tp_msg);
  631. tp_msg = NULL;
  632. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  633. " Network_base::encapsulate_msg error ");
  634. }
  635. return Error_code::SUCCESS;
  636. }