rabbitmq_base.h 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. * rabbitmq_base 通信模块的基类,
  3. * 用户从这个基类继承, 初始化之后, 便可以自动进行通信
  4. * 重载解析消息和封装消息,
  5. nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程.
  6. 接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理.
  7. rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  8. 因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  9. * */
  10. #ifndef __RIBBITMQ_BASE__HH__
  11. #define __RIBBITMQ_BASE__HH__
  12. #include <mutex>
  13. #include <thread>
  14. #include <cstdint>
  15. #include <cstdio>
  16. #include <cstdlib>
  17. #include <cstring>
  18. #include <ctime>
  19. #include <map>
  20. #include "tool/load_protobuf.hpp"
  21. #include <rabbitmq-c/amqp.h>
  22. #include <rabbitmq-c/tcp_socket.h>
  23. #include <cassert>
  24. #include <glog/logging.h>
  25. #include "tool/error_code.hpp"
  26. #include "thread/binary_buf.h"
  27. #include "tool/thread_safe_list.hpp"
  28. #include "thread/thread_condition.h"
  29. #include "rabbitmq/rabbitmq.pb.h"
  30. #include "rabbitmq/rabbitmq_message.h"
  31. //rabbitmq初始化配置参数的默认路径
  32. #define RABBITMQ_PARAMETER_PATH "../etc/rabbitmq.prototxt"
  33. //amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息
  34. #define PREFETCH_COUNT 1
  35. class Rabbitmq_base {
  36. //通信状态
  37. enum Rabbitmq_status {
  38. RABBITMQ_STATUS_UNKNOW = 0, //通信状态 未知
  39. RABBITMQ_STATUS_READY = 1, //通信状态 正常
  40. RABBITMQ_STATUS_DISCONNECT = 11, //通信状态 断连(可能会在断连和重连之间反复跳动)
  41. RABBITMQ_STATUS_RECONNNECT = 12, //通信状态 重连(可能会在断连和重连之间反复跳动)
  42. RABBITMQ_STATUS_FAULT = 100, //通信状态 错误
  43. };
  44. public:
  45. struct QueueKey {
  46. std::string ex;
  47. std::string queue;
  48. };
  49. public:
  50. Rabbitmq_base();
  51. Rabbitmq_base(const Rabbitmq_base &other) = delete;
  52. Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete;
  53. ~Rabbitmq_base();
  54. public://API functions
  55. //初始化 通信 模块。如下三选一
  56. virtual Error_manager rabbitmq_init();
  57. //初始化 通信 模块。从文件读取
  58. virtual Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path);
  59. //初始化 通信 模块。从protobuf读取
  60. Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  61. //反初始化 通信 模块。
  62. Error_manager rabbitmq_uninit();
  63. //重连, 快速uninit, init
  64. Error_manager rabbitmq_reconnnect();
  65. //手动封装消息, 如下四选一
  66. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  67. Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key,
  68. int timeout_ms);
  69. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  70. Error_manager encapsulate_msg(Rabbitmq_message *p_msg);
  71. //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
  72. Error_manager encapsulate_task_msg(std::string message, int vector_index = 0);
  73. //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
  74. Error_manager encapsulate_status_msg(std::string message, int vector_index = 0);
  75. //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
  76. //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
  77. Error_manager ack_msg(Rabbitmq_message *p_msg);
  78. //设置 自动封装状态的时间周期, 可选(默认1000ms)
  79. void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time);
  80. //设置回调函数check_msg_callback
  81. void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  82. //设置回调函数check_executer_callback
  83. void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  84. //设置回调函数execute_msg_callback
  85. void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  86. //设置回调函数encapsulate_status_callback
  87. void set_encapsulate_status_callback(Error_manager (*callback)());
  88. protected:
  89. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  90. Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  91. //启动通信, 开启线程, run thread
  92. Error_manager rabbitmq_run();
  93. //mp_receive_analysis_thread 接受解析 执行函数,
  94. void receive_analysis_thread();
  95. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
  96. virtual Error_manager check_msg(Rabbitmq_message *p_msg);
  97. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  98. virtual Error_manager check_executer(Rabbitmq_message *p_msg);
  99. //处理消息, 需要子类重载
  100. virtual Error_manager execute_msg(Rabbitmq_message *p_msg);
  101. //mp_send_thread 发送线程执行函数,
  102. void send_thread();
  103. //mp_encapsulate_stauts_thread 自动封装线程执行函数,
  104. void encapsulate_status_thread();
  105. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  106. virtual Error_manager auto_encapsulate_status();
  107. protected:
  108. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  109. std::string amqp_error_to_string(int amqp_status);
  110. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  111. std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name);
  112. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  113. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply);
  114. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  115. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name);
  116. protected://member variable
  117. Rabbitmq_status m_rabbitmq_status; //通信状态
  118. //rabbitmq网络通信 连接配置信息
  119. Rabbitmq_proto::Rabbitmq_parameter_all m_rabbitmq_parameter_all;
  120. amqp_connection_state_t_ *mp_connect; // 连接参数的结构体, 内存系统自动分配,自动释放
  121. amqp_socket_t *mp_socket; // 网口通信socket, 内存系统自动分配,自动释放
  122. std::string m_ip; //服务器ip地址, 不带端口
  123. int m_port; //端口,默认5672
  124. std::string m_user; //用户名, 默认guest
  125. std::string m_password; //密码, 默认guest
  126. std::mutex m_mutex; // socket的锁, 发送和接受的通信锁
  127. std::map<int, bool> m_channel_map; // 通道的缓存,防止重复开启
  128. // rabbitmq发送队列map
  129. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_reciever;
  130. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_request;
  131. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_status;
  132. //接受模块,
  133. //rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  134. //因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  135. std::thread *mp_receive_analysis_thread; //接受解析的线程指针
  136. Thread_condition m_receive_analysis_condition; //接受解析的条件变量
  137. //发送模块,
  138. Thread_safe_list<Rabbitmq_message *> m_send_list; //发送的list容器
  139. std::thread *mp_send_thread; //发送的线程指针
  140. Thread_condition m_send_condition; //发送的条件变量
  141. //自动发送状态的
  142. std::thread *mp_encapsulate_status_thread; //自动封装状态的线程指针
  143. Thread_condition m_encapsulate_status_condition; //自动封装状态的条件变量
  144. unsigned int m_encapsulate_status_cycle_time; //自动封装状态的时间周期
  145. //回调函数,
  146. // //可以选择设置回调函数,或者子类继承重载,二选一.
  147. Error_manager (*check_msg_callback)(Rabbitmq_message *p_msg);
  148. Error_manager (*check_executer_callback)(Rabbitmq_message *p_msg);
  149. Error_manager (*execute_msg_callback)(Rabbitmq_message *p_msg);
  150. Error_manager (*encapsulate_status_callback)();
  151. };
  152. #endif //__RIBBITMQ_BASE__HH__