ThreadPool.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. * ThreadPool 线程池,
  3. *
  4. * */
  5. //例如
  6. // ThreadPool thread_pool(4);
  7. // std::future<int> x = thread_pool.enqueue( []{return 0;} );
  8. // std::cout << x.get() << std::endl;
  9. //例如
  10. /*
  11. ThreadPool pool(4);
  12. std::vector< std::future<int> > results;
  13. for(int i = 0; i < 8; ++i) {
  14. results.emplace_back(
  15. pool.enqueue([i] {
  16. std::cout << "hello " << i << std::endl;
  17. std::this_thread::sleep_for(std::chrono::seconds(1));
  18. std::cout << "world " << i << std::endl;
  19. return i*i;
  20. })
  21. );
  22. }
  23. for(auto && result: results)
  24. std::cout << result.get() << ' ';
  25. std::cout << std::endl;
  26. return 0;
  27. */
  28. #ifndef THREAD_POOL_H
  29. #define THREAD_POOL_H
  30. #include <vector>
  31. #include <queue>
  32. #include <memory>
  33. #include <thread>
  34. #include <mutex>
  35. #include <condition_variable>
  36. #include <future>
  37. #include <functional>
  38. #include <stdexcept>
  39. class ThreadPool {
  40. public:
  41. //构造函数, 会自动初始化 threads_size 数量的线程
  42. ThreadPool(size_t threads_size);
  43. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  44. //input: F&& f 函数指针(函数名)
  45. //input: Args&&... args 函数的参数, 自定义
  46. template<class F, class... Args>
  47. auto enqueue(F&& f, Args&&... args)
  48. -> std::future<typename std::result_of<F(Args...)>::type>;
  49. ~ThreadPool();
  50. //判断线程池是否超载
  51. bool thread_is_full_load();
  52. private:
  53. // 线程数组
  54. std::vector< std::thread > workers;
  55. //每个线程的工作状态, true:线程正在执行任务, false:线程空闲等待
  56. std::vector< bool > working_flag_vector;
  57. // 任务函数 队列, 里面存入的是任务函数的指针
  58. std::queue< std::function<void()> > tasks;
  59. // 线程锁和条件变量
  60. std::mutex queue_mutex;
  61. std::condition_variable condition;
  62. //终止标志位
  63. bool stop;
  64. };
  65. //构造函数, 会自动初始化 threads_size 数量的线程
  66. inline ThreadPool::ThreadPool(size_t threads)
  67. : stop(false)
  68. {
  69. //每个线程的工作状态
  70. for(size_t i = 0;i<threads;++i)
  71. {
  72. working_flag_vector.push_back(false);
  73. }
  74. //初始化 threads_size 数量的线程
  75. for(size_t i = 0;i<threads;++i)
  76. workers.emplace_back(
  77. [i,this] //每个线程的执行的基本函数,
  78. {
  79. for(;;)
  80. {
  81. std::function<void()> task;
  82. {
  83. std::unique_lock<std::mutex> lock(this->queue_mutex);
  84. this->working_flag_vector[i] = false;//线程等待
  85. this->condition.wait(lock,
  86. [this] //线程等待的判断函数
  87. { return this->stop || !this->tasks.empty(); });
  88. if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
  89. {
  90. return;//只有在终止标志位true, 那么就退出线程执行函数
  91. }
  92. this->working_flag_vector[i] = true;//线程工作
  93. //从 任务池 里面取出 执行函数
  94. task = std::move(this->tasks.front());
  95. this->tasks.pop();
  96. }
  97. //运行执行函数
  98. task();
  99. }
  100. }
  101. );
  102. }
  103. //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
  104. //input: F&& f 函数指针(函数名)
  105. //input: Args&&... args 函数的参数, 自定义
  106. //注注注注注意了::::: res是enqueue的返回值, 由于线程异步, 使用future, 可以返回未来的一个值,
  107. // 在子线程执行完成之后, 将结果返回给外部主线程
  108. // 外部主线程 调用时, 必须使用 std::future<return_type> 格式去接受
  109. template<class F, class... Args>
  110. auto ThreadPool::enqueue(F&& f, Args&&... args)
  111. -> std::future<typename std::result_of<F(Args...)>::type>
  112. {
  113. using return_type = typename std::result_of<F(Args...)>::type;
  114. auto task = std::make_shared< std::packaged_task<return_type()> >(
  115. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  116. );
  117. std::future<return_type> res = task->get_future();
  118. {
  119. std::unique_lock<std::mutex> lock(queue_mutex);
  120. // don't allow enqueueing after stopping the pool
  121. if(stop)
  122. throw std::runtime_error("enqueue on stopped ThreadPool");
  123. tasks.emplace([task](){ (*task)(); });
  124. }
  125. condition.notify_one();
  126. return res;
  127. }
  128. // the destructor joins all threads
  129. inline ThreadPool::~ThreadPool()
  130. {
  131. {
  132. std::unique_lock<std::mutex> lock(queue_mutex);
  133. stop = true;
  134. }
  135. condition.notify_all();
  136. for(std::thread &worker: workers)
  137. { worker.join();}
  138. }
  139. //判断线程池是否超载
  140. bool ThreadPool::thread_is_full_load()
  141. {
  142. //只要有一个线程wait, 那么就认为没有超载,
  143. std::unique_lock<std::mutex> lock(queue_mutex);
  144. bool result = true;
  145. for(bool t_working_flag: working_flag_vector)
  146. {
  147. if ( !t_working_flag )
  148. {
  149. result = false;
  150. }
  151. }
  152. return result;
  153. }
  154. #endif