38 /* ожидание завершения всех производителей и потребителей */
39 for (i = 0; i < nproducers: i++) {
40 Pthread_join(tid_produce[i], NULL);
41 printf("producer count[%d] = %d\n", i, prodcount[i]);
42 }
43 for (i = 0; i < nconsumers; i++) {
44 Pthread_join(tid_consume[i], NULL);
45 printf("consumer count[%d] = %d\n", i, conscount[i]);
46 }
47 Sem_destroy(&shared.mutex);
48 Sem_destroy(&shared.nempty);
49 Sem_destroy(&shared.nstored);
50 exit(0);
51 }
Функция produce содержит одну новую строку по сравнению с листингом 10.13. В части кода, относящейся к завершению потока-производителя, появляется строка, отмеченная знаком +:
if (shared.nput >= nitems) {
+ Sem_post(&shared.nstored); /* даем возможность потребителям завершить работу */
Sem_post(&shared.nempty);
Sem_post(&shared.mutex);
return(NULL); /* готово */
}
Снова нам нужно быть аккуратными при обработке завершения процессов-производителей и потребителей. После обработки всех объектов в буфере все потребители блокируются в вызове
Sem_wait(&shared.nstored); /* Ожидание помещения объекта в буфер */
Производителям приходится увеличивать семафор nstored для разблокирования потрeбитeлeй, чтобы они узнали, что работа
завершена. Функция consume приведена в листинге 10.17.
Листинг 10.17. Функция, выполняемая всеми потоками-потребителями
//pxsem/prodcons4.c
72 void *
73 consume(void *arg)
74 {
75 int i;
76 for (;;) {
77 Sem_wait(&shared.nstored); /* ожидание помещения объекта в буфер */
78 Sem_wait(&shared.mutex);
79 if (shared.nget >= nitems) {
80 Sem_post(&shared.nstored);
81 Sem_post(&shared.mutex);
82 return(NULL); /* готово */
83 }
84 i = shared.nget % NBUFF;
85 if (shared.buff[i] != shared.ngetval)
86 printf("error: buff[%d] = %d\n", i, shared.buff[i]);
87 shared.nget++;
88 shared.ngetval++;
89 Sem_post(&shared.mutex);
90 Sem_post(&shared.nempty); /* освобождается место для элемента */
91 *((int *) arg) += 1;
92 }
93 }
Завершение потоков-потребителей
79-83 Функция consume сравнивает nget и nitems, чтобы узнать, когда следует остановиться (аналогично функции produce). Обработав последний объект в буфере, потоки-потребители блокируются, ожидая изменения семафора nstored. Когда завершается очередной поток-потребитель, он увеличивает семафор nstored, давая возможность завершить работу другому потоку-потребителю.
10.11. Несколько буферов
Во многих программах, обрабатывающих какие-либо данные, можно встретить цикл вида
while ((n = read(fdin, buff, BUFFSIZE)) > 0) {
/* обработка данных */
write(fdout, buff, n);
}
Например, программы, обрабатывающие текстовые файлы, считывают строку из входного файла, выполняют с ней некоторые действия, а затем записывают строку в выходной файл. Для текстовых файлов вызовы read и write часто заменяются на функции стандартной библиотеки ввода-вывода fgets и fputs.
На рис. 10.11 изображена иллюстрация к такой схеме. Здесь функция reader считывает данные из входного файла, а функция writer записывает данные в выходной файл. Используется один буфер.
Рис. 10.10. Процесс считывает данные в буфер, а потом записывает его содержимое в другой файл
Рис. 10.11. Один процесс, считывающий данные в буфер и записывающий их в файл
На рис. 10.10 приведена временная диаграмма работы такой программы. Числа слева проставлены в условных единицах времени. Предполагается, что операция чтения занимает 5 единиц, записи — 7, а обработка данных между считыванием и записью требует 2 единицы времени.