amqp_rpc_sendstring_client.c 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
  2. // SPDX-License-Identifier: mit
  3. #include <stdint.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <rabbitmq-c/amqp.h>
  8. #include <rabbitmq-c/tcp_socket.h>
  9. #include <assert.h>
  10. #include "utils.h"
  11. int main(int argc, char *argv[]) {
  12. char const *hostname;
  13. int port, status;
  14. char const *exchange;
  15. char const *routingkey;
  16. char const *messagebody;
  17. amqp_socket_t *socket = NULL;
  18. amqp_connection_state_t conn;
  19. amqp_bytes_t reply_to_queue;
  20. if (argc < 6) { /* minimum number of mandatory arguments */
  21. fprintf(stderr,
  22. "usage:\namqp_rpc_sendstring_client host port exchange routingkey "
  23. "messagebody\n");
  24. return 1;
  25. }
  26. hostname = argv[1];
  27. port = atoi(argv[2]);
  28. exchange = argv[3];
  29. routingkey = argv[4];
  30. messagebody = argv[5];
  31. /*
  32. establish a channel that is used to connect RabbitMQ server
  33. */
  34. conn = amqp_new_connection();
  35. socket = amqp_tcp_socket_new(conn);
  36. if (!socket) {
  37. die("creating TCP socket");
  38. }
  39. status = amqp_socket_open(socket, hostname, port);
  40. if (status) {
  41. die("opening TCP socket");
  42. }
  43. die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  44. "guest", "guest"),
  45. "Logging in");
  46. amqp_channel_open(conn, 1);
  47. die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
  48. /*
  49. create private reply_to queue
  50. */
  51. {
  52. amqp_queue_declare_ok_t *r = amqp_queue_declare(
  53. conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
  54. die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
  55. reply_to_queue = amqp_bytes_malloc_dup(r->queue);
  56. if (reply_to_queue.bytes == NULL) {
  57. fprintf(stderr, "Out of memory while copying queue name");
  58. return 1;
  59. }
  60. }
  61. /*
  62. send the message
  63. */
  64. {
  65. /*
  66. set properties
  67. */
  68. amqp_basic_properties_t props;
  69. props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG |
  70. AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG |
  71. AMQP_BASIC_CORRELATION_ID_FLAG;
  72. props.content_type = amqp_cstring_bytes("text/plain");
  73. props.delivery_mode = 2; /* persistent delivery mode */
  74. props.reply_to = amqp_bytes_malloc_dup(reply_to_queue);
  75. if (props.reply_to.bytes == NULL) {
  76. fprintf(stderr, "Out of memory while copying queue name");
  77. return 1;
  78. }
  79. props.correlation_id = amqp_cstring_bytes("1");
  80. /*
  81. publish
  82. */
  83. die_on_error(amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange),
  84. amqp_cstring_bytes(routingkey), 0, 0,
  85. &props, amqp_cstring_bytes(messagebody)),
  86. "Publishing");
  87. amqp_bytes_free(props.reply_to);
  88. }
  89. /*
  90. wait an answer
  91. */
  92. {
  93. amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0,
  94. amqp_empty_table);
  95. die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
  96. amqp_bytes_free(reply_to_queue);
  97. {
  98. amqp_frame_t frame;
  99. int result;
  100. amqp_basic_deliver_t *d;
  101. amqp_basic_properties_t *p;
  102. size_t body_target;
  103. size_t body_received;
  104. for (;;) {
  105. amqp_maybe_release_buffers(conn);
  106. result = amqp_simple_wait_frame(conn, &frame);
  107. printf("Result: %d\n", result);
  108. if (result < 0) {
  109. break;
  110. }
  111. printf("Frame type: %u channel: %u\n", frame.frame_type, frame.channel);
  112. if (frame.frame_type != AMQP_FRAME_METHOD) {
  113. continue;
  114. }
  115. printf("Method: %s\n", amqp_method_name(frame.payload.method.id));
  116. if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
  117. continue;
  118. }
  119. d = (amqp_basic_deliver_t *)frame.payload.method.decoded;
  120. printf("Delivery: %u exchange: %.*s routingkey: %.*s\n",
  121. (unsigned)d->delivery_tag, (int)d->exchange.len,
  122. (char *)d->exchange.bytes, (int)d->routing_key.len,
  123. (char *)d->routing_key.bytes);
  124. result = amqp_simple_wait_frame(conn, &frame);
  125. if (result < 0) {
  126. break;
  127. }
  128. if (frame.frame_type != AMQP_FRAME_HEADER) {
  129. fprintf(stderr, "Expected header!");
  130. abort();
  131. }
  132. p = (amqp_basic_properties_t *)frame.payload.properties.decoded;
  133. if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
  134. printf("Content-type: %.*s\n", (int)p->content_type.len,
  135. (char *)p->content_type.bytes);
  136. }
  137. printf("----\n");
  138. body_target = (size_t)frame.payload.properties.body_size;
  139. body_received = 0;
  140. while (body_received < body_target) {
  141. result = amqp_simple_wait_frame(conn, &frame);
  142. if (result < 0) {
  143. break;
  144. }
  145. if (frame.frame_type != AMQP_FRAME_BODY) {
  146. fprintf(stderr, "Expected body!");
  147. abort();
  148. }
  149. body_received += frame.payload.body_fragment.len;
  150. assert(body_received <= body_target);
  151. amqp_dump(frame.payload.body_fragment.bytes,
  152. frame.payload.body_fragment.len);
  153. }
  154. if (body_received != body_target) {
  155. /* Can only happen when amqp_simple_wait_frame returns <= 0 */
  156. /* We break here to close the connection */
  157. break;
  158. }
  159. /* everything was fine, we can quit now because we received the reply */
  160. break;
  161. }
  162. }
  163. }
  164. /*
  165. closing
  166. */
  167. die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS),
  168. "Closing channel");
  169. die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
  170. "Closing connection");
  171. die_on_error(amqp_destroy_connection(conn), "Ending connection");
  172. return 0;
  173. }