test_persistence.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - correct some compile warnings
  16. * Ian Craggs - add binary will message test
  17. * Ian Craggs - MQTT V5 updates
  18. *******************************************************************************/
  19. /**
  20. * @file
  21. * Offline buffering and automatic reconnect tests for the Paho Asynchronous MQTT C client
  22. *
  23. */
  24. #include "MQTTAsync.h"
  25. #include <string.h>
  26. #include <stdlib.h>
  27. #include "Thread.h"
  28. #if !defined(_WINDOWS)
  29. #include <sys/time.h>
  30. #include <sys/socket.h>
  31. #include <unistd.h>
  32. #include <errno.h>
  33. #else
  34. #include <windows.h>
  35. #endif
  36. char unique[50]; // unique suffix/prefix to add to clientid/topic etc
  37. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  38. void usage(void)
  39. {
  40. printf("help!!\n");
  41. exit(EXIT_FAILURE);
  42. }
  43. struct Options
  44. {
  45. char* connection; /**< connection to system under test. */
  46. char* proxy_connection; /**< connection to proxy */
  47. int verbose;
  48. int test_no;
  49. } options =
  50. {
  51. "localhost:1883",
  52. "localhost:1884",
  53. 0,
  54. 0,
  55. };
  56. void getopts(int argc, char** argv)
  57. {
  58. int count = 1;
  59. while (count < argc)
  60. {
  61. if (strcmp(argv[count], "--test_no") == 0)
  62. {
  63. if (++count < argc)
  64. options.test_no = atoi(argv[count]);
  65. else
  66. usage();
  67. }
  68. else if (strcmp(argv[count], "--connection") == 0)
  69. {
  70. if (++count < argc)
  71. options.connection = argv[count];
  72. else
  73. usage();
  74. }
  75. else if (strcmp(argv[count], "--proxy_connection") == 0)
  76. {
  77. if (++count < argc)
  78. options.proxy_connection = argv[count];
  79. else
  80. usage();
  81. }
  82. else if (strcmp(argv[count], "--verbose") == 0)
  83. options.verbose = 1;
  84. count++;
  85. }
  86. }
  87. #define LOGA_DEBUG 0
  88. #define LOGA_INFO 1
  89. #include <stdarg.h>
  90. #include <time.h>
  91. #include <sys/timeb.h>
  92. void MyLog(int LOGA_level, char* format, ...)
  93. {
  94. static char msg_buf[256];
  95. va_list args;
  96. struct timeb ts;
  97. struct tm *timeinfo;
  98. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  99. return;
  100. ftime(&ts);
  101. timeinfo = localtime(&ts.time);
  102. strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
  103. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  104. va_start(args, format);
  105. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf),
  106. format, args);
  107. va_end(args);
  108. printf("%s\n", msg_buf);
  109. fflush(stdout);
  110. }
  111. void MySleep(long milliseconds)
  112. {
  113. #if defined(_WIN32) || defined(_WIN64)
  114. Sleep(milliseconds);
  115. #else
  116. usleep(milliseconds*1000);
  117. #endif
  118. }
  119. #if defined(_WIN32) || defined(_WINDOWS)
  120. #define START_TIME_TYPE DWORD
  121. static DWORD start_time = 0;
  122. START_TIME_TYPE start_clock(void)
  123. {
  124. return GetTickCount();
  125. }
  126. #elif defined(AIX)
  127. #define START_TIME_TYPE struct timespec
  128. START_TIME_TYPE start_clock(void)
  129. {
  130. static struct timespec start;
  131. clock_gettime(CLOCK_REALTIME, &start);
  132. return start;
  133. }
  134. #else
  135. #define START_TIME_TYPE struct timeval
  136. /* TODO - unused - remove? static struct timeval start_time; */
  137. START_TIME_TYPE start_clock(void)
  138. {
  139. struct timeval start_time;
  140. gettimeofday(&start_time, NULL);
  141. return start_time;
  142. }
  143. #endif
  144. #if defined(_WIN32)
  145. long elapsed(START_TIME_TYPE start_time)
  146. {
  147. return GetTickCount() - start_time;
  148. }
  149. #elif defined(AIX)
  150. #define assert(a)
  151. long elapsed(struct timespec start)
  152. {
  153. struct timespec now, res;
  154. clock_gettime(CLOCK_REALTIME, &now);
  155. ntimersub(now, start, res);
  156. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  157. }
  158. #else
  159. long elapsed(START_TIME_TYPE start_time)
  160. {
  161. struct timeval now, res;
  162. gettimeofday(&now, NULL);
  163. timersub(&now, &start_time, &res);
  164. return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
  165. }
  166. #endif
  167. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  168. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  169. #define MAXMSGS 30;
  170. int tests = 0;
  171. int failures = 0;
  172. FILE* xml;
  173. START_TIME_TYPE global_start_time;
  174. char output[3000];
  175. char* cur_output = output;
  176. void write_test_result(void)
  177. {
  178. long duration = elapsed(global_start_time);
  179. fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
  180. if (cur_output != output)
  181. {
  182. fprintf(xml, "%s", output);
  183. cur_output = output;
  184. }
  185. fprintf(xml, "</testcase>\n");
  186. }
  187. void myassert(char* filename, int lineno, char* description, int value,
  188. char* format, ...)
  189. {
  190. ++tests;
  191. if (!value)
  192. {
  193. va_list args;
  194. ++failures;
  195. MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
  196. lineno, description);
  197. va_start(args, format);
  198. vprintf(format, args);
  199. va_end(args);
  200. cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
  201. description, filename, lineno);
  202. }
  203. else
  204. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
  205. filename, lineno, description);
  206. }
  207. void logProperties(MQTTProperties *props)
  208. {
  209. int i = 0;
  210. for (i = 0; i < props->count; ++i)
  211. {
  212. int id = props->array[i].identifier;
  213. const char* name = MQTTPropertyName(id);
  214. char* intformat = "Property name %s value %d";
  215. switch (MQTTProperty_getType(id))
  216. {
  217. case MQTTPROPERTY_TYPE_BYTE:
  218. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.byte);
  219. break;
  220. case MQTTPROPERTY_TYPE_TWO_BYTE_INTEGER:
  221. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer2);
  222. break;
  223. case MQTTPROPERTY_TYPE_FOUR_BYTE_INTEGER:
  224. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
  225. break;
  226. case MQTTPROPERTY_TYPE_VARIABLE_BYTE_INTEGER:
  227. MyLog(LOGA_DEBUG, intformat, name, props->array[i].value.integer4);
  228. break;
  229. case MQTTPROPERTY_TYPE_BINARY_DATA:
  230. case MQTTPROPERTY_TYPE_UTF_8_ENCODED_STRING:
  231. MyLog(LOGA_DEBUG, "Property name %s value len %.*s", name,
  232. props->array[i].value.data.len, props->array[i].value.data.data);
  233. break;
  234. case MQTTPROPERTY_TYPE_UTF_8_STRING_PAIR:
  235. MyLog(LOGA_DEBUG, "Property name %s key %.*s value %.*s", name,
  236. props->array[i].value.data.len, props->array[i].value.data.data,
  237. props->array[i].value.value.len, props->array[i].value.value.data);
  238. break;
  239. }
  240. }
  241. }
  242. char willTopic[100];
  243. char test_topic[50];
  244. /*********************************************************************
  245. Test7: Fill up TCP buffer with QoS 0 messages
  246. *********************************************************************/
  247. int test7c_connected = 0;
  248. int test7_will_message_received = 0;
  249. int test7_messages_received = 0;
  250. int test7Finished = 0;
  251. int test7OnFailureCalled = 0;
  252. int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  253. {
  254. MQTTAsync c = (MQTTAsync)context;
  255. static int message_count = 0;
  256. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  257. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  258. test7_will_message_received = 1;
  259. else
  260. test7_messages_received++;
  261. MQTTAsync_freeMessage(&message);
  262. MQTTAsync_free(topicName);
  263. return 1;
  264. }
  265. void test7cConnected(void* context, char* cause)
  266. {
  267. MQTTAsync c = (MQTTAsync)context;
  268. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  269. test7c_connected = 1;
  270. }
  271. void test7cOnConnectFailure(void* context, MQTTAsync_failureData5* response)
  272. {
  273. MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
  274. test7OnFailureCalled++;
  275. test7Finished = 1;
  276. }
  277. void test7cOnConnectSuccess(void* context, MQTTAsync_successData5* response)
  278. {
  279. MQTTAsync c = (MQTTAsync)context;
  280. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  281. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  282. /* send a message to the proxy to break the connection */
  283. pubmsg.payload = "TERMINATE";
  284. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  285. pubmsg.qos = 0;
  286. pubmsg.retained = 0;
  287. //rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  288. //assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  289. }
  290. int test7(struct Options options)
  291. {
  292. char* testname = "test7";
  293. int subsqos = 2;
  294. MQTTAsync c;
  295. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer5;
  296. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  297. MQTTAsync_createOptions createOpts = MQTTAsync_createOptions_initializer;
  298. int rc = 0;
  299. int count = 0;
  300. char clientidc[50];
  301. int i = 0;
  302. test7_will_message_received = 0;
  303. test7_messages_received = 0;
  304. test7Finished = 0;
  305. test7OnFailureCalled = 0;
  306. test7c_connected = 0;
  307. sprintf(willTopic, "paho-test95-7-%s", unique);
  308. sprintf(clientidc, "paho-test9-7-c-%s", "same"); //unique);
  309. sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
  310. test7Finished = 0;
  311. failures = 0;
  312. MyLog(LOGA_INFO, "Starting Offline buffering 7 - many persisted messages");
  313. fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
  314. global_start_time = start_clock();
  315. createOpts.MQTTVersion = MQTTVERSION_5;
  316. createOpts.allowDisconnectedSendAtAnyTime = 1;
  317. createOpts.sendWhileDisconnected = 1;
  318. createOpts.maxBufferedMessages = 64000;
  319. createOpts.persistQoS0 = 1;
  320. printf("Create starting\n");
  321. START_TIME_TYPE start = start_clock();
  322. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  323. NULL, &createOpts);
  324. long duration = elapsed(start);
  325. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  326. if (rc != MQTTASYNC_SUCCESS)
  327. {
  328. MQTTAsync_destroy(&c);
  329. goto exit;
  330. }
  331. printf("Create finished after %ld ms\n", duration);
  332. MQTTAsync_token *tokens, *cur_token;
  333. MQTTAsync_getPendingTokens(c, &tokens);
  334. int token_count = 0;
  335. if ((cur_token = tokens) != NULL)
  336. {
  337. while (*cur_token != -1)
  338. {
  339. cur_token++;
  340. token_count++;
  341. }
  342. }
  343. printf("%d messages restored\n", token_count);
  344. if (tokens)
  345. MQTTAsync_free(tokens);
  346. opts.keepAliveInterval = 20;
  347. opts.cleansession = 1;
  348. rc = MQTTAsync_setCallbacks(c, c, NULL, test7_messageArrived, NULL);
  349. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  350. #if 0
  351. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  352. opts.context = c;
  353. opts.onSuccess5 = test7cOnConnectSuccess;
  354. opts.onFailure5 = test7cOnConnectFailure;
  355. MyLog(LOGA_DEBUG, "Connecting client c");
  356. rc = MQTTAsync_connect(c, &opts);
  357. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  358. if (rc != MQTTASYNC_SUCCESS)
  359. {
  360. failures++;
  361. goto exit;
  362. }
  363. /* wait until d is ready: connected and subscribed */
  364. count = 0;
  365. while (!test7cReady && ++count < 10000)
  366. {
  367. if (test7Finished)
  368. goto exit;
  369. MySleep(100);
  370. }
  371. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  372. #endif
  373. rc = MQTTAsync_setConnected(c, c, test7cConnected);
  374. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  375. /* let client c go: connect, and send disconnect command to proxy */
  376. opts.will = &wopts;
  377. opts.will->payload.data = "will message";
  378. opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
  379. opts.will->qos = 1;
  380. opts.will->retained = 0;
  381. opts.will->topicName = willTopic;
  382. opts.onSuccess5 = test7cOnConnectSuccess;
  383. opts.onFailure5 = test7cOnConnectFailure;
  384. opts.context = c;
  385. opts.cleansession = 0;
  386. /*opts.automaticReconnect = 1;
  387. opts.minRetryInterval = 3;
  388. opts.maxRetryInterval = 6;*/
  389. #if 0
  390. MyLog(LOGA_DEBUG, "Connecting client c");
  391. rc = MQTTAsync_connect(c, &opts);
  392. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  393. if (rc != MQTTASYNC_SUCCESS)
  394. {
  395. failures++;
  396. goto exit;
  397. }
  398. count = 0;
  399. while (!test7c_connected && ++count < 10000)
  400. MySleep(100);
  401. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  402. #endif
  403. /* wait for will message */
  404. //while (test7_will_message_received == 0 && ++count < 10000)
  405. // MySleep(100);
  406. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
  407. test7c_connected = 0;
  408. #define PAYLOAD_LEN 500
  409. char buf[PAYLOAD_LEN];
  410. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  411. for (i = 0; i < 50000; ++i)
  412. {
  413. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  414. MQTTAsync_responseOptions pubopts = MQTTAsync_responseOptions_initializer;
  415. pubmsg.qos = i % 3;
  416. sprintf(buf, "QoS %d message", pubmsg.qos);
  417. pubmsg.payload = buf;
  418. pubmsg.payloadlen = PAYLOAD_LEN;
  419. pubmsg.retained = 0;
  420. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
  421. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  422. if (rc != MQTTASYNC_SUCCESS)
  423. {
  424. MySleep(3000);
  425. MyLog(LOGA_DEBUG, "Connecting client c");
  426. rc = MQTTAsync_connect(c, &opts);
  427. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  428. if (rc != MQTTASYNC_SUCCESS)
  429. {
  430. failures++;
  431. goto exit;
  432. }
  433. count = 0;
  434. while (!test7c_connected && ++count < 10000)
  435. MySleep(100);
  436. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  437. MySleep(3000);
  438. break;
  439. }
  440. }
  441. exit:
  442. rc = MQTTAsync_disconnect(c, NULL);
  443. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  444. /*rc = MQTTAsync_disconnect(d, NULL);
  445. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);*/
  446. MySleep(200);
  447. MQTTAsync_destroy(&c);
  448. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  449. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  450. write_test_result();
  451. return failures;
  452. }
  453. void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  454. {
  455. printf("%s\n", message);
  456. }
  457. int main(int argc, char** argv)
  458. {
  459. int* numtests = &tests;
  460. int rc = 0;
  461. int (*tests[])() = { NULL, test7 };
  462. time_t randtime;
  463. srand((unsigned) time(&randtime));
  464. sprintf(unique, "%u", rand());
  465. MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
  466. xml = fopen("TEST-test9.xml", "w");
  467. fprintf(xml, "<testsuite name=\"test9\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
  468. MQTTAsync_setTraceCallback(handleTrace);
  469. getopts(argc, argv);
  470. if (options.test_no == 0)
  471. { /* run all the tests */
  472. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  473. {
  474. failures = 0;
  475. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL);
  476. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  477. }
  478. }
  479. else
  480. {
  481. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_PROTOCOL);
  482. rc = tests[options.test_no](options); /* run just the selected test */
  483. }
  484. MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
  485. if (rc == 0)
  486. MyLog(LOGA_INFO, "verdict pass");
  487. else
  488. MyLog(LOGA_INFO, "verdict fail");
  489. fprintf(xml, "</testsuite>\n");
  490. fclose(xml);
  491. return rc;
  492. }