Browse Source

locate功能流程完成,测试通过,增加线程池

zx 5 years ago
parent
commit
482eeb3ea0

+ 4 - 4
CMakeLists.txt

@@ -19,11 +19,9 @@ include_directories(
         ./communication
         ./error_code
         ./tool
+        ./tool/TaskQueue/threadpp
         ./system
-#        ${PCL_INCLUDE_DIRS}
-#        ${OpenCV_INCLUDE_DIRS}
-#        ./laser
-#        ./locate
+        ./lidar_locate
 )
 link_directories("/usr/local/lib")
 
@@ -40,6 +38,7 @@ aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/locate LOCATE_SRC )
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/terminor TERMINOR_SRC )
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/task TASK_MANAGER_SRC )
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/tool TOOL_SRC )
+aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/tool/TaskQueue TASK_QUEUE_SRC )
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/communication COMMUNICATION_SRC )
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/system SYSTEM_SRC )
 
@@ -56,6 +55,7 @@ add_executable(terminal
 #        ${LOCATE_SRC}
 #        ${TASK_MANAGER_SRC}
         ${TOOL_SRC}
+        ${TASK_QUEUE_SRC}
         ${COMMUNICATION_SRC}
         ${SYSTEM_SRC}
         )

+ 9 - 5
communication/communication_socket_base.cpp

