test9.c 79 KB


  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2022 IBM Corp., Ian Craggs
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Ian Craggs - correct some compile warnings
  16. * Ian Craggs - add binary will message test
  17. *******************************************************************************/
  18. /**
  19. * @file
  20. * Offline buffering and automatic reconnect tests for the Paho Asynchronous MQTT C client
  21. *
  22. */
  23. #include "MQTTAsync.h"
  24. #include <string.h>
  25. #include <stdlib.h>
  26. #include "Thread.h"
  27. #if !defined(_WINDOWS)
  28. #include <sys/time.h>
  29. #include <sys/socket.h>
  30. #include <unistd.h>
  31. #include <errno.h>
  32. #else
  33. #include <windows.h>
  34. #endif
  35. char unique[50]; // unique suffix/prefix to add to clientid/topic etc
  36. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  37. void usage(void)
  38. {
  39. printf("help!!\n");
  40. exit(EXIT_FAILURE);
  41. }
  42. struct Options
  43. {
  44. char* connection; /**< connection to system under test. */
  45. char* proxy_connection; /**< connection to proxy */
  46. int verbose;
  47. int test_no;
  48. } options =
  49. {
  50. "localhost:1883",
  51. "localhost:1884",
  52. 0,
  53. 0,
  54. };
  55. void getopts(int argc, char** argv)
  56. {
  57. int count = 1;
  58. while (count < argc)
  59. {
  60. if (strcmp(argv[count], "--test_no") == 0)
  61. {
  62. if (++count < argc)
  63. options.test_no = atoi(argv[count]);
  64. else
  65. usage();
  66. }
  67. else if (strcmp(argv[count], "--connection") == 0)
  68. {
  69. if (++count < argc)
  70. options.connection = argv[count];
  71. else
  72. usage();
  73. }
  74. else if (strcmp(argv[count], "--proxy_connection") == 0)
  75. {
  76. if (++count < argc)
  77. options.proxy_connection = argv[count];
  78. else
  79. usage();
  80. }
  81. else if (strcmp(argv[count], "--verbose") == 0)
  82. options.verbose = 1;
  83. count++;
  84. }
  85. }
  86. #define LOGA_DEBUG 0
  87. #define LOGA_INFO 1
  88. #include <stdarg.h>
  89. #include <time.h>
  90. #include <sys/timeb.h>
  91. void MyLog(int LOGA_level, char* format, ...)
  92. {
  93. static char msg_buf[256];
  94. va_list args;
  95. #if defined(_WIN32) || defined(_WINDOWS)
  96. struct timeb ts;
  97. #else
  98. struct timeval ts;
  99. #endif
  100. struct tm timeinfo;
  101. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  102. return;
  103. #if defined(_WIN32) || defined(_WINDOWS)
  104. ftime(&ts);
  105. localtime_s(&timeinfo, &ts.time);
  106. #else
  107. gettimeofday(&ts, NULL);
  108. localtime_r(&ts.tv_sec, &timeinfo);
  109. #endif
  110. strftime(msg_buf, 80, "%Y%m%d %H%M%S", &timeinfo);
  111. #if defined(_WIN32) || defined(_WINDOWS)
  112. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  113. #else
  114. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000L);
  115. #endif
  116. va_start(args, format);
  117. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  118. va_end(args);
  119. printf("%s\n", msg_buf);
  120. fflush(stdout);
  121. }
  122. void MySleep(long milliseconds)
  123. {
  124. #if defined(_WIN32) || defined(_WIN64)
  125. Sleep(milliseconds);
  126. #else
  127. usleep(milliseconds*1000);
  128. #endif
  129. }
  130. #if defined(_WIN32) || defined(_WINDOWS)
  131. #define START_TIME_TYPE DWORD
  132. static DWORD start_time = 0;
  133. START_TIME_TYPE start_clock(void)
  134. {
  135. return GetTickCount();
  136. }
  137. #elif defined(AIX)
  138. #define START_TIME_TYPE struct timespec
  139. START_TIME_TYPE start_clock(void)
  140. {
  141. static struct timespec start;
  142. clock_gettime(CLOCK_REALTIME, &start);
  143. return start;
  144. }
  145. #else
  146. #define START_TIME_TYPE struct timeval
  147. /* TODO - unused - remove? static struct timeval start_time; */
  148. START_TIME_TYPE start_clock(void)
  149. {
  150. struct timeval start_time;
  151. gettimeofday(&start_time, NULL);
  152. return start_time;
  153. }
  154. #endif
  155. #if defined(_WIN32)
  156. long elapsed(START_TIME_TYPE start_time)
  157. {
  158. return GetTickCount() - start_time;
  159. }
  160. #elif defined(AIX)
  161. #define assert(a)
  162. long elapsed(struct timespec start)
  163. {
  164. struct timespec now, res;
  165. clock_gettime(CLOCK_REALTIME, &now);
  166. ntimersub(now, start, res);
  167. return (res.tv_sec)*1000L + (res.tv_nsec)/1000000L;
  168. }
  169. #else
  170. long elapsed(START_TIME_TYPE start_time)
  171. {
  172. struct timeval now, res;
  173. gettimeofday(&now, NULL);
  174. timersub(&now, &start_time, &res);
  175. return (res.tv_sec) * 1000 + (res.tv_usec) / 1000;
  176. }
  177. #endif
  178. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  179. #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e)
  180. #define MAXMSGS 30;
  181. int tests = 0;
  182. int failures = 0;
  183. FILE* xml;
  184. START_TIME_TYPE global_start_time;
  185. char output[3000];
  186. char* cur_output = output;
  187. void write_test_result(void)
  188. {
  189. long duration = elapsed(global_start_time);
  190. fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000);
  191. if (cur_output != output)
  192. {
  193. fprintf(xml, "%s", output);
  194. cur_output = output;
  195. }
  196. fprintf(xml, "</testcase>\n");
  197. }
  198. void myassert(char* filename, int lineno, char* description, int value,
  199. char* format, ...)
  200. {
  201. ++tests;
  202. if (!value)
  203. {
  204. va_list args;
  205. ++failures;
  206. MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
  207. lineno, description);
  208. va_start(args, format);
  209. vprintf(format, args);
  210. va_end(args);
  211. cur_output += sprintf(cur_output, "<failure type=\"%s\">file %s, line %d </failure>\n",
  212. description, filename, lineno);
  213. }
  214. else
  215. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
  216. filename, lineno, description);
  217. }
  218. void waitForNoPendingTokens(MQTTAsync c)
  219. {
  220. int i = 0, rc = 0, count = 0;
  221. MQTTAsync_token *tokens;
  222. /* acks for outgoing messages could arrive after incoming exchanges are complete */
  223. do
  224. {
  225. rc = MQTTAsync_getPendingTokens(c, &tokens);
  226. assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  227. i = 0;
  228. if (tokens)
  229. {
  230. while (tokens[i] != -1)
  231. ++i;
  232. MQTTAsync_free(tokens);
  233. }
  234. if (i > 0)
  235. MySleep(100);
  236. }
  237. while (i > 0 && ++count < 10);
  238. assert("Number of getPendingTokens should be 0", i == 0, "i was %d ", i);
  239. }
  240. void assert3PendingTokens(MQTTAsync c)
  241. {
  242. int i = 0, rc = 0;
  243. MQTTAsync_token *tokens;
  244. rc = MQTTAsync_getPendingTokens(c, &tokens);
  245. assert("Good rc from getPendingTokens", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  246. i = 0;
  247. if (tokens)
  248. {
  249. while (tokens[i] != -1)
  250. ++i;
  251. MQTTAsync_free(tokens);
  252. }
  253. assert("Number of getPendingTokens should be 3", i == 3, "i was %d\n", i);
  254. }
  255. /*********************************************************************
  256. Tests: offline buffering - sending messages while disconnected
  257. 1. send some messages while disconnected, check that they are sent
  258. 2. repeat test 1 using serverURIs
  259. 3. repeat test 1 using auto reconnect
  260. 4. repeat test 2 using auto reconnect
  261. 5. check max-buffered
  262. 6. check auto-reconnect parms alter behaviour as expected
  263. Tests: automatic reconnect
  264. - check that connected() is called
  265. - check that reconnect() causes reconnect attempt
  266. - check that reconnect() fails if no connect has been previously attempted
  267. *********************************************************************/
  268. void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  269. {
  270. printf("%s\n", message);
  271. }
  272. /*********************************************************************
  273. Test1: offline buffering - sending messages while disconnected
  274. 1. call connect
  275. 2. use proxy to disconnect the client
  276. 3. while the client is disconnected, send more messages
  277. 4. when the client reconnects, check that those messages are sent
  278. *********************************************************************/
  279. int test1_will_message_received = 0;
  280. int test1_messages_received = 0;
  281. int test1_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  282. {
  283. MQTTAsync c = (MQTTAsync)context;
  284. static int message_count = 0;
  285. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  286. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  287. test1_will_message_received = 1;
  288. else
  289. test1_messages_received++;
  290. MQTTAsync_freeMessage(&message);
  291. MQTTAsync_free(topicName);
  292. return 1;
  293. }
  294. int test1Finished = 0;
  295. int test1OnFailureCalled = 0;
  296. void test1cOnFailure(void* context, MQTTAsync_failureData* response)
  297. {
  298. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  299. test1OnFailureCalled++;
  300. test1Finished = 1;
  301. }
  302. void test1dOnFailure(void* context, MQTTAsync_failureData* response)
  303. {
  304. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  305. test1OnFailureCalled++;
  306. test1Finished = 1;
  307. }
  308. void test1cOnConnect(void* context, MQTTAsync_successData* response)
  309. {
  310. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  311. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  312. MQTTAsync c = (MQTTAsync)context;
  313. int rc;
  314. /* send a message to the proxy to break the connection */
  315. pubmsg.payload = "TERMINATE";
  316. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  317. pubmsg.qos = 0;
  318. pubmsg.retained = 0;
  319. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  320. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  321. }
  322. int test1dReady = 0;
  323. char willTopic[100];
  324. char test_topic[100];
  325. void test1donSubscribe(void* context, MQTTAsync_successData* response)
  326. {
  327. MQTTAsync c = (MQTTAsync)context;
  328. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  329. test1dReady = 1;
  330. }
  331. void test1dOnConnect(void* context, MQTTAsync_successData* response)
  332. {
  333. MQTTAsync c = (MQTTAsync)context;
  334. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  335. int rc;
  336. int qoss[2] = {2, 2};
  337. char* topics[2] = {willTopic, test_topic};
  338. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  339. opts.onSuccess = test1donSubscribe;
  340. opts.context = c;
  341. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  342. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  343. if (rc != MQTTASYNC_SUCCESS)
  344. test1Finished = 1;
  345. }
  346. int test1c_connected = 0;
  347. void test1cConnected(void* context, char* cause)
  348. {
  349. MQTTAsync c = (MQTTAsync)context;
  350. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  351. test1c_connected = 1;
  352. }
  353. int test1(struct Options options)
  354. {
  355. char* testname = "test1";
  356. int subsqos = 2;
  357. MQTTAsync c, d;
  358. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  359. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  360. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  361. int rc = 0;
  362. int count = 0;
  363. char clientidc[70];
  364. char clientidd[70];
  365. int i = 0;
  366. sprintf(willTopic, "paho-test9-1-%s", unique);
  367. sprintf(clientidc, "paho-test9-1-c-%s", unique);
  368. sprintf(clientidd, "paho-test9-1-d-%s", unique);
  369. sprintf(test_topic, "paho-test9-1-test topic %s", unique);
  370. test1Finished = 0;
  371. failures = 0;
  372. MyLog(LOGA_INFO, "Starting Offline buffering 1 - messages while disconnected");
  373. fprintf(xml, "<testcase classname=\"test1\" name=\"%s\"", testname);
  374. global_start_time = start_clock();
  375. createOptions.sendWhileDisconnected = 1;
  376. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  377. NULL, &createOptions);
  378. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  379. if (rc != MQTTASYNC_SUCCESS)
  380. {
  381. MQTTAsync_destroy(&c);
  382. goto exit;
  383. }
  384. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  385. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  386. if (rc != MQTTASYNC_SUCCESS)
  387. {
  388. MQTTAsync_destroy(&c);
  389. goto exit;
  390. }
  391. opts.keepAliveInterval = 5;
  392. opts.cleansession = 1;
  393. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  394. //opts.username = "testuser";
  395. //opts.password = "testpassword";
  396. rc = MQTTAsync_setCallbacks(d, d, NULL, test1_messageArrived, NULL);
  397. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  398. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  399. opts.context = d;
  400. opts.onSuccess = test1dOnConnect;
  401. opts.onFailure = test1dOnFailure;
  402. MyLog(LOGA_DEBUG, "Connecting client d");
  403. rc = MQTTAsync_connect(d, &opts);
  404. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  405. if (rc != MQTTASYNC_SUCCESS)
  406. {
  407. failures++;
  408. goto exit;
  409. }
  410. /* wait until d is ready: connected and subscribed */
  411. count = 0;
  412. while (!test1dReady && ++count < 10000)
  413. MySleep(100);
  414. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  415. rc = MQTTAsync_setConnected(c, c, test1cConnected);
  416. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  417. /* let client c go: connect, and send disconnect command to proxy */
  418. opts.will = &wopts;
  419. opts.will->message = "will message";
  420. opts.will->qos = 1;
  421. opts.will->retained = 0;
  422. opts.will->topicName = willTopic;
  423. opts.onSuccess = test1cOnConnect;
  424. opts.onFailure = test1cOnFailure;
  425. opts.context = c;
  426. opts.cleansession = 0;
  427. MyLog(LOGA_DEBUG, "Connecting client c");
  428. rc = MQTTAsync_connect(c, &opts);
  429. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  430. if (rc != MQTTASYNC_SUCCESS)
  431. {
  432. failures++;
  433. goto exit;
  434. }
  435. /* wait for will message */
  436. while (!test1_will_message_received && ++count < 10000)
  437. MySleep(100);
  438. /* ensure not connected */
  439. while (MQTTAsync_isConnected(c) && ++count < 10000)
  440. MySleep(100);
  441. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  442. test1c_connected = 0;
  443. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  444. for (i = 0; i < 3; ++i)
  445. {
  446. char buf[50];
  447. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  448. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  449. sprintf(buf, "QoS %d message", i);
  450. pubmsg.payload = buf;
  451. pubmsg.payloadlen = (int)strlen(pubmsg.payload) + 1;
  452. pubmsg.qos = i;
  453. pubmsg.retained = 0;
  454. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  455. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  456. }
  457. assert3PendingTokens(c);
  458. rc = MQTTAsync_reconnect(c);
  459. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  460. /* wait for client to be reconnected */
  461. while (!test1c_connected && ++count < 10000)
  462. MySleep(100);
  463. /* wait for success or failure callback */
  464. while (test1_messages_received < 3 && ++count < 10000)
  465. MySleep(100);
  466. waitForNoPendingTokens(c);
  467. rc = MQTTAsync_disconnect(c, NULL);
  468. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  469. rc = MQTTAsync_disconnect(d, NULL);
  470. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  471. exit:
  472. MQTTAsync_destroy(&c);
  473. MQTTAsync_destroy(&d);
  474. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  475. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  476. write_test_result();
  477. return failures;
  478. }
  479. /*********************************************************************
  480. Test2: offline buffering - sending messages while disconnected
  481. 1. call connect
  482. 2. use proxy to disconnect the client
  483. 3. while the client is disconnected, send more messages
  484. 4. when the client reconnects, check that those messages are sent
  485. *********************************************************************/
  486. int test2_will_message_received = 0;
  487. int test2_messages_received = 0;
  488. int test2_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  489. {
  490. MQTTAsync c = (MQTTAsync)context;
  491. static int message_count = 0;
  492. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  493. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  494. test2_will_message_received = 1;
  495. else
  496. test2_messages_received++;
  497. MQTTAsync_freeMessage(&message);
  498. MQTTAsync_free(topicName);
  499. return 1;
  500. }
  501. int test2Finished = 0;
  502. int test2OnFailureCalled = 0;
  503. void test2cOnFailure(void* context, MQTTAsync_failureData* response)
  504. {
  505. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  506. test2OnFailureCalled++;
  507. test2Finished = 1;
  508. }
  509. void test2dOnFailure(void* context, MQTTAsync_failureData* response)
  510. {
  511. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  512. test2OnFailureCalled++;
  513. test2Finished = 1;
  514. }
  515. void test2cOnConnect(void* context, MQTTAsync_successData* response)
  516. {
  517. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  518. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  519. MQTTAsync c = (MQTTAsync)context;
  520. int rc;
  521. /* send a message to the proxy to break the connection */
  522. pubmsg.payload = "TERMINATE";
  523. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  524. pubmsg.qos = 0;
  525. pubmsg.retained = 0;
  526. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  527. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  528. }
  529. int test2dReady = 0;
  530. char willTopic[100];
  531. char test_topic[100];
  532. void test2donSubscribe(void* context, MQTTAsync_successData* response)
  533. {
  534. MQTTAsync c = (MQTTAsync)context;
  535. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  536. test2dReady = 1;
  537. }
  538. void test2dOnConnect(void* context, MQTTAsync_successData* response)
  539. {
  540. MQTTAsync c = (MQTTAsync)context;
  541. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  542. int rc;
  543. int qoss[2] = {2, 2};
  544. char* topics[2] = {willTopic, test_topic};
  545. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  546. opts.onSuccess = test2donSubscribe;
  547. opts.context = c;
  548. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  549. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  550. if (rc != MQTTASYNC_SUCCESS)
  551. test2Finished = 1;
  552. }
  553. int test2c_connected = 0;
  554. void test2cConnected(void* context, char* cause)
  555. {
  556. MQTTAsync c = (MQTTAsync)context;
  557. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  558. test2c_connected = 1;
  559. }
  560. int test2(struct Options options)
  561. {
  562. char* testname = "test2";
  563. int subsqos = 2;
  564. MQTTAsync c, d;
  565. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  566. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  567. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  568. int rc = 0;
  569. int count = 0;
  570. char clientidc[70];
  571. char clientidd[70];
  572. int i = 0;
  573. char *URIs[2] = {"rubbish", options.proxy_connection};
  574. sprintf(willTopic, "paho-test9-2-%s", unique);
  575. sprintf(clientidc, "paho-test9-2-c-%s", unique);
  576. sprintf(clientidd, "paho-test9-2-d-%s", unique);
  577. sprintf(test_topic, "paho-test9-2-test topic %s", unique);
  578. test2Finished = 0;
  579. failures = 0;
  580. MyLog(LOGA_INFO, "Starting Offline buffering 2 - messages while disconnected with serverURIs");
  581. fprintf(xml, "<testcase classname=\"test2\" name=\"%s\"", testname);
  582. global_start_time = start_clock();
  583. createOptions.sendWhileDisconnected = 1;
  584. rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  585. NULL, &createOptions);
  586. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  587. if (rc != MQTTASYNC_SUCCESS)
  588. {
  589. MQTTAsync_destroy(&c);
  590. goto exit;
  591. }
  592. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  593. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  594. if (rc != MQTTASYNC_SUCCESS)
  595. {
  596. MQTTAsync_destroy(&c);
  597. goto exit;
  598. }
  599. opts.keepAliveInterval = 5;
  600. opts.cleansession = 1;
  601. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  602. rc = MQTTAsync_setCallbacks(d, d, NULL, test2_messageArrived, NULL);
  603. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  604. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  605. opts.context = d;
  606. opts.onSuccess = test2dOnConnect;
  607. opts.onFailure = test2dOnFailure;
  608. MyLog(LOGA_DEBUG, "Connecting client d");
  609. rc = MQTTAsync_connect(d, &opts);
  610. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  611. if (rc != MQTTASYNC_SUCCESS)
  612. {
  613. failures++;
  614. goto exit;
  615. }
  616. /* wait until d is ready: connected and subscribed */
  617. count = 0;
  618. while (!test2dReady && ++count < 300)
  619. MySleep(100);
  620. assert("Count should be less than 300", count < 300, "count was %d", count); /* wrong */
  621. rc = MQTTAsync_setConnected(c, c, test2cConnected);
  622. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  623. /* let client c go: connect, and send disconnect command to proxy */
  624. opts.will = &wopts;
  625. opts.will->message = "will message";
  626. opts.will->qos = 1;
  627. opts.will->retained = 0;
  628. opts.will->topicName = willTopic;
  629. opts.onSuccess = test2cOnConnect;
  630. opts.onFailure = test2cOnFailure;
  631. opts.context = c;
  632. opts.cleansession = 0;
  633. opts.serverURIs = URIs;
  634. opts.serverURIcount = 2;
  635. opts.MQTTVersion = MQTTVERSION_3_1_1;
  636. MyLog(LOGA_DEBUG, "Connecting client c");
  637. rc = MQTTAsync_connect(c, &opts);
  638. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  639. if (rc != MQTTASYNC_SUCCESS)
  640. {
  641. failures++;
  642. goto exit;
  643. }
  644. /* wait for will message */
  645. count = 0;
  646. while (!test2_will_message_received && ++count < 300)
  647. MySleep(100);
  648. assert("Count should be less than 300", count < 300, "count was %d", count); /* wrong */
  649. /* ensure not connected */
  650. count = 0;
  651. while (MQTTAsync_isConnected(c) && ++count < 300)
  652. MySleep(100);
  653. assert("Count should be less than 300", count < 300, "count was %d", count); /* wrong */
  654. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  655. test2c_connected = 0;
  656. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  657. for (i = 0; i < 3; ++i)
  658. {
  659. char buf[50];
  660. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  661. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  662. sprintf(buf, "QoS %d message", i);
  663. pubmsg.payload = buf;
  664. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  665. pubmsg.qos = i;
  666. pubmsg.retained = 0;
  667. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  668. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  669. }
  670. assert3PendingTokens(c);
  671. rc = MQTTAsync_reconnect(c);
  672. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  673. /* wait for client to be reconnected */
  674. count = 0;
  675. while (!test2c_connected && ++count < 300)
  676. MySleep(100);
  677. assert("Count should be less than 300", count < 300, "count was %d", count); /* wrong */
  678. /* wait for success or failure callback */
  679. count = 0;
  680. while (test2_messages_received < 3 && ++count < 300)
  681. MySleep(100);
  682. assert("Count should be less than 300", count < 300, "count was %d", count); /* wrong */
  683. waitForNoPendingTokens(c);
  684. rc = MQTTAsync_disconnect(c, NULL);
  685. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  686. rc = MQTTAsync_disconnect(d, NULL);
  687. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  688. exit:
  689. MySleep(200);
  690. MQTTAsync_destroy(&c);
  691. MQTTAsync_destroy(&d);
  692. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  693. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  694. write_test_result();
  695. return failures;
  696. }
  697. /*********************************************************************
  698. test3: offline buffering - sending messages while disconnected
  699. 1. call connect
  700. 2. use proxy to disconnect the client
  701. 3. while the client is disconnected, send more messages
  702. 4. when the client auto reconnects, check that those messages are sent
  703. *********************************************************************/
  704. int test3_will_message_received = 0;
  705. int test3_messages_received = 0;
  706. int test3_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  707. {
  708. MQTTAsync c = (MQTTAsync)context;
  709. static int message_count = 0;
  710. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  711. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  712. test3_will_message_received = 1;
  713. else
  714. test3_messages_received++;
  715. MQTTAsync_freeMessage(&message);
  716. MQTTAsync_free(topicName);
  717. return 1;
  718. }
  719. int test3Finished = 0;
  720. int test3OnFailureCalled = 0;
  721. void test3cOnFailure(void* context, MQTTAsync_failureData* response)
  722. {
  723. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  724. test3OnFailureCalled++;
  725. test3Finished = 1;
  726. }
  727. void test3dOnFailure(void* context, MQTTAsync_failureData* response)
  728. {
  729. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  730. test3OnFailureCalled++;
  731. test3Finished = 1;
  732. }
  733. void test3cOnConnect(void* context, MQTTAsync_successData* response)
  734. {
  735. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  736. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  737. MQTTAsync c = (MQTTAsync)context;
  738. int rc;
  739. /* send a message to the proxy to break the connection */
  740. pubmsg.payload = "TERMINATE";
  741. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  742. pubmsg.qos = 0;
  743. pubmsg.retained = 0;
  744. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  745. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  746. }
  747. int test3dReady = 0;
  748. char willTopic[100];
  749. char test_topic[100];
  750. void test3donSubscribe(void* context, MQTTAsync_successData* response)
  751. {
  752. MQTTAsync c = (MQTTAsync)context;
  753. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  754. test3dReady = 1;
  755. }
  756. void test3dOnConnect(void* context, MQTTAsync_successData* response)
  757. {
  758. MQTTAsync c = (MQTTAsync)context;
  759. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  760. int rc;
  761. int qoss[2] = {2, 2};
  762. char* topics[2] = {willTopic, test_topic};
  763. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  764. opts.onSuccess = test3donSubscribe;
  765. opts.context = c;
  766. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  767. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  768. if (rc != MQTTASYNC_SUCCESS)
  769. test3Finished = 1;
  770. }
  771. int test3c_connected = 0;
  772. void test3cConnected(void* context, char* cause)
  773. {
  774. MQTTAsync c = (MQTTAsync)context;
  775. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  776. test3c_connected = 1;
  777. }
  778. int test3(struct Options options)
  779. {
  780. char* testname = "test3";
  781. int subsqos = 2;
  782. MQTTAsync c, d;
  783. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  784. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  785. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  786. int rc = 0;
  787. int count = 0;
  788. char clientidc[70];
  789. char clientidd[70];
  790. int i = 0;
  791. sprintf(willTopic, "paho-test9-3-%s", unique);
  792. sprintf(clientidc, "paho-test9-3-c-%s", unique);
  793. sprintf(clientidd, "paho-test9-3-d-%s", unique);
  794. sprintf(test_topic, "paho-test9-3-test topic %s", unique);
  795. test3Finished = 0;
  796. failures = 0;
  797. MyLog(LOGA_INFO, "Starting Offline buffering 3 - messages while disconnected");
  798. fprintf(xml, "<testcase classname=\"test3\" name=\"%s\"", testname);
  799. global_start_time = start_clock();
  800. createOptions.sendWhileDisconnected = 1;
  801. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  802. NULL, &createOptions);
  803. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  804. if (rc != MQTTASYNC_SUCCESS)
  805. {
  806. MQTTAsync_destroy(&c);
  807. goto exit;
  808. }
  809. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  810. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  811. if (rc != MQTTASYNC_SUCCESS)
  812. {
  813. MQTTAsync_destroy(&c);
  814. goto exit;
  815. }
  816. opts.keepAliveInterval = 5;
  817. opts.cleansession = 1;
  818. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  819. //opts.username = "testuser";
  820. //opts.password = "testpassword";
  821. rc = MQTTAsync_setCallbacks(d, d, NULL, test3_messageArrived, NULL);
  822. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  823. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  824. opts.context = d;
  825. opts.onSuccess = test3dOnConnect;
  826. opts.onFailure = test3dOnFailure;
  827. MyLog(LOGA_DEBUG, "Connecting client d");
  828. rc = MQTTAsync_connect(d, &opts);
  829. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  830. if (rc != MQTTASYNC_SUCCESS)
  831. {
  832. failures++;
  833. goto exit;
  834. }
  835. /* wait until d is ready: connected and subscribed */
  836. count = 0;
  837. while (!test3dReady && ++count < 10000)
  838. MySleep(100);
  839. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  840. rc = MQTTAsync_setConnected(c, c, test3cConnected);
  841. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  842. /* let client c go: connect, and send disconnect command to proxy */
  843. opts.will = &wopts;
  844. opts.will->message = "will message";
  845. opts.will->qos = 1;
  846. opts.will->retained = 0;
  847. opts.will->topicName = willTopic;
  848. opts.onSuccess = test3cOnConnect;
  849. opts.onFailure = test3cOnFailure;
  850. opts.context = c;
  851. opts.cleansession = 0;
  852. opts.automaticReconnect = 1;
  853. MyLog(LOGA_DEBUG, "Connecting client c");
  854. rc = MQTTAsync_connect(c, &opts);
  855. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  856. if (rc != MQTTASYNC_SUCCESS)
  857. {
  858. failures++;
  859. goto exit;
  860. }
  861. /* wait for will message */
  862. while (!test3_will_message_received && ++count < 10000)
  863. MySleep(100);
  864. /* ensure not connected */
  865. while (MQTTAsync_isConnected(c) && ++count < 10000)
  866. MySleep(100);
  867. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  868. test3c_connected = 0;
  869. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  870. for (i = 0; i < 3; ++i)
  871. {
  872. char buf[50];
  873. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  874. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  875. sprintf(buf, "QoS %d message", i);
  876. pubmsg.payload = buf;
  877. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  878. pubmsg.qos = i;
  879. pubmsg.retained = 0;
  880. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  881. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  882. }
  883. assert3PendingTokens(c);
  884. /* wait for client to be reconnected */
  885. while (!test3c_connected && ++count < 10000)
  886. MySleep(100);
  887. /* wait for success or failure callback */
  888. while (test3_messages_received < 3 && ++count < 10000)
  889. MySleep(100);
  890. waitForNoPendingTokens(c);
  891. rc = MQTTAsync_disconnect(c, NULL);
  892. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  893. rc = MQTTAsync_disconnect(d, NULL);
  894. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  895. exit:
  896. MySleep(200);
  897. MQTTAsync_destroy(&c);
  898. MQTTAsync_destroy(&d);
  899. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  900. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  901. write_test_result();
  902. return failures;
  903. }
  904. /*********************************************************************
  905. test4: offline buffering - sending messages while disconnected
  906. 1. call connect
  907. 2. use proxy to disconnect the client
  908. 3. while the client is disconnected, send more messages
  909. 4. when the client auto reconnects, check that those messages are sent
  910. *********************************************************************/
  911. int test4_will_message_received = 0;
  912. int test4_messages_received = 0;
  913. int test4_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  914. {
  915. MQTTAsync c = (MQTTAsync)context;
  916. static int message_count = 0;
  917. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  918. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  919. test4_will_message_received = 1;
  920. else
  921. test4_messages_received++;
  922. MQTTAsync_freeMessage(&message);
  923. MQTTAsync_free(topicName);
  924. return 1;
  925. }
  926. int test4Finished = 0;
  927. int test4OnFailureCalled = 0;
  928. void test4cOnFailure(void* context, MQTTAsync_failureData* response)
  929. {
  930. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  931. test4OnFailureCalled++;
  932. test4Finished = 1;
  933. }
  934. void test4dOnFailure(void* context, MQTTAsync_failureData* response)
  935. {
  936. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  937. test4OnFailureCalled++;
  938. test4Finished = 1;
  939. }
  940. void test4cOnConnect(void* context, MQTTAsync_successData* response)
  941. {
  942. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  943. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  944. MQTTAsync c = (MQTTAsync)context;
  945. int rc;
  946. /* send a message to the proxy to break the connection */
  947. pubmsg.payload = "TERMINATE";
  948. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  949. pubmsg.qos = 0;
  950. pubmsg.retained = 0;
  951. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  952. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  953. }
  954. int test4dReady = 0;
  955. char willTopic[100];
  956. char test_topic[100];
  957. void test4donSubscribe(void* context, MQTTAsync_successData* response)
  958. {
  959. MQTTAsync c = (MQTTAsync)context;
  960. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  961. test4dReady = 1;
  962. }
  963. void test4dOnConnect(void* context, MQTTAsync_successData* response)
  964. {
  965. MQTTAsync c = (MQTTAsync)context;
  966. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  967. int rc;
  968. int qoss[2] = {2, 2};
  969. char* topics[2] = {willTopic, test_topic};
  970. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  971. opts.onSuccess = test4donSubscribe;
  972. opts.context = c;
  973. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  974. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  975. if (rc != MQTTASYNC_SUCCESS)
  976. test4Finished = 1;
  977. }
  978. int test4c_connected = 0;
  979. void test4cConnected(void* context, char* cause)
  980. {
  981. MQTTAsync c = (MQTTAsync)context;
  982. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  983. test4c_connected = 1;
  984. }
  985. int test4(struct Options options)
  986. {
  987. char* testname = "test4";
  988. int subsqos = 2;
  989. MQTTAsync c, d;
  990. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  991. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  992. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  993. int rc = 0;
  994. int count = 0;
  995. char clientidc[70];
  996. char clientidd[70];
  997. int i = 0;
  998. char *URIs[2] = {"rubbish", options.proxy_connection};
  999. sprintf(willTopic, "paho-test9-4-%s", unique);
  1000. sprintf(clientidc, "paho-test9-4-c-%s", unique);
  1001. sprintf(clientidd, "paho-test9-4-d-%s", unique);
  1002. sprintf(test_topic, "paho-test9-4-test topic %s", unique);
  1003. test4Finished = 0;
  1004. failures = 0;
  1005. MyLog(LOGA_INFO, "Starting Offline buffering 4 - messages while disconnected with serverURIs");
  1006. fprintf(xml, "<testcase classname=\"test4\" name=\"%s\"", testname);
  1007. global_start_time = start_clock();
  1008. createOptions.sendWhileDisconnected = 1;
  1009. rc = MQTTAsync_createWithOptions(&c, "not used", clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1010. NULL, &createOptions);
  1011. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1012. if (rc != MQTTASYNC_SUCCESS)
  1013. {
  1014. MQTTAsync_destroy(&c);
  1015. goto exit;
  1016. }
  1017. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  1018. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1019. if (rc != MQTTASYNC_SUCCESS)
  1020. {
  1021. MQTTAsync_destroy(&c);
  1022. goto exit;
  1023. }
  1024. opts.keepAliveInterval = 5;
  1025. opts.cleansession = 1;
  1026. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1027. rc = MQTTAsync_setCallbacks(d, d, NULL, test4_messageArrived, NULL);
  1028. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1029. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1030. opts.context = d;
  1031. opts.onSuccess = test4dOnConnect;
  1032. opts.onFailure = test4dOnFailure;
  1033. MyLog(LOGA_DEBUG, "Connecting client d");
  1034. rc = MQTTAsync_connect(d, &opts);
  1035. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1036. if (rc != MQTTASYNC_SUCCESS)
  1037. {
  1038. failures++;
  1039. goto exit;
  1040. }
  1041. /* wait until d is ready: connected and subscribed */
  1042. count = 0;
  1043. while (!test4dReady && ++count < 10000)
  1044. MySleep(100);
  1045. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1046. rc = MQTTAsync_setConnected(c, c, test4cConnected);
  1047. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1048. /* let client c go: connect, and send disconnect command to proxy */
  1049. opts.will = &wopts;
  1050. opts.will->message = "will message";
  1051. opts.will->qos = 1;
  1052. opts.will->retained = 0;
  1053. opts.will->topicName = willTopic;
  1054. opts.onSuccess = test4cOnConnect;
  1055. opts.onFailure = test4cOnFailure;
  1056. opts.context = c;
  1057. opts.cleansession = 0;
  1058. opts.serverURIs = URIs;
  1059. opts.serverURIcount = 2;
  1060. opts.automaticReconnect = 1;
  1061. MyLog(LOGA_DEBUG, "Connecting client c");
  1062. rc = MQTTAsync_connect(c, &opts);
  1063. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1064. if (rc != MQTTASYNC_SUCCESS)
  1065. {
  1066. failures++;
  1067. goto exit;
  1068. }
  1069. /* wait for will message */
  1070. while (!test4_will_message_received && ++count < 10000)
  1071. MySleep(100);
  1072. /* ensure not connected */
  1073. while (MQTTAsync_isConnected(c) && ++count < 10000)
  1074. MySleep(100);
  1075. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1076. test4c_connected = 0;
  1077. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1078. for (i = 0; i < 3; ++i)
  1079. {
  1080. char buf[50];
  1081. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1082. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1083. sprintf(buf, "QoS %d message", i);
  1084. pubmsg.payload = buf;
  1085. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1086. pubmsg.qos = i;
  1087. pubmsg.retained = 0;
  1088. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1089. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1090. }
  1091. assert3PendingTokens(c);
  1092. /* wait for client to be reconnected */
  1093. while (!test4c_connected && ++count < 10000)
  1094. MySleep(100);
  1095. /* wait for success or failure callback */
  1096. while (test4_messages_received < 3 && ++count < 10000)
  1097. MySleep(100);
  1098. waitForNoPendingTokens(c);
  1099. rc = MQTTAsync_disconnect(c, NULL);
  1100. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1101. rc = MQTTAsync_disconnect(d, NULL);
  1102. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1103. exit:
  1104. MySleep(200);
  1105. MQTTAsync_destroy(&c);
  1106. MQTTAsync_destroy(&d);
  1107. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1108. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1109. write_test_result();
  1110. return failures;
  1111. }
  1112. /*********************************************************************
  1113. test5: offline buffering - check max buffered
  1114. 1. call connect
  1115. 2. use proxy to disconnect the client
  1116. 3. while the client is disconnected, send more messages
  1117. 4. when the client reconnects, check that those messages are sent
  1118. *********************************************************************/
  1119. int test5_will_message_received = 0;
  1120. int test5_messages_received = 0;
  1121. int test5Finished = 0;
  1122. int test5OnFailureCalled = 0;
  1123. int test5c_connected = 0;
  1124. int test5_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1125. {
  1126. MQTTAsync c = (MQTTAsync)context;
  1127. static int message_count = 0;
  1128. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1129. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  1130. test5_will_message_received = 1;
  1131. else
  1132. test5_messages_received++;
  1133. MQTTAsync_freeMessage(&message);
  1134. MQTTAsync_free(topicName);
  1135. return 1;
  1136. }
  1137. void test5cOnFailure(void* context, MQTTAsync_failureData* response)
  1138. {
  1139. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1140. test5OnFailureCalled++;
  1141. test5Finished = 1;
  1142. }
  1143. void test5dOnFailure(void* context, MQTTAsync_failureData* response)
  1144. {
  1145. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1146. test5OnFailureCalled++;
  1147. test5Finished = 1;
  1148. }
  1149. void test5cOnConnect(void* context, MQTTAsync_successData* response)
  1150. {
  1151. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1152. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client d, context %p\n", context);
  1153. MQTTAsync c = (MQTTAsync)context;
  1154. int rc;
  1155. /* send a message to the proxy to break the connection */
  1156. pubmsg.payload = "TERMINATE";
  1157. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  1158. pubmsg.qos = 0;
  1159. pubmsg.retained = 0;
  1160. rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  1161. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1162. }
  1163. int test5dReady = 0;
  1164. char willTopic[100];
  1165. char test_topic[100];
  1166. void test5donSubscribe(void* context, MQTTAsync_successData* response)
  1167. {
  1168. MQTTAsync c = (MQTTAsync)context;
  1169. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  1170. test5dReady = 1;
  1171. }
  1172. void test5dOnConnect(void* context, MQTTAsync_successData* response)
  1173. {
  1174. MQTTAsync c = (MQTTAsync)context;
  1175. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1176. int rc;
  1177. int qoss[2] = {2, 2};
  1178. char* topics[2] = {willTopic, test_topic};
  1179. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1180. opts.onSuccess = test5donSubscribe;
  1181. opts.context = c;
  1182. rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  1183. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1184. if (rc != MQTTASYNC_SUCCESS)
  1185. test5Finished = 1;
  1186. }
  1187. void test5cConnected(void* context, char* cause)
  1188. {
  1189. MQTTAsync c = (MQTTAsync)context;
  1190. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  1191. test5c_connected = 1;
  1192. }
  1193. int test5(struct Options options)
  1194. {
  1195. char* testname = "test5";
  1196. int subsqos = 2;
  1197. MQTTAsync c, d;
  1198. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  1199. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1200. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1201. int rc = 0;
  1202. int count = 0;
  1203. char clientidc[70];
  1204. char clientidd[70];
  1205. int i = 0;
  1206. sprintf(willTopic, "paho-test9-5-%s", unique);
  1207. sprintf(clientidc, "paho-test9-5-c-%s", unique);
  1208. sprintf(clientidd, "paho-test9-5-d-%s", unique);
  1209. sprintf(test_topic, "paho-test9-5-test topic %s", unique);
  1210. test5Finished = 0;
  1211. failures = 0;
  1212. MyLog(LOGA_INFO, "Starting Offline buffering 5 - max buffered");
  1213. fprintf(xml, "<testcase classname=\"test5\" name=\"%s\"", testname);
  1214. global_start_time = start_clock();
  1215. createOptions.sendWhileDisconnected = 1;
  1216. createOptions.maxBufferedMessages = 3;
  1217. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1218. NULL, &createOptions);
  1219. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1220. if (rc != MQTTASYNC_SUCCESS)
  1221. {
  1222. MQTTAsync_destroy(&c);
  1223. goto exit;
  1224. }
  1225. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  1226. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1227. if (rc != MQTTASYNC_SUCCESS)
  1228. {
  1229. MQTTAsync_destroy(&c);
  1230. goto exit;
  1231. }
  1232. opts.keepAliveInterval = 5;
  1233. opts.cleansession = 1;
  1234. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1235. //opts.username = "testuser";
  1236. //opts.password = "testpassword";
  1237. rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
  1238. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1239. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1240. opts.context = d;
  1241. opts.onSuccess = test5dOnConnect;
  1242. opts.onFailure = test5dOnFailure;
  1243. MyLog(LOGA_DEBUG, "Connecting client d");
  1244. rc = MQTTAsync_connect(d, &opts);
  1245. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1246. if (rc != MQTTASYNC_SUCCESS)
  1247. {
  1248. failures++;
  1249. goto exit;
  1250. }
  1251. /* wait until d is ready: connected and subscribed */
  1252. count = 0;
  1253. while (!test5dReady && ++count < 10000)
  1254. MySleep(100);
  1255. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1256. rc = MQTTAsync_setConnected(c, c, test5cConnected);
  1257. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1258. /* let client c go: connect, and send disconnect command to proxy */
  1259. opts.will = &wopts;
  1260. opts.will->message = "will message";
  1261. opts.will->qos = 1;
  1262. opts.will->retained = 0;
  1263. opts.will->topicName = willTopic;
  1264. opts.onSuccess = test5cOnConnect;
  1265. opts.onFailure = test5cOnFailure;
  1266. opts.context = c;
  1267. opts.cleansession = 0;
  1268. MyLog(LOGA_DEBUG, "Connecting client c");
  1269. rc = MQTTAsync_connect(c, &opts);
  1270. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1271. if (rc != MQTTASYNC_SUCCESS)
  1272. {
  1273. failures++;
  1274. goto exit;
  1275. }
  1276. /* wait for will message */
  1277. while (!test5_will_message_received && ++count < 10000)
  1278. MySleep(100);
  1279. /* ensure not connected */
  1280. while (MQTTAsync_isConnected(c) && ++count < 10000)
  1281. MySleep(100);
  1282. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1283. test5c_connected = 0;
  1284. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1285. for (i = 0; i < 5; ++i)
  1286. {
  1287. char buf[50];
  1288. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1289. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1290. sprintf(buf, "QoS %d message", i);
  1291. pubmsg.payload = buf;
  1292. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1293. pubmsg.qos = i % 3;
  1294. pubmsg.retained = 0;
  1295. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1296. if (i <= 2)
  1297. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1298. else
  1299. assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
  1300. }
  1301. assert3PendingTokens(c);
  1302. rc = MQTTAsync_reconnect(c);
  1303. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1304. /* wait for client to be reconnected */
  1305. while (!test5c_connected && ++count < 10000)
  1306. MySleep(100);
  1307. /* wait for success or failure callback */
  1308. while (test5_messages_received < 3 && ++count < 10000)
  1309. MySleep(100);
  1310. waitForNoPendingTokens(c);
  1311. rc = MQTTAsync_disconnect(c, NULL);
  1312. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1313. rc = MQTTAsync_disconnect(d, NULL);
  1314. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1315. exit:
  1316. MySleep(200);
  1317. MQTTAsync_destroy(&c);
  1318. MQTTAsync_destroy(&d);
  1319. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1320. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1321. write_test_result();
  1322. return failures;
  1323. }
  1324. int test6(struct Options options)
  1325. {
  1326. char* testname = "test6";
  1327. int subsqos = 2;
  1328. MQTTAsync c, d;
  1329. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  1330. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1331. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1332. int rc = 0;
  1333. int count = 0;
  1334. char clientidc[70];
  1335. char clientidd[70];
  1336. int i = 0;
  1337. test5_will_message_received = 0;
  1338. test5_messages_received = 0;
  1339. test5Finished = 0;
  1340. test5OnFailureCalled = 0;
  1341. test5c_connected = 0;
  1342. sprintf(willTopic, "paho-test9-6-%s", unique);
  1343. sprintf(clientidc, "paho-test9-6-c-%s", unique);
  1344. sprintf(clientidd, "paho-test9-6-d-%s", unique);
  1345. sprintf(test_topic, "paho-test9-6-test topic %s", unique);
  1346. test5Finished = 0;
  1347. failures = 0;
  1348. MyLog(LOGA_INFO, "Starting Offline buffering 6 - max buffered with binary will");
  1349. fprintf(xml, "<testcase classname=\"test6\" name=\"%s\"", testname);
  1350. global_start_time = start_clock();
  1351. createOptions.sendWhileDisconnected = 1;
  1352. createOptions.maxBufferedMessages = 3;
  1353. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1354. NULL, &createOptions);
  1355. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1356. if (rc != MQTTASYNC_SUCCESS)
  1357. {
  1358. MQTTAsync_destroy(&c);
  1359. goto exit;
  1360. }
  1361. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  1362. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1363. if (rc != MQTTASYNC_SUCCESS)
  1364. {
  1365. MQTTAsync_destroy(&c);
  1366. goto exit;
  1367. }
  1368. opts.keepAliveInterval = 5;
  1369. opts.cleansession = 1;
  1370. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1371. //opts.username = "testuser";
  1372. //opts.password = "testpassword";
  1373. rc = MQTTAsync_setCallbacks(d, d, NULL, test5_messageArrived, NULL);
  1374. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1375. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1376. opts.context = d;
  1377. opts.onSuccess = test5dOnConnect;
  1378. opts.onFailure = test5dOnFailure;
  1379. MyLog(LOGA_DEBUG, "Connecting client d");
  1380. rc = MQTTAsync_connect(d, &opts);
  1381. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1382. if (rc != MQTTASYNC_SUCCESS)
  1383. {
  1384. failures++;
  1385. goto exit;
  1386. }
  1387. /* wait until d is ready: connected and subscribed */
  1388. count = 0;
  1389. while (!test5dReady && ++count < 10000)
  1390. MySleep(100);
  1391. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1392. rc = MQTTAsync_setConnected(c, c, test5cConnected);
  1393. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1394. /* let client c go: connect, and send disconnect command to proxy */
  1395. opts.will = &wopts;
  1396. opts.will->payload.data = "will message";
  1397. opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
  1398. opts.will->qos = 1;
  1399. opts.will->retained = 0;
  1400. opts.will->topicName = willTopic;
  1401. opts.onSuccess = test5cOnConnect;
  1402. opts.onFailure = test5cOnFailure;
  1403. opts.context = c;
  1404. opts.cleansession = 0;
  1405. MyLog(LOGA_DEBUG, "Connecting client c");
  1406. rc = MQTTAsync_connect(c, &opts);
  1407. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1408. if (rc != MQTTASYNC_SUCCESS)
  1409. {
  1410. failures++;
  1411. goto exit;
  1412. }
  1413. /* wait for will message */
  1414. while (!test5_will_message_received && ++count < 10000)
  1415. MySleep(100);
  1416. /* ensure not connected */
  1417. while (MQTTAsync_isConnected(c) && ++count < 10000)
  1418. MySleep(100);
  1419. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered");
  1420. test5c_connected = 0;
  1421. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1422. for (i = 0; i < 5; ++i)
  1423. {
  1424. char buf[50];
  1425. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1426. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1427. sprintf(buf, "QoS %d message", i);
  1428. pubmsg.payload = buf;
  1429. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1430. pubmsg.qos = i % 3;
  1431. pubmsg.retained = 0;
  1432. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1433. if (i <= 2)
  1434. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1435. else
  1436. assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
  1437. }
  1438. assert3PendingTokens(c);
  1439. rc = MQTTAsync_reconnect(c);
  1440. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1441. /* wait for client to be reconnected */
  1442. while (!test5c_connected && ++count < 10000)
  1443. MySleep(100);
  1444. /* wait for success or failure callback */
  1445. while (test5_messages_received < 3 && ++count < 10000)
  1446. MySleep(100);
  1447. waitForNoPendingTokens(c);
  1448. rc = MQTTAsync_disconnect(c, NULL);
  1449. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1450. rc = MQTTAsync_disconnect(d, NULL);
  1451. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1452. exit:
  1453. MySleep(200);
  1454. MQTTAsync_destroy(&c);
  1455. MQTTAsync_destroy(&d);
  1456. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1457. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1458. write_test_result();
  1459. return failures;
  1460. }
  1461. /*********************************************************************
  1462. Test7: Fill up TCP buffer with QoS 0 messages
  1463. *********************************************************************/
  1464. int test7c_connected = 0;
  1465. int test7_will_message_received = 0;
  1466. int test7_messages_received = 0;
  1467. int test7Finished = 0;
  1468. int test7OnFailureCalled = 0;
  1469. int test7dReady = 0;
  1470. int test7_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1471. {
  1472. MQTTAsync c = (MQTTAsync)context;
  1473. static int message_count = 0;
  1474. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1475. if (memcmp(message->payload, "will message", message->payloadlen) == 0)
  1476. test7_will_message_received = 1;
  1477. else
  1478. test7_messages_received++;
  1479. MQTTAsync_freeMessage(&message);
  1480. MQTTAsync_free(topicName);
  1481. return 1;
  1482. }
  1483. void test7cConnected(void* context, char* cause)
  1484. {
  1485. MQTTAsync c = (MQTTAsync)context;
  1486. MyLog(LOGA_DEBUG, "In connected callback for client c, context %p\n", context);
  1487. test7c_connected = 1;
  1488. }
  1489. void test7cOnConnectFailure(void* context, MQTTAsync_failureData* response)
  1490. {
  1491. MyLog(LOGA_DEBUG, "In c connect onFailure callback, context %p", context);
  1492. test7OnFailureCalled++;
  1493. test7Finished = 1;
  1494. }
  1495. void test7cOnConnectSuccess(void* context, MQTTAsync_successData* response)
  1496. {
  1497. MQTTAsync c = (MQTTAsync)context;
  1498. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1499. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1500. /* send a message to the proxy to break the connection */
  1501. pubmsg.payload = "TERMINATE";
  1502. pubmsg.payloadlen = (int)strlen(pubmsg.payload);
  1503. pubmsg.qos = 0;
  1504. pubmsg.retained = 0;
  1505. //rc = MQTTAsync_sendMessage(c, "MQTTSAS topic", &pubmsg, NULL);
  1506. //assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1507. }
  1508. void test7dOnConnectFailure(void* context, MQTTAsync_failureData* response)
  1509. {
  1510. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1511. test7OnFailureCalled++;
  1512. test7Finished = 1;
  1513. }
  1514. void test7donSubscribe(void* context, MQTTAsync_successData* response)
  1515. {
  1516. MQTTAsync c = (MQTTAsync)context;
  1517. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  1518. test7dReady = 1;
  1519. }
  1520. void test7dOnConnectSuccess(void* context, MQTTAsync_successData* response)
  1521. {
  1522. MQTTAsync c = (MQTTAsync)context;
  1523. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1524. int qoss[2] = {2, 2};
  1525. char* topics[2] = {willTopic, test_topic};
  1526. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1527. opts.onSuccess = test7donSubscribe;
  1528. opts.context = c;
  1529. //rc = MQTTAsync_subscribeMany(c, 2, topics, qoss, &opts);
  1530. //assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1531. //if (rc != MQTTASYNC_SUCCESS)
  1532. // test5Finished = 1;
  1533. test7dReady = 1;
  1534. }
  1535. int test7(struct Options options)
  1536. {
  1537. char* testname = "test7";
  1538. int subsqos = 2;
  1539. MQTTAsync c, d;
  1540. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  1541. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1542. //MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1543. int rc = 0;
  1544. int count = 0;
  1545. char clientidc[70];
  1546. char clientidd[70];
  1547. int i = 0;
  1548. test7_will_message_received = 0;
  1549. test7_messages_received = 0;
  1550. test7Finished = 0;
  1551. test7OnFailureCalled = 0;
  1552. test7c_connected = 0;
  1553. sprintf(willTopic, "paho-test9-7-%s", unique);
  1554. sprintf(clientidc, "paho-test9-7-c-%s", unique);
  1555. sprintf(clientidd, "paho-test9-7-d-%s", unique);
  1556. sprintf(test_topic, "longer paho-test9-7-test topic %s", unique);
  1557. test7Finished = 0;
  1558. failures = 0;
  1559. MyLog(LOGA_INFO, "Starting Offline buffering 7 - fill TCP buffer");
  1560. fprintf(xml, "<testcase classname=\"test7\" name=\"%s\"", testname);
  1561. global_start_time = start_clock();
  1562. rc = MQTTAsync_create(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  1563. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1564. if (rc != MQTTASYNC_SUCCESS)
  1565. {
  1566. MQTTAsync_destroy(&c);
  1567. goto exit;
  1568. }
  1569. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_DEFAULT, NULL);
  1570. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1571. if (rc != MQTTASYNC_SUCCESS)
  1572. {
  1573. MQTTAsync_destroy(&c);
  1574. goto exit;
  1575. }
  1576. opts.keepAliveInterval = 5;
  1577. opts.cleansession = 1;
  1578. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1579. rc = MQTTAsync_setCallbacks(d, d, NULL, test7_messageArrived, NULL);
  1580. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1581. opts.will = NULL; /* don't need will for this client, as it's going to be connected all the time */
  1582. opts.context = d;
  1583. opts.onSuccess = test7dOnConnectSuccess;
  1584. opts.onFailure = test7dOnConnectFailure;
  1585. MyLog(LOGA_DEBUG, "Connecting client d");
  1586. rc = MQTTAsync_connect(d, &opts);
  1587. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1588. if (rc != MQTTASYNC_SUCCESS)
  1589. {
  1590. failures++;
  1591. goto exit;
  1592. }
  1593. /* wait until d is ready: connected and subscribed */
  1594. count = 0;
  1595. while (!test7dReady && ++count < 10000)
  1596. {
  1597. if (test7Finished)
  1598. goto exit;
  1599. MySleep(100);
  1600. }
  1601. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1602. rc = MQTTAsync_setConnected(c, c, test7cConnected);
  1603. assert("Good rc from setConnectedCallback", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1604. /* let client c go: connect, and send disconnect command to proxy */
  1605. opts.will = &wopts;
  1606. opts.will->payload.data = "will message";
  1607. opts.will->payload.len = (int)strlen(opts.will->payload.data) + 1;
  1608. opts.will->qos = 1;
  1609. opts.will->retained = 0;
  1610. opts.will->topicName = willTopic;
  1611. opts.onSuccess = test7cOnConnectSuccess;
  1612. opts.onFailure = test7cOnConnectFailure;
  1613. opts.context = c;
  1614. opts.cleansession = 0;
  1615. /*opts.automaticReconnect = 1;
  1616. opts.minRetryInterval = 3;
  1617. opts.maxRetryInterval = 6;*/
  1618. MyLog(LOGA_DEBUG, "Connecting client c");
  1619. rc = MQTTAsync_connect(c, &opts);
  1620. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1621. if (rc != MQTTASYNC_SUCCESS)
  1622. {
  1623. failures++;
  1624. goto exit;
  1625. }
  1626. count = 0;
  1627. while (!test7c_connected && ++count < 10000)
  1628. MySleep(100);
  1629. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1630. /* wait for will message */
  1631. //while (test7_will_message_received == 0 && ++count < 10000)
  1632. // MySleep(100);
  1633. MyLog(LOGA_DEBUG, "Now we can send some messages to be buffered by TCP");
  1634. test7c_connected = 0;
  1635. char buf[5000000];
  1636. /* send some messages. Then reconnect (check connected callback), and check that those messages are received */
  1637. for (i = 0; i < 50000; ++i)
  1638. {
  1639. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1640. MQTTAsync_responseOptions pubopts = MQTTAsync_responseOptions_initializer;
  1641. pubmsg.qos = 0; /*i % 3;*/
  1642. sprintf(buf, "QoS %d message", pubmsg.qos);
  1643. pubmsg.payload = buf;
  1644. pubmsg.payloadlen = 5000000; //(int)(strlen(pubmsg.payload) + 1);
  1645. pubmsg.retained = 0;
  1646. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &pubopts);
  1647. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d\n", rc);
  1648. if (rc != 0)
  1649. {
  1650. //MyLog(LOGA_DEBUG, "Connecting client c");
  1651. //rc = MQTTAsync_connect(c, &opts);
  1652. //MySleep(1000);
  1653. break;
  1654. }
  1655. }
  1656. #if 0
  1657. assert3PendingTokens(c);
  1658. rc = MQTTAsync_reconnect(c);
  1659. assert("Good rc from reconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1660. /* wait for client to be reconnected */
  1661. while (!test5c_connected && ++count < 10000)
  1662. MySleep(100);
  1663. /* wait for success or failure callback */
  1664. while (test5_messages_received < 3 && ++count < 10000)
  1665. MySleep(100);
  1666. waitForNoPendingTokens(c);
  1667. #endif
  1668. exit:
  1669. rc = MQTTAsync_disconnect(c, NULL);
  1670. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1671. rc = MQTTAsync_disconnect(d, NULL);
  1672. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1673. MySleep(200);
  1674. MQTTAsync_destroy(&c);
  1675. MQTTAsync_destroy(&d);
  1676. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1677. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1678. write_test_result();
  1679. return failures;
  1680. }
  1681. /*********************************************************************
  1682. Test8: send buffered messages before connect
  1683. *********************************************************************/
  1684. int test8_messages_received = 0;
  1685. int test8Finished = 0;
  1686. int test8OnFailureCalled = 0;
  1687. int test8cConnected = 0;
  1688. int test8dConnected = 0;
  1689. int test8dSubscribed = 0;
  1690. int test8_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1691. {
  1692. MQTTAsync c = (MQTTAsync)context;
  1693. static int message_count = 0;
  1694. MyLog(LOGA_DEBUG, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1695. test8_messages_received++;
  1696. MQTTAsync_freeMessage(&message);
  1697. MQTTAsync_free(topicName);
  1698. return 1;
  1699. }
  1700. void test8donSubscribe(void* context, MQTTAsync_successData* response)
  1701. {
  1702. MQTTAsync c = (MQTTAsync)context;
  1703. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  1704. test8dSubscribed = 1;
  1705. }
  1706. void test8dOnConnect(void* context, MQTTAsync_successData* response)
  1707. {
  1708. MQTTAsync c = (MQTTAsync)context;
  1709. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1710. int rc;
  1711. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1712. test8dConnected = 1;
  1713. opts.onSuccess = test8donSubscribe;
  1714. opts.context = c;
  1715. rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
  1716. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1717. if (rc != MQTTASYNC_SUCCESS)
  1718. test8Finished = 1;
  1719. }
  1720. void test8OnFailure(void* context, MQTTAsync_failureData* response)
  1721. {
  1722. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1723. test8OnFailureCalled++;
  1724. test8Finished = 1;
  1725. }
  1726. void test8cOnConnect(void* context, MQTTAsync_successData* response)
  1727. {
  1728. MQTTAsync c = (MQTTAsync)context;
  1729. int rc;
  1730. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1731. test8cConnected = 1;
  1732. }
  1733. int test8(struct Options options)
  1734. {
  1735. char* testname = "test8";
  1736. int subsqos = 2;
  1737. MQTTAsync c, d;
  1738. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  1739. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1740. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1741. int rc = 0;
  1742. int count = 0;
  1743. char clientidc[70];
  1744. char clientidd[70];
  1745. int i = 0;
  1746. sprintf(willTopic, "paho-test9-8-%s", unique);
  1747. sprintf(clientidc, "paho-test9-8-c-%s", unique);
  1748. sprintf(clientidd, "paho-test9-8-d-%s", unique);
  1749. sprintf(test_topic, "paho-test9-8-test topic %s", unique);
  1750. test8Finished = 0;
  1751. failures = 0;
  1752. MyLog(LOGA_INFO, "Starting Offline buffering 8 - send messages before successful connect");
  1753. fprintf(xml, "<testcase classname=\"test8\" name=\"%s\"", testname);
  1754. global_start_time = start_clock();
  1755. /* first check that by default we can't send messages before connect */
  1756. createOptions.sendWhileDisconnected = 1;
  1757. createOptions.maxBufferedMessages = 3;
  1758. rc = MQTTAsync_createWithOptions(&c, options.proxy_connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1759. NULL, &createOptions);
  1760. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1761. if (rc != MQTTASYNC_SUCCESS)
  1762. {
  1763. MQTTAsync_destroy(&c);
  1764. goto exit;
  1765. }
  1766. /* check can't send messages */
  1767. for (i = 0; i < 5; ++i)
  1768. {
  1769. char buf[50];
  1770. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1771. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1772. sprintf(buf, "QoS %d message", i);
  1773. pubmsg.payload = buf;
  1774. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1775. pubmsg.qos = i % 3;
  1776. pubmsg.retained = 0;
  1777. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1778. assert("Good rc from sendMessage", rc == MQTTASYNC_DISCONNECTED, "rc was %d ", rc);
  1779. }
  1780. MQTTAsync_destroy(&c);
  1781. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  1782. /* client to check receipt of messages */
  1783. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  1784. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1785. if (rc != MQTTASYNC_SUCCESS)
  1786. {
  1787. MQTTAsync_destroy(&d);
  1788. goto exit;
  1789. }
  1790. createOptions.allowDisconnectedSendAtAnyTime = 1;
  1791. rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1792. NULL, &createOptions);
  1793. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1794. if (rc != MQTTASYNC_SUCCESS)
  1795. {
  1796. MQTTAsync_destroy(&c);
  1797. MQTTAsync_destroy(&d);
  1798. goto exit;
  1799. }
  1800. rc = MQTTAsync_setCallbacks(d, d, NULL, test8_messageArrived, NULL);
  1801. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  1802. /* let client d go and subscribe */
  1803. opts.onSuccess = test8dOnConnect;
  1804. opts.onFailure = test8OnFailure;
  1805. opts.context = d;
  1806. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1807. MyLog(LOGA_DEBUG, "Connecting client d");
  1808. rc = MQTTAsync_connect(d, &opts);
  1809. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1810. if (rc != MQTTASYNC_SUCCESS)
  1811. {
  1812. MQTTAsync_destroy(&c);
  1813. MQTTAsync_destroy(&d);
  1814. goto exit;
  1815. }
  1816. count = 0;
  1817. while (!test8dSubscribed && ++count < 10000)
  1818. MySleep(100);
  1819. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1820. /* send some messages while disconnected */
  1821. for (i = 0; i < 5; ++i)
  1822. {
  1823. char buf[50];
  1824. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1825. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1826. sprintf(buf, "QoS %d message", i);
  1827. pubmsg.payload = buf;
  1828. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1829. pubmsg.qos = i % 3;
  1830. pubmsg.retained = 0;
  1831. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1832. if (i <= 2)
  1833. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1834. else
  1835. assert("Bad rc from sendMessage", rc == MQTTASYNC_MAX_BUFFERED_MESSAGES, "rc was %d ", rc);
  1836. }
  1837. assert3PendingTokens(c);
  1838. opts.onSuccess = test8cOnConnect;
  1839. opts.onFailure = test8OnFailure;
  1840. opts.context = c;
  1841. opts.cleansession = 0;
  1842. MyLog(LOGA_DEBUG, "Connecting client c");
  1843. rc = MQTTAsync_connect(c, &opts);
  1844. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1845. if (rc != MQTTASYNC_SUCCESS)
  1846. {
  1847. failures++;
  1848. goto exit;
  1849. }
  1850. count = 0;
  1851. while (!test8cConnected && ++count < 10000)
  1852. MySleep(100);
  1853. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1854. /* after connect, those queued up messages should be delivered */
  1855. while (test8_messages_received < 3 && ++count < 10000)
  1856. MySleep(100);
  1857. waitForNoPendingTokens(c);
  1858. rc = MQTTAsync_disconnect(c, NULL);
  1859. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1860. rc = MQTTAsync_disconnect(d, NULL);
  1861. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1862. exit:
  1863. MySleep(200);
  1864. MQTTAsync_destroy(&c);
  1865. MQTTAsync_destroy(&d);
  1866. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1867. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1868. write_test_result();
  1869. return failures;
  1870. }
  1871. /*********************************************************************
  1872. Test9: large nos of messages on create
  1873. *********************************************************************/
  1874. int test9_messages_received = 0;
  1875. int test9Finished = 0;
  1876. int test9OnFailureCalled = 0;
  1877. int test9cConnected = 0;
  1878. void test9OnFailure(void* context, MQTTAsync_failureData* response)
  1879. {
  1880. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  1881. test9OnFailureCalled++;
  1882. test9Finished = 1;
  1883. }
  1884. void test9cOnConnect(void* context, MQTTAsync_successData* response)
  1885. {
  1886. MQTTAsync c = (MQTTAsync)context;
  1887. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  1888. test9cConnected = 1;
  1889. }
  1890. int test9(struct Options options)
  1891. {
  1892. char* testname = "test9";
  1893. int subsqos = 2;
  1894. MQTTAsync c;
  1895. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  1896. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  1897. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  1898. int rc = 0;
  1899. int count = 0;
  1900. char clientidc[70];
  1901. int i = 0;
  1902. START_TIME_TYPE start;
  1903. int no_buffered_messages = 50000;
  1904. sprintf(willTopic, "paho-test9-9-%s", unique);
  1905. sprintf(clientidc, "paho-test9-9-c-%s", unique);
  1906. sprintf(test_topic, "paho-test9-9-test topic %s", unique);
  1907. test9Finished = 0;
  1908. failures = 0;
  1909. MyLog(LOGA_INFO, "Starting Offline buffering - large nos messages on create");
  1910. fprintf(xml, "<testcase classname=\"test\" name=\"%s\"", testname);
  1911. global_start_time = start_clock();
  1912. createOptions.allowDisconnectedSendAtAnyTime = 1;
  1913. createOptions.sendWhileDisconnected = 1;
  1914. createOptions.maxBufferedMessages = no_buffered_messages;
  1915. rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1916. NULL, &createOptions);
  1917. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1918. if (rc != MQTTASYNC_SUCCESS)
  1919. {
  1920. MQTTAsync_destroy(&c);
  1921. goto exit;
  1922. }
  1923. /* send some messages while disconnected */
  1924. for (i = 0; i < no_buffered_messages; ++i)
  1925. {
  1926. char buf[50];
  1927. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  1928. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  1929. sprintf(buf, "QoS %d message", i);
  1930. pubmsg.payload = buf;
  1931. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  1932. pubmsg.qos = i % 3;
  1933. pubmsg.retained = 0;
  1934. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  1935. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1936. }
  1937. MQTTAsync_destroy(&c);
  1938. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  1939. MyLog(LOGA_INFO, "Create starting with %d messages", no_buffered_messages);
  1940. start = start_clock();
  1941. rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  1942. NULL, &createOptions);
  1943. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  1944. if (rc != MQTTASYNC_SUCCESS)
  1945. {
  1946. MQTTAsync_destroy(&c);
  1947. goto exit;
  1948. }
  1949. long used = elapsed(start);
  1950. MyLog(LOGA_INFO, "Time taken for create %ld ms", used);
  1951. opts.onSuccess = test9cOnConnect;
  1952. opts.onFailure = test9OnFailure;
  1953. opts.context = c;
  1954. opts.cleansession = 1;
  1955. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  1956. MyLog(LOGA_DEBUG, "Connecting client c");
  1957. rc = MQTTAsync_connect(c, &opts);
  1958. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1959. if (rc != MQTTASYNC_SUCCESS)
  1960. goto exit;
  1961. count = 0;
  1962. while (!test9cConnected && ++count < 10000)
  1963. MySleep(100);
  1964. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  1965. rc = MQTTAsync_disconnect(c, NULL);
  1966. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  1967. exit:
  1968. MySleep(200);
  1969. MQTTAsync_destroy(&c);
  1970. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  1971. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  1972. write_test_result();
  1973. return failures;
  1974. }
  1975. /*********************************************************************
  1976. Test10: delete oldest buffered messages first on buffer full
  1977. *********************************************************************/
  1978. int test10_messages_received = 0;
  1979. int test10Finished = 0;
  1980. int test10OnFailureCalled = 0;
  1981. int test10cConnected = 0;
  1982. int test10dConnected = 0;
  1983. int test10dSubscribed = 0;
  1984. int test10MessagesToSend = 6;
  1985. int test10MessageSeqno = 3;
  1986. int test10_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  1987. {
  1988. MQTTAsync c = (MQTTAsync)context;
  1989. static int message_count = 0;
  1990. int sequence_no = atoi(message->payload);
  1991. MyLog(LOGA_INFO, "Message received on topic %s, \"%.*s\"", topicName, message->payloadlen, message->payload);
  1992. test10_messages_received++;
  1993. assert("Expected message sequence no", test10MessageSeqno == sequence_no, "sequence_no was %d\n", sequence_no);
  1994. test10MessageSeqno++;
  1995. MQTTAsync_freeMessage(&message);
  1996. MQTTAsync_free(topicName);
  1997. return 1;
  1998. }
  1999. void test10donSubscribe(void* context, MQTTAsync_successData* response)
  2000. {
  2001. MQTTAsync c = (MQTTAsync)context;
  2002. MyLog(LOGA_DEBUG, "In subscribe onSuccess callback for client d, %p granted qos %d", c, response->alt.qos);
  2003. test10dSubscribed = 1;
  2004. }
  2005. void test10dOnConnect(void* context, MQTTAsync_successData* response)
  2006. {
  2007. MQTTAsync c = (MQTTAsync)context;
  2008. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  2009. int rc;
  2010. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  2011. test10dConnected = 1;
  2012. opts.onSuccess = test10donSubscribe;
  2013. opts.context = c;
  2014. rc = MQTTAsync_subscribe(c, test_topic, 2, &opts);
  2015. assert("Good rc from subscribe", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  2016. if (rc != MQTTASYNC_SUCCESS)
  2017. test10Finished = 1;
  2018. }
  2019. void test10OnFailure(void* context, MQTTAsync_failureData* response)
  2020. {
  2021. MyLog(LOGA_DEBUG, "In connect onFailure callback, context %p", context);
  2022. test10OnFailureCalled++;
  2023. test10Finished = 1;
  2024. }
  2025. void test10cOnConnect(void* context, MQTTAsync_successData* response)
  2026. {
  2027. MQTTAsync c = (MQTTAsync)context;
  2028. int rc;
  2029. MyLog(LOGA_DEBUG, "In connect onSuccess callback for client c, context %p\n", context);
  2030. test10cConnected = 1;
  2031. }
  2032. int test10(struct Options options)
  2033. {
  2034. char* testname = "test10";
  2035. int subsqos = 2;
  2036. MQTTAsync c, d;
  2037. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  2038. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  2039. MQTTAsync_createOptions createOptions = MQTTAsync_createOptions_initializer;
  2040. int rc = 0;
  2041. int count = 0;
  2042. char clientidc[70];
  2043. char clientidd[70];
  2044. int i = 0;
  2045. sprintf(willTopic, "paho-test9-10-%s", unique);
  2046. sprintf(clientidc, "paho-test9-10-c-%s", unique);
  2047. sprintf(clientidd, "paho-test9-10-d-%s", unique);
  2048. sprintf(test_topic, "paho-test9-10-test topic %s", unique);
  2049. test10Finished = 0;
  2050. failures = 0;
  2051. MyLog(LOGA_INFO, "Starting Offline buffering 10 - delete oldest buffered messages first");
  2052. fprintf(xml, "<testcase classname=\"test9\" name=\"%s\"", testname);
  2053. global_start_time = start_clock();
  2054. rc = MQTTAsync_create(&d, options.connection, clientidd, MQTTCLIENT_PERSISTENCE_NONE, NULL);
  2055. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  2056. if (rc != MQTTASYNC_SUCCESS)
  2057. {
  2058. MQTTAsync_destroy(&d);
  2059. goto exit;
  2060. }
  2061. createOptions.sendWhileDisconnected = 1;
  2062. createOptions.maxBufferedMessages = 3;
  2063. createOptions.allowDisconnectedSendAtAnyTime = 1;
  2064. createOptions.deleteOldestMessages = 1;
  2065. rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  2066. NULL, &createOptions);
  2067. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  2068. if (rc != MQTTASYNC_SUCCESS)
  2069. {
  2070. MQTTAsync_destroy(&c);
  2071. MQTTAsync_destroy(&d);
  2072. goto exit;
  2073. }
  2074. rc = MQTTAsync_setCallbacks(d, d, NULL, test10_messageArrived, NULL);
  2075. assert("Good rc from setCallbacks", rc == MQTTASYNC_SUCCESS, "rc was %d", rc);
  2076. /* let client d go and subscribe */
  2077. opts.MQTTVersion = MQTTVERSION_3_1_1; /* proxy doesn't handle MQTT 3.1 */
  2078. opts.onSuccess = test10dOnConnect;
  2079. opts.onFailure = test10OnFailure;
  2080. opts.context = d;
  2081. MyLog(LOGA_DEBUG, "Connecting client d");
  2082. rc = MQTTAsync_connect(d, &opts);
  2083. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2084. if (rc != MQTTASYNC_SUCCESS)
  2085. {
  2086. MQTTAsync_destroy(&c);
  2087. MQTTAsync_destroy(&d);
  2088. goto exit;
  2089. }
  2090. count = 0;
  2091. while (!test10dSubscribed && ++count < 10000)
  2092. MySleep(100);
  2093. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  2094. /* send some messages while disconnected */
  2095. for (i = 0; i < test10MessagesToSend; ++i)
  2096. {
  2097. char buf[50];
  2098. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  2099. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  2100. pubmsg.qos = i % 3;
  2101. sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
  2102. pubmsg.payload = buf;
  2103. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  2104. pubmsg.retained = 0;
  2105. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  2106. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2107. }
  2108. assert3PendingTokens(c);
  2109. opts.onSuccess = test10cOnConnect;
  2110. opts.onFailure = test10OnFailure;
  2111. opts.context = c;
  2112. opts.cleansession = 0;
  2113. MyLog(LOGA_DEBUG, "Connecting client c");
  2114. rc = MQTTAsync_connect(c, &opts);
  2115. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2116. if (rc != MQTTASYNC_SUCCESS)
  2117. {
  2118. failures++;
  2119. goto exit;
  2120. }
  2121. count = 0;
  2122. while (!test10cConnected && ++count < 10000)
  2123. MySleep(100);
  2124. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  2125. /* after connect, those queued up messages should be delivered */
  2126. while (test10_messages_received < 3 && ++count < 10000)
  2127. MySleep(100);
  2128. waitForNoPendingTokens(c);
  2129. /* Now try the same thing, but force messages to be persisted and re-read */
  2130. /* disconnect so we buffer some messages again */
  2131. rc = MQTTAsync_disconnect(c, NULL);
  2132. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2133. /* send some messages while disconnected */
  2134. for (i = 0; i < test10MessagesToSend; ++i)
  2135. {
  2136. char buf[50];
  2137. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  2138. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  2139. pubmsg.qos = i % 3;
  2140. sprintf(buf, "%d message no, QoS %d", i, pubmsg.qos);
  2141. pubmsg.payload = buf;
  2142. pubmsg.payloadlen = (int)(strlen(pubmsg.payload) + 1);
  2143. pubmsg.retained = 0;
  2144. rc = MQTTAsync_sendMessage(c, test_topic, &pubmsg, &opts);
  2145. assert("Good rc from sendMessage", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2146. }
  2147. assert3PendingTokens(c);
  2148. MQTTAsync_destroy(&c);
  2149. test10MessageSeqno = 3;
  2150. /* re-read persistence */
  2151. rc = MQTTAsync_createWithOptions(&c, options.connection, clientidc, MQTTCLIENT_PERSISTENCE_DEFAULT,
  2152. NULL, &createOptions);
  2153. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  2154. assert3PendingTokens(c);
  2155. MyLog(LOGA_DEBUG, "Connecting client c");
  2156. rc = MQTTAsync_connect(c, &opts);
  2157. assert("Good rc from connect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2158. if (rc != MQTTASYNC_SUCCESS)
  2159. {
  2160. failures++;
  2161. goto exit;
  2162. }
  2163. test10_messages_received = count = 0;
  2164. while (!test10cConnected && ++count < 10000)
  2165. MySleep(100);
  2166. assert("Count should be less than 10000", count < 10000, "count was %d", count); /* wrong */
  2167. /* after connect, those queued up messages should be delivered */
  2168. while (test10_messages_received < 3 && ++count < 10000)
  2169. MySleep(100);
  2170. waitForNoPendingTokens(c);
  2171. rc = MQTTAsync_disconnect(c, NULL);
  2172. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2173. rc = MQTTAsync_disconnect(d, NULL);
  2174. assert("Good rc from disconnect", rc == MQTTASYNC_SUCCESS, "rc was %d ", rc);
  2175. exit:
  2176. MySleep(200);
  2177. MQTTAsync_destroy(&c);
  2178. MQTTAsync_destroy(&d);
  2179. MyLog(LOGA_INFO, "%s: test %s. %d tests run, %d failures.",
  2180. (failures == 0) ? "passed" : "failed", testname, tests, failures);
  2181. write_test_result();
  2182. return failures;
  2183. }
  2184. int main(int argc, char** argv)
  2185. {
  2186. int* numtests = &tests;
  2187. int rc = 0;
  2188. int (*tests[])() = { NULL, test1, test2, test3, test4, test5, test6, test7, test8, test9, test10};
  2189. time_t randtime;
  2190. srand((unsigned) time(&randtime));
  2191. sprintf(unique, "%u", rand());
  2192. MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
  2193. xml = fopen("TEST-test9.xml", "w");
  2194. fprintf(xml, "<testsuite name=\"test9\" tests=\"%d\">\n", (int)(ARRAY_SIZE(tests) - 1));
  2195. MQTTAsync_setTraceCallback(handleTrace);
  2196. getopts(argc, argv);
  2197. if (options.test_no == 0)
  2198. { /* run all the tests */
  2199. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  2200. {
  2201. failures = 0;
  2202. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  2203. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  2204. }
  2205. }
  2206. else
  2207. {
  2208. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  2209. rc = tests[options.test_no](options); /* run just the selected test */
  2210. }
  2211. MyLog(LOGA_INFO, "Total tests run: %d", *numtests);
  2212. if (rc == 0)
  2213. MyLog(LOGA_INFO, "verdict pass");
  2214. else
  2215. MyLog(LOGA_INFO, "verdict fail");
  2216. fprintf(xml, "</testsuite>\n");
  2217. fclose(xml);
  2218. return rc;
  2219. }