使用条件变量实现线程同步

使用条件变量实现线程同步

C语言中文网:使用条件变量实现线程同步

假设一个进程中包含多个线程,这些线程共享变量 x,我们希望某个(或某些)线程等待 “x==10’ 条件成立后再执行后续的代码,该如何实现呢?

您可能想到用 while 循环实现,例如:

1
2
3
4
5
6
void* threadFun(void * args){
while(x != 10){
sleep(5);
}
// 待条件成立后,执行后续的代码
}

当线程执行此函数时,会判断 x 的值是否等于 10,如果不等则间隔 5 秒后再重复判断,直到 x 的值等于 10 ,线程才能执行后续的代码。

直观上看,while 循环确实能够阻塞线程,但这种方法存在严重的效率问题。当线程因条件不成立进入等待状态时,如果此时恰好有另一个线程将 x 的值改为 10,该线程必须等待 5 秒后才能继续执行。如果我们将等待时间缩短(或者直接将 sleep(5) 注释掉),线程将反复判断 x 的值是否等于 10,它可能会一直霸占着 CPU 资源,导致其它线程无法执行,x 变量的值会出现“长时间不改变”的情况。

针对类似的场景,我们推荐您用条件变量来实现。和互斥锁、信号量类似,==条件变量本质也是一个全局变量,它的功能是阻塞线程,直至接收到“条件成立”的信号后,被阻塞的线程才能继续执行。==

一个条件变量可以阻塞多个线程,这些线程会组成一个等待队列。当条件成立时,条件变量可以解除线程的“被阻塞状态”。也就是说,条件变量可以完成以下两项操作:

  • 阻塞线程,直至接收到“条件成立”的信号;
  • 向等待队列中的一个或所有线程发送“条件成立”的信号,解除它们的“被阻塞”状态。

为了避免多线程之间发生“抢夺资源”的问题,条件变量在使用过程中必须和一个互斥锁搭配使用。

条件变量的具体用法

POSIX 标准中,条件变量用 pthread_cond_t 类型的变量表示,此类型定义在<pthread.h>头文件中。举个例子:

1
2
#include <pthread.h>
pthread_cond_t myCond;

由此,我们就成功创建了一个条件变量。要想使用 myCond 条件变量,还需要进行初始化操作。

1) 初始化条件变量

初始化条件变量的方式有两种,一种是直接将 PTHREAD_COND_INITIALIZER 赋值给条件变量,例如:

1
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;

还可以借助 pthread_cond_init() 函数初始化条件变量,语法格式如下:

1
int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr);

参数 cond 用于指明要初始化的条件变量;参数 attr 用于自定义条件变量的属性,通常我们将它赋值为 NULL,表示以系统默认的属性完成初始化操作。

pthread_cond_init() 函数初始化成功时返回数字 0,反之函数返回非零数。

当 attr 参数为 NULL 时,以上两种初始化方式完全等价。

2) 阻塞当前线程,等待条件成立

当条件不成立时,条件变量可以阻塞当前线程,所有被阻塞的线程会构成一个等待队列。

阻塞线程可以借助以下两个函数实现:

1
2
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, const struct timespec* abstime);

cond 参数表示已初始化好的条件变量;mutex 参数表示与条件变量配合使用的互斥锁;abstime 参数表示阻塞线程的时间。

注意,abstime 参数指的是绝对时间,例如您打算阻塞线程 5 秒钟,那么首先要得到当前系统的时间,然后再加上 5 秒,最终得到的时间才是传递的实参值。

调用两个函数之前,我们必须先创建好一个互斥锁并完成“加锁”操作,然后才能作为实参传递给 mutex 参数。两个函数会完成以下两项工作:

  • 阻塞线程,直至接收到“条件成立”的信号;
  • 当线程被添加到等待队列上时,将互斥锁“解锁”。

也就是说,函数尚未接收到“条件成立”的信号之前,它将一直阻塞线程执行。注意,当函数接收到“条件成立”的信号后,它并不会立即结束对线程的阻塞,而是先完成对互斥锁的“加锁”操作,然后才解除阻塞。

两个函数都以“原子操作”的方式完成“阻塞线程+解锁”或者“重新加锁+解除阻塞”这两个过程。所谓“原子操作”,即当有多个线程执行相同的某个过程时,虽然它们都会访问互斥锁和条件变量,但之间不会相互干扰。

以上两个函数都能用来阻塞线程,它们的区别在于:

  • pthread_cond_wait() 函数可以永久阻塞线程,直到条件变量成立的那一刻;
  • pthread_cond_timedwait() 函数只能在 abstime 参数指定的时间内阻塞线程,超出时限后,该函数将重新对互斥锁执行“加锁”操作,并解除对线程的阻塞,函数的返回值为 ETIMEDOUT。

如果函数成功接收到了“条件成立”的信号,重新对互斥锁完成了“加锁”并使线程继续执行,函数返回数字 0,反之则返回非零数。

POSIX 标准规定,pthread_cond_wait() 和 pthread_cond_timedwait() 函数是可以作为“取消点”的函数。当线程接收到“强制终止执行”的信号后,执行到这两个函数时,线程就会终止执行。有关强制终止执行线程和“取消点”的具体含义,您可以阅读《线程间相互终止执行,这个坑千万别踩!》一文。

3) 解除线程的“阻塞”状态

对于被 pthread_cond_wait() 或 pthread_cond_timedwait() 函数阻塞的线程,我们可以借助如下两个函数向它们发送“条件成立”的信号,解除它们的“被阻塞”状态:

1
2
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);

cond 参数表示初始化好的条件变量。当函数成功解除线程的“被阻塞”状态时,返回数字 0,反之返回非零数。

两个函数都能解除线程的“被阻塞”状态,区别在于:

  • pthread_cond_signal() 函数至少解除一个线程的“被阻塞”状态,如果等待队列中包含多个线程,优先解除哪个线程将由操作系统的线程调度程序决定;
  • pthread_cond_broadcast() 函数可以解除等待队列中所有线程的“被阻塞”状态。

由于互斥锁的存在,解除阻塞后的线程也不一定能立即执行。当互斥锁处于“加锁”状态时,解除阻塞状态的所有线程会组成等待互斥锁资源的队列,等待互斥锁“解锁”。

4) 销毁条件变量

对于初始化好的条件变量,我们可以调用 pthread_cond_destory() 函数销毁它。

pthread_cond_destory() 函数的语法格式如下:

1
int pthread_cond_destroy(pthread_cond_t *cond);

cond 参数表示要销毁的条件变量。如果函数成功销毁 cond 参数指定的条件变量,返回数字 0,反之返回非零数。

值得一提的是,销毁后的条件变量还可以调用 pthread_cond_init() 函数重新初始化后使用。

条件变量的实际应用

接下来,通过一个实例给您演示条件变量的具体用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
//初始化互斥锁
pthread_mutex_t myMutex = PTHREAD_MUTEX_INITIALIZER;
//初始化条件变量
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;
//设置全局变量
int x = 0;
//线程执行的函数
void * waitForTrue(void *args) {
int res;
//条件变量阻塞线程之前,先对互斥锁执行“加锁”操作
res = pthread_mutex_lock(&myMutex);
if (res != 0) {
printf("waitForTrue 加锁失败\n");
return NULL;
}
printf("------等待 x 的值为 10\n");
if (pthread_cond_wait(&myCond, &myMutex) == 0) { //遇到pthread_cond_wait 就会卡住,直到另一个线程遇到pthread_cond_signal,这里才会执行下一步
printf("x = %d\n", x);
}
//最终将互斥锁解锁
pthread_mutex_unlock(&myMutex);
return NULL;
}
//线程执行的函数
void * doneForTrue(void *args) {
int res;
while (x != 10) {
//对互斥锁执行“加锁”操作
res = pthread_mutex_lock(&myMutex);
if (res == 0) {
x++;
printf("doneForTrue:x = %d\n", x);
sleep(1);
//对互斥锁“解锁”
pthread_mutex_unlock(&myMutex);
}
}
//发送“条件成立”的信号,解除 mythread1 线程的“被阻塞”状态
res = pthread_cond_signal(&myCond);
if (res != 0) {
printf("解除阻塞失败\n");
}
return NULL;
}
int main() {
int res;
pthread_t mythread1, mythread2;
res = pthread_create(&mythread1, NULL, waitForTrue, NULL);
if (res != 0) {
printf("mythread1线程创建失败\n");
return 0;
}
res = pthread_create(&mythread2, NULL, doneForTrue, NULL);
if (res != 0) {
printf("mythread2线程创建失败\n");
return 0;
}
//等待 mythread1 线程执行完成
res = pthread_join(mythread1, NULL);
if (res != 0) {
printf("1:等待线程失败\n");
}
//等待 mythread2 线程执行完成
res = pthread_join(mythread2, NULL);
if (res != 0) {
printf("2:等待线程失败\n");
}
//销毁条件变量
pthread_cond_destroy(&myCond);
return 0;
}

假设程序编写在 thread.c 文件中,执行过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@localhost ~]# gcc thread.c -o thread.exe -lpthread
[root@localhost ~]# ./thread.exe
------等待 x 的值为 10
doneForTrue:x = 1
doneForTrue:x = 2
doneForTrue:x = 3
doneForTrue:x = 4
doneForTrue:x = 5
doneForTrue:x = 6
doneForTrue:x = 7
doneForTrue:x = 8
doneForTrue:x = 9
doneForTrue:x = 10
x = 10

程序中共创建了 2 个线程 mythread1 和 mythread2,其中 mythread1 线程借助条件变量实现了“直到变量 x 的值为 10 时,才继续执行后续代码”的功能,mythread1 线程用于将 x 的变量修改为 10,同时向 mythread1 线程发送“条件成立”的信号,唤醒 mythread1 线程并继续执行。

某一个线程遇到pthread_cond_wait 就会卡住,直到另一个线程遇到pthread_cond_signal,前一个线程这里才会继续执行下一步。

std::condition_variable::native_handle

https://zh.cppreference.com/w/cpp/thread/condition_variable/native_handle

用于linux系统、posix系统

此函数结果的含义和类型是实现定义的。 POSIX 系统上,这可以是 pthread_cond_t* 类型值。 Windows 系统上,这可以是 PCONDITION_VARIABLE 。