【注意】最后更新于 May 2, 2021,文中内容可能已过时,请谨慎食用。
这是一道经典的同步问题。我没有那么强大的直觉,只能一步一步分析了。
生产者消费者问题
问题描述
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
问题分析
- 关系分析:生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,它们也是同步关系。
- 整理思路:这里比较简单,只有生产者和消费者两个进程,且这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步的PV操作的位置。
- 信号量设置:信号量
mutex
作为互斥信号量,用于控制互斥访问缓冲池,初值为1;信号量 full
用于记录当前缓冲池中“满”缓冲区数,初值为 0;信号量 empty
用于记录当前缓冲池中“空”缓冲区数,初值为n。
缓冲区包含:
- n 个空间,用数组 buffer 表示
- count: 缓冲区中的产品数
- in: 生产者可以放产品的空间,
buffer[in]
- out: 消费者可以取产品的空间,
buffer[out]
单消费者与生产者的情况
最简单的实现
一个简单的伪代码实现如下:
生产者:
1
2
3
4
5
6
7
8
9
10
11
|
producer()
{
while(true)
{
produce_an_item ; 生产一个产品
while(count == n); 当缓冲区满时循环等待
buffer[in] = item; 将 item 放入缓冲区中
in = (in + 1) % n; 生产者缓冲区指针++
count++; 缓冲区产品数++
}
}
|
消费者
1
2
3
4
5
6
7
8
9
10
11
|
consumer()
{
while(true)
{
while(count == 0); 缓冲区空时循环等待
item = buffer[out]; 取出产品
out = (out + 1) % n;
count--;
consumer_the_item; 消费产品
}
}
|
很明显,上述代码是线程不安全的。对缓冲区的产品数量 count 的操作可能会导致奇怪的变化。
我们设有以下的情况:初始缓冲区内有 50 个产品,我们让生产者生产 5000 个产品,消费者消费 5000 个产品。理论上最终缓冲区内还会剩余 5000 个产品。对于这个例子,上述代码实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int count;
char buffer[101];
int n;
int in, out;
void producer()
{
int i = 0;
while (i != 5000)
{
while (count == n)
;
buffer[in] = 1;
in = (in + 1) % n;
count++;
printf("time: %d, producer count: %d\n", i, count);
i++;
}
}
void consumer()
{
int i = 0;
while (i != 5000)
{
while (count == 0)
;
buffer[out] = 0;
out = (out + 1) % n;
count--;
printf("time: %d, consumer count: %d\n", i, count);
++i;
}
}
void *thread_work(void *arg)
{
consumer();
}
int main()
{
pthread_t tid;
n = 100;
count = 50;
in = 50;
out = 0;
pthread_create(&tid, NULL, thread_work, NULL);
producer();
}
|
跑出来后的结果如下:
很明显和我们的预期结果不符。。
加入锁或信号量
为了解决这个问题,我们可以用锁或是信号量加以修改。修改后的伪代码如下:
生产者:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
sex_init(mutex, 0, 1); // mutex_init(&mutex)
producer(){
while(true)
{
produce_an_item ; 生产一个产品
while(count == n); 当缓冲区满时循环等待
buffer[in] = item; 将 item 放入缓冲区中
in = (in + 1) % n; 生产者缓冲区指针++
sem_wait(&mutex); // mutex_lock(&mutex);
count++; 缓冲区产品数++
sem_post(&mutex); // mutex_unlock(&mutex);
}
}
|
消费者:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
consumer()
{
while(true)
{
while(count == 0); 缓冲区空时循环等待
item = buffer[out]; 取出产品
out = (out + 1) % n;
sem_wait(&mutex); // mutex_lock(&mutex);
count--;
sem_post(&mutex); // mutex_unlock(&mutex);
consumer_the_item; 消费产品
}
}
|
参考代码如下:
加锁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int count;
char buffer[101];
int n;
int in, out;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void producer()
{
int i = 0;
while (i != 5000)
{
while (count == n)
;
buffer[in] = 1;
in = (in + 1) % n;
pthread_mutex_lock(&mutex);
count++;
pthread_mutex_unlock(&mutex);
printf("time: %d, producer count: %d\n", i, count);
i++;
}
}
void consumer()
{
int i = 0;
while (i != 5000)
{
while (count == 0)
;
buffer[out] = 0;
out = (out + 1) % n;
pthread_mutex_lock(&mutex);
count--;
pthread_mutex_unlock(&mutex);
printf("time: %d, consumer count: %d\n", i, count);
++i;
}
}
void *thread_work(void *arg)
{
consumer();
}
int main()
{
pthread_t tid;
n = 100;
count = 50;
in = 50;
out = 0;
pthread_create(&tid, NULL, thread_work, NULL);
producer();
}
|
加信号量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
int count;
char buffer[101];
int n;
int in, out;
sem_t mutex;
void producer()
{
int i = 0;
while (i != 5000)
{
// usleep(100);
while (count == n)
;
buffer[in] = 1;
in = (in + 1) % n;
sem_wait(&mutex);
count++;
sem_post(&mutex);
printf("time: %d, producer count: %d\n", i, count);
i++;
}
}
void consumer()
{
int i = 0;
while (i != 5000)
{
// usleep(2000);
while (count == 0)
;
buffer[out] = 0;
out = (out + 1) % n;
sem_wait(&mutex);
count--;
sem_post(&mutex);
printf("time: %d, consumer count: %d\n", i, count);
++i;
}
}
void *thread_work(void *arg)
{
consumer();
}
int main()
{
pthread_t tid;
n = 100;
count = 50;
in = 50;
out = 0;
sem_init(&mutex, 0, 1);
pthread_create(&tid, NULL, thread_work, NULL);
producer();
}
|
这里信号量设置的是初始值为 1,实际上在这里的效果和锁是等价的。那么锁和信号相比究竟有什么区别呢?
实际上,锁与信号量相比增加了所有权的概念,一只锁住的 Mutex 只能由给它上锁的线程解开,只有系铃人才能解铃。Mutex 的功能也就因而限制在了构造临界区上。
一元信号量则可以由任一线程解开。这样多出来的一份语义,就是解决读者-写者问题的工具。比如某进程读取磁盘并进入睡眠,等待中断读取盘块结束之后来唤醒它。这就是可以祭出一元信号量的一个情景,而 Mutex 是解决不了的。『信号量』 这个词本身来自火车站的信号灯,其实本来就暗含着一层 『通知』 的含义。
上述代码的结果是符合预期的。但里面有没有其他问题呢?
避免忙等
进程的基本状态图如下:
进程在阻塞状态可以分为以下几种情况:
-
死等状态:进程在有限时间内根本不能进入临界区,而一直在尝试进入,陷入一种无结果的等待状态。
(没有进入临界区的正在等待的某进程根本无法获得临界资源而进入进程,这种等待是无结果的,是死等状态~)-> 这个时候应该放弃这个无结果的事情,保证自己等待的时间是有限的
-
忙等状态:当一个进程正处在某临界区内,任何试图进入其临界区的进程都必须进入代码连续循环,陷入忙等状态。连续测试一个变量直到某个值出现为止,称为忙等。
(没有进入临界区的正在等待的某进程不断的在测试循环代码段中的变量的值,占着处理机而不释放,这是一种忙等状态~)-> 这个时候应该释放处理机让给其他进程
-
有限等待:对要求访问临界资源的进程,应保证有限时间内能进入自己的临界区,以免陷入“死等”状态~(受惠的是进程自己)
-
让权等待:当进程不能进入自己的临界区时,应立即释放处理机,以免进程陷入“忙等”状态~(受惠的是其他进程)
上述代码中对 count
的判断会消耗大量的资源,使进程陷入忙等状态。要消除这种状态,我们要考虑以下几个问题:
- 对生产者来说,“资源”是什么?
- 空盒子数
- 定义信号量 empty,初始为 n
- 每次放入一个新产品时,需要
sem_wait(empty)
- 对于消费者来说,“资源”是什么?
- 产品数
- 定义信号量 full,初始为 0
- 每次消耗一个产品时,需要
sem_wait(full)
- 生产者和消费者还需要做什么操作?
- 生产者需要
sem_signal(full)
- 消费者需要
sem_signal(empty)
一个简单的思路就出现了,伪代码表示如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
sem_t full, empty;
sem_init(&full, 0, 0);
sem_init(&empty, 0, n);
producer(){
while(true){
produce_an_item
sem_wait(empty);
buffer[in] = item;
in = (in + 1) % n;
sem_signal(full);
}
}
consumer(){
while(true){
sem_wait(full);
item = buffer[out];
out = (out + 1) % n;
sem_signal(empty);
consume_the_item;
}
}
|
多消费者与生产者的情况
简单的实现
但是遇到多个消费者和生产者的时候,他们相互之间会产生竞争。为了避免上述问题的出现,我们还需要锁来保护。大概思路如下:
- 生产者
- 如果缓冲区非满时,申请锁;
- 生产产品
- 释放锁,释放非空信号
- 消费者
- 如果缓冲区非空时,申请锁;
- 消费产品
- 释放锁,释放非满信号
伪代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
sem_t full; // # of filled slots
sem_t empty; // # of empty slots
sem_t mutex; // mutual exclusion
sem_init(&full, 0, 0)
sem_init(&empty, 0, N)
sem_init(&mutex, 0, 1)
producer() {
while (true) {
sem_wait(mutex);
sem_wait(empty);
produce_an_item;
buffer[in] = item;
in = (in + 1)%n;
sem_signal(full);
sem_signal(mutex);
}
}
consumer() {
while (true) {
sem_wait(mutex);
sem_wait(full);
item = buffer[out];
out = (out + 1)%n;
sem_signal(empty);
sem_signal(mutex);
consume_the_item;
}
}
|
避免死锁
但是实际上,尽管引入了新的变量后解决了竞争的问题,显而易见它也会带来死锁。解决这个的方法是如果没有等待到信号量,在循环中尝试等待并释放锁。实际上,这时候我们只需要一个信号量就可以了,因为大循环包含了对空/满的判断。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
sem_t cond;
mutex_t mutex;
producer() {
while (true) {
mutex_lock(mutex);
while(buffer_is_full())
mutex_unlock(mutex);
sem_wait(cond);
mutex_lock(mutex);
produce_an_item;
buffer[in] = item;
in = (in + 1)%n;
mutex_unlock(mutex);
sem_signal(cond);
}
}
consumer() {
while (true) {
sem_wait(mutex);
while(buffer_is_empty())
mutex_unlock(mutex);
sem_wait(full)
mutex_lock(mutex)
sem_wait(full);
item = buffer[out];
out = (out + 1)%n;
mutex_unlock(mutex);
sem_signal(cond);
consume_the_item;
}
}
|
不过在 Linux 中,pthread_cond_wait(cond, mutex)
参数已经被设置好了,可以简化我们的一部分操作。至于原因可以参考这篇文章。
最终的参考代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#include <stdio.h>
#include <pthread.h>
#define CAPACITY 100 // 缓冲区的最大容量
int buffer[CAPACITY]; // 缓冲区数组
int in; // 缓冲区的写指针
int out; // 缓冲区的读指针
int size; // 缓冲区中的数据个数
void buffer_init()
{
in = 0;
out = 0;
size = 0;
}
// 判断缓冲区是否为空
int buffer_is_empty()
{
return size == 0;
}
// 判断缓冲区是否为满
int buffer_is_full()
{
return size == CAPACITY;
}
// 向缓冲区中追加一个数据
void buffer_put(int item)
{
buffer[in] = item;
in = (in + 1) % CAPACITY;
size++;
}
// 从缓冲区中取走一个数据
int buffer_get()
{
int item;
item = buffer[out];
out = (out + 1) % CAPACITY;
size--;
return item;
}
pthread_cond_t cond;
pthread_mutex_t mutex;
// 生产者线程执行的流程
void *producer_loop(void *arg)
{
int i;
// 生产CAPACITY*2个数据
for (i = 0; i < CAPACITY*2; i++) {
printf("produce %d\n", i);
pthread_mutex_lock(&mutex);
// 当缓冲区为满时,生产者需要等待
while (buffer_is_full()) {
// 当前线程已经持有了mutex,首先释放mutex,然后阻塞,醒来后再次获取mutex
pthread_cond_wait(&cond, &mutex);
}
// 此时,缓冲区肯定不是满的,向缓冲区写数据
buffer_put(i);
pthread_mutex_unlock(&mutex);
// 缓冲区的状态发生了变化,唤醒其它的生产者或消费者
pthread_cond_signal(&cond);
}
return NULL;
}
// 消费者线程执行的流程
void *consumer_loop(void *arg)
{
int i;
// 消费CAPACITY*2个数据
for (i = 0; i < CAPACITY*2; i++) {
pthread_mutex_lock(&mutex);
// 当缓冲区为空时,消费者需要等待
while (buffer_is_empty()) {
// 当前线程已经持有了mutex,首先释放mutex,然后阻塞,醒来后再次获取mutex
pthread_cond_wait(&cond, &mutex);
}
// 此时,缓冲区肯定不是空的,从缓冲区取数据
int item = buffer_get();
pthread_mutex_unlock(&mutex);
// 缓冲区的状态发生了变化,唤醒其它的生产者或消费者
pthread_cond_signal(&cond);
printf("\tconsume %d\n", item);
}
return NULL;
}
int main()
{
pthread_t producer1, producer2;
pthread_t consumer1, consumer2;
buffer_init();
pthread_create(&producer1, NULL, producer_loop, NULL);
pthread_create(&producer2, NULL, producer_loop, NULL);
pthread_create(&consumer1, NULL, consumer_loop, NULL);
pthread_create(&consumer2, NULL, consumer_loop, NULL);
pthread_join(producer1, NULL);
pthread_join(producer2, NULL);
pthread_join(consumer1, NULL);
pthread_join(consumer2, NULL);
return 0;
}
|
这个问题分析起来好像很简单,但是在添加锁和信号量之后又会引入很多新的问题。在解决它的过程中需要一步一步分析逻辑才能避开这些问题。
以后有机会一定要看看多线程、锁、信号量源码级别的实现。。
另外自己以后写文章的时候一定要列一个大纲,我感觉这篇文章的细节就不够清晰。。
参考文章
【操作系统】死等状态、忙等状态、有限等待、让权等待
线程安全与线程不安全
如何写出线程不安全的代码
semaphore和mutex的区别?
为什么pthread_cond_wait需要互斥锁mutex作为参数
文章作者
QRZ
上次更新
2021-05-02
(0e7621f)