3 Commits e8ce7d6f60 ... a099d59f09

Auteur SHA1 Message Date
  yct a099d59f09 rabbitmq pass test il y a 2 ans
  yct 8534754f07 rabbitmq need further test il y a 2 ans
  yct 292a9e16e0 rabbitmq with compile error, need test il y a 2 ans

+ 14 - 4
CMakeLists.txt

@@ -2,7 +2,7 @@ project(measure_wj_proj)
 
 cmake_minimum_required(VERSION 3.5)
 
-set (CMAKE_CXX_STANDARD 11)
+set (CMAKE_CXX_STANDARD 14)
 set(CMAKE_BUILD_TYPE "Release")
 
 # set(PCL_DIR "/home/youchen/pcl-1.8/share/pcl-1.8")
@@ -27,7 +27,7 @@ endif(NOT ${YAML_CPP_VERSION} VERSION_LESS "0.5")
 
 #other libs
 FIND_PACKAGE(Protobuf REQUIRED)
-FIND_PACKAGE(Glog REQUIRED)
+# FIND_PACKAGE(Glog REQUIRED)
 FIND_PACKAGE(OpenCV REQUIRED)
 FIND_PACKAGE(PCL REQUIRED)
 FIND_PACKAGE(Ceres REQUIRED)
@@ -67,6 +67,8 @@ aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/velodyne_lidar/match3d/ VELODYNE_
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/velodyne_lidar/velodyne_driver VELODYNE_LIDAR_DRIVER)
 aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/velodyne_lidar VELODYNE_LIDAR)
 
+aux_source_directory(${CMAKE_CURRENT_LIST_DIR}/rabbitmq RABBIT_MQ)
+
 add_executable(measure_wj
         main.cpp
         ${ERROR_SRC}
@@ -83,11 +85,15 @@ add_executable(measure_wj
     ${VELODYNE_LIDAR_MATCH}
     ${VELODYNE_LIDAR_DRIVER}
     ${VELODYNE_LIDAR}
+    ${RABBIT_MQ}
 		)
 
 target_link_libraries(measure_wj
         /usr/local/lib/libglog.a
         /usr/local/lib/libgflags.a
+        # /usr/lib/x86_64-linux-gnu/libglog.so
+        # /usr/lib/x86_64-linux-gnu/libgflags.so
+        /usr/local/lib/librabbitmq.a
         nnxx
         nanomsg
 
@@ -114,6 +120,7 @@ ${TASK_MANAGER_SRC}
 ${TOOL_SRC}
 ${COMMUNICATION_SRC}
 ${SYSTEM_SRC}
+${RABBIT_MQ}
 ${VERIFY_SRC}
 
 ${VELODYNE_LIDAR_COMMON}
@@ -122,8 +129,10 @@ ${VELODYNE_LIDAR_DRIVER}
 ${VELODYNE_LIDAR}
 )
 target_link_libraries(vlp16 
-/usr/local/lib/libglog.a
-/usr/local/lib/libgflags.a
+        /usr/local/lib/libglog.a
+        /usr/local/lib/libgflags.a
+        # /usr/lib/x86_64-linux-gnu/libglog.so
+        # /usr/lib/x86_64-linux-gnu/libgflags.so
 nnxx
 nanomsg
 
@@ -133,6 +142,7 @@ ${GLOG_LIBRARIES}
 ${PCL_LIBRARIES}
 ${CERES_LIBRARIES}
 ${YAML_CPP_LIBRARIES}
+/usr/local/lib/librabbitmq.a
 )
 
 add_executable(lidar_calib_tool ./tests/lidar_calib_tools.cpp ./tool/point_tool.cpp)

+ 21 - 0
error_code/error_code.h

@@ -570,6 +570,27 @@ enum Error_code
     PARKSPACE_SEARCH_REQUEST_CANCELED,
     PARKSPACE_RELEASE_REQUEST_CANCELED,
 
+    //rabbitmq module, 通信模块  新版通信
+ 	RABBITMQ_BASE_ERROR_BASE					= 0x31010000,
+	RABBITMQ_READ_PROTOBUF_ERROR,					//rabbitmq模块,读取PROTO参数错误
+	RABBITMQ_AMQP_NEW_CONNECTION_ERROR,				//rabbitmq模块,新建连接状态错误
+	RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR,				//rabbitmq模块,新建socket错误
+	RABBITMQ_PROTOBUF_LOSS_ERROR,					//rabbitmq模块,PROTO缺少数据
+	RABBITMQ_AMQP_SOCKET_OPEN_ERROR,				//rabbitmq模块,打开socket错误
+	RABBITMQ_AMQP_LOGIN_ERROR,						//rabbitmq模块,登录服务器错误
+	RABBITMQ_AMQP_CHANNEL_OPEN_ERROR,				//rabbitmq模块,打开通道错误
+	RABBITMQ_AMQP_QUEUE_BIND_ERROR,					//rabbitmq模块,绑定队列错误
+	RABBITMQ_AMQP_NEW_CONSUME_ERROR,				//rabbitmq模块,新建消费者错误
+	RABBITMQ_AMQP_CONSUME_MESSAGE_ERROR,			//rabbitmq模块,接受消息错误
+	RABBITMQ_AMQP_BASIC_PUBLISH_ERROR,				//rabbitmq模块,发送消息错误
+	RABBITMQ_AMQP_BASIC_ACK_ERROR,					//rabbitmq模块,确认消息错误
+
+
+	RABBITMQ_BIND_ERROR,
+	RABBITMQ_CONNECT_ERROR,
+	RABBITMQ_ANALYSIS_TIME_OUT,									//解析超时,
+	RABBITMQ_EXCUTER_IS_BUSY,										//处理器正忙, 请稍等
+
 
 };
 

+ 11 - 0
main.cpp

@@ -132,6 +132,14 @@ int main(int argc,char* argv[])
 	}
 	System_communication::get_instance_references().set_encapsulate_cycle_time(110);
 
+
+	ec = System_communication_mq::get_instance_references().rabbitmq_init();
+	if(ec != SUCCESS)
+	{
+		LOG(ERROR) << "system communication mq init failed: " << ec.to_string();
+		return -1;
+	}
+
 	// prev_test_pred_task();
 	// test_whole_process();
 //    usleep(1000*5000);
@@ -163,6 +171,9 @@ int main(int argc,char* argv[])
 
 	// 反初始化
 	System_communication::get_instance_references().communication_uninit();
+	System_communication_mq::get_instance_references().rabbitmq_uninit();
+
+
 	System_executor::get_instance_references().system_executor_uninit();
 	if(WJ_VELO == 0 || WJ_VELO == 2)
 		Wanji_manager::get_instance_references().wanji_manager_uninit();

+ 10 - 0
proto.sh

@@ -1,4 +1,12 @@
 protoc -I=./message message_base.proto --cpp_out=./message
+###
+ # @Author: yct 18202736439@163.com
+ # @Date: 2022-07-26 23:01:22
+ # @LastEditors: yct 18202736439@163.com
+ # @LastEditTime: 2022-09-30 18:51:59
+ # @FilePath: /puai_wj_2021/proto.sh
+ # @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
+### 
 protoc -I=./message measure_message.proto --cpp_out=./message
 
 #protoc -I=./locate locate_parameter.proto --cpp_out=./locate
@@ -10,3 +18,5 @@ protoc -I=./wanji_lidar wj_lidar_conf.proto --cpp_out=./wanji_lidar
 protoc -I=./verify hardware_limit.proto --cpp_out=./verify
 
 protoc -I=./velodyne_lidar velodyne_config.proto --cpp_out=./velodyne_lidar
+
+protoc -I=./rabbitmq rabbitmq.proto --cpp_out=./rabbitmq

+ 65 - 0
rabbitmq/rabbitmq.proto

