// Terminal_communication.cpp
  //
  // Created by zx on 2020/7/13.
  //
  #include "Terminal_communication.h"

  #include <chrono>
  #include <condition_variable>
  #include <iostream>
  #include <thread>

  #include "process_message.pb.h"
  7. Terminal_communication::Terminal_communication()
  8. :m_update_msg_map_thread(nullptr)
  9. {
  10. if(m_update_msg_map_thread== nullptr)
  11. {
  12. m_publish_exit_condition.reset(false, false, false);
  13. m_update_msg_map_thread=new std::thread(thread_update_map_function,this);
  14. }
  15. }
  16. Terminal_communication::~Terminal_communication(){}
  17. //重载函数
  18. Error_manager Terminal_communication::encapsulate_msg(Communication_message* message)
  19. {
  20. Error_manager code;
  21. switch (message->get_message_type())
  22. {
  23. case Communication_message::eStore_command_request_msg:
  24. {
  25. message::Store_command_request_msg request;
  26. request.ParseFromString(message->get_message_buf());
  27. //清空记录
  28. m_store_response_msg=message::Store_command_response_msg();
  29. //发送请求
  30. code= Communication_socket_base::encapsulate_msg(message);
  31. break;
  32. }
  33. case Communication_message::ePickup_command_request_msg:
  34. {
  35. message::Pickup_command_request_msg request;
  36. request.ParseFromString(message->get_message_buf());
  37. //清空记录
  38. m_pickup_response_msg=message::Pickup_command_response_msg();
  39. //发送请求
  40. code= Communication_socket_base::encapsulate_msg(message);
  41. break;
  42. }
  43. default:
  44. code= Error_manager(ERROR,NEGLIGIBLE_ERROR,"terminal message table is not exist");
  45. }
  46. return code;
  47. }
  48. Error_manager Terminal_communication::execute_msg(Communication_message* p_msg)
  49. {
  50. if(p_msg->get_message_type()==Communication_message::eStore_command_response_msg)
  51. {
  52. message::Store_command_response_msg response;
  53. if(false==response.ParseFromString(p_msg->get_message_buf()))
  54. {
  55. return Error_manager(ERROR,CRITICAL_ERROR,"停车指令反馈信息解析错误");
  56. }
  57. message::Base_info base_info=response.base_info();
  58. if(base_info.sender()==message::eMain && base_info.receiver()==message::eTerminor)
  59. {
  60. message::Error_manager error_code=response.code();
  61. if(error_code.error_code()==0)
  62. {
  63. m_store_response_msg=response;
  64. return SUCCESS;
  65. }
  66. }
  67. }
  68. if(p_msg->get_message_type()==Communication_message::ePickup_command_response_msg)
  69. {
  70. message::Pickup_command_response_msg response;
  71. if(false==response.ParseFromString(p_msg->get_message_buf()))
  72. {
  73. return Error_manager(ERROR,CRITICAL_ERROR,"停车指令反馈信息解析错误");
  74. }
  75. message::Base_info base_info=response.base_info();
  76. if(base_info.sender()==message::eMain && base_info.receiver()==message::eTerminor)
  77. {
  78. message::Error_manager error_code=response.code();
  79. if(error_code.error_code()==0)
  80. {
  81. m_pickup_response_msg=response;
  82. return SUCCESS;
  83. }
  84. }
  85. }
  86. if(p_msg->get_message_type()==Communication_message::eStoring_process_statu_msg)
  87. {
  88. message::Storing_process_statu_msg msg;
  89. if(msg.ParseFromString(p_msg->get_message_buf())==false)
  90. {
  91. std::cout<<" ERROR : Storing_process_statu_msg 失败"<<std::endl;
  92. }
  93. message::Base_info base_info=msg.base_info();
  94. if(base_info.sender()==message::eMain)
  95. {
  96. if(m_storing_statu_map.find(msg.license())==false)
  97. {
  98. m_storing_license_queue.push(msg.license());
  99. }
  100. m_storing_statu_map[msg.license()]=msg;
  101. std::chrono::system_clock::time_point time_point=std::chrono::system_clock::now();
  102. m_storing_statu_time_point_map[msg.license()]=time_point;
  103. }
  104. }
  105. if(p_msg->get_message_type()==Communication_message::ePicking_process_statu_msg)
  106. {
  107. message::Picking_process_statu_msg msg;
  108. if(msg.ParseFromString(p_msg->get_message_buf())==false)
  109. {
  110. std::cout<<" ERROR : Picking_process_statu_msg 失败"<<std::endl;
  111. }
  112. message::Base_info base_info=msg.base_info();
  113. if(base_info.sender()==message::eMain)
  114. {
  115. if(m_picking_statu_map.find(msg.license())==false)
  116. {
  117. m_picking_license_queue.push(msg.license());
  118. }
  119. m_picking_statu_map[msg.license()]=msg;
  120. std::chrono::system_clock::time_point time_point=std::chrono::system_clock::now();
  121. m_picking_statu_time_point_map[msg.license()]=time_point;
  122. }
  123. }
  124. return SUCCESS;
  125. return Error_manager(FAILED,MINOR_ERROR,"terminal communication 未知消息");
  126. }
  127. /*
  128. * 检测消息是否可被处理
  129. */
  130. Error_manager Terminal_communication::check_msg(Communication_message* p_msg)
  131. {
  132. return SUCCESS;
  133. }
  134. /*
  135. * 心跳发送函数,重载
  136. */
  137. Error_manager Terminal_communication::encapsulate_send_data()
  138. {
  139. return SUCCESS;
  140. }
  141. //检查消息是否可以被解析, 需要重载
  142. Error_manager Terminal_communication::check_executer(Communication_message* p_msg)
  143. {
  144. return SUCCESS;
  145. }
  146. /*
  147. * 发送停车指令请求
  148. */
  149. Error_manager Terminal_communication::store_request(message::Store_command_request_msg& request,message::Store_command_response_msg& response)
  150. {
  151. if(request.has_locate_information()==false ||
  152. request.has_car_info()==false)
  153. {
  154. return Error_manager(ERROR,CRITICAL_ERROR,"停车指令请求消息缺少必要信息");
  155. }
  156. Error_manager code;
  157. Communication_message message;
  158. message.reset(request.base_info(),request.SerializeAsString());
  159. int timeout=1000*3;
  160. code=encapsulate_msg(&message);
  161. if(code!=SUCCESS)
  162. return code;
  163. //循环查询请求是否被处理
  164. auto start_time=std::chrono::system_clock::now();
  165. double time=0;
  166. do {
  167. //查询到记录
  168. ///查询是否存在,并且删除该记录,
  169. //判断是否接收到回应,若回应信息被赋值则证明有回应
  170. if (m_store_response_msg.has_base_info() && m_store_response_msg.has_code()) {
  171. message::Base_info response_base = m_store_response_msg.base_info();
  172. //检查类型是否匹配
  173. if (response_base.msg_type() != message::eStore_command_response_msg) {
  174. return Error_manager(ERROR, CRITICAL_ERROR,
  175. "停车指令反馈消息 response msg type error");
  176. }
  177. //检查基本信息是否匹配
  178. if (response_base.sender() != message::eMain ||
  179. response_base.receiver() != message::eTerminor ) {
  180. return Error_manager(PARKSPACE_RELEASE_RESPONSE_INFO_ERROR, MAJOR_ERROR,
  181. "parkspace store response msg info error");
  182. }
  183. response.CopyFrom(m_store_response_msg);
  184. m_store_response_msg.clear_base_info();
  185. return SUCCESS;
  186. }
  187. auto end_time = std::chrono::system_clock::now();
  188. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
  189. time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num /
  190. std::chrono::microseconds::period::den;
  191. std::this_thread::yield();
  192. usleep(1000);
  193. }while(time<double(timeout));
  194. //超时,删除记录,返回错误
  195. return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"parkspace store request timeout");
  196. }
  197. /*
  198. * 发送取车指令请求
  199. */
  200. Error_manager Terminal_communication::pickup_request(message::Pickup_command_request_msg& request,message::Pickup_command_response_msg& response)
  201. {
  202. if(request.has_car_info()==false)
  203. {
  204. return Error_manager(ERROR,CRITICAL_ERROR,"取车指令请求消息缺少车辆信息");
  205. }
  206. Error_manager code;
  207. Communication_message message;
  208. message.reset(request.base_info(),request.SerializeAsString());
  209. int timeout=1000*3;
  210. code=encapsulate_msg(&message);
  211. if(code!=SUCCESS)
  212. return code;
  213. // m_statu=eTerminal_picking;
  214. //循环查询请求是否被处理
  215. auto start_time=std::chrono::system_clock::now();
  216. double time=0;
  217. do {
  218. //查询到记录
  219. ///查询是否存在,并且删除该记录,
  220. //判断是否接收到回应,若回应信息被赋值则证明有回应
  221. if (m_pickup_response_msg.has_base_info() && m_pickup_response_msg.has_code())
  222. {
  223. message::Base_info response_base = m_pickup_response_msg.base_info();
  224. //检查类型是否匹配
  225. if (response_base.msg_type() != message::ePickup_command_response_msg) {
  226. return Error_manager(ERROR, CRITICAL_ERROR,
  227. "停车指令反馈消息 response msg type error");
  228. }
  229. //检查基本信息是否匹配
  230. if (response_base.sender() != message::eMain ||
  231. response_base.receiver() != message::eTerminor ) {
  232. return Error_manager(PARKSPACE_RELEASE_RESPONSE_INFO_ERROR, MAJOR_ERROR,
  233. "parkspace release response msg info error");
  234. }
  235. response.CopyFrom(m_pickup_response_msg);
  236. m_pickup_response_msg.clear_base_info();
  237. //m_pickup_response_msg=message::Pickup_command_response_msg();
  238. return SUCCESS;
  239. }
  240. auto end_time = std::chrono::system_clock::now();
  241. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
  242. time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num /
  243. std::chrono::microseconds::period::den;
  244. std::this_thread::yield();
  245. usleep(1000);
  246. }while(time<double(timeout));
  247. //超时,删除记录,返回错误
  248. return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"parkspace release request timeout");
  249. }
  250. /*
  251. * 获取停车流程状态
  252. */
  253. Error_manager Terminal_communication::get_storing_statu(std::string car_license,message::Storing_process_statu_msg& msg)
  254. {
  255. if(m_storing_statu_map.find(car_license,msg)) {
  256. return SUCCESS;
  257. }
  258. return FAILED;
  259. }
  260. /*
  261. * 获取取车流程状态
  262. */
  263. Error_manager Terminal_communication::get_picking_statu(std::string car_license,message::Picking_process_statu_msg& msg)
  264. {
  265. if(m_picking_statu_map.find(car_license,msg))
  266. return SUCCESS;
  267. return FAILED;
  268. }
  269. void Terminal_communication::update_map()
  270. {
  271. while(false==m_publish_exit_condition.wait_for_ex(std::chrono::milliseconds(200)))
  272. {
  273. std::string license;
  274. if (m_storing_license_queue.try_pop(license))
  275. {
  276. std::chrono::system_clock::time_point t1 = std::chrono::system_clock::now();
  277. std::chrono::system_clock::time_point start = m_storing_statu_time_point_map[license];
  278. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(t1 - start);
  279. double time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num /
  280. std::chrono::microseconds::period::den;
  281. if (time > 3000) {
  282. m_storing_statu_time_point_map.erase(license);
  283. m_storing_statu_map.erase(license);
  284. }
  285. else
  286. {
  287. m_storing_license_queue.push(license);
  288. }
  289. }
  290. if (m_picking_license_queue.try_pop(license))
  291. {
  292. std::chrono::system_clock::time_point t1 = std::chrono::system_clock::now();
  293. std::chrono::system_clock::time_point start = m_picking_statu_time_point_map[license];
  294. auto duration = std::chrono::duration_cast<std::chrono::microseconds>(t1 - start);
  295. double time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num /
  296. std::chrono::microseconds::period::den;
  297. if (time > 3000) {
  298. m_picking_statu_time_point_map.erase(license);
  299. m_picking_statu_map.erase(license);
  300. }
  301. else
  302. {
  303. m_picking_license_queue.push(license);
  304. }
  305. }
  306. }
  307. }
  308. void Terminal_communication::thread_update_map_function(Terminal_communication *terminal) {
  309. terminal->update_map();
  310. }