/* * (1)这个实现要求构建工具支持C++11的atomic mutex condition_veriable功能。这是C++11的基础特性,一般2011年以后的C++编译器都能支持。 例如,visual studio 2012以上。 (2)这个类的实现中有两处使用了unique_lock而不是lock_guard,这是data_cond.wait所需要的,unique_lock是lock_guard的增强版。 通过std::move的使用(前提是我们实现的类型T定义了移动构造函数和移动赋值函数),能利用移动语义带来的性能优势。 使用shared_ptr返回元素,用户无需释放元素的内存。 原文链接:https://blog.csdn.net/weixin_41855721/article/details/81703818 增加了一些功能函数, 补充了注释说明 termination_queue // 在退出状态下,所有的功能函数不可用,返回false或者null。 // wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。 pop系列函数 //(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。 //(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。 //(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false. 注注注注注意了:模板类不支持分离编译。 模板类的实现必须放在头文件 为了方便阅读和编程规范,依然将声明和实现分开,就像是把cpp文件的代码复制到h文件的尾部。 如果将实现放到cpp里面,那么就要为cpp文件加 ifndef define endif 防止重定义。 然后在调用方include包含cpp文件,但是这样不好。 * */ #ifndef LIDARMEASURE_THREAD_SAFE_QUEUE_H #define LIDARMEASURE_THREAD_SAFE_QUEUE_H #include #include #include #include template class Thread_safe_queue { public: Thread_safe_queue(); Thread_safe_queue(const Thread_safe_queue &other); ~Thread_safe_queue(); //(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。 //(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。 //(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false. //等待并弹出数据,成功弹出则返回true // 队列为空则无限等待,termination终止队列,则返回false bool wait_and_pop(T &value); //尝试弹出数据,成功弹出则返回true //队列为空 或者 termination终止队列,返回false bool try_pop(T &value); //等待并弹出数据,成功弹出则返回true // 队列为空则无限等待,termination终止队列,则返回false std::shared_ptr wait_and_pop(); //尝试弹出数据,成功弹出则返回true //队列为空 或者 termination终止队列,返回false std::shared_ptr try_pop(); //插入一项,并唤醒一个线程, //如果成功插入,则返回true, 失败则返回false //注:只能唤醒一个线程,防止多线程误判empty() bool push(T new_value); //清除队列,只是将队列的实例抛出。T是实例内存,系统自动回收的。 bool clear(); //清除队列,抛出之后还要delete指针。T是动态内存,需要手动回收的。 bool clear_and_delete(); public: //判空 bool empty(); //获取队列大小 size_t size(); //设置队列为退出状态。并唤醒所有的线程,使其通过wait // 在退出状态下,所有的功能函数不可用,必须直接返回false或者null。 // wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。 void termination_queue(); //唤醒队列,恢复所有的功能函数。wait_and_pop会继续阻塞。 void wake_queue(); //获取退出状态 bool get_termination_flag(); //判断是否可以直接通过wait, m_data_queue不为空或者m_termination终止时都可以通过等待。 bool is_pass(); protected: std::mutex m_mutex; //队列的锁 std::queue> m_data_queue; //队列数据,使用智能指针shared_ptr std::condition_variable m_data_cond; //条件变量 std::atomic m_termination_flag; //终止标志位 private: }; template Thread_safe_queue::Thread_safe_queue() { m_termination_flag = false; } template Thread_safe_queue::Thread_safe_queue(const Thread_safe_queue &other) { std::unique_lock lock_this(m_mutex); std::unique_lock lock_other(other.m_mutex); m_data_queue = other.data_queue; m_termination_flag = other.m_termination_flag; } template Thread_safe_queue::~Thread_safe_queue() { //析构时,终止队列,让线程通过等待,方便线程推出。 termination_queue(); } //(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。 //(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。 //(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false. //等待并弹出数据,成功弹出则返回true // 队列为空则无限等待,termination终止队列,则返回false template bool Thread_safe_queue::wait_and_pop(T &value) { if (m_termination_flag) { return false; } else { std::unique_lock lk(m_mutex); //无限等待,一直阻塞,除非有新的数据加入或者终止队列 m_data_cond.wait(lk, [this] { return ((!m_data_queue.empty()) || m_termination_flag); }); if (m_termination_flag) { return false; } else { value = std::move(*m_data_queue.front()); m_data_queue.pop(); return true; } } } //尝试弹出数据,成功弹出则返回true //队列为空 或者 termination终止队列,返回false template bool Thread_safe_queue::try_pop(T &value) { if (m_termination_flag) { return false; } else { std::unique_lock lk(m_mutex); if (m_data_queue.empty()) { return false; } else { value = std::move(*m_data_queue.front()); m_data_queue.pop(); return true; } } } //等待并弹出数据,成功弹出则返回true // 队列为空则无限等待,termination终止队列,则返回false template std::shared_ptr Thread_safe_queue::wait_and_pop() { if (m_termination_flag) { return NULL; } else { std::unique_lock lk(m_mutex); //无限等待,一直阻塞,除非有新的数据加入或者终止队列 m_data_cond.wait(lk, [this] { return ((!m_data_queue.empty()) || m_termination_flag); }); if (m_termination_flag) { return NULL; } else { std::shared_ptr res = m_data_queue.front(); m_data_queue.pop(); return res; } } } //尝试弹出数据,成功弹出则返回true //队列为空 或者 termination终止队列,返回false template std::shared_ptr Thread_safe_queue::try_pop() { if (m_termination_flag) { return NULL; } else { std::unique_lock lk(m_mutex); if (m_data_queue.empty()) { return NULL; } else { std::shared_ptr res = m_data_queue.front(); m_data_queue.pop(); return res; } } } //插入一项,并唤醒一个线程, //如果成功插入,则返回true, 失败则返回false //注:只能唤醒一个线程,防止多线程误判empty() template bool Thread_safe_queue::push(T new_value) { if (m_termination_flag) { return false; } else { std::shared_ptr data(std::make_shared(std::move(new_value))); std::unique_lock lk(m_mutex); m_data_queue.push(data); m_data_cond.notify_one(); return true; } } //清除队列,只是将队列的实例抛出。T是实例内存,系统自动回收的。 template bool Thread_safe_queue::clear() { std::unique_lock lk(m_mutex); while (!m_data_queue.empty()) { m_data_queue.pop(); } return true; } //清除队列,抛出之后还要delete指针。T是动态内存,需要手动回收的。 template bool Thread_safe_queue::clear_and_delete() { std::unique_lock lk(m_mutex); while (!m_data_queue.empty()) { T res = NULL; res = std::move(*m_data_queue.front()); m_data_queue.pop(); if (res != NULL) { delete (res); } } return true; } //判空 template bool Thread_safe_queue::empty() { std::unique_lock lk(m_mutex); return m_data_queue.empty(); } //获取队列大小 template size_t Thread_safe_queue::size() { std::unique_lock lk(m_mutex); return m_data_queue.size(); } //设置队列为退出状态。并唤醒所有的线程,使其通过wait // 在退出状态下,所有的功能函数不可用,必须直接返回false或者null。 // wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。 template void Thread_safe_queue::termination_queue() { std::unique_lock lk(m_mutex); m_termination_flag = true; m_data_cond.notify_all(); } //唤醒队列,恢复所有的功能函数。wait_and_pop会继续阻塞。 template void Thread_safe_queue::wake_queue() { std::unique_lock lk(m_mutex); m_termination_flag = false; m_data_cond.notify_all(); } //获取退出状态 template bool Thread_safe_queue::get_termination_flag() { return m_termination_flag; } //判断是否可以直接通过wait, m_data_queue不为空或者m_termination终止时都可以通过等待。 template bool Thread_safe_queue::is_pass() { return (!m_data_queue.empty() || m_termination_flag); } #endif //LIDARMEASURE_THREAD_SAFE_QUEUE_H