123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- /*
- * Thread_pool 线程池,
- *
- * */
- //例如
- // Thread_pool thread_pool(4);
- // std::future<int> x = thread_pool.enqueue( []{return 0;} );
- // std::cout << x.get() << std::endl;
- //例如
- /*
- Thread_pool pool(4);
- std::vector< std::future<int> > results;
- for(int i = 0; i < 8; ++i) {
- results.emplace_back(
- pool.enqueue([i] {
- std::cout << "hello " << i << std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(1));
- std::cout << "world " << i << std::endl;
- return i*i;
- })
- );
- }
- for(auto && result: results)
- std::cout << result.get() << ' ';
- std::cout << std::endl;
- return 0;
- */
- #ifndef THREAD_POOL_H
- #define THREAD_POOL_H
- #include <vector>
- #include <queue>
- #include <memory>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <future>
- #include <functional>
- #include <stdexcept>
- class Thread_pool {
- public:
- //构造函数, 会自动初始化 threads_size 数量的线程
- Thread_pool(size_t threads_size);
- //构造函数,没有初始化的,后续需要调用init才能正常使用
- Thread_pool();
- //初始化,初始化 threads_size 数量的线程
- void thread_pool_init(size_t threads_size);
- //反初始化
- void thread_pool_uninit();
- //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
- //input: F&& f 函数指针(函数名)
- //input: Args&&... args 函数的参数, 自定义
- template<class F, class... Args>
- auto enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type>;
- ~Thread_pool();
- //判断线程池是否超载
- bool thread_is_full_load();
- private:
- // 线程数组
- std::vector< std::thread > workers;
- //每个线程的工作状态, true:线程正在执行任务, false:线程空闲等待
- std::vector< bool > working_flag_vector;
- // 任务函数 队列, 里面存入的是任务函数的指针
- std::queue< std::function<void()> > tasks;
-
- // 线程锁和条件变量
- std::mutex queue_mutex;
- std::condition_variable condition;
- //终止标志位
- bool stop;
- };
- //构造函数, 会自动初始化 threads_size 数量的线程
- inline Thread_pool::Thread_pool(size_t threads_size)
- : stop(false)
- {
- //每个线程的工作状态
- for(size_t i = 0;i<threads_size;++i)
- {
- working_flag_vector.push_back(false);
- }
- //初始化 threads_size 数量的线程
- for(size_t i = 0;i<threads_size;++i)
- workers.emplace_back(
- [i,this] //每个线程的执行的基本函数,
- {
- for(;;)
- {
- std::function<void()> task;
- {
- std::unique_lock<std::mutex> lock(this->queue_mutex);
- this->working_flag_vector[i] = false;//线程等待
- this->condition.wait(lock,
- [this] //线程等待的判断函数
- { return this->stop || !this->tasks.empty(); });
- if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
- {
- return;//只有在终止标志位true, 那么就退出线程执行函数
- }
- this->working_flag_vector[i] = true;//线程工作
- //从 任务池 里面取出 执行函数
- task = std::move(this->tasks.front());
- this->tasks.pop();
- }
- //运行执行函数
- task();
- }
- }
- );
- }
- //构造函数,没有初始化的,后续需要调用init才能正常使用
- inline Thread_pool::Thread_pool()
- : stop(false)
- {
- }
- //初始化,初始化 threads_size 数量的线程
- inline void Thread_pool::thread_pool_init(size_t threads_size)
- {
- stop = false;
- //每个线程的工作状态
- for(size_t i = 0;i<threads_size;++i)
- {
- working_flag_vector.push_back(false);
- }
- //初始化 threads_size 数量的线程
- for(size_t i = 0;i<threads_size;++i)
- workers.emplace_back(
- [i,this] //每个线程的执行的基本函数,
- {
- for(;;)
- {
- std::function<void()> task;
- {
- std::unique_lock<std::mutex> lock(this->queue_mutex);
- this->working_flag_vector[i] = false;//线程等待
- this->condition.wait(lock,
- [this] //线程等待的判断函数
- { return this->stop || !this->tasks.empty(); });
- if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了.
- {
- return;//只有在终止标志位true, 那么就退出线程执行函数
- }
- this->working_flag_vector[i] = true;//线程工作
- //从 任务池 里面取出 执行函数
- task = std::move(this->tasks.front());
- this->tasks.pop();
- }
- //运行执行函数
- task();
- }
- }
- );
- }
- //反初始化
- inline void Thread_pool::thread_pool_uninit()
- {
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
- stop = true;
- }
- condition.notify_all();
- for (auto iter = workers.begin(); iter != workers.end(); )
- {
- iter->join();
- iter = workers.erase(iter);
- }
- working_flag_vector.clear();
- }
- //往线程池添加执行任务, 之后会唤醒一个线程去执行他.
- //input: F&& f 函数指针(函数名)
- //input: Args&&... args 函数的参数, 自定义
- //注注注注注意了::::: res是enqueue的返回值, 由于线程异步, 使用future, 可以返回未来的一个值,
- // 在子线程执行完成之后, 将结果返回给外部主线程
- // 外部主线程 调用时, 必须使用 std::future<return_type> 格式去接受
- template<class F, class... Args>
- auto Thread_pool::enqueue(F&& f, Args&&... args)
- -> std::future<typename std::result_of<F(Args...)>::type>
- {
- using return_type = typename std::result_of<F(Args...)>::type;
- auto task = std::make_shared< std::packaged_task<return_type()> >(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- );
- std::future<return_type> res = task->get_future();
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
- // don't allow enqueueing after stopping the pool
- if(stop)
- throw std::runtime_error("enqueue on stopped Thread_pool");
- tasks.emplace([task](){ (*task)(); });
- }
- condition.notify_one();
- return res;
- }
- // the destructor joins all threads
- inline Thread_pool::~Thread_pool()
- {
- {
- std::unique_lock<std::mutex> lock(queue_mutex);
- stop = true;
- }
- condition.notify_all();
- for(std::thread &worker: workers)
- { worker.join();}
- }
- //判断线程池是否超载
- inline bool Thread_pool::thread_is_full_load()
- {
- //只要有一个线程wait, 那么就认为没有超载,
- std::unique_lock<std::mutex> lock(queue_mutex);
- bool result = true;
- for(bool t_working_flag: working_flag_vector)
- {
- if ( !t_working_flag )
- {
- result = false;
- }
- }
- return result;
- }
- #endif
|