操作系统:经典同步问题

在进行linux的编程的时候,经常会遇到一些坑和问题。稍微整理了一下,有遇到的时候可以复习。
比如pthread的使用,创建,取消,阻塞和非阻塞的问题。对于临界区资源的访问,也是一个在多
线程和多进程编程中,所会经常遇到的问题。

一、生产者-消费者问题

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。wiki

关键: 同步概念+互斥概念

同步:当生产者只有当缓存区不满的时候才可以写入数据,消费者只有当缓存区不空的时候才可以读出数据

互斥:生产者在写的时候消费者不可以读,消费者在读的时候生产者不可以写。

线程和进程十分相似,不同的只是线程比进程小。首先,线程采用了多个线程可共享资源的设计思想;例如,它们的操作大部分都是在同一地址空间进行的。其次,从一个线程切换到另一线程所花费的代价比进程低。再次,进程本身的信息在内存中占用的空间比线程大,因此线程更能允分地利用内存。

程序

1
2
#include <stdio.h>
3
#include <stdlib.h>
4
#include <semaphore.h>
5
#include <pthread.h>
6
#include <unistd.h>
7
8
typedef int buffer_item;
9
#define BUFFER_SIZE 5
10
buffer_item buffer[BUFFER_SIZE];
11
12
sem_t empty;
13
sem_t mutex;
14
sem_t full;
15
int in;
16
int out;
17
// producer
18
int insert_item(buffer_item item) {
19
	// insert item into buffer
20
	sem_wait(&empty); // 同步信号量
21
	sem_wait(&mutex); // 互斥信号量
22
23
	// add nextp to buffer
24
	buffer[in] = item;
25
	in = (in + 1) % BUFFER_SIZE;
26
27
	sem_post(&mutex);
28
	sem_post(&full);
29
	return 0;
30
}
31
int remove_item(buffer_item *item) {
32
	sem_wait(&full);
33
	sem_wait(&mutex);
34
	
35
	// remove an item from buffer to nextc
36
	*item = buffer[out];
37
	out = (out + 1) % BUFFER_SIZE;
38
39
	sem_post(&mutex);
40
	sem_post(&empty);
41
	return 0;
42
}
43
44
void *producer(void *param) {
45
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
46
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
47
	buffer_item random;
48
	while (1) {
49
		sleep(rand()%10);
50
		random = rand();
51
		printf("producer produced %d\n", random);
52
		if (insert_item(random))
53
			printf("report error condition\n");
54
	}
55
}
56
57
void *consumer(void *param) {
58
	pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
59
	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
60
	buffer_item random;
61
	while(1) {
62
		sleep(rand()%10);
63
		if (remove_item(&random))
64
			printf("report error condition\n");
65
		else
66
			printf("consumer consumed %d\n", random);
67
	}
68
}
69
int main(int argc, char *argv[]){
70
	// get command line
71
	if (argc < 4) {
72
		printf("please input sleep time, number of producer and consumer\n");
73
		return 0;
74
	}
75
	int sleep_t = atoi(argv[1]);
76
	int producer_n = atoi(argv[2]);
77
	int consumer_n = atoi(argv[3]);
78
	// initialize buffer
79
	int i = 0;
80
	sem_init(&empty, 0, BUFFER_SIZE);
81
	sem_init(&mutex, 0, 1);
82
	sem_init(&full, 0, 0);
83
	in = 0;
84
	out = 0;
85
	exit_pthread = 0;
86
	for (i = 0; i < BUFFER_SIZE; i++) {
87
		buffer[i] = 0;
88
	}
89
	// create producer pthread
90
	pthread_t tid1[producer_n], tid2[consumer_n];
91
	pthread_attr_t attr;
92
	/*get the default attributes */
93
	pthread_attr_init(&attr);
94
	/* create pthreads */ 
95
	for (i = 0; i < 5; i++)
96
		pthread_create(&tid1[i], &attr, producer, NULL);
97
98
	// create consumer pthread
99
	for (i = 0; i < 5; i++)
100
		pthread_create(&tid2[i], &attr, consumer, NULL);
101
    sleep(sleep_t); // seconds
102
	return 0;
103
}

结果

分析

生产者和消费者的程序设计主要采用了sem_t二进制信号量(初始化为1)来实现互斥,sem_t计数信号量来实现同步。linux下的wait和signal函数和课本上的同步功能描述的函数不是相同的。命令行参数中读取

  • 程序终止前的休眠时间
  • 生产者线程数量
  • 消费者线程数量

其中,主进程结束线程其实也会自动结束。mutex也可以使用pthread_mutex互斥信号量来实现。

观察程序的输出结果可以发现,生产者生产之后,消费者才可以进行消费。

几点要注意的地方:

  • sleep的单位是以秒计算
  • 在linux系统下,使用了pthread的进行编译需要加上-pthread才能正常链接到pthread的库才可以正常通过编译,而osx下不需要。(或者-lrt可以更灵活地链接到POSIX标准下的所有库)
  • osx下使用不了未命名的sem函数

参考

stack overflow:Undefined Reference issues using Semaphores

二、读者-写者问题

1.读者优先

定义

如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。

实现

通过建立一个在读取的读者队列(并非真正使用队列,使用reader_count来对正在读的读者进行计数),当读者队列有读者在读取的时候就阻塞写者。没有读者在读的时候就不阻塞写者。

程序

1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <semaphore.h>
4
#include <pthread.h>
5
#include <unistd.h>
6
#include <string.h>
7
8
#define MAX_DATA_NUM 25
9
// 不加typedef 声明的时候要加struct
10
typedef struct data {
11
	int pid;
12
	int RW;
13
	int delay;
14
	int last_time;
15
}data;
16
sem_t wrt;
17
sem_t read_mutex;
18
sem_t write_mutex;
19
int read_count;
20
21
int shared_int;
22
23
void *reader(void* param) {
24
	data test = *(data*) param;
25
	sleep(test.delay);
26
	do {
27
		sem_wait(&read_mutex);
28
		read_count++;
29
		// 第一个读者到来的时候,必须申请临界区资源
30
		// 当有写者在写的时候必须等待
31
		if (read_count == 1) {
32
			sem_wait(&wrt);
33
		}
34
		sem_post(&read_mutex);
35
36
		printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int, read_count);
37
		sleep(test.last_time); // 读所持续的时间
38
39
		sem_wait(&read_mutex);
40
		read_count--;
41
		if (read_count == 0) {
42
			sem_post(&wrt);
43
		}
44
		sem_post(&read_mutex);
45
	} while(1);
46
}
47
48
void *writer(void* param) {
49
	data test = *(data*) param;
50
	sleep(test.delay);
51
	do {
52
		//sem_wait(&write_mutex); // 防止读者与写者抢,有一个写者在临界区,一个读者一个写者等待的情况
53
		sem_wait(&wrt);
54
55
		shared_int = rand();
56
		printf("Writer %d write data %d\n", test.pid, shared_int);
57
		sleep(test.last_time);
58
59
		sem_post(&wrt);
60
		//sem_post(&write_mutex);
61
	} while(1);
62
}
63
64
int main(int argc, char *argv[]) {
65
	// 初始化
66
	read_count = 0;
67
	sem_init(&wrt, 0, 1);
68
	sem_init(&read_mutex, 0, 1);
69
	sem_init(&write_mutex, 0, 1);
70
71
	shared_int = 0;
72
	FILE *stream;
73
	char cmd[10];
74
	if (argc < 2) {
75
		fprintf(stderr ,"Please input test file\n");
76
		return 1;
77
	}
78
	if ((stream = fopen(argv[1],"r")) == NULL) {
79
		fprintf(stderr, "Cannot open file %s\n", argv[1]);
80
		return 1;
81
	}
82
	pthread_attr_t attr;
83
	pthread_attr_init(&attr);
84
	int reader_num = 0;
85
	int writer_num = 0; 
86
	int max_wait_time = 0;
87
	int count = 0;
88
	int i = 0;
89
	data test[MAX_DATA_NUM];
90
	while (fgets(cmd,20,stream) != NULL && count <= MAX_DATA_NUM) {
91
	    if (cmd[0] == '\n') break;
92
		printf("%s", cmd);
93
		char *p;
94
		/*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。
95
		每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/
96
		p = strtok(cmd, " ");
97
		if (p) {
98
			test[count].pid = atoi(p);
99
		}
100
		for (i = 0; i < 3; i++) {
101
			p = strtok(NULL, " ");
102
			switch (i) {
103
				case 0 :
104
					test[count].RW = (p[0] == 'R') ? 0 : 1;
105
					break;
106
				case 1 :
107
					test[count].delay = atoi(p);
108
					break;
109
				case 2 :
110
					test[count].last_time = atoi(p);
111
					break;
112
				default: break;
113
			}
114
		}
115
		count++;
116
	}
117
	pthread_t tid[count];
118
	for (i = 0; i < count; i++) {
119
		if (test[i].RW == 0) {
120
			pthread_create(&tid[i], &attr, reader, (void*)&test[i]);
121
		} else {
122
			pthread_create(&tid[i], &attr, writer, (void*)&test[i]);
123
		}
124
	}
125
	sleep(15);
126
	return 0;
127
}

测试文件

1
1 R 3 5
2
2 R 3 5
3
3 R 3 5
4
4 W 4 5
5
5 R 4 5
6
6 R 5 5
7
7 R 5 5
8
8 R 5 5
9
9 R 6 5
10
10 R 6 5

结果

分析

在读者优先中,只有在密集的读者中插入一个写者。写者遇到了长时间的饥饿。

存在的问题

在这种算法中,如果有一个写者进入了临界区。有一个读者和一个写者在等待。第一个写者出临界区释放wrt之后,第二个写者是不是需要和读者抢占,是不是就不能保证读者优先进入。这种情况在读者优先的条件中吗?

2.写者优先

定义

如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态后才能开始读操作。

实现

有写者开始等待,就要开始阻塞读者进入读者队列进行读取。当读者队列为0,写者可以开始写。之后的情况就跟读者优先相同,主要是实现写-读的互斥,只有在写完之后才能读。主要就是需要增加一个写者队列让写者进行等待,和一个新的二进制信号量,在写者开始等待的时候就阻塞读者进入读者队列。

程序

1
#include <stdio.h>
2
#include <stdlib.h>
3
#include <semaphore.h>
4
#include <pthread.h>
5
#include <unistd.h>
6
#include <string.h>
7
8
#define MAX_DATA_NUM 25
9
// 不加typedef 声明的时候要加struct
10
typedef struct data {
11
	int pid;
12
	int RW;
13
	int delay;
14
	int last_time;
15
}data;
16
17
sem_t wrt;
18
sem_t read_mutex;
19
sem_t write_mutex;
20
sem_t read_queue;
21
22
int read_count;
23
int write_count;
24
25
int shared_int;
26
27
void *reader(void* param) {
28
29
	data test = *(data*) param;
30
	sleep(test.delay);
31
32
	do {
33
		// 增加一个互斥量read,在有写者在写或等待的时候必须等待
34
		// 也就是说写者队列有写者的时候都必须进行等待。
35
36
		sem_wait(&read_queue); // 当写者抢占到之后,读者被阻塞在此,不能进队
37
		sem_wait(&read_mutex);
38
		read_count++;
39
		// 第一个读者到来的时候,必须申请临界区资源
40
		// 当有写者在写的时候必须等待
41
		if (read_count == 1)
42
			sem_wait(&wrt);
43
		sem_post(&read_mutex);
44
		sem_post(&read_queue); // 写者的抢占时机,抢占到之后只需要等待读者队列全部读完就可以。
45
46
		printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int);
47
		sleep(t\est.last_time); // 读所持续的时间
48
49
		sem_wait(&read_mutex);
50
		read_count--;
51
		if (read_count == 0)
52
			sem_post(&wrt);
53
		sem_post(&read_mutex);
54
	} while(1);
55
}
56
57
void *writer(void* param) {
58
59
	data test = *(data*) param;
60
	sleep(test.delay);
61
62
	do {
63
		sem_wait(&write_mutex);
64
		write_count++;
65
		if (write_count == 1) // 写者队列有人就阻塞读者
66
			sem_wait(&read_queue);
67
		sem_post(&write_mutex);
68
69
		sem_wait(&wrt);
70
71
		shared_int = rand();
72
		printf("Writer %d write data %d, %d writing\n", test.pid, shared_int, write_count);
73
		sleep(test.last_time);
74
75
		sem_post(&wrt);
76
77
		sem_wait(&write_mutex);
78
		write_count--;
79
		if (write_count == 0)
80
			sem_post(&read_queue);
81
		sem_post(&write_mutex);
82
	} while(1);
83
}
84
85
int main(int argc, char *argv[]) {
86
	// 初始化
87
	read_count = 0;
88
	write_count = 0;
89
	sem_init(&wrt, 0, 1);
90
	sem_init(&read_queue, 0, 1);
91
	sem_init(&read_mutex, 0, 1);
92
	sem_init(&write_mutex, 0, 1);
93
94
	
95
	shared_int = 0;
96
	FILE *stream;
97
	char cmd[10];
98
	if (argc < 2) {
99
		fprintf(stderr ,"Please input test file\n");
100
		return 1;
101
	}
102
	if ((stream = fopen(argv[1],"r")) == NULL) {
103
		fprintf(stderr, "Cannot open file %s\n", argv[1]);
104
		return 1;
105
	}
106
	pthread_attr_t attr;
107
	pthread_attr_init(&attr);
108
	int reader_num = 0;
109
	int writer_num = 0; 
110
	int max_wait_time = 0;
111
	int count = 0;
112
	int i = 0;
113
	data test[MAX_DATA_NUM];
114
	while (fgets(cmd,20,stream) != NULL) {
115
	    if (cmd[0] == '\n') break;
116
		printf("%s", cmd);
117
		char *p;
118
		/*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。
119
		每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/
120
		p = strtok(cmd, " ");
121
		if (p) {
122
			test[count].pid = atoi(p);
123
		}
124
		for (i = 0; i < 3; i++) {
125
			p = strtok(NULL, " ");
126
			switch (i) {
127
				case 0 :
128
					test[count].RW = (p[0] == 'R') ? 0 : 1;
129
					break;
130
				case 1 :
131
					test[count].delay = atoi(p);
132
					break;
133
				case 2 :
134
					test[count].last_time = atoi(p);
135
					break;
136
				default: break;
137
			}
138
		}
139
		count++;
140
	}
141
	pthread_t tid[count];
142
	for (i = 0; i < count; i++) {
143
		if (test[i].RW == 0) {
144
			pthread_create(&tid[i], &attr, reader, (void*)&test[i]);
145
		} else {
146
			pthread_create(&tid[i], &attr, writer, (void*)&test[i]);
147
		}
148
	}
149
	sleep(35);
150
	return 0;
151
}

结果

测试文件同上

但是发现,几乎每次写者结束之后都再次抢赢了在等待的读者。造成读者的饥饿。

问题与改进

因为对read_queue和write_mutex的释放几乎同时于是在两条指令之间增加了一条sleep延迟指令(但是不知道这种方法是否合适,从结果来看是合理的)。得到下面的结果

发现写者等待可以阻塞读者,在读者读完后写者写入。

二、哲学家就餐问题(管程实现)

程序

1
#include <stdio.h>
2
#include <pthread.h>
3
#include <unistd.h>
4
#include <semaphore.h>
5
#include <iostream>
6
7
sem_t next;
8
pthread_mutex_t mutex;
9
class monitor {
10
 private:
11
	enum {THIKING, HUNGRY, EATING} state[5];
12
	pthread_cond_t self[5];
13
 public:
14
 	monitor() {
15
	    int i;
16
		for (i = 0; i < 5; i++)
17
			state[i] = THIKING;
18
			pthread_cond_init(&self[i],NULL);
19
	}
20
	
21
	void pickup(int i) {
22
		state[i] = HUNGRY;
23
		test(i);
24
		if (state[i] != EATING) {
25
			pthread_cond_wait(&self[i], &mutex);
26
		}
27
	}
28
29
	void putdown(int i) {
30
		state[i] = THIKING;
31
		printf("Philosopher %d finished \n", i + 1);
32
		test((i + 4) % 5);
33
		test((i + 1) % 5);
34
	}
35
36
	void test(int i) {
37
		if ((state[(i + 4) % 5] != EATING) &&
38
			(state[i] == HUNGRY) &&
39
			(state[(i + 1) % 5] != EATING)) {
40
				state[i] = EATING;
41
				printf("Philosopher %d begin eating\n", i + 1);
42
				pthread_cond_signal(&self[i]);
43
		}
44
	}
45
}dp;
46
47
void *philosopher(void* param) {
48
	int pid = *(int*) param;
49
	while(1) {
50
      	pthread_mutex_lock(&mutex); // 修改
51
		dp.pickup(pid);
52
		
53
        sleep(3);
54
55
		dp.putdown(pid);
56
      	pthread_mutex_unlock(&mutex);
57
	}
58
}
59
60
int main() {
61
	pthread_attr_t attr;
62
	pthread_attr_init(&attr);
63
	pthread_t tid[5];
64
	pthread_mutex_init(&mutex, NULL);
65
	sem_init(&next, 0, 1);
66
	
67
	int id[5];
68
	int i;
69
	for (i = 0; i < 5; i++) {
70
	    id[i] = i;
71
		pthread_create(&tid[i], &attr, philosopher, (void*)&id[i]);
72
	}
73
	sleep(15);
74
	return 0;
75
}

结果

分析

使用了pthread_cond_t 来作为条件变量。使线程在pthread_cond_wait()上进行等待的时候,外面还需要有一个互斥锁来保护条件变量的唤醒不受到竞争。5个哲学家共用筷子的话,一次可以不止一个哲学家在吃。所以就不限制只能一个线程进入管程,在管程内部实现互斥,同个时间内只能有一个线程是活动的。程序员就可以不用显示地进行同步编程。由于pickup和putdown两个动作之间还间隔者一个吃饭的动作,所以putdown和另一个进程的pickup不会构成竞争。putdown对临界区的操作就可以直接还是用mutex作为互斥锁来实现互斥。

进程开始在条件变量上等待或者结束之后,就会释放mutex,其他进程就可以进入管程。这样子可以保证只有一个程序在管程内运行但是有多个在等待。

您的支持将鼓励我继续创作!