@@ -0,0 +1,65 @@
+syntax = "proto2";
+package Rabbitmq_proto;
+
+//Rabbitmq 配置的通道,队列和消费者,
+//注:目前规定
+//接受请求消息:同一子节点所有的请求消息写同一个channel通道编号,queue_durable = 1, queue_auto_delete = 0, consume_no_ack = 0,
+//接受状态消息:同一子节点所有的状态消息写同一个channel通道编号,queue_durable = 0, queue_auto_delete = 1, consume_no_ack = 1,
+message Rabbitmq_channel_queue_consume
+{
+    optional int32 channel = 1;                             //连接通道,必写, 可以相同, 不同的消息队列可以共用同一个连接通道,
+                                                            //配合 amqp_basic_qos 和 amqp_basic_ack , 来阻塞这个通道的接受消息
+                                                            //请求消息和状态消息必须分别写不同的通道,例如所有的请求都可以写12, 所有的状态都可以写34
+    optional string exchange_name = 2;                      //交换机名称,必写, 可以相同, 不同的消息队列可以共用同一个交换机,
+                                                            //配合 routingkey 和 bindingkey , 来分配消息到合适的队列
+
+    //发送队列专属
+    optional string routing_key = 3;                        //发送端的路由键, 交换机分配消息的判断条件
+    optional int32 timeout_ms = 4;                          //发送超时时间, 单位ms, 服务器会自动删除超时的消息, 如果写0,那么就没有超时删除
+
+    //接受队列专属
+    optional string binding_key = 5;                        //接受端的绑定键, 交换机分配消息的判断条件
+    optional string queue_name = 6;                         //队列名称,必写, 不能相同
+    optional int32 queue_passive = 7[default = 0];          //是否被动,默认0
+    optional int32 queue_durable = 8;                       //是否持久,必写, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器)
+                                              		        //		1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据
+                                              		        //		0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常
+    optional int32 queue_exclusive = 9[default = 0];        //是否独立,默认0
+    optional int32 queue_auto_delete = 10[default = 0];      //是否自动删除, 固定写0,
+                                                            //1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会删除,
+                                                            //0表示消息被消费者接受后,不会自动删除消息, 需要手动ack才会删除消息, 队列不会删除
+                                                            //一般情况下设为0,然后让接受者手动删除.
+
+
+    optional string consume_name = 11;                       //消费者名称,必写, 不能相同
+    optional int32 consume_no_local = 12[default = 0];      //是否非本地, 默认0,表示本地
+    optional int32 consume_no_ack = 13[default = 0];        //是否确认应答,默认0,表示接收后需要应答
+                                                            //请求消息必须写0, 必须应答之后才能接受下一条
+                                                            //状态消息必须写1, 可以无限循环接受,收到的瞬间,服务器就会删除这一条消息
+    optional int32 consume_exclusive = 14;                  //是否独立,默认0
+
+}
+
+
+
+//Rabbitmq 配置参数
+message Rabbitmq_parameter
+{
+    optional string ip = 1;             //服务器ip地址, 不带端口
+    optional int32 port = 2;            //端口,默认5672
+    optional string user = 3;           //用户名, 默认guest
+    optional string password = 4;       //密码, 默认guest
+
+    repeated Rabbitmq_channel_queue_consume rabbitmq_reciever_vector= 5;                //Rabbitmq 接受的通道,队列和消费者, 多个
+    repeated Rabbitmq_channel_queue_consume rabbitmq_sender_request_vector= 6;         //Rabbitmq 发送请求的通道
+    repeated Rabbitmq_channel_queue_consume rabbitmq_sender_status_vector= 7;          //Rabbitmq 发送状态的通道
+    //注:rabbitmq的接受是以队列为目标的, 可以同时接受多个队列的消息.
+    //注:rabbitmq的发送是以交换机为目标的,我们发送到交换机后,由交换机按照规则,去分配到下面的队列里面
+
+}
+
+//Rabbitmq 配置参数 总配置
+message Rabbitmq_parameter_all
+{
+    optional Rabbitmq_parameter        rabbitmq_parameters=1;
+}

Fichier diff supprimé car celui-ci est trop grand
+ 1046 - 0
rabbitmq/rabbitmq_base.cpp


+ 190 - 0
rabbitmq/rabbitmq_base.h

@@ -0,0 +1,190 @@
+
+/*
+ * rabbitmq_base 通信模块的基类,
+ * 用户从这个基类继承, 初始化之后, 便可以自动进行通信
+ * 重载解析消息和封装消息,
+
+ nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程.
+ 	接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理.
+ rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
+ 	因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
+
+ * */
+
+#ifndef __RIBBITMQ_BASE__HH__
+#define __RIBBITMQ_BASE__HH__
+
+#include <mutex>
+#include <thread>
+
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <map>
+#include "../tool/proto_tool.h"
+
+//#ifdef _WIN32
+//#ifndef WIN32_LEAN_AND_MEAN
+//#define WIN32_LEAN_AND_MEAN
+//#endif
+//#include <Winsock2.h>
+//#else
+//#include <sys/time.h>
+//#endif
+#include <string>
+#include <rabbitmq-c/amqp.h>
+#include <rabbitmq-c/tcp_socket.h>
+#include <assert.h>
+
+#include <glog/logging.h>
+#include "../error_code/error_code.h"
+#include "../tool/binary_buf.h"
+#include "../tool/thread_safe_list.h"
+#include "../tool/thread_condition.h"
+
+#include "../message/message_base.pb.h"
+#include "../rabbitmq/rabbitmq.pb.h"
+#include "../rabbitmq/rabbitmq_message.h"
+
+//rabbitmq初始化配置参数的默认路径
+#define RABBITMQ_PARAMETER_PATH "../setting/rabbitmq.prototxt"
+//amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息
+#define PREFETCH_COUNT 1
+
+class Rabbitmq_base
+{
+	//通信状态
+	enum Rabbitmq_status
+	{
+		RABBITMQ_STATUS_UNKNOW = 0,            	//通信状态 未知
+		RABBITMQ_STATUS_READY = 1,            	//通信状态 正常
+		RABBITMQ_STATUS_DISCONNECT = 11,            	//通信状态 断连(可能会在断连和重连之间反复跳动)
+		RABBITMQ_STATUS_RECONNNECT = 12,            	//通信状态 重连(可能会在断连和重连之间反复跳动)
+
+
+		RABBITMQ_STATUS_FAULT = 100,         	//通信状态 错误
+	};
+
+public:
+	Rabbitmq_base();
+	Rabbitmq_base(const Rabbitmq_base &other) = delete;
+	Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete;
+	~Rabbitmq_base();
+
+public://API functions
+	//初始化 通信 模块。如下三选一
+	Error_manager rabbitmq_init();
+	//初始化 通信 模块。从文件读取
+	Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path);
+	//初始化 通信 模块。从protobuf读取
+	Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
+
+	//反初始化 通信 模块。
+	Error_manager rabbitmq_uninit();
+	//重连, 快速uninit, init
+	Error_manager rabbitmq_reconnnect();
+
+	//手动封装消息, 如下四选一
+	//手动封装消息,需要手动写入参数channel,exchange_name,routing_key
+	Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key, int timeout_ms);
+	//手动封装消息,需要手动写入参数channel,exchange_name,routing_key
+	Error_manager encapsulate_msg(Rabbitmq_message* p_msg);
+	//手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
+	Error_manager encapsulate_task_msg(std::string message, int vector_index=0);
+	//手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
+	Error_manager encapsulate_status_msg(std::string message, int vector_index=0);
+
+	//ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
+	//执行者在execute_msg里面可以调用这个函数, 或者回调也行.
+	Error_manager ack_msg(Rabbitmq_message* p_msg);
+
+	//设置 自动封装状态的时间周期, 可选(默认1000ms)
+	void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time);
+
+	//设置回调函数check_msg_callback
+	void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
+	//设置回调函数check_executer_callback
+	void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
+	//设置回调函数execute_msg_callback
+	void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
+	//设置回调函数encapsulate_status_callback
+	void set_encapsulate_status_callback(Error_manager (*callback)());
+
+protected:
+	//创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
+	Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
+	//启动通信, 开启线程, run thread
+	Error_manager rabbitmq_run();
+
+
+
+	//mp_receive_analysis_thread 接受解析 执行函数,
+	void receive_analysis_thread();
+	//检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
+	virtual Error_manager check_msg(Rabbitmq_message* p_msg);
+	//检查执行者的状态, 判断能否处理这条消息, 需要子类重载
+	virtual Error_manager check_executer(Rabbitmq_message* p_msg);
+	//处理消息, 需要子类重载
+	virtual Error_manager execute_msg(Rabbitmq_message* p_msg);
+
+	//mp_send_thread 发送线程执行函数,
+	void send_thread();
+
+	//mp_encapsulate_stauts_thread 自动封装线程执行函数,
+	void encapsulate_status_thread();
+	//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
+	virtual Error_manager auto_encapsulate_status();
+
+protected:
+	//把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
+	std::string amqp_error_to_string(int amqp_status);
+	//把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
+	std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name);
+	//把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
+	std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply);
+	//把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
+	std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name);
+
+protected://member variable
+	Rabbitmq_status					m_rabbitmq_status;		//通信状态
+
+	//rabbitmq网络通信 连接配置信息
+	Rabbitmq_proto::Rabbitmq_parameter_all		m_rabbitmq_parameter_all;
+	amqp_connection_state_t_ *  	mp_connect;		// 连接参数的结构体, 内存系统自动分配,自动释放
+	amqp_socket_t * 				mp_socket ;		// 网口通信socket, 内存系统自动分配,自动释放
+	std::string						m_ip;			//服务器ip地址, 不带端口
+	int 							m_port;			//端口,默认5672
+	std::string						m_user;			//用户名, 默认guest
+	std::string						m_password;		//密码, 默认guest
+	std::mutex 						m_mutex;		// socket的锁, 发送和接受的通信锁
+	std::map<int, bool>				m_channel_map;	// 通道的缓存,防止重复开启
+
+	//接受模块,
+	//rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
+	//因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
+	std::thread*							mp_receive_analysis_thread;    			//接受解析的线程指针
+	Thread_condition						m_receive_analysis_condition;			//接受解析的条件变量
+
+	//发送模块,
+	Thread_safe_list<Rabbitmq_message*>		m_send_list;				//发送的list容器
+	std::thread*							mp_send_thread;    			//发送的线程指针
+	Thread_condition						m_send_condition;			//发送的条件变量
+	//自动发送状态的
+	std::thread*							mp_encapsulate_status_thread;    		//自动封装状态的线程指针
+	Thread_condition						m_encapsulate_status_condition;			//自动封装状态的条件变量
+	unsigned int 							m_encapsulate_status_cycle_time;		//自动封装状态的时间周期
+
+	//回调函数,
+	// //可以选择设置回调函数,或者子类继承重载,二选一.
+	Error_manager (*check_msg_callback)(Rabbitmq_message* p_msg);
+	Error_manager (*check_executer_callback)(Rabbitmq_message* p_msg);
+	Error_manager (*execute_msg_callback)(Rabbitmq_message* p_msg);
+	Error_manager (*encapsulate_status_callback)();
+};
+
+
+
+
+#endif //__RIBBITMQ_BASE__HH__

+ 109 - 0
rabbitmq/rabbitmq_message.cpp

@@ -0,0 +1,109 @@
+
+
+
+
+#include "rabbitmq_message.h"
+
+
+Rabbitmq_message::Rabbitmq_message()
+{
+	m_message_type = message::Message_type::eBase_msg;
+	m_receive_time = std::chrono::system_clock::now();
+	m_timeout_ms = std::chrono::milliseconds(0);		//超时默认0秒
+	m_sender = message::Communicator::eEmpty;
+	m_receiver = message::Communicator::eEmpty;
+//	m_message_buf = "";
+}
+
+Rabbitmq_message::Rabbitmq_message(const message::Base_info& base_info, std::string receive_string, int channel, int delivery_tag, std::string exchange_name, std::string routing_key)
+{
+	m_message_type = (message::Message_type)(base_info.msg_type());
+
+	m_receive_time = std::chrono::system_clock::now();
+	m_timeout_ms = std::chrono::milliseconds(base_info.timeout_ms());
+	m_sender = (message::Communicator)(base_info.sender());
+	m_receiver = (message::Communicator)(base_info.receiver());
+	m_message_buf = receive_string;
+
+	m_channel = channel;
+	m_delivery_tag = delivery_tag;
+	m_exchange_name = exchange_name;
+	m_routing_key = routing_key;
+}
+
+Rabbitmq_message::Rabbitmq_message(std::string receive_string, int channel, std::string exchange_name, std::string routing_key, int timeout_ms=0)
+{
+	m_timeout_ms = std::chrono::milliseconds(timeout_ms);
+	m_message_buf = receive_string;
+	m_channel = channel;
+	m_exchange_name = exchange_name;
+	m_routing_key = routing_key;
+}
+
+
+
+Rabbitmq_message::~Rabbitmq_message()
+{
+
+}
+
+bool Rabbitmq_message::is_over_time()
+{
+	if ( std::chrono::system_clock::now() - m_receive_time > m_timeout_ms)
+	{
+		return true;
+	}
+	else
+	{
+		return false;
+	}
+}
+
+
+void Rabbitmq_message::reset(const message::Base_info& base_info, std::string receive_string, int channel, int delivery_tag, std::string exchange_name, std::string routing_key)
+{
+	m_message_type = (message::Message_type)(base_info.msg_type());
+
+	m_receive_time = std::chrono::system_clock::now();
+	m_timeout_ms = std::chrono::milliseconds(base_info.timeout_ms());
+	m_sender = (message::Communicator)(base_info.sender());
+	m_receiver = (message::Communicator)(base_info.receiver());
+	m_message_buf = receive_string;
+	m_channel = channel;
+	m_delivery_tag = delivery_tag;
+	m_exchange_name = exchange_name;
+	m_routing_key = routing_key;
+}
+void Rabbitmq_message::reset(std::string receive_string, int channel, std::string exchange_name, std::string routing_key, int timeout_ms = 0)
+{
+	m_timeout_ms = std::chrono::milliseconds(timeout_ms);
+	m_message_buf = receive_string;
+	m_channel = channel;
+	m_exchange_name = exchange_name;
+	m_routing_key = routing_key;
+}
+
+message::Message_type Rabbitmq_message::get_message_type()
+{
+	return m_message_type;
+}
+message::Communicator Rabbitmq_message::get_sender()
+{
+	return m_sender;
+}
+message::Communicator Rabbitmq_message::get_receiver()
+{
+	return m_receiver;
+}
+std::string Rabbitmq_message::get_message_buf()
+{
+	return m_message_buf;
+}
+
+std::chrono::system_clock::time_point Rabbitmq_message::get_receive_time()
+{
+	return m_receive_time;
+}
+
+
+

+ 62 - 0
rabbitmq/rabbitmq_message.h

@@ -0,0 +1,62 @@
+//
+// Created by huli on 2020/6/29.
+//
+
+#ifndef __RABBITMQ_MESSAGE_H
+#define __RABBITMQ_MESSAGE_H
+
+#include "../error_code/error_code.h"
+
+#include <time.h>
+#include <sys/time.h>
+#include <chrono>
+//#include <iosfwd>
+
+#include <string>
+#include "../message/message_base.pb.h"
+
+
+class Rabbitmq_message
+{
+public:
+	Rabbitmq_message();
+	Rabbitmq_message(const message::Base_info& base_info, std::string receive_string, int channel, int delivery_tag, std::string exchange_name, std::string routing_key);
+	Rabbitmq_message(std::string receive_string, int channel, std::string exchange_name, std::string routing_key, int timeout_ms);
+	Rabbitmq_message(const Rabbitmq_message& other)= default;
+	Rabbitmq_message& operator =(const Rabbitmq_message& other)= default;
+	~Rabbitmq_message();
+public://API functions
+	bool is_over_time();
+public://get or set member variable
+	void reset(const message::Base_info& base_info, std::string receive_string, int channel, int delivery_tag, std::string exchange_name, std::string routing_key);
+	void reset(std::string receive_string, int channel, std::string exchange_name, std::string routing_key, int timeout_ms);
+
+	message::Message_type get_message_type();
+	message::Communicator get_sender();
+	message::Communicator get_receiver();
+	std::string get_message_buf();
+	std::chrono::system_clock::time_point get_receive_time();
+
+public://member variable
+	message::Message_type								m_message_type;				//消息类型
+	std::chrono::system_clock::time_point				m_receive_time;				//接收消息的时间点
+	std::chrono::milliseconds							m_timeout_ms;				//超时时间, 整个软件都统一为毫秒
+	message::Communicator								m_sender;					//发送者
+	message::Communicator								m_receiver;					//接受者
+
+	std::string											m_message_buf;				//消息数据
+
+	//rabbitmq 接受特有, 保存channel和delivery_tag, 用来ack
+	//rabbitmq 通用, 填写m_channel m_exchange_name m_routing_key 用来区别消息
+	int													m_channel;					//接受消息来源的通道
+	int													m_delivery_tag;				//接受消息的传递编号
+	std::string											m_exchange_name;			//交换机名称
+	std::string											m_routing_key;				//key,识别码
+
+
+private:
+
+};
+
+
+#endif //__RABBITMQ_MESSAGE_H

+ 303 - 0
rabbitmq/ttt.cpp

@@ -0,0 +1,303 @@
+#include "ttt.h"
+#include <unistd.h>
+
+
+CRabbitmqClient::CRabbitmqClient()
+: m_strHostname("")
+, m_iPort(0)
+, m_strUser("")
+, m_strPasswd("")
+, m_iChannel(1) //默认用1号通道,通道无所谓
+, m_pSock(NULL)
+, m_pConn(NULL) {
+
+}
+
+CRabbitmqClient::~CRabbitmqClient() {
+	if (NULL != m_pConn) {
+		Disconnect();
+		m_pConn = NULL;
+	}
+}
+
+int CRabbitmqClient::Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd) {
+	m_strHostname = strHostname;
+	m_iPort = iPort;
+	m_strUser = strUser;
+	m_strPasswd = strPasswd;
+
+	m_pConn = amqp_new_connection();
+	if (NULL == m_pConn) {
+		fprintf(stderr, "amqp new connection failed\n");
+		return -1;
+	}
+
+	m_pSock =  amqp_tcp_socket_new(m_pConn);
+	if (NULL == m_pSock) {
+		fprintf(stderr, "amqp tcp new socket failed\n");
+		return -2;
+	}
+
+	int status = amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort);
+	if (status<0) {
+		fprintf(stderr, "amqp socket open failed\n");
+		return -3;
+	}
+
+	// amqp_login(amqp_connection_state_t state,char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ..)
+	if (0 != ErrorMsg(amqp_login(m_pConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()), "Logging in")) {
+		return -4;
+	}
+
+	return 0;
+}
+
+int CRabbitmqClient::Disconnect() {
+	if (NULL != m_pConn) {
+		if (0 != ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS), "Closing connection"))
+			return -1;
+
+		if (amqp_destroy_connection(m_pConn) < 0)
+			return -2;
+
+		m_pConn = NULL;
+	}
+
+	return 0;
+}
+
+int CRabbitmqClient::ExchangeDeclare(const string &strExchange, const string &strType) {
+	amqp_channel_open(m_pConn, m_iChannel);
+
+	amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
+	amqp_bytes_t _type = amqp_cstring_bytes(strType.c_str());
+	int _passive= 0;
+	int _durable= 0;      // 交换机是否持久化
+	amqp_exchange_declare(m_pConn, m_iChannel, _exchange, _type, _passive, _durable, 0, 0, amqp_empty_table);
+	if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "exchange_declare")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -1;
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::QueueDeclare(const string &strQueueName) {
+	if(NULL == m_pConn) {
+		fprintf(stderr, "QueueDeclare m_pConn is null\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
+	int32_t _passive = 0;
+	int32_t _durable = 0;
+	int32_t _exclusive = 0;
+	int32_t _auto_delete = 1;
+	amqp_queue_declare(m_pConn, m_iChannel, _queue, _passive, _durable, _exclusive, _auto_delete, amqp_empty_table);
+	if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_declare")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -1;
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
+	if(NULL == m_pConn) {
+		fprintf(stderr, "QueueBind m_pConn is null\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
+	amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
+	amqp_bytes_t _routkey  = amqp_cstring_bytes(strBindKey.c_str());
+	amqp_queue_bind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
+	if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_bind")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -1;
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
+	if(NULL == m_pConn) {
+		fprintf(stderr, "QueueUnbind m_pConn is null\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
+	amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
+	amqp_bytes_t _routkey  = amqp_cstring_bytes(strBindKey.c_str());
+	amqp_queue_unbind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
+	if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_unbind")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -1;
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::QueueDelete(const string &strQueueName, int iIfUnused) {
+	if(NULL == m_pConn) {
+		fprintf(stderr, "QueueDelete m_pConn is null\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -2;
+	}
+
+	amqp_queue_delete(m_pConn, m_iChannel, amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0);
+	if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "delete queue")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -3;
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::Publish(const string &strMessage, const string &strExchange, const string &strRoutekey) {
+	if (NULL == m_pConn) {
+		fprintf(stderr, "publish m_pConn is null, publish failed\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -2;
+	}
+
+	amqp_bytes_t message_bytes;
+	message_bytes.len = strMessage.length();
+	message_bytes.bytes = (void *)(strMessage.c_str());
+	//fprintf(stderr, "publish message(%d): %.*s\n", (int)message_bytes.len, (int)message_bytes.len, (char *)message_bytes.bytes);
+
+	/*
+	amqp_basic_properties_t props;
+	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
+	props.content_type = amqp_cstring_bytes(m_type.c_str());
+	props.delivery_mode = m_durable;    // persistent delivery mode
+	*/
+
+	amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str());
+	amqp_bytes_t routekey = amqp_cstring_bytes(strRoutekey.c_str());
+
+	//if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, &props, message_bytes)) {
+	if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, NULL, message_bytes)) {
+		fprintf(stderr, "publish amqp_basic_publish failed\n");
+		if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "amqp_basic_publish")) {
+			amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+			return -3;
+		}
+	}
+
+	amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+	return 0;
+}
+
+int CRabbitmqClient::Consumer(const string &strQueueName, vector<string> &message_array, int GetNum, struct timeval *timeout) {
+	if (NULL == m_pConn) {
+		fprintf(stderr, "Consumer m_pConn is null, Consumer failed\n");
+		return -1;
+	}
+
+	amqp_channel_open(m_pConn, m_iChannel);
+	if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -2;
+	}
+
+	amqp_basic_qos(m_pConn, m_iChannel, 0, GetNum, 0);
+	int ack = 1; // no_ack    是否需要确认消息后再从队列中删除消息
+	amqp_bytes_t queuename= amqp_cstring_bytes(strQueueName.c_str());
+	amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table);
+
+	if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) {
+		amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+		return -3;
+	}
+
+	int hasget = 0;
+	amqp_rpc_reply_t res;
+	amqp_envelope_t envelope;
+	while (GetNum > 0) {
+		amqp_maybe_release_buffers(m_pConn);
+		res = amqp_consume_message(m_pConn, &envelope, timeout, 0);
+		if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+			fprintf(stderr, "Consumer amqp_channel_close failed\n");
+			amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+
+			if (0 == hasget)
+				return -res.reply_type;
+			else
+				return 0;
+		}
+
+		string str((char *)envelope.message.body.bytes, (char *)envelope.message.body.bytes + envelope.message.body.len);
+		message_array.push_back(str);
+		int rtn = amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, 1);
+		amqp_destroy_envelope(&envelope);
+		if (rtn != 0) {
+			amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
+			return -4;
+		}
+
+		GetNum--;
+		hasget++;
+		usleep(1);
+	}
+
+	return 0;
+}
+
+int CRabbitmqClient::ErrorMsg(amqp_rpc_reply_t x, char const *context) {
+	switch (x.reply_type) {
+		case AMQP_RESPONSE_NORMAL:
+			return 0;
+
+		case AMQP_RESPONSE_NONE:
+			fprintf(stderr, "%s: missing RPC reply type!\n", context);
+			break;
+
+		case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+			fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
+			break;
+
+		case AMQP_RESPONSE_SERVER_EXCEPTION:
+			switch (x.reply.id) {
+				case AMQP_CONNECTION_CLOSE_METHOD: {
+					amqp_connection_close_t *m = (amqp_connection_close_t *)x.reply.decoded;
+					fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
+							context, m->reply_code, (int)m->reply_text.len,
+							(char *)m->reply_text.bytes);
+					break;
+				}
+				case AMQP_CHANNEL_CLOSE_METHOD: {
+					amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
+					fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
+							context, m->reply_code, (int)m->reply_text.len,
+							(char *)m->reply_text.bytes);
+					break;
+				}
+				default:
+					fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
+							context, x.reply.id);
+					break;
+			}
+			break;
+	}
+
+	return -1;
+}

