|
@@ -0,0 +1,339 @@
|
|
|
+
|
|
|
+
|
|
|
+/*
|
|
|
+ * (1)这个实现要求构建工具支持C++11的atomic mutex condition_veriable功能。这是C++11的基础特性,一般2011年以后的C++编译器都能支持。 例如,visual studio 2012以上。
|
|
|
+
|
|
|
+(2)这个类的实现中有两处使用了unique_lock而不是lock_guard,这是data_cond.wait所需要的,unique_lock是lock_guard的增强版。
|
|
|
+
|
|
|
+通过std::move的使用(前提是我们实现的类型T定义了移动构造函数和移动赋值函数),能利用移动语义带来的性能优势。
|
|
|
+
|
|
|
+使用shared_ptr<T>返回元素,用户无需释放元素的内存。
|
|
|
+
|
|
|
+
|
|
|
+原文链接:https://blog.csdn.net/weixin_41855721/article/details/81703818
|
|
|
+ 增加了一些功能函数,
|
|
|
+ 补充了注释说明
|
|
|
+
|
|
|
+ termination_queue
|
|
|
+ // 在退出状态下,所有的功能函数不可用,返回false或者null。
|
|
|
+ // wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。
|
|
|
+
|
|
|
+ pop系列函数
|
|
|
+ //(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。
|
|
|
+ //(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。
|
|
|
+ //(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false.
|
|
|
+
|
|
|
+注注注注注意了:模板类不支持分离编译。 模板类的实现必须放在头文件
|
|
|
+ 为了方便阅读和编程规范,依然将声明和实现分开,就像是把cpp文件的代码复制到h文件的尾部。
|
|
|
+
|
|
|
+ 如果将实现放到cpp里面,那么就要为cpp文件加 ifndef define endif 防止重定义。
|
|
|
+ 然后在调用方include包含cpp文件,但是这样不好。
|
|
|
+
|
|
|
+
|
|
|
+ * */
|
|
|
+
|
|
|
+#ifndef LIDARMEASURE_THREAD_SAFE_QUEUE_H
|
|
|
+#define LIDARMEASURE_THREAD_SAFE_QUEUE_H
|
|
|
+
|
|
|
+#include <queue>
|
|
|
+
|
|
|
+#include <atomic>
|
|
|
+#include <mutex>
|
|
|
+#include <condition_variable>
|
|
|
+
|
|
|
+
|
|
|
+template<class T>
|
|
|
+class Thread_safe_queue
|
|
|
+{
|
|
|
+public:
|
|
|
+ Thread_safe_queue();
|
|
|
+ Thread_safe_queue(const Thread_safe_queue& other);
|
|
|
+ ~Thread_safe_queue();
|
|
|
+
|
|
|
+ //(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。
|
|
|
+ //(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。
|
|
|
+ //(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false.
|
|
|
+
|
|
|
+ //等待并弹出数据,成功弹出则返回true
|
|
|
+ // 队列为空则无限等待,termination终止队列,则返回false
|
|
|
+ bool wait_and_pop(T& value);
|
|
|
+ //尝试弹出数据,成功弹出则返回true
|
|
|
+ //队列为空 或者 termination终止队列,返回false
|
|
|
+ bool try_pop(T& value);
|
|
|
+ //等待并弹出数据,成功弹出则返回true
|
|
|
+ // 队列为空则无限等待,termination终止队列,则返回false
|
|
|
+ std::shared_ptr<T> wait_and_pop();
|
|
|
+ //尝试弹出数据,成功弹出则返回true
|
|
|
+ //队列为空 或者 termination终止队列,返回false
|
|
|
+ std::shared_ptr<T> try_pop();
|
|
|
+ //插入一项,并唤醒一个线程,
|
|
|
+ //如果成功插入,则返回true, 失败则返回false
|
|
|
+ //注:只能唤醒一个线程,防止多线程误判empty()
|
|
|
+ bool push(T new_value);
|
|
|
+ //清除队列,只是将队列的实例抛出。T是实例内存,系统自动回收的。
|
|
|
+ bool clear();
|
|
|
+ //清除队列,抛出之后还要delete指针。T是动态内存,需要手动回收的。
|
|
|
+ bool clear_and_delete();
|
|
|
+
|
|
|
+public:
|
|
|
+ //判空
|
|
|
+ bool empty();
|
|
|
+ //获取队列大小
|
|
|
+ size_t size();
|
|
|
+ //设置队列为退出状态。并唤醒所有的线程,使其通过wait
|
|
|
+ // 在退出状态下,所有的功能函数不可用,必须直接返回false或者null。
|
|
|
+ // wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。
|
|
|
+ void termination_queue();
|
|
|
+ //唤醒队列,恢复所有的功能函数。wait_and_pop会继续阻塞。
|
|
|
+ void wake_queue();
|
|
|
+ //获取退出状态
|
|
|
+ bool get_termination_flag();
|
|
|
+ //判断是否可以直接通过wait, m_data_queue不为空或者m_termination终止时都可以通过等待。
|
|
|
+ bool is_pass();
|
|
|
+
|
|
|
+protected:
|
|
|
+ std::mutex m_mutex; //队列的锁
|
|
|
+ std::queue<std::shared_ptr<T>> m_data_queue; //队列数据,使用智能指针shared_ptr
|
|
|
+ std::condition_variable m_data_cond; //条件变量
|
|
|
+ std::atomic<bool> m_termination_flag; //终止标志位
|
|
|
+
|
|
|
+private:
|
|
|
+
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+template<class T>
|
|
|
+Thread_safe_queue<T>::Thread_safe_queue()
|
|
|
+{
|
|
|
+ m_termination_flag = false;
|
|
|
+}
|
|
|
+template<class T>
|
|
|
+Thread_safe_queue<T>::Thread_safe_queue(const Thread_safe_queue& other)
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lock_this(m_mutex);
|
|
|
+ std::unique_lock<std::mutex> lock_other(other.m_mutex);
|
|
|
+ m_data_queue = other.data_queue;
|
|
|
+ m_termination_flag = other.m_termination_flag;
|
|
|
+}
|
|
|
+template<class T>
|
|
|
+Thread_safe_queue<T>::~Thread_safe_queue()
|
|
|
+{
|
|
|
+ //析构时,终止队列,让线程通过等待,方便线程推出。
|
|
|
+ termination_queue();
|
|
|
+}
|
|
|
+
|
|
|
+//(1)没有调用termination时,每调用一次出队一个元素,直到队列为空本方法阻塞线程。
|
|
|
+//(2)在调用了termination后,本方法永不阻塞,如果原本已经处于阻塞状态,解除阻塞状态。
|
|
|
+//(3)返回true时,value值有效。返回false时,value值无效。调用了termination且队列为空时返回false.
|
|
|
+
|
|
|
+//等待并弹出数据,成功弹出则返回true
|
|
|
+// 队列为空则无限等待,termination终止队列,则返回false
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::wait_and_pop(T& value)
|
|
|
+{
|
|
|
+ if ( m_termination_flag )
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ //无限等待,一直阻塞,除非有新的数据加入或者终止队列
|
|
|
+ m_data_cond.wait(lk, [this]
|
|
|
+ { return ((!m_data_queue.empty()) || m_termination_flag); });
|
|
|
+ if (m_termination_flag)
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ value = move(*m_data_queue.front());
|
|
|
+ m_data_queue.pop();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+//尝试弹出数据,成功弹出则返回true
|
|
|
+//队列为空 或者 termination终止队列,返回false
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::try_pop(T& value)
|
|
|
+{
|
|
|
+ if ( m_termination_flag )
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ if (m_data_queue.empty())
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ value = move(*m_data_queue.front());
|
|
|
+ m_data_queue.pop();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+//等待并弹出数据,成功弹出则返回true
|
|
|
+// 队列为空则无限等待,termination终止队列,则返回false
|
|
|
+template<class T>
|
|
|
+std::shared_ptr<T> Thread_safe_queue<T>::wait_and_pop()
|
|
|
+{
|
|
|
+ if ( m_termination_flag )
|
|
|
+ {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ //无限等待,一直阻塞,除非有新的数据加入或者终止队列
|
|
|
+ m_data_cond.wait(lk, [this]
|
|
|
+ { return ((!m_data_queue.empty()) || m_termination_flag); });
|
|
|
+ if (m_termination_flag)
|
|
|
+ {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::shared_ptr<T> res = m_data_queue.front();
|
|
|
+ m_data_queue.pop();
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+//尝试弹出数据,成功弹出则返回true
|
|
|
+//队列为空 或者 termination终止队列,返回false
|
|
|
+template<class T>
|
|
|
+std::shared_ptr<T> Thread_safe_queue<T>::try_pop()
|
|
|
+{
|
|
|
+ if ( m_termination_flag )
|
|
|
+ {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ if (m_data_queue.empty())
|
|
|
+ {
|
|
|
+ return NULL;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::shared_ptr<T> res = m_data_queue.front();
|
|
|
+ m_data_queue.pop();
|
|
|
+ return res;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+//插入一项,并唤醒一个线程,
|
|
|
+//如果成功插入,则返回true, 失败则返回false
|
|
|
+//注:只能唤醒一个线程,防止多线程误判empty()
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::push(T new_value)
|
|
|
+{
|
|
|
+ if (m_termination_flag)
|
|
|
+ {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ std::shared_ptr<T> data(std::make_shared<T>(move(new_value)));
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ m_data_queue.push(data);
|
|
|
+ m_data_cond.notify_one();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+}
|
|
|
+//清除队列,只是将队列的实例抛出。T是实例内存,系统自动回收的。
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::clear()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ while (!m_data_queue.empty())
|
|
|
+ {
|
|
|
+ m_data_queue.pop();
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+
|
|
|
+}
|
|
|
+//清除队列,抛出之后还要delete指针。T是动态内存,需要手动回收的。
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::clear_and_delete()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ while (!m_data_queue.empty())
|
|
|
+ {
|
|
|
+ T res = NULL;
|
|
|
+ res = move(*m_data_queue.front());
|
|
|
+ m_data_queue.pop();
|
|
|
+ if(res != NULL)
|
|
|
+ {
|
|
|
+ delete(res);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+//判空
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::empty()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ return m_data_queue.empty();
|
|
|
+}
|
|
|
+//获取队列大小
|
|
|
+template<class T>
|
|
|
+size_t Thread_safe_queue<T>::size()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ return m_data_queue.size();
|
|
|
+}
|
|
|
+//设置队列为退出状态。并唤醒所有的线程,使其通过wait
|
|
|
+// 在退出状态下,所有的功能函数不可用,必须直接返回false或者null。
|
|
|
+// wait_and_pop不会阻塞。让其直接通过,通过后直接return,不允许做其他的。
|
|
|
+template<class T>
|
|
|
+void Thread_safe_queue<T>::termination_queue()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ m_termination_flag = true;
|
|
|
+ m_data_cond.notify_all();
|
|
|
+}
|
|
|
+//唤醒队列,恢复所有的功能函数。wait_and_pop会继续阻塞。
|
|
|
+template<class T>
|
|
|
+void Thread_safe_queue<T>::wake_queue()
|
|
|
+{
|
|
|
+ std::unique_lock<std::mutex> lk(m_mutex);
|
|
|
+ m_termination_flag = false;
|
|
|
+ m_data_cond.notify_all();
|
|
|
+}
|
|
|
+//获取退出状态
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::get_termination_flag()
|
|
|
+{
|
|
|
+ return m_termination_flag;
|
|
|
+}
|
|
|
+//判断是否可以直接通过wait, m_data_queue不为空或者m_termination终止时都可以通过等待。
|
|
|
+template<class T>
|
|
|
+bool Thread_safe_queue<T>::is_pass()
|
|
|
+{
|
|
|
+ return (!m_data_queue.empty() || m_termination_flag);
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#endif //LIDARMEASURE_THREAD_SAFE_QUEUE_H
|