ThreadTaskQueue.cpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. //
  2. // ThreadTaskQueue.cpp
  3. // LibDriveRating-CXX
  4. //
  5. // Created by Melo Yao on 6/9/14.
  6. // Copyright (c) 2014 AutoNavi. All rights reserved.
  7. //
  8. #include "ThreadTaskQueue.h"
  9. #include <algorithm>
  10. #define WAIT_TIMEOUT 5000
  11. using namespace threadpp;
  12. namespace tq{
  13. class QueueRunnable
  14. {
  15. ThreadTaskQueue* queue;
  16. ITask* currentTask;
  17. protected:
  18. static void run_callback(void*);
  19. thread* th;
  20. QueueRunnable(ThreadTaskQueue* q):queue(q),currentTask(NULL){}
  21. void run();
  22. void CancelCurrentTask();
  23. bool TaskIsRunning() const;
  24. friend class ThreadTaskQueue;
  25. };
  26. void QueueRunnable::run_callback(void *ctx)
  27. {
  28. ((QueueRunnable*) ctx)->run();
  29. }
  30. void QueueRunnable::run()
  31. {
  32. while (queue->IsStarted()) {
  33. queue->LockQueue();
  34. ITask* task = queue->NextTask();
  35. if (task == NULL) {
  36. queue->UnlockQueue();
  37. continue;
  38. }
  39. currentTask = task;
  40. queue->UnlockQueue();
  41. task->Run();
  42. queue->LockQueue();
  43. currentTask = NULL;
  44. queue->FinishTask(task);
  45. //析构任务
  46. delete task;
  47. queue->NotifyQueue();
  48. queue->UnlockQueue();
  49. }
  50. }
  51. void QueueRunnable::CancelCurrentTask()
  52. {
  53. queue->LockQueue();
  54. if(currentTask)
  55. {
  56. currentTask->Cancel();
  57. }
  58. queue->UnlockQueue();
  59. }
  60. bool QueueRunnable::TaskIsRunning() const
  61. {
  62. return currentTask != NULL;
  63. }
  64. ThreadTaskQueue::ThreadTaskQueue():_tasklist(),_started(false),_suspended(false)
  65. {
  66. }
  67. void ThreadTaskQueue::Start(unsigned int nThreads)
  68. {
  69. _mutex.lock();
  70. if (_started) {
  71. _mutex.unlock();
  72. return;
  73. }
  74. _started = true;
  75. _threads.reserve(nThreads);
  76. for (int i = 0; i<nThreads; ++i) {
  77. QueueRunnable* runnable = new QueueRunnable(this);
  78. runnable->th = new thread(QueueRunnable::run_callback, runnable);
  79. _threads.push_back(runnable);
  80. }
  81. _mutex.unlock();
  82. }
  83. void ThreadTaskQueue::Stop()
  84. {
  85. _mutex.lock();
  86. if (!_started) {
  87. _mutex.unlock();
  88. return;
  89. }
  90. _started = false;
  91. for (std::list<ITask*>::iterator it = _tasklist.begin(); it!= _tasklist.end(); ++it) {
  92. delete *it;
  93. }
  94. _tasklist.clear();
  95. _mutex.notify_all();
  96. std::vector<QueueRunnable*> copy(_threads);
  97. _threads.clear();
  98. _mutex.unlock();
  99. for (std::vector<QueueRunnable*>::iterator it = copy.begin(); it!=copy.end(); ++it) {
  100. (*it)->th->join();
  101. thread* t = (*it)->th;
  102. delete (*it);
  103. delete t;
  104. }
  105. }
  106. bool ThreadTaskQueue::IsStarted() const
  107. {
  108. return _started;
  109. }
  110. void ThreadTaskQueue::AddTask(ITask* task)
  111. {
  112. _mutex.lock();
  113. if (_started) {
  114. _tasklist.push_back(task);
  115. _mutex.notify_all();
  116. }
  117. _mutex.unlock();
  118. }
  119. void ThreadTaskQueue::GetTasks(ITask** tasksBuf, unsigned int taskBufSize) const
  120. {
  121. recursivelock* mutex = const_cast<recursivelock*>(&_mutex);
  122. mutex->lock();
  123. size_t count = 0;
  124. for (std::list<ITask*>::const_iterator it = _tasklist.begin(); it!=_tasklist.end(); ++it) {
  125. if (count<taskBufSize) {
  126. tasksBuf[count] = *it;
  127. count++;
  128. }
  129. else
  130. {
  131. break;
  132. }
  133. }
  134. mutex->unlock();
  135. }
  136. unsigned int ThreadTaskQueue::TaskCount() const
  137. {
  138. recursivelock* mutex = const_cast<recursivelock*>(&_mutex);
  139. mutex->lock();
  140. unsigned int count = (unsigned int)_tasklist.size();
  141. mutex->unlock();
  142. return count;
  143. }
  144. void ThreadTaskQueue::CancelAll()
  145. {
  146. _mutex.lock();
  147. for (std::vector<QueueRunnable*>::iterator it = _threads.begin(); it!=_threads.end(); ++it) {
  148. (*it)->CancelCurrentTask();
  149. }
  150. for (std::list<ITask*>::const_iterator it = _tasklist.begin(); it!=_tasklist.end(); ++it) {
  151. (*it)->Cancel();
  152. }
  153. _mutex.unlock();
  154. }
  155. void ThreadTaskQueue::WaitForFinish()
  156. {
  157. while (true) {
  158. _mutex.lock();
  159. bool isExecuting = false;
  160. for (std::vector<QueueRunnable*>::iterator it = _threads.begin(); it!=_threads.end(); ++it) {
  161. if ((*it)->TaskIsRunning()) {
  162. isExecuting = true;
  163. break;
  164. }
  165. }
  166. if (!isExecuting&&_tasklist.size() == 0) {
  167. _mutex.unlock();
  168. break;
  169. }
  170. _mutex.wait(100);
  171. _mutex.unlock();
  172. }
  173. }
  174. void ThreadTaskQueue::Suspend()
  175. {
  176. _mutex.lock();
  177. _suspended = true;
  178. _mutex.unlock();
  179. }
  180. void ThreadTaskQueue::Resume()
  181. {
  182. _mutex.lock();
  183. _suspended = false;
  184. _mutex.notify_all();
  185. _mutex.unlock();
  186. }
  187. void ThreadTaskQueue::NotifyQueue()
  188. {
  189. _mutex.notify_all();
  190. }
  191. ITask* ThreadTaskQueue::NextTask()
  192. {
  193. while (_started && (_tasklist.empty()||_suspended)) {
  194. _mutex.wait(WAIT_TIMEOUT);//defensive waiting time limit.
  195. }
  196. ITask* task = NULL;
  197. if (_tasklist.size()>0) {
  198. task = _tasklist.front();
  199. _tasklist.pop_front();
  200. }
  201. return task;
  202. }
  203. inline
  204. void ThreadTaskQueue::LockQueue()
  205. {
  206. _mutex.lock();
  207. }
  208. inline
  209. void ThreadTaskQueue::UnlockQueue()
  210. {
  211. _mutex.unlock();
  212. }
  213. inline
  214. void ThreadTaskQueue::FinishTask(ITask* task)
  215. {
  216. if(task->GetCategory() != NoCategory)
  217. {
  218. _recyclerMutex.lock();
  219. std::map<TaskCategory,RecyclerPair>::iterator it = _recyclers.find(task->GetCategory());
  220. if(it!=_recyclers.end())
  221. {
  222. RecyclerPair pair = it->second;
  223. _recyclerMutex.unlock();
  224. pair.recycler(task,pair.context);
  225. return;
  226. }
  227. _recyclerMutex.unlock();
  228. }
  229. }
  230. void ThreadTaskQueue::SetTaskRecycler(TaskCategory cat, TaskRecycler recycler,void *context)
  231. {
  232. _recyclerMutex.lock();
  233. std::pair<TaskCategory,RecyclerPair> pair(cat, RecyclerPair(recycler,context));
  234. _recyclers.insert(pair);
  235. _recyclerMutex.unlock();
  236. }
  237. void ThreadTaskQueue::ClearTaskRecycler(TaskCategory cat)
  238. {
  239. _recyclerMutex.lock();
  240. std::map<TaskCategory,RecyclerPair>::iterator it = _recyclers.find(cat);
  241. if(it!=_recyclers.end())
  242. {
  243. _recyclers.erase(it);
  244. }
  245. _recyclerMutex.unlock();
  246. }
  247. ThreadTaskQueue::~ThreadTaskQueue()
  248. {
  249. this->Stop();//Defensive stop.
  250. }
  251. }