#include "rabbitmq_base.h" Rabbitmq_base::Rabbitmq_base() { m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW; mp_connect = NULL; mp_socket = NULL; m_port = 0; mp_receive_analysis_thread = NULL; mp_send_thread = NULL; mp_encapsulate_status_thread = NULL; m_encapsulate_status_cycle_time = 100;//默认1000ms,就自动封装一次状态信息 check_msg_callback = NULL; check_executer_callback = NULL; execute_msg_callback = NULL; encapsulate_status_callback = NULL; } Rabbitmq_base::~Rabbitmq_base() { rabbitmq_uninit(); } //初始化 通信 模块。如下三选一 Error_manager Rabbitmq_base::rabbitmq_init() { return rabbitmq_init_from_protobuf(RABBITMQ_PARAMETER_PATH); } //初始化 通信 模块。从文件读取 Error_manager Rabbitmq_base::rabbitmq_init_from_protobuf(std::string prototxt_path) { Rabbitmq_proto::Rabbitmq_parameter_all t_rabbitmq_parameter_all; if (loadProtobufFile(prototxt_path, t_rabbitmq_parameter_all) != SUCCESS) { return Error_manager(RABBITMQ_READ_PROTOBUF_ERROR, MINOR_ERROR, "rabbitmq_init_from_file: %s read_proto_param failed", prototxt_path.c_str()); } return rabbitmq_init_from_protobuf(t_rabbitmq_parameter_all); } //初始化 通信 模块。从protobuf读取 Error_manager Rabbitmq_base::rabbitmq_init_from_protobuf(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all) { LOG(INFO) << " ---Rabbitmq_base::rabbitmq_init_from_protobuf() run--- " << this; int t_status = 0; //状态 amqp_rpc_reply_t t_reply; //reply答复结果 Error_manager t_error; m_rabbitmq_parameter_all = rabbitmq_parameter_all; for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector()) { mp_rabbitmq_reciever.insert(std::pair(queue.routing_key(), queue)); } for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector()) { mp_rabbitmq_reciever.insert(std::pair(queue.routing_key(), queue)); } for (auto &queue:rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector()) { mp_rabbitmq_reciever.insert(std::pair(queue.routing_key(), queue)); } //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数 // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功 mp_connect = amqp_new_connection(); if (mp_connect == nullptr) { return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR, "amqp_new_connection fun error "); } //amqp_tcp_socket_new 新建tcp_socket连接 // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功 mp_socket = amqp_tcp_socket_new(mp_connect); if (mp_socket == nullptr) { return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR, "amqp_tcp_socket_new fun error "); } //载入外部参数 if (rabbitmq_parameter_all.rabbitmq_parameters().has_ip() && rabbitmq_parameter_all.rabbitmq_parameters().has_port() && rabbitmq_parameter_all.rabbitmq_parameters().has_user() && rabbitmq_parameter_all.rabbitmq_parameters().has_password()) { m_ip = rabbitmq_parameter_all.rabbitmq_parameters().ip(); m_port = rabbitmq_parameter_all.rabbitmq_parameters().port(); m_user = rabbitmq_parameter_all.rabbitmq_parameters().user(); m_password = rabbitmq_parameter_all.rabbitmq_parameters().password(); } else { return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR, " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete "); } //amqp_socket_open 打开socket连接, 输入ip和port, // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_ //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port); if (t_status != AMQP_STATUS_OK) { return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_status, "amqp_socket_open")); } //amqp_login() 登录代理服务器, //输入 连接参数结构体 amqp_connection_state_t, //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/" //输入 连接通道最大值, 默认值0表示没有限制 //输入 连接帧率最大值, 默认值是131072 (128KB) //输入 心跳帧之间的秒数, 默认值0禁用心跳 //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码 // AMQP_SASL_METHOD_EXTERNAL, 追加身份证 //返回 结果的结构体 amqp_rpc_reply_t // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error t_reply = amqp_login(mp_connect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str()); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_login")); } //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化) m_channel_map.clear(); //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建) t_error = rabbitmq_new_channel_queue_consume(rabbitmq_parameter_all); if (t_error != Error_code::SUCCESS) { return t_error; } //启动通信, 开启线程, run thread t_error = rabbitmq_run(); if (t_error != Error_code::SUCCESS) { return t_error; } return Error_code::SUCCESS; } //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建) Error_manager Rabbitmq_base::rabbitmq_new_channel_queue_consume(Rabbitmq_proto::Rabbitmq_parameter_all &rabbitmq_parameter_all) { int t_status = 0; //状态 amqp_rpc_reply_t t_reply; //reply答复结果 Error_manager t_error; ///Rabbitmq 接受的通道,队列和消费者, 多个 for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector_size(); ++i) { //Rabbitmq 配置的通道,队列和消费者, Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf = rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_reciever_vector(i); //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化) if (m_channel_map.find(t_inf.channel()) == m_channel_map.end()) { //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道. amqp_channel_open(mp_connect, t_inf.channel()); //amqp_get_rpc_reply() 获取当前网络连接的状态结果. t_reply = amqp_get_rpc_reply(mp_connect); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_channel_open")); } if (t_inf.consume_no_ack() == 0) { //amqp_basic_qos设置通道每次只能接受一条消息, 直到该消息被ack,才能接受下一条.状态消息可以继续接受 //uint16_t prefetch_count 同时接受消息的个数, 这里固定写1, //配合 amqp_basic_qos 和 amqp_basic_ack , 来阻塞这个通道的接受消息 //注:请求消息no_ack==0, 当接受一条指令后,该通道被阻塞,其他通道仍然正常接受, 等到任务被执行完,手动调用amqp_basic_ack函数, 则可以继续接受请求消息. //注:状态消息no_ack==1, 当接受一条指令后,该状态消息立刻被删除,然后可以继续接受下一条状态消息. amqp_basic_qos(mp_connect, t_inf.channel(), 0, PREFETCH_COUNT, 0); } m_channel_map[t_inf.channel()] = true; } //临时队列需要代码创建, 永久队列需要在服务器上提前手动创建 if (t_inf.queue_durable() == 0) { //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒) if (t_inf.queue_meassage_ttl() != 0) { amqp_table_t t_arguments; //队列的扩展属性 num_entries 是map长度, amqp_table_entry_t_ 是map指针 //目前只填充超时时间, x-message-ttl 队列接受消息 的超时时间 (单位毫秒) t_arguments.num_entries = 1; amqp_table_entry_t_ t_map_arg; t_map_arg.key = amqp_cstring_bytes("x-message-ttl"); //需要配置的参数 t_map_arg.value.kind = AMQP_FIELD_KIND_U16; //需要配置的数据类型, 如果是字符串, 写 AMQP_FIELD_KIND_UTF8 t_map_arg.value.value.u16 = t_inf.queue_meassage_ttl(); //需要配置的数值 t_arguments.entries = &t_map_arg; //amqp_queue_declare() 队列声明, 就是创建新的队列. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes //输入 amqp_boolean_t passive 是否被动,默认0 //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器) // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据 // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常 //输入 amqp_boolean_t exclusive 是否独立,默认0 //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除, // 一般情况下设为0,然后让接受者手动删除. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table //返回 amqp_queue_declare_ok_t * 返回结果 amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()), t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(), t_inf.queue_auto_delete(), t_arguments); } else { //amqp_queue_declare() 队列声明, 就是创建新的队列. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 amqp_bytes_t queue 队列名称,可以手动命名,如果写空,系统就会自动分配, 手动写amqp_cstring_bytes("abcdefg"), 默认空 amqp_empty_bytes //输入 amqp_boolean_t passive 是否被动,默认0 //输入 amqp_boolean_t durable 是否持久,默认0, 节点代码可以创建临时队列(所有权归节点), 服务器手动创建永久队列(所有权归服务器) // 1表示永久队列,当节点死掉,队列在服务器保留,仍然可以接受数据,节点上线后,可以接受掉线期间的所有数据 // 0表示临时队列,当节点死掉,队列消失,不再接受数据,直到下次恢复正常 //输入 amqp_boolean_t exclusive 是否独立,默认0 //输入 amqp_boolean_t auto_delete 是否自动删除,默认0, 1表示消息被消费者接受后,就自动删除消息, 当接收端断连后,队列也会才删除, // 一般情况下设为0,然后让接受者手动删除. //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table //返回 amqp_queue_declare_ok_t * 返回结果 amqp_queue_declare(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()), t_inf.queue_passive(), t_inf.queue_durable(), t_inf.queue_exclusive(), t_inf.queue_auto_delete(), amqp_empty_table); } //amqp_queue_bind 队列绑定, 将队列加载到服务器的交换机下面, 交换机收到消息后,就会检查key,然后放到指定的队列. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 amqp_bytes_t queue 队列名称, //输入 amqp_bytes_t exchange 交换机模式字符串 //输入 amqp_bytes_t bindingkey 绑定密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致 //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table //返回 amqp_queue_bind_ok_t * 返回结果 //注注注注注意了, 队列绑定交换机时,必须保证交换机是有效的.否则报错 amqp_queue_bind(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()), amqp_cstring_bytes(t_inf.exchange_name().c_str()), amqp_cstring_bytes(t_inf.binding_key().c_str()), amqp_empty_table); amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_QUEUE_BIND_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_queue_bind")); } } //amqp_basic_consume 创建基本类型的消费者,就是接收端, 消费者绑定队列,只能接受一个队列里面的消息 //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 amqp_bytes_t queue 队列名称, //输入 amqp_bytes_t consumer_tag 消费者名称 //输入 amqp_boolean_t no_local 是否非本地, 默认0,表示本地 //输入 amqp_boolean_t no_ack, 是否确认应答,默认0,表示接收后需要应答 //输入 amqp_boolean_t exclusive 是否独立,默认0 //输入 amqp_table_t arguments 预留参数,默认空 amqp_empty_table //返回 amqp_basic_consume_ok_t * 返回结果 //注注注注注意了, 接受端绑定队列时,必须保证队列是有效的,否则报错, amqp_basic_consume(mp_connect, t_inf.channel(), amqp_cstring_bytes(t_inf.queue_name().c_str()), amqp_cstring_bytes(t_inf.consume_name().c_str()), t_inf.consume_no_local(), t_inf.consume_no_ack(), t_inf.consume_exclusive(), amqp_empty_table); amqp_rpc_reply_t t_reply = amqp_get_rpc_reply(mp_connect); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONSUME_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_basic_consume")); } } //Rabbitmq 发送请求的通道 for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector_size(); ++i) { //Rabbitmq 配置发送通道 Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf1 = rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(i); //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化) if (m_channel_map.find(t_inf1.channel()) == m_channel_map.end()) { //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道. amqp_channel_open(mp_connect, t_inf1.channel()); //amqp_get_rpc_reply() 获取当前网络连接的状态结果. t_reply = amqp_get_rpc_reply(mp_connect); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_channel_open")); } m_channel_map[t_inf1.channel()] = true; } } //Rabbitmq 发送状态的通道 for (int i = 0; i < rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector_size(); ++i) { //Rabbitmq 配置发送通道 Rabbitmq_proto::Rabbitmq_channel_queue_consume t_inf2 = rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(i); //通道查重,防止重复开启(channel允许重复使用, 但是不能重复初始化) if (m_channel_map.find(t_inf2.channel()) == m_channel_map.end()) { //amqp_channel_open() 打开连接通道, 同一台电脑可以多个进程和线程进行连接服务器, 每个连接需要自己独特的通道. amqp_channel_open(mp_connect, t_inf2.channel()); //amqp_get_rpc_reply() 获取当前网络连接的状态结果. t_reply = amqp_get_rpc_reply(mp_connect); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_CHANNEL_OPEN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_channel_open")); } m_channel_map[t_inf2.channel()] = true; } } return Error_code::SUCCESS; } //启动通信, 开启线程, run thread Error_manager Rabbitmq_base::rabbitmq_run() { //启动 线程。 //接受线程默认循环, 内部的 amqp_consume_message 进行等待, 超时1ms m_receive_analysis_condition.reset(false, true, false); mp_receive_analysis_thread = new std::thread(&Rabbitmq_base::receive_analysis_thread, this); //发送线程默认循环, 内部的wait_and_pop进行等待, m_send_condition.reset(false, true, false); mp_send_thread = new std::thread(&Rabbitmq_base::send_thread, this); //封装线程默认等待, ...., 超时1秒, 超时后主动 封装心跳和状态信息, m_encapsulate_status_condition.reset(false, false, false); mp_encapsulate_status_thread = new std::thread(&Rabbitmq_base::encapsulate_status_thread, this); m_rabbitmq_status = RABBITMQ_STATUS_READY; return Error_code::SUCCESS; } //反初始化 通信 模块。 Error_manager Rabbitmq_base::rabbitmq_uninit() { LOG(INFO) << " ---Rabbitmq_base::rabbitmq_uninit() run--- " << this; //终止list,防止 wait_and_pop 阻塞线程。 m_send_list.termination_list(); //杀死线程,强制退出 if (mp_receive_analysis_thread) { m_receive_analysis_condition.kill_all(); } if (mp_send_thread) { m_send_condition.kill_all(); } if (mp_encapsulate_status_thread) { m_encapsulate_status_condition.kill_all(); } //回收线程的资源 if (mp_receive_analysis_thread) { mp_receive_analysis_thread->join(); delete mp_receive_analysis_thread; mp_receive_analysis_thread = NULL; } if (mp_send_thread) { mp_send_thread->join(); delete mp_send_thread; mp_send_thread = NULL; } if (mp_encapsulate_status_thread) { mp_encapsulate_status_thread->join(); delete mp_encapsulate_status_thread; mp_encapsulate_status_thread = NULL; } //清空list m_send_list.clear_and_delete(); if (m_rabbitmq_status == RABBITMQ_STATUS_READY) { for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter) { amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS); } amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS); amqp_destroy_connection(mp_connect); } m_rabbitmq_status = RABBITMQ_STATUS_UNKNOW; return Error_code::SUCCESS; } //重连, 快速uninit, init Error_manager Rabbitmq_base::rabbitmq_reconnnect() { //重连全程加锁,防止其他线程运行. std::unique_lock lk(m_mutex); m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT; //断开连接 for (auto iter = m_channel_map.begin(); iter != m_channel_map.end(); ++iter) { amqp_channel_close(mp_connect, iter->first, AMQP_REPLY_SUCCESS); } amqp_connection_close(mp_connect, AMQP_REPLY_SUCCESS); amqp_destroy_connection(mp_connect); //重新连接,线程不需要重启 LOG(INFO) << " ---Rabbitmq_base::rabbitmq_reconnnect() run--- " << this; int t_status = 0; //状态 amqp_rpc_reply_t t_reply; //reply答复结果 Error_manager t_error; //amqp_new_connection 新建amqp的连接配置,里面只有连接状态参数 // 返回amqp_connection_state_t_ *, 函数内部分配内存, amqp_destroy_connection()可以释放内存, 内存不为空则成功 mp_connect = amqp_new_connection(); if (mp_connect == NULL) { return Error_manager(Error_code::RABBITMQ_AMQP_NEW_CONNECTION_ERROR, Error_level::MINOR_ERROR, "amqp_new_connection fun error "); } //amqp_tcp_socket_new 新建tcp_socket连接 // 返回amqp_socket_t *, 函数内部分配内存, amqp_connection_close()可以释放内存, 内存不为空则成功 mp_socket = amqp_tcp_socket_new(mp_connect); if (mp_socket == NULL) { return Error_manager(Error_code::RABBITMQ_AMQP_TCP_SOCKET_NEW_ERROR, Error_level::MINOR_ERROR, "amqp_tcp_socket_new fun error "); } //载入外部参数 if (m_rabbitmq_parameter_all.rabbitmq_parameters().has_ip() && m_rabbitmq_parameter_all.rabbitmq_parameters().has_port() && m_rabbitmq_parameter_all.rabbitmq_parameters().has_user() && m_rabbitmq_parameter_all.rabbitmq_parameters().has_password()) { m_ip = m_rabbitmq_parameter_all.rabbitmq_parameters().ip(); m_port = m_rabbitmq_parameter_all.rabbitmq_parameters().port(); m_user = m_rabbitmq_parameter_all.rabbitmq_parameters().user(); m_password = m_rabbitmq_parameter_all.rabbitmq_parameters().password(); } else { return Error_manager(Error_code::RABBITMQ_PROTOBUF_LOSS_ERROR, Error_level::MINOR_ERROR, " rabbitmq_parameter_all.rabbitmq_parameters() The data is not complete "); } //amqp_socket_open 打开socket连接, 输入ip和port, // 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_ //只需要设置配置服务器的ip和port, 不需要配置子节点客户端的ip和port, 在后面配置channel通道时,进行设置. t_status = amqp_socket_open(mp_socket, m_ip.c_str(), m_port); if (t_status != AMQP_STATUS_OK) { return Error_manager(Error_code::RABBITMQ_AMQP_SOCKET_OPEN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_status, "amqp_socket_open")); } //amqp_login() 登录代理服务器, //输入 连接参数结构体 amqp_connection_state_t, //输入 连接地址, 前面 amqp_socket_open() 已经输入了,这里默认写"/" //输入 连接通道最大值, 默认值0表示没有限制 //输入 连接帧率最大值, 默认值是131072 (128KB) //输入 心跳帧之间的秒数, 默认值0禁用心跳 //输入 身份验证模式, AMQP_SASL_METHOD_PLAIN, 追加用户名和密码 // AMQP_SASL_METHOD_EXTERNAL, 追加身份证 //返回 结果的结构体 amqp_rpc_reply_t // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error t_reply = amqp_login(mp_connect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_user.c_str(), m_password.c_str()); if (t_reply.reply_type != AMQP_RESPONSE_NORMAL) { return Error_manager(Error_code::RABBITMQ_AMQP_LOGIN_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(t_reply, "amqp_login")); } //清除channel_map, 通道的缓存,防止重复开启, (channel允许重复使用, 但是不能重复初始化) m_channel_map.clear(); //创建通道队列消费者, (交换机和永久队列不在代码里创建,请在服务器上手动创建) t_error = rabbitmq_new_channel_queue_consume(m_rabbitmq_parameter_all); if (t_error != Error_code::SUCCESS) { return t_error; } //不用重启线程 return Error_code::SUCCESS; } //设置 自动封装状态的时间周期 void Rabbitmq_base::set_encapsulate_status_cycle_time(unsigned int encapsulate_status_cycle_time) { m_encapsulate_status_cycle_time = encapsulate_status_cycle_time; } //设置回调函数check_msg_callback void Rabbitmq_base::set_check_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) { check_msg_callback = callback; } //设置回调函数check_executer_callback void Rabbitmq_base::set_check_executer_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) { check_executer_callback = callback; } //设置回调函数execute_msg_callback void Rabbitmq_base::set_execute_msg_callback(Error_manager (*callback)(Rabbitmq_message *p_msg)) { execute_msg_callback = callback; } //设置回调函数encapsulate_status_callback void Rabbitmq_base::set_encapsulate_status_callback(Error_manager (*callback)()) { encapsulate_status_callback = callback; } //mp_receive_analysis_thread 接受解析 执行函数, void Rabbitmq_base::receive_analysis_thread() { LOG(INFO) << " Rabbitmq_base::receive_analysis_thread start " << this; //通信接受线程, 负责接受socket消息, 并存入 m_receive_data_list while (m_receive_analysis_condition.is_alive()) { //这里就不需要超时等待了, rabbitmq的接受函数可以配置等待超时.... // m_receive_analysis_condition.wait_for_ex(std::chrono::microseconds(1)); m_receive_analysis_condition.wait(); if (m_receive_analysis_condition.is_alive()) { std::this_thread::sleep_for(std::chrono::microseconds(100)); std::this_thread::yield(); amqp_rpc_reply_t t_reply; //运行结果 amqp_envelope_t t_envelope; //数据包, 含有一些包裹属性和数据内容 //接受消息等待超时,默认1000us, 当收到消息后,立刻通过阻塞,否则等待超时后通过阻塞 struct timeval t_timeout; //超时时间, 默认1ms t_timeout.tv_sec = 0; t_timeout.tv_usec = 1000; {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率 std::unique_lock lk(m_mutex); //允许释放连接参数状态的内存, // 因为这个连接是底层分配的内存,是全局的. 为了开启多个连接,就要重复使用 //这里释放之后,其他代码就开启多线程开启新的连接了. amqp_maybe_release_buffers(mp_connect); //amqp_consume_message 接受消息, 阻塞函数,可以设置超时. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_envelope_t *envelope 接受数据包的指针, 成功接收到数据后,数据包会覆盖 //输入 const struct timeval *timeout 超时时间, 防止阻塞. 传入NULL就是完全阻塞. //输入 int flags 未使用, 默认0 //输入 amqp_connection_state_t state 连接状态参数的结构体 //返回 状态结果的结构体 amqp_rpc_reply_t // amqp_response_type_enum reply_type 成功是 AMQP_RESPONSE_NORMAL // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error t_reply = amqp_consume_message(mp_connect, &t_envelope, &t_timeout, 0); } if (AMQP_RESPONSE_NORMAL == t_reply.reply_type)//正常接受到消息 { m_rabbitmq_status = RABBITMQ_STATUS_READY; //从t_envelope数据包里面提取信息 std::string t_receive_string = std::string((char *) t_envelope.message.body.bytes, t_envelope.message.body.len); int t_channel = t_envelope.channel; int t_delivery_tag = t_envelope.delivery_tag; std::string t_exchange_name = std::string((char *) t_envelope.exchange.bytes, t_envelope.exchange.len); std::string t_routing_key = std::string((char *) t_envelope.routing_key.bytes, t_envelope.routing_key.len); //如果这里接受到了消息, 在这提前解析消息最前面的Base_msg (消息公共内容), 用于后续的check message::Base_msg t_base_msg; // if( t_base_msg.ParseFromString(t_receive_string) ) //删除 message::Base_msg 里面的 message::Base_info的机制,完全依赖服务器来分发消息 if (true) { //第一次解析之后转化为, Communication_message, 自定义的通信消息格式 Rabbitmq_message t_rabbitmq_message; t_rabbitmq_message.reset(t_base_msg.base_info(), t_receive_string, t_channel, t_delivery_tag, t_exchange_name, t_routing_key); //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的. if (check_msg(&t_rabbitmq_message) == SUCCESS) { //这里直接就用当前线程进行处理, //检查消息是否可以被处理 if (check_executer(&t_rabbitmq_message) == SUCCESS) { //处理消息 if (execute_msg(&t_rabbitmq_message) == SUCCESS) { } //else不做处理 } //else不做处理 } //else不做处理 } //else解析失败, 就当做什么也没发生, 认为接收消息无效, else { std::cout << " huli test :::: " << " t_receive_string = " << t_receive_string << std::endl; if (t_channel == 401) { amqp_basic_ack(mp_connect, t_channel, t_delivery_tag, 0); } } //amqp_destroy_envelope 销毁数据包, 只有接受成功, t_envelope才有内存 amqp_destroy_envelope(&t_envelope); } else//没有接受到消息 { //超时报错,不做处理, continue //注注注注注意了, 没有收到消息会超时报错, res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, res.library_error = -13, (-0x000D request timed out) if (t_reply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && t_reply.library_error == -13) { m_rabbitmq_status = RABBITMQ_STATUS_READY; continue; } else//其他报错,特殊处理 { //need std::string error_description = amqp_error_to_string(t_reply, "amqp_consume_message"); LOG(WARNING) << " huli test 123123123:::: " << " error_description = " << error_description << std::endl; // return Error_manager(Error_code::RABBITMQ_AMQP_CONSUME_MESSAGE_ERROR, Error_level::MINOR_ERROR, // amqp_error_to_string(t_reply, "amqp_consume_message") ); //重启 rabbitmq_reconnnect(); } } } } LOG(INFO) << " Rabbitmq_base::receive_analysis_thread end " << this; return; } //检查消息是否有效, 主要检查消息类型和接受者, 判断这条消息是不是给我的., 需要子类重载 Error_manager Rabbitmq_base::check_msg(Rabbitmq_message *p_msg) { if (check_msg_callback != NULL) { return check_msg_callback(p_msg); } return Error_code::SUCCESS; } //检查执行者的状态, 判断能否处理这条消息, 需要子类重载 Error_manager Rabbitmq_base::check_executer(Rabbitmq_message *p_msg) { if (check_executer_callback != NULL) { return check_executer_callback(p_msg); } return Error_code::SUCCESS; } //处理消息, 需要子类重载 Error_manager Rabbitmq_base::execute_msg(Rabbitmq_message *p_msg) { if (execute_msg_callback != NULL) { return execute_msg_callback(p_msg); } else { //需要子类重载 std::cout << " huli test :::: " << " execute_msg Rabbitmq_message = " << p_msg->get_message_buf() << std::endl; //如果是请求消息,那么在子节点继承的时候一定要记得调用 //配置rabbitmq.proto时, 如果consume_no_ack == 0 , 一定要手动调用 amqp_basic_ack int consume_no_ack = 1; if (consume_no_ack == 0 || p_msg->m_channel == 401) { //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 uint64_t delivery_tag 消息传递编号, //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条 int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0); } } return Error_code::SUCCESS; } //ack_msg 处理完消息后, 手动确认消息, 通知服务器队列删除消息. //执行者在execute_msg里面可以调用这个函数, 或者回调也行. Error_manager Rabbitmq_base::ack_msg(Rabbitmq_message *p_msg) { //amqp_basic_ack 确认消息, 通知服务器队列手动删除消息. //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 uint64_t delivery_tag 消息传递编号, //输入 amqp_boolean_t multiple 多个标记位, 默认0, 1表示删除1~delivery_tag的所有消息, 不删除大于delivery_tag的, 0表示只删除这一条 int ack_result = amqp_basic_ack(mp_connect, p_msg->m_channel, p_msg->m_delivery_tag, 0); if (ack_result != 0) { return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_ACK_ERROR, Error_level::MINOR_ERROR, amqp_error_to_string(ack_result, "amqp_basic_ack")); } return Error_code::SUCCESS; } //mp_send_thread 发送线程执行函数, void Rabbitmq_base::send_thread() { LOG(INFO) << " Rabbitmq_base::send_thread start " << this; //通信发送线程, 负责巡检m_send_list, 并发送消息 while (m_send_condition.is_alive()) { m_send_condition.wait(); if (m_send_condition.is_alive()) { std::this_thread::yield(); Rabbitmq_message *tp_msg = NULL; int t_result = 0; //这里 wait_and_pop 会使用链表内部的 m_data_cond 条件变量来控制等待, //封装线程使用push的时候, 会唤醒线程并通过等待, 此时 m_send_data_condition 是一直通过的. //如果需要退出, 那么就要 m_send_data_list.termination_list(); 和 m_send_data_condition.kill_all(); bool is_pop = m_send_list.wait_and_pop(tp_msg); if (is_pop) { if (tp_msg != NULL) { //amqp_basic_properties_t 消息数据的基本属性,里面有15个成员. amqp_basic_properties_t props; //判断是否要设置发送消息的超时时间, 如果配置10秒,超时后,服务器会自动删除消息 if (tp_msg->m_timeout_ms == std::chrono::milliseconds(0)) { //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000; //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式 //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string) props.content_type = amqp_cstring_bytes("text/plain"); //uint8_t delivery_mode 配送模式 2表示持续发送模式 props.delivery_mode = AMQP_DELIVERY_PERSISTENT; } else { //amqp_flags_t _flags 一个uint32_t, 按位 表示这15个属性的修改开关. //例如: _flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG = 0b 1001 0000 0000 0000; //就表示 content-type 和 delivery-mode 是有效属性. 接下来的设置就会生效. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_EXPIRATION_FLAG; //amqp_bytes_t content_type 消息数据的类型 "text/plain"是 普通文本格式 //注意了,需要使用 amqp_cstring_bytes() 将char*转为amqp_bytes_t(自定义的字符串, 类似于std::string) props.content_type = amqp_cstring_bytes("text/plain"); //uint8_t delivery_mode 配送模式 2表示持续发送模式 props.delivery_mode = AMQP_DELIVERY_PERSISTENT; char buf[256] = {0}; sprintf(buf, "%d", (int) tp_msg->m_timeout_ms.count()); props.expiration = amqp_cstring_bytes(buf);//超时, 单位ms; } {//这个大括号表示只对 recv 和 send 加锁, 不要因为后面的复杂逻辑影响通信效率 std::unique_lock lk(m_mutex); // std::cout << " huli test :::: " << " tp_msg->m_message_buf = " << tp_msg->m_message_buf << std::endl; //amqp_basic_publish() 发布消息给代理服务器, 在交换器上发布一个带有路由密钥的消息。交换机会根据路由密钥匹配,放到对应的队列里面 //输入 amqp_connection_state_t state 连接状态参数的结构体 //输入 amqp_channel_t channel 连接通道的编号 //输入 amqp_bytes_t exchange 交换机模式字符串 //输入 amqp_bytes_t routing_key 路由密钥字符串, 交换机的判断规则. 发送端的 routingkey 和 接收端的 bindingkey 需要保持一致 //输入 amqp_boolean_t mandatory 强制服务器必须通过路由密钥才能存到队列, 默认为0 //输入 amqp_boolean_t immediate 表示服务器必须立刻转发消息给接受者, 默认为0 //输入 struct amqp_basic_properties_t_ const *properties 消息数据的基本属性 //输入 amqp_bytes_t body 消息数据内容 //返回错误码 成功返回AMQP_STATUS_OK = 0x0, 失败返回错误状态码, 详见 enum amqp_status_enum_ //注注注注注意了::amqp_basic_publish()是异步通信, // return AMQP_STATUS_OK 也只是表示消息成功发送到服务器. 无法确认 接收端是否正常接受消息 t_result = amqp_basic_publish(mp_connect, tp_msg->m_channel, amqp_cstring_bytes(tp_msg->m_exchange_name.c_str()), amqp_cstring_bytes(tp_msg->m_routing_key.c_str()), 0, 0, &props, amqp_cstring_bytes(tp_msg->m_message_buf.c_str())); } if (t_result == AMQP_STATUS_OK) { m_rabbitmq_status = RABBITMQ_STATUS_READY; delete (tp_msg); tp_msg = NULL; // std::string re = amqp_error_to_string(t_result, "amqp_basic_publish"); // std::cout << " huli test :::: " << " re = " << re << std::endl; // return Error_manager(Error_code::RABBITMQ_AMQP_BASIC_PUBLISH_ERROR, Error_level::MINOR_ERROR, // amqp_error_to_string(t_result, "amqp_basic_publish") ); } else { std::string re = amqp_error_to_string(t_result, "amqp_basic_publish"); std::cout << " huli test :::: " << " re = " << re << std::endl; //重启 m_rabbitmq_status = RABBITMQ_STATUS_RECONNNECT; m_send_list.push(tp_msg); //重新加入队列,下一次再发 tp_msg = NULL; rabbitmq_reconnnect(); } } } else { //没有取出, 那么应该就是 m_termination_flag 结束了 // return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, // " Communication_socket_base::send_data_thread() error "); } } } LOG(INFO) << " Rabbitmq_base::send_thread end " << this; return; } //手动封装消息,需要手动写入参数channel,exchange_name,routing_key Error_manager Rabbitmq_base::encapsulate_msg(std::string message, int channel, std::string exchange_name, std::string routing_key, int timeout_ms = 0) { if (m_rabbitmq_status != RABBITMQ_STATUS_READY) { LOG(ERROR) << " m_rabbitmq_status error "; return Error_manager(Error_code::ERROR, Error_level::MINOR_ERROR, " m_rabbitmq_status error "); } // LOG(INFO) << exchange_name << " " << routing_key; Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms); bool is_push = m_send_list.push(tp_msg); if (is_push == false) { delete (tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Communication_socket_base::encapsulate_msg error "); } return Error_code::SUCCESS; } //手动封装消息,需要手动写入参数channel,exchange_name,routing_key Error_manager Rabbitmq_base::encapsulate_msg(Rabbitmq_message *p_msg) { Rabbitmq_message *tp_msg = new Rabbitmq_message(*p_msg); bool is_push = m_send_list.push(tp_msg); if (is_push == false) { delete (tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Communication_socket_base::encapsulate_msg error "); } return Error_code::SUCCESS; } //手动封装任务消息(请求和答复), 系统会使用rabbitmq.proto的配置参数, Error_manager Rabbitmq_base::encapsulate_task_msg(std::string message, int vector_index) { int channel = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector(vector_index).channel(); std::string exchange_name = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector( vector_index).exchange_name(); std::string routing_key = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector( vector_index).routing_key(); int timeout_ms = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_request_vector( vector_index).timeout_ms(); Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms); bool is_push = m_send_list.push(tp_msg); if (is_push == false) { delete (tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Communication_socket_base::encapsulate_msg error "); } return Error_code::SUCCESS; } //手动封装状态消息, 系统会使用rabbitmq.proto的配置参数, Error_manager Rabbitmq_base::encapsulate_status_msg(std::string message, int vector_index) { if (vector_index >= m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector().size()) { LOG(WARNING) << "vector index error."; return {FAILED, NORMAL}; } int channel = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector(vector_index).channel(); std::string exchange_name = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector( vector_index).exchange_name(); std::string routing_key = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector( vector_index).routing_key(); int timeout_ms = m_rabbitmq_parameter_all.rabbitmq_parameters().rabbitmq_sender_status_vector( vector_index).timeout_ms(); Rabbitmq_message *tp_msg = new Rabbitmq_message(message, channel, exchange_name, routing_key, timeout_ms); bool is_push = m_send_list.push(tp_msg); if (is_push == false) { delete (tp_msg); tp_msg = NULL; return Error_manager(Error_code::CONTAINER_IS_TERMINATE, Error_level::MINOR_ERROR, " Communication_socket_base::encapsulate_msg error "); } return Error_code::SUCCESS; } //mp_encapsulate_stauts_thread 自动封装线程执行函数, void Rabbitmq_base::encapsulate_status_thread() { LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread start " << this; //通信封装线程, 负责定时封装消息, 并存入 m_send_data_list while (m_encapsulate_status_condition.is_alive()) { bool t_pass_flag = m_encapsulate_status_condition.wait_for_millisecond(m_encapsulate_status_cycle_time); if (m_encapsulate_status_condition.is_alive()) { std::this_thread::yield(); //如果封装线程被主动唤醒, 那么就表示 需要主动发送消息, if (t_pass_flag) { //主动发送消息, } //如果封装线程超时通过, 那么就定时封装心跳和状态信息 else { //只有通信正常的时候,才封装发送状态消息 if (m_rabbitmq_status == RABBITMQ_STATUS_READY) { auto_encapsulate_status(); } } } } LOG(INFO) << " Rabbitmq_base::encapsulate_status_thread end " << this; return; } //定时封装发送消息, 一般为心跳和状态信息, 需要子类重载 Error_manager Rabbitmq_base::auto_encapsulate_status() { if (encapsulate_status_callback != NULL) { return encapsulate_status_callback(); } return Error_code::SUCCESS; } //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string std::string Rabbitmq_base::amqp_error_to_string(int amqp_status) { char buf[256] = {0}; sprintf(buf, "amqp_status = 0x%x, %s", amqp_status, amqp_error_string2(amqp_status)); return buf; } //把rabbitmq的错误信息转化为string, amqp_status就是enum amqp_status_enum_, amqp_error_string2()函数可以把他翻译为string std::string Rabbitmq_base::amqp_error_to_string(int amqp_status, std::string amqp_fun_name) { char buf[256] = {0}; sprintf(buf, "amqp_fun_name = %s, amqp_status = 0x%x, %s", amqp_fun_name.c_str(), amqp_status, amqp_error_string2(amqp_status)); return buf; } //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果 std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply) { char buf[256] = {0}; // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error switch (amqp_rpc_reply.reply_type) { case AMQP_RESPONSE_NORMAL: { sprintf(buf, "SUCCESS"); break; } case AMQP_RESPONSE_NONE: { sprintf(buf, " reply_type = AMQP_RESPONSE_NONE "); break; } case AMQP_RESPONSE_LIBRARY_EXCEPTION: { sprintf(buf, " reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ", amqp_error_string2(amqp_rpc_reply.library_error)); break; } case AMQP_RESPONSE_SERVER_EXCEPTION: { if (amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { amqp_connection_close_t *p_decoded = (amqp_connection_close_t *) amqp_rpc_reply.reply.decoded; sprintf(buf, " reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ", p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes); } else if (amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { amqp_channel_close_t *p_decoded = (amqp_channel_close_t *) amqp_rpc_reply.reply.decoded; sprintf(buf, " reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ", p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes); } else { sprintf(buf, " reply_type = AMQP_RESPONSE_SERVER_EXCEPTION "); } break; } default: { sprintf(buf, " reply_type = unknown, reply.id = 0x%08X, ", amqp_rpc_reply.reply.id); break; } } return buf; } //把rabbitmq的错误信息转化为string, amqp_rpc_reply_t就是amqp函数运行的结果 std::string Rabbitmq_base::amqp_error_to_string(amqp_rpc_reply_t amqp_rpc_reply, std::string amqp_fun_name) { char buf[256] = {0}; // amqp_response_type_enum reply_type 登录成功是 AMQP_RESPONSE_NORMAL // 失败:如果是 reply_type == AMQP_RESPONSE_SERVER_EXCEPTION, 服务器连接错误, 错误信息在 amqp_method_t reply // 失败:如果是 reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, 库函数错误, 错误信息在 int library_error switch (amqp_rpc_reply.reply_type) { case AMQP_RESPONSE_NORMAL: { sprintf(buf, "SUCCESS"); break; } case AMQP_RESPONSE_NONE: { sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_NONE ", amqp_fun_name.c_str()); break; } case AMQP_RESPONSE_LIBRARY_EXCEPTION: { // huli test 123123123:::: error_description = amqp_fun_name = amqp_consume_message, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = unexpected protocol state, sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION, library_error = %s, ", amqp_fun_name.c_str(), amqp_error_string2(amqp_rpc_reply.library_error)); break; } case AMQP_RESPONSE_SERVER_EXCEPTION: { if (amqp_rpc_reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { amqp_connection_close_t *p_decoded = (amqp_connection_close_t *) amqp_rpc_reply.reply.decoded; sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CONNECTION_CLOSE_METHOD, reply = %u, %.*s ", amqp_fun_name.c_str(), p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes); } else if (amqp_rpc_reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { amqp_channel_close_t *p_decoded = (amqp_channel_close_t *) amqp_rpc_reply.reply.decoded; sprintf(buf, "amqp_fun_name = %s, reply.id = AMQP_CHANNEL_CLOSE_METHOD, reply = %u, %.*s ", amqp_fun_name.c_str(), p_decoded->reply_code, (int) p_decoded->reply_text.len, (char *) p_decoded->reply_text.bytes); } else { sprintf(buf, "amqp_fun_name = %s, reply_type = AMQP_RESPONSE_SERVER_EXCEPTION ", amqp_fun_name.c_str()); } break; } default: { sprintf(buf, "amqp_fun_name = %s, reply_type = unknown, reply.id = 0x%08X, ", amqp_fun_name.c_str(), amqp_rpc_reply.reply.id); break; } } return buf; }