@@ -238,10 +238,11 @@ void Communication_socket_base::receive_data_thread(Communication_socket_base* c
 	//通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
 	while (communicator->m_receive_condition.is_alive())
 	{
+        std::this_thread::yield();
+        usleep(1);
         communicator->m_receive_condition.wait();
 		if ( communicator->m_receive_condition.is_alive() )
 		{
-			std::this_thread::yield();
 
 			std::unique_lock<std::mutex> lk(communicator->m_mutex);
 			//flags为1, 非阻塞接受消息, 如果接收到消息, 那么接受数据长度大于0
@@ -317,10 +318,11 @@ void Communication_socket_base::analysis_data_thread()
 	//通信解析线程, 负责巡检m_receive_data_list, 并解析和处理消息
 	while (m_analysis_data_condition.is_alive())
 	{
+        std::this_thread::yield();
 		bool t_pass_flag = m_analysis_data_condition.wait_for_millisecond(1000);
 		if ( m_analysis_data_condition.is_alive() )
 		{
-			std::this_thread::yield();
+
 			//如果解析线程被主动唤醒, 那么就表示 收到新的消息, 那就遍历整个链表
 			if ( t_pass_flag )
 			{
@@ -470,10 +472,11 @@ void Communication_socket_base::send_data_thread(Communication_socket_base* comm
 	//通信发送线程, 负责巡检m_send_data_list, 并发送消息
 	while (communicator->m_send_data_condition.is_alive())
 	{
+        std::this_thread::yield();
         communicator->m_send_data_condition.wait();
 		if ( communicator->m_send_data_condition.is_alive() )
 		{
-			std::this_thread::yield();
+
 
 			Communication_message* tp_msg = NULL;
 			//这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
@@ -513,10 +516,11 @@ void Communication_socket_base::encapsulate_data_thread()
 	//通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
 	while (m_encapsulate_data_condition.is_alive())
 	{
+        std::this_thread::yield();
 		bool t_pass_flag = m_encapsulate_data_condition.wait_for_millisecond(1000);
 		if ( m_encapsulate_data_condition.is_alive() )
 		{
-			std::this_thread::yield();
+
 			//如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
 			if ( t_pass_flag )
 			{
@@ -542,7 +546,7 @@ Error_manager Communication_socket_base::encapsulate_send_data()
 //	static unsigned int t_heartbeat = 0;
 //	sprintf(buf, "Communication_socket_base, heartbeat = %d\0\0\0, test\0", t_heartbeat);
 //	t_heartbeat++;
-    return SUCCESS;
+
 	message::Base_info t_base_msg;
 	t_base_msg.set_msg_type(message::Message_type::eBase_msg);
 	t_base_msg.set_timeout_ms(5000);

+ 2 - 0
error_code/error_code.h

@@ -131,8 +131,10 @@ enum Error_code
     LOCATER_MSG_TABLE_NOT_EXIST ,
     LOCATER_MSG_RESPONSE_TYPE_ERROR,
     LOCATER_MSG_RESPONSE_INFO_ERROR,
+    LOCATER_MSG_REQUEST_CANCELED,
     LOCATER_MSG_REQUEST_INVALID,
     LOCATER_MSG_RESPONSE_HAS_NO_REQUEST,
+    LOCATER_MSG_REQUEST_REPEATED,
 
     //System_manager error from 0x04010000-0x0401FFFF
     SYSTEM_READ_PARAMETER_ERROR=0x04010100,

+ 106 - 24
lidar_locate/Locate_communicator.cpp

@@ -5,6 +5,22 @@
 #include "Locate_communicator.h"
 //#include "locate_message.pb.h"
 
+namespace message {
+    bool operator<(const message::Measure_request_msg msg1, const message::Measure_request_msg msg2) {
+        return (msg1.command_id() < msg2.command_id());
+    }
+
+    bool operator==(const message::Measure_request_msg msg1, const message::Measure_request_msg msg2) {
+        if (msg1.base_info().msg_type() == msg2.base_info().msg_type()
+            && msg1.base_info().sender() == msg2.base_info().sender()
+            && msg1.base_info().receiver() == msg2.base_info().receiver()
+            && msg1.command_id() == msg2.command_id()
+            && msg1.terminal_id() == msg2.terminal_id()) {
+            return true;
+        }
+        return false;
+    }
+}
 
 Locate_communicator::Locate_communicator()
 {
@@ -23,15 +39,17 @@ Error_manager Locate_communicator::encapsulate_msg(Communication_message* messag
     if(message->get_message_type()==Communication_message::eLocate_request_msg)
     {
         message::Measure_request_msg request;
-        m_response_table[request.command_id()]=message::Measure_response_msg();
+        request.ParseFromString(message->get_message_buf());
+        m_response_table[request]=message::Measure_response_msg();
+        //发送请求
+        return Communication_socket_base::encapsulate_msg(message);
     }
     else
     {
         return Error_manager(LOCATER_MSG_TABLE_NOT_EXIST,NEGLIGIBLE_ERROR,"message table is not exist");
     }
 
-    //发送请求
-    return Communication_socket_base::encapsulate_msg(message);
+
 }
 
 Error_manager Locate_communicator::locate_request(message::Measure_request_msg& request,message::Measure_response_msg& result)
@@ -39,9 +57,12 @@ Error_manager Locate_communicator::locate_request(message::Measure_request_msg&
     /*
      * 检查request合法性,以及模块状态
      */
+
     if(request.base_info().sender()!=message::eMain||request.base_info().receiver()!=message::eMeasurer)
         return Error_manager(LOCATER_MSG_REQUEST_INVALID,MINOR_ERROR,"measure request invalid");
 
+    if(m_response_table.find(request)!=m_response_table.end())
+        return Error_manager(LOCATER_MSG_REQUEST_REPEATED,MAJOR_ERROR," measure reques repeated");
     //设置超时,若没有设置,默认3000
     int timeout=request.base_info().has_timeout_ms()?request.base_info().timeout_ms():3000;
     //向测量节点发送测量请求,并记录请求
@@ -64,40 +85,79 @@ Error_manager Locate_communicator::locate_request(message::Measure_request_msg&
     double time=0;
     do{
         //在请求表中查询结果
-        message::Measure_response_msg response=m_response_table[request.command_id()];
-        //判断是否接收到回应,若回应信息被赋值则证明有回应
-        if(response.has_base_info() && response.has_command_id())
+        std::map<message::Measure_request_msg,message::Measure_response_msg>::iterator it=m_response_table.find(request);
+        //查询到记录
+        if(it!=m_response_table.end())
         {
-            message::Base_info response_base=response.base_info();
-            //检查类型是否匹配
-            if(response_base.msg_type() != message::eLocate_response_msg)
-            {
-                return Error_manager(LOCATER_MSG_RESPONSE_TYPE_ERROR,MAJOR_ERROR,"measure response msg type error");
-            }
-            //检查基本信息是否匹配
-            if(response_base.sender()!=message::eMeasurer ||
-                response_base.receiver()!=message::eMain||
-                response.command_id()!=request.command_id())
+            message::Measure_response_msg response=it->second;
+            //判断是否接收到回应,若回应信息被赋值则证明有回应
+            if (response.has_base_info() && response.has_command_id())
             {
-                return Error_manager(LOCATER_MSG_RESPONSE_INFO_ERROR,MAJOR_ERROR,"measure response msg info error");
-            }
-            result=response;
-            return SUCCESS;
+                message::Base_info response_base = response.base_info();
+                //检查类型是否匹配
+                if (response_base.msg_type() != message::eLocate_response_msg) {
+                    return Error_manager(LOCATER_MSG_RESPONSE_TYPE_ERROR, MAJOR_ERROR,
+                                         "measure response msg type error");
+                }
+                //检查基本信息是否匹配
+                if (response_base.sender() != message::eMeasurer ||
+                    response_base.receiver() != message::eMain ||
+                    response.command_id() != request.command_id()) {
+                    return Error_manager(LOCATER_MSG_RESPONSE_INFO_ERROR, MAJOR_ERROR,
+                                         "measure response msg info error");
+                }
+                result = response;
+                m_response_table.erase(request);
+
+                return SUCCESS;
 
+            }
+        }
+        else
+        {
+             //未查询到记录,任务已经被提前取消,记录被删除
+             return Error_manager(LOCATER_MSG_REQUEST_CANCELED,MINOR_ERROR,"measure request canceled");
         }
 
         auto end_time=std::chrono::system_clock::now();
         auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
         time=1000.0*double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den;
         std::this_thread::yield();
+        usleep(1000);
     }while(time<double(timeout));
-
     return Error_manager(RESPONSE_TIMEOUT,MINOR_ERROR,"measure request timeout");
 }
 
-message::Measure_status_msg Locate_communicator::get_statu()
+//检查消息是否可以被解析, 需要子类重载
+Error_manager Locate_communicator::check_executer(Communication_message* p_msg)
+{
+    return SUCCESS;
+}
+
+Error_manager Locate_communicator::check_statu()
+{
+    return SUCCESS;
+}
+
+//检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
+Error_manager Locate_communicator::check_msg(Communication_message*  p_msg)
 {
+    //通过 p_msg->get_message_type() 和 p_msg->get_receiver() 判断这条消息是不是给我的.
+    //子类重载时, 增加自己模块的判断逻辑, 以后再写.
+    if ( p_msg->get_message_type() == Communication_message::Message_type::eLocate_response_msg
+         && p_msg->get_receiver() == Communication_message::Communicator::eMain )
+    {
+        return Error_code::SUCCESS;
+    }
+    else
+    {
+        //认为接受人
+        return Error_code::INVALID_MESSAGE;
+    }
+}
 
+Error_manager Locate_communicator::encapsulate_send_data(){
+    return SUCCESS;
 }
 
 Error_manager Locate_communicator::execute_msg(Communication_message* p_msg)
@@ -114,13 +174,15 @@ Error_manager Locate_communicator::execute_msg(Communication_message* p_msg)
         {
             message::Measure_response_msg response;
             response.ParseFromString(p_msg->get_message_buf());
+            message::Measure_request_msg request=create_request_by_response(response);
             ///查询请求表是否存在
-            if(m_response_table.find(response.command_id())==m_response_table.end())
+            if(m_response_table.find(request)==m_response_table.end())
             {
                 return Error_manager(LOCATER_MSG_RESPONSE_HAS_NO_REQUEST,NEGLIGIBLE_ERROR,"measure response without request");
             }
             ////更新表
-            m_response_table[response.command_id()]=response;
+
+            m_response_table[request]=response;
             break;
         }
         ///测量系统状态
@@ -130,7 +192,27 @@ Error_manager Locate_communicator::execute_msg(Communication_message* p_msg)
             break;
         }
     }
+    return SUCCESS;
+
+}
 
+Error_manager Locate_communicator::cancel_request(message::Measure_request_msg& request)
+{
+    if(m_response_table.find(request)!=m_response_table.end())
+        m_response_table.erase(request);
+    return SUCCESS;
+}
 
+message::Measure_request_msg Locate_communicator::create_request_by_response(message::Measure_response_msg& response)
+{
+    message::Measure_request_msg request;
+    message::Base_info baseInfo;
+    baseInfo.set_msg_type(message::eLocate_request_msg);
+    baseInfo.set_sender(response.base_info().sender());
+    baseInfo.set_receiver(response.base_info().receiver());
+    request.mutable_base_info()->CopyFrom(baseInfo);
+    request.set_command_id(response.command_id());
+    request.set_terminal_id(response.command_id());
+    return request;
 }
 

+ 29 - 2
lidar_locate/Locate_communicator.h

@@ -5,6 +5,7 @@
 #ifndef NNXX_TESTS_LOCATE_COMMUNICATOR_H
 #define NNXX_TESTS_LOCATE_COMMUNICATOR_H
 
+
 #include <mutex>
 #include "communication_socket_base.h"
 #include "singleton.h"
@@ -14,23 +15,49 @@
 
 #include "thread_safe_map.h"
 
+namespace message
+{
+    bool operator<(const message::Measure_request_msg msg1,const message::Measure_request_msg msg2);
+    bool operator==(const message::Measure_request_msg msg1,const message::Measure_request_msg msg2);
+}
+
+
 class Locate_communicator: public Singleton<Locate_communicator>, public Communication_socket_base
 {
     friend Singleton<Locate_communicator>;
 public:
     virtual ~Locate_communicator();
     Error_manager locate_request(message::Measure_request_msg& request,message::Measure_response_msg& result);
+    /*
+     * 提前取消请求
+     */
+    Error_manager cancel_request(message::Measure_request_msg& request);
+    Error_manager check_statu();
 
-    message::Measure_status_msg get_statu();
 
 protected:
     Locate_communicator();
     virtual Error_manager encapsulate_msg(Communication_message* message);
     virtual Error_manager execute_msg(Communication_message* p_msg);
+    /*
+     * 检测消息是否可被处理
+     */
+    virtual Error_manager check_msg(Communication_message* p_msg);
+    /*
+     * 心跳发送函数,重载
+     */
+    virtual Error_manager encapsulate_send_data();
+    //检查消息是否可以被解析, 需要重载
+    virtual Error_manager check_executer(Communication_message* p_msg);
+
+    /*
+     * 根据接收到的response,生成对应的 request
+     */
+    message::Measure_request_msg create_request_by_response(message::Measure_response_msg& msg);
 
 protected:
 
-    thread_safe_map<int,message::Measure_response_msg>         m_response_table;
+    thread_safe_map<message::Measure_request_msg,message::Measure_response_msg>          m_response_table;
     Communication_message                                       m_measure_statu_msg;
 };
 

+ 58 - 8
main.cpp

@@ -4,20 +4,70 @@
 
 #include <iostream>
 #include <glog/logging.h>
-
+#include "TaskQueue/TQFactory.h"
 #include "./communication/communication_socket_base.h"
+#include "StoreProcessTask.h"
+#include "system_communicator.h"
+#include "Locate_communicator.h"
+#include "threadSafeQueue.h"
+
+///线程池
+tq::IQueue* g_pthread_queue = nullptr;
+Error_manager Init_communicators();
+
 
 int main(int argc,char* argv[])
 {
-	Communication_socket_base csb;
+    Error_manager code=Init_communicators();
+	if(code!=SUCCESS)
+    {
+	    LOG(ERROR)<<code.to_string();
+    }
+    g_pthread_queue= tq::TQFactory::CreateDefaultQueue();
+    g_pthread_queue->Start(6);
 
-//	std::vector<std::string> connect_string_vector;
-//	connect_string_vector.push_back("tcp://192.168.2.166:9001");
-//	csb.communication_init("tcp://192.168.2.166:9000", connect_string_vector);
+    //std::thread* pthread=new std::thread(delete_thread);
 
-	csb.communication_init();
+    int N=0;
+    while(1)
+    {
+        usleep(10);
+        StoreProcessTask* task=new StoreProcessTask();
 
-	char ch ;
-	std::cin >> ch ;
+        task->init_task(rand(),rand()%6);
+        if(g_pthread_queue->TaskCount()<12)
+        {
+            g_pthread_queue->AddTask(task);
+            N++;
+            printf("task size / pushed size : %d / %d\n",g_pthread_queue->TaskCount(),N);
+        }
+        else
+        {
+            delete task;
+        }
+    }
+
+    g_pthread_queue->WaitForFinish();
+    delete g_pthread_queue;
 	return 0;
+}
+
+
+Error_manager Init_communicators()
+{
+    Error_manager code;
+    if(Locate_communicator::get_instance_pointer()== nullptr)
+        return FAILED;
+    code=Locate_communicator::get_instance_pointer()->communication_connect("tcp://127.0.0.1:9006");
+    if(code!=SUCCESS)
+    {
+        return code;
+    }
+    Locate_communicator::get_instance_pointer()->communication_run();
+
+    ///最后初始化与终端通讯的对象
+    if(System_communicator::get_instance_pointer()== nullptr)
+        return FAILED;
+
+    return SUCCESS;
 }

+ 1 - 0
message/measure_message.pb.cc

@@ -1710,6 +1710,7 @@ void Measure_response_msg::InternalSwap(Measure_response_msg* other) {
 }
 
 
+
 // @@protoc_insertion_point(namespace_scope)
 }  // namespace message
 

+ 0 - 1
message/measure_message.pb.h

@@ -601,7 +601,6 @@ class Measure_response_msg : public ::google::protobuf::Message /* @@protoc_inse
 };
 // ===================================================================
 
-
 // ===================================================================
 
 #ifdef __GNUC__

+ 1 - 1
message/message_base.proto

@@ -26,7 +26,7 @@ enum Communicator
     eMain=0x0001;    //主流程
 
     eTerminor=0x0100;
-    //数据
+    //车位
     eTable=0x0200;
     //测量单元
     eMeasurer=0x0300;

+ 0 - 31
system/system_communication.cpp

@@ -1,31 +0,0 @@
-//
-// Created by huli on 2020/6/28.
-//
-
-#include "system_communication.h"
-
-System_communication::System_communication()
-{
-
-}
-
-System_communication::~System_communication()
-{
-
-}
-
-//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
-Error_manager System_communication::encapsulate_send_data()
-{
-
-	return Error_code::SUCCESS;
-}
-
-
-
-
-
-
-
-
-

+ 0 - 51
system/system_communication.h

@@ -1,51 +0,0 @@
-//
-// Created by huli on 2020/6/28.
-//
-
-#ifndef NNXX_TESTS_SYSTEM_COMMUNICATION_H
-#define NNXX_TESTS_SYSTEM_COMMUNICATION_H
-
-#include "../tool/singleton.h"
-#include "../communication/communication_socket_base.h"
-
-class System_communication:public Singleton<System_communication>, public Communication_socket_base
-{
-	enum Message_type
-	{
-		eCommand_msg=0x01,                      //指令消息
-
-		eLocate_status_msg=0x11,                //定位模块状态消息
-		eLocate_request_msg=0x12,               //定位请求消息
-		eLocate_response_msg=0x13,              //定位反馈消息
-
-		eHarware_statu_msg=0x21,                //调度模块硬件状态消息
-		eExecute_request_msg=0x22,              //请求调度消息
-		eExecute_response_msg=0x23,             //调度结果反馈消息
-
-	};
-// 子类必须把父类设定为友元函数,这样父类才能使用子类的私有构造函数。
-   friend class Singleton<System_communication>;
-private:
- // 父类的构造函数必须保护,子类的构造函数必须私有。
-   System_communication();
-public:
-    //必须关闭拷贝构造和赋值构造,只能通过 get_instance 函数来进行操作唯一的实例。
-    System_communication(const System_communication& other) = delete;
-    System_communication& operator =(const System_communication& other) = delete;
-    ~System_communication();
-public://API functions
-	//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
-	virtual Error_manager encapsulate_send_data();
-
-public://get or set member variable
-
-    
-protected://member variable 
-
-    
-private:
-    
-};
-
-
-#endif //NNXX_TESTS_SYSTEM_COMMUNICATION_H

+ 10 - 10
test/Locate_client.cpp

@@ -1,7 +1,7 @@
 //
 // Created by zx on 2020/7/3.
 //
-
+#include <unistd.h>
 #include <iostream>
 #include <nnxx/message>
 #include <nnxx/socket.h>
@@ -13,12 +13,12 @@ int main()
 {
     nnxx::socket socket{ nnxx::SP, nnxx::BUS };
     socket.bind("tcp://127.0.0.1:9006");
-    socket.connect("tcp://127.0.0.1:9005");
+
     int n=0;
 
     while(1)
     {
-        std::string t_receive_string = socket.recv<std::string>(1);
+        std::string t_receive_string = socket.recv<std::string>(0);
         if(t_receive_string.length()>0) {
 
             message::Base_info base_msg;
@@ -34,22 +34,22 @@ int main()
 
 
             response.set_command_id(request.command_id());
-            response.set_terminal_id(4);
+            response.set_terminal_id(request.terminal_id());
             response.mutable_base_info()->CopyFrom(base_msg);
 
 
-            error_code.set_error_code(n++);
+            int succ=int((rand()%1000)/999);
+            error_code.set_error_code(succ);
             response.mutable_error_manager()->CopyFrom(error_code);
 
-            std::cout<<response.DebugString()<<std::endl;
+            std::cout<<request.DebugString()<<std::endl;
 
             socket.send(response.SerializeAsString());
 
-            std::cout<<" response length : "<<response.SerializeAsString().length()<<std::endl;
-
-
+            usleep(1000);
+            std::this_thread::yield();
         }
-        std::this_thread::yield();
+
 
     }
 

+ 13 - 13
test/test_locate_sample.cpp

@@ -5,48 +5,48 @@
 #include "../lidar_locate/Locate_communicator.h";
 #include <unistd.h>
 #include "../tool/thread_safe_list.h"
-int main()
+int main(int argc,char* argv[])
 {
 
-    Locate_communicator* pLocator=Locate_communicator::get_instance_pointer();
+    int exe_id=rand()%1000000;
 
-    pLocator->communication_bind("tcp://127.0.0.1:9005");
+    Locate_communicator* pLocator=Locate_communicator::get_instance_pointer();
     pLocator->communication_connect("tcp://127.0.0.1:9006");
-
     pLocator->communication_run();
     int k=1;
     Thread_safe_list<std::string*> list;
     /*nnxx::socket socket{ nnxx::SP, nnxx::BUS };
     socket.bind("tcp://127.0.0.1:9005");
     socket.connect("tcp://127.0.0.1:9006");*/
+    int n=0;
 
     while(1) {
         message::Measure_request_msg request;
-        message::Base_info base_msg;
+        //message::Base_info base_msg;
 
 
-        request.mutable_base_info()->CopyFrom(base_msg);
+        //request.mutable_base_info()->CopyFrom(base_msg);
         request.mutable_base_info()->set_msg_type(message::eLocate_request_msg);
         request.mutable_base_info()->set_sender(message::eMain);
         request.mutable_base_info()->set_receiver(message::eMeasurer);
-        request.mutable_base_info()->set_timeout_ms(1000);
+        //request.mutable_base_info()->set_timeout_ms(1000);
 
-        request.set_command_id(222);
-        request.set_terminal_id(0);
+        request.set_command_id(exe_id+n++);
+        request.set_terminal_id(rand()%6);
         message::Measure_response_msg response;
 
-       message::Measure_request_msg request_1;
+        message::Measure_request_msg request_1;
         std::string str=request.SerializeAsString();
 
-        //socket.send(request_1.SerializeAsString());
-
         Error_manager code=pLocator->locate_request(request,response);
+
         if(code!=SUCCESS)
         {
             std::cout<<code.to_string()<<std::endl;
         }
         else if(request.command_id()==response.command_id()){
-            std::cout<<" Success request command :"<<request.command_id()<<std::endl;
+            std::cout<<" Success request command :"<<response.DebugString()<<std::endl;
+            std::cout<<" n ==  "<<n<<std::endl;
         }
         k++;
 

+ 4 - 0
tool/thread_safe_map.h

@@ -46,6 +46,10 @@ public:
         return dataMap_.end();
     }
 
+    unsigned int size()
+    {
+        return dataMap_.size();
+    }
 private:
     std::map<Key, Val> dataMap_;
     std::mutex mtx_;