rabbitmq_base.h 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*
  2. * rabbitmq_base 通信模块的基类,
  3. * 用户从这个基类继承, 初始化之后, 便可以自动进行通信
  4. * 重载解析消息和封装消息,
  5. nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程.
  6. 接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理.
  7. rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  8. 因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  9. * */
  10. #ifndef __RIBBITMQ_BASE__HH__
  11. #define __RIBBITMQ_BASE__HH__
  12. #include <mutex>
  13. #include <thread>
  14. #include <cstdint>
  15. #include <cstdio>
  16. #include <cstdlib>
  17. #include <cstring>
  18. #include <ctime>
  19. #include <map>
  20. #include "tool/static_tool.hpp"
  21. #include <rabbitmq-c/amqp.h>
  22. #include <rabbitmq-c/tcp_socket.h>
  23. #include <cassert>
  24. #include <glog/logging.h>
  25. #include "error_code/error_code.hpp"
  26. #include "tool/binary_buf.h"
  27. #include "tool/thread_safe_list.hpp"
  28. #include "tool/thread_condition.h"
  29. #include "rabbitmq/rabbitmq.pb.h"
  30. #include "rabbitmq/rabbitmq_message.h"
  31. //rabbitmq初始化配置参数的默认路径
  32. #define RABBITMQ_PARAMETER_PATH "../etc/rabbitmq.prototxt"
  33. //amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息
  34. #define PREFETCH_COUNT 1
  35. class Rabbitmq_base {
  36. //通信状态
  37. enum Rabbitmq_status {
  38. RABBITMQ_STATUS_UNKNOW = 0, //通信状态 未知
  39. RABBITMQ_STATUS_READY = 1, //通信状态 正常
  40. RABBITMQ_STATUS_DISCONNECT = 11, //通信状态 断连(可能会在断连和重连之间反复跳动)
  41. RABBITMQ_STATUS_RECONNNECT = 12, //通信状态 重连(可能会在断连和重连之间反复跳动)
  42. RABBITMQ_STATUS_FAULT = 100, //通信状态 错误
  43. };
  44. public:
  45. Rabbitmq_base();
  46. Rabbitmq_base(const Rabbitmq_base &other) = delete;
  47. Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete;
  48. ~Rabbitmq_base();
  49. public://API functions
  50. //初始化 通信 模块。如下三选一
  51. Error_manager rabbitmq_init();
  52. //初始化 通信 模块。从文件读取
  53. Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path);
  54. //初始化 通信 模块。从protobuf读取
  55. Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  56. //反初始化 通信 模块。
  57. Error_manager rabbitmq_uninit();
  58. //重连, 快速uninit, init
  59. Error_manager rabbitmq_reconnnect();
  60. //手动封装消息, 如下四选一
  61. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  62. Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key,
  63. int timeout_ms);
  64. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  65. Error_manager encapsulate_msg(Rabbitmq_message *p_msg);
  66. //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
  67. Error_manager encapsulate_task_msg(std::string message, int vector_index = 0);
  68. //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
  69. Error_manager encapsulate_status_msg(std::string message, int vector_index = 0);
  70. //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
  71. //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
  72. Error_manager ack_msg(Rabbitmq_message *p_msg);
  73. //设置 自动封装状态的时间周期, 可选(默认1000ms)
  74. void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time);
  75. //设置回调函数check_msg_callback
  76. void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  77. //设置回调函数check_executer_callback
  78. void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  79. //设置回调函数execute_msg_callback
  80. void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg));
  81. //设置回调函数encapsulate_status_callback
  82. void set_encapsulate_status_callback(Error_manager (*callback)());
  83. protected:
  84. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  85. Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  86. //启动通信, 开启线程, run thread
  87. Error_manager rabbitmq_run();
  88. //mp_receive_analysis_thread 接受解析 执行函数,
  89. void receive_analysis_thread();
  90. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
  91. virtual Error_manager check_msg(Rabbitmq_message *p_msg);
  92. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  93. virtual Error_manager check_executer(Rabbitmq_message *p_msg);
  94. //处理消息, 需要子类重载
  95. virtual Error_manager execute_msg(Rabbitmq_message *p_msg);
  96. //mp_send_thread 发送线程执行函数,
  97. void send_thread();
  98. //mp_encapsulate_stauts_thread 自动封装线程执行函数,
  99. void encapsulate_status_thread();
  100. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  101. virtual Error_manager auto_encapsulate_status();
  102. protected:
  103. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  104. std::string amqp_error_to_string(int amqp_status);
  105. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  106. std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name);
  107. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  108. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply);
  109. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  110. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name);
  111. protected://member variable
  112. Rabbitmq_status m_rabbitmq_status; //通信状态
  113. //rabbitmq网络通信 连接配置信息
  114. Rabbitmq_proto::Rabbitmq_parameter_all m_rabbitmq_parameter_all;
  115. amqp_connection_state_t_ *mp_connect; // 连接参数的结构体, 内存系统自动分配,自动释放
  116. amqp_socket_t *mp_socket; // 网口通信socket, 内存系统自动分配,自动释放
  117. std::string m_ip; //服务器ip地址, 不带端口
  118. int m_port; //端口,默认5672
  119. std::string m_user; //用户名, 默认guest
  120. std::string m_password; //密码, 默认guest
  121. std::mutex m_mutex; // socket的锁, 发送和接受的通信锁
  122. std::map<int, bool> m_channel_map; // 通道的缓存,防止重复开启
  123. // rabbitmq发送队列map
  124. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_reciever;
  125. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_request;
  126. std::map<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume> mp_rabbitmq_sender_status;
  127. //接受模块,
  128. //rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  129. //因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  130. std::thread *mp_receive_analysis_thread; //接受解析的线程指针
  131. Thread_condition m_receive_analysis_condition; //接受解析的条件变量
  132. //发送模块,
  133. Thread_safe_list<Rabbitmq_message *> m_send_list; //发送的list容器
  134. std::thread *mp_send_thread; //发送的线程指针
  135. Thread_condition m_send_condition; //发送的条件变量
  136. //自动发送状态的
  137. std::thread *mp_encapsulate_status_thread; //自动封装状态的线程指针
  138. Thread_condition m_encapsulate_status_condition; //自动封装状态的条件变量
  139. unsigned int m_encapsulate_status_cycle_time; //自动封装状态的时间周期
  140. //回调函数,
  141. // //可以选择设置回调函数,或者子类继承重载,二选一.
  142. Error_manager (*check_msg_callback)(Rabbitmq_message *p_msg);
  143. Error_manager (*check_executer_callback)(Rabbitmq_message *p_msg);
  144. Error_manager (*execute_msg_callback)(Rabbitmq_message *p_msg);
  145. Error_manager (*encapsulate_status_callback)();
  146. };
  147. #endif //__RIBBITMQ_BASE__HH__