command_manager.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. //
  2. // Created by zx on 2020/7/14.
  3. //
  4. #include <sstream>
  5. #include <iomanip>
  6. #include <parkspace_excutor.h>
  7. #include "command_manager.h"
  8. #include "StoreProcessTask.h"
  9. #include "PickupProcessTask.h"
  10. #include "command_accepter.h"
  11. Command_manager::Command_manager()
  12. :m_thread_queue_process(nullptr)
  13. ,m_system_paused(false)
  14. {
  15. }
  16. Command_manager::~Command_manager()
  17. {
  18. //等待线程池完成
  19. if(m_thread_queue_process!=nullptr) {
  20. m_thread_queue_process->WaitForFinish();
  21. m_thread_queue_process->Stop();
  22. }
  23. }
  24. Error_manager Command_manager::init(setting::System_setting system_setting) {
  25. /*
  26. * 检查参数
  27. */
  28. if (system_setting.has_bind_ip() == false || system_setting.has_entrance_num() == false
  29. || system_setting.has_export_num() == false) {
  30. return Error_manager(ERROR, MAJOR_ERROR, "系统配置错误");
  31. }
  32. if (system_setting.entrance_num() < 0 || system_setting.export_num() < 0) {
  33. return Error_manager(ERROR, MAJOR_ERROR, "系统配置出入口数量错误");
  34. }
  35. //初始化出入口状态为 Enable
  36. m_input_entrance_paused.resize(system_setting.entrance_num());
  37. for(int i=0;i<system_setting.entrance_num();++i)
  38. m_input_entrance_paused[i]=false;
  39. m_output_entrance_paused.resize(system_setting.export_num());
  40. for(int i=0;i<system_setting.export_num();++i)
  41. m_output_entrance_paused[i]=false;
  42. //创建线程池
  43. if (m_thread_queue_process == nullptr) {
  44. m_thread_queue_process = tq::TQFactory::CreateDefaultQueue();
  45. m_thread_queue_process->Start(48);
  46. }
  47. /*
  48. * 此处添加等待各个通讯模块正常代码
  49. */
  50. std::chrono::system_clock::time_point t_start=std::chrono::system_clock::now();
  51. std::chrono::system_clock::time_point t_end=std::chrono::system_clock::now();
  52. /*Error_manager parkspace_code=ERROR;
  53. LOG(INFO)<<"初始化车位管理模块...";
  54. do
  55. {
  56. if (parkspace_code != SUCCESS) {
  57. parkspace_code = Parkspace_excutor::get_instance_pointer()->check_statu();
  58. LOG_IF(INFO, parkspace_code == SUCCESS) << "车位管理模块初始化完成!!!";
  59. }
  60. if(parkspace_code==SUCCESS)
  61. break;
  62. t_end=std::chrono::system_clock::now();
  63. usleep(1000*100);
  64. }while(t_end-t_start<std::chrono::seconds(300));
  65. LOG_IF(ERROR,parkspace_code!=SUCCESS)<<"车位管理节点连接超时";
  66. if(parkspace_code!=SUCCESS)
  67. {
  68. return Error_manager(ERROR,MAJOR_ERROR,"车位管理模块初始化超时");
  69. }
  70. //检查节点并创建停取车流程
  71. for(int i=0;i<system_setting.entrance_num();++i)
  72. {
  73. //
  74. // 检查入口 i 的各个节点状态
  75. //
  76. LOG(INFO)<<"初始化停车入口:"<<i<<" 测量及调度模块...";
  77. Error_manager locate_code=ERROR,dispatch_code=ERROR;
  78. do {
  79. if (locate_code != SUCCESS) {
  80. locate_code = Measure_excutor::get_instance_pointer()->check_statu(i);
  81. LOG_IF(INFO, locate_code == SUCCESS) << "停车入口:"<<i<<" 测量模块初始化完成!!!";
  82. }
  83. if (dispatch_code != SUCCESS) {
  84. dispatch_code = Dispatch_excutor::get_instance_pointer()->check_entrance_statu(i);
  85. LOG_IF(INFO, dispatch_code == SUCCESS) << "停车入口:"<<i<<" 调度模块初始化完成!!!";
  86. }
  87. t_end=std::chrono::system_clock::now();
  88. if (locate_code == SUCCESS && dispatch_code == SUCCESS)
  89. break;
  90. usleep(1000*100);
  91. }while(t_end-t_start<std::chrono::seconds(300));
  92. LOG_IF(ERROR,locate_code!=SUCCESS||dispatch_code!=SUCCESS)<<"停车口:"<<i<<" 测量/调度节点连接超时";
  93. if(locate_code!=SUCCESS||dispatch_code!=SUCCESS)
  94. {
  95. return Error_manager(ERROR,MAJOR_ERROR,"停车入口通讯节点初始化超时");
  96. }
  97. }
  98. for(int i=0;i<system_setting.export_num();++i)
  99. {
  100. LOG(INFO)<<"初始化取车出口:"<<i<<" 调度模块...";
  101. Error_manager dispatch_code=ERROR;
  102. do {
  103. if (dispatch_code != SUCCESS) {
  104. dispatch_code = Dispatch_excutor::get_instance_pointer()->check_export_statu(i);
  105. LOG_IF(INFO, dispatch_code == SUCCESS) << "取车出口:" << i << " 调度模块初始化完成!!!";
  106. }
  107. t_end = std::chrono::system_clock::now();
  108. if (dispatch_code == SUCCESS)
  109. break;
  110. usleep(1000*100);
  111. }while(t_end-t_start<std::chrono::seconds(300));
  112. LOG_IF(ERROR,dispatch_code!=SUCCESS)<<"取车出口:"<<i<<" 调度节点连接超时";
  113. if(dispatch_code!=SUCCESS)
  114. {
  115. return Error_manager(ERROR,MAJOR_ERROR,"取车出口通讯节点初始化超时");
  116. }
  117. }
  118. */
  119. m_system_setting=system_setting;
  120. return SUCCESS;
  121. }
  122. /*
  123. * 执行停车请求
  124. */
  125. Error_manager Command_manager::execute_store_command(message::Store_command_request_msg& request)
  126. {
  127. message::Error_manager error_msg;
  128. message::Store_command_response_msg response;
  129. if (m_system_paused == true)
  130. {
  131. error_msg.set_error_code(PAUSE);
  132. error_msg.set_error_description("急停");
  133. response.mutable_code()->CopyFrom(error_msg);
  134. Communication_message msg;
  135. msg.reset(response.base_info(), response.SerializeAsString());
  136. Message_communicator::get_instance_pointer()->send_msg(&msg);
  137. return Error_manager(PAUSE, MINOR_ERROR, "急停");
  138. }
  139. //判断entrance是否开放
  140. if (request.terminal_id() < 0 || request.terminal_id() >= m_system_setting.entrance_num())
  141. {
  142. error_msg.set_error_code(ERROR);
  143. error_msg.set_error_description(" xxxx entrance id 设置错误");
  144. response.mutable_code()->CopyFrom(error_msg);
  145. return Error_manager(ERROR, MINOR_ERROR, " xxxxx entrance id 设置错误");
  146. }
  147. if (m_input_entrance_paused[request.terminal_id()] != false)
  148. {
  149. error_msg.set_error_code(ERROR);
  150. error_msg.set_error_description("中控entrance已停止使用");
  151. response.mutable_code()->CopyFrom(error_msg);
  152. Communication_message msg;
  153. msg.reset(response.base_info(), response.SerializeAsString());
  154. Message_communicator::get_instance_pointer()->send_msg(&msg);
  155. return Error_manager(ERROR, MINOR_ERROR, "中控entrance已停止使用");
  156. }
  157. message::Base_info base_info_response;
  158. base_info_response.set_msg_type(message::eStore_command_response_msg);
  159. base_info_response.set_sender(message::eMain);
  160. base_info_response.set_receiver(message::eTerminor);
  161. response.mutable_base_info()->CopyFrom(base_info_response);
  162. response.set_terminal_id(request.terminal_id());
  163. Error_manager code;
  164. if (request.base_info().msg_type() == message::eStore_command_request_msg
  165. && request.base_info().receiver() == message::eMain
  166. && request.base_info().sender() == message::eTerminor) {
  167. if (request.has_locate_information() && request.has_base_info())
  168. {
  169. message::Locate_information locate_info = request.locate_information();
  170. if (locate_info.has_locate_correct())
  171. {
  172. if (locate_info.locate_correct() == true)
  173. {
  174. if (locate_info.has_locate_width() && locate_info.has_locate_height()
  175. && locate_info.has_locate_x() && locate_info.has_locate_y()
  176. && locate_info.has_locate_angle() && locate_info.has_locate_wheel_base())
  177. {
  178. LOG(INFO) << "------ 停 ------- 收到停车,车牌:" << request.car_info().license() <<
  179. ",终端:" << request.terminal_id() << "............................";
  180. //check hardwared
  181. Error_manager parkspace_code = Parkspace_excutor::get_instance_pointer()->check_statu();
  182. Error_manager dispatch_code = Dispatch_excutor::get_instance_pointer()->check_entrance_statu(request.terminal_id());
  183. if(parkspace_code!=SUCCESS)
  184. {
  185. error_msg.set_error_code(parkspace_code.get_error_code());
  186. error_msg.set_error_description(parkspace_code.to_string());
  187. response.mutable_code()->CopyFrom(error_msg);
  188. LOG(ERROR) << "xxxx 创建停车流程失败:" << request.terminal_id() <<
  189. "车牌:" << request.car_info().license() << " " << parkspace_code.to_string();
  190. Communication_message msg;
  191. msg.reset(response.base_info(), response.SerializeAsString());
  192. Message_communicator::get_instance_pointer()->send_msg(&msg);
  193. return parkspace_code;
  194. }
  195. if(dispatch_code!=SUCCESS)
  196. {
  197. error_msg.set_error_code(dispatch_code.get_error_code());
  198. error_msg.set_error_description(dispatch_code.to_string());
  199. response.mutable_code()->CopyFrom(error_msg);
  200. LOG(ERROR) << "xxxx 创建停车流程失败:" << request.terminal_id() <<
  201. "车牌:" << request.car_info().license() << " " << dispatch_code.to_string();
  202. Communication_message msg;
  203. msg.reset(response.base_info(), response.SerializeAsString());
  204. Message_communicator::get_instance_pointer()->send_msg(&msg);
  205. return dispatch_code;
  206. }
  207. /*
  208. * 检查消息完毕,开始处理
  209. */
  210. Process_task *ptask = new StoreProcessTask(request.terminal_id(), request.car_info());
  211. StoreProcessTask *pStore_task = (StoreProcessTask *) ptask;
  212. //初始化流程
  213. code = pStore_task->init_task(locate_info);
  214. if (code != SUCCESS)
  215. {
  216. delete pStore_task;
  217. error_msg.set_error_code(code.get_error_code());
  218. error_msg.set_error_description(code.to_string());
  219. response.mutable_code()->CopyFrom(error_msg);
  220. LOG(ERROR) << "xxxx 创建停车流程失败(车位分配失败),终端:" << request.terminal_id() <<
  221. "车牌:" << request.car_info().license() << " " << code.to_string();
  222. Communication_message msg;
  223. msg.reset(response.base_info(), response.SerializeAsString());
  224. Message_communicator::get_instance_pointer()->send_msg(&msg);
  225. return code;
  226. }
  227. m_thread_queue_process->AddTask(pStore_task);
  228. return SUCCESS;
  229. }
  230. }
  231. }
  232. }
  233. error_msg.set_error_code(FAILED);
  234. error_msg.set_error_description("创建停车流程失败...指令缺少必要信息...");
  235. response.mutable_code()->CopyFrom(error_msg);
  236. Communication_message msg;
  237. msg.reset(response.base_info(), response.SerializeAsString());
  238. Message_communicator::get_instance_pointer()->send_msg(&msg);
  239. return Error_manager(FAILED, MINOR_ERROR, "创建停车流程失败...指令缺少必要信息...");
  240. } else {
  241. error_msg.set_error_code(INVALID_MESSAGE);
  242. error_msg.set_error_description("停车请求基本信息错误");
  243. response.mutable_code()->CopyFrom(error_msg);
  244. Communication_message msg;
  245. msg.reset(response.base_info(),response.SerializeAsString());
  246. Message_communicator::get_instance_pointer()->send_msg(&msg);
  247. return Error_manager(INVALID_MESSAGE, MAJOR_ERROR, "停车请求基本信息错误");
  248. }
  249. }
  250. /*
  251. * 执行取车请求
  252. */
  253. Error_manager Command_manager::execute_pickup_command(message::Pickup_command_request_msg& request)
  254. {
  255. message::Error_manager error_msg;
  256. message::Pickup_command_response_msg response;
  257. message::Base_info base_info_response;
  258. base_info_response.set_msg_type(message::ePickup_command_response_msg);
  259. base_info_response.set_sender(message::eMain);
  260. base_info_response.set_receiver(message::eTerminor);
  261. response.mutable_base_info()->CopyFrom(base_info_response);
  262. response.set_terminal_id(request.terminal_id());
  263. if (m_thread_queue_process == nullptr)
  264. {
  265. error_msg.set_error_code(ERROR);
  266. error_msg.set_error_description(" xxxxx 中控bug,线程池未初始化");
  267. response.mutable_code()->CopyFrom(error_msg);
  268. Communication_message msg;
  269. msg.reset(response.base_info(), response.SerializeAsString());
  270. Message_communicator::get_instance_pointer()->send_msg(&msg);
  271. return Error_manager(ERROR, MINOR_ERROR, "线程池未初始化,bug");
  272. }
  273. if (m_system_paused == true)
  274. {
  275. error_msg.set_error_code(PAUSE);
  276. error_msg.set_error_description("急停");
  277. response.mutable_code()->CopyFrom(error_msg);
  278. Communication_message msg;
  279. msg.reset(response.base_info(), response.SerializeAsString());
  280. Message_communicator::get_instance_pointer()->send_msg(&msg);
  281. return Error_manager(PAUSE, MINOR_ERROR, "急停");
  282. }
  283. //判断出口是否开放
  284. if (request.terminal_id() < 0 || request.terminal_id() >= m_system_setting.export_num())
  285. {
  286. error_msg.set_error_code(ERROR);
  287. error_msg.set_error_description(" xxxx 出口 id 设置错误");
  288. response.mutable_code()->CopyFrom(error_msg);
  289. return Error_manager(ERROR, MINOR_ERROR, " xxxxx出口 id 设置错误");
  290. }
  291. if (m_output_entrance_paused[request.terminal_id()] != false)
  292. {
  293. error_msg.set_error_code(ERROR);
  294. error_msg.set_error_description("中控出口已停止使用");
  295. response.mutable_code()->CopyFrom(error_msg);
  296. Communication_message msg;
  297. msg.reset(response.base_info(), response.SerializeAsString());
  298. Message_communicator::get_instance_pointer()->send_msg(&msg);
  299. return Error_manager(ERROR, MINOR_ERROR, "中控出口已停止使用");
  300. }
  301. if (request.base_info().msg_type() == message::ePickup_command_request_msg
  302. && request.base_info().receiver() == message::eMain
  303. && request.base_info().sender() == message::eTerminor
  304. && request.has_car_info())
  305. {
  306. response.set_terminal_id(request.terminal_id());
  307. /*
  308. * 检查各个节点是否正常
  309. */
  310. Error_manager parkspace_code = Parkspace_excutor::get_instance_pointer()->check_statu();
  311. if (parkspace_code != SUCCESS)
  312. {
  313. error_msg.set_error_code(parkspace_code.get_error_code());
  314. error_msg.set_error_description(parkspace_code.get_error_description());
  315. response.mutable_code()->CopyFrom(error_msg);
  316. Communication_message msg;
  317. msg.reset(response.base_info(), response.SerializeAsString());
  318. Message_communicator::get_instance_pointer()->send_msg(&msg);
  319. return parkspace_code;
  320. }
  321. Error_manager dispatch_code = Dispatch_excutor::get_instance_pointer()->check_export_statu(request.terminal_id());
  322. if (dispatch_code != SUCCESS)
  323. {
  324. error_msg.set_error_code(dispatch_code.get_error_code());
  325. error_msg.set_error_description(dispatch_code.get_error_description());
  326. response.mutable_code()->CopyFrom(error_msg);
  327. Communication_message msg;
  328. msg.reset(response.base_info(), response.SerializeAsString());
  329. Message_communicator::get_instance_pointer()->send_msg(&msg);
  330. return dispatch_code;
  331. }
  332. //一切正常,接受指令
  333. Error_manager code;
  334. LOG(WARNING) << "--------- 取 -------收到取车-----------------------------" << request.car_info().license();
  335. Process_task *ptask = new PickupProcessTask(request.terminal_id(), request.car_info());
  336. PickupProcessTask *pPick_task = (PickupProcessTask *) ptask;
  337. //初始化流程
  338. code = pPick_task->init_task();
  339. if (code == SUCCESS)
  340. {
  341. m_thread_queue_process->AddTask(pPick_task);
  342. return SUCCESS;
  343. }
  344. usleep(1000 * 1000);
  345. error_msg.set_error_code(code.get_error_code());
  346. error_msg.set_error_description(code.to_string());
  347. response.mutable_code()->CopyFrom(error_msg);
  348. LOG(ERROR) << "创建取车流程失败(车位查询失败),终端:" << request.terminal_id() <<
  349. "车牌:" << request.car_info().license() << " " << code.to_string();
  350. delete pPick_task;
  351. Communication_message msg;
  352. msg.reset(response.base_info(), response.SerializeAsString());
  353. Message_communicator::get_instance_pointer()->send_msg(&msg);
  354. return code;
  355. }
  356. else
  357. {
  358. error_msg.set_error_code(INVALID_MESSAGE);
  359. error_msg.set_error_description("停车请求信息错误");
  360. response.mutable_code()->CopyFrom(error_msg);
  361. Communication_message msg;
  362. msg.reset(response.base_info(), response.SerializeAsString());
  363. Message_communicator::get_instance_pointer()->send_msg(&msg);
  364. return Error_manager(INVALID_MESSAGE, MAJOR_ERROR, "停车请求信息错误");
  365. }
  366. }
  367. /*
  368. * 控制入口 开放或者关闭
  369. */
  370. Error_manager Command_manager::pause_entrance(int terminal_id,bool paused)
  371. {
  372. if(terminal_id<0 || terminal_id>m_system_setting.entrance_num())
  373. return Error_manager(ERROR,MINOR_ERROR,"xxxx");
  374. m_input_entrance_paused[terminal_id]=paused;
  375. return SUCCESS;
  376. }
  377. /*
  378. * 控制出口 开放或者关闭
  379. */
  380. Error_manager Command_manager::pause_export(int terminal_id,bool paused)
  381. {
  382. if(terminal_id<0 || terminal_id>m_system_setting.export_num())
  383. return Error_manager(ERROR,MINOR_ERROR,"xxxx");
  384. m_output_entrance_paused[terminal_id]=paused;
  385. return SUCCESS;
  386. }
  387. /*
  388. * pause,系统急停
  389. */
  390. void Command_manager::pause_system()
  391. {
  392. m_system_paused=true;
  393. }
  394. bool Command_manager::entrance_paused(int terminal_id)
  395. {
  396. if(terminal_id>=m_system_setting.export_num())
  397. return false;
  398. return m_output_entrance_paused[terminal_id];
  399. }
  400. bool Command_manager::export_paused(int terminal_id)
  401. {
  402. if(terminal_id>=m_system_setting.entrance_num())
  403. return false;
  404. return m_input_entrance_paused[terminal_id];
  405. }