/* * rabbitmq_base 通信模块的基类, * 用户从这个基类继承, 初始化之后, 便可以自动进行通信 * 重载解析消息和封装消息, nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程. 接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理. rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包. 因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条. * */ #ifndef __RIBBITMQ_BASE__HH__ #define __RIBBITMQ_BASE__HH__ #include #include #include #include #include #include #include #include #include "tool/static_tool.hpp" #include #include #include #include #include "error_code/error_code.hpp" #include "tool/binary_buf.h" #include "tool/thread_safe_list.hpp" #include "tool/thread_condition.h" #include "rabbitmq/rabbitmq.pb.h" #include "rabbitmq/rabbitmq_message.h" //rabbitmq初始化配置参数的默认路径 #define RABBITMQ_PARAMETER_PATH "../etc/rabbitmq.prototxt" //amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息 #define PREFETCH_COUNT 1 class Rabbitmq_base { //通信状态 enum Rabbitmq_status { RABBITMQ_STATUS_UNKNOW = 0, //通信状态 未知 RABBITMQ_STATUS_READY = 1, //通信状态 正常 RABBITMQ_STATUS_DISCONNECT = 11, //通信状态 断连(可能会在断连和重连之间反复跳动) RABBITMQ_STATUS_RECONNNECT = 12, //通信状态 重连(可能会在断连和重连之间反复跳动) RABBITMQ_STATUS_FAULT = 100, //通信状态 错误 }; public: Rabbitmq_base(); Rabbitmq_base(const Rabbitmq_base &other) = delete; Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete; ~Rabbitmq_base(); public://API functions //初始化 通信 模块。如下三选一 Error_manager rabbitmq_init(); //初始化 通信 模块。从文件读取 Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path); //初始化 通信 模块。从protobuf读取 Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all); //反初始化 通信 模块。 Error_manager rabbitmq_uninit(); //重连, 快速uninit, init Error_manager rabbitmq_reconnnect(); //手动封装消息, 如下四选一 //手动封装消息,需要手动写入参数channel,exchange_name,routing_key Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key, int timeout_ms); //手动封装消息,需要手动写入参数channel,exchange_name,routing_key Error_manager encapsulate_msg(Rabbitmq_message *p_msg); //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数, Error_manager encapsulate_task_msg(std::string message, int vector_index = 0); //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数, Error_manager encapsulate_status_msg(std::string message, int vector_index = 0); //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息. //执行者在execute_msg里面可以调用这个函数, 或者回调也行. Error_manager ack_msg(Rabbitmq_message *p_msg); //设置 自动封装状态的时间周期, 可选(默认1000ms) void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time); //设置回调函数check_msg_callback void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)); //设置回调函数check_executer_callback void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)); //设置回调函数execute_msg_callback void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)); //设置回调函数encapsulate_status_callback void set_encapsulate_status_callback(Error_manager (*callback)()); protected: //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建) Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all); //启动通信, 开启线程, run thread Error_manager rabbitmq_run(); //mp_receive_analysis_thread 接受解析 执行函数, void receive_analysis_thread(); //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载 virtual Error_manager check_msg(Rabbitmq_message *p_msg); //检查执行者的状态, 判断能否处理这条消息, 需要子类重载 virtual Error_manager check_executer(Rabbitmq_message *p_msg); //处理消息, 需要子类重载 virtual Error_manager execute_msg(Rabbitmq_message *p_msg); //mp_send_thread 发送线程执行函数, void send_thread(); //mp_encapsulate_stauts_thread 自动封装线程执行函数, void encapsulate_status_thread(); //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载 virtual Error_manager auto_encapsulate_status(); protected: //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string std::string amqp_error_to_string(int amqp_status); //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name); //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果 std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply); //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果 std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name); protected://member variable Rabbitmq_status m_rabbitmq_status; //通信状态 //rabbitmq网络通信 连接配置信息 Rabbitmq_proto::Rabbitmq_parameter_all m_rabbitmq_parameter_all; amqp_connection_state_t_ *mp_connect; // 连接参数的结构体, 内存系统自动分配,自动释放 amqp_socket_t *mp_socket; // 网口通信socket, 内存系统自动分配,自动释放 std::string m_ip; //服务器ip地址, 不带端口 int m_port; //端口,默认5672 std::string m_user; //用户名, 默认guest std::string m_password; //密码, 默认guest std::mutex m_mutex; // socket的锁, 发送和接受的通信锁 std::map m_channel_map; // 通道的缓存,防止重复开启 // rabbitmq发送队列map std::map mp_rabbitmq_reciever; std::map mp_rabbitmq_sender_request; std::map mp_rabbitmq_sender_status; //接受模块, //rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包. //因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条. std::thread *mp_receive_analysis_thread; //接受解析的线程指针 Thread_condition m_receive_analysis_condition; //接受解析的条件变量 //发送模块, Thread_safe_list m_send_list; //发送的list容器 std::thread *mp_send_thread; //发送的线程指针 Thread_condition m_send_condition; //发送的条件变量 //自动发送状态的 std::thread *mp_encapsulate_status_thread; //自动封装状态的线程指针 Thread_condition m_encapsulate_status_condition; //自动封装状态的条件变量 unsigned int m_encapsulate_status_cycle_time; //自动封装状态的时间周期 //回调函数, // //可以选择设置回调函数,或者子类继承重载,二选一. Error_manager (*check_msg_callback)(Rabbitmq_message *p_msg); Error_manager (*check_executer_callback)(Rabbitmq_message *p_msg); Error_manager (*execute_msg_callback)(Rabbitmq_message *p_msg); Error_manager (*encapsulate_status_callback)(); }; #endif //__RIBBITMQ_BASE__HH__