rabbitmq_base.h 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * rabbitmq_base 通信模块的基类,
  3. * 用户从这个基类继承, 初始化之后, 便可以自动进行通信
  4. * 重载解析消息和封装消息,
  5. nanosmg 必须实时接受消息, 如果不收可能就丢包, 所以本地开启了接受线程和解析线程.
  6. 接受线程实时接受,然后缓存到本地list, 然后解析线程再慢慢处理.
  7. rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  8. 因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  9. * */
  10. #ifndef __RIBBITMQ_BASE__HH__
  11. #define __RIBBITMQ_BASE__HH__
  12. #include <mutex>
  13. #include <thread>
  14. #include <stdint.h>
  15. #include <stdio.h>
  16. #include <stdlib.h>
  17. #include <string.h>
  18. #include <time.h>
  19. #include <map>
  20. #include "../tool/proto_tool.h"
  21. //#ifdef _WIN32
  22. //#ifndef WIN32_LEAN_AND_MEAN
  23. //#define WIN32_LEAN_AND_MEAN
  24. //#endif
  25. //#include <Winsock2.h>
  26. //#else
  27. //#include <sys/time.h>
  28. //#endif
  29. #include <string>
  30. #include <rabbitmq-c/amqp.h>
  31. #include <rabbitmq-c/tcp_socket.h>
  32. #include <assert.h>
  33. #include <glog/logging.h>
  34. #include "../error_code/error_code.h"
  35. #include "../tool/binary_buf.h"
  36. #include "../tool/thread_safe_list.h"
  37. #include "../tool/thread_condition.h"
  38. #include "../message/message_base.pb.h"
  39. #include "../rabbitmq/rabbitmq.pb.h"
  40. #include "../rabbitmq/rabbitmq_message.h"
  41. //rabbitmq初始化配置参数的默认路径
  42. #define RABBITMQ_PARAMETER_PATH "../setting/rabbitmq.prototxt"
  43. //amqp_basic_qos设置通道每次只能接受PREFETCH_COUNT条消息, 默认每次只能同时接受1条消息
  44. #define PREFETCH_COUNT 1
  45. class Rabbitmq_base
  46. {
  47. //通信状态
  48. enum Rabbitmq_status
  49. {
  50. RABBITMQ_STATUS_UNKNOW = 0, //通信状态 未知
  51. RABBITMQ_STATUS_READY = 1, //通信状态 正常
  52. RABBITMQ_STATUS_DISCONNECT = 11, //通信状态 断连(可能会在断连和重连之间反复跳动)
  53. RABBITMQ_STATUS_RECONNNECT = 12, //通信状态 重连(可能会在断连和重连之间反复跳动)
  54. RABBITMQ_STATUS_FAULT = 100, //通信状态 错误
  55. };
  56. public:
  57. Rabbitmq_base();
  58. Rabbitmq_base(const Rabbitmq_base &other) = delete;
  59. Rabbitmq_base &operator=(const Rabbitmq_base &other) = delete;
  60. ~Rabbitmq_base();
  61. public://API functions
  62. //初始化 通信 模块。如下三选一
  63. Error_manager rabbitmq_init();
  64. //初始化 通信 模块。从文件读取
  65. Error_manager rabbitmq_init_from_protobuf(std::string prototxt_path);
  66. //初始化 通信 模块。从protobuf读取
  67. Error_manager rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  68. //反初始化 通信 模块。
  69. Error_manager rabbitmq_uninit();
  70. //重连, 快速uninit, init
  71. Error_manager rabbitmq_reconnnect();
  72. //手动封装消息, 如下四选一
  73. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  74. Error_manager encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key, int timeout_ms);
  75. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  76. Error_manager encapsulate_msg(Rabbitmq_message* p_msg);
  77. //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
  78. Error_manager encapsulate_task_msg(std::string message, int vector_index=0);
  79. //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
  80. Error_manager encapsulate_status_msg(std::string message, int vector_index=0);
  81. //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
  82. //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
  83. Error_manager ack_msg(Rabbitmq_message* p_msg);
  84. //设置 自动封装状态的时间周期, 可选(默认1000ms)
  85. void set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time);
  86. //设置回调函数check_msg_callback
  87. void set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
  88. //设置回调函数check_executer_callback
  89. void set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
  90. //设置回调函数execute_msg_callback
  91. void set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg));
  92. //设置回调函数encapsulate_status_callback
  93. void set_encapsulate_status_callback(Error_manager (*callback)());
  94. protected:
  95. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  96. Error_manager rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all);
  97. //启动通信, 开启线程, run thread
  98. Error_manager rabbitmq_run();
  99. //mp_receive_analysis_thread 接受解析 执行函数,
  100. void receive_analysis_thread();
  101. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
  102. virtual Error_manager check_msg(Rabbitmq_message* p_msg);
  103. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  104. virtual Error_manager check_executer(Rabbitmq_message* p_msg);
  105. //处理消息, 需要子类重载
  106. virtual Error_manager execute_msg(Rabbitmq_message* p_msg);
  107. //mp_send_thread 发送线程执行函数,
  108. void send_thread();
  109. //mp_encapsulate_stauts_thread 自动封装线程执行函数,
  110. void encapsulate_status_thread();
  111. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  112. virtual Error_manager auto_encapsulate_status();
  113. protected:
  114. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  115. std::string amqp_error_to_string(int amqp_status);
  116. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  117. std::string amqp_error_to_string(int amqp_status, std::string amqp_fun_name);
  118. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  119. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply);
  120. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  121. std::string amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name);
  122. protected://member variable
  123. Rabbitmq_status m_rabbitmq_status; //通信状态
  124. //rabbitmq网络通信 连接配置信息
  125. Rabbitmq_proto::Rabbitmq_parameter_all m_rabbitmq_parameter_all;
  126. amqp_connection_state_t_ * mp_connect; // 连接参数的结构体, 内存系统自动分配,自动释放
  127. amqp_socket_t * mp_socket ; // 网口通信socket, 内存系统自动分配,自动释放
  128. std::string m_ip; //服务器ip地址, 不带端口
  129. int m_port; //端口,默认5672
  130. std::string m_user; //用户名, 默认guest
  131. std::string m_password; //密码, 默认guest
  132. std::mutex m_mutex; // socket的锁, 发送和接受的通信锁
  133. std::map<int, bool> m_channel_map; // 通道的缓存,防止重复开启
  134. //接受模块,
  135. //rabbitmq 的服务器可以缓存消息, 如果子节点不接受, 数据仍然留在服务器上, 在需要的时候再去接受,不会丢包.
  136. //因此删除了接受线程和缓存list, 直接使用解析线程,一边接受一边处理,处理完后再接受下一条.
  137. std::thread* mp_receive_analysis_thread; //接受解析的线程指针
  138. Thread_condition m_receive_analysis_condition; //接受解析的条件变量
  139. //发送模块,
  140. Thread_safe_list<Rabbitmq_message*> m_send_list; //发送的list容器
  141. std::thread* mp_send_thread; //发送的线程指针
  142. Thread_condition m_send_condition; //发送的条件变量
  143. //自动发送状态的
  144. std::thread* mp_encapsulate_status_thread; //自动封装状态的线程指针
  145. Thread_condition m_encapsulate_status_condition; //自动封装状态的条件变量
  146. unsigned int m_encapsulate_status_cycle_time; //自动封装状态的时间周期
  147. //回调函数,
  148. // //可以选择设置回调函数,或者子类继承重载,二选一.
  149. Error_manager (*check_msg_callback)(Rabbitmq_message* p_msg);
  150. Error_manager (*check_executer_callback)(Rabbitmq_message* p_msg);
  151. Error_manager (*execute_msg_callback)(Rabbitmq_message* p_msg);
  152. Error_manager (*encapsulate_status_callback)();
  153. };
  154. #endif //__RIBBITMQ_BASE__HH__