+ 105 - 0
rabbitmq/ttt.h

@@ -0,0 +1,105 @@
+#ifndef RABBITMQ_CLIENT_H_
+#define RABBITMQ_CLIENT_H_
+
+
+#include <string>
+#include <vector>
+
+#include <rabbitmq-c/tcp_socket.h>
+
+using std::string;
+using std::vector;
+
+class CRabbitmqClient{
+public:
+	CRabbitmqClient();
+	~CRabbitmqClient();
+
+
+	int Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd);
+	int Disconnect();
+
+	/**
+	*   @brief       ExchangeDeclare    声明exchange
+	*	@param       [in]               strExchange
+	*   @param       [in]               strType
+	*   @return 等于0值代表成功创建exchange,小于0代表错误
+	*/
+	int ExchangeDeclare(const string &strExchange, const string &strType);
+
+	/**
+	*   @brief       QueueDeclare                     声明消息队列
+	*	@param       [in]               strQueueName  消息队列实例
+	*   @param
+	*   @return 等于0值代表成功创建queue,小于0代表错误
+	*/
+	int QueueDeclare(const string &strQueueName);
+
+	/**
+	*   @brief       QueueBind                        将队列,交换机和绑定规则绑定起来形成一个路由表
+	*	@param       [in]               strQueueName  消息队列
+	*	@param       [in]               strExchange   交换机名称
+	*	@param       [in]               strBindKey    路由名称  “msg.#” “msg.weather.**”
+	*   @return 等于0值代表成功绑定,小于0代表错误
+	*/
+	int QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey);
+
+	/**
+	*   @brief       QueueUnbind                      将队列,交换机和绑定规则绑定解除
+	*	@param       [in]               strQueueName  消息队列
+	*	@param       [in]               strExchange   交换机名称
+	*	@param       [in]               strBindKey    路由名称  “msg.#” “msg.weather.**”
+	*   @return 等于0值代表成功绑定,小于0代表错误
+	*/
+	int QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey);
+
+	/**
+	*   @brief       QueueDelete                      删除消息队列。
+	*	@param       [in]               strQueueName  消息队列名称
+	*	@param       [in]               iIfUnused     消息队列是否在用,1 则论是否在用都删除
+	*   @return 等于0值代表成功删除queue,小于0代表错误
+	*/
+	int QueueDelete(const string &strQueueName, int iIfUnused);
+
+	/**
+	* @brief Publish  发布消息
+	* @param [in] strMessage        消息实体
+	* @param [in] strExchange       交换器
+	* @param [in] strRoutekey       路由规则
+	*   1.Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
+	*   2.Fanout Exchange – 不处理路由键。将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
+	*   3.Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
+	*      因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
+	* @return 等于0值代表成功发送消息实体,小于0代表发送错误
+	*/
+	int Publish(const string &strMessage, const string &strExchange, const string &strRoutekey);
+
+	/**
+	* @brief consumer  消费消息
+	* @param [in]  strQueueName         队列名称
+	* @param [out] message_array        获取的消息实体
+	* @param [int] GetNum               需要取得的消息个数
+	* @param [int] timeout              取得的消息是延迟,若为NULL,表示持续取,无延迟,阻塞状态
+	* @return 等于0值代表成功,小于0代表错误,错误信息从ErrorReturn返回
+	*/
+	int Consumer(const string &strQueueName, vector<string> &message_array, int GetNum = 1, struct timeval *timeout = NULL);
+
+
+private:
+	CRabbitmqClient(const CRabbitmqClient & rh);
+	void operator=(const CRabbitmqClient & rh);
+
+	int ErrorMsg(amqp_rpc_reply_t x, char const *context);
+
+
+	string                      m_strHostname;      // amqp主机
+	int                         m_iPort;            // amqp端口
+	string					    m_strUser;
+	string					    m_strPasswd;
+	int                         m_iChannel;
+
+	amqp_socket_t               *m_pSock;
+	amqp_connection_state_t     m_pConn;
+};
+
+#endif

+ 53 - 0
setting/rabbitmq.prototxt

@@ -0,0 +1,53 @@
+
+
+rabbitmq_parameters
+{
+    ip:"192.168.1.233"
+    port:5672
+    user:"zx"
+    password:"zx123456"
+
+
+#    rabbitmq_reciever_vector
+#     {
+#         channel:401
+#         exchange_name:"command_ex"
+
+#         binding_key:"count_command_signal_1_port"
+#         queue_name:"count_command_signal_1_queue"
+#         queue_passive:0
+#         queue_durable:1
+#         queue_exclusive:0
+#         queue_auto_delete:0
+#         consume_name:"count_command_signal_1_consume"
+#         consume_no_local:0
+#         consume_no_ack:0
+#         consume_exclusive:0
+#     }
+
+
+#     rabbitmq_reciever_vector
+#     {
+#         channel:402
+#         exchange_name:"command_ex"
+
+#         binding_key:"post_command_1_port"
+#         queue_name:"post_command_1_queue"
+#         queue_passive:0
+#         queue_durable:1
+#         queue_exclusive:0
+#         queue_auto_delete:0
+#         consume_name:"post_command_1_consume"
+#         consume_no_local:0
+#         consume_no_ack:0
+#         consume_exclusive:0
+#     }
+
+    rabbitmq_sender_status_vector
+    {
+        channel:411
+        exchange_name:"statu_ex"
+        routing_key:"measure_5_statu_port"
+        timeout_ms:0
+    }
+}

+ 58 - 0
system/system_communication mq.cpp

@@ -0,0 +1,58 @@
+/*
+ * @Author: yct 18202736439@163.com
+ * @Date: 2022-09-30 18:11:59
+ * @LastEditors: yct 18202736439@163.com
+ * @LastEditTime: 2022-09-30 18:41:52
+ * @FilePath: /puai_wj_2021/system/system_communication mq.cpp
+ * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
+ */
+//
+// Created by huli on 2020/6/28.
+//
+
+#include "system_communication_mq.h"
+#include "../system/system_executor.h"
+
+System_communication_mq::System_communication_mq()
+{
+
+}
+
+System_communication_mq::~System_communication_mq()
+{
+
+}
+
+
+//检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
+Error_manager System_communication_mq::check_msg(Rabbitmq_message*  p_msg)
+{
+	return SUCCESS;
+}
+
+//检查执行者的状态, 判断能否处理这条消息, 需要子类重载
+Error_manager System_communication_mq::check_executer(Rabbitmq_message*  p_msg)
+{
+	return SUCCESS;
+}
+
+//处理消息, 需要子类重载
+Error_manager System_communication_mq::execute_msg(Rabbitmq_message* p_msg)
+{
+	return SUCCESS;
+}
+
+//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
+Error_manager System_communication_mq::auto_encapsulate_status()
+{
+	return System_executor::get_instance_references().encapsulate_send_mq_status();
+}
+
+
+
+
+
+
+
+
+

