Explorar o código

20200709, 优化执行模块

huli %!s(int64=4) %!d(string=hai) anos
pai
achega
beb65d6601

+ 2 - 2
communication/communication_socket_base.cpp

@@ -150,7 +150,7 @@ Error_manager Communication_socket_base::communication_run()
 	m_communication_statu = COMMUNICATION_READY;
 	//启动4个线程。
 	//接受线程默认循环, 内部的nn_recv进行等待, 超时1ms
-	m_receive_condition.reset(false, true, false);
+	m_receive_condition.reset(false, false, false);
 	mp_receive_data_thread = new std::thread(&Communication_socket_base::receive_data_thread, this);
 	//解析线程默认等待, 需要接受线程去唤醒, 超时1ms, 超时后主动遍历m_receive_data_list
 	m_analysis_data_condition.reset(false, false, false);
@@ -248,7 +248,7 @@ void Communication_socket_base::receive_data_thread()
 	//通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
 	while (m_receive_condition.is_alive())
 	{
-		m_receive_condition.wait();
+		m_receive_condition.wait_for_ex(std::chrono::microseconds(1));
 		if ( m_receive_condition.is_alive() )
 		{
 			std::this_thread::yield();

+ 5 - 0
error_code/error_code.cpp

@@ -179,6 +179,11 @@ char* Error_manager::get_error_description()
     return pm_error_description;
 }
 
+int Error_manager::get_description_length()
+{
+	return m_description_length;
+}
+
 //复制错误描述,(深拷贝)
 //output:p_error_description     错误描述的字符串指针,不可以为NULL,必须要有实际的内存
 //output:description_length      错误描述的字符串长度,不可以为0,长度最好足够大,一般256即可。

+ 7 - 4
error_code/error_code.h

@@ -289,10 +289,11 @@ enum Error_code
 	//system module, 系统模块
 	SYSTEM_EXECUTOR_ERROR_BASE						= 0x12010000,		//系统执行模块,
 	SYSTEM_EXECUTOR_PARSE_ERROR,										//系统执行模块, 解析消息错误
-	SYSTEM_EXECUTOR_STATUS_BUSY,
-	SYSTEM_EXECUTOR_STATUS_ERROR,
-	
-	    LOCATER_MSG_TABLE_NOT_EXIST ,
+	SYSTEM_EXECUTOR_STATUS_BUSY,										//系统执行模块, 状态正忙
+	SYSTEM_EXECUTOR_STATUS_ERROR,										//系统执行模块, 状态错误
+	SYSTEM_EXECUTOR_CHECK_ERROR,										//系统执行模块, 检查错误
+
+	LOCATER_MSG_TABLE_NOT_EXIST ,
     LOCATER_MSG_RESPONSE_TYPE_ERROR,
     LOCATER_MSG_RESPONSE_INFO_ERROR,
     LOCATER_MSG_REQUEST_INVALID,
@@ -395,6 +396,8 @@ public://外部接口函数
     //获取错误描述的指针,(浅拷贝)
     char* get_error_description();
 
+	int get_description_length();
+
     //复制错误描述,(深拷贝)
     //output:p_error_description     错误描述的字符串指针,不可以为NULL,必须要有实际的内存
     //output:description_length      错误描述的字符串长度,不可以为0,长度最好足够大,一般256即可。

+ 15 - 11
main.cpp

@@ -17,6 +17,7 @@
 
 #include "./tool/thread_pool.h"
 #include "./system/system_communication.h"
+#include "./system/system_executor.h"
 
 #define LIVOX_NUMBER	     2
 
@@ -28,21 +29,24 @@ void asd(int i)
 
 int main(int argc,char* argv[])
 {
+	Laser_manager::get_instance_references().laser_manager_init();
+	std::cout << "Laser_manager = " << Laser_manager::get_instance_references().get_laser_manager_status() << std::endl;
+	Locate_manager::get_instance_references().Locate_manager_init();
+	std::cout << "Locate_manager = " << Locate_manager::get_instance_references().get_locate_manager_status() << std::endl;
+	System_executor::get_instance_references().system_executor_init(4);
+	std::cout << "System_executor = " << System_executor::get_instance_references().get_system_executor_status() << std::endl;
 
+	System_communication::get_instance_references().communication_init();
 
-//	System_communication csb;
+	char ch ;
+	std::cin >> ch ;
 
-//	std::vector<std::string> connect_string_vector;
-//	connect_string_vector.push_back("tcp://192.168.2.166:9001");
-//	csb.communication_init("tcp://192.168.2.166:9000", connect_string_vector);
+	System_communication::get_instance_references().communication_uninit();
+	System_executor::get_instance_references().system_executor_uninit();
+	Locate_manager::get_instance_references().Locate_manager_uninit();
+	Laser_manager::get_instance_references().laser_manager_uninit();
 
-//	csb.communication_init();
-
-//	System_communication::get_instance_references().communication_init();
-//
-//	char ch ;
-//	std::cin >> ch ;
-//	return 0;
+	return 0;
 
 	Thread_pool pool(4);
 	std::vector< std::future<int> > results;

+ 4 - 1
setting/communication.prototxt

@@ -8,7 +8,10 @@ communication_parameters
  # connect_string_vector:"tcp://192.168.2.166:9002"
 
   # connect_string_vector:"tcp://192.168.2.125:9876"
-   connect_string_vector:"tcp://192.168.2.166:1234"
+  # connect_string_vector:"tcp://192.168.2.166:1234"
+
+   bind_string:"tcp://192.168.2.166:4444"
+
 
 }
 

+ 17 - 4
system/system_communication.cpp

@@ -21,6 +21,7 @@ System_communication::~System_communication()
 //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
 Error_manager System_communication::check_msg(Communication_message*  p_msg)
 {
+
 	//通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
 	if ( p_msg->get_message_type() == Communication_message::Message_type::eLocate_request_msg
 		 && p_msg->get_receiver() == Communication_message::Communicator::eMeasurer )
@@ -29,6 +30,7 @@ Error_manager System_communication::check_msg(Communication_message*  p_msg)
 	}
 	else
 	{
+//		std::cout << "System_communication::check_msg INVALID_MESSAGE" << std::endl;
 		//无效的消息,
 		return Error_code::INVALID_MESSAGE;
 	}
@@ -40,8 +42,8 @@ Error_manager System_communication::check_executer(Communication_message*  p_msg
 	//检查对应模块的状态, 判断是否可以处理这条消息
 	//同时也要判断是否超时, 超时返回 COMMUNICATION_ANALYSIS_TIME_OUT
 	//如果处理器正在忙别的, 那么返回 COMMUNICATION_EXCUTER_IS_BUSY
-	std::cout << "Communication_socket_base::check_msg  p_buf =  " << p_msg->get_message_buf() << std::endl;
-	std::cout << "Communication_socket_base::check_msg   size =  " << p_msg->get_message_buf().size() << std::endl;
+//	std::cout << "Communication_socket_base::check_msg  p_buf =  " << p_msg->get_message_buf() << std::endl;
+//	std::cout << "Communication_socket_base::check_msg   size =  " << p_msg->get_message_buf().size() << std::endl;
 
 	Error_manager t_error;
 	if ( p_msg->is_over_time() )
@@ -52,6 +54,7 @@ Error_manager System_communication::check_executer(Communication_message*  p_msg
 	}
 	else
 	{
+
 		return System_executor::get_instance_references().check_executer(p_msg);
 	}
 	return Error_code::SUCCESS;
@@ -68,7 +71,17 @@ Error_manager System_communication::encapsulate_send_data()
 {
 	Error_manager t_error;
 
-//创建一条答复消息
+	message::Measure_request_msg t_measure_request_msg;
+	t_measure_request_msg.mutable_base_info()->set_msg_type(message::Message_type::eLocate_request_msg);
+	t_measure_request_msg.mutable_base_info()->set_timeout_ms(5000);
+	t_measure_request_msg.mutable_base_info()->set_sender(message::Communicator::eMain);
+	t_measure_request_msg.mutable_base_info()->set_receiver(message::Communicator::eMeasurer);
+	t_measure_request_msg.set_command_id(123);
+	t_measure_request_msg.set_terminal_id(1);
+	string t_msg = t_measure_request_msg.SerializeAsString();
+	System_communication::get_instance_references().encapsulate_msg(t_msg);
+	/*
+	//创建一条状态消息
 	message::Measure_status_msg t_measure_status_msg;
 	t_measure_status_msg.mutable_base_info()->set_msg_type(message::Message_type::eLocate_status_msg);
 	t_measure_status_msg.mutable_base_info()->set_timeout_ms(5000);
@@ -104,7 +117,7 @@ Error_manager System_communication::encapsulate_send_data()
 
 	string t_msg = t_measure_status_msg.SerializeAsString();
 	System_communication::get_instance_references().encapsulate_msg(t_msg);
-
+*/
 	return Error_code::SUCCESS;
 }
 

+ 18 - 3
system/system_executor.cpp

@@ -15,13 +15,22 @@ System_executor::System_executor()
 
 System_executor::~System_executor()
 {
-
+	system_executor_uninit();
 }
 
 //初始化
 Error_manager System_executor::system_executor_init(int threads_size)
 {
 	m_thread_pool.thread_pool_init(threads_size);
+	m_system_executor_status = SYSTEM_EXECUTOR_READY;
+	return Error_code::SUCCESS;
+}
+
+//反初始化
+Error_manager System_executor::system_executor_uninit()
+{
+	m_thread_pool.thread_pool_uninit();
+	m_system_executor_status = SYSTEM_EXECUTOR_UNKNOW;
 	return Error_code::SUCCESS;
 }
 
@@ -83,10 +92,12 @@ Error_manager System_executor::check_executer(Communication_message* p_msg)
 
 						string t_msg = t_measure_response_msg.SerializeAsString();
 						System_communication::get_instance_references().encapsulate_msg(t_msg);
+						LOG(INFO) << " System_executor::check_executer executer status error "<< this;
 						return t_error;
 					}
 					else
 					{
+						LOG(INFO) << " System_executor::check_executer second PARSE_ERROR "<< this;
 						return Error_manager(Error_code::SYSTEM_EXECUTOR_PARSE_ERROR, Error_level::MINOR_ERROR,
 											 " message::Measure_request_msg  ParseFromString error ");
 					}
@@ -112,7 +123,6 @@ Error_manager System_executor::execute_msg(Communication_message* p_msg)
 		return Error_manager(Error_code::POINTER_IS_NULL, Error_level::MINOR_ERROR,
 							 "  POINTER IS NULL ");
 	}
