123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654 |
- /*
- Copyright (c) 2013 Insollo Entertainment, LLC. All rights reserved.
- Copyright 2016 Garrett D'Amore <garrett@damore.org>
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"),
- to deal in the Software without restriction, including without limitation
- the rights to use, copy, modify, merge, publish, distribute, sublicense,
- and/or sell copies of the Software, and to permit persons to whom
- the Software is furnished to do so, subject to the following conditions:
- The above copyright notice and this permission notice shall be included
- in all copies or substantial portions of the Software.
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
- THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- IN THE SOFTWARE.
- */
- #include "../src/nn.h"
- #include "../src/pubsub.h"
- #include "../src/pipeline.h"
- #include "../src/bus.h"
- #include "../src/pair.h"
- #include "../src/survey.h"
- #include "../src/reqrep.h"
- #include "options.h"
- #include "../src/utils/sleep.c"
- #include "../src/utils/clock.c"
- #include <stdio.h>
- #include <string.h>
- #include <errno.h>
- #include <stdlib.h>
- #include <time.h>
- #include <ctype.h>
- #if !defined NN_HAVE_WINDOWS
- #include <unistd.h>
- #endif
- enum echo_format {
- NN_NO_ECHO,
- NN_ECHO_RAW,
- NN_ECHO_ASCII,
- NN_ECHO_QUOTED,
- NN_ECHO_MSGPACK,
- NN_ECHO_HEX
- };
- typedef struct nn_options {
- /* Global options */
- int verbose;
- /* Socket options */
- int socket_type;
- struct nn_string_list bind_addresses;
- struct nn_string_list connect_addresses;
- float send_timeout;
- float recv_timeout;
- struct nn_string_list subscriptions;
- char *socket_name;
- /* Output options */
- float send_delay;
- float send_interval;
- struct nn_blob data_to_send;
- /* Input options */
- enum echo_format echo_format;
- } nn_options_t;
- /* Constants to get address of in option declaration */
- static const int nn_push = NN_PUSH;
- static const int nn_pull = NN_PULL;
- static const int nn_pub = NN_PUB;
- static const int nn_sub = NN_SUB;
- static const int nn_req = NN_REQ;
- static const int nn_rep = NN_REP;
- static const int nn_bus = NN_BUS;
- static const int nn_pair = NN_PAIR;
- static const int nn_surveyor = NN_SURVEYOR;
- static const int nn_respondent = NN_RESPONDENT;
- struct nn_enum_item socket_types[] = {
- {"PUSH", NN_PUSH},
- {"PULL", NN_PULL},
- {"PUB", NN_PUB},
- {"SUB", NN_SUB},
- {"REQ", NN_REQ},
- {"REP", NN_REP},
- {"BUS", NN_BUS},
- {"PAIR", NN_PAIR},
- {"SURVEYOR", NN_SURVEYOR},
- {"RESPONDENT", NN_RESPONDENT},
- {NULL, 0},
- };
- /* Constants to get address of in option declaration */
- static const int nn_echo_raw = NN_ECHO_RAW;
- static const int nn_echo_ascii = NN_ECHO_ASCII;
- static const int nn_echo_quoted = NN_ECHO_QUOTED;
- static const int nn_echo_msgpack = NN_ECHO_MSGPACK;
- static const int nn_echo_hex = NN_ECHO_HEX;
- struct nn_enum_item echo_formats[] = {
- {"no", NN_NO_ECHO},
- {"raw", NN_ECHO_RAW},
- {"ascii", NN_ECHO_ASCII},
- {"quoted", NN_ECHO_QUOTED},
- {"msgpack", NN_ECHO_MSGPACK},
- {"hex", NN_ECHO_HEX},
- {NULL, 0},
- };
- /* Constants for conflict masks */
- #define NN_MASK_SOCK 1
- #define NN_MASK_WRITEABLE 2
- #define NN_MASK_READABLE 4
- #define NN_MASK_SOCK_SUB 8
- #define NN_MASK_DATA 16
- #define NN_MASK_ENDPOINT 32
- #define NN_NO_PROVIDES 0
- #define NN_NO_CONFLICTS 0
- #define NN_NO_REQUIRES 0
- #define NN_MASK_SOCK_WRITEABLE (NN_MASK_SOCK | NN_MASK_WRITEABLE)
- #define NN_MASK_SOCK_READABLE (NN_MASK_SOCK | NN_MASK_READABLE)
- #define NN_MASK_SOCK_READWRITE (NN_MASK_SOCK_WRITEABLE|NN_MASK_SOCK_READABLE)
- struct nn_option nn_options[] = {
- /* Generic options */
- {"verbose", 'v', NULL,
- NN_OPT_INCREMENT, offsetof (nn_options_t, verbose), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Generic", NULL, "Increase verbosity of the nanocat"},
- {"silent", 'q', NULL,
- NN_OPT_DECREMENT, offsetof (nn_options_t, verbose), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Generic", NULL, "Decrease verbosity of the nanocat"},
- {"help", 'h', NULL,
- NN_OPT_HELP, 0, NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Generic", NULL, "This help text"},
- /* Socket types */
- {"push", 0, "nn_push",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_push,
- NN_MASK_SOCK_WRITEABLE, NN_MASK_SOCK, NN_MASK_DATA,
- "Socket Types", NULL, "Use NN_PUSH socket type"},
- {"pull", 0, "nn_pull",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pull,
- NN_MASK_SOCK_READABLE, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_PULL socket type"},
- {"pub", 0, "nn_pub",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pub,
- NN_MASK_SOCK_WRITEABLE, NN_MASK_SOCK, NN_MASK_DATA,
- "Socket Types", NULL, "Use NN_PUB socket type"},
- {"sub", 0, "nn_sub",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_sub,
- NN_MASK_SOCK_READABLE|NN_MASK_SOCK_SUB, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_SUB socket type"},
- {"req", 0, "nn_req",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_req,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_MASK_DATA,
- "Socket Types", NULL, "Use NN_REQ socket type"},
- {"rep", 0, "nn_rep",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_rep,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_REP socket type"},
- {"surveyor", 0, "nn_surveyor",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_surveyor,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_MASK_DATA,
- "Socket Types", NULL, "Use NN_SURVEYOR socket type"},
- {"respondent", 0, "nn_respondent",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_respondent,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_RESPONDENT socket type"},
- {"bus", 0, "nn_bus",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_bus,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_BUS socket type"},
- {"pair", 0, "nn_pair",
- NN_OPT_SET_ENUM, offsetof (nn_options_t, socket_type), &nn_pair,
- NN_MASK_SOCK_READWRITE, NN_MASK_SOCK, NN_NO_REQUIRES,
- "Socket Types", NULL, "Use NN_PAIR socket type"},
- /* Socket Options */
- {"bind", 0, NULL,
- NN_OPT_LIST_APPEND, offsetof (nn_options_t, bind_addresses), NULL,
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "ADDR", "Bind socket to the address ADDR"},
- {"connect", 0, NULL,
- NN_OPT_LIST_APPEND, offsetof (nn_options_t, connect_addresses), NULL,
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "ADDR", "Connect socket to the address ADDR"},
- {"bind-ipc", 'X' , NULL, NN_OPT_LIST_APPEND_FMT,
- offsetof (nn_options_t, bind_addresses), "ipc://%s",
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "PATH", "Bind socket to the ipc address "
- "\"ipc://PATH\"."},
- {"connect-ipc", 'x' , NULL, NN_OPT_LIST_APPEND_FMT,
- offsetof (nn_options_t, connect_addresses), "ipc://%s",
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "PATH", "Connect socket to the ipc address "
- "\"ipc://PATH\"."},
- {"bind-local", 'L' , NULL, NN_OPT_LIST_APPEND_FMT,
- offsetof (nn_options_t, bind_addresses), "tcp://127.0.0.1:%s",
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "PORT", "Bind socket to the tcp address "
- "\"tcp://127.0.0.1:PORT\"."},
- {"connect-local", 'l' , NULL, NN_OPT_LIST_APPEND_FMT,
- offsetof (nn_options_t, connect_addresses), "tcp://127.0.0.1:%s",
- NN_MASK_ENDPOINT, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "PORT", "Connect socket to the tcp address "
- "\"tcp://127.0.0.1:PORT\"."},
- {"recv-timeout", 0, NULL,
- NN_OPT_FLOAT, offsetof (nn_options_t, recv_timeout), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Socket Options", "SEC", "Set timeout for receiving a message"},
- {"send-timeout", 0, NULL,
- NN_OPT_FLOAT, offsetof (nn_options_t, send_timeout), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_WRITEABLE,
- "Socket Options", "SEC", "Set timeout for sending a message"},
- {"socket-name", 0, NULL,
- NN_OPT_STRING, offsetof (nn_options_t, socket_name), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Socket Options", "NAME", "Name of the socket for statistics"},
- /* Pattern-specific options */
- {"subscribe", 0, NULL,
- NN_OPT_LIST_APPEND, offsetof (nn_options_t, subscriptions), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_SOCK_SUB,
- "SUB Socket Options", "PREFIX", "Subscribe to the prefix PREFIX. "
- "Note: socket will be subscribed to everything (empty prefix) if "
- "no prefixes are specified on the command-line."},
- /* Input Options */
- {"format", 0, NULL,
- NN_OPT_ENUM, offsetof (nn_options_t, echo_format), &echo_formats,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", "FORMAT", "Use echo format FORMAT "
- "(same as the options below)"},
- {"raw", 0, NULL,
- NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_raw,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", NULL, "Dump message as is "
- "(Note: no delimiters are printed)"},
- {"ascii", 'A', NULL,
- NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_ascii,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", NULL, "Print ASCII part of message delimited by newline. "
- "All non-ascii characters replaced by dot."},
- {"quoted", 'Q', NULL,
- NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_quoted,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", NULL, "Print each message on separate line in double "
- "quotes with C-like character escaping"},
- {"msgpack", 0, NULL,
- NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_msgpack,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", NULL, "Print each message as msgpacked string (raw type)."
- " This is useful for programmatic parsing."},
- {"hex", 0, NULL,
- NN_OPT_SET_ENUM, offsetof (nn_options_t, echo_format), &nn_echo_hex,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_READABLE,
- "Input Options", NULL, "Print each message on separate line in double "
- "quotes with hex values"},
- /* Output Options */
- {"interval", 'i', NULL,
- NN_OPT_FLOAT, offsetof (nn_options_t, send_interval), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_MASK_WRITEABLE,
- "Output Options", "SEC", "Send message (or request) every SEC seconds"},
- {"delay", 'd', NULL,
- NN_OPT_FLOAT, offsetof (nn_options_t, send_delay), NULL,
- NN_NO_PROVIDES, NN_NO_CONFLICTS, NN_NO_REQUIRES,
- "Output Options", "SEC", "Wait for SEC seconds before sending message"
- " (useful for one-shot PUB sockets)"},
- {"data", 'D', NULL,
- NN_OPT_BLOB, offsetof (nn_options_t, data_to_send), &echo_formats,
- NN_MASK_DATA, NN_MASK_DATA, NN_MASK_WRITEABLE,
- "Output Options", "DATA", "Send DATA to the socket and quit for "
- "PUB, PUSH, PAIR, BUS socket. Use DATA to reply for REP or "
- " RESPONDENT socket. Send DATA as request for REQ or SURVEYOR socket."},
- {"file", 'F', NULL,
- NN_OPT_READ_FILE, offsetof (nn_options_t, data_to_send), &echo_formats,
- NN_MASK_DATA, NN_MASK_DATA, NN_MASK_WRITEABLE,
- "Output Options", "PATH", "Same as --data but get data from file PATH"},
- /* Sentinel */
- {NULL, 0, NULL,
- 0, 0, NULL,
- 0, 0, 0,
- NULL, NULL, NULL},
- };
- struct nn_commandline nn_cli = {
- "A command-line interface to nanomsg",
- "",
- nn_options,
- NN_MASK_SOCK | NN_MASK_ENDPOINT,
- };
- void nn_assert_errno (int flag, char *description)
- {
- int err;
- if (!flag) {
- err = errno;
- fprintf (stderr, "%s: %s\n", description, nn_strerror (err));
- exit (3);
- }
- }
- void nn_sub_init (nn_options_t *options, int sock)
- {
- int i;
- int rc;
- if (options->subscriptions.num) {
- for (i = 0; i < options->subscriptions.num; ++i) {
- rc = nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE,
- options->subscriptions.items[i],
- strlen (options->subscriptions.items[i]));
- nn_assert_errno (rc == 0, "Can't subscribe");
- }
- } else {
- rc = nn_setsockopt (sock, NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
- nn_assert_errno (rc == 0, "Can't subscribe");
- }
- }
- void nn_set_recv_timeout (int sock, int millis)
- {
- int rc;
- rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_RCVTIMEO,
- &millis, sizeof (millis));
- nn_assert_errno (rc == 0, "Can't set recv timeout");
- }
- int nn_create_socket (nn_options_t *options)
- {
- int sock;
- int rc;
- int millis;
- sock = nn_socket (AF_SP, options->socket_type);
- nn_assert_errno (sock >= 0, "Can't create socket");
- /* Generic initialization */
- if (options->send_timeout >= 0) {
- millis = (int)(options->send_timeout * 1000);
- rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_SNDTIMEO,
- &millis, sizeof (millis));
- nn_assert_errno (rc == 0, "Can't set send timeout");
- }
- if (options->recv_timeout >= 0) {
- nn_set_recv_timeout (sock, (int) options->recv_timeout * 1000);
- }
- if (options->socket_name) {
- rc = nn_setsockopt (sock, NN_SOL_SOCKET, NN_SOCKET_NAME,
- options->socket_name, strlen(options->socket_name));
- nn_assert_errno (rc == 0, "Can't set socket name");
- }
- /* Specific initialization */
- switch (options->socket_type) {
- case NN_SUB:
- nn_sub_init (options, sock);
- break;
- }
- return sock;
- }
- void nn_print_message (nn_options_t *options, char *buf, int buflen)
- {
- switch (options->echo_format) {
- case NN_NO_ECHO:
- return;
- case NN_ECHO_RAW:
- fwrite (buf, 1, buflen, stdout);
- break;
- case NN_ECHO_ASCII:
- for (; buflen > 0; --buflen, ++buf) {
- if (isprint (*buf)) {
- fputc (*buf, stdout);
- } else {
- fputc ('.', stdout);
- }
- }
- fputc ('\n', stdout);
- break;
- case NN_ECHO_QUOTED:
- fputc ('"', stdout);
- for (; buflen > 0; --buflen, ++buf) {
- switch (*buf) {
- case '\n':
- fprintf (stdout, "\\n");
- break;
- case '\r':
- fprintf (stdout, "\\r");
- break;
- case '\\':
- case '\"':
- fprintf (stdout, "\\%c", *buf);
- break;
- default:
- if (isprint (*buf)) {
- fputc (*buf, stdout);
- } else {
- fprintf (stdout, "\\x%02x", (unsigned char)*buf);
- }
- }
- }
- fprintf (stdout, "\"\n");
- break;
- case NN_ECHO_MSGPACK:
- if (buflen < 256) {
- fputc ('\xc4', stdout);
- fputc (buflen, stdout);
- fwrite (buf, 1, buflen, stdout);
- } else if (buflen < 65536) {
- fputc ('\xc5', stdout);
- fputc (buflen >> 8, stdout);
- fputc (buflen & 0xff, stdout);
- fwrite (buf, 1, buflen, stdout);
- } else {
- fputc ('\xc6', stdout);
- fputc (buflen >> 24, stdout);
- fputc ((buflen >> 16) & 0xff, stdout);
- fputc ((buflen >> 8) & 0xff, stdout);
- fputc (buflen & 0xff, stdout);
- fwrite (buf, 1, buflen, stdout);
- }
- break;
- case NN_ECHO_HEX:
- fputc ('"', stdout);
- for (; buflen > 0; --buflen, ++buf) {
- fprintf (stdout, "\\x%02x", (unsigned char)*buf);
- }
- fprintf (stdout, "\"\n");
- break;
-
- }
- fflush (stdout);
- }
- void nn_connect_socket (nn_options_t *options, int sock)
- {
- int i;
- int rc;
- for (i = 0; i < options->bind_addresses.num; ++i) {
- rc = nn_bind (sock, options->bind_addresses.items[i]);
- nn_assert_errno (rc >= 0, "Can't bind");
- }
- for (i = 0; i < options->connect_addresses.num; ++i) {
- rc = nn_connect (sock, options->connect_addresses.items[i]);
- nn_assert_errno (rc >= 0, "Can't connect");
- }
- }
- void nn_send_loop (nn_options_t *options, int sock)
- {
- int rc;
- uint64_t start_time;
- int64_t time_to_sleep, interval;
- interval = (int)(options->send_interval*1000);
- for (;;) {
- start_time = nn_clock_ms();
- rc = nn_send (sock,
- options->data_to_send.data, options->data_to_send.length,
- 0);
- if (rc < 0 && errno == EAGAIN) {
- fprintf (stderr, "Message not sent (EAGAIN)\n");
- } else {
- nn_assert_errno (rc >= 0, "Can't send");
- }
- if (interval >= 0) {
- time_to_sleep = (start_time + interval) - nn_clock_ms();
- if (time_to_sleep > 0) {
- nn_sleep ((int) time_to_sleep);
- }
- } else {
- break;
- }
- }
- }
- void nn_recv_loop (nn_options_t *options, int sock)
- {
- int rc;
- void *buf;
- for (;;) {
- rc = nn_recv (sock, &buf, NN_MSG, 0);
- if (rc < 0 && errno == EAGAIN) {
- continue;
- } else if (rc < 0 && (errno == ETIMEDOUT || errno == EFSM)) {
- return; /* No more messages possible */
- } else {
- nn_assert_errno (rc >= 0, "Can't recv");
- }
- nn_print_message (options, buf, rc);
- nn_freemsg (buf);
- }
- }
- void nn_rw_loop (nn_options_t *options, int sock)
- {
- int rc;
- void *buf;
- uint64_t start_time;
- int64_t time_to_sleep, interval, recv_timeout;
- interval = (int)(options->send_interval*1000);
- recv_timeout = (int)(options->recv_timeout*1000);
- for (;;) {
- start_time = nn_clock_ms();
- rc = nn_send (sock,
- options->data_to_send.data, options->data_to_send.length,
- 0);
- if (rc < 0 && errno == EAGAIN) {
- fprintf (stderr, "Message not sent (EAGAIN)\n");
- } else {
- nn_assert_errno (rc >= 0, "Can't send");
- }
- if (options->send_interval < 0) { /* Never send any more */
- nn_recv_loop (options, sock);
- return;
- }
- for (;;) {
- time_to_sleep = (start_time + interval) - nn_clock_ms();
- if (time_to_sleep <= 0) {
- break;
- }
- if (recv_timeout >= 0 && time_to_sleep > recv_timeout)
- {
- time_to_sleep = recv_timeout;
- }
- nn_set_recv_timeout (sock, (int) time_to_sleep);
- rc = nn_recv (sock, &buf, NN_MSG, 0);
- if (rc < 0) {
- if (errno == EAGAIN) {
- continue;
- } else if (errno == ETIMEDOUT || errno == EFSM) {
- time_to_sleep = (start_time + interval) - nn_clock_ms();
- if (time_to_sleep > 0)
- nn_sleep ((int) time_to_sleep);
- continue;
- }
- }
- nn_assert_errno (rc >= 0, "Can't recv");
- nn_print_message (options, buf, rc);
- nn_freemsg (buf);
- }
- }
- }
- void nn_resp_loop (nn_options_t *options, int sock)
- {
- int rc;
- void *buf;
- for (;;) {
- rc = nn_recv (sock, &buf, NN_MSG, 0);
- if (rc < 0 && errno == EAGAIN) {
- continue;
- } else {
- nn_assert_errno (rc >= 0, "Can't recv");
- }
- nn_print_message (options, buf, rc);
- nn_freemsg (buf);
- rc = nn_send (sock,
- options->data_to_send.data, options->data_to_send.length,
- 0);
- if (rc < 0 && errno == EAGAIN) {
- fprintf (stderr, "Message not sent (EAGAIN)\n");
- } else {
- nn_assert_errno (rc >= 0, "Can't send");
- }
- }
- }
- int main (int argc, char **argv)
- {
- int sock;
- nn_options_t options = {
- /* verbose */ 0,
- /* socket_type */ 0,
- /* bind_addresses */ {NULL, NULL, 0, 0},
- /* connect_addresses */ {NULL, NULL, 0, 0},
- /* send_timeout */ -1.f,
- /* recv_timeout */ -1.f,
- /* subscriptions */ {NULL, NULL, 0, 0},
- /* socket_name */ NULL,
- /* send_delay */ 0.f,
- /* send_interval */ -1.f,
- /* data_to_send */ {NULL, 0, 0},
- /* echo_format */ NN_NO_ECHO
- };
- nn_parse_options (&nn_cli, &options, argc, argv);
- sock = nn_create_socket (&options);
- nn_connect_socket (&options, sock);
- nn_sleep((int)(options.send_delay*1000));
- switch (options.socket_type) {
- case NN_PUB:
- case NN_PUSH:
- nn_send_loop (&options, sock);
- break;
- case NN_SUB:
- case NN_PULL:
- nn_recv_loop (&options, sock);
- break;
- case NN_BUS:
- case NN_PAIR:
- if (options.data_to_send.data) {
- nn_rw_loop (&options, sock);
- } else {
- nn_recv_loop (&options, sock);
- }
- break;
- case NN_SURVEYOR:
- case NN_REQ:
- nn_rw_loop (&options, sock);
- break;
- case NN_REP:
- case NN_RESPONDENT:
- if (options.data_to_send.data) {
- nn_resp_loop (&options, sock);
- } else {
- nn_recv_loop (&options, sock);
- }
- break;
- }
- nn_close (sock);
- nn_free_options(&nn_cli, &options);
- return 0;
- }
|