Чтение онлайн

ЖАНРЫ

QNX/UNIX: Анатомия параллелизма
Шрифт:

Динамический пул потоков нужен разработчикам QNX в первую очередь как инструмент построения многопоточных менеджеров ресурсов - основы построения сервисов ОС QNX. Но и помимо этой цели динамический пул потоков представляет собой мощнейшее средство для конструирования параллельных механизмов обработки.

Проиллюстрируем применение динамического пула потоков примером программного кода, который был нами описан в книге [4] в главе «Сервер TCP/IP... много серверов хороших и разных». По сути, это ретранслирующий TCP/IP-сервер, но сейчас это для нас неважно:

Сервер на базе динамического пула потоков

#include <pthread.h>

#include <sys/dispatch.h>

static int ls; //
прослушивающий TCP-сокет

THREAD_POOL_PARAM_T* alloc(THREAD_POOL_HANDLE_T* h) {

return (THREAD_POOL_PARAM_T*)h;

}

// функция блокирования пула потоков

THREAD_POOL_PARAM_T* block(THREAD_POOL_PARAM_T* p) {

int rs = accept(ls, NULL, NULL);

if (rs < 0) errx("accept error");

return(THREAD_POOL_PARAM_T*)rs;

}

int handler(THREAD_POOL_PARAM_T* p) {

retrans((int)p);

close((int)p);

delay(250);

cout << pthread_self << flush;

return 0;

}

int main(int argc, char* argv[]) {

// создать TCP-сокет на порт

ls = getsocket(THREAD_POOL_PORT);

// создание атрибутной записи пула потоков:

thread_pool_attr_t attr;

memset(&attr, 0, sizeof(thread_pool_attr_t));

// заполнение блока атрибутов пула

/* - mm число блокированных потоков в пуле */

attr.lo_water = 3;

/* - max число блокированных потоков в пуле */

attr.hi_water = 7;

/* - инкремент шага создания потоков */

attr.increment = 2;

attr.maximum = 9;

/* - общий предел числа потоков в пуле */

attr.handle = dispatch_create;

attr.context_alloc = alloc;

attr.block_func = block;

attr.handler_func = handler;

// фактическое создание пула потоков:

void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);

if (tpp == NULL) errx("create pool");

//
начало функционирования пула потоков:

thread_pool_start(tpp);

// ... выполнение никогда не дойдет до этой точки!

exit(EXIT_SUCCESS);

}

Примечание

В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:

errx
— реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;

retrans
— прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.

Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.

Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:

• Первоначально (при запуске пула потоков в работу вызовом

thread_pool_start
) создается
attr.lo_water
потоков («нижняя ватерлиния» числа блокированных потоков).

• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция

attr.соntext_alloc
(в контексте созданного потока).

• По завершении функция вызывает блокирующую функцию потока

attr.block_func
, на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из
accept
).

• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика

attr.handler_func
.

• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже

attr.lo_water
, механизм пула создаст дополнительно
attr.increment
потоков и «доведет» их до блокирующей функции.

• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…

• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков

attr.hi_water
(«верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.

• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение

attr.maximum
.

Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:

1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так (

<sys/dispatch.h>
):

Поделиться с друзьями: