123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- /*
- * rabbitmq_base 通信模块的基类,
- * 用户从这个基类继承, 初始化之后, 便可以自动进行通信
- * 重载解析消息和封装消息,
- nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程.
- 接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理.
- rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
- 因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
- * */
- #ifndef __RIBBITMQ_BASE__HH__
- #define __RIBBITMQ_BASE__HH__
- #include <mutex>
- #include <thread>
- #include <cstdint>
- #include <cstdio>
- #include <cstdlib>
- #include <cstring>
- #include <ctime>
- #include <map>
- #include "tool/static_tool.hpp"
- #include <rabbitmq-c/amqp.h>
- #include <rabbitmq-c/tcp_socket.h>
- #include <cassert>
- #include <glog/logging.h>
- #include "error_code/error_code.hpp"
- #include "tool/binary_buf.h"
- #include "tool/thread_safe_list.hpp"
- #include "tool/thread_condition.h"
- #include "rabbitmq/rabbitmq.pb.h"
- #include "rabbitmq/rabbitmq_message.h"
- //rabbitmq初始化配置参数的默认路径
- #define RABBITMQ_PARAMETER_PATH "../etc/rabbitmq.prototxt"
- //amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息
- #define PREFETCH_COUNT 1
- class Rabbitmq_base {
- //通信状态
- enum Rabbitmq_status {
- RABBITMQ_STATUS_UNKNOW = 0, //通信状态 未知
- RABBITMQ_STATUS_READY = 1, //通信状态 正常
- RABBITMQ_STATUS_DISCONNECT = 11, //通信状态 断连(可能会在断连和重连之间反复跳动)
- RABBITMQ_STATUS_RECONNNECT = 12, //通信状态 重连(可能会在断连和重连之间反复跳动)
- RABBITMQ_STATUS_FAULT = 100, //通信状态 错误
- };
- public:
- Rabbitmq_base();
- Rabbitmq_base(const Rabbitmq_base &other) = delete;
- Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete;
- ~Rabbitmq_base();
- public://API functions
- //初始化 通信 模块。如下三选一
- Error_manager rabbitmq_init();
- //初始化 通信 模块。从文件读取
- Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path);
- //初始化 通信 模块。从protobuf读取
- Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
- //反初始化 通信 模块。
- Error_manager rabbitmq_uninit();
- //重连, 快速uninit, init
- Error_manager rabbitmq_reconnnect();
- //手动封装消息, 如下四选一
- //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
- Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key,
- int timeout_ms);
- //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
- Error_manager encapsulate_msg(Rabbitmq_message *p_msg);
- //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
- Error_manager encapsulate_task_msg(std::string message, int vector_index = 0);
- //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
- Error_manager encapsulate_status_msg(std::string message, int vector_index = 0);
- //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
- //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
- Error_manager ack_msg(Rabbitmq_message *p_msg);
- //设置 自动封装状态的时间周期, 可选(默认1000ms)
- void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time);
- //设置回调函数check_msg_callback
- void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
- //设置回调函数check_executer_callback
- void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
- //设置回调函数execute_msg_callback
- void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
- //设置回调函数encapsulate_status_callback
- void set_encapsulate_status_callback(Error_manager (*callback)());
- protected:
- //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
- Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
- //启动通信, 开启线程, run thread
- Error_manager rabbitmq_run();
- //mp_receive_analysis_thread 接受解析 执行函数,
- void receive_analysis_thread();
- //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
- virtual Error_manager check_msg(Rabbitmq_message *p_msg);
- //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
- virtual Error_manager check_executer(Rabbitmq_message *p_msg);
- //处理消息, 需要子类重载
- virtual Error_manager execute_msg(Rabbitmq_message *p_msg);
- //mp_send_thread 发送线程执行函数,
- void send_thread();
- //mp_encapsulate_stauts_thread 自动封装线程执行函数,
- void encapsulate_status_thread();
- //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
- virtual Error_manager auto_encapsulate_status();
- protected:
- //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
- std::string amqp_error_to_string(int amqp_status);
- //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
- std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name);
- //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
- std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply);
- //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
- std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name);
- protected://member variable
- Rabbitmq_status m_rabbitmq_status; //通信状态
- //rabbitmq网络通信 连接配置信息
- Rabbitmq_proto::Rabbitmq_parameter_all m_rabbitmq_parameter_all;
- amqp_connection_state_t_ *mp_connect; // 连接参数的结构体, 内存系统自动分配,自动释放
- amqp_socket_t *mp_socket; // 网口通信socket, 内存系统自动分配,自动释放
- std::string m_ip; //服务器ip地址, 不带端口
- int m_port; //端口,默认5672
- std::string m_user; //用户名, 默认guest
- std::string m_password; //密码, 默认guest
- std::mutex m_mutex; // socket的锁, 发送和接受的通信锁
- std::map<int, bool> m_channel_map; // 通道的缓存,防止重复开启
- // rabbitmq发送队列map
- std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_reciever;
- std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_request;
- std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_status;
- //接受模块,
- //rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
- //因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
- std::thread *mp_receive_analysis_thread; //接受解析的线程指针
- Thread_condition m_receive_analysis_condition; //接受解析的条件变量
- //发送模块,
- Thread_safe_list<Rabbitmq_message *> m_send_list; //发送的list容器
- std::thread *mp_send_thread; //发送的线程指针
- Thread_condition m_send_condition; //发送的条件变量
- //自动发送状态的
- std::thread *mp_encapsulate_status_thread; //自动封装状态的线程指针
- Thread_condition m_encapsulate_status_condition; //自动封装状态的条件变量
- unsigned int m_encapsulate_status_cycle_time; //自动封装状态的时间周期
- //回调函数,
- // //可以选择设置回调函数,或者子类继承重载,二选一.
- Error_manager (*check_msg_callback)(Rabbitmq_message *p_msg);
- Error_manager (*check_executer_callback)(Rabbitmq_message *p_msg);
- Error_manager (*execute_msg_callback)(Rabbitmq_message *p_msg);
- Error_manager (*encapsulate_status_callback)();
- };
- #endif //__RIBBITMQ_BASE__HH__
|