Чтение онлайн

ЖАНРЫ

UNIX: взаимодействие процессов

Стивенс Уильям Ричард

Шрифт:

Хотя мы говорим о защите критической области кода программы, на самом деле речь идет о защите данных, с которыми работает эта часть кода. То есть взаимное исключение обычно используется для защиты совместно используемых несколькими потоками или процессами данных.

Взаимные исключения представляют собой блокировку коллективного пользования. Это значит, что если совместно используемые данные представляют собой, например, связный список, то все потоки, работающие с этим списком, должны блокировать взаимное исключение. Ничто не может помешать потоку работать со списком, не заблокировав взаимное исключение. Взаимные исключения предполагают добровольное сотрудничество потоков.

7.3.

Схема производитель-потребитель

Одна из классических задач на синхронизацию называется задачей производителя и потребителя. Она также известна как задача ограниченного буфера. Один или несколько производителей (потоков или процессов) создают данные, которые обрабатываются одним или несколькими потребителями. Эти данные передаются между производителями и потребителями с помощью одной из форм IPC.

С этой задачей мы регулярно сталкиваемся при использовании каналов Unix. Команда интерпретатора, использующая канал

grep pattern chapters.* | wc -l

является примером такой задачи. Программа grep выступает как производитель (единственный), a wc — как потребитель (тоже единственный). Канал используется как форма IPC. Требуемая синхронизация между производителем и потребителем обеспечивается ядром, обрабатывающим команды write производителя и read покупателя. Если производитель опережает потребителя (канал переполняется), ядро приостанавливает производителя при вызове write, пока в канале не появится место. Если потребитель опережает производителя (канал опустошается), ядро приостанавливает потребителя при вызове read, пока в канале не появятся данные.

Такой тип синхронизации называется неявным; производитель и потребитель не знают о том, что синхронизация вообще осуществляется. Если бы мы использовали очередь сообщений Posix или System V в качестве средства IPC между производителем и потребителем, ядро снова взяло бы на себя обеспечение синхронизации.

При использовании разделяемой памяти как средства IPC производителя и потребителя, однако, требуется использование какого-либо вида явной синхронизации. Мы продемонстрируем это на использовании взаимного исключения. Схема рассматриваемого примера изображена на рис. 7.1.

В одном процессе у нас имеется несколько потоков-производителей и один поток-потребитель. Целочисленный массив buff содержит производимые и потребляемые данные (данные совместного пользования). Для простоты производители просто устанавливают значение buff[0] в 0, buff [1] в 1 и т.д. Потребитель перебирает элементы массива, проверяя правильность записей.

В этом первом примере мы концентрируем внимание на синхронизации между отдельными потоками-производителями. Поток-потребитель не будет запущен, пока все производители не завершат свою работу. В листинге 7.1 приведена функция main нашего примера.

Рис. 7.1. Производители и потребитель

Листинг 7.1. [1] Функция main

//mutex/prodcons2.с

1 #include "unpipc.h"

2 #define MAXNITEMS 1000000

3 #define MAXNTHREADS 100

4 int nitems; /* только для чтения потребителем и производителем */

5 struct {

6 pthread_mutex_t mutex;

1

Все

исходные тексты, опубликованные в этой книге, вы можете найти по адресу http://www.piter.com/download.

7 int buff[MAXNITEMS];

8 int nput;

9 int nval;

10 } shared = {

11 PTHREAD_MUTEX_INITIALIZER

12 };

13 void *produce(void *), *consume(void *);

14 int

15 main(int argc, char **argv)

16 {

17 int i, nthreads, count[MAXNTHREADS];

18 pthread_t tid_produce[MAXNTHREADS], tid_consume;

19 if (argc != 3)

20 err_quit("usage: prodcons2 <#items> <#threads>");

21 nitems = min(atoi(argv[1]), MAXNITEMS);

22 nthreads = min(atoi(argv[2]), MAXNTHREADS);

23 Set_concurrency(nthreads);

24 /* запуск всех потоков-производителей */

25 for (i = 0; i < nthreads; i++) {

26 count[i] = 0;

27 Pthread_create(&tid_produce[i], NULL, produce, &count[i]);

28 }

29 /* ожидание завершения всех производителей */

30 for (i = 0; i < nthreads; i++) {

31 Pthread_join(tid_produce[i], NULL);

32 printf("count[%d] = %d\n", i, count[i]);

33 }

34 /* запуск и ожидание завершения потока-потребителя */

35 Pthread_create(&tid_consume, NULL, consume, NULL);

36 Pthread_join(tid_consume, NULL);

37 exit(0);

38 }

Совместное использование глобальных переменных потоками

4-12 Эти переменные совместно используются потоками. Мы объединяем их в структуру с именем shared вместе с взаимным исключением, чтобы подчеркнуть, что доступ к ним можно получить только вместе с ним. Переменная nput хранит индекс следующего элемента массива buff, подлежащего обработке, a nval содержит следующее значение, которое должно быть в него помещено (0, 1, 2 и т.д.). Мы выделяем память под эту структуру и инициализируем взаимное исключение, используемое для синхронизации потоков-производителей.

ПРИМЕЧАНИЕ

Мы всегда будем стараться размещать совместно используемые данные вместе со средствами синхронизации, к ним относящимися (взаимными исключениями, условными переменными, семафорами), в одной структуре, как мы сделали в этом примере. Это хороший стиль программирования. Однако во многих случаях совместно используемые данные являются динамическими, представляя собой, например, связный список. Мы, наверное, сможем поместить в структуру первый элемент списка вместе со средствами синхронизации (как в структуре mq_hdr в листинге 5.16), но оставшаяся часть списка в структуру не попадет. Следовательно, это решение не всегда является идеальным.

Поделиться с друзьями: