thread_pool.hpp 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /*
  2. * Thread_pool 线程池,
  3. *
  4. * */
  5. //例如
  6. // Thread_pool thread_pool(4);
  7. // std::future<int> x = thread_pool.enqueue( []{return 0;} );
  8. // std::cout << x.get() << std::endl;
  9. //例如
  10. /*
  11. Thread_pool pool(4);
  12. std::vector< std::future<int> > results;
  13. for(int i = 0; i < 8; ++i) {
  14. results.emplace_back(
  15. pool.enqueue([i] {
  16. std::cout << "hello " << i << std::endl;
  17. std::this_thread::sleep_for(std::chrono::seconds(1));
  18. std::cout << "world " << i << std::endl;
  19. return i*i;
  20. })
  21. );
  22. }
  23. for(auto && result: results)
  24. std::cout << result.get() << ' ';
  25. std::cout << std::endl;
  26. return 0;
  27. */
  28. #ifndef THREAD_POOL_H
  29. #define THREAD_POOL_H
  30. #include <vector>
  31. #include <queue>
  32. #include <memory>
  33. #include <thread>
  34. #include <mutex>
  35. #include <condition_variable>
  36. #include <future>
  37. #include <functional>
  38. #include <stdexcept>
  39. class Thread_pool {
  40. public:
  41. //构造函数, 会自动初始化 threads_size 数量的线程
  42. Thread_pool(size_t threads_size);
  43. //构造函数,没有初始化的,后续需要调用init才能正常使用
  44. Thread_pool();
  45. //初始化,初始化 threads_size 数量的线程
  46. void thread_pool_init(size_t threads_size);
  47. //反初始化
  48. void thread_pool_uninit();
  49. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  50. //input: F&& f 函数指针(函数名)
  51. //input: Args&&... args 函数的参数, 自定义
  52. template<class F, class... Args>
  53. auto enqueue(F&& f, Args&&... args)
  54. -> std::future<typename std::result_of<F(Args...)>::type>;
  55. ~Thread_pool();
  56. //判断线程池是否超载
  57. bool thread_is_full_load();
  58. private:
  59. // 线程数组
  60. std::vector< std::thread > workers;
  61. //每个线程的工作状态, true:线程正在执行任务, false:线程空闲等待
  62. std::vector< bool > working_flag_vector;
  63. // 任务函数 队列, 里面存入的是任务函数的指针
  64. std::queue< std::function<void()> > tasks;
  65. // 线程锁和条件变量
  66. std::mutex queue_mutex;
  67. std::condition_variable condition;
  68. //终止标志位
  69. bool stop;
  70. };
  71. //构造函数, 会自动初始化 threads_size 数量的线程
  72. inline Thread_pool::Thread_pool(size_t threads_size)
  73. : stop(false)
  74. {
  75. //每个线程的工作状态
  76. for(size_t i = 0;i<threads_size;++i)
  77. {
  78. working_flag_vector.push_back(false);
  79. }
  80. //初始化 threads_size 数量的线程
  81. for(size_t i = 0;i<threads_size;++i)
  82. workers.emplace_back(
  83. [i,this] //每个线程的执行的基本函数,
  84. {
  85. for(;;)
  86. {
  87. std::function<void()> task;
  88. {
  89. std::unique_lock<std::mutex> lock(this->queue_mutex);
  90. this->working_flag_vector[i] = false;//线程等待
  91. this->condition.wait(lock,
  92. [this] //线程等待的判断函数
  93. { return this->stop || !this->tasks.empty(); });
  94. if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
  95. {
  96. return;//只有在终止标志位true, 那么就退出线程执行函数
  97. }
  98. this->working_flag_vector[i] = true;//线程工作
  99. //从 任务池 里面取出 执行函数
  100. task = std::move(this->tasks.front());
  101. this->tasks.pop();
  102. }
  103. //运行执行函数
  104. task();
  105. }
  106. }
  107. );
  108. }
  109. //构造函数,没有初始化的,后续需要调用init才能正常使用
  110. inline Thread_pool::Thread_pool()
  111. : stop(false)
  112. {
  113. }
  114. //初始化,初始化 threads_size 数量的线程
  115. inline void Thread_pool::thread_pool_init(size_t threads_size)
  116. {
  117. stop = false;
  118. //每个线程的工作状态
  119. for(size_t i = 0;i<threads_size;++i)
  120. {
  121. working_flag_vector.push_back(false);
  122. }
  123. //初始化 threads_size 数量的线程
  124. for(size_t i = 0;i<threads_size;++i)
  125. workers.emplace_back(
  126. [i,this] //每个线程的执行的基本函数,
  127. {
  128. for(;;)
  129. {
  130. std::function<void()> task;
  131. {
  132. std::unique_lock<std::mutex> lock(this->queue_mutex);
  133. this->working_flag_vector[i] = false;//线程等待
  134. this->condition.wait(lock,
  135. [this] //线程等待的判断函数
  136. { return this->stop || !this->tasks.empty(); });
  137. if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
  138. {
  139. return;//只有在终止标志位true, 那么就退出线程执行函数
  140. }
  141. this->working_flag_vector[i] = true;//线程工作
  142. //从 任务池 里面取出 执行函数
  143. task = std::move(this->tasks.front());
  144. this->tasks.pop();
  145. }
  146. //运行执行函数
  147. task();
  148. }
  149. }
  150. );
  151. }
  152. //反初始化
  153. inline void Thread_pool::thread_pool_uninit()
  154. {
  155. {
  156. std::unique_lock<std::mutex> lock(queue_mutex);
  157. stop = true;
  158. }
  159. condition.notify_all();
  160. for (auto iter = workers.begin(); iter != workers.end(); )
  161. {
  162. iter->join();
  163. iter = workers.erase(iter);
  164. }
  165. working_flag_vector.clear();
  166. }
  167. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  168. //input: F&& f 函数指针(函数名)
  169. //input: Args&&... args 函数的参数, 自定义
  170. //注注注注注意了::::: res是enqueue的返回值, 由于线程异步, 使用future, 可以返回未来的一个值,
  171. // 在子线程执行完成之后, 将结果返回给外部主线程
  172. // 外部主线程 调用时, 必须使用 std::future<return_type> 格式去接受
  173. template<class F, class... Args>
  174. auto Thread_pool::enqueue(F&& f, Args&&... args)
  175. -> std::future<typename std::result_of<F(Args...)>::type>
  176. {
  177. using return_type = typename std::result_of<F(Args...)>::type;
  178. auto task = std::make_shared< std::packaged_task<return_type()> >(
  179. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  180. );
  181. std::future<return_type> res = task->get_future();
  182. {
  183. std::unique_lock<std::mutex> lock(queue_mutex);
  184. // don't allow enqueueing after stopping the pool
  185. if(stop)
  186. throw std::runtime_error("enqueue on stopped Thread_pool");
  187. tasks.emplace([task](){ (*task)(); });
  188. }
  189. condition.notify_one();
  190. return res;
  191. }
  192. // the destructor joins all threads
  193. inline Thread_pool::~Thread_pool()
  194. {
  195. {
  196. std::unique_lock<std::mutex> lock(queue_mutex);
  197. stop = true;
  198. }
  199. condition.notify_all();
  200. for(std::thread &worker: workers)
  201. { worker.join();}
  202. }
  203. //判断线程池是否超载
  204. inline bool Thread_pool::thread_is_full_load()
  205. {
  206. //只要有一个线程wait, 那么就认为没有超载,
  207. std::unique_lock<std::mutex> lock(queue_mutex);
  208. bool result = true;
  209. for(bool t_working_flag: working_flag_vector)
  210. {
  211. if ( !t_working_flag )
  212. {
  213. result = false;
  214. }
  215. }
  216. return result;
  217. }
  218. #endif