notify_excutor.cpp 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. //
  2. // Created by zx on 2020/7/21.
  3. //
  4. #include "notify_excutor.h"
  5. #include "uniq_key.h"
  6. Notify_excutor::~Notify_excutor(){}
  7. Error_manager Notify_excutor::notify_request(message::Notify_request_msg& request,
  8. message::Notify_response_msg& result,Thread_condition& cancel_condition)
  9. {
  10. /*
  11. * 检查request合法性,以及模块状态
  12. */
  13. if(request.base_info().sender()!=message::eMain||request.base_info().receiver()!=message::eNotify)
  14. return Error_manager(ERROR,MINOR_ERROR,"notify request invalid");
  15. request.set_command_key(create_key());
  16. if(m_response_table.find(request.command_key())==true)
  17. return Error_manager(ERROR,MAJOR_ERROR," notify request repeated");
  18. //设置超时,若没有设置,默认60000000 1000分钟
  19. int timeout=request.base_info().has_timeout_ms()?request.base_info().timeout_ms():60000000;
  20. //向测量节点发送测量请求,并记录请求
  21. Error_manager code;
  22. Communication_message message;
  23. message::Base_info base_msg;
  24. base_msg.set_msg_type(message::eNotify_request_msg);
  25. base_msg.set_sender(message::eMain);
  26. base_msg.set_receiver(message::eNotify);
  27. base_msg.set_timeout_ms(timeout);
  28. message.reset(base_msg,request.SerializeAsString());
  29. //发送请求
  30. code= Message_communicator::get_instance_pointer()->send_msg(&message);
  31. if(code!=SUCCESS)
  32. {
  33. m_response_table.erase(request.command_key());
  34. return code;
  35. }
  36. return SUCCESS;
  37. m_response_table[request.command_key()]=message::Notify_response_msg();
  38. //循环查询请求是否被处理
  39. auto start_time=std::chrono::system_clock::now();
  40. double time=0;
  41. do{
  42. //查询到记录
  43. message::Notify_response_msg response;
  44. ///查询是否存在,并且删除该记录,
  45. if(m_response_table.find(request.command_key(),response))
  46. {
  47. //判断是否接收到回应,若回应信息被赋值则证明有回应
  48. if (response.has_base_info() && response.has_command_key())
  49. {
  50. message::Base_info response_base = response.base_info();
  51. //检查类型是否匹配
  52. if (response_base.msg_type() != message::eNotify_response_msg) {
  53. return Error_manager(ERROR, CRITICAL_ERROR,
  54. "notify response basemsg type error");
  55. }
  56. //检查基本信息是否匹配
  57. if (response_base.sender() != message::eNotify ||
  58. response_base.receiver() != message::eMain ||
  59. response.command_key() != request.command_key()) {
  60. return Error_manager(ERROR, MAJOR_ERROR,
  61. "notify response basemsg info error");
  62. }
  63. result = response;
  64. m_response_table.erase(request.command_key());
  65. return SUCCESS;
  66. }
  67. }
  68. else
  69. {
  70. //未查询到记录,任务已经被提前取消,记录被删除
  71. return Error_manager(TASK_CANCEL,MINOR_ERROR,"notify request canceled");
  72. }
  73. auto end_time=std::chrono::system_clock::now();
  74. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
  75. time=1000.0*double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den;
  76. std::this_thread::yield();
  77. if(time>double(timeout))
  78. {
  79. m_response_table.erase(request.command_key());
  80. return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"notify request timeout");
  81. }
  82. }while(cancel_condition.wait_for_ex(std::chrono::milliseconds(1))==false);
  83. m_response_table.erase(request.command_key());
  84. return Error_manager(TASK_CANCEL,MINOR_ERROR,"notify request task canceled");
  85. }
  86. /*
  87. * 提前取消请求
  88. */
  89. Error_manager Notify_excutor::cancel_request(message::Notify_request_msg& request)
  90. {
  91. message::Notify_response_msg t_response;
  92. if(m_response_table.find(request.command_key(),t_response))
  93. {
  94. m_response_table.erase(request.command_key());
  95. }
  96. return SUCCESS;
  97. }
  98. Error_manager Notify_excutor::check_node_statu()
  99. {
  100. std::chrono::system_clock::time_point time_now=std::chrono::system_clock::now();
  101. auto durantion=time_now-m_notify_statu_time;
  102. if(m_notify_statu_msg.has_base_info()== false
  103. || durantion>std::chrono::seconds(5) || m_notify_statu_msg.has_error_manager()==false)
  104. {
  105. return Error_manager(DISCONNECT,MINOR_ERROR,"取车等候区通知节点通讯断开");
  106. }
  107. if(m_notify_statu_msg.error_manager().error_code()!=SUCCESS)
  108. {
  109. return Error_manager(ERROR,MINOR_ERROR,"取车等候区通知节点故障");
  110. }
  111. return SUCCESS;
  112. }
  113. Notify_excutor::Notify_excutor()
  114. {
  115. }
  116. Error_manager Notify_excutor::consume_msg(Communication_message* p_msg)
  117. {
  118. if(p_msg== nullptr)
  119. return Error_manager(POINTER_IS_NULL,CRITICAL_ERROR,"notify response msg pointer is null");
  120. //response消息
  121. switch (p_msg->get_message_type())
  122. {
  123. ///测量结果反馈消息
  124. case Communication_message::eNotify_response_msg:
  125. {
  126. message::Notify_response_msg response;
  127. response.ParseFromString(p_msg->get_message_buf());
  128. ///查询请求表是否存在,并且更新
  129. if(m_response_table.find_update(response.command_key(),response)==false)
  130. {
  131. return Error_manager(ERROR,NEGLIGIBLE_ERROR,"notify response without request");
  132. }
  133. break;
  134. }
  135. ///测量系统状态
  136. case Communication_message::eNotify_status_msg:
  137. {
  138. message::Notify_status_msg statu_msg;
  139. if(statu_msg.ParseFromString(p_msg->get_message_buf())==false)
  140. {
  141. return Error_manager(ERROR,CRITICAL_ERROR,"取车等候区通知节点状态消息解析失败");
  142. }
  143. /*std::cout<<"plc msg:"<<std::endl;
  144. std::cout<<statu_msg.DebugString()<<std::endl;*/
  145. m_notify_statu_msg=statu_msg;
  146. m_notify_statu_time=std::chrono::system_clock::now();
  147. break;
  148. }
  149. }
  150. return SUCCESS;
  151. }