rabbitmq_base.cpp 47 KB


  1. #include "rabbitmq_base.h"
  2. #include "tool/time_tool.h"
  3. Rabbitmq_base::Rabbitmq_base()
  4. {
  5. m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW;
  6. mp_connect = NULL;
  7. mp_socket = NULL;
  8. m_port = 0;
  9. mp_receive_analysis_thread = NULL;
  10. mp_send_thread = NULL;
  11. mp_encapsulate_status_thread = NULL;
  12. m_encapsulate_status_cycle_time = 1000;//默认1000ms,就自动封装一次状态信息
  13. check_msg_callback = NULL;
  14. check_executer_callback = NULL;
  15. execute_msg_callback = NULL;
  16. encapsulate_status_callback = NULL;
  17. }
  18. Rabbitmq_base::~Rabbitmq_base()
  19. {
  20. rabbitmq_uninit();
  21. }
  22. //初始化 通信 模块。如下三选一
  23. Error_manager Rabbitmq_base::rabbitmq_init()
  24. {
  25. LOG(INFO) << RABBITMQ_PARAMETER_PATH;
  26. return rabbitmq_init_from_protobuf(RABBITMQ_PARAMETER_PATH);
  27. }
  28. //初始化 通信 模块。从文件读取
  29. Error_manager Rabbitmq_base::rabbitmq_init_from_protobuf(std::string prototxt_path)
  30. {
  31. Rabbitmq_proto::Rabbitmq_parameter_all t_rabbitmq_parameter_all;
  32. if(!proto_tool::read_proto_param(prototxt_path,t_rabbitmq_parameter_all) )
  33. {
  34. return Error_manager(RABBITMQ_READ_PROTOBUF_ERROR,MINOR_ERROR,
  35. "rabbitmq_init_from_protobuf read_proto_param failed");
  36. }
  37. LOG(INFO) << "read rabbitmq params success.";
  38. return rabbitmq_init_from_protobuf(t_rabbitmq_parameter_all);
  39. }
  40. //初始化 通信 模块。从protobuf读取
  41. Error_manager Rabbitmq_base::rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all)
  42. {
  43. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_init_from_protobuf() run--- "<< this;
  44. int t_status=0; //状态
  45. amqp_rpc_reply_t t_reply; //reply答复结果
  46. Error_manager t_error;
  47. m_rabbitmq_parameter_all = rabbitmq_parameter_all;
  48. //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数
  49. // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功
  50. mp_connect = amqp_new_connection();
  51. if ( mp_connect == NULL )
  52. {
  53. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR,
  54. "amqp_new_connection fun error ");
  55. }
  56. //amqp_tcp_socket_new 新建tcp_socket连接
  57. // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功
  58. mp_socket = amqp_tcp_socket_new(mp_connect);
  59. if ( mp_socket == NULL )
  60. {
  61. return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR,
  62. "amqp_tcp_socket_new fun error ");
  63. }
  64. //载入外部参数
  65. if (rabbitmq_parameter_all.rabbitmq_parameters().has_ip() &&
  66. rabbitmq_parameter_all.rabbitmq_parameters().has_port() &&
  67. rabbitmq_parameter_all.rabbitmq_parameters().has_user() &&
  68. rabbitmq_parameter_all.rabbitmq_parameters().has_password() )
  69. {
  70. m_ip = rabbitmq_parameter_all.rabbitmq_parameters().ip();
  71. m_port = rabbitmq_parameter_all.rabbitmq_parameters().port();
  72. m_user = rabbitmq_parameter_all.rabbitmq_parameters().user();
  73. m_password = rabbitmq_parameter_all.rabbitmq_parameters().password();
  74. }
  75. else
  76. {
  77. return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR,
  78. " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete ");
  79. }
  80. //amqp_socket_open 打开socket连接, 输入ip和port,
  81. // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  82. //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置.
  83. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port);
  84. if ( t_status != AMQP_STATUS_OK )
  85. {
  86. return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR,
  87. amqp_error_to_string(t_status, "amqp_socket_open") );
  88. }
  89. //amqp_login() 登录代理服务器,
  90. //输入 连接参数结构体 amqp_connection_state_t,
  91. //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/"
  92. //输入 连接通道最大值, 默认值0表示没有限制
  93. //输入 连接帧率最大值, 默认值是131072 (128KB)
  94. //输入 心跳帧之间的秒数, 默认值0禁用心跳
  95. //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码
  96. // AMQP_SASL_METHOD_EXTERNAL, 追加身份证
  97. //返回 结果的结构体 amqp_rpc_reply_t
  98. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  99. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  100. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  101. t_reply = amqp_login(mp_connect, "/", 0, 131072, 0,
  102. AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str());
  103. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  104. {
  105. return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR,
  106. amqp_error_to_string(t_reply, "amqp_login") );
  107. }
  108. //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化)
  109. m_channel_map.clear();
  110. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  111. t_error = rabbitmq_new_channel_queue_consume(rabbitmq_parameter_all);
  112. if ( t_error != Error_code::SUCCESS )
  113. {
  114. return t_error;
  115. }
  116. //启动通信, 开启线程, run thread
  117. t_error = rabbitmq_run();
  118. if ( t_error != Error_code::SUCCESS )
  119. {
  120. return t_error;
  121. }
  122. return Error_code::SUCCESS;
  123. }
  124. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  125. Error_manager Rabbitmq_base::rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all)
  126. {
  127. int t_status=0; //状态
  128. amqp_rpc_reply_t t_reply; //reply答复结果
  129. Error_manager t_error;
  130. ///Rabbitmq 接受的通道,队列和消费者, 多个
  131. for(int i=0;i<rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector_size();++i)
  132. {
  133. //Rabbitmq 配置的通道,队列和消费者,
  134. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf =
  135. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector(i);
  136. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  137. if ( m_channel_map.find(t_inf.channel()) == m_channel_map.end() )
  138. {
  139. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  140. amqp_channel_open(mp_connect, t_inf.channel());
  141. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  142. t_reply = amqp_get_rpc_reply(mp_connect);
  143. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  144. {
  145. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  146. amqp_error_to_string(t_reply, "amqp_channel_open") );
  147. }
  148. if ( t_inf.consume_no_ack() == 0 )
  149. {
  150. //amqp_basic_qos设置通道每次只能接受一条消息, 直到该消息被ack,才能接受下一条.状态消息可以继续接受
  151. //uint16_t prefetch_count 同时接受消息的个数, 这里固定写1,
  152. //配合 amqp_basic_qos 和 amqp_basic_ack , 来阻塞这个通道的接受消息
  153. //注:请求消息no_ack==0, 当接受一条指令后,该通道被阻塞,其他通道仍然正常接受, 等到任务被执行完,手动调用amqp_basic_ack函数, 则可以继续接受请求消息.
  154. //注:状态消息no_ack==1, 当接受一条指令后,该状态消息立刻被删除,然后可以继续接受下一条状态消息.
  155. amqp_basic_qos(mp_connect, t_inf.channel(), 0, PREFETCH_COUNT, 0);
  156. }
  157. m_channel_map[t_inf.channel()] = true;
  158. }
  159. //临时队列需要代码创建, 永久队列需要在服务器上提前手动创建
  160. if ( t_inf.queue_durable() == 0 )
  161. {
  162. //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒)
  163. if ( t_inf.queue_meassage_ttl() != 0 )
  164. {
  165. amqp_table_t t_arguments; //队列的扩展属性 num_entries 是map长度, amqp_table_entry_t_ 是map指针
  166. //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒)
  167. t_arguments.num_entries = 1;
  168. amqp_table_entry_t_ t_map_arg;
  169. t_map_arg.key = amqp_cstring_bytes("x-message-ttl"); //需要配置的参数
  170. t_map_arg.value.kind = AMQP_FIELD_KIND_U16; //需要配置的数据类型, 如果是字符串, 写 AMQP_FIELD_KIND_UTF8
  171. t_map_arg.value.value.u16 = t_inf.queue_meassage_ttl(); //需要配置的数值
  172. t_arguments.entries = &t_map_arg;
  173. //amqp_queue_declare() 队列声明, 就是创建新的队列.
  174. //输入 amqp_connection_state_t state 连接状态参数的结构体
  175. //输入 amqp_channel_t channel 连接通道的编号
  176. //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes
  177. //输入 amqp_boolean_t passive 是否被动,默认0
  178. //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器)
  179. // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据
  180. // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常
  181. //输入 amqp_boolean_t exclusive 是否独立,默认0
  182. //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除,
  183. // 一般情况下设为0,然后让接受者手动删除.
  184. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  185. //返回 amqp_queue_declare_ok_t * 返回结果
  186. amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  187. t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(),
  188. t_inf.queue_auto_delete(), t_arguments);
  189. }
  190. else
  191. {
  192. //amqp_queue_declare() 队列声明, 就是创建新的队列.
  193. //输入 amqp_connection_state_t state 连接状态参数的结构体
  194. //输入 amqp_channel_t channel 连接通道的编号
  195. //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes
  196. //输入 amqp_boolean_t passive 是否被动,默认0
  197. //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器)
  198. // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据
  199. // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常
  200. //输入 amqp_boolean_t exclusive 是否独立,默认0
  201. //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除,
  202. // 一般情况下设为0,然后让接受者手动删除.
  203. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  204. //返回 amqp_queue_declare_ok_t * 返回结果
  205. amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  206. t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(),
  207. t_inf.queue_auto_delete(), amqp_empty_table);
  208. }
  209. //amqp_queue_bind 队列绑定, 将队列加载到服务器的交换机下面, 交换机收到消息后,就会检查key,然后放到指定的队列.
  210. //输入 amqp_connection_state_t state 连接状态参数的结构体
  211. //输入 amqp_channel_t channel 连接通道的编号
  212. //输入 amqp_bytes_t queue 队列名称,
  213. //输入 amqp_bytes_t exchange 交换机模式字符串
  214. //输入 amqp_bytes_t bindingkey 绑定密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致
  215. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  216. //返回 amqp_queue_bind_ok_t * 返回结果
  217. //注注注注注意了, 队列绑定交换机时,必须保证交换机是有效的.否则报错
  218. amqp_queue_bind(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  219. amqp_cstring_bytes(t_inf.exchange_name().c_str()),
  220. amqp_cstring_bytes(t_inf.binding_key().c_str()), amqp_empty_table);
  221. amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect);
  222. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  223. {
  224. return Error_manager(Error_code::RABBITMQ_AMQP_QUEUE_BIND_ERROR, Error_level::MINOR_ERROR,
  225. amqp_error_to_string(t_reply, "amqp_queue_bind") );
  226. }
  227. }
  228. //amqp_basic_consume 创建基本类型的消费者,就是接收端, 消费者绑定队列,只能接受一个队列里面的消息
  229. //输入 amqp_connection_state_t state 连接状态参数的结构体
  230. //输入 amqp_channel_t channel 连接通道的编号
  231. //输入 amqp_bytes_t queue 队列名称,
  232. //输入 amqp_bytes_t consumer_tag 消费者名称
  233. //输入 amqp_boolean_t no_local 是否非本地, 默认0,表示本地
  234. //输入 amqp_boolean_t no_ack, 是否确认应答,默认0,表示接收后需要应答
  235. //输入 amqp_boolean_t exclusive 是否独立,默认0
  236. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table
  237. //返回 amqp_basic_consume_ok_t * 返回结果
  238. //注注注注注意了, 接受端绑定队列时,必须保证队列是有效的,否则报错,
  239. amqp_basic_consume(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()),
  240. amqp_cstring_bytes(t_inf.consume_name().c_str()), t_inf.consume_no_local(),
  241. t_inf.consume_no_ack(), t_inf.consume_exclusive(), amqp_empty_table);
  242. amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect);
  243. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  244. {
  245. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONSUME_ERROR, Error_level::MINOR_ERROR,
  246. amqp_error_to_string(t_reply, "amqp_basic_consume") );
  247. }
  248. }
  249. //Rabbitmq 发送请求的通道
  250. for(int i=0;i<rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector_size();++i)
  251. {
  252. //Rabbitmq 配置发送通道
  253. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf1 =
  254. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(i);
  255. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  256. if ( m_channel_map.find(t_inf1.channel()) == m_channel_map.end() )
  257. {
  258. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  259. amqp_channel_open(mp_connect, t_inf1.channel());
  260. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  261. t_reply = amqp_get_rpc_reply(mp_connect);
  262. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  263. {
  264. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  265. amqp_error_to_string(t_reply, "amqp_channel_open") );
  266. }
  267. m_channel_map[t_inf1.channel()] = true;
  268. }
  269. }
  270. //Rabbitmq 发送状态的通道
  271. for(int i=0;i<rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector_size();++i)
  272. {
  273. //Rabbitmq 配置发送通道
  274. Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf2 =
  275. rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(i);
  276. //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化)
  277. if ( m_channel_map.find(t_inf2.channel()) == m_channel_map.end() )
  278. {
  279. //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道.
  280. amqp_channel_open(mp_connect, t_inf2.channel());
  281. //amqp_get_rpc_reply() 获取当前网络连接的状态结果.
  282. t_reply = amqp_get_rpc_reply(mp_connect);
  283. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  284. {
  285. return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR,
  286. amqp_error_to_string(t_reply, "amqp_channel_open") );
  287. }
  288. m_channel_map[t_inf2.channel()] = true;
  289. }
  290. }
  291. return Error_code::SUCCESS;
  292. }
  293. //启动通信, 开启线程, run thread
  294. Error_manager Rabbitmq_base::rabbitmq_run()
  295. {
  296. //启动 线程。
  297. //接受线程默认循环, 内部的 amqp_consume_message 进行等待, 超时1ms
  298. m_receive_analysis_condition.reset(false, true, false);
  299. mp_receive_analysis_thread = new std::thread(&Rabbitmq_base::receive_analysis_thread, this);
  300. //发送线程默认循环, 内部的wait_and_pop进行等待,
  301. m_send_condition.reset(false, true, false);
  302. mp_send_thread = new std::thread(&Rabbitmq_base::send_thread, this);
  303. //封装线程默认等待, ...., 超时1秒, 超时后主动 封装心跳和状态信息,
  304. m_encapsulate_status_condition.reset(false, false, false);
  305. mp_encapsulate_status_thread = new std::thread(&Rabbitmq_base::encapsulate_status_thread, this);
  306. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  307. return Error_code::SUCCESS;
  308. }
  309. //反初始化 通信 模块。
  310. Error_manager Rabbitmq_base::rabbitmq_uninit()
  311. {
  312. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_uninit() run--- "<< this;
  313. //终止list,防止 wait_and_pop 阻塞线程。
  314. m_send_list.termination_list();
  315. //杀死线程,强制退出
  316. if (mp_receive_analysis_thread)
  317. {
  318. m_receive_analysis_condition.kill_all();
  319. }
  320. if (mp_send_thread)
  321. {
  322. m_send_condition.kill_all();
  323. }
  324. if (mp_encapsulate_status_thread)
  325. {
  326. m_encapsulate_status_condition.kill_all();
  327. }
  328. //回收线程的资源
  329. if (mp_receive_analysis_thread)
  330. {
  331. mp_receive_analysis_thread->join();
  332. delete mp_receive_analysis_thread;
  333. mp_receive_analysis_thread = NULL;
  334. }
  335. if (mp_send_thread)
  336. {
  337. mp_send_thread->join();
  338. delete mp_send_thread;
  339. mp_send_thread = NULL;
  340. }
  341. if (mp_encapsulate_status_thread)
  342. {
  343. mp_encapsulate_status_thread->join();
  344. delete mp_encapsulate_status_thread;
  345. mp_encapsulate_status_thread = NULL;
  346. }
  347. //清空list
  348. m_send_list.clear_and_delete();
  349. if ( m_rabbitmq_status == RABBITMQ_STATUS_READY )
  350. {
  351. for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter)
  352. {
  353. amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS);
  354. }
  355. amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS);
  356. amqp_destroy_connection(mp_connect);
  357. }
  358. m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW;
  359. return Error_code::SUCCESS;
  360. }
  361. //重连, 快速uninit, init
  362. Error_manager Rabbitmq_base::rabbitmq_reconnnect()
  363. {
  364. //重连全程加锁,防止其他线程运行.
  365. std::unique_lock<std::mutex> lk(m_mutex);
  366. m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT;
  367. //断开连接
  368. for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter)
  369. {
  370. amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS);
  371. }
  372. amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS);
  373. amqp_destroy_connection(mp_connect);
  374. //重新连接,线程不需要重启
  375. LOG(INFO) << " ---Rabbitmq_base::rabbitmq_reconnnect() run--- "<< this;
  376. int t_status=0; //状态
  377. amqp_rpc_reply_t t_reply; //reply答复结果
  378. Error_manager t_error;
  379. //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数
  380. // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功
  381. mp_connect = amqp_new_connection();
  382. if ( mp_connect == NULL )
  383. {
  384. return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR,
  385. "amqp_new_connection fun error ");
  386. }
  387. //amqp_tcp_socket_new 新建tcp_socket连接
  388. // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功
  389. mp_socket = amqp_tcp_socket_new(mp_connect);
  390. if ( mp_socket == NULL )
  391. {
  392. return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR,
  393. "amqp_tcp_socket_new fun error ");
  394. }
  395. //载入外部参数
  396. if (m_rabbitmq_parameter_all.rabbitmq_parameters().has_ip() &&
  397. m_rabbitmq_parameter_all.rabbitmq_parameters().has_port() &&
  398. m_rabbitmq_parameter_all.rabbitmq_parameters().has_user() &&
  399. m_rabbitmq_parameter_all.rabbitmq_parameters().has_password() )
  400. {
  401. m_ip = m_rabbitmq_parameter_all.rabbitmq_parameters().ip();
  402. m_port = m_rabbitmq_parameter_all.rabbitmq_parameters().port();
  403. m_user = m_rabbitmq_parameter_all.rabbitmq_parameters().user();
  404. m_password = m_rabbitmq_parameter_all.rabbitmq_parameters().password();
  405. }
  406. else
  407. {
  408. return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR,
  409. " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete ");
  410. }
  411. //amqp_socket_open 打开socket连接, 输入ip和port,
  412. // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  413. //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置.
  414. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port);
  415. if ( t_status != AMQP_STATUS_OK )
  416. {
  417. return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR,
  418. amqp_error_to_string(t_status, "amqp_socket_open") );
  419. }
  420. //amqp_login() 登录代理服务器,
  421. //输入 连接参数结构体 amqp_connection_state_t,
  422. //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/"
  423. //输入 连接通道最大值, 默认值0表示没有限制
  424. //输入 连接帧率最大值, 默认值是131072 (128KB)
  425. //输入 心跳帧之间的秒数, 默认值0禁用心跳
  426. //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码
  427. // AMQP_SASL_METHOD_EXTERNAL, 追加身份证
  428. //返回 结果的结构体 amqp_rpc_reply_t
  429. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  430. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  431. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  432. t_reply = amqp_login(mp_connect, "/", 0, 131072, 0,
  433. AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str());
  434. if ( t_reply.reply_type != AMQP_RESPONSE_NORMAL )
  435. {
  436. return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR,
  437. amqp_error_to_string(t_reply, "amqp_login") );
  438. }
  439. //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化)
  440. m_channel_map.clear();
  441. //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建)
  442. t_error = rabbitmq_new_channel_queue_consume(m_rabbitmq_parameter_all);
  443. if ( t_error != Error_code::SUCCESS )
  444. {
  445. return t_error;
  446. }
  447. //不用重启线程
  448. return Error_code::SUCCESS;
  449. }
  450. //设置 自动封装状态的时间周期
  451. void Rabbitmq_base::set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time)
  452. {
  453. m_encapsulate_status_cycle_time = encapsulate_status_cycle_time;
  454. }
  455. //设置回调函数check_msg_callback
  456. void Rabbitmq_base::set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg))
  457. {
  458. check_msg_callback = callback;
  459. }
  460. //设置回调函数check_executer_callback
  461. void Rabbitmq_base::set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message* p_msg))
  462. {
  463. check_executer_callback = callback;
  464. }
  465. //设置回调函数execute_msg_callback
  466. void Rabbitmq_base::set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message* p_msg))
  467. {
  468. execute_msg_callback = callback;
  469. }
  470. //设置回调函数encapsulate_status_callback
  471. void Rabbitmq_base::set_encapsulate_status_callback(Error_manager (*callback)())
  472. {
  473. encapsulate_status_callback = callback;
  474. }
  475. //mp_receive_analysis_thread 接受解析 执行函数,
  476. void Rabbitmq_base::receive_analysis_thread()
  477. {
  478. LOG(INFO) << " Rabbitmq_base::receive_analysis_thread start "<< this;
  479. //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list
  480. while (m_receive_analysis_condition.is_alive())
  481. {
  482. //这里就不需要超时等待了, rabbitmq的接受函数可以配置等待超时....
  483. // m_receive_analysis_condition.wait_for_ex(std::chrono::microseconds(1));
  484. m_receive_analysis_condition.wait();
  485. if ( m_receive_analysis_condition.is_alive() )
  486. {
  487. std::this_thread::sleep_for(std::chrono::microseconds(100));
  488. std::this_thread::yield();
  489. amqp_rpc_reply_t t_reply; //运行结果
  490. amqp_envelope_t t_envelope; //数据包, 含有一些包裹属性和数据内容
  491. //接受消息等待超时,默认1000us, 当收到消息后,立刻通过阻塞,否则等待超时后通过阻塞
  492. struct timeval t_timeout; //超时时间, 默认1ms
  493. t_timeout.tv_sec = 0;
  494. t_timeout.tv_usec = 1000;
  495. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  496. std::unique_lock<std::mutex> lk(m_mutex);
  497. //允许释放连接参数状态的内存,
  498. // 因为这个连接是底层分配的内存,是全局的. 为了开启多个连接,就要重复使用
  499. //这里释放之后,其他代码就开启多线程开启新的连接了.
  500. amqp_maybe_release_buffers(mp_connect);
  501. //amqp_consume_message 接受消息, 阻塞函数,可以设置超时.
  502. //输入 amqp_connection_state_t state 连接状态参数的结构体
  503. //输入 amqp_envelope_t *envelope 接受数据包的指针, 成功接收到数据后,数据包会覆盖
  504. //输入 const struct timeval *timeout 超时时间, 防止阻塞. 传入NULL就是完全阻塞.
  505. //输入 int flags 未使用, 默认0
  506. //输入 amqp_connection_state_t state 连接状态参数的结构体
  507. //返回 状态结果的结构体 amqp_rpc_reply_t
  508. // amqp_response_type_enum reply_type 成功是 AMQP_RESPONSE_NORMAL
  509. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  510. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  511. t_reply = amqp_consume_message(mp_connect, &t_envelope, &t_timeout, 0);
  512. }
  513. if ( AMQP_RESPONSE_NORMAL == t_reply.reply_type )//正常接受到消息
  514. {
  515. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  516. //从t_envelope数据包里面提取信息
  517. std::string t_receive_string = std::string((char*)t_envelope.message.body.bytes, t_envelope.message.body.len);
  518. int t_channel = t_envelope.channel;
  519. int t_delivery_tag = t_envelope.delivery_tag;
  520. std::string t_exchange_name = std::string((char*)t_envelope.exchange.bytes, t_envelope.exchange.len);
  521. std::string t_routing_key = std::string((char*)t_envelope.routing_key.bytes, t_envelope.routing_key.len);
  522. //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check
  523. message::Base_msg t_base_msg;
  524. // if( t_base_msg.ParseFromString(t_receive_string) )
  525. //删除 message::Base_msg 里面的 message::Base_info的机制,完全依赖服务器来分发消息
  526. if( true )
  527. {
  528. //第一次解析之后转化为, Communication_message, 自定义的通信消息格式
  529. Rabbitmq_message t_rabbitmq_message;
  530. t_rabbitmq_message.reset(t_base_msg.base_info(), t_receive_string, t_channel, t_delivery_tag, t_exchange_name, t_routing_key);
  531. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的.
  532. if ( check_msg(&t_rabbitmq_message) == SUCCESS )
  533. {
  534. //这里直接就用当前线程进行处理,
  535. //检查消息是否可以被处理
  536. if ( check_executer(&t_rabbitmq_message) == SUCCESS )
  537. {
  538. //处理消息
  539. if ( execute_msg(&t_rabbitmq_message) == SUCCESS )
  540. {
  541. }
  542. //else不做处理
  543. }
  544. //else不做处理
  545. }
  546. //else不做处理
  547. }
  548. //else解析失败, 就当做什么也没发生, 认为接收消息无效,
  549. else
  550. {
  551. std::cout << " huli test :::: " << " t_receive_string = " << t_receive_string << std::endl;
  552. if ( t_channel == 401 )
  553. {
  554. amqp_basic_ack(mp_connect, t_channel, t_delivery_tag, 0);
  555. }
  556. }
  557. //amqp_destroy_envelope 销毁数据包, 只有接受成功, t_envelope才有内存
  558. amqp_destroy_envelope(&t_envelope);
  559. }
  560. else//没有接受到消息
  561. {
  562. //超时报错,不做处理, continue
  563. //注注注注注意了, 没有收到消息会超时报错, res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, res.library_error = -13, (-0x000D request timed out)
  564. if (t_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && t_reply.library_error == -13)
  565. {
  566. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  567. continue;
  568. }
  569. else//其他报错,特殊处理
  570. {
  571. //need
  572. std::string error_description = amqp_error_to_string(t_reply, "amqp_consume_message");
  573. std::cout << " huli test 123123123:::: " << " error_description = " << error_description << std::endl;
  574. // return Error_manager(Error_code::RABBITMQ_AMQP_CONSUME_MESSAGE_ERROR, Error_level::MINOR_ERROR,
  575. // amqp_error_to_string(t_reply, "amqp_consume_message") );
  576. //重启
  577. rabbitmq_reconnnect();
  578. }
  579. }
  580. }
  581. }
  582. LOG(INFO) << " Rabbitmq_base::receive_analysis_thread end "<< this;
  583. return;
  584. }
  585. //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载
  586. Error_manager Rabbitmq_base::check_msg(Rabbitmq_message* p_msg)
  587. {
  588. if ( check_msg_callback != NULL )
  589. {
  590. return check_msg_callback(p_msg);
  591. }
  592. return Error_code::SUCCESS;
  593. }
  594. //检查执行者的状态, 判断能否处理这条消息, 需要子类重载
  595. Error_manager Rabbitmq_base::check_executer(Rabbitmq_message* p_msg)
  596. {
  597. if ( check_executer_callback != NULL )
  598. {
  599. return check_executer_callback(p_msg);
  600. }
  601. return Error_code::SUCCESS;
  602. }
  603. //处理消息, 需要子类重载
  604. Error_manager Rabbitmq_base::execute_msg(Rabbitmq_message* p_msg)
  605. {
  606. if ( execute_msg_callback != NULL )
  607. {
  608. return execute_msg_callback(p_msg);
  609. }
  610. else
  611. {
  612. //需要子类重载
  613. std::cout << " huli test :::: " << " execute_msg Rabbitmq_message = " << p_msg->get_message_buf() << std::endl;
  614. //如果是请求消息,那么在子节点继承的时候一定要记得调用
  615. //配置rabbitmq.proto时, 如果consume_no_ack == 0 , 一定要手动调用 amqp_basic_ack
  616. int consume_no_ack = 1;
  617. if(consume_no_ack == 0 || p_msg->m_channel == 401)
  618. {
  619. //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息.
  620. //输入 amqp_connection_state_t state 连接状态参数的结构体
  621. //输入 amqp_channel_t channel 连接通道的编号
  622. //输入 uint64_t delivery_tag 消息传递编号,
  623. //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条
  624. int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0);
  625. }
  626. }
  627. return Error_code::SUCCESS;
  628. }
  629. //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息.
  630. //执行者在execute_msg里面可以调用这个函数, 或者回调也行.
  631. Error_manager Rabbitmq_base::ack_msg(Rabbitmq_message* p_msg)
  632. {
  633. //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息.
  634. //输入 amqp_connection_state_t state 连接状态参数的结构体
  635. //输入 amqp_channel_t channel 连接通道的编号
  636. //输入 uint64_t delivery_tag 消息传递编号,
  637. //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条
  638. int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0);
  639. if ( ack_result != 0 )
  640. {
  641. return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_ACK_ERROR, Error_level::MINOR_ERROR,
  642. amqp_error_to_string(ack_result, "amqp_basic_ack") );
  643. }
  644. return Error_code::SUCCESS;
  645. }
  646. //mp_send_thread 发送线程执行函数,
  647. void Rabbitmq_base::send_thread()
  648. {
  649. LOG(INFO) << " Rabbitmq_base::send_thread start "<< this;
  650. //通信发送线程, 负责巡检m_send_list, 并发送消息
  651. while (m_send_condition.is_alive())
  652. {
  653. m_send_condition.wait();
  654. if ( m_send_condition.is_alive() )
  655. {
  656. std::this_thread::yield();
  657. Rabbitmq_message* tp_msg = NULL;
  658. int t_result = 0;
  659. //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待,
  660. //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的.
  661. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all();
  662. bool is_pop = m_send_list.wait_and_pop(tp_msg);
  663. if ( is_pop )
  664. {
  665. if ( tp_msg != NULL )
  666. {
  667. //amqp_basic_properties_t 消息数据的基本属性,里面有15个成员.
  668. amqp_basic_properties_t props;
  669. //判断是否要设置发送消息的超时时间, 如果配置10秒,超时后,服务器会自动删除消息
  670. if ( tp_msg->m_timeout_ms == std::chrono::milliseconds(0) )
  671. {
  672. //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关.
  673. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000;
  674. //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效.
  675. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
  676. //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式
  677. //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string)
  678. props.content_type = amqp_cstring_bytes("text/plain");
  679. //uint8_t delivery_mode 配送模式 2表示持续发送模式
  680. props.delivery_mode = AMQP_DELIVERY_PERSISTENT;
  681. }
  682. else
  683. {
  684. //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关.
  685. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000;
  686. //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效.
  687. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_EXPIRATION_FLAG;
  688. //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式
  689. //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string)
  690. props.content_type = amqp_cstring_bytes("text/plain");
  691. //uint8_t delivery_mode 配送模式 2表示持续发送模式
  692. props.delivery_mode = AMQP_DELIVERY_PERSISTENT;
  693. char buf[256] = {0};
  694. sprintf(buf, "%d", (int)tp_msg->m_timeout_ms.count());
  695. props.expiration = amqp_cstring_bytes(buf);//超时, 单位ms;
  696. }
  697. {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率
  698. std::unique_lock<std::mutex> lk(m_mutex);
  699. // std::cout << " huli test :::: " << " tp_msg->m_message_buf = " << tp_msg->m_message_buf << std::endl;
  700. //amqp_basic_publish() 发布消息给代理服务器, 在交换器上发布一个带有路由密钥的消息。交换机会根据路由密钥匹配,放到对应的队列里面
  701. //输入 amqp_connection_state_t state 连接状态参数的结构体
  702. //输入 amqp_channel_t channel 连接通道的编号
  703. //输入 amqp_bytes_t exchange 交换机模式字符串
  704. //输入 amqp_bytes_t routing_key 路由密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致
  705. //输入 amqp_boolean_t mandatory 强制服务器必须通过路由密钥才能存到队列, 默认为0
  706. //输入 amqp_boolean_t immediate 表示服务器必须立刻转发消息给接受者, 默认为0
  707. //输入 struct amqp_basic_properties_t_ const *properties 消息数据的基本属性
  708. //输入 amqp_bytes_t body 消息数据内容
  709. //返回错误码 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_
  710. //注注注注注意了::amqp_basic_publish()是异步通信,
  711. // return AMQP_STATUS_OK 也只是表示消息成功发送到服务器. 无法确认 接收端是否正常接受消息
  712. t_result = amqp_basic_publish(mp_connect, tp_msg->m_channel,
  713. amqp_cstring_bytes(tp_msg->m_exchange_name.c_str()),
  714. amqp_cstring_bytes(tp_msg->m_routing_key.c_str()), 0, 0,
  715. &props, amqp_cstring_bytes(tp_msg->m_message_buf.c_str()) );
  716. }
  717. if ( t_result == AMQP_STATUS_OK )
  718. {
  719. m_rabbitmq_status = RABBITMQ_STATUS_READY;
  720. delete(tp_msg);
  721. tp_msg = NULL;
  722. // std::string re = amqp_error_to_string(t_result, "amqp_basic_publish");
  723. // std::cout << " huli test :::: " << " re = " << re << std::endl;
  724. // return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_PUBLISH_ERROR, Error_level::MINOR_ERROR,
  725. // amqp_error_to_string(t_result, "amqp_basic_publish") );
  726. }
  727. else
  728. {
  729. std::string re = amqp_error_to_string(t_result, "amqp_basic_publish");
  730. std::cout << " huli test :::: " << " re = " << re << std::endl;
  731. //重启
  732. m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT;
  733. m_send_list.push(tp_msg); //重新加入队列,下一次再发
  734. tp_msg = NULL;
  735. rabbitmq_reconnnect();
  736. }
  737. }
  738. }
  739. else
  740. {
  741. //没有取出, 那么应该就是 m_termination_flag 结束了
  742. // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  743. // " Communication_socket_base::send_data_thread() error ");
  744. }
  745. }
  746. }
  747. LOG(INFO) << " Rabbitmq_base::send_thread end "<< this;
  748. return;
  749. }
  750. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  751. Error_manager Rabbitmq_base::encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key, int timeout_ms=0)
  752. {
  753. if ( m_rabbitmq_status != RABBITMQ_STATUS_READY )
  754. {
  755. return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR,
  756. " m_rabbitmq_status error ");
  757. }
  758. Rabbitmq_message* tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  759. bool is_push = m_send_list.push(tp_msg);
  760. if ( is_push == false )
  761. {
  762. delete(tp_msg);
  763. tp_msg = NULL;
  764. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  765. " Communication_socket_base::encapsulate_msg error ");
  766. }
  767. return Error_code::SUCCESS;
  768. }
  769. //手动封装消息,需要手动写入参数channel,exchange_name,routing_key
  770. Error_manager Rabbitmq_base::encapsulate_msg(Rabbitmq_message* p_msg)
  771. {
  772. Rabbitmq_message* tp_msg = new Rabbitmq_message(*p_msg);
  773. bool is_push = m_send_list.push(tp_msg);
  774. if ( is_push == false )
  775. {
  776. delete(tp_msg);
  777. tp_msg = NULL;
  778. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  779. " Communication_socket_base::encapsulate_msg error ");
  780. }
  781. return Error_code::SUCCESS;
  782. }
  783. //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数,
  784. Error_manager Rabbitmq_base::encapsulate_task_msg(std::string message, int vector_index)
  785. {
  786. int channel = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).channel();
  787. std::string exchange_name = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).exchange_name();
  788. std::string routing_key = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).routing_key();
  789. int timeout_ms = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).timeout_ms();
  790. Rabbitmq_message* tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  791. bool is_push = m_send_list.push(tp_msg);
  792. if ( is_push == false )
  793. {
  794. delete(tp_msg);
  795. tp_msg = NULL;
  796. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  797. " Communication_socket_base::encapsulate_msg error ");
  798. }
  799. return Error_code::SUCCESS;
  800. }
  801. //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数,
  802. Error_manager Rabbitmq_base::encapsulate_status_msg(std::string message, int vector_index)
  803. {
  804. for(auto &iter : m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector()) {
  805. if (iter.routing_key().find(std::to_string(vector_index)) != iter.routing_key().npos) {
  806. int channel = iter.channel();
  807. std::string exchange_name = iter.exchange_name();
  808. std::string routing_key = iter.routing_key();
  809. int timeout_ms = iter.timeout_ms();
  810. Rabbitmq_message* tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms);
  811. bool is_push = m_send_list.push(tp_msg);
  812. if (!is_push)
  813. {
  814. delete(tp_msg);
  815. tp_msg = nullptr;
  816. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  817. " Communication_socket_base::encapsulate_msg error ");
  818. }
  819. return Error_code::SUCCESS;
  820. }
  821. }
  822. return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR,
  823. " Communication_socket_base::encapsulate_msg error ");
  824. }
  825. //mp_encapsulate_stauts_thread 自动封装线程执行函数,
  826. void Rabbitmq_base::encapsulate_status_thread()
  827. {
  828. LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread start "<< this;
  829. //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list
  830. while (m_encapsulate_status_condition.is_alive())
  831. {
  832. bool t_pass_flag = m_encapsulate_status_condition.wait_for_millisecond(m_encapsulate_status_cycle_time);
  833. if ( m_encapsulate_status_condition.is_alive() )
  834. {
  835. std::this_thread::yield();
  836. //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息,
  837. if ( t_pass_flag )
  838. {
  839. //主动发送消息,
  840. }
  841. //如果封装线程超时通过, 那么就定时封装心跳和状态信息
  842. else
  843. {
  844. //只有通信正常的时候,才封装发送状态消息
  845. if ( m_rabbitmq_status == RABBITMQ_STATUS_READY )
  846. {
  847. auto_encapsulate_status();
  848. }
  849. }
  850. }
  851. }
  852. LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread end "<< this;
  853. return;
  854. }
  855. //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载
  856. Error_manager Rabbitmq_base::auto_encapsulate_status()
  857. {
  858. if ( encapsulate_status_callback != NULL )
  859. {
  860. return encapsulate_status_callback();
  861. }
  862. return Error_code::SUCCESS;
  863. }
  864. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  865. std::string Rabbitmq_base::amqp_error_to_string(int amqp_status)
  866. {
  867. char buf[256] = {0};
  868. sprintf(buf, "amqp_status = 0x%x, %s", amqp_status, amqp_error_string2(amqp_status));
  869. return buf;
  870. }
  871. //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string
  872. std::string Rabbitmq_base::amqp_error_to_string(int amqp_status, std::string amqp_fun_name)
  873. {
  874. char buf[256] = {0};
  875. sprintf(buf, "amqp_fun_name = %s, amqp_status = 0x%x, %s", amqp_fun_name.c_str(), amqp_status, amqp_error_string2(amqp_status));
  876. return buf;
  877. }
  878. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  879. std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply)
  880. {
  881. char buf[256] = {0};
  882. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  883. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  884. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  885. switch ( amqp_rpc_reply.reply_type )
  886. {
  887. case AMQP_RESPONSE_NORMAL:
  888. {
  889. sprintf(buf, "SUCCESS");
  890. break;
  891. }
  892. case AMQP_RESPONSE_NONE:
  893. {
  894. sprintf(buf, " reply_type = AMQP_RESPONSE_NONE " );
  895. break;
  896. }
  897. case AMQP_RESPONSE_LIBRARY_EXCEPTION:
  898. {
  899. sprintf(buf, " reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ",amqp_error_string2(amqp_rpc_reply.library_error) );
  900. break;
  901. }
  902. case AMQP_RESPONSE_SERVER_EXCEPTION:
  903. {
  904. if ( amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD )
  905. {
  906. amqp_connection_close_t * p_decoded = (amqp_connection_close_t *)amqp_rpc_reply.reply.decoded;
  907. sprintf(buf, " reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ",
  908. p_decoded->reply_code, (int)p_decoded->reply_text.len, (char *)p_decoded->reply_text.bytes);
  909. }
  910. else if ( amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD )
  911. {
  912. amqp_channel_close_t * p_decoded = (amqp_channel_close_t *)amqp_rpc_reply.reply.decoded;
  913. sprintf(buf, " reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ",
  914. p_decoded->reply_code, (int)p_decoded->reply_text.len, (char *)p_decoded->reply_text.bytes);
  915. }
  916. else
  917. {
  918. sprintf(buf, " reply_type = AMQP_RESPONSE_SERVER_EXCEPTION " );
  919. }
  920. break;
  921. }
  922. default:
  923. {
  924. sprintf(buf, " reply_type = unknown, reply.id = 0x%08X, ",
  925. amqp_rpc_reply.reply.id );
  926. break;
  927. }
  928. }
  929. return buf;
  930. }
  931. //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果
  932. std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name)
  933. {
  934. char buf[256] = {0};
  935. // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL
  936. // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply
  937. // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error
  938. switch ( amqp_rpc_reply.reply_type )
  939. {
  940. case AMQP_RESPONSE_NORMAL:
  941. {
  942. sprintf(buf, "SUCCESS");
  943. break;
  944. }
  945. case AMQP_RESPONSE_NONE:
  946. {
  947. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_NONE ", amqp_fun_name.c_str() );
  948. break;
  949. }
  950. case AMQP_RESPONSE_LIBRARY_EXCEPTION:
  951. {
  952. // huli test 123123123:::: error_description = amqp_fun_name = amqp_consume_message, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = unexpected protocol state,
  953. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ", amqp_fun_name.c_str(),amqp_error_string2(amqp_rpc_reply.library_error) );
  954. break;
  955. }
  956. case AMQP_RESPONSE_SERVER_EXCEPTION:
  957. {
  958. if ( amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD )
  959. {
  960. amqp_connection_close_t * p_decoded = (amqp_connection_close_t *)amqp_rpc_reply.reply.decoded;
  961. sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ",
  962. amqp_fun_name.c_str(), p_decoded->reply_code, (int)p_decoded->reply_text.len, (char *)p_decoded->reply_text.bytes);
  963. }
  964. else if ( amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD )
  965. {
  966. amqp_channel_close_t * p_decoded = (amqp_channel_close_t *)amqp_rpc_reply.reply.decoded;
  967. sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ",
  968. amqp_fun_name.c_str(), p_decoded->reply_code, (int)p_decoded->reply_text.len, (char *)p_decoded->reply_text.bytes);
  969. }
  970. else
  971. {
  972. sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_SERVER_EXCEPTION ", amqp_fun_name.c_str() );
  973. }
  974. break;
  975. }
  976. default:
  977. {
  978. sprintf(buf, "amqp_fun_name = %s, reply_type = unknown, reply.id = 0x%08X, ",
  979. amqp_fun_name.c_str(),amqp_rpc_reply.reply.id );
  980. break;
  981. }
  982. }
  983. return buf;
  984. }