// ttt.cpp - CRabbitmqClient implementation (thin wrapper over the rabbitmq-c amqp_* API).
  1. #include "ttt.h"
  2. #include <unistd.h>
  3. CRabbitmqClient::CRabbitmqClient()
  4. : m_strHostname("")
  5. , m_iPort(0)
  6. , m_strUser("")
  7. , m_strPasswd("")
  8. , m_iChannel(1) //默认用1号通道,通道无所谓
  9. , m_pSock(NULL)
  10. , m_pConn(NULL) {
  11. }
  12. CRabbitmqClient::~CRabbitmqClient() {
  13. if (NULL != m_pConn) {
  14. Disconnect();
  15. m_pConn = NULL;
  16. }
  17. }
  18. int CRabbitmqClient::Connect(const string &strHostname, int iPort, const string &strUser, const string &strPasswd) {
  19. m_strHostname = strHostname;
  20. m_iPort = iPort;
  21. m_strUser = strUser;
  22. m_strPasswd = strPasswd;
  23. m_pConn = amqp_new_connection();
  24. if (NULL == m_pConn) {
  25. fprintf(stderr, "amqp new connection failed\n");
  26. return -1;
  27. }
  28. m_pSock = amqp_tcp_socket_new(m_pConn);
  29. if (NULL == m_pSock) {
  30. fprintf(stderr, "amqp tcp new socket failed\n");
  31. return -2;
  32. }
  33. int status = amqp_socket_open(m_pSock, m_strHostname.c_str(), m_iPort);
  34. if (status<0) {
  35. fprintf(stderr, "amqp socket open failed\n");
  36. return -3;
  37. }
  38. // amqp_login(amqp_connection_state_t state,char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ..)
  39. if (0 != ErrorMsg(amqp_login(m_pConn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, m_strUser.c_str(), m_strPasswd.c_str()), "Logging in")) {
  40. return -4;
  41. }
  42. return 0;
  43. }
  44. int CRabbitmqClient::Disconnect() {
  45. if (NULL != m_pConn) {
  46. if (0 != ErrorMsg(amqp_connection_close(m_pConn, AMQP_REPLY_SUCCESS), "Closing connection"))
  47. return -1;
  48. if (amqp_destroy_connection(m_pConn) < 0)
  49. return -2;
  50. m_pConn = NULL;
  51. }
  52. return 0;
  53. }
  54. int CRabbitmqClient::ExchangeDeclare(const string &strExchange, const string &strType) {
  55. amqp_channel_open(m_pConn, m_iChannel);
  56. amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
  57. amqp_bytes_t _type = amqp_cstring_bytes(strType.c_str());
  58. int _passive= 0;
  59. int _durable= 0; // 交换机是否持久化
  60. amqp_exchange_declare(m_pConn, m_iChannel, _exchange, _type, _passive, _durable, 0, 0, amqp_empty_table);
  61. if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "exchange_declare")) {
  62. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  63. return -1;
  64. }
  65. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  66. return 0;
  67. }
  68. int CRabbitmqClient::QueueDeclare(const string &strQueueName) {
  69. if(NULL == m_pConn) {
  70. fprintf(stderr, "QueueDeclare m_pConn is null\n");
  71. return -1;
  72. }
  73. amqp_channel_open(m_pConn, m_iChannel);
  74. amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
  75. int32_t _passive = 0;
  76. int32_t _durable = 0;
  77. int32_t _exclusive = 0;
  78. int32_t _auto_delete = 1;
  79. amqp_queue_declare(m_pConn, m_iChannel, _queue, _passive, _durable, _exclusive, _auto_delete, amqp_empty_table);
  80. if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_declare")) {
  81. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  82. return -1;
  83. }
  84. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  85. return 0;
  86. }
  87. int CRabbitmqClient::QueueBind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
  88. if(NULL == m_pConn) {
  89. fprintf(stderr, "QueueBind m_pConn is null\n");
  90. return -1;
  91. }
  92. amqp_channel_open(m_pConn, m_iChannel);
  93. amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
  94. amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
  95. amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str());
  96. amqp_queue_bind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
  97. if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_bind")) {
  98. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  99. return -1;
  100. }
  101. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  102. return 0;
  103. }
  104. int CRabbitmqClient::QueueUnbind(const string &strQueueName, const string &strExchange, const string &strBindKey) {
  105. if(NULL == m_pConn) {
  106. fprintf(stderr, "QueueUnbind m_pConn is null\n");
  107. return -1;
  108. }
  109. amqp_channel_open(m_pConn, m_iChannel);
  110. amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
  111. amqp_bytes_t _exchange = amqp_cstring_bytes(strExchange.c_str());
  112. amqp_bytes_t _routkey = amqp_cstring_bytes(strBindKey.c_str());
  113. amqp_queue_unbind(m_pConn, m_iChannel, _queue, _exchange, _routkey, amqp_empty_table);
  114. if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "queue_unbind")) {
  115. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  116. return -1;
  117. }
  118. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  119. return 0;
  120. }
  121. int CRabbitmqClient::QueueDelete(const string &strQueueName, int iIfUnused) {
  122. if(NULL == m_pConn) {
  123. fprintf(stderr, "QueueDelete m_pConn is null\n");
  124. return -1;
  125. }
  126. amqp_channel_open(m_pConn, m_iChannel);
  127. if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
  128. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  129. return -2;
  130. }
  131. amqp_queue_delete(m_pConn, m_iChannel, amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0);
  132. if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "delete queue")) {
  133. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  134. return -3;
  135. }
  136. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  137. return 0;
  138. }
  139. int CRabbitmqClient::Publish(const string &strMessage, const string &strExchange, const string &strRoutekey) {
  140. if (NULL == m_pConn) {
  141. fprintf(stderr, "publish m_pConn is null, publish failed\n");
  142. return -1;
  143. }
  144. amqp_channel_open(m_pConn, m_iChannel);
  145. if(0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
  146. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  147. return -2;
  148. }
  149. amqp_bytes_t message_bytes;
  150. message_bytes.len = strMessage.length();
  151. message_bytes.bytes = (void *)(strMessage.c_str());
  152. //fprintf(stderr, "publish message(%d): %.*s\n", (int)message_bytes.len, (int)message_bytes.len, (char *)message_bytes.bytes);
  153. /*
  154. amqp_basic_properties_t props;
  155. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
  156. props.content_type = amqp_cstring_bytes(m_type.c_str());
  157. props.delivery_mode = m_durable; // persistent delivery mode
  158. */
  159. amqp_bytes_t exchange = amqp_cstring_bytes(strExchange.c_str());
  160. amqp_bytes_t routekey = amqp_cstring_bytes(strRoutekey.c_str());
  161. //if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, &props, message_bytes)) {
  162. if (0 != amqp_basic_publish(m_pConn, m_iChannel, exchange, routekey, 0, 0, NULL, message_bytes)) {
  163. fprintf(stderr, "publish amqp_basic_publish failed\n");
  164. if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "amqp_basic_publish")) {
  165. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  166. return -3;
  167. }
  168. }
  169. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  170. return 0;
  171. }
  172. int CRabbitmqClient::Consumer(const string &strQueueName, vector<string> &message_array, int GetNum, struct timeval *timeout) {
  173. if (NULL == m_pConn) {
  174. fprintf(stderr, "Consumer m_pConn is null, Consumer failed\n");
  175. return -1;
  176. }
  177. amqp_channel_open(m_pConn, m_iChannel);
  178. if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "open channel")) {
  179. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  180. return -2;
  181. }
  182. amqp_basic_qos(m_pConn, m_iChannel, 0, GetNum, 0);
  183. int ack = 1; // no_ack 是否需要确认消息后再从队列中删除消息
  184. amqp_bytes_t queuename= amqp_cstring_bytes(strQueueName.c_str());
  185. amqp_basic_consume(m_pConn, m_iChannel, queuename, amqp_empty_bytes, 0, ack, 0, amqp_empty_table);
  186. if (0 != ErrorMsg(amqp_get_rpc_reply(m_pConn), "Consuming")) {
  187. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  188. return -3;
  189. }
  190. int hasget = 0;
  191. amqp_rpc_reply_t res;
  192. amqp_envelope_t envelope;
  193. while (GetNum > 0) {
  194. amqp_maybe_release_buffers(m_pConn);
  195. res = amqp_consume_message(m_pConn, &envelope, timeout, 0);
  196. if (AMQP_RESPONSE_NORMAL != res.reply_type) {
  197. fprintf(stderr, "Consumer amqp_channel_close failed\n");
  198. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  199. if (0 == hasget)
  200. return -res.reply_type;
  201. else
  202. return 0;
  203. }
  204. string str((char *)envelope.message.body.bytes, (char *)envelope.message.body.bytes + envelope.message.body.len);
  205. message_array.push_back(str);
  206. int rtn = amqp_basic_ack(m_pConn, m_iChannel, envelope.delivery_tag, 1);
  207. amqp_destroy_envelope(&envelope);
  208. if (rtn != 0) {
  209. amqp_channel_close(m_pConn, m_iChannel, AMQP_REPLY_SUCCESS);
  210. return -4;
  211. }
  212. GetNum--;
  213. hasget++;
  214. usleep(1);
  215. }
  216. return 0;
  217. }
  218. int CRabbitmqClient::ErrorMsg(amqp_rpc_reply_t x, char const *context) {
  219. switch (x.reply_type) {
  220. case AMQP_RESPONSE_NORMAL:
  221. return 0;
  222. case AMQP_RESPONSE_NONE:
  223. fprintf(stderr, "%s: missing RPC reply type!\n", context);
  224. break;
  225. case AMQP_RESPONSE_LIBRARY_EXCEPTION:
  226. fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
  227. break;
  228. case AMQP_RESPONSE_SERVER_EXCEPTION:
  229. switch (x.reply.id) {
  230. case AMQP_CONNECTION_CLOSE_METHOD: {
  231. amqp_connection_close_t *m = (amqp_connection_close_t *)x.reply.decoded;
  232. fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
  233. context, m->reply_code, (int)m->reply_text.len,
  234. (char *)m->reply_text.bytes);
  235. break;
  236. }
  237. case AMQP_CHANNEL_CLOSE_METHOD: {
  238. amqp_channel_close_t *m = (amqp_channel_close_t *)x.reply.decoded;
  239. fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
  240. context, m->reply_code, (int)m->reply_text.len,
  241. (char *)m->reply_text.bytes);
  242. break;
  243. }
  244. default:
  245. fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
  246. context, x.reply.id);
  247. break;
  248. }
  249. break;
  250. }
  251. return -1;
  252. }