thread_pool.h 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. * Thread_pool 线程池,
  3. *
  4. * */
  5. //例如
  6. // Thread_pool thread_pool(4);
  7. // std::future<int> x = thread_pool.enqueue( []{return 0;} );
  8. // std::cout << x.get() << std::endl;
  9. //例如
  10. /*
  11. Thread_pool pool(4);
  12. std::vector< std::future<int> > results;
  13. for(int i = 0; i < 8; ++i) {
  14. results.emplace_back(
  15. pool.enqueue([i] {
  16. std::cout << "hello " << i << std::endl;
  17. std::this_thread::sleep_for(std::chrono::seconds(1));
  18. std::cout << "world " << i << std::endl;
  19. return i*i;
  20. })
  21. );
  22. }
  23. for(auto && result: results)
  24. std::cout << result.get() << ' ';
  25. std::cout << std::endl;
  26. return 0;
  27. */
  28. #ifndef THREAD_POOL_H
  29. #define THREAD_POOL_H
  30. #include <vector>
  31. #include <queue>
  32. #include <memory>
  33. #include <thread>
  34. #include <mutex>
  35. #include <condition_variable>
  36. #include <future>
  37. #include <functional>
  38. #include <stdexcept>
  39. class Thread_pool {
  40. public:
  41. //构造函数, 会自动初始化 threads_size 数量的线程
  42. Thread_pool(size_t threads_size);
  43. //构造函数,没有初始化的,后续需要调用init才能正常使用
  44. Thread_pool();
  45. //初始化,初始化 threads_size 数量的线程
  46. void thread_pool_init(size_t threads_size);
  47. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  48. //input: F&& f 函数指针(函数名)
  49. //input: Args&&... args 函数的参数, 自定义
  50. template<class F, class... Args>
  51. auto enqueue(F&& f, Args&&... args)
  52. -> std::future<typename std::result_of<F(Args...)>::type>;
  53. ~Thread_pool();
  54. //判断线程池是否超载
  55. bool thread_is_full_load();
  56. private:
  57. // 线程数组
  58. std::vector< std::thread > workers;
  59. //每个线程的工作状态, true:线程正在执行任务, false:线程空闲等待
  60. std::vector< bool > working_flag_vector;
  61. // 任务函数 队列, 里面存入的是任务函数的指针
  62. std::queue< std::function<void()> > tasks;
  63. // 线程锁和条件变量
  64. std::mutex queue_mutex;
  65. std::condition_variable condition;
  66. //终止标志位
  67. bool stop;
  68. };
  69. //构造函数, 会自动初始化 threads_size 数量的线程
  70. inline Thread_pool::Thread_pool(size_t threads_size)
  71. : stop(false)
  72. {
  73. //每个线程的工作状态
  74. for(size_t i = 0;i<threads_size;++i)
  75. {
  76. working_flag_vector.push_back(false);
  77. }
  78. //初始化 threads_size 数量的线程
  79. for(size_t i = 0;i<threads_size;++i)
  80. workers.emplace_back(
  81. [i,this] //每个线程的执行的基本函数,
  82. {
  83. for(;;)
  84. {
  85. std::function<void()> task;
  86. {
  87. std::unique_lock<std::mutex> lock(this->queue_mutex);
  88. this->working_flag_vector[i] = false;//线程等待
  89. this->condition.wait(lock,
  90. [this] //线程等待的判断函数
  91. { return this->stop || !this->tasks.empty(); });
  92. if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
  93. {
  94. return;//只有在终止标志位true, 那么就退出线程执行函数
  95. }
  96. this->working_flag_vector[i] = true;//线程工作
  97. //从 任务池 里面取出 执行函数
  98. task = std::move(this->tasks.front());
  99. this->tasks.pop();
  100. }
  101. //运行执行函数
  102. task();
  103. }
  104. }
  105. );
  106. }
  107. //构造函数,没有初始化的,后续需要调用init才能正常使用
  108. inline Thread_pool::Thread_pool()
  109. : stop(false)
  110. {
  111. }
  112. //初始化,初始化 threads_size 数量的线程
  113. inline void Thread_pool::thread_pool_init(size_t threads_size)
  114. {
  115. stop = false;
  116. //每个线程的工作状态
  117. for(size_t i = 0;i<threads_size;++i)
  118. {
  119. working_flag_vector.push_back(false);
  120. }
  121. //初始化 threads_size 数量的线程
  122. for(size_t i = 0;i<threads_size;++i)
  123. workers.emplace_back(
  124. [i,this] //每个线程的执行的基本函数,
  125. {
  126. for(;;)
  127. {
  128. std::function<void()> task;
  129. {
  130. std::unique_lock<std::mutex> lock(this->queue_mutex);
  131. this->working_flag_vector[i] = false;//线程等待
  132. this->condition.wait(lock,
  133. [this] //线程等待的判断函数
  134. { return this->stop || !this->tasks.empty(); });
  135. if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
  136. {
  137. return;//只有在终止标志位true, 那么就退出线程执行函数
  138. }
  139. this->working_flag_vector[i] = true;//线程工作
  140. //从 任务池 里面取出 执行函数
  141. task = std::move(this->tasks.front());
  142. this->tasks.pop();
  143. }
  144. //运行执行函数
  145. task();
  146. }
  147. }
  148. );
  149. }
  150. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  151. //input: F&& f 函数指针(函数名)
  152. //input: Args&&... args 函数的参数, 自定义
  153. //注注注注注意了::::: res是enqueue的返回值, 由于线程异步, 使用future, 可以返回未来的一个值,
  154. // 在子线程执行完成之后, 将结果返回给外部主线程
  155. // 外部主线程 调用时, 必须使用 std::future<return_type> 格式去接受
  156. template<class F, class... Args>
  157. auto Thread_pool::enqueue(F&& f, Args&&... args)
  158. -> std::future<typename std::result_of<F(Args...)>::type>
  159. {
  160. using return_type = typename std::result_of<F(Args...)>::type;
  161. auto task = std::make_shared< std::packaged_task<return_type()> >(
  162. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  163. );
  164. std::future<return_type> res = task->get_future();
  165. {
  166. std::unique_lock<std::mutex> lock(queue_mutex);
  167. // don't allow enqueueing after stopping the pool
  168. if(stop)
  169. throw std::runtime_error("enqueue on stopped Thread_pool");
  170. tasks.emplace([task](){ (*task)(); });
  171. }
  172. condition.notify_one();
  173. return res;
  174. }
  175. // the destructor joins all threads
  176. inline Thread_pool::~Thread_pool()
  177. {
  178. {
  179. std::unique_lock<std::mutex> lock(queue_mutex);
  180. stop = true;
  181. }
  182. condition.notify_all();
  183. for(std::thread &worker: workers)
  184. { worker.join();}
  185. }
  186. //判断线程池是否超载
  187. inline bool Thread_pool::thread_is_full_load()
  188. {
  189. //只要有一个线程wait, 那么就认为没有超载,
  190. std::unique_lock<std::mutex> lock(queue_mutex);
  191. bool result = true;
  192. for(bool t_working_flag: working_flag_vector)
  193. {
  194. if ( !t_working_flag )
  195. {
  196. result = false;
  197. }
  198. }
  199. return result;
  200. }
  201. #endif