common.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
  2. // SPDX-License-Identifier: mit
  3. #include "common.h"
  4. #ifdef WITH_SSL
  5. #include <rabbitmq-c/ssl_socket.h>
  6. #endif
  7. #include <errno.h>
  8. #include <fcntl.h>
  9. #include <rabbitmq-c/tcp_socket.h>
  10. #include <stdarg.h>
  11. #include <stdio.h>
  12. #include <stdlib.h>
  13. #include <string.h>
  14. #include <unistd.h>
  15. #ifdef WINDOWS
  16. #include "compat.h"
  17. #endif
  18. void die(const char *fmt, ...) {
  19. va_list ap;
  20. va_start(ap, fmt);
  21. vfprintf(stderr, fmt, ap);
  22. va_end(ap);
  23. fprintf(stderr, "\n");
  24. exit(1);
  25. }
  26. void die_errno(int err, const char *fmt, ...) {
  27. va_list ap;
  28. if (err == 0) {
  29. return;
  30. }
  31. va_start(ap, fmt);
  32. vfprintf(stderr, fmt, ap);
  33. va_end(ap);
  34. fprintf(stderr, ": %s\n", strerror(err));
  35. exit(1);
  36. }
  37. void die_amqp_error(int err, const char *fmt, ...) {
  38. va_list ap;
  39. if (err >= 0) {
  40. return;
  41. }
  42. va_start(ap, fmt);
  43. vfprintf(stderr, fmt, ap);
  44. va_end(ap);
  45. fprintf(stderr, ": %s\n", amqp_error_string2(err));
  46. exit(1);
  47. }
  48. const char *amqp_server_exception_string(amqp_rpc_reply_t r) {
  49. int res;
  50. static char s[512];
  51. switch (r.reply.id) {
  52. case AMQP_CONNECTION_CLOSE_METHOD: {
  53. amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded;
  54. res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s",
  55. m->reply_code, (int)m->reply_text.len,
  56. (char *)m->reply_text.bytes);
  57. break;
  58. }
  59. case AMQP_CHANNEL_CLOSE_METHOD: {
  60. amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded;
  61. res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s",
  62. m->reply_code, (int)m->reply_text.len,
  63. (char *)m->reply_text.bytes);
  64. break;
  65. }
  66. default:
  67. res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X",
  68. r.reply.id);
  69. break;
  70. }
  71. return res >= 0 ? s : NULL;
  72. }
  73. const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) {
  74. switch (r.reply_type) {
  75. case AMQP_RESPONSE_NORMAL:
  76. return "normal response";
  77. case AMQP_RESPONSE_NONE:
  78. return "missing RPC reply type";
  79. case AMQP_RESPONSE_LIBRARY_EXCEPTION:
  80. return amqp_error_string2(r.library_error);
  81. case AMQP_RESPONSE_SERVER_EXCEPTION:
  82. return amqp_server_exception_string(r);
  83. default:
  84. abort();
  85. }
  86. }
  87. void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) {
  88. va_list ap;
  89. if (r.reply_type == AMQP_RESPONSE_NORMAL) {
  90. return;
  91. }
  92. va_start(ap, fmt);
  93. vfprintf(stderr, fmt, ap);
  94. va_end(ap);
  95. fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
  96. exit(1);
  97. }
  98. static char *amqp_url;
  99. static char *amqp_server;
  100. static int amqp_port = -1;
  101. static char *amqp_vhost;
  102. static char *amqp_username;
  103. static char *amqp_password;
  104. static int amqp_heartbeat = 0;
  105. #ifdef WITH_SSL
  106. static int amqp_ssl = 0;
  107. static char *amqp_cacert = "/etc/ssl/certs/cacert.pem";
  108. static char *amqp_key = NULL;
  109. static char *amqp_cert = NULL;
  110. #endif /* WITH_SSL */
  111. const char *connect_options_title = "Connection options";
  112. struct poptOption connect_options[] = {
  113. {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to",
  114. "amqp://..."},
  115. {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
  116. "the AMQP server to connect to", "hostname"},
  117. {"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"},
  118. {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
  119. "the vhost to use when connecting", "vhost"},
  120. {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
  121. "the username to login with", "username"},
  122. {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
  123. "the password to login with", "password"},
  124. {"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0,
  125. "heartbeat interval, set to 0 to disable", "heartbeat"},
  126. #ifdef WITH_SSL
  127. {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL},
  128. {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
  129. "path to the CA certificate file", "cacert.pem"},
  130. {"key", 0, POPT_ARG_STRING, &amqp_key, 0,
  131. "path to the client private key file", "key.pem"},
  132. {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
  133. "path to the client certificate file", "cert.pem"},
  134. #endif /* WITH_SSL */
  135. {NULL, '\0', 0, NULL, 0, NULL, NULL}};
  136. static void init_connection_info(struct amqp_connection_info *ci) {
  137. ci->user = NULL;
  138. ci->password = NULL;
  139. ci->host = NULL;
  140. ci->port = -1;
  141. ci->vhost = NULL;
  142. ci->user = NULL;
  143. amqp_default_connection_info(ci);
  144. if (amqp_url)
  145. die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'",
  146. amqp_url);
  147. if (amqp_server) {
  148. char *colon;
  149. if (amqp_url) {
  150. die("--server and --url options cannot be used at the same time");
  151. }
  152. /* parse the server string into a hostname and a port */
  153. colon = strchr(amqp_server, ':');
  154. if (colon) {
  155. char *port_end;
  156. size_t host_len;
  157. /* Deprecate specifying the port number with the
  158. --server option, because it is not ipv6 friendly.
  159. --url now allows connection options to be
  160. specified concisely. */
  161. fprintf(stderr,
  162. "Specifying the port number with --server is deprecated\n");
  163. host_len = colon - amqp_server;
  164. ci->host = malloc(host_len + 1);
  165. memcpy(ci->host, amqp_server, host_len);
  166. ci->host[host_len] = 0;
  167. if (amqp_port >= 0) {
  168. die("both --server and --port options specify server port");
  169. }
  170. ci->port = strtol(colon + 1, &port_end, 10);
  171. if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 ||
  172. *port_end != 0)
  173. die("bad server port number in '%s'", amqp_server);
  174. } else {
  175. ci->host = amqp_server;
  176. ci->port = 5672;
  177. #if WITH_SSL
  178. if (amqp_ssl) {
  179. ci->port = 5671;
  180. }
  181. #endif
  182. }
  183. }
  184. #if WITH_SSL
  185. if (amqp_ssl && !ci->ssl) {
  186. if (amqp_url) {
  187. die("the --ssl option specifies an SSL connection"
  188. " but the --url option does not");
  189. } else {
  190. ci->ssl = 1;
  191. }
  192. }
  193. #endif
  194. if (amqp_port >= 0) {
  195. if (amqp_url) {
  196. die("--port and --url options cannot be used at the same time");
  197. }
  198. ci->port = amqp_port;
  199. }
  200. if (amqp_username) {
  201. if (amqp_url) {
  202. die("--username and --url options cannot be used at the same time");
  203. }
  204. ci->user = amqp_username;
  205. }
  206. if (amqp_password) {
  207. if (amqp_url) {
  208. die("--password and --url options cannot be used at the same time");
  209. }
  210. ci->password = amqp_password;
  211. }
  212. if (amqp_vhost) {
  213. if (amqp_url) {
  214. die("--vhost and --url options cannot be used at the same time");
  215. }
  216. ci->vhost = amqp_vhost;
  217. }
  218. if (amqp_heartbeat < 0) {
  219. die("--heartbeat must be a positive value");
  220. }
  221. }
  222. amqp_connection_state_t make_connection(void) {
  223. int status;
  224. amqp_socket_t *socket = NULL;
  225. struct amqp_connection_info ci;
  226. amqp_connection_state_t conn;
  227. init_connection_info(&ci);
  228. conn = amqp_new_connection();
  229. if (ci.ssl) {
  230. #ifdef WITH_SSL
  231. socket = amqp_ssl_socket_new(conn);
  232. if (!socket) {
  233. die("creating SSL/TLS socket");
  234. }
  235. if (amqp_cacert) {
  236. amqp_ssl_socket_set_cacert(socket, amqp_cacert);
  237. }
  238. if (amqp_key) {
  239. amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key);
  240. }
  241. #else
  242. die("librabbitmq was not built with SSL/TLS support");
  243. #endif
  244. } else {
  245. socket = amqp_tcp_socket_new(conn);
  246. if (!socket) {
  247. die("creating TCP socket (out of memory)");
  248. }
  249. }
  250. status = amqp_socket_open(socket, ci.host, ci.port);
  251. if (status) {
  252. die_amqp_error(status, "opening socket to %s:%d", ci.host, ci.port);
  253. }
  254. die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat,
  255. AMQP_SASL_METHOD_PLAIN, ci.user, ci.password),
  256. "logging in to AMQP server");
  257. if (!amqp_channel_open(conn, 1)) {
  258. die_rpc(amqp_get_rpc_reply(conn), "opening channel");
  259. }
  260. return conn;
  261. }
  262. void close_connection(amqp_connection_state_t conn) {
  263. int res;
  264. die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel");
  265. die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
  266. "closing connection");
  267. res = amqp_destroy_connection(conn);
  268. die_amqp_error(res, "closing connection");
  269. }
  270. amqp_bytes_t read_all(int fd) {
  271. size_t space = 4096;
  272. amqp_bytes_t bytes;
  273. bytes.bytes = malloc(space);
  274. bytes.len = 0;
  275. for (;;) {
  276. ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len);
  277. if (res == 0) {
  278. break;
  279. }
  280. if (res < 0) {
  281. if (errno == EINTR) {
  282. continue;
  283. }
  284. die_errno(errno, "reading");
  285. }
  286. bytes.len += res;
  287. if (bytes.len == space) {
  288. space *= 2;
  289. bytes.bytes = realloc(bytes.bytes, space);
  290. }
  291. }
  292. return bytes;
  293. }
  294. void write_all(int fd, amqp_bytes_t data) {
  295. while (data.len > 0) {
  296. ssize_t res = write(fd, data.bytes, data.len);
  297. if (res < 0) {
  298. die_errno(errno, "write");
  299. }
  300. data.len -= res;
  301. data.bytes = (char *)data.bytes + res;
  302. }
  303. }
  304. void copy_body(amqp_connection_state_t conn, int fd) {
  305. size_t body_remaining;
  306. amqp_frame_t frame;
  307. int res = amqp_simple_wait_frame(conn, &frame);
  308. die_amqp_error(res, "waiting for header frame");
  309. if (frame.frame_type != AMQP_FRAME_HEADER) {
  310. die("expected header, got frame type 0x%X", frame.frame_type);
  311. }
  312. body_remaining = frame.payload.properties.body_size;
  313. while (body_remaining) {
  314. res = amqp_simple_wait_frame(conn, &frame);
  315. die_amqp_error(res, "waiting for body frame");
  316. if (frame.frame_type != AMQP_FRAME_BODY) {
  317. die("expected body, got frame type 0x%X", frame.frame_type);
  318. }
  319. write_all(fd, frame.payload.body_fragment);
  320. body_remaining -= frame.payload.body_fragment.len;
  321. }
  322. }
  323. poptContext process_options(int argc, const char **argv,
  324. struct poptOption *options, const char *help) {
  325. int c;
  326. poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
  327. poptSetOtherOptionHelp(opts, help);
  328. while ((c = poptGetNextOpt(opts)) >= 0) {
  329. /* no options require explicit handling */
  330. }
  331. if (c < -1) {
  332. fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS),
  333. poptStrerror(c));
  334. poptPrintUsage(opts, stderr, 0);
  335. exit(1);
  336. }
  337. return opts;
  338. }
  339. void process_all_options(int argc, const char **argv,
  340. struct poptOption *options) {
  341. poptContext opts = process_options(argc, argv, options, "[OPTIONS]...");
  342. const char *opt = poptPeekArg(opts);
  343. if (opt) {
  344. fprintf(stderr, "unexpected operand: %s\n", opt);
  345. poptPrintUsage(opts, stderr, 0);
  346. exit(1);
  347. }
  348. poptFreeContext(opts);
  349. }
  350. amqp_bytes_t cstring_bytes(const char *str) {
  351. return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
  352. }