-
 	switch ( p_msg->get_message_type() )
 	{
 		case Communication_message::eLocate_request_msg:
@@ -172,6 +182,10 @@ bool System_executor::is_ready()
 	}
 }
 
+System_executor::System_executor_status System_executor::get_system_executor_status()
+{
+	return m_system_executor_status;
+}
 
 
 
@@ -184,6 +198,7 @@ void System_executor::execute_for_measure(int command_id, int terminal_id)
 {
 	Error_manager t_error;
 
+	LOG(INFO) << " System_executor::execute_for_measure run "<< this;
 	//这里要处理.......以后再写
 
 	//创建一条答复消息
@@ -197,7 +212,7 @@ void System_executor::execute_for_measure(int command_id, int terminal_id)
 	t_measure_response_msg.set_terminal_id(terminal_id);
 	t_measure_response_msg.mutable_error_manager()->set_error_code(t_error.get_error_code());
 	t_measure_response_msg.mutable_error_manager()->set_error_level((message::Error_level)t_error.get_error_level());
-	t_measure_response_msg.mutable_error_manager()->set_error_description(t_error.get_error_description());
+	t_measure_response_msg.mutable_error_manager()->set_error_description(t_error.get_error_description(), t_error.get_description_length());
 
 	t_measure_response_msg.mutable_locate_information()->set_locate_x(0);
 	t_measure_response_msg.mutable_locate_information()->set_locate_y(0);

+ 3 - 1
system/system_executor.h

@@ -38,6 +38,8 @@ public:
 public://API functions
 	//初始化
 	Error_manager system_executor_init(int threads_size);
+	//反初始化
+	Error_manager system_executor_uninit();
 
 	//检查执行者的状态, 判断能否处理这条消息,
 	Error_manager check_executer(Communication_message* p_msg);
@@ -50,7 +52,7 @@ public://API functions
 	//判断是否为待机,如果已经准备好,则可以执行任务。
 	bool is_ready();
 public://get or set member variable
-
+	System_executor_status get_system_executor_status();
 public:
 	//雷达感测定位 的处理函数
 //input::command_id, 消息指令id, 由主控制系统生成的唯一码

+ 18 - 0
tool/thread_pool.h

@@ -55,6 +55,9 @@ public:
 	//初始化,初始化 threads_size 数量的线程
 	void thread_pool_init(size_t threads_size);
 
+	//反初始化
+	void thread_pool_uninit();
+
 	//往线程池添加执行任务, 之后会唤醒一个线程去执行他.
 	//input: F&& f  			函数指针(函数名)
 	//input: Args&&... args		函数的参数, 自定义
@@ -176,7 +179,22 @@ inline void Thread_pool::thread_pool_init(size_t threads_size)
 		);
 }
 
+//反初始化
+inline void Thread_pool::thread_pool_uninit()
+{
+	{
+		std::unique_lock<std::mutex> lock(queue_mutex);
+		stop = true;
+	}
+	condition.notify_all();
 
+	for (auto iter = workers.begin(); iter != workers.end(); )
+	{
+		iter->join();
+		iter = workers.erase(iter);
+	}
+	working_flag_vector.clear();
+}
 
 
 //往线程池添加执行任务, 之后会唤醒一个线程去执行他.