test_issue373.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2017 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. *******************************************************************************/
  15. /**
  16. * @file
  17. * Test for issues 373, 385: Memory leak and segmentation fault during connection lost and reconnect
  18. *
  19. */
  20. #include "MQTTAsync.h"
  21. #include <string.h>
  22. #include <stdlib.h>
  23. #include "Thread.h"
  24. #if !defined(_WINDOWS)
  25. #include <sys/time.h>
  26. #include <sys/socket.h>
  27. #include <unistd.h>
  28. #include <errno.h>
  29. #else
  30. #include <windows.h>
  31. #endif
  32. #include "Heap.h" // for Heap_get_info
  33. // undefine macros from Heap.h:
  34. #undef malloc
  35. #undef realloc
  36. #undef free
  37. char unique[50]; // unique suffix/prefix to add to clientid/topic etc
  38. #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
  39. void usage(void)
  40. {
  41. printf("help!!\n");
  42. exit(EXIT_FAILURE);
  43. }
  44. struct Options
  45. {
  46. char* connection; /**< connection to system under test. */
  47. char* proxy_connection; /**< connection to proxy */
  48. int verbose;
  49. int test_no;
  50. unsigned int QoS;
  51. unsigned int iterrations;
  52. } options =
  53. {
  54. "localhost:1883",
  55. "localhost:1884",
  56. 0,
  57. 0,
  58. 0,
  59. 5
  60. };
  61. void getopts(int argc, char** argv)
  62. {
  63. int count = 1;
  64. while (count < argc)
  65. {
  66. if (strcmp(argv[count], "--test_no") == 0)
  67. {
  68. if (++count < argc)
  69. options.test_no = atoi(argv[count]);
  70. else
  71. usage();
  72. }
  73. else if (strcmp(argv[count], "--connection") == 0)
  74. {
  75. if (++count < argc)
  76. options.connection = argv[count];
  77. else
  78. usage();
  79. }
  80. else if (strcmp(argv[count], "--proxy_connection") == 0)
  81. {
  82. if (++count < argc)
  83. options.proxy_connection = argv[count];
  84. else
  85. usage();
  86. }
  87. else if (strcmp(argv[count], "--verbose") == 0)
  88. options.verbose = 1;
  89. count++;
  90. }
  91. }
  92. #define LOGA_DEBUG 0
  93. #define LOGA_INFO 1
  94. #include <stdarg.h>
  95. #include <time.h>
  96. #include <sys/timeb.h>
  97. void MyLog(int LOGA_level, char* format, ...)
  98. {
  99. static char msg_buf[256];
  100. va_list args;
  101. #if defined(_WIN32) || defined(_WINDOWS)
  102. struct timeb ts;
  103. #else
  104. struct timeval ts;
  105. #endif
  106. struct tm *timeinfo;
  107. if (LOGA_level == LOGA_DEBUG && options.verbose == 0)
  108. return;
  109. #if defined(_WIN32) || defined(_WINDOWS)
  110. ftime(&ts);
  111. timeinfo = localtime(&ts.time);
  112. #else
  113. gettimeofday(&ts, NULL);
  114. timeinfo = localtime(&ts.tv_sec);
  115. #endif
  116. strftime(msg_buf, 80, "%Y%m%d %H%M%S", timeinfo);
  117. #if defined(_WIN32) || defined(_WINDOWS)
  118. sprintf(&msg_buf[strlen(msg_buf)], ".%.3hu ", ts.millitm);
  119. #else
  120. sprintf(&msg_buf[strlen(msg_buf)], ".%.3lu ", ts.tv_usec / 1000L);
  121. #endif
  122. va_start(args, format);
  123. vsnprintf(&msg_buf[strlen(msg_buf)], sizeof(msg_buf) - strlen(msg_buf), format, args);
  124. va_end(args);
  125. printf("%s\n", msg_buf);
  126. fflush(stdout);
  127. }
  128. void MySleep(long milliseconds)
  129. {
  130. #if defined(_WIN32) || defined(_WIN64)
  131. Sleep(milliseconds);
  132. #else
  133. usleep(milliseconds*1000);
  134. #endif
  135. }
  136. #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d)
  137. int tests = 0;
  138. int failures = 0;
  139. int connected = 0;
  140. int pendingMessageCnt = 0; /* counter of messages which are currently queued for publish */
  141. int pendingMessageCntMax = 0;
  142. int failedPublishCnt = 0;
  143. int goodPublishCnt = 0;
  144. int connectCnt = 0;
  145. int connecting = 0;
  146. void myassert(char* filename, int lineno, char* description, int value,
  147. char* format, ...)
  148. {
  149. ++tests;
  150. if (!value)
  151. {
  152. va_list args;
  153. ++failures;
  154. MyLog(LOGA_INFO, "Assertion failed, file %s, line %d, description: %s", filename,
  155. lineno, description);
  156. va_start(args, format);
  157. vprintf(format, args);
  158. va_end(args);
  159. }
  160. else
  161. MyLog(LOGA_DEBUG, "Assertion succeeded, file %s, line %d, description: %s",
  162. filename, lineno, description);
  163. }
  164. void test1373OnFailure(void* context, MQTTAsync_failureData* response)
  165. {
  166. MyLog(LOGA_INFO, "In connect onFailure callback, context %p", context);
  167. connecting = 0;
  168. }
  169. void test373OnConnect(void* context, MQTTAsync_successData* response)
  170. {
  171. connected = 1;
  172. connecting = 0;
  173. connectCnt++;
  174. MyLog(LOGA_INFO, "Established MQTT connection to %s",response->alt.connect.serverURI);
  175. char MqttVersion[40];
  176. switch (response->alt.connect.MQTTVersion)
  177. {
  178. case MQTTVERSION_3_1:
  179. sprintf(MqttVersion," MQTT version 3.1");
  180. break;
  181. case MQTTVERSION_3_1_1:
  182. sprintf(MqttVersion, " MQTT version 3.1.1");
  183. break;
  184. default:
  185. sprintf(MqttVersion, " MQTT version %d",response->alt.connect.MQTTVersion);
  186. }
  187. MyLog(LOGA_INFO, " %s\n",MqttVersion);
  188. MyLog(LOGA_INFO, "connectCnt %d\n",connectCnt);
  189. }
  190. void test373ConnectionLost(void* context, char* cause)
  191. {
  192. connected = 0;
  193. MyLog(LOGA_INFO, "Disconnected from MQTT broker reason %s",cause);
  194. }
  195. void test373DeliveryComplete(void* context, MQTTAsync_token token)
  196. {
  197. }
  198. void test373_onWriteSuccess(void* context, MQTTAsync_successData* response)
  199. {
  200. pendingMessageCnt--;
  201. goodPublishCnt++;
  202. }
  203. void test373_onWriteFailure(void* context, MQTTAsync_failureData* response)
  204. {
  205. pendingMessageCnt--;
  206. failedPublishCnt++;
  207. }
  208. int test373_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
  209. {
  210. return 0;
  211. }
  212. static char test373Payload[] = "No one is interested in this payload";
  213. int test373SendPublishMessage(MQTTAsync handle,int id, const unsigned int QoS)
  214. {
  215. int rc = 0;
  216. MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
  217. MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
  218. char topic[ sizeof(unique) + 40];
  219. sprintf(topic,"%s/test373/item_%03d",unique,id);
  220. opts.onFailure = test373_onWriteFailure;
  221. opts.onSuccess = test373_onWriteSuccess;
  222. pubmsg.payload = test373Payload;
  223. pubmsg.payloadlen = sizeof(test373Payload);
  224. pubmsg.qos = QoS;
  225. rc = MQTTAsync_sendMessage( handle, topic,&pubmsg,&opts);
  226. if (rc == MQTTASYNC_SUCCESS)
  227. {
  228. pendingMessageCnt++;
  229. if (pendingMessageCnt > pendingMessageCntMax) pendingMessageCntMax = pendingMessageCnt;
  230. }
  231. else
  232. {
  233. MyLog(LOGA_INFO, "Failed to queue message for send with retvalue %d",rc);
  234. }
  235. return rc;
  236. }
  237. int test_373(struct Options options)
  238. {
  239. char* testname = "test373";
  240. MQTTAsync mqttasyncContext;
  241. MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
  242. MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
  243. int rc = 0;
  244. char clientid[30 + sizeof(unique)];
  245. heap_info* mqtt_mem = 0;
  246. MyLog(LOGA_INFO, "Running test373 with QoS=%u, iterrations=%u\n",options.QoS,options.iterrations);
  247. sprintf(clientid, "paho-test373-%s", unique);
  248. connectCnt = 0;
  249. rc = MQTTAsync_create(&mqttasyncContext, options.proxy_connection, clientid,
  250. MQTTCLIENT_PERSISTENCE_NONE,
  251. NULL);
  252. assert("good rc from create", rc == MQTTASYNC_SUCCESS, "rc was %d \n", rc);
  253. if (rc != MQTTASYNC_SUCCESS)
  254. {
  255. goto exit;
  256. }
  257. opts.connectTimeout = 2;
  258. opts.keepAliveInterval = 20;
  259. opts.cleansession = 0;
  260. opts.MQTTVersion = MQTTVERSION_DEFAULT;
  261. opts.onSuccess = test373OnConnect;
  262. opts.onFailure = test1373OnFailure;
  263. opts.context = mqttasyncContext;
  264. rc = MQTTAsync_setCallbacks(mqttasyncContext,mqttasyncContext,
  265. test373ConnectionLost,
  266. test373_messageArrived,
  267. test373DeliveryComplete);
  268. if (rc != MQTTASYNC_SUCCESS)
  269. {
  270. goto exit;
  271. }
  272. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  273. while (connectCnt < (int)options.iterrations)
  274. {
  275. if (!connected)
  276. {
  277. MyLog(LOGA_INFO, "Connected %d connectCnt %d\n",connected,connectCnt);
  278. MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
  279. goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
  280. #if !defined(_WINDOWS)
  281. mqtt_mem = Heap_get_info();
  282. MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
  283. #endif
  284. /* (re)connect to the broker */
  285. if (connecting)
  286. {
  287. MySleep((1+opts.connectTimeout) * 1000); /* but wait for all pending connect attempts to timeout */
  288. }
  289. else
  290. {
  291. rc = MQTTAsync_connect(mqttasyncContext, &opts);
  292. if (rc != MQTTASYNC_SUCCESS)
  293. {
  294. failures++;
  295. goto exit;
  296. }
  297. connecting = 1;
  298. }
  299. }
  300. else
  301. {
  302. /* while connected send 100 message per second */
  303. int topicId;
  304. for(topicId=0; topicId < 100; topicId++)
  305. {
  306. rc = test373SendPublishMessage(mqttasyncContext,topicId,options.QoS);
  307. if (rc != MQTTASYNC_SUCCESS) break;
  308. }
  309. MySleep(100);
  310. }
  311. }
  312. MySleep(5000);
  313. MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
  314. goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
  315. #if !defined(_WINDOWS)
  316. mqtt_mem = Heap_get_info();
  317. MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
  318. #endif
  319. MQTTAsync_disconnect(mqttasyncContext, NULL);
  320. connected = 0;
  321. MyLog(LOGA_INFO, "PublishCnt %d, FailedCnt %d, Pending %d maxPending %d",
  322. goodPublishCnt,failedPublishCnt,pendingMessageCnt,pendingMessageCntMax);
  323. #if !defined(_WINDOWS)
  324. mqtt_mem = Heap_get_info();
  325. MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
  326. #endif
  327. exit:
  328. MQTTAsync_destroy(&mqttasyncContext);
  329. #if !defined(_WINDOWS)
  330. mqtt_mem = Heap_get_info();
  331. MyLog(LOGA_INFO, "MQTT mem current %ld, max %ld",mqtt_mem->current_size,mqtt_mem->max_size);
  332. if (mqtt_mem->current_size > 0) failures++; /* consider any not freed memory as failure */
  333. #endif
  334. return failures;
  335. }
  336. void handleTrace(enum MQTTASYNC_TRACE_LEVELS level, char* message)
  337. {
  338. printf("%s\n", message);
  339. }
  340. int main(int argc, char** argv)
  341. {
  342. int* numtests = &tests;
  343. int rc = 0;
  344. int (*tests[])() = { NULL, test_373};
  345. unsigned int QoS;
  346. sprintf(unique, "%u", rand());
  347. MyLog(LOGA_INFO, "Random prefix/suffix is %s", unique);
  348. MQTTAsync_setTraceCallback(handleTrace);
  349. getopts(argc, argv);
  350. if (options.test_no == 0)
  351. { /* run all the tests */
  352. for (options.test_no = 1; options.test_no < ARRAY_SIZE(tests); ++options.test_no)
  353. {
  354. /* test with QoS 0, 1 and 2 and just 5 iterrations */
  355. for (QoS = 0; QoS < 3; QoS++)
  356. {
  357. failures = 0;
  358. options.QoS = QoS;
  359. options.iterrations = 5;
  360. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  361. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  362. }
  363. if (rc == 0)
  364. {
  365. /* Test with much more iterrations for QoS = 0 */
  366. failures = 0;
  367. options.QoS = 0;
  368. options.iterrations = 100;
  369. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  370. rc += tests[options.test_no](options); /* return number of failures. 0 = test succeeded */
  371. }
  372. }
  373. }
  374. else
  375. {
  376. MQTTAsync_setTraceLevel(MQTTASYNC_TRACE_ERROR);
  377. rc = tests[options.test_no](options); /* run just the selected test */
  378. }
  379. if (rc == 0)
  380. MyLog(LOGA_INFO, "verdict pass");
  381. else
  382. MyLog(LOGA_INFO, "verdict fail");
  383. return rc;
  384. }
  385. /* Local Variables: */
  386. /* indent-tabs-mode: t */
  387. /* c-basic-offset: 8 */
  388. /* End: */