操作系统:经典同步问题

在进行linux的编程的时候,经常会遇到一些坑和问题。稍微整理了一下,有遇到的时候可以复习。
比如pthread的使用,创建,取消,阻塞和非阻塞的问题。对于临界区资源的访问,也是一个在多
线程和多进程编程中,所会经常遇到的问题。

一、生产者-消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。wiki

关键: 同步概念+互斥概念

同步:当生产者只有当缓存区不满的时候才可以写入数据,消费者只有当缓存区不空的时候才可以读出数据

互斥:生产者在写的时候消费者不可以读,消费者在读的时候生产者不可以写。

线程和进程十分相似,不同的只是线程比进程小。首先,线程采用了多个线程可共享资源的设计思想;例如,它们的操作大部分都是在同一地址空间进行的。其次,从一个线程切换到另一线程所花费的代价比进程低。再次,进程本身的信息在内存中占用的空间比线程大,因此线程更能允分地利用内存。

程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
typedef int buffer_item;
#define BUFFER_SIZE 5
buffer_item buffer[BUFFER_SIZE];
sem_t empty;
sem_t mutex;
sem_t full;
int in;
int out;
// producer
int insert_item(buffer_item item) {
// insert item into buffer
sem_wait(&empty); // 同步信号量
sem_wait(&mutex); // 互斥信号量
// add nextp to buffer
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
sem_post(&mutex);
sem_post(&full);
return 0;
}
int remove_item(buffer_item *item) {
sem_wait(&full);
sem_wait(&mutex);
// remove an item from buffer to nextc
*item = buffer[out];
out = (out + 1) % BUFFER_SIZE;
sem_post(&mutex);
sem_post(&empty);
return 0;
}
void *producer(void *param) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
buffer_item random;
while (1) {
sleep(rand()%10);
random = rand();
printf("producer produced %d\n", random);
if (insert_item(random))
printf("report error condition\n");
}
}
void *consumer(void *param) {
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
buffer_item random;
while(1) {
sleep(rand()%10);
if (remove_item(&random))
printf("report error condition\n");
else
printf("consumer consumed %d\n", random);
}
}
int main(int argc, char *argv[]){
// get command line
if (argc < 4) {
printf("please input sleep time, number of producer and consumer\n");
return 0;
}
int sleep_t = atoi(argv[1]);
int producer_n = atoi(argv[2]);
int consumer_n = atoi(argv[3]);
// initialize buffer
int i = 0;
sem_init(&empty, 0, BUFFER_SIZE);
sem_init(&mutex, 0, 1);
sem_init(&full, 0, 0);
in = 0;
out = 0;
exit_pthread = 0;
for (i = 0; i < BUFFER_SIZE; i++) {
buffer[i] = 0;
}
// create producer pthread
pthread_t tid1[producer_n], tid2[consumer_n];
pthread_attr_t attr;
/*get the default attributes */
pthread_attr_init(&attr);
/* create pthreads */
for (i = 0; i < 5; i++)
pthread_create(&tid1[i], &attr, producer, NULL);
// create consumer pthread
for (i = 0; i < 5; i++)
pthread_create(&tid2[i], &attr, consumer, NULL);
sleep(sleep_t); // seconds
return 0;
}

结果

分析

生产者和消费者的程序设计主要采用了sem_t二进制信号量(初始化为1)来实现互斥,sem_t计数信号量来实现同步。linux下的wait和signal函数和课本上的同步功能描述的函数不是相同的。命令行参数中读取

  • 程序终止前的休眠时间
  • 生产者线程数量
  • 消费者线程数量

其中,主进程结束线程其实也会自动结束。mutex也可以使用pthread_mutex互斥信号量来实现。

观察程序的输出结果可以发现,生产者生产之后,消费者才可以进行消费。

几点要注意的地方:

  • sleep的单位是以秒计算
  • 在linux系统下,使用了pthread的进行编译需要加上-pthread才能正常链接到pthread的库才可以正常通过编译,而osx下不需要。(或者-lrt可以更灵活地链接到POSIX标准下的所有库)
  • osx下使用不了未命名的sem函数

参考

stack overflow:Undefined Reference issues using Semaphores

二、读者-写者问题

1.读者优先

定义

如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。

实现

通过建立一个在读取的读者队列(并非真正使用队列,使用reader_count来对正在读的读者进行计数),当读者队列有读者在读取的时候就阻塞写者。没有读者在读的时候就不阻塞写者。

程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#define MAX_DATA_NUM 25
// 不加typedef 声明的时候要加struct
typedef struct data {
int pid;
int RW;
int delay;
int last_time;
}data;
sem_t wrt;
sem_t read_mutex;
sem_t write_mutex;
int read_count;
int shared_int;
void *reader(void* param) {
data test = *(data*) param;
sleep(test.delay);
do {
sem_wait(&read_mutex);
read_count++;
// 第一个读者到来的时候,必须申请临界区资源
// 当有写者在写的时候必须等待
if (read_count == 1) {
sem_wait(&wrt);
}
sem_post(&read_mutex);
printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int, read_count);
sleep(test.last_time); // 读所持续的时间
sem_wait(&read_mutex);
read_count--;
if (read_count == 0) {
sem_post(&wrt);
}
sem_post(&read_mutex);
} while(1);
}
void *writer(void* param) {
data test = *(data*) param;
sleep(test.delay);
do {
//sem_wait(&write_mutex); // 防止读者与写者抢,有一个写者在临界区,一个读者一个写者等待的情况
sem_wait(&wrt);
shared_int = rand();
printf("Writer %d write data %d\n", test.pid, shared_int);
sleep(test.last_time);
sem_post(&wrt);
//sem_post(&write_mutex);
} while(1);
}
int main(int argc, char *argv[]) {
// 初始化
read_count = 0;
sem_init(&wrt, 0, 1);
sem_init(&read_mutex, 0, 1);
sem_init(&write_mutex, 0, 1);
shared_int = 0;
FILE *stream;
char cmd[10];
if (argc < 2) {
fprintf(stderr ,"Please input test file\n");
return 1;
}
if ((stream = fopen(argv[1],"r")) == NULL) {
fprintf(stderr, "Cannot open file %s\n", argv[1]);
return 1;
}
pthread_attr_t attr;
pthread_attr_init(&attr);
int reader_num = 0;
int writer_num = 0;
int max_wait_time = 0;
int count = 0;
int i = 0;
data test[MAX_DATA_NUM];
while (fgets(cmd,20,stream) != NULL && count <= MAX_DATA_NUM) {
if (cmd[0] == '\n') break;
printf("%s", cmd);
char *p;
/*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。
每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/
p = strtok(cmd, " ");
if (p) {
test[count].pid = atoi(p);
}
for (i = 0; i < 3; i++) {
p = strtok(NULL, " ");
switch (i) {
case 0 :
test[count].RW = (p[0] == 'R') ? 0 : 1;
break;
case 1 :
test[count].delay = atoi(p);
break;
case 2 :
test[count].last_time = atoi(p);
break;
default: break;
}
}
count++;
}
pthread_t tid[count];
for (i = 0; i < count; i++) {
if (test[i].RW == 0) {
pthread_create(&tid[i], &attr, reader, (void*)&test[i]);
} else {
pthread_create(&tid[i], &attr, writer, (void*)&test[i]);
}
}
sleep(15);
return 0;
}

测试文件

1
2
3
4
5
6
7
8
9
10
1 R 3 5
2 R 3 5
3 R 3 5
4 W 4 5
5 R 4 5
6 R 5 5
7 R 5 5
8 R 5 5
9 R 6 5
10 R 6 5

结果

分析

在读者优先中,只有在密集的读者中插入一个写者。写者遇到了长时间的饥饿。

存在的问题

在这种算法中,如果有一个写者进入了临界区。有一个读者和一个写者在等待。第一个写者出临界区释放wrt之后,第二个写者是不是需要和读者抢占,是不是就不能保证读者优先进入。这种情况在读者优先的条件中吗?

2.写者优先

定义

如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态后才能开始读操作。

实现

有写者开始等待,就要开始阻塞读者进入读者队列进行读取。当读者队列为0,写者可以开始写。之后的情况就跟读者优先相同,主要是实现写-读的互斥,只有在写完之后才能读。主要就是需要增加一个写者队列让写者进行等待,和一个新的二进制信号量,在写者开始等待的时候就阻塞读者进入读者队列。

程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#define MAX_DATA_NUM 25
// 不加typedef 声明的时候要加struct
typedef struct data {
int pid;
int RW;
int delay;
int last_time;
}data;
sem_t wrt;
sem_t read_mutex;
sem_t write_mutex;
sem_t read_queue;
int read_count;
int write_count;
int shared_int;
void *reader(void* param) {
data test = *(data*) param;
sleep(test.delay);
do {
// 增加一个互斥量read,在有写者在写或等待的时候必须等待
// 也就是说写者队列有写者的时候都必须进行等待。
sem_wait(&read_queue); // 当写者抢占到之后,读者被阻塞在此,不能进队
sem_wait(&read_mutex);
read_count++;
// 第一个读者到来的时候,必须申请临界区资源
// 当有写者在写的时候必须等待
if (read_count == 1)
sem_wait(&wrt);
sem_post(&read_mutex);
sem_post(&read_queue); // 写者的抢占时机,抢占到之后只需要等待读者队列全部读完就可以。
printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int);
sleep(t\est.last_time); // 读所持续的时间
sem_wait(&read_mutex);
read_count--;
if (read_count == 0)
sem_post(&wrt);
sem_post(&read_mutex);
} while(1);
}
void *writer(void* param) {
data test = *(data*) param;
sleep(test.delay);
do {
sem_wait(&write_mutex);
write_count++;
if (write_count == 1) // 写者队列有人就阻塞读者
sem_wait(&read_queue);
sem_post(&write_mutex);
sem_wait(&wrt);
shared_int = rand();
printf("Writer %d write data %d, %d writing\n", test.pid, shared_int, write_count);
sleep(test.last_time);
sem_post(&wrt);
sem_wait(&write_mutex);
write_count--;
if (write_count == 0)
sem_post(&read_queue);
sem_post(&write_mutex);
} while(1);
}
int main(int argc, char *argv[]) {
// 初始化
read_count = 0;
write_count = 0;
sem_init(&wrt, 0, 1);
sem_init(&read_queue, 0, 1);
sem_init(&read_mutex, 0, 1);
sem_init(&write_mutex, 0, 1);
shared_int = 0;
FILE *stream;
char cmd[10];
if (argc < 2) {
fprintf(stderr ,"Please input test file\n");
return 1;
}
if ((stream = fopen(argv[1],"r")) == NULL) {
fprintf(stderr, "Cannot open file %s\n", argv[1]);
return 1;
}
pthread_attr_t attr;
pthread_attr_init(&attr);
int reader_num = 0;
int writer_num = 0;
int max_wait_time = 0;
int count = 0;
int i = 0;
data test[MAX_DATA_NUM];
while (fgets(cmd,20,stream) != NULL) {
if (cmd[0] == '\n') break;
printf("%s", cmd);
char *p;
/*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。
每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/
p = strtok(cmd, " ");
if (p) {
test[count].pid = atoi(p);
}
for (i = 0; i < 3; i++) {
p = strtok(NULL, " ");
switch (i) {
case 0 :
test[count].RW = (p[0] == 'R') ? 0 : 1;
break;
case 1 :
test[count].delay = atoi(p);
break;
case 2 :
test[count].last_time = atoi(p);
break;
default: break;
}
}
count++;
}
pthread_t tid[count];
for (i = 0; i < count; i++) {
if (test[i].RW == 0) {
pthread_create(&tid[i], &attr, reader, (void*)&test[i]);
} else {
pthread_create(&tid[i], &attr, writer, (void*)&test[i]);
}
}
sleep(35);
return 0;
}

结果

测试文件同上

但是发现,几乎每次写者结束之后都再次抢赢了在等待的读者。造成读者的饥饿。

问题与改进

因为对read_queue和write_mutex的释放几乎同时于是在两条指令之间增加了一条sleep延迟指令(但是不知道这种方法是否合适,从结果来看是合理的)。得到下面的结果

发现写者等待可以阻塞读者,在读者读完后写者写入。

二、哲学家就餐问题(管程实现)

程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <iostream>
sem_t next;
pthread_mutex_t mutex;
class monitor {
private:
enum {THIKING, HUNGRY, EATING} state[5];
pthread_cond_t self[5];
public:
monitor() {
int i;
for (i = 0; i < 5; i++)
state[i] = THIKING;
pthread_cond_init(&self[i],NULL);
}
void pickup(int i) {
state[i] = HUNGRY;
test(i);
if (state[i] != EATING) {
pthread_cond_wait(&self[i], &mutex);
}
}
void putdown(int i) {
state[i] = THIKING;
printf("Philosopher %d finished \n", i + 1);
test((i + 4) % 5);
test((i + 1) % 5);
}
void test(int i) {
if ((state[(i + 4) % 5] != EATING) &&
(state[i] == HUNGRY) &&
(state[(i + 1) % 5] != EATING)) {
state[i] = EATING;
printf("Philosopher %d begin eating\n", i + 1);
pthread_cond_signal(&self[i]);
}
}
}dp;
void *philosopher(void* param) {
int pid = *(int*) param;
while(1) {
pthread_mutex_lock(&mutex); // 修改
dp.pickup(pid);
sleep(3);
dp.putdown(pid);
pthread_mutex_unlock(&mutex);
}
}
int main() {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_t tid[5];
pthread_mutex_init(&mutex, NULL);
sem_init(&next, 0, 1);
int id[5];
int i;
for (i = 0; i < 5; i++) {
id[i] = i;
pthread_create(&tid[i], &attr, philosopher, (void*)&id[i]);
}
sleep(15);
return 0;
}

结果

分析

使用了pthread_cond_t 来作为条件变量。使线程在pthread_cond_wait()上进行等待的时候,外面还需要有一个互斥锁来保护条件变量的唤醒不受到竞争。5个哲学家共用筷子的话,一次可以不止一个哲学家在吃。所以就不限制只能一个线程进入管程,在管程内部实现互斥,同个时间内只能有一个线程是活动的。程序员就可以不用显示地进行同步编程。由于pickup和putdown两个动作之间还间隔者一个吃饭的动作,所以putdown和另一个进程的pickup不会构成竞争。putdown对临界区的操作就可以直接还是用mutex作为互斥锁来实现互斥。

进程开始在条件变量上等待或者结束之后,就会释放mutex,其他进程就可以进入管程。这样子可以保证只有一个程序在管程内运行但是有多个在等待。

您的支持将鼓励我继续创作!