/* * Thread_pool 线程池, * * */ //例如 // Thread_pool thread_pool(4); // std::future x = thread_pool.enqueue( []{return 0;} ); // std::cout << x.get() << std::endl; //例如 /* Thread_pool pool(4); std::vector< std::future > results; for(int i = 0; i < 8; ++i) { results.emplace_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << "world " << i << std::endl; return i*i; }) ); } for(auto && result: results) std::cout << result.get() << ' '; std::cout << std::endl; return 0; */ #ifndef THREAD_POOL_H #define THREAD_POOL_H #include #include #include #include #include #include #include #include #include class Thread_pool { public: //构造函数, 会自动初始化 threads_size 数量的线程 Thread_pool(size_t threads_size); //构造函数,没有初始化的,后续需要调用init才能正常使用 Thread_pool(); //初始化,初始化 threads_size 数量的线程 void thread_pool_init(size_t threads_size); //反初始化 void thread_pool_uninit(); //往线程池添加执行任务, 之后会唤醒一个线程去执行他. //input: F&& f 函数指针(函数名) //input: Args&&... args 函数的参数, 自定义 template auto enqueue(F&& f, Args&&... args) -> std::future::type>; ~Thread_pool(); //判断线程池是否超载 bool thread_is_full_load(); private: // 线程数组 std::vector< std::thread > workers; //每个线程的工作状态, true:线程正在执行任务, false:线程空闲等待 std::vector< bool > working_flag_vector; // 任务函数 队列, 里面存入的是任务函数的指针 std::queue< std::function > tasks; // 线程锁和条件变量 std::mutex queue_mutex; std::condition_variable condition; //终止标志位 bool stop; }; //构造函数, 会自动初始化 threads_size 数量的线程 inline Thread_pool::Thread_pool(size_t threads_size) : stop(false) { //每个线程的工作状态 for(size_t i = 0;i task; { std::unique_lock lock(this->queue_mutex); this->working_flag_vector[i] = false;//线程等待 this->condition.wait(lock, [this] //线程等待的判断函数 { return this->stop || !this->tasks.empty(); }); if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了. { return;//只有在终止标志位true, 那么就退出线程执行函数 } this->working_flag_vector[i] = true;//线程工作 //从 任务池 里面取出 执行函数 task = std::move(this->tasks.front()); this->tasks.pop(); } //运行执行函数 task(); } } ); } //构造函数,没有初始化的,后续需要调用init才能正常使用 inline Thread_pool::Thread_pool() : stop(false) { } //初始化,初始化 threads_size 数量的线程 inline void Thread_pool::thread_pool_init(size_t threads_size) { stop = false; //每个线程的工作状态 for(size_t i = 0;i task; { std::unique_lock lock(this->queue_mutex); this->working_flag_vector[i] = false;//线程等待 this->condition.wait(lock, [this] //线程等待的判断函数 { return this->stop || !this->tasks.empty(); }); if (this->stop )//&& this->tasks.empty()) //这里修改了, 不需要把任务池都执行完才退出, stop之后就可以退了. { return;//只有在终止标志位true, 那么就退出线程执行函数 } this->working_flag_vector[i] = true;//线程工作 //从 任务池 里面取出 执行函数 task = std::move(this->tasks.front()); this->tasks.pop(); } //运行执行函数 task(); } } ); } //反初始化 inline void Thread_pool::thread_pool_uninit() { { std::unique_lock lock(queue_mutex); stop = true; } condition.notify_all(); for (auto iter = workers.begin(); iter != workers.end(); ) { iter->join(); iter = workers.erase(iter); } working_flag_vector.clear(); } //往线程池添加执行任务, 之后会唤醒一个线程去执行他. //input: F&& f 函数指针(函数名) //input: Args&&... args 函数的参数, 自定义 //注注注注注意了::::: res是enqueue的返回值, 由于线程异步, 使用future, 可以返回未来的一个值, // 在子线程执行完成之后, 将结果返回给外部主线程 // 外部主线程 调用时, 必须使用 std::future 格式去接受 template auto Thread_pool::enqueue(F&& f, Args&&... args) -> std::future::type> { using return_type = typename std::result_of::type; auto task = std::make_shared< std::packaged_task >( std::bind(std::forward(f), std::forward(args)...) ); std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); // don't allow enqueueing after stopping the pool if(stop) throw std::runtime_error("enqueue on stopped Thread_pool"); tasks.emplace([task](){ (*task)(); }); } condition.notify_one(); return res; } // the destructor joins all threads inline Thread_pool::~Thread_pool() { { std::unique_lock lock(queue_mutex); stop = true; } condition.notify_all(); for(std::thread &worker: workers) { worker.join();} } //判断线程池是否超载 inline bool Thread_pool::thread_is_full_load() { //只要有一个线程wait, 那么就认为没有超载, std::unique_lock lock(queue_mutex); bool result = true; for(bool t_working_flag: working_flag_vector) { if ( !t_working_flag ) { result = false; } } return result; } #endif