rabbitmq_base.cpp 54 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985
  1. #include "rabbitmq_base.h"
  2. Rabbitmq_base::Rabbitmq_base() {
  3. m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW;
  4. mp_connect = NULL;
  5. mp_socket = NULL;
  6. m_port = 0;
  7. mp_receive_analysis_thread = NULL;
  8. mp_send_thread = NULL;
  9. mp_encapsulate_status_thread = NULL;
  10. m_encapsulate_status_cycle_time = 100;//默认1000ms,就自动封装一次状态信息
  11. check_msg_callback = NULL;
  12. check_executer_callback = NULL;
  13. execute_msg_callback = NULL;
  14. encapsulate_status_callback = NULL;
  15. }
  16. Rabbitmq_base::~Rabbitmq_base() {
  17. rabbitmq_uninit();
  18. }
  19. //初始化 通信 模块。如下三选一
  20. Error_manager Rabbitmq_base::rabbitmq_init() {
  21. return rabbitmq_init_from_protobuf(RABBITMQ_PARAMETER_PATH);
  22. }
  23. //初始化 通信 模块。从文件读取
  24. Error_manager Rabbitmq_base::rabbitmq_init_from_protobuf(std::string prototxt_path) {
  25. Rabbitmq_proto::Rabbitmq_parameter_all t_rabbitmq_parameter_all;
  26. if (loadProtobufFile(prototxt_path, t_rabbitmq_parameter_all) != SUCCESS) {
  27. return Error_manager(RABBITMQ_READ_PROTOBUF_ERROR, MINOR_ERROR,
  28. "rabbitmq_init_from_file: %s read_proto_param failed", prototxt_path.c_str());
  29. }
  30. return rabbitmq_init_from_protobuf(t_rabbitmq_parameter_all);
  31. }
  32. //初始化 通信 模块。从protobuf读取
  33. Error_manager
  34. Rabbitmq_base::rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all) {
  35. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_init_from_protobuf() run--- " << this;
  36. int t_status = 0; //状态
  37. amqp_rpc_reply_t t_reply; //reply答复结果
  38. Error_manager t_error;
  39. m_rabbitmq_parameter_all = rabbitmq_parameter_all;
  40. for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector()) {
  41. mp_rabbitmq_reciever.insert(std::pair<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume>(queue.routing_key(), queue));
  42. }
  43. for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector()) {
  44. mp_rabbitmq_reciever.insert(std::pair<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume>(queue.routing_key(), queue));
  45. }
  46. for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector()) {
  47. mp_rabbitmq_reciever.insert(std::pair<const std::string, Rabbitmq_proto::Rabbitmq_channel_queue_consume>(queue.routing_key(), queue));
  48. }
  49. //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数
  50. // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功
  51. mp_connect = amqp_new_connection();
  52. if (mp_connect == nullptr) {
  53. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR,
  54. "amqp_new_connection fun error ");
  55. }
  56. //amqp_tcp_socket_new 新建tcp_socket连接
  57. // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功
  58. mp_socket = amqp_tcp_socket_new(mp_connect);
  59. if (mp_socket == nullptr) {
  60. return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR,
  61. "amqp_tcp_socket_new fun error ");
  62. }
  63. //载入外部参数
  64. if (rabbitmq_parameter_all.rabbitmq_parameters().has_ip() &&
  65. rabbitmq_parameter_all.rabbitmq_parameters().has_port() &&
  66. rabbitmq_parameter_all.rabbitmq_parameters().has_user() &&
  67. rabbitmq_parameter_all.rabbitmq_parameters().has_password()) {
  68. m_ip = rabbitmq_parameter_all.rabbitmq_parameters().ip();
  69. m_port = rabbitmq_parameter_all.rabbitmq_parameters().port();
  70. m_user = rabbitmq_parameter_all.rabbitmq_parameters().user();
  71. m_password = rabbitmq_parameter_all.rabbitmq_parameters().password();
  72. } else {
  73. return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR,
  74. " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete ");
  75. }
  76. //amqp_socket_open 打开socket连接, 输入ip和port,
  77. // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  78. //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置.
  79. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port);
  80. if (t_status != AMQP_STATUS_OK) {
  81. return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR,
  82. amqp_error_to_string(t_status, "amqp_socket_open"));
  83. }
  84. //amqp_login() 登录代理服务器,
  85. //输入 连接参数结构体 amqp_connection_state_t,
  86. //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/"
  87. //输入 连接通道最大值, 默认值0表示没有限制
  88. //输入 连接帧率最大值, 默认值是131072 (128KB)
  89. //输入 心跳帧之间的秒数, 默认值0禁用心跳
  90. //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码
  91. // AMQP_SASL_METHOD_EXTERNAL, 追加身份证
  92. //返回 结果的结构体 amqp_rpc_reply_t
  93. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  94. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  95. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  96. t_reply = amqp_login(mp_connect, "/", 0, 131072, 0,
  97. AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str());
  98. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  99. return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR,
  100. amqp_error_to_string(t_reply, "amqp_login"));
  101. }
  102. //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化)
  103. m_channel_map.clear();
  104. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  105. t_error = rabbitmq_new_channel_queue_consume(rabbitmq_parameter_all);
  106. if (t_error != Error_code::SUCCESS) {
  107. return t_error;
  108. }
  109. //启动通信, 开启线程, run thread
  110. t_error = rabbitmq_run();
  111. if (t_error != Error_code::SUCCESS) {
  112. return t_error;
  113. }
  114. return Error_code::SUCCESS;
  115. }
  116. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  117. Error_manager
  118. Rabbitmq_base::rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all) {
  119. int t_status = 0; //状态
  120. amqp_rpc_reply_t t_reply; //reply答复结果
  121. Error_manager t_error;
  122. ///Rabbitmq 接受的通道,队列和消费者, 多个
  123. for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector_size(); ++i) {
  124. //Rabbitmq 配置的通道,队列和消费者,
  125. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf =
  126. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector(i);
  127. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  128. if (m_channel_map.find(t_inf.channel()) == m_channel_map.end()) {
  129. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  130. amqp_channel_open(mp_connect, t_inf.channel());
  131. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  132. t_reply = amqp_get_rpc_reply(mp_connect);
  133. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  134. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  135. amqp_error_to_string(t_reply, "amqp_channel_open"));
  136. }
  137. if (t_inf.consume_no_ack() == 0) {
  138. //amqp_basic_qos设置通道每次只能接受一条消息, 直到该消息被ack,才能接受下一条.状态消息可以继续接受
  139. //uint16_t prefetch_count 同时接受消息的个数, 这里固定写1,
  140. //配合 amqp_basic_qos 和 amqp_basic_ack , 来阻塞这个通道的接受消息
  141. //注:请求消息no_ack==0, 当接受一条指令后,该通道被阻塞,其他通道仍然正常接受, 等到任务被执行完,手动调用amqp_basic_ack函数, 则可以继续接受请求消息.
  142. //注:状态消息no_ack==1, 当接受一条指令后,该状态消息立刻被删除,然后可以继续接受下一条状态消息.
  143. amqp_basic_qos(mp_connect, t_inf.channel(), 0, PREFETCH_COUNT, 0);
  144. }
  145. m_channel_map[t_inf.channel()] = true;
  146. }
  147. //临时队列需要代码创建, 永久队列需要在服务器上提前手动创建
  148. if (t_inf.queue_durable() == 0) {
  149. //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒)
  150. if (t_inf.queue_meassage_ttl() != 0) {
  151. amqp_table_t t_arguments; //队列的扩展属性 num_entries 是map长度, amqp_table_entry_t_ 是map指针
  152. //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒)
  153. t_arguments.num_entries = 1;
  154. amqp_table_entry_t_ t_map_arg;
  155. t_map_arg.key = amqp_cstring_bytes("x-message-ttl"); //需要配置的参数
  156. t_map_arg.value.kind = AMQP_FIELD_KIND_U16; //需要配置的数据类型, 如果是字符串, 写 AMQP_FIELD_KIND_UTF8
  157. t_map_arg.value.value.u16 = t_inf.queue_meassage_ttl(); //需要配置的数值
  158. t_arguments.entries = &t_map_arg;
  159. //amqp_queue_declare() 队列声明, 就是创建新的队列.
  160. //输入 amqp_connection_state_t state 连接状态参数的结构体
  161. //输入 amqp_channel_t channel 连接通道的编号
  162. //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes
  163. //输入 amqp_boolean_t passive 是否被动,默认0
  164. //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器)
  165. // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据
  166. // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常
  167. //输入 amqp_boolean_t exclusive 是否独立,默认0
  168. //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除,
  169. // 一般情况下设为0,然后让接受者手动删除.
  170. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  171. //返回 amqp_queue_declare_ok_t * 返回结果
  172. amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  173. t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(),
  174. t_inf.queue_auto_delete(), t_arguments);
  175. } else {
  176. //amqp_queue_declare() 队列声明, 就是创建新的队列.
  177. //输入 amqp_connection_state_t state 连接状态参数的结构体
  178. //输入 amqp_channel_t channel 连接通道的编号
  179. //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes
  180. //输入 amqp_boolean_t passive 是否被动,默认0
  181. //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器)
  182. // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据
  183. // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常
  184. //输入 amqp_boolean_t exclusive 是否独立,默认0
  185. //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除,
  186. // 一般情况下设为0,然后让接受者手动删除.
  187. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  188. //返回 amqp_queue_declare_ok_t * 返回结果
  189. amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  190. t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(),
  191. t_inf.queue_auto_delete(), amqp_empty_table);
  192. }
  193. //amqp_queue_bind 队列绑定, 将队列加载到服务器的交换机下面, 交换机收到消息后,就会检查key,然后放到指定的队列.
  194. //输入 amqp_connection_state_t state 连接状态参数的结构体
  195. //输入 amqp_channel_t channel 连接通道的编号
  196. //输入 amqp_bytes_t queue 队列名称,
  197. //输入 amqp_bytes_t exchange 交换机模式字符串
  198. //输入 amqp_bytes_t bindingkey 绑定密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致
  199. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  200. //返回 amqp_queue_bind_ok_t * 返回结果
  201. //注注注注注意了, 队列绑定交换机时,必须保证交换机是有效的.否则报错
  202. amqp_queue_bind(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  203. amqp_cstring_bytes(t_inf.exchange_name().c_str()),
  204. amqp_cstring_bytes(t_inf.binding_key().c_str()), amqp_empty_table);
  205. amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect);
  206. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  207. return Error_manager(Error_code::RABBITMQ_AMQP_QUEUE_BIND_ERROR, Error_level::MINOR_ERROR,
  208. amqp_error_to_string(t_reply, "amqp_queue_bind"));
  209. }
  210. }
  211. //amqp_basic_consume 创建基本类型的消费者,就是接收端, 消费者绑定队列,只能接受一个队列里面的消息
  212. //输入 amqp_connection_state_t state 连接状态参数的结构体
  213. //输入 amqp_channel_t channel 连接通道的编号
  214. //输入 amqp_bytes_t queue 队列名称,
  215. //输入 amqp_bytes_t consumer_tag 消费者名称
  216. //输入 amqp_boolean_t no_local 是否非本地, 默认0,表示本地
  217. //输入 amqp_boolean_t no_ack, 是否确认应答,默认0,表示接收后需要应答
  218. //输入 amqp_boolean_t exclusive 是否独立,默认0
  219. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  220. //返回 amqp_basic_consume_ok_t * 返回结果
  221. //注注注注注意了, 接受端绑定队列时,必须保证队列是有效的,否则报错,
  222. amqp_basic_consume(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  223. amqp_cstring_bytes(t_inf.consume_name().c_str()), t_inf.consume_no_local(),
  224. t_inf.consume_no_ack(), t_inf.consume_exclusive(), amqp_empty_table);
  225. amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect);
  226. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  227. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONSUME_ERROR, Error_level::MINOR_ERROR,
  228. amqp_error_to_string(t_reply, "amqp_basic_consume"));
  229. }
  230. }
  231. //Rabbitmq 发送请求的通道
  232. for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector_size(); ++i) {
  233. //Rabbitmq 配置发送通道
  234. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf1 =
  235. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(i);
  236. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  237. if (m_channel_map.find(t_inf1.channel()) == m_channel_map.end()) {
  238. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  239. amqp_channel_open(mp_connect, t_inf1.channel());
  240. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  241. t_reply = amqp_get_rpc_reply(mp_connect);
  242. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  243. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  244. amqp_error_to_string(t_reply, "amqp_channel_open"));
  245. }
  246. m_channel_map[t_inf1.channel()] = true;
  247. }
  248. }
  249. //Rabbitmq 发送状态的通道
  250. for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector_size(); ++i) {
  251. //Rabbitmq 配置发送通道
  252. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf2 =
  253. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(i);
  254. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  255. if (m_channel_map.find(t_inf2.channel()) == m_channel_map.end()) {
  256. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  257. amqp_channel_open(mp_connect, t_inf2.channel());
  258. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  259. t_reply = amqp_get_rpc_reply(mp_connect);
  260. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  261. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  262. amqp_error_to_string(t_reply, "amqp_channel_open"));
  263. }
  264. m_channel_map[t_inf2.channel()] = true;
  265. }
  266. }
  267. return Error_code::SUCCESS;
  268. }
  269. //启动通信, 开启线程, run thread
  270. Error_manager Rabbitmq_base::rabbitmq_run() {
  271. //启动 线程。
  272. //接受线程默认循环, 内部的 amqp_consume_message 进行等待, 超时1ms
  273. m_receive_analysis_condition.reset(false, true, false);
  274. mp_receive_analysis_thread = new std::thread(&Rabbitmq_base::receive_analysis_thread, this);
  275. //发送线程默认循环, 内部的wait_and_pop进行等待,
  276. m_send_condition.reset(false, true, false);
  277. mp_send_thread = new std::thread(&Rabbitmq_base::send_thread, this);
  278. //封装线程默认等待, ...., 超时1秒, 超时后主动 封装心跳和状态信息,
  279. m_encapsulate_status_condition.reset(false, false, false);
  280. mp_encapsulate_status_thread = new std::thread(&Rabbitmq_base::encapsulate_status_thread, this);
  281. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  282. return Error_code::SUCCESS;
  283. }
  284. //反初始化 通信 模块。
  285. Error_manager Rabbitmq_base::rabbitmq_uninit() {
  286. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_uninit() run--- " << this;
  287. //终止list,防止 wait_and_pop 阻塞线程。
  288. m_send_list.termination_list();
  289. //杀死线程,强制退出
  290. if (mp_receive_analysis_thread) {
  291. m_receive_analysis_condition.kill_all();
  292. }
  293. if (mp_send_thread) {
  294. m_send_condition.kill_all();
  295. }
  296. if (mp_encapsulate_status_thread) {
  297. m_encapsulate_status_condition.kill_all();
  298. }
  299. //回收线程的资源
  300. if (mp_receive_analysis_thread) {
  301. mp_receive_analysis_thread->join();
  302. delete mp_receive_analysis_thread;
  303. mp_receive_analysis_thread = NULL;
  304. }
  305. if (mp_send_thread) {
  306. mp_send_thread->join();
  307. delete mp_send_thread;
  308. mp_send_thread = NULL;
  309. }
  310. if (mp_encapsulate_status_thread) {
  311. mp_encapsulate_status_thread->join();
  312. delete mp_encapsulate_status_thread;
  313. mp_encapsulate_status_thread = NULL;
  314. }
  315. //清空list
  316. m_send_list.clear_and_delete();
  317. if (m_rabbitmq_status == RABBITMQ_STATUS_READY) {
  318. for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter) {
  319. amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS);
  320. }
  321. amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS);
  322. amqp_destroy_connection(mp_connect);
  323. }
  324. m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW;
  325. return Error_code::SUCCESS;
  326. }
  327. //重连, 快速uninit, init
  328. Error_manager Rabbitmq_base::rabbitmq_reconnnect() {
  329. //重连全程加锁,防止其他线程运行.
  330. std::unique_lock<std::mutex> lk(m_mutex);
  331. m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT;
  332. //断开连接
  333. for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter) {
  334. amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS);
  335. }
  336. amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS);
  337. amqp_destroy_connection(mp_connect);
  338. //重新连接,线程不需要重启
  339. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_reconnnect() run--- " << this;
  340. int t_status = 0; //状态
  341. amqp_rpc_reply_t t_reply; //reply答复结果
  342. Error_manager t_error;
  343. //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数
  344. // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功
  345. mp_connect = amqp_new_connection();
  346. if (mp_connect == NULL) {
  347. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR,
  348. "amqp_new_connection fun error ");
  349. }
  350. //amqp_tcp_socket_new 新建tcp_socket连接
  351. // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功
  352. mp_socket = amqp_tcp_socket_new(mp_connect);
  353. if (mp_socket == NULL) {
  354. return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR,
  355. "amqp_tcp_socket_new fun error ");
  356. }
  357. //载入外部参数
  358. if (m_rabbitmq_parameter_all.rabbitmq_parameters().has_ip() &&
  359. m_rabbitmq_parameter_all.rabbitmq_parameters().has_port() &&
  360. m_rabbitmq_parameter_all.rabbitmq_parameters().has_user() &&
  361. m_rabbitmq_parameter_all.rabbitmq_parameters().has_password()) {
  362. m_ip = m_rabbitmq_parameter_all.rabbitmq_parameters().ip();
  363. m_port = m_rabbitmq_parameter_all.rabbitmq_parameters().port();
  364. m_user = m_rabbitmq_parameter_all.rabbitmq_parameters().user();
  365. m_password = m_rabbitmq_parameter_all.rabbitmq_parameters().password();
  366. } else {
  367. return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR,
  368. " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete ");
  369. }
  370. //amqp_socket_open 打开socket连接, 输入ip和port,
  371. // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  372. //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置.
  373. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port);
  374. if (t_status != AMQP_STATUS_OK) {
  375. return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR,
  376. amqp_error_to_string(t_status, "amqp_socket_open"));
  377. }
  378. //amqp_login() 登录代理服务器,
  379. //输入 连接参数结构体 amqp_connection_state_t,
  380. //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/"
  381. //输入 连接通道最大值, 默认值0表示没有限制
  382. //输入 连接帧率最大值, 默认值是131072 (128KB)
  383. //输入 心跳帧之间的秒数, 默认值0禁用心跳
  384. //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码
  385. // AMQP_SASL_METHOD_EXTERNAL, 追加身份证
  386. //返回 结果的结构体 amqp_rpc_reply_t
  387. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  388. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  389. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  390. t_reply = amqp_login(mp_connect, "/", 0, 131072, 0,
  391. AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str());
  392. if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) {
  393. return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR,
  394. amqp_error_to_string(t_reply, "amqp_login"));
  395. }
  396. //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化)
  397. m_channel_map.clear();
  398. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  399. t_error = rabbitmq_new_channel_queue_consume(m_rabbitmq_parameter_all);
  400. if (t_error != Error_code::SUCCESS) {
  401. return t_error;
  402. }
  403. //不用重启线程
  404. return Error_code::SUCCESS;
  405. }
  406. //设置 自动封装状态的时间周期
  407. void Rabbitmq_base::set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time) {
  408. m_encapsulate_status_cycle_time = encapsulate_status_cycle_time;
  409. }
  410. //设置回调函数check_msg_callback
  411. void Rabbitmq_base::set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) {
  412. check_msg_callback = callback;
  413. }
  414. //设置回调函数check_executer_callback
  415. void Rabbitmq_base::set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) {
  416. check_executer_callback = callback;
  417. }
  418. //设置回调函数execute_msg_callback
  419. void Rabbitmq_base::set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) {
  420. execute_msg_callback = callback;
  421. }
  422. //设置回调函数encapsulate_status_callback
  423. void Rabbitmq_base::set_encapsulate_status_callback(Error_manager (*callback)()) {
  424. encapsulate_status_callback = callback;
  425. }
  426. //mp_receive_analysis_thread 接受解析 执行函数,
  427. void Rabbitmq_base::receive_analysis_thread() {
  428. LOG(INFO) << " Rabbitmq_base::receive_analysis_thread start " << this;
  429. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  430. while (m_receive_analysis_condition.is_alive()) {
  431. //这里就不需要超时等待了, rabbitmq的接受函数可以配置等待超时....
  432. // m_receive_analysis_condition.wait_for_ex(std::chrono::microseconds(1));
  433. m_receive_analysis_condition.wait();
  434. if (m_receive_analysis_condition.is_alive()) {
  435. std::this_thread::sleep_for(std::chrono::microseconds(100));
  436. std::this_thread::yield();
  437. amqp_rpc_reply_t t_reply; //运行结果
  438. amqp_envelope_t t_envelope; //数据包, 含有一些包裹属性和数据内容
  439. //接受消息等待超时,默认1000us, 当收到消息后,立刻通过阻塞,否则等待超时后通过阻塞
  440. struct timeval t_timeout; //超时时间, 默认1ms
  441. t_timeout.tv_sec = 0;
  442. t_timeout.tv_usec = 1000;
  443. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  444. std::unique_lock<std::mutex> lk(m_mutex);
  445. //允许释放连接参数状态的内存,
  446. // 因为这个连接是底层分配的内存,是全局的. 为了开启多个连接,就要重复使用
  447. //这里释放之后,其他代码就开启多线程开启新的连接了.
  448. amqp_maybe_release_buffers(mp_connect);
  449. //amqp_consume_message 接受消息, 阻塞函数,可以设置超时.
  450. //输入 amqp_connection_state_t state 连接状态参数的结构体
  451. //输入 amqp_envelope_t *envelope 接受数据包的指针, 成功接收到数据后,数据包会覆盖
  452. //输入 const struct timeval *timeout 超时时间, 防止阻塞. 传入NULL就是完全阻塞.
  453. //输入 int flags 未使用, 默认0
  454. //输入 amqp_connection_state_t state 连接状态参数的结构体
  455. //返回 状态结果的结构体 amqp_rpc_reply_t
  456. // amqp_response_type_enum reply_type 成功是 AMQP_RESPONSE_NORMAL
  457. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  458. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  459. t_reply = amqp_consume_message(mp_connect, &t_envelope, &t_timeout, 0);
  460. }
  461. if (AMQP_RESPONSE_NORMAL == t_reply.reply_type)//正常接受到消息
  462. {
  463. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  464. //从t_envelope数据包里面提取信息
  465. std::string t_receive_string = std::string((char *) t_envelope.message.body.bytes,
  466. t_envelope.message.body.len);
  467. int t_channel = t_envelope.channel;
  468. int t_delivery_tag = t_envelope.delivery_tag;
  469. std::string t_exchange_name = std::string((char *) t_envelope.exchange.bytes, t_envelope.exchange.len);
  470. std::string t_routing_key = std::string((char *) t_envelope.routing_key.bytes,
  471. t_envelope.routing_key.len);
  472. //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
  473. message::Base_msg t_base_msg;
  474. // if( t_base_msg.ParseFromString(t_receive_string) )
  475. //删除 message::Base_msg 里面的 message::Base_info的机制,完全依赖服务器来分发消息
  476. if (true) {
  477. //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
  478. Rabbitmq_message t_rabbitmq_message;
  479. t_rabbitmq_message.reset(t_base_msg.base_info(), t_receive_string, t_channel, t_delivery_tag,
  480. t_exchange_name, t_routing_key);
  481. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  482. if (check_msg(&t_rabbitmq_message) == SUCCESS) {
  483. //这里直接就用当前线程进行处理,
  484. //检查消息是否可以被处理
  485. if (check_executer(&t_rabbitmq_message) == SUCCESS) {
  486. //处理消息
  487. if (execute_msg(&t_rabbitmq_message) == SUCCESS) {
  488. }
  489. //else不做处理
  490. }
  491. //else不做处理
  492. }
  493. //else不做处理
  494. }
  495. //else解析失败, 就当做什么也没发生, 认为接收消息无效,
  496. else {
  497. std::cout << " huli test :::: " << " t_receive_string = " << t_receive_string << std::endl;
  498. if (t_channel == 401) {
  499. amqp_basic_ack(mp_connect, t_channel, t_delivery_tag, 0);
  500. }
  501. }
  502. //amqp_destroy_envelope 销毁数据包, 只有接受成功, t_envelope才有内存
  503. amqp_destroy_envelope(&t_envelope);
  504. } else//没有接受到消息
  505. {
  506. //超时报错,不做处理, continue
  507. //注注注注注意了, 没有收到消息会超时报错, res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, res.library_error = -13, (-0x000D request timed out)
  508. if (t_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && t_reply.library_error == -13) {
  509. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  510. continue;
  511. } else//其他报错,特殊处理
  512. {
  513. //need
  514. std::string error_description = amqp_error_to_string(t_reply, "amqp_consume_message");
  515. LOG(WARNING) << " huli test 123123123:::: " << " error_description = " << error_description
  516. << std::endl;
  517. // return Error_manager(Error_code::RABBITMQ_AMQP_CONSUME_MESSAGE_ERROR, Error_level::MINOR_ERROR,
  518. // amqp_error_to_string(t_reply, "amqp_consume_message") );
  519. //重启
  520. rabbitmq_reconnnect();
  521. }
  522. }
  523. }
  524. }
  525. LOG(INFO) << " Rabbitmq_base::receive_analysis_thread end " << this;
  526. return;
  527. }
  528. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
  529. Error_manager Rabbitmq_base::check_msg(Rabbitmq_message *p_msg) {
  530. if (check_msg_callback != NULL) {
  531. return check_msg_callback(p_msg);
  532. }
  533. return Error_code::SUCCESS;
  534. }
  535. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  536. Error_manager Rabbitmq_base::check_executer(Rabbitmq_message *p_msg) {
  537. if (check_executer_callback != NULL) {
  538. return check_executer_callback(p_msg);
  539. }
  540. return Error_code::SUCCESS;
  541. }
  542. //处理消息, 需要子类重载
  543. Error_manager Rabbitmq_base::execute_msg(Rabbitmq_message *p_msg) {
  544. if (execute_msg_callback != NULL) {
  545. return execute_msg_callback(p_msg);
  546. } else {
  547. //需要子类重载
  548. std::cout << " huli test :::: " << " execute_msg Rabbitmq_message = " << p_msg->get_message_buf() << std::endl;
  549. //如果是请求消息,那么在子节点继承的时候一定要记得调用
  550. //配置rabbitmq.proto时, 如果consume_no_ack == 0 , 一定要手动调用 amqp_basic_ack
  551. int consume_no_ack = 1;
  552. if (consume_no_ack == 0 || p_msg->m_channel == 401) {
  553. //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息.
  554. //输入 amqp_connection_state_t state 连接状态参数的结构体
  555. //输入 amqp_channel_t channel 连接通道的编号
  556. //输入 uint64_t delivery_tag 消息传递编号,
  557. //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条
  558. int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0);
  559. }
  560. }
  561. return Error_code::SUCCESS;
  562. }
  563. //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
  564. //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
  565. Error_manager Rabbitmq_base::ack_msg(Rabbitmq_message *p_msg) {
  566. //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息.
  567. //输入 amqp_connection_state_t state 连接状态参数的结构体
  568. //输入 amqp_channel_t channel 连接通道的编号
  569. //输入 uint64_t delivery_tag 消息传递编号,
  570. //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条
  571. int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0);
  572. if (ack_result != 0) {
  573. return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_ACK_ERROR, Error_level::MINOR_ERROR,
  574. amqp_error_to_string(ack_result, "amqp_basic_ack"));
  575. }
  576. return Error_code::SUCCESS;
  577. }
  578. //mp_send_thread 发送线程执行函数,
  579. void Rabbitmq_base::send_thread() {
  580. LOG(INFO) << " Rabbitmq_base::send_thread start " << this;
  581. //通信发送线程, 负责巡检m_send_list, 并发送消息
  582. while (m_send_condition.is_alive()) {
  583. m_send_condition.wait();
  584. if (m_send_condition.is_alive()) {
  585. std::this_thread::yield();
  586. Rabbitmq_message *tp_msg = NULL;
  587. int t_result = 0;
  588. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  589. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  590. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  591. bool is_pop = m_send_list.wait_and_pop(tp_msg);
  592. if (is_pop) {
  593. if (tp_msg != NULL) {
  594. //amqp_basic_properties_t 消息数据的基本属性,里面有15个成员.
  595. amqp_basic_properties_t props;
  596. //判断是否要设置发送消息的超时时间, 如果配置10秒,超时后,服务器会自动删除消息
  597. if (tp_msg->m_timeout_ms == std::chrono::milliseconds(0)) {
  598. //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关.
  599. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000;
  600. //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效.
  601. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
  602. //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式
  603. //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string)
  604. props.content_type = amqp_cstring_bytes("text/plain");
  605. //uint8_t delivery_mode 配送模式 2表示持续发送模式
  606. props.delivery_mode = AMQP_DELIVERY_PERSISTENT;
  607. } else {
  608. //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关.
  609. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000;
  610. //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效.
  611. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG |
  612. AMQP_BASIC_EXPIRATION_FLAG;
  613. //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式
  614. //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string)
  615. props.content_type = amqp_cstring_bytes("text/plain");
  616. //uint8_t delivery_mode 配送模式 2表示持续发送模式
  617. props.delivery_mode = AMQP_DELIVERY_PERSISTENT;
  618. char buf[256] = {0};
  619. sprintf(buf, "%d", (int) tp_msg->m_timeout_ms.count());
  620. props.expiration = amqp_cstring_bytes(buf);//超时, 单位ms;
  621. }
  622. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  623. std::unique_lock<std::mutex> lk(m_mutex);
  624. // std::cout << " huli test :::: " << " tp_msg->m_message_buf = " << tp_msg->m_message_buf << std::endl;
  625. //amqp_basic_publish() 发布消息给代理服务器, 在交换器上发布一个带有路由密钥的消息。交换机会根据路由密钥匹配,放到对应的队列里面
  626. //输入 amqp_connection_state_t state 连接状态参数的结构体
  627. //输入 amqp_channel_t channel 连接通道的编号
  628. //输入 amqp_bytes_t exchange 交换机模式字符串
  629. //输入 amqp_bytes_t routing_key 路由密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致
  630. //输入 amqp_boolean_t mandatory 强制服务器必须通过路由密钥才能存到队列, 默认为0
  631. //输入 amqp_boolean_t immediate 表示服务器必须立刻转发消息给接受者, 默认为0
  632. //输入 struct amqp_basic_properties_t_ const *properties 消息数据的基本属性
  633. //输入 amqp_bytes_t body 消息数据内容
  634. //返回错误码 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  635. //注注注注注意了::amqp_basic_publish()是异步通信,
  636. // return AMQP_STATUS_OK 也只是表示消息成功发送到服务器. 无法确认 接收端是否正常接受消息
  637. t_result = amqp_basic_publish(mp_connect, tp_msg->m_channel,
  638. amqp_cstring_bytes(tp_msg->m_exchange_name.c_str()),
  639. amqp_cstring_bytes(tp_msg->m_routing_key.c_str()), 0, 0,
  640. &props, amqp_cstring_bytes(tp_msg->m_message_buf.c_str()));
  641. }
  642. if (t_result == AMQP_STATUS_OK) {
  643. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  644. delete (tp_msg);
  645. tp_msg = NULL;
  646. // std::string re = amqp_error_to_string(t_result, "amqp_basic_publish");
  647. // std::cout << " huli test :::: " << " re = " << re << std::endl;
  648. // return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_PUBLISH_ERROR, Error_level::MINOR_ERROR,
  649. // amqp_error_to_string(t_result, "amqp_basic_publish") );
  650. } else {
  651. std::string re = amqp_error_to_string(t_result, "amqp_basic_publish");
  652. std::cout << " huli test :::: " << " re = " << re << std::endl;
  653. //重启
  654. m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT;
  655. m_send_list.push(tp_msg); //重新加入队列,下一次再发
  656. tp_msg = NULL;
  657. rabbitmq_reconnnect();
  658. }
  659. }
  660. } else {
  661. //没有取出, 那么应该就是 m_termination_flag 结束了
  662. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  663. // " Communication_socket_base::send_data_thread() error ");
  664. }
  665. }
  666. }
  667. LOG(INFO) << " Rabbitmq_base::send_thread end " << this;
  668. return;
  669. }
  670. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  671. Error_manager
  672. Rabbitmq_base::encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key,
  673. int timeout_ms = 0) {
  674. if (m_rabbitmq_status != RABBITMQ_STATUS_READY) {
  675. LOG(ERROR) << " m_rabbitmq_status error ";
  676. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  677. " m_rabbitmq_status error ");
  678. }
  679. // LOG(INFO) << exchange_name << " " << routing_key;
  680. Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  681. bool is_push = m_send_list.push(tp_msg);
  682. if (is_push == false) {
  683. delete (tp_msg);
  684. tp_msg = NULL;
  685. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  686. " Communication_socket_base::encapsulate_msg error ");
  687. }
  688. return Error_code::SUCCESS;
  689. }
  690. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  691. Error_manager Rabbitmq_base::encapsulate_msg(Rabbitmq_message *p_msg) {
  692. Rabbitmq_message *tp_msg = new Rabbitmq_message(*p_msg);
  693. bool is_push = m_send_list.push(tp_msg);
  694. if (is_push == false) {
  695. delete (tp_msg);
  696. tp_msg = NULL;
  697. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  698. " Communication_socket_base::encapsulate_msg error ");
  699. }
  700. return Error_code::SUCCESS;
  701. }
  702. //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
  703. Error_manager Rabbitmq_base::encapsulate_task_msg(std::string message, int vector_index) {
  704. int channel = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).channel();
  705. std::string exchange_name = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(
  706. vector_index).exchange_name();
  707. std::string routing_key = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(
  708. vector_index).routing_key();
  709. int timeout_ms = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(
  710. vector_index).timeout_ms();
  711. Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  712. bool is_push = m_send_list.push(tp_msg);
  713. if (is_push == false) {
  714. delete (tp_msg);
  715. tp_msg = NULL;
  716. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  717. " Communication_socket_base::encapsulate_msg error ");
  718. }
  719. return Error_code::SUCCESS;
  720. }
  721. //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
  722. Error_manager Rabbitmq_base::encapsulate_status_msg(std::string message, int vector_index) {
  723. if (vector_index >= m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector().size()) {
  724. LOG(WARNING) << "vector index error.";
  725. return {FAILED, NORMAL};
  726. }
  727. int channel = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(vector_index).channel();
  728. std::string exchange_name = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(
  729. vector_index).exchange_name();
  730. std::string routing_key = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(
  731. vector_index).routing_key();
  732. int timeout_ms = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(
  733. vector_index).timeout_ms();
  734. Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  735. bool is_push = m_send_list.push(tp_msg);
  736. if (is_push == false) {
  737. delete (tp_msg);
  738. tp_msg = NULL;
  739. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  740. " Communication_socket_base::encapsulate_msg error ");
  741. }
  742. return Error_code::SUCCESS;
  743. }
  744. //mp_encapsulate_stauts_thread 自动封装线程执行函数,
  745. void Rabbitmq_base::encapsulate_status_thread() {
  746. LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread start " << this;
  747. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  748. while (m_encapsulate_status_condition.is_alive()) {
  749. bool t_pass_flag = m_encapsulate_status_condition.wait_for_millisecond(m_encapsulate_status_cycle_time);
  750. if (m_encapsulate_status_condition.is_alive()) {
  751. std::this_thread::yield();
  752. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  753. if (t_pass_flag) {
  754. //主动发送消息,
  755. }
  756. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  757. else {
  758. //只有通信正常的时候,才封装发送状态消息
  759. if (m_rabbitmq_status == RABBITMQ_STATUS_READY) {
  760. auto_encapsulate_status();
  761. }
  762. }
  763. }
  764. }
  765. LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread end " << this;
  766. return;
  767. }
  768. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  769. Error_manager Rabbitmq_base::auto_encapsulate_status() {
  770. if (encapsulate_status_callback != NULL) {
  771. return encapsulate_status_callback();
  772. }
  773. return Error_code::SUCCESS;
  774. }
  775. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  776. std::string Rabbitmq_base::amqp_error_to_string(int amqp_status) {
  777. char buf[256] = {0};
  778. sprintf(buf, "amqp_status = 0x%x, %s", amqp_status, amqp_error_string2(amqp_status));
  779. return buf;
  780. }
  781. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  782. std::string Rabbitmq_base::amqp_error_to_string(int amqp_status, std::string amqp_fun_name) {
  783. char buf[256] = {0};
  784. sprintf(buf, "amqp_fun_name = %s, amqp_status = 0x%x, %s", amqp_fun_name.c_str(), amqp_status,
  785. amqp_error_string2(amqp_status));
  786. return buf;
  787. }
  788. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  789. std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply) {
  790. char buf[256] = {0};
  791. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  792. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  793. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  794. switch (amqp_rpc_reply.reply_type) {
  795. case AMQP_RESPONSE_NORMAL: {
  796. sprintf(buf, "SUCCESS");
  797. break;
  798. }
  799. case AMQP_RESPONSE_NONE: {
  800. sprintf(buf, " reply_type = AMQP_RESPONSE_NONE ");
  801. break;
  802. }
  803. case AMQP_RESPONSE_LIBRARY_EXCEPTION: {
  804. sprintf(buf, " reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ",
  805. amqp_error_string2(amqp_rpc_reply.library_error));
  806. break;
  807. }
  808. case AMQP_RESPONSE_SERVER_EXCEPTION: {
  809. if (amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
  810. amqp_connection_close_t *p_decoded = (amqp_connection_close_t *) amqp_rpc_reply.reply.decoded;
  811. sprintf(buf, " reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ",
  812. p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes);
  813. } else if (amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
  814. amqp_channel_close_t *p_decoded = (amqp_channel_close_t *) amqp_rpc_reply.reply.decoded;
  815. sprintf(buf, " reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ",
  816. p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes);
  817. } else {
  818. sprintf(buf, " reply_type = AMQP_RESPONSE_SERVER_EXCEPTION ");
  819. }
  820. break;
  821. }
  822. default: {
  823. sprintf(buf, " reply_type = unknown, reply.id = 0x%08X, ",
  824. amqp_rpc_reply.reply.id);
  825. break;
  826. }
  827. }
  828. return buf;
  829. }
  830. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  831. std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name) {
  832. char buf[256] = {0};
  833. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  834. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  835. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  836. switch (amqp_rpc_reply.reply_type) {
  837. case AMQP_RESPONSE_NORMAL: {
  838. sprintf(buf, "SUCCESS");
  839. break;
  840. }
  841. case AMQP_RESPONSE_NONE: {
  842. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_NONE ", amqp_fun_name.c_str());
  843. break;
  844. }
  845. case AMQP_RESPONSE_LIBRARY_EXCEPTION: {
  846. // huli test 123123123:::: error_description = amqp_fun_name = amqp_consume_message, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = unexpected protocol state,
  847. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ",
  848. amqp_fun_name.c_str(), amqp_error_string2(amqp_rpc_reply.library_error));
  849. break;
  850. }
  851. case AMQP_RESPONSE_SERVER_EXCEPTION: {
  852. if (amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
  853. amqp_connection_close_t *p_decoded = (amqp_connection_close_t *) amqp_rpc_reply.reply.decoded;
  854. sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ",
  855. amqp_fun_name.c_str(), p_decoded->reply_code, (int) p_decoded->reply_text.len,
  856. (char *) p_decoded->reply_text.bytes);
  857. } else if (amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
  858. amqp_channel_close_t *p_decoded = (amqp_channel_close_t *) amqp_rpc_reply.reply.decoded;
  859. sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ",
  860. amqp_fun_name.c_str(), p_decoded->reply_code, (int) p_decoded->reply_text.len,
  861. (char *) p_decoded->reply_text.bytes);
  862. } else {
  863. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_SERVER_EXCEPTION ", amqp_fun_name.c_str());
  864. }
  865. break;
  866. }
  867. default: {
  868. sprintf(buf, "amqp_fun_name = %s, reply_type = unknown, reply.id = 0x%08X, ",
  869. amqp_fun_name.c_str(), amqp_rpc_reply.reply.id);
  870. break;
  871. }
  872. }
  873. return buf;
  874. }