// // Created by zx on 2020/7/13. // #include "Terminal_communication.h" #include "process_message.pb.h" #include #include Terminal_communication::Terminal_communication() :m_update_msg_map_thread(nullptr) { if(m_update_msg_map_thread== nullptr) { m_publish_exit_condition.reset(false, false, false); m_update_msg_map_thread=new std::thread(thread_update_map_function,this); } } Terminal_communication::~Terminal_communication(){} //重载函数 Error_manager Terminal_communication::encapsulate_msg(Communication_message* message) { Error_manager code; switch (message->get_message_type()) { case Communication_message::eStore_command_request_msg: { message::Store_command_request_msg request; request.ParseFromString(message->get_message_buf()); //发送请求 code= Communication_socket_base::encapsulate_msg(message); break; } case Communication_message::ePickup_command_request_msg: { message::Pickup_command_request_msg request; request.ParseFromString(message->get_message_buf()); //发送请求 code= Communication_socket_base::encapsulate_msg(message); break; } default: code= Error_manager(ERROR,NEGLIGIBLE_ERROR,"terminal message table is not exist"); } return code; } Error_manager Terminal_communication::execute_msg(Communication_message* p_msg) { if(p_msg->get_message_type()==Communication_message::eStore_command_response_msg) { message::Store_command_response_msg response; if(false==response.ParseFromString(p_msg->get_message_buf())) { return Error_manager(ERROR,CRITICAL_ERROR,"停车指令反馈信息解析错误"); } message::Base_info base_info=response.base_info(); if(base_info.sender()==message::eMain && base_info.receiver()==message::eTerminor) { message::Error_manager error_code=response.code(); m_store_response_msg_table[response.license()]=response; return SUCCESS; } } if(p_msg->get_message_type()==Communication_message::ePickup_command_response_msg) { message::Pickup_command_response_msg response; if(false==response.ParseFromString(p_msg->get_message_buf())) { return Error_manager(ERROR,CRITICAL_ERROR,"停车指令反馈信息解析错误"); } LOG(INFO)<<" 取车反馈:"<get_message_type()==Communication_message::eStoring_process_statu_msg) { message::Storing_process_statu_msg msg; if(msg.ParseFromString(p_msg->get_message_buf())==false) { return Error_manager(ERROR,CRITICAL_ERROR,"停车流程状态信息解析错误"); } message::Base_info base_info=msg.base_info(); if(base_info.sender()==message::eMain) { if(m_storing_process_statu_map.find(msg.license())==false) { m_storing_license_queue.push(msg.license()); } m_storing_process_statu_map[msg.license()]=msg; std::chrono::system_clock::time_point time_point=std::chrono::system_clock::now(); m_storing_statu_process_time_point_map[msg.license()]=time_point; } } if(p_msg->get_message_type()==Communication_message::ePicking_process_statu_msg) { message::Picking_process_statu_msg msg; if(msg.ParseFromString(p_msg->get_message_buf())==false) { return Error_manager(ERROR,CRITICAL_ERROR,"取车流程状态信息解析错误"); } message::Base_info base_info=msg.base_info(); if(base_info.sender()==message::eMain) { if(m_picking_statu_process_map.find(msg.license())==false) { m_picking_license_queue.push(msg.license()); } m_picking_statu_process_map[msg.license()]=msg; std::chrono::system_clock::time_point time_point=std::chrono::system_clock::now(); m_picking_statu_process_time_point_map[msg.license()]=time_point; } } if(p_msg->get_message_type()==Communication_message::eCentral_controller_statu_msg) { message::Central_controller_statu_msg msg; if(msg.ParseFromString(p_msg->get_message_buf())==false) { return Error_manager(ERROR,CRITICAL_ERROR,"中控状态信息解析错误"); } message::Base_info base_info=msg.base_info(); if(base_info.sender()==message::eMain) { } } return SUCCESS; } /* * 检测消息是否可被处理 */ Error_manager Terminal_communication::check_msg(Communication_message* p_msg) { return SUCCESS; } /* * 心跳发送函数,重载 */ Error_manager Terminal_communication::encapsulate_send_data() { return SUCCESS; } //检查消息是否可以被解析, 需要重载 Error_manager Terminal_communication::check_executer(Communication_message* p_msg) { return SUCCESS; } /* * 发送停车指令请求 */ Error_manager Terminal_communication::store_request(message::Store_command_request_msg& request,message::Store_command_response_msg& result) { if(request.has_locate_information()==false || request.has_car_info()==false) { return Error_manager(ERROR,CRITICAL_ERROR,"停车指令请求消息缺少必要信息"); } Error_manager code; Communication_message message; message.reset(request.base_info(),request.SerializeAsString()); int timeout=1000*5; code=encapsulate_msg(&message); if(code!=SUCCESS) return code; //循环查询请求是否被处理 auto start_time=std::chrono::system_clock::now(); double time=0; do { ///查询是否存在,并且删除该记录, message::Store_command_response_msg response; if(m_store_response_msg_table.find(request.car_info().license(),response)) { //判断是否接收到回应,若回应信息被赋值则证明有回应 if (response.has_base_info() && response.has_code()) { message::Base_info response_base = response.base_info(); //检查类型是否匹配 if (response_base.msg_type() != message::eStore_command_response_msg) { return Error_manager(ERROR, CRITICAL_ERROR, "停车指令反馈消息 response msg type error"); } //检查基本信息是否匹配 if (response_base.sender() != message::eMain || response_base.receiver() != message::eTerminor ) { return Error_manager(PARKSPACE_RELEASE_RESPONSE_INFO_ERROR, MAJOR_ERROR, "parkspace store response msg info error"); } result = response; m_store_response_msg_table.erase(request.car_info().license()); if(response.has_code()) { if(response.code().error_code()==0) return SUCCESS; } return Error_manager(FAILED,MINOR_ERROR,"车位查询返回错误码"); } } auto end_time=std::chrono::system_clock::now(); auto duration = std::chrono::duration_cast(end_time - start_time); time=1000.0*double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den; if(time>double(timeout)) { m_store_response_msg_table.erase(request.car_info().license()); return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"storing terminal request timeout"); } usleep(1000); }while(1); } /* * 发送取车指令请求 */ Error_manager Terminal_communication::pickup_request(message::Pickup_command_request_msg& request,message::Pickup_command_response_msg& result) { if(request.has_car_info()==false) { return Error_manager(ERROR,CRITICAL_ERROR,"取车指令请求消息缺少车辆信息"); } Error_manager code; Communication_message message; message.reset(request.base_info(),request.SerializeAsString()); int timeout=1000*3; code=encapsulate_msg(&message); if(code!=SUCCESS) return code; // m_statu=eTerminal_picking; //循环查询请求是否被处理 auto start_time=std::chrono::system_clock::now(); double time=0; do { ///查询是否存在,并且删除该记录, message::Pickup_command_response_msg response; if(m_pickup_response_msg_table.find(request.car_info().license(),response)) { //判断是否接收到回应,若回应信息被赋值则证明有回应 if (response.has_base_info() && response.has_code()) { message::Base_info response_base = response.base_info(); //检查类型是否匹配 if (response_base.msg_type() != message::ePickup_command_response_msg) { return Error_manager(ERROR, CRITICAL_ERROR, "停车指令反馈消息 response msg type error"); } //检查基本信息是否匹配 if (response_base.sender() != message::eMain || response_base.receiver() != message::eTerminor ) { return Error_manager(PARKSPACE_RELEASE_RESPONSE_INFO_ERROR, MAJOR_ERROR, "parkspace release response msg info error"); } result = response; m_pickup_response_msg_table.erase(request.car_info().license()); if(response.has_code()) { if(response.code().error_code()==0) return SUCCESS; } return Error_manager(FAILED,MINOR_ERROR,"车位查询返回错误码"); } } auto end_time=std::chrono::system_clock::now(); auto duration = std::chrono::duration_cast(end_time - start_time); time=1000.0*double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den; if(time>double(timeout)) { m_pickup_response_msg_table.erase(request.car_info().license()); return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"picking terminal request timeout"); } usleep(1000); }while(1); } /* * 获取停车流程状态 */ Error_manager Terminal_communication::get_storing_statu(std::string car_license,message::Storing_process_statu_msg& msg) { if(m_storing_process_statu_map.find(car_license,msg)) { return SUCCESS; } return FAILED; } /* * 获取取车流程状态 */ Error_manager Terminal_communication::get_picking_statu(std::string car_license,message::Picking_process_statu_msg& msg) { if(m_picking_statu_process_map.find(car_license,msg)) return SUCCESS; return FAILED; } void Terminal_communication::update_map() { while(false==m_publish_exit_condition.wait_for_ex(std::chrono::milliseconds(200))) { std::string license; if (m_storing_license_queue.try_pop(license)) { std::chrono::system_clock::time_point t1 = std::chrono::system_clock::now(); std::chrono::system_clock::time_point start = m_storing_statu_process_time_point_map[license]; auto duration = std::chrono::duration_cast(t1 - start); double time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den; if (time > 300) { m_storing_statu_process_time_point_map.erase(license); m_storing_process_statu_map.erase(license); } else { m_storing_license_queue.push(license); } } if (m_picking_license_queue.try_pop(license)) { std::chrono::system_clock::time_point t1 = std::chrono::system_clock::now(); std::chrono::system_clock::time_point start = m_picking_statu_process_time_point_map[license]; auto duration = std::chrono::duration_cast(t1 - start); double time = 1000.0 * double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den; if (time > 300) { m_picking_statu_process_time_point_map.erase(license); m_picking_statu_process_map.erase(license); } else { m_picking_license_queue.push(license); } } } } void Terminal_communication::thread_update_map_function(Terminal_communication *terminal) { terminal->update_map(); }