

c++标准库:并发(四) —— 条件变量 std::condition_variable
source link: https://www.zoucz.com/blog/2021/06/09/83619610-c87d-11eb-9fe7-534bbf9f369d/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

c++标准库:并发(四) —— 条件变量 std::condition_variable
接上篇 c++标准库:并发(三) —— 锁 mutex 和 lock 我要说话
在上篇关于 std::unique_lock
的小节中,有一个用 std::unique_lock
不断循环加锁、检查条件、释放锁、sleep 来判断一个前置条件是否准备完成的例子。
例子中有一个很关键的点,在准备工作时间无法准确预估的情况下,sleep的时间到底设置多少为合适呢? 设置短了太娘炮,设置长了扯着蛋… 要么sleep过久造成程序执行反应很慢, 要么sleep过短而将CPU性能大量的浪费在检查工作上。
基于这个需求,标准库提供了 std::condition_variable
条件变量,来让我们: 我要说话
- 初始化一个条件变量
condVar
- 通过
condVar.wait
操作使线程进入无限期的阻塞(wait)状态,和sleep状态一样不浪费CPU在检查工作上。多个线程可使用同一个条件变量进行wait
- 通过
condVar.notify
操作让一个或所有使用此条件变量的线程,被唤醒而进入工作状态
上面是条件变量的大致执行逻辑,《C++标准库》这本书中并没有对细节进行解释。下面我写了一些demo,可以来看看细节。 我要说话
下面是一个最基本的使用condition_variable的例子
我要说话
bool readyFlag = false;
std::mutex readyFlagMutex;
std::condition_variable readyCondVar;
//sleep一段时间,将readyFlag设置为true,然后触发一次notify_one
void prepareThread(){
std::this_thread::sleep_for(std::chrono::seconds(5));
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
std::cout << " 1. prepare get lock" << std::endl;
readyFlag = true;
readyCondVar.notify_one();
std::cout << " 2. prepare call notify and sleep " << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
//通过条件变量的wait让线程进入阻塞状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
readyCondVar.wait(ul);
std::cout << " 3. condition_variable check pass... " << std::endl;
}
std::cout << " 4. prepare job done, run something! " << std::endl;
}
void testConditionVariable(){
std::thread t1(prepareThread);
std::thread t2(waitForPrepareThread);
t1.join();
t2.join();
};
//输出:
// 1. prepare get lock
// 2. prepare call notify and sleep
// 3. condition_variable check pass...
// 4. prepare job done, run something!
我要说话
分析这个case,普通的readyCondVar.wait(ul)
执行的过程可以理解为下面几个步骤: 我要说话
- 等待获取锁
- 进入阻塞状态并立即释放锁
- 阻塞,直到线程被一个
notify_*
唤醒 - 等待获取锁
- 执行后续逻辑
wait(ul, predicate)
用到条件变量的场景,一把是像上面的例子中一样,某个条件ready后就notify一下,通知另一个线程继续执行。看起来很美,对吧?
很不幸的是,实际程序执行的时候,某些情况下,条件变量会出现“假唤醒”的状态。也就是说,会存在莫名其妙程序自动调了一下 notify_*
。此时如果程序直接执行下去,就很容易因为条件还未就绪而崩掉。
所以我们需要执行一个检查条件是否真的就绪的工作,而且这个检查工作本身也得加锁,以防程序因假唤醒挂掉。例如: 我要说话
{
//加锁、判断
std::unique_lock<std::mutex> ul(readyFlagMutex);
while(!readyFlag){
//readyFlag为false时,由wait进入休眠
readyCondVar.wait(ul);
}
//readyFlag为true时,自动释放锁
}
上面的语法稍显麻烦,标准库为wait方法提供了第二个参数,可以传入一个表达式,只有它的返回结果为false时让线程进入休眠,否则直接释放锁并进入后面的流程。
我要说话
//(改写上例子中的 waitForPrepareThread )
//通过条件变量的wait让线程进入休眠状态,通过一个unique_lock,来实现收到notify之后执行的 锁定-检查-解锁逻辑
void waitForPrepareThread(){
{
//这里一定要使用 unique_lock,不能使用lock_guard
std::unique_lock<std::mutex> ul(readyFlagMutex);
//传入条件:为了处理假唤醒,多传入一个判断函数,加锁后,判断函数返回false的时候才释放锁并执行后面的逻辑,否则释放锁并进入休眠状态
readyCondVar.wait(ul, []{
//wait一般情况下只会被 condition_variable的 notify_one、 notify_all 唤醒,但是也存在一定可能性误唤醒
//这里不传入lamda做判断的话,误唤醒时实际上readyFlag并没有变成true
//所以这里再加一道check
std::cout << " condition_variable notified, check readyFlag... " << std::endl;
return readyFlag;
});
}
std::cout << " prepare job done, run something! " << std::endl;
}
//执行结果:
// condition_variable notified, check readyFlag...
// condition_variable notified, check readyFlag...
// prepare job done, run something!
我要说话
从执行结果中可以发现,条件判断函数被执行了两次,这是因为prepare函数是sleep了5s才获取锁的,所以 readyCondVar.wait
在第一时间就获取了锁,然后进行了一次判断,结果此时 readyFlag
是 false,所以条件变量让当前线程释放锁并进入阻塞状态等待。
prepare线程sleep 5s后,将readyFlag修改为true并执行notify_one。 waitForPrepareThread被notify唤醒后,再次执行条件判断函数,此时readyFlag为true,判断函数返回true,所以不再进入阻塞,直接去执行后面的逻辑。总结起来,带条件的wait,工作流程是: 我要说话
- 等待获取锁,然后进行判断
- 判断条件: 若条件为 false
– 进入阻塞状态并立即释放锁
– 阻塞,直到线程被一个notify_*
唤醒
– 等待获取锁
– 执行后续逻辑 - 判断条件: 若条件为true
– 执行后续逻辑
带条件的wait与不带条件的wait相比: 我要说话
wait(ul)
一定会让线程阻塞;wait(ul, predicate)
不一定会让线程进入阻塞- 如果在循环中使用,
wait(ul)
的线程被唤醒后,执行一次工作再次wait又会进入阻塞,它被唤醒后是“单发”的;wait(ul, predicate)
如果条件是true,会一直执行任务不阻塞,是 “连发” 的。
notify_one和notify_all
上面的例子中,条件准备就绪后是notify_one唤醒一个阻塞的线程。调用notify_one时,若此时没有用此条件变量进入阻塞状态的线程,那么notify_one没有任何作用。
标准库还提供了一个notify_all来唤醒所有阻塞的线程,我目前理解的,有两种常见的场景需要用到 notify_all 。 我要说话
事件驱动模型的场景
假如用 condition_variable 来做事件订阅,当某事件发生时,可以通过 notify_all
来通知给所有通过 wait 此 condition_variable 来阻塞的线程。
这个使用场景比较简单,就不写代码验证了。我要说话
部分生产者消费者模型
设想有一个场景: 一个生产者,在生产了100个耗时任务,有10个消费者线程通过条件变量等待任务去执行。那么wait和notify不同选择时会产生不同效果: 我要说话
生产者\消费者 wait(ul) wait(ul, predicate)
notify_one 1次 1个线程执行1个任务 1个线程执行完所有任务
notify_one 100次 10个线程各执行1个任务 10个线程共同执行完所有任务
notify_all 10个线程各执行1个任务 10个线程共同执行完所有任务
如上面的表格,要想让所有线程共同来执行所有任务,可以选择多次执行notify_one或者执行一次notify_all。
其中notify_all因为会唤醒所有线程,在生产者生产较慢而消费者较多时,容易产生不必要的锁竞争和条件判断,所以最优解是多次notify_one,但是生产者生产特征是比较集中,需要消费者共同处理的场景,直接用notify_all也可以。 我要说话
通过条件变量实现一个线程队列
理解了上面各种模式的wait和notify,下面线程队列的例子就比较简单,没什么好说的。
我要说话
std::queue<int> queue;
void provider(int val){
std::this_thread::sleep_for(std::chrono::seconds(5));
for(int i=0;i<50;i++)
{
{
std::lock_guard<std::mutex> lg(readyFlagMutex);
queue.push(val+i);
}
//用notify_one,只会唤醒一个 condition_variable.wait
readyCondVar.notify_one();
}
//用notify_all,会唤醒全部的 condition_variable.wait。
//readyCondVar.notify_all();
}
void consumer(std::string name){
int val;
while (true)
{
{
std::unique_lock<std::mutex> ul(readyFlagMutex);
//readyCondVar.wait(ul); //如果用 wait(ul)+notify_*,那么可能有几个 consunmer线程,任务就只会被执行几个
//wait(ul, predicate)可以清空队列
readyCondVar.wait(ul, [&]{
return !queue.empty();
});
std::cout << name << " notified, queue size: " << queue.size() << std::endl;
//对queue的操作需要在ul的保护范围内进行,以免造成线程安全未知错误
val = queue.front();
queue.pop();
//ul生命周期结束,锁自动释放
}
std::cout << name << " notified, get value: " << val << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
};
void testQueue(){
std::thread pro1(provider, 1);
std::thread consum1(consumer, "tom");
std::thread consum2(consumer, "lily");
std::thread consum3(consumer, "lihua");
std::thread consum4(consumer, "cat");
std::thread consum5(consumer, "dog");
pro1.join();
consum1.join();
consum2.join();
consum3.join();
consum4.join();
consum5.join();
};
我要说话
std::condition_variable_any
点开标准库的头文件可以看到,condition_variable
的 wait
的定义是
我要说话
wait(unique_lock<mutex>& __lock);
我要说话
也就是说,wait只能接收一个基于标准库互斥锁实现的 unique_lock。
假如我们想基于自己的业务特点实现一些自定义的锁,condition_variable
就有点无能为力了,此时可以使用 std::condition_variable_any
。
如下面的例子,将上面的互斥锁替换为基于原子量实现的自旋锁来实现demo。 我要说话
#include <atomic>
class SpinLock {
public:
SpinLock() : flag_(false)
{}
void lock()
{
bool expect = false;
while (!flag_.compare_exchange_weak(expect, true))
{
//这里一定要将expect复原,执行失败时expect结果是未定的
expect = false;
}
}
void unlock()
{
flag_.store(false);
}
private:
std::atomic<bool> flag_;
};
//#define LOCK_TYPE std::mutex
#define LOCK_TYPE SpinLock
如上,将 std::mutex
用自定义的自旋锁替换掉,然后使用 std::condition_variable_any
来做条件变量,可以接受 std::unique_lock<SpinLock>
来作为参数。我要说话
本文链接:https://www.zoucz.com/blog/2021/06/09/83619610-c87d-11eb-9fe7-534bbf9f369d/我要说话
☞ 参与评论我要说话
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK