在进行linux的编程的时候,经常会遇到一些坑和问题。稍微整理了一下,有遇到的时候可以复习。
比如pthread的使用,创建,取消,阻塞和非阻塞的问题。对于临界区资源的访问,也是一个在多
线程和多进程编程中,所会经常遇到的问题。
一、生产者-消费者问题
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。wiki
关键: 同步概念+互斥概念
同步:当生产者只有当缓存区不满的时候才可以写入数据,消费者只有当缓存区不空的时候才可以读出数据
互斥:生产者在写的时候消费者不可以读,消费者在读的时候生产者不可以写。
线程和进程十分相似,不同的只是线程比进程小。首先,线程采用了多个线程可共享资源的设计思想;例如,它们的操作大部分都是在同一地址空间进行的。其次,从一个线程切换到另一线程所花费的代价比进程低。再次,进程本身的信息在内存中占用的空间比线程大,因此线程更能允分地利用内存。
程序
1 | |
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | |
8 | typedef int buffer_item; |
9 |
|
10 | buffer_item buffer[BUFFER_SIZE]; |
11 | |
12 | sem_t empty; |
13 | sem_t mutex; |
14 | sem_t full; |
15 | int in; |
16 | int out; |
17 | // producer |
18 | int insert_item(buffer_item item) { |
19 | // insert item into buffer |
20 | sem_wait(&empty); // 同步信号量 |
21 | sem_wait(&mutex); // 互斥信号量 |
22 | |
23 | // add nextp to buffer |
24 | buffer[in] = item; |
25 | in = (in + 1) % BUFFER_SIZE; |
26 | |
27 | sem_post(&mutex); |
28 | sem_post(&full); |
29 | return 0; |
30 | } |
31 | int remove_item(buffer_item *item) { |
32 | sem_wait(&full); |
33 | sem_wait(&mutex); |
34 | |
35 | // remove an item from buffer to nextc |
36 | *item = buffer[out]; |
37 | out = (out + 1) % BUFFER_SIZE; |
38 | |
39 | sem_post(&mutex); |
40 | sem_post(&empty); |
41 | return 0; |
42 | } |
43 | |
44 | void *producer(void *param) { |
45 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); |
46 | pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); |
47 | buffer_item random; |
48 | while (1) { |
49 | sleep(rand()%10); |
50 | random = rand(); |
51 | printf("producer produced %d\n", random); |
52 | if (insert_item(random)) |
53 | printf("report error condition\n"); |
54 | } |
55 | } |
56 | |
57 | void *consumer(void *param) { |
58 | pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); |
59 | pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); |
60 | buffer_item random; |
61 | while(1) { |
62 | sleep(rand()%10); |
63 | if (remove_item(&random)) |
64 | printf("report error condition\n"); |
65 | else |
66 | printf("consumer consumed %d\n", random); |
67 | } |
68 | } |
69 | int main(int argc, char *argv[]){ |
70 | // get command line |
71 | if (argc < 4) { |
72 | printf("please input sleep time, number of producer and consumer\n"); |
73 | return 0; |
74 | } |
75 | int sleep_t = atoi(argv[1]); |
76 | int producer_n = atoi(argv[2]); |
77 | int consumer_n = atoi(argv[3]); |
78 | // initialize buffer |
79 | int i = 0; |
80 | sem_init(&empty, 0, BUFFER_SIZE); |
81 | sem_init(&mutex, 0, 1); |
82 | sem_init(&full, 0, 0); |
83 | in = 0; |
84 | out = 0; |
85 | exit_pthread = 0; |
86 | for (i = 0; i < BUFFER_SIZE; i++) { |
87 | buffer[i] = 0; |
88 | } |
89 | // create producer pthread |
90 | pthread_t tid1[producer_n], tid2[consumer_n]; |
91 | pthread_attr_t attr; |
92 | /*get the default attributes */ |
93 | pthread_attr_init(&attr); |
94 | /* create pthreads */ |
95 | for (i = 0; i < 5; i++) |
96 | pthread_create(&tid1[i], &attr, producer, NULL); |
97 | |
98 | // create consumer pthread |
99 | for (i = 0; i < 5; i++) |
100 | pthread_create(&tid2[i], &attr, consumer, NULL); |
101 | sleep(sleep_t); // seconds |
102 | return 0; |
103 | } |
结果
分析
生产者和消费者的程序设计主要采用了sem_t二进制信号量(初始化为1)来实现互斥,sem_t计数信号量来实现同步。linux下的wait和signal函数和课本上的同步功能描述的函数不是相同的。命令行参数中读取
- 程序终止前的休眠时间
- 生产者线程数量
- 消费者线程数量
其中,主进程结束线程其实也会自动结束。mutex也可以使用pthread_mutex互斥信号量来实现。
观察程序的输出结果可以发现,生产者生产之后,消费者才可以进行消费。
几点要注意的地方:
- sleep的单位是以秒计算
- 在linux系统下,使用了pthread的进行编译需要加上-pthread才能正常链接到pthread的库才可以正常通过编译,而osx下不需要。(或者-lrt可以更灵活地链接到POSIX标准下的所有库)
- osx下使用不了未命名的sem函数
参考
stack overflow:Undefined Reference issues using Semaphores
二、读者-写者问题
1.读者优先
定义
如果一个读者申请进行读操作时已有另一个读者正在进行读操作,则该读者可直接开始读操作。
实现
通过建立一个在读取的读者队列(并非真正使用队列,使用reader_count来对正在读的读者进行计数),当读者队列有读者在读取的时候就阻塞写者。没有读者在读的时候就不阻塞写者。
程序
1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | |
8 |
|
9 | // 不加typedef 声明的时候要加struct |
10 | typedef struct data { |
11 | int pid; |
12 | int RW; |
13 | int delay; |
14 | int last_time; |
15 | }data; |
16 | sem_t wrt; |
17 | sem_t read_mutex; |
18 | sem_t write_mutex; |
19 | int read_count; |
20 | |
21 | int shared_int; |
22 | |
23 | void *reader(void* param) { |
24 | data test = *(data*) param; |
25 | sleep(test.delay); |
26 | do { |
27 | sem_wait(&read_mutex); |
28 | read_count++; |
29 | // 第一个读者到来的时候,必须申请临界区资源 |
30 | // 当有写者在写的时候必须等待 |
31 | if (read_count == 1) { |
32 | sem_wait(&wrt); |
33 | } |
34 | sem_post(&read_mutex); |
35 | |
36 | printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int, read_count); |
37 | sleep(test.last_time); // 读所持续的时间 |
38 | |
39 | sem_wait(&read_mutex); |
40 | read_count--; |
41 | if (read_count == 0) { |
42 | sem_post(&wrt); |
43 | } |
44 | sem_post(&read_mutex); |
45 | } while(1); |
46 | } |
47 | |
48 | void *writer(void* param) { |
49 | data test = *(data*) param; |
50 | sleep(test.delay); |
51 | do { |
52 | //sem_wait(&write_mutex); // 防止读者与写者抢,有一个写者在临界区,一个读者一个写者等待的情况 |
53 | sem_wait(&wrt); |
54 | |
55 | shared_int = rand(); |
56 | printf("Writer %d write data %d\n", test.pid, shared_int); |
57 | sleep(test.last_time); |
58 | |
59 | sem_post(&wrt); |
60 | //sem_post(&write_mutex); |
61 | } while(1); |
62 | } |
63 | |
64 | int main(int argc, char *argv[]) { |
65 | // 初始化 |
66 | read_count = 0; |
67 | sem_init(&wrt, 0, 1); |
68 | sem_init(&read_mutex, 0, 1); |
69 | sem_init(&write_mutex, 0, 1); |
70 | |
71 | shared_int = 0; |
72 | FILE *stream; |
73 | char cmd[10]; |
74 | if (argc < 2) { |
75 | fprintf(stderr ,"Please input test file\n"); |
76 | return 1; |
77 | } |
78 | if ((stream = fopen(argv[1],"r")) == NULL) { |
79 | fprintf(stderr, "Cannot open file %s\n", argv[1]); |
80 | return 1; |
81 | } |
82 | pthread_attr_t attr; |
83 | pthread_attr_init(&attr); |
84 | int reader_num = 0; |
85 | int writer_num = 0; |
86 | int max_wait_time = 0; |
87 | int count = 0; |
88 | int i = 0; |
89 | data test[MAX_DATA_NUM]; |
90 | while (fgets(cmd,20,stream) != NULL && count <= MAX_DATA_NUM) { |
91 | if (cmd[0] == '\n') break; |
92 | printf("%s", cmd); |
93 | char *p; |
94 | /*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。 |
95 | 每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/ |
96 | p = strtok(cmd, " "); |
97 | if (p) { |
98 | test[count].pid = atoi(p); |
99 | } |
100 | for (i = 0; i < 3; i++) { |
101 | p = strtok(NULL, " "); |
102 | switch (i) { |
103 | case 0 : |
104 | test[count].RW = (p[0] == 'R') ? 0 : 1; |
105 | break; |
106 | case 1 : |
107 | test[count].delay = atoi(p); |
108 | break; |
109 | case 2 : |
110 | test[count].last_time = atoi(p); |
111 | break; |
112 | default: break; |
113 | } |
114 | } |
115 | count++; |
116 | } |
117 | pthread_t tid[count]; |
118 | for (i = 0; i < count; i++) { |
119 | if (test[i].RW == 0) { |
120 | pthread_create(&tid[i], &attr, reader, (void*)&test[i]); |
121 | } else { |
122 | pthread_create(&tid[i], &attr, writer, (void*)&test[i]); |
123 | } |
124 | } |
125 | sleep(15); |
126 | return 0; |
127 | } |
测试文件
1 | 1 R 3 5 |
2 | 2 R 3 5 |
3 | 3 R 3 5 |
4 | 4 W 4 5 |
5 | 5 R 4 5 |
6 | 6 R 5 5 |
7 | 7 R 5 5 |
8 | 8 R 5 5 |
9 | 9 R 6 5 |
10 | 10 R 6 5 |
结果
分析
在读者优先中,只有在密集的读者中插入一个写者。写者遇到了长时间的饥饿。
存在的问题
在这种算法中,如果有一个写者进入了临界区。有一个读者和一个写者在等待。第一个写者出临界区释放wrt之后,第二个写者是不是需要和读者抢占,是不是就不能保证读者优先进入。这种情况在读者优先的条件中吗?
2.写者优先
定义
如果一个读者申请进行读操作时已有另一写者在等待访问共享资源,则该读者必须等到没有写者处于等待状态后才能开始读操作。
实现
有写者开始等待,就要开始阻塞读者进入读者队列进行读取。当读者队列为0,写者可以开始写。之后的情况就跟读者优先相同,主要是实现写-读的互斥,只有在写完之后才能读。主要就是需要增加一个写者队列让写者进行等待,和一个新的二进制信号量,在写者开始等待的时候就阻塞读者进入读者队列。
程序
1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 |
|
7 | |
8 |
|
9 | // 不加typedef 声明的时候要加struct |
10 | typedef struct data { |
11 | int pid; |
12 | int RW; |
13 | int delay; |
14 | int last_time; |
15 | }data; |
16 | |
17 | sem_t wrt; |
18 | sem_t read_mutex; |
19 | sem_t write_mutex; |
20 | sem_t read_queue; |
21 | |
22 | int read_count; |
23 | int write_count; |
24 | |
25 | int shared_int; |
26 | |
27 | void *reader(void* param) { |
28 | |
29 | data test = *(data*) param; |
30 | sleep(test.delay); |
31 | |
32 | do { |
33 | // 增加一个互斥量read,在有写者在写或等待的时候必须等待 |
34 | // 也就是说写者队列有写者的时候都必须进行等待。 |
35 | |
36 | sem_wait(&read_queue); // 当写者抢占到之后,读者被阻塞在此,不能进队 |
37 | sem_wait(&read_mutex); |
38 | read_count++; |
39 | // 第一个读者到来的时候,必须申请临界区资源 |
40 | // 当有写者在写的时候必须等待 |
41 | if (read_count == 1) |
42 | sem_wait(&wrt); |
43 | sem_post(&read_mutex); |
44 | sem_post(&read_queue); // 写者的抢占时机,抢占到之后只需要等待读者队列全部读完就可以。 |
45 | |
46 | printf("Reader %d read data: %d, %d reading\n", test.pid, shared_int); |
47 | sleep(t\est.last_time); // 读所持续的时间 |
48 | |
49 | sem_wait(&read_mutex); |
50 | read_count--; |
51 | if (read_count == 0) |
52 | sem_post(&wrt); |
53 | sem_post(&read_mutex); |
54 | } while(1); |
55 | } |
56 | |
57 | void *writer(void* param) { |
58 | |
59 | data test = *(data*) param; |
60 | sleep(test.delay); |
61 | |
62 | do { |
63 | sem_wait(&write_mutex); |
64 | write_count++; |
65 | if (write_count == 1) // 写者队列有人就阻塞读者 |
66 | sem_wait(&read_queue); |
67 | sem_post(&write_mutex); |
68 | |
69 | sem_wait(&wrt); |
70 | |
71 | shared_int = rand(); |
72 | printf("Writer %d write data %d, %d writing\n", test.pid, shared_int, write_count); |
73 | sleep(test.last_time); |
74 | |
75 | sem_post(&wrt); |
76 | |
77 | sem_wait(&write_mutex); |
78 | write_count--; |
79 | if (write_count == 0) |
80 | sem_post(&read_queue); |
81 | sem_post(&write_mutex); |
82 | } while(1); |
83 | } |
84 | |
85 | int main(int argc, char *argv[]) { |
86 | // 初始化 |
87 | read_count = 0; |
88 | write_count = 0; |
89 | sem_init(&wrt, 0, 1); |
90 | sem_init(&read_queue, 0, 1); |
91 | sem_init(&read_mutex, 0, 1); |
92 | sem_init(&write_mutex, 0, 1); |
93 | |
94 | |
95 | shared_int = 0; |
96 | FILE *stream; |
97 | char cmd[10]; |
98 | if (argc < 2) { |
99 | fprintf(stderr ,"Please input test file\n"); |
100 | return 1; |
101 | } |
102 | if ((stream = fopen(argv[1],"r")) == NULL) { |
103 | fprintf(stderr, "Cannot open file %s\n", argv[1]); |
104 | return 1; |
105 | } |
106 | pthread_attr_t attr; |
107 | pthread_attr_init(&attr); |
108 | int reader_num = 0; |
109 | int writer_num = 0; |
110 | int max_wait_time = 0; |
111 | int count = 0; |
112 | int i = 0; |
113 | data test[MAX_DATA_NUM]; |
114 | while (fgets(cmd,20,stream) != NULL) { |
115 | if (cmd[0] == '\n') break; |
116 | printf("%s", cmd); |
117 | char *p; |
118 | /*在第一次调用时,strtok()必需给予参数s 字符串,往后的调用则将参数s 设置成NULL。 |
119 | 每次调用成功则返回下一个分割后的字符串指针.内部的实现可以用一个static变量来保存指针*/ |
120 | p = strtok(cmd, " "); |
121 | if (p) { |
122 | test[count].pid = atoi(p); |
123 | } |
124 | for (i = 0; i < 3; i++) { |
125 | p = strtok(NULL, " "); |
126 | switch (i) { |
127 | case 0 : |
128 | test[count].RW = (p[0] == 'R') ? 0 : 1; |
129 | break; |
130 | case 1 : |
131 | test[count].delay = atoi(p); |
132 | break; |
133 | case 2 : |
134 | test[count].last_time = atoi(p); |
135 | break; |
136 | default: break; |
137 | } |
138 | } |
139 | count++; |
140 | } |
141 | pthread_t tid[count]; |
142 | for (i = 0; i < count; i++) { |
143 | if (test[i].RW == 0) { |
144 | pthread_create(&tid[i], &attr, reader, (void*)&test[i]); |
145 | } else { |
146 | pthread_create(&tid[i], &attr, writer, (void*)&test[i]); |
147 | } |
148 | } |
149 | sleep(35); |
150 | return 0; |
151 | } |
结果
测试文件同上
但是发现,几乎每次写者结束之后都再次抢赢了在等待的读者。造成读者的饥饿。
问题与改进
因为对read_queue和write_mutex的释放几乎同时于是在两条指令之间增加了一条sleep延迟指令(但是不知道这种方法是否合适,从结果来看是合理的)。得到下面的结果
发现写者等待可以阻塞读者,在读者读完后写者写入。
二、哲学家就餐问题(管程实现)
程序
1 |
|
2 |
|
3 |
|
4 |
|
5 |
|
6 | |
7 | sem_t next; |
8 | pthread_mutex_t mutex; |
9 | class monitor { |
10 | private: |
11 | enum {THIKING, HUNGRY, EATING} state[5]; |
12 | pthread_cond_t self[5]; |
13 | public: |
14 | monitor() { |
15 | int i; |
16 | for (i = 0; i < 5; i++) |
17 | state[i] = THIKING; |
18 | pthread_cond_init(&self[i],NULL); |
19 | } |
20 | |
21 | void pickup(int i) { |
22 | state[i] = HUNGRY; |
23 | test(i); |
24 | if (state[i] != EATING) { |
25 | pthread_cond_wait(&self[i], &mutex); |
26 | } |
27 | } |
28 | |
29 | void putdown(int i) { |
30 | state[i] = THIKING; |
31 | printf("Philosopher %d finished \n", i + 1); |
32 | test((i + 4) % 5); |
33 | test((i + 1) % 5); |
34 | } |
35 | |
36 | void test(int i) { |
37 | if ((state[(i + 4) % 5] != EATING) && |
38 | (state[i] == HUNGRY) && |
39 | (state[(i + 1) % 5] != EATING)) { |
40 | state[i] = EATING; |
41 | printf("Philosopher %d begin eating\n", i + 1); |
42 | pthread_cond_signal(&self[i]); |
43 | } |
44 | } |
45 | }dp; |
46 | |
47 | void *philosopher(void* param) { |
48 | int pid = *(int*) param; |
49 | while(1) { |
50 | pthread_mutex_lock(&mutex); // 修改 |
51 | dp.pickup(pid); |
52 | |
53 | sleep(3); |
54 | |
55 | dp.putdown(pid); |
56 | pthread_mutex_unlock(&mutex); |
57 | } |
58 | } |
59 | |
60 | int main() { |
61 | pthread_attr_t attr; |
62 | pthread_attr_init(&attr); |
63 | pthread_t tid[5]; |
64 | pthread_mutex_init(&mutex, NULL); |
65 | sem_init(&next, 0, 1); |
66 | |
67 | int id[5]; |
68 | int i; |
69 | for (i = 0; i < 5; i++) { |
70 | id[i] = i; |
71 | pthread_create(&tid[i], &attr, philosopher, (void*)&id[i]); |
72 | } |
73 | sleep(15); |
74 | return 0; |
75 | } |
结果
分析
使用了pthread_cond_t 来作为条件变量。使线程在pthread_cond_wait()上进行等待的时候,外面还需要有一个互斥锁来保护条件变量的唤醒不受到竞争。5个哲学家共用筷子的话,一次可以不止一个哲学家在吃。所以就不限制只能一个线程进入管程,在管程内部实现互斥,同个时间内只能有一个线程是活动的。程序员就可以不用显示地进行同步编程。由于pickup和putdown两个动作之间还间隔者一个吃饭的动作,所以putdown和另一个进程的pickup不会构成竞争。putdown对临界区的操作就可以直接还是用mutex作为互斥锁来实现互斥。
进程开始在条件变量上等待或者结束之后,就会释放mutex,其他进程就可以进入管程。这样子可以保证只有一个程序在管程内运行但是有多个在等待。