+ 54 - 0
system/system_communication_mq.h

@@ -0,0 +1,54 @@
+/*
+ * @Author: yct 18202736439@163.com
+ * @Date: 2022-09-30 18:09:07
+ * @LastEditors: yct 18202736439@163.com
+ * @LastEditTime: 2022-09-30 18:35:01
+ * @FilePath: /puai_wj_2021/system/system_communication_mq.h
+ * @Description: 这是默认设置,请设置`customMade`, 打开koroFileHeader查看配置 进行设置: https://github.com/OBKoro1/koro1FileHeader/wiki/%E9%85%8D%E7%BD%AE
+ */
+//
+// Created by huli on 2020/6/28.
+//
+
+#ifndef NNXX_TESTS_SYSTEM_COMMUNICATION_MQ_H
+#define NNXX_TESTS_SYSTEM_COMMUNICATION_MQ_H
+
+#include "../tool/singleton.h"
+#include "../rabbitmq/rabbitmq_base.h"
+
+class System_communication_mq:public Singleton<System_communication_mq>, public Rabbitmq_base
+{
+
+// 子类必须把父类设定为友元函数,这样父类才能使用子类的私有构造函数。
+   friend class Singleton<System_communication_mq>;
+private:
+ // 父类的构造函数必须保护,子类的构造函数必须私有。
+   System_communication_mq();
+public:
+    //必须关闭拷贝构造和赋值构造,只能通过 get_instance 函数来进行操作唯一的实例。
+    System_communication_mq(const System_communication_mq& other) = delete;
+    System_communication_mq& operator =(const System_communication_mq& other) = delete;
+    ~System_communication_mq();
+public://API functions
+	//检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
+	virtual Error_manager check_msg(Rabbitmq_message* p_msg);
+	//检查执行者的状态, 判断能否处理这条消息, 需要子类重载
+	virtual Error_manager check_executer(Rabbitmq_message* p_msg);
+	//处理消息, 需要子类重载
+	virtual Error_manager execute_msg(Rabbitmq_message* p_msg);
+
+	//定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
+	virtual Error_manager auto_encapsulate_status();
+
+public://get or set member variable
+
+    
+protected://member variable 
+
+    
+private:
+    
+};
+
+
+#endif //NNXX_TESTS_SYSTEM_COMMUNICATION_MQ_H

+ 326 - 0
system/system_executor.cpp

@@ -312,6 +312,332 @@ bool System_executor::update_measure_info(message::Ground_status_msg &msg, Commo
 	return true;
 }
 
+// ***************************** rabbitmq *********************************
+Error_manager System_executor::encapsulate_send_mq_status()
+{
+	static int cloud_count=-1;
+	cloud_count++;
+    // if(!g_debug_file.is_open())
+    // {
+    //     g_debug_file.open("./filter_debug_result.txt", std::ios::app);
+    // }
+
+	Error_manager t_error;
+	//创建一条状态消息
+	message::Ground_status_msg t_ground_status_msg;
+	t_ground_status_msg.mutable_base_info()->set_msg_type(message::Message_type::eGround_status_msg);
+	t_ground_status_msg.mutable_base_info()->set_timeout_ms(5000);
+	t_ground_status_msg.mutable_base_info()->set_sender(message::Communicator::eGround_measurer);
+	t_ground_status_msg.mutable_base_info()->set_receiver(message::Communicator::eEmpty);
+
+	// t_ground_status_msg.set_terminal_id(m_terminal_id);
+	t_ground_status_msg.mutable_id_struct()->set_terminal_id(m_terminal_id);
+
+	// 创建各区域状态消息,
+	// 注意!!!目前公共消息名字依旧使用wj,不做修改
+	// manager
+#if WJ_VELO == 1
+	{
+		Velodyne_manager::Velodyne_manager_status t_velodyne_manager_status = Velodyne_manager::get_instance_references().get_status();
+		t_ground_status_msg.set_wanji_manager_status((message::Wanji_manager_status)t_velodyne_manager_status);
+
+		// lidar
+		std::map<int, Velodyne_lidar_device *> t_velodyne_lidar_device_map = Velodyne_manager::get_instance_references().get_velodyne_lidar_device_map();
+		std::map<int, Velodyne_lidar_device::Velodyne_lidar_device_status> t_velodyne_lidar_status_map;
+		for (auto iter = t_velodyne_lidar_device_map.begin(); iter != t_velodyne_lidar_device_map.end(); ++iter)
+		{
+			t_velodyne_lidar_status_map.emplace(std::pair<int, Velodyne_lidar_device::Velodyne_lidar_device_status>(iter->second->get_lidar_id(), iter->second->get_status()));
+		}
+
+		// region
+		std::map<int, Ground_region *> t_ground_region_map = Velodyne_manager::get_instance_references().get_ground_region_map();
+		int region_index = 0;
+		for (auto iter = t_ground_region_map.begin(); iter != t_ground_region_map.end(); ++iter)
+		{
+			// 以t_ground_status_msg为模板创建各区域心跳消息
+			message::Ground_status_msg t_multi_status_msg;
+			t_multi_status_msg.CopyFrom(t_ground_status_msg);
+			t_multi_status_msg.mutable_id_struct()->set_terminal_id(iter->second->get_terminal_id());
+			t_multi_status_msg.set_region_worker_status((message::Region_worker_status)(iter->second->get_status()));
+			velodyne::Region t_param = iter->second->get_param();
+			for (size_t j = 0; j < t_param.lidar_exts_size(); j++)
+			{
+				std::map<int, Velodyne_lidar_device::Velodyne_lidar_device_status>::iterator t_status_iter = t_velodyne_lidar_status_map.find(t_param.lidar_exts(j).lidar_id());
+				if (t_status_iter == t_velodyne_lidar_status_map.end())
+				{
+					LOG(WARNING) << "lidar status " << t_param.lidar_exts(j).lidar_id() << " cannot be found, param error";
+				}
+				else
+				{
+					t_multi_status_msg.add_wanji_lidar_device_status((message::Wanji_lidar_device_status)t_status_iter->second);
+				}
+			}
+			//velodyne雷达的自动定位信息
+			Common_data::Car_wheel_information t_car_wheel_information;
+			t_error = (*iter).second->get_last_wheel_information(&t_car_wheel_information, std::chrono::system_clock::now());
+
+			// 获取区域点云填入信息
+			pcl::PointCloud<pcl::PointXYZ>::Ptr t_region_cloud = pcl::PointCloud<pcl::PointXYZ>::Ptr(new pcl::PointCloud<pcl::PointXYZ>);
+			iter->second->get_region_cloud(t_region_cloud, Ground_region::Region_cloud_type::filtered);
+			if (cloud_count == 5)
+			{
+				std::string t_filename = std::string("region_") + std::to_string(iter->first) + "_cloud.txt";
+				save_cloud_txt(t_region_cloud, t_filename);
+				LOG(INFO) << "region " << iter->first << " cloud has been saved in " + t_filename;
+			}
+			// for (size_t j = 0; j < t_region_cloud->size(); j++)
+			// {
+			// 	message::Cloud_coordinate *tp_cloud = t_multi_status_msg.add_cloud();
+			// 	tp_cloud->set_x(t_region_cloud->points[j].x);
+			// 	tp_cloud->set_y(t_region_cloud->points[j].y);
+			// 	tp_cloud->set_z(t_region_cloud->points[j].z);
+			// }
+
+			update_measure_info(t_multi_status_msg, t_car_wheel_information, t_region_cloud->size());
+
+			std::string t_msg = t_multi_status_msg.SerializeAsString();
+			System_communication::get_instance_references().encapsulate_msg(t_msg);
+			if (t_multi_status_msg.id_struct().terminal_id() == DISP_TERM_ID)
+				std::cout << t_multi_status_msg.DebugString() << std::endl
+						  << std::endl;
+		}
+	}
+#elif WJ_VELO == 0
+	{
+		// wj_support 20220718
+		Wanji_manager::Wanji_manager_status t_wj_manager_status = Wanji_manager::get_instance_references().get_status();
+		t_ground_status_msg.set_wanji_manager_status((message::Wanji_manager_status)t_wj_manager_status);
+
+		// lidar
+		std::map<int, Wanji_lidar_device *> t_wj_lidar_device_map = Wanji_manager::get_instance_references().get_wanji_lidar_device_map();
+		std::map<int, Wanji_lidar_device::Wanji_lidar_device_status> t_wj_lidar_status_map;
+		for (auto iter = t_wj_lidar_device_map.begin(); iter != t_wj_lidar_device_map.end(); ++iter)
+		{
+			t_wj_lidar_status_map.emplace(std::pair<int, Wanji_lidar_device::Wanji_lidar_device_status>(iter->second->get_lidar_id(), iter->second->get_status()));
+		}
+
+		// region
+		std::map<int, Region_worker *> t_region_worker_map = Wanji_manager::get_instance_references().get_region_worker_map();
+		int region_index = 0;
+		for (auto iter = t_region_worker_map.begin(); iter != t_region_worker_map.end(); ++iter)
+		{
+			// 以t_ground_status_msg为模板创建各区域心跳消息
+			message::Ground_status_msg t_multi_status_msg;
+			t_multi_status_msg.CopyFrom(t_ground_status_msg);
+			t_multi_status_msg.mutable_id_struct()->set_terminal_id(iter->second->get_terminal_id());
+			t_multi_status_msg.set_region_worker_status((message::Region_worker_status)(iter->second->get_status()));
+			wj::Region t_param = iter->second->get_param();
+			for (size_t j = 0; j < t_param.lidar_exts_size(); j++)
+			{
+				std::map<int, Wanji_lidar_device::Wanji_lidar_device_status>::iterator t_status_iter = t_wj_lidar_status_map.find(t_param.lidar_exts(j).lidar_id());
+				if (t_status_iter == t_wj_lidar_status_map.end())
+				{
+					LOG(WARNING) << "lidar status " << t_param.lidar_exts(j).lidar_id() << " cannot be found, param error";
+				}
+				else
+				{
+					t_multi_status_msg.add_wanji_lidar_device_status((message::Wanji_lidar_device_status)t_status_iter->second);
+				}
+			}
+			//velodyne雷达的自动定位信息
+			Common_data::Car_wheel_information t_car_wheel_information;
+			t_error = (*iter).second->get_last_wheel_information(&t_car_wheel_information, std::chrono::system_clock::now());
+
+			// 获取区域点云填入信息
+			pcl::PointCloud<pcl::PointXYZ>::Ptr t_region_cloud = pcl::PointCloud<pcl::PointXYZ>::Ptr(new pcl::PointCloud<pcl::PointXYZ>);
+			iter->second->get_region_cloud(t_region_cloud);
+			if (cloud_count == 5)
+			{
+				std::string t_filename = std::string("region_") + std::to_string(iter->first) + "_cloud.txt";
+				save_cloud_txt(t_region_cloud, t_filename);
+				LOG(INFO) << "region " << iter->first << " cloud has been saved in " + t_filename;
+			}
+			// for (size_t j = 0; j < t_region_cloud->size(); j++)
+			// {
+			// 	message::Cloud_coordinate *tp_cloud = t_multi_status_msg.add_cloud();
+			// 	tp_cloud->set_x(t_region_cloud->points[j].x);
+			// 	tp_cloud->set_y(t_region_cloud->points[j].y);
+			// 	tp_cloud->set_z(t_region_cloud->points[j].z);
+			// }
+
+			update_measure_info(t_multi_status_msg, t_car_wheel_information, t_region_cloud->size());
+
+			std::string t_msg = t_multi_status_msg.SerializeAsString();
+			System_communication::get_instance_references().encapsulate_msg(t_msg);
+			if (t_multi_status_msg.id_struct().terminal_id() == DISP_TERM_ID)
+				LOG(INFO) << t_multi_status_msg.DebugString();
+		}
+	}
+	//	std::cout << " huli test :::: " << " t_ground_status_msg.DebugString() = "  << std::endl;
+	//	std::cout << t_ground_status_msg.DebugString() << std::endl;
+
+	//	std::cout << " huli test :::: " << " t_ground_status_msg.DebugString() = " << t_ground_status_msg.DebugString() << std::endl;
+#else
+{
+	// mix vlp16 and wj716
+	// 以多线雷达为主
+	Velodyne_manager::Velodyne_manager_status t_velodyne_manager_status = Velodyne_manager::get_instance_references().get_status();
+	t_ground_status_msg.set_wanji_manager_status((message::Wanji_manager_status)t_velodyne_manager_status);
+
+	// vlp16 lidar
+	std::map<int, Velodyne_lidar_device *> t_velodyne_lidar_device_map = Velodyne_manager::get_instance_references().get_velodyne_lidar_device_map();
+	std::map<int, Velodyne_lidar_device::Velodyne_lidar_device_status> t_velodyne_lidar_status_map;
+	for (auto iter = t_velodyne_lidar_device_map.begin(); iter != t_velodyne_lidar_device_map.end(); ++iter)
+	{
+		t_velodyne_lidar_status_map.emplace(std::pair<int, Velodyne_lidar_device::Velodyne_lidar_device_status>(iter->second->get_lidar_id(), iter->second->get_status()));
+	}
+
+	// wj lidar
+#if LIDAR_TYPE == 1
+	std::map<int, Wanji_lidar_device *> t_wj_lidar_device_map = Wanji_manager::get_instance_references().get_wanji_lidar_device_map();
+	std::map<int, Wanji_lidar_device::Wanji_lidar_device_status> t_wj_lidar_status_map;
+	for (auto iter = t_wj_lidar_device_map.begin(); iter != t_wj_lidar_device_map.end(); ++iter)
+	{
+		t_wj_lidar_status_map.emplace(std::pair<int, Wanji_lidar_device::Wanji_lidar_device_status>(iter->second->get_lidar_id(), iter->second->get_status()));
+	}
+#else
+	std::map<int, Wanji_716N_lidar_device *> t_wj_lidar_device_map = Wanji_manager::get_instance_references().get_wanji_lidar_device_map();
+	std::map<int, Wanji_716N_lidar_device::Wanji_716N_device_status> t_wj_lidar_status_map;
+	for (auto iter = t_wj_lidar_device_map.begin(); iter != t_wj_lidar_device_map.end(); ++iter)
+	{
+		t_wj_lidar_status_map.emplace(std::pair<int, Wanji_716N_lidar_device::Wanji_716N_device_status>(iter->second->get_lidar_id(), iter->second->get_status()));
+	}
+#endif
+	
+	
+
+	// wj region
+	std::map<int, Region_worker *> t_region_worker_map = Wanji_manager::get_instance_references().get_region_worker_map();
+
+	// vlp16 region
+	std::map<int, Ground_region *> t_ground_region_map = Velodyne_manager::get_instance_references().get_ground_region_map();
+	int region_index = 0;
+	for (auto iter = t_ground_region_map.begin(); iter != t_ground_region_map.end(); ++iter)
+	{
+		// 以t_ground_status_msg为模板创建各区域心跳消息
+		message::Ground_status_msg t_multi_status_msg;
+		t_multi_status_msg.CopyFrom(t_ground_status_msg);
+		t_multi_status_msg.mutable_id_struct()->set_terminal_id(iter->second->get_terminal_id());
+		t_multi_status_msg.set_region_worker_status((message::Region_worker_status)(iter->second->get_status()));
+		// 查找多线区域内对应多线激光雷达
+		velodyne::Region t_param = iter->second->get_param();
+		for (size_t j = 0; j < t_param.lidar_exts_size(); j++)
+		{
+			std::map<int, Velodyne_lidar_device::Velodyne_lidar_device_status>::iterator t_status_iter = t_velodyne_lidar_status_map.find(t_param.lidar_exts(j).lidar_id());
+			if (t_status_iter == t_velodyne_lidar_status_map.end())
+			{
+				LOG(WARNING) << "vlp16 lidar status " << t_param.lidar_exts(j).lidar_id() << " cannot be found, param error";
+			}
+			else
+			{
+				t_multi_status_msg.add_wanji_lidar_device_status((message::Wanji_lidar_device_status)t_status_iter->second);
+			}
+		}
+
+		//velodyne雷达的自动定位信息
+		Common_data::Car_wheel_information t_car_wheel_information;
+		t_error = (*iter).second->get_last_wheel_information(&t_car_wheel_information, std::chrono::system_clock::now());
+
+		// 获取区域点云填入信息
+		pcl::PointCloud<pcl::PointXYZ>::Ptr t_region_cloud = pcl::PointCloud<pcl::PointXYZ>::Ptr(new pcl::PointCloud<pcl::PointXYZ>);
+		iter->second->get_region_cloud(t_region_cloud, Ground_region::Region_cloud_type::filtered);
+		if (cloud_count == 5)
+		{
+			std::string t_filename = std::string("vlp16_region_") + std::to_string(iter->first) + "_cloud.txt";
+			save_cloud_txt(t_region_cloud, t_filename);
+			LOG(INFO) << "vlp16 region " << iter->first << " cloud has been saved in " + t_filename;
+		}
+		// for (size_t j = 0; j < t_region_cloud->size(); j++)
+		// {
+		// 	message::Cloud_coordinate *tp_cloud = t_multi_status_msg.add_cloud();
+		// 	tp_cloud->set_x(t_region_cloud->points[j].x);
+		// 	tp_cloud->set_y(t_region_cloud->points[j].y);
+		// 	tp_cloud->set_z(t_region_cloud->points[j].z);
+		// }
+
+		// 检查vlp16区域与wj区域对应关系,wj存在对应区域则填充设备状态,并融合测量结果
+		auto t_wj_region_iter = t_region_worker_map.find(iter->first);
+		if(t_wj_region_iter != t_region_worker_map.end())
+		{
+			wj::Region t_param = t_wj_region_iter->second->get_param();
+			// 
+			for (size_t j = 0; j < t_param.lidar_exts_size(); j++)
+			{
+#if LIDAR_TYPE == 1
+	std::map<int, Wanji_lidar_device::Wanji_lidar_device_status>::iterator t_status_iter = t_wj_lidar_status_map.find(t_param.lidar_exts(j).lidar_id());
+#else
+	std::map<int, Wanji_716N_lidar_device::Wanji_716N_device_status>::iterator t_status_iter = t_wj_lidar_status_map.find(t_param.lidar_exts(j).lidar_id());
+#endif
+				if (t_status_iter == t_wj_lidar_status_map.end())
+				{
+					LOG(WARNING) << "wj lidar status " << t_param.lidar_exts(j).lidar_id() << " cannot be found, param error";
+				}
+				else
+				{
+					t_multi_status_msg.add_wanji_lidar_device_status((message::Wanji_lidar_device_status)t_status_iter->second);
+				}
+			}
+			//wj雷达的自动定位信息
+			Common_data::Car_wheel_information t_wj_car_wheel_information;
+			t_error = t_wj_region_iter->second->get_last_wheel_information(&t_wj_car_wheel_information, std::chrono::system_clock::now());
+
+			// 获取区域点云填入信息
+			pcl::PointCloud<pcl::PointXYZ>::Ptr t_wj_region_cloud = pcl::PointCloud<pcl::PointXYZ>::Ptr(new pcl::PointCloud<pcl::PointXYZ>);
+			t_wj_region_iter->second->get_region_cloud(t_wj_region_cloud);
+			if (cloud_count == 8)
+			{
+				std::string t_filename = std::string("wj_region_") + std::to_string(t_wj_region_iter->first) + "_cloud.txt";
+				save_cloud_txt(t_wj_region_cloud, t_filename);
+				LOG(INFO) << "wj region " << t_wj_region_iter->first << " cloud has been saved in " + t_filename;
+			}
+			// for (size_t j = 0; j < t_wj_region_cloud->size(); j++)
+			// {
+			// 	message::Cloud_coordinate *tp_cloud = t_multi_status_msg.add_cloud();
+			// 	tp_cloud->set_x(t_wj_region_cloud->points[j].x);
+			// 	tp_cloud->set_y(t_wj_region_cloud->points[j].y);
+			// 	tp_cloud->set_z(t_wj_region_cloud->points[j].z);
+			// }
+
+			// LOG_IF(WARNING, iter->second->get_terminal_id() == DISP_TERM_ID)<<"\n"<<t_car_wheel_information.to_string()<<"\n"
+			// 								<<t_wj_car_wheel_information.to_string()<<"\n"
+			// 								<<t_car_wheel_information.range_status<<", "<<t_wj_car_wheel_information.range_status;
+			// 融合测量结果,验证中心点差异x<0.03, y<0.06, 角度差异ang<0.6, 轴距差异wb<0.055
+			double dx = t_car_wheel_information.car_center_x - t_wj_car_wheel_information.car_center_x;
+			double dy = t_car_wheel_information.car_center_y - t_wj_car_wheel_information.car_center_y;
+			double dang = t_car_wheel_information.car_angle - t_wj_car_wheel_information.car_angle;
+			double dwb = t_car_wheel_information.car_wheel_base - t_wj_car_wheel_information.car_wheel_base;
+
+			if(fabs(dx) < 0.03 && fabs(dy) < 0.06 && fabs(dang) < 0.6 && fabs(dwb) < 0.055)
+			{
+				t_car_wheel_information.car_wheel_base = t_car_wheel_information.car_wheel_base*0.3 + t_car_wheel_information.car_wheel_base*0.7;
+			}else
+			{
+				LOG_IF(WARNING, iter->second->get_terminal_id() == DISP_TERM_ID)<<
+					"detect mismatch, region "<<iter->first<<", dx dy dang dwb: "<<dx<<", "<<dy<<", "<<dang<<", "<< dwb;
+			}
+
+		}else{
+			LOG_IF(WARNING, iter->second->get_terminal_id() == DISP_TERM_ID)<<"region mismatch, vlp16 "<<iter->first<<", wj doesn't have corresponding region id";
+		}
+
+		update_measure_info(t_multi_status_msg, t_car_wheel_information, t_region_cloud->size());
+
+		std::string t_msg = t_multi_status_msg.DebugString();
+		if(t_multi_status_msg.id_struct().terminal_id() == 4)
+		{
+			System_communication_mq::get_instance_references().encapsulate_status_msg(t_msg, 0);
+		}
+		if (t_multi_status_msg.id_struct().terminal_id() == DISP_TERM_ID)
+			std::cout << t_multi_status_msg.DebugString() << std::endl
+						<< std::endl;
+	}
+}
+#endif
+
+	return Error_code::SUCCESS;
+}
+
+// ***************************** nanomsg *********************************
 //定时发送状态信息
 Error_manager System_executor::encapsulate_send_status()
 {

+ 4 - 0
system/system_executor.h

@@ -14,6 +14,7 @@
 #include "../tool/measure_filter.h"
 #include "../message/measure_message.pb.h"
 #include "../system/system_communication.h"
+#include "../system/system_communication_mq.h"
 
 #include "../wanji_lidar/wanji_manager.h"
 #include "../velodyne_lidar/velodyne_manager.h"
@@ -66,6 +67,9 @@ public://API functions
 	//定时发送状态信息
 	Error_manager encapsulate_send_status();
 
+	//定时发送mq状态信息
+	Error_manager encapsulate_send_mq_status();
+
 	//判断是否为待机,如果已经准备好,则可以执行任务。
 	bool is_ready();