ThreadTaskQueue.cpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. //
  2. // ThreadTaskQueue.cpp
  3. // LibDriveRating-CXX
  4. //
  5. // Created by Melo Yao on 6/9/14.
  6. // Copyright (c) 2014 AutoNavi. All rights reserved.
  7. //
  8. #include "ThreadTaskQueue.h"
  9. #include <algorithm>
  10. #define WAIT_TIMEOUT 5000
  11. using namespace threadpp;
  12. namespace tq{
  13. class QueueRunnable
  14. {
  15. ThreadTaskQueue* queue;
  16. ITask* currentTask;
  17. protected:
  18. static void run_callback(void*);
  19. thread* th;
  20. QueueRunnable(ThreadTaskQueue* q):queue(q),currentTask(NULL){}
  21. void run();
  22. void CancelCurrentTask();
  23. bool TaskIsRunning() const;
  24. friend class ThreadTaskQueue;
  25. };
  26. void QueueRunnable::run_callback(void *ctx)
  27. {
  28. ((QueueRunnable*) ctx)->run();
  29. }
  30. void QueueRunnable::run()
  31. {
  32. while (queue->IsStarted()) {
  33. queue->LockQueue();
  34. ITask* task = queue->NextTask();
  35. if (task == NULL) {
  36. queue->UnlockQueue();
  37. continue;
  38. }
  39. currentTask = task;
  40. queue->UnlockQueue();
  41. task->Run();
  42. queue->LockQueue();
  43. currentTask = NULL;
  44. queue->FinishTask(task);
  45. queue->NotifyQueue();
  46. queue->UnlockQueue();
  47. }
  48. }
  49. void QueueRunnable::CancelCurrentTask()
  50. {
  51. queue->LockQueue();
  52. if(currentTask)
  53. {
  54. currentTask->Cancel();
  55. }
  56. queue->UnlockQueue();
  57. }
  58. bool QueueRunnable::TaskIsRunning() const
  59. {
  60. return currentTask != NULL;
  61. }
  62. ThreadTaskQueue::ThreadTaskQueue():_tasklist(),_started(false),_suspended(false)
  63. {
  64. }
  65. void ThreadTaskQueue::Start(unsigned int nThreads)
  66. {
  67. _mutex.lock();
  68. if (_started) {
  69. _mutex.unlock();
  70. return;
  71. }
  72. _started = true;
  73. _threads.reserve(nThreads);
  74. for (int i = 0; i<nThreads; ++i) {
  75. QueueRunnable* runnable = new QueueRunnable(this);
  76. runnable->th = new thread(QueueRunnable::run_callback, runnable);
  77. _threads.push_back(runnable);
  78. }
  79. _mutex.unlock();
  80. }
  81. void ThreadTaskQueue::Stop()
  82. {
  83. _mutex.lock();
  84. if (!_started) {
  85. _mutex.unlock();
  86. return;
  87. }
  88. _started = false;
  89. for (std::list<ITask*>::iterator it = _tasklist.begin(); it!= _tasklist.end(); ++it) {
  90. delete *it;
  91. }
  92. _tasklist.clear();
  93. _mutex.notify_all();
  94. std::vector<QueueRunnable*> copy(_threads);
  95. _threads.clear();
  96. _mutex.unlock();
  97. for (std::vector<QueueRunnable*>::iterator it = copy.begin(); it!=copy.end(); ++it) {
  98. (*it)->th->join();
  99. thread* t = (*it)->th;
  100. delete (*it);
  101. delete t;
  102. }
  103. }
  104. bool ThreadTaskQueue::IsStarted() const
  105. {
  106. return _started;
  107. }
  108. void ThreadTaskQueue::AddTask(ITask* task)
  109. {
  110. _mutex.lock();
  111. if (_started) {
  112. _tasklist.push_back(task);
  113. _mutex.notify_all();
  114. }
  115. _mutex.unlock();
  116. }
  117. void ThreadTaskQueue::GetTasks(ITask** tasksBuf, unsigned int taskBufSize) const
  118. {
  119. recursivelock* mutex = const_cast<recursivelock*>(&_mutex);
  120. mutex->lock();
  121. size_t count = 0;
  122. for (std::list<ITask*>::const_iterator it = _tasklist.begin(); it!=_tasklist.end(); ++it) {
  123. if (count<taskBufSize) {
  124. tasksBuf[count] = *it;
  125. count++;
  126. }
  127. else
  128. {
  129. break;
  130. }
  131. }
  132. mutex->unlock();
  133. }
  134. unsigned int ThreadTaskQueue::TaskCount() const
  135. {
  136. recursivelock* mutex = const_cast<recursivelock*>(&_mutex);
  137. mutex->lock();
  138. unsigned int count = (unsigned int)_tasklist.size();
  139. mutex->unlock();
  140. return count;
  141. }
  142. void ThreadTaskQueue::CancelAll()
  143. {
  144. _mutex.lock();
  145. for (std::vector<QueueRunnable*>::iterator it = _threads.begin(); it!=_threads.end(); ++it) {
  146. (*it)->CancelCurrentTask();
  147. }
  148. for (std::list<ITask*>::const_iterator it = _tasklist.begin(); it!=_tasklist.end(); ++it) {
  149. (*it)->Cancel();
  150. }
  151. _mutex.unlock();
  152. }
  153. void ThreadTaskQueue::WaitForFinish()
  154. {
  155. while (true) {
  156. _mutex.lock();
  157. bool isExecuting = false;
  158. for (std::vector<QueueRunnable*>::iterator it = _threads.begin(); it!=_threads.end(); ++it) {
  159. if ((*it)->TaskIsRunning()) {
  160. isExecuting = true;
  161. break;
  162. }
  163. }
  164. if (!isExecuting&&_tasklist.size() == 0) {
  165. _mutex.unlock();
  166. break;
  167. }
  168. _mutex.wait(100);
  169. _mutex.unlock();
  170. }
  171. }
  172. void ThreadTaskQueue::Suspend()
  173. {
  174. _mutex.lock();
  175. _suspended = true;
  176. _mutex.unlock();
  177. }
  178. void ThreadTaskQueue::Resume()
  179. {
  180. _mutex.lock();
  181. _suspended = false;
  182. _mutex.notify_all();
  183. _mutex.unlock();
  184. }
  185. void ThreadTaskQueue::NotifyQueue()
  186. {
  187. _mutex.notify_all();
  188. }
  189. ITask* ThreadTaskQueue::NextTask()
  190. {
  191. while (_started && (_tasklist.empty()||_suspended)) {
  192. _mutex.wait(WAIT_TIMEOUT);//defensive waiting time limit.
  193. }
  194. ITask* task = NULL;
  195. if (_tasklist.size()>0) {
  196. task = _tasklist.front();
  197. _tasklist.pop_front();
  198. }
  199. return task;
  200. }
  201. inline
  202. void ThreadTaskQueue::LockQueue()
  203. {
  204. _mutex.lock();
  205. }
  206. inline
  207. void ThreadTaskQueue::UnlockQueue()
  208. {
  209. _mutex.unlock();
  210. }
  211. inline
  212. void ThreadTaskQueue::FinishTask(ITask* task)
  213. {
  214. if(task->GetCategory() != NoCategory)
  215. {
  216. _recyclerMutex.lock();
  217. std::map<TaskCategory,RecyclerPair>::iterator it = _recyclers.find(task->GetCategory());
  218. if(it!=_recyclers.end())
  219. {
  220. RecyclerPair pair = it->second;
  221. _recyclerMutex.unlock();
  222. pair.recycler(task,pair.context);
  223. return;
  224. }
  225. _recyclerMutex.unlock();
  226. }
  227. //析构任务
  228. delete task;
  229. }
  230. void ThreadTaskQueue::SetTaskRecycler(TaskCategory cat, TaskRecycler recycler,void *context)
  231. {
  232. _recyclerMutex.lock();
  233. std::pair<TaskCategory,RecyclerPair> pair(cat, RecyclerPair(recycler,context));
  234. _recyclers.insert(pair);
  235. _recyclerMutex.unlock();
  236. }
  237. void ThreadTaskQueue::ClearTaskRecycler(TaskCategory cat)
  238. {
  239. _recyclerMutex.lock();
  240. std::map<TaskCategory,RecyclerPair>::iterator it = _recyclers.find(cat);
  241. if(it!=_recyclers.end())
  242. {
  243. _recyclers.erase(it);
  244. }
  245. _recyclerMutex.unlock();
  246. }
  247. ThreadTaskQueue::~ThreadTaskQueue()
  248. {
  249. this->Stop();//Defensive stop.
  250. }
  251. }