Чтение онлайн

ЖАНРЫ

UNIX: взаимодействие процессов

Стивенс Уильям Ричард

Шрифт:
Запуск всех потоков

21-41 Инициализируем семафоры и запускаем потоки-производители и поток-потребитель. Затем ожидается завершение работы потоков. Эта часть кода практически идентична листингу 7.1.

В листинге 10.13 приведен текст функции produce, которая выполняется каждым потоком-производителем.

Листинг 10.13. Функция, выполняемая всеми потоками-производителями

//pxsem/prodcons3.c

43 void *

44 produce(void *arg)

45 {

46 for (;;) {

47 Sem_wait(&shared.nempty); /*
ожидание освобождения поля */

48 Sem_wait(&shared.mutex);

49 if (shared.nput >= nitems) {

50 Sem_post(&shared.nempty);

51 Sem_post(&shared.mutex);

52 return(NULL); /* готово */

53 }

54 shared.buff[shared.nput % NBUFF] = shared.nputval;

55 shared.nput++;

56 shared.nputval++;

57 Sem_post(&shared.mutex);

58 Sem_post(&shared.nstored); /* еще один элемент */

59 *((int *) arg) += 1;

60 }

61 }

Взаимное исключение между потоками-производителями

49-53 Отличие от листинга 10.8 в том, что цикл завершается, когда nitems объектов будет помещено в буфер всеми потоками. Обратите внимание, что потоки-производители могут получить семафор nempty в любой момент, но только один производитель может иметь семафор mutex. Это защищает переменные nput и nval от одновременного изменения несколькими производителями.

Завершение производителей

50-51 Нам нужно аккуратно обработать завершение потоков-производителей. После того как последний объект помещен в буфер, каждый поток выполняет

Sem_wait(&shared.nempty); /* ожидание пустого поля */

в начале цикла, что уменьшает значение семафора nempty. Но прежде, чем поток будет завершен, он должен увеличить значение этого семафора, потому что он не помещает объект в буфер в последнем проходе цикла. Завершающий работу поток должен также освободить семафор mutex, чтобы другие производители смогли продолжить функционирование. Если мы не увеличим семафор nempty по завершении процесса и если производителей будет больше, чем мест в буфере, лишние потоки будут заблокированы навсегда, ожидая освобождения семафора nempty, и никогда не завершат свою работу.

Функция consume в листинге 10.14 проверяет правильность всех записей в буфере, выводя сообщение при обнаружении ошибки.

Листинг 10.14. Функция, выполняемая потоком-потребителем

//pxsem/prodcons3.с

62 void *

63 consume(void *arg)

64 {

65 int i;

66 for (i = 0; i < nitems; i++) {

67 Sem_wait(&shared.nstored); /* ожидание помещения по крайней мере одного элемента в буфер */

68 Sem_wait(&shared.mutex);

69 if (shared.buff[i % NBUFF] != i)

70 printf("error: buff[%d] = %d\n", i, shared.buff[i % NBUFF]);

71 Sem_post(&shared.mutex);

72 Sem_post(&shared.nempty); /* еще одно пустое поле */

73 }

74 return(NULL);

75 }

Условие

завершения единственного потока-потребителя звучит просто: он считает все потребленные объекты и останавливается по достижении nitems.

10.10. Несколько производителей, несколько потребителей

Следующее изменение, которое мы внесем в нашу пpoгрaммy, будет заключаться в добавлении возможности одновременной работы нескольких потребителей вместе с несколькими производителями. Есть ли смысл в наличии нескольких потребителей — зависит от приложения. Автор видел два примера, в которых использовался этот метод.

1. Пpoгрaммa преобразования IP-адресов в имена узлов. Каждый потребитель берет IP-адрес, вызывает gethostbyaddr (раздел 9.6 [24]), затем дописывает имя узла к файлу. Поскольку каждый вызов gethostbyaddr обрабатывается неопределенное время, порядок IP-адресов в буфере будет, скорее всего, отличаться от порядка имен узлов в файле, созданном потоками-потребителями. Преимущество этой схемы в параллельности выполнения вызовов gethostbyaddr (каждый из которых может работать несколько секунд) — по одному на каждый поток-потребитель.

ПРИМЕЧАНИЕ

Предполагается наличие версии gethostbyaddr, допускающей многократное вхождение, что не всегда верно. Если эта версия недоступна, можно хранить буфер в разделяемой памяти и использовать процессы вместо потоков. 

2. Программа, принимающая дейтаграммы UDP, обрабатывающая их и записывающая результат в базу данных. Каждая дeйтaгрaммa обрабатывается одним потоком-потребителем, которые выполняются параллельно для ускорения процесса. Хотя дейтаграммы записываются в базу данных в порядке, вообще говоря, отличном от порядка их приема, встроенная схема упорядочения записей в базе данных справляется с этой проблемой.

В листинге 10.15 приведены глобальные переменные программы.

Листинг 10.15. Глобальные переменные

//pxsem/prodcons4.с

1 #include "unpipc.h"

2 #define NBUFF 10

3 #define MAXNTHREADS 100

4 int nitems, nproducers, nconsumers; /* только для чтения */

5 struct { /* общие данные производителей и потребителей */

6 int buff[NBUFF];

7 int nput; /* номер объекта: 0, 1. 2, … */

8 int nputval; /* сохраняемое в buff[] значение */

9 int nget; /* номер объекта: 0, 1, 2, … */

10 int ngetval; /* получаемое из buff[] значение */

11 sem_t mutex, nempty, nstored; /* семафоры, а не указатели */

12 } shared;

13 void *produce(void *), *consume(void *);

Глобальные переменные и общая структура

4-12 Количество потоков-потребителей является глобальной переменной, устанавливаемой из командной строки. В структуру shared добавилось два новых поля: nget — номер следующего объекта, получаемого одним из потоков-потребителей, и ngetval — соответствующее значение.

Функция main, текст которой приведен в листинге 10.16, запускает несколько потоков-потребителей и потоков-производителей одновременно.

19-23 Новый аргумент командной строки указывает количество потоков-потребителей. Для хранения идентификаторов потоков-потребителей выделяется место под специальный массив (tid_consume), а для подсчета обработанных каждым потоком объектов выделяется массив conscount.

Поделиться с друзьями: