Чтение онлайн

ЖАНРЫ

QNX/UNIX: Анатомия параллелизма
Шрифт:

Детальнее это выглядит так: в коде сервера именно тот поток, который выполнит

MsgReceive*(chid, ...)
, и будет заблокирован в ожидании запроса от клиента
MsgSend*
. Аналогично и в коде клиента вся последовательность выполнения блокировок, обозначенная выше, будет относиться именно к потоку, выполняющему последовательные операции:

coid = ConnectAttach(... , chid, ...);

MsgSend*(coid, ...);

Содержимое двух предыдущих абзацев ни одной буквой не противоречит и не отменяет положения традиционного изложения [1] технологии обмена сообщениями микроядра. Тогда зачем же мы даем именно такую

формулировку? Для того чтобы акцентировать внимание на том, что все блокированные состояния и их освобождение имеют смысл относительно потоков (и только потоков!), которые выполняют последовательность операций
MsgSend*
MsgReceive*
MsgReply*
(даже если это единственный поток — главный поток приложения, и тогда мы говорим о блокировании процессов). Проиллюстрируем сказанное следующим приложением ( файл n1.cc):

Обмен сообщениями и взаимные блокировки

#include <stdlib.h>

#include <stdio.h>

#include <unistd.h>

#include <inttypes.h>

#include <errno.h>

#include <iostream.h>

#include <pthread.h>

#include <signal.h>

#include <sys/neutrino.h>

#include <sys/syspage.h>

static const int TEMP = 500; // темп выполнения приложения

static int numclient = 1; // число потоков клиентов

// многопотоковая версия вывода диагностики в поток:

iostream& operator <<(iostream& с, char* s) {

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(&mutex);

c << s << flush;

pthread_mutex_unlock(&mutex);

return c;

}

static uint64_t tb; // временная отметка начала приложения

// временная отметка точки вызова:

inline uint64_t mark(void) {

// частота процессора:

const static uint64_t cps =

SYSPAGE_ENTRY(qtime)->cycles_per_sec;

return (ClockCycles - tb) * 1000 / cps;

}

const int MSGLEN = 80;

// потоковая функция сервера:

void* server(void* chid) {

int rcvid;

char message[MSGLEN];

while (true) {

rcvid = MsgReceive((int)chid, message, MSGLEN, NULL);

sprintf(message + strlen(message), "[%07llu] ... ", mark);

delay(TEMP); // имитация обслуживания

sprintf(message + strlen(message), [%07llu]->", mark);

MsgReply(rcvid, EOK, message, strlen(message) + 1);

}

return NULL;

}

//
потоковая функция клиента:

void* client(void* data) {

while (true) {

char message[MSGLEN];

sprintf(message, "%d:\t[%07llu]->", pthread_self, mark);

MsgSend((int)data, message, strlen(message) + 1, message, MSGLEN);

sprintf(message + strlen(message), "[%07llu]", mark);

cout << message << endl;

static unsigned int seed = 0;

delay(numclient*(((long)rand_r(&seed ) * TEMP / RAND_MAX) + TEMP));

// имитация вычислений...

}

return NULL;

}

int main(int argc, char** argv) {

// 1-й параметр - число потоков клиентов:

if (argc > 1 && atoi(argv[1]) > 0)

numclient = atoi(argv[1]);

tb = ClockCycles;

int chid = ChannelCreate(0);

if (pthread_create(NULL, NULL, server, (void*)chid) != EOK)

perror("server create"), exit(EXIT_FAILURE);

for (int i = 0; i < numclient; i++)

if (pthread_create(NULL, NULL, client,

(void*)ConnectAttach(0, 0, chid, _NTO_SIDE_CHANNEL, 0)) != EOK)

perror("client create"), exit(EXIT_FAILURE);

sigpause(SIGINT);

return EXIT_SUCCESS;

}

Все происходит в рамках единого процесса:

• Создается единый поток сервера, ожидающий сообщений от клиентов и отвечающий на них.

• Создается N потоков клиентов (задается параметром командной строки запуска приложения), которые будут обращаться к серверу.

• К одному каналу сервера устанавливается N соединений от клиентов.

• Канал прослушивания для сервера и идентификаторы соединений для клиентов сознательно создаются в главном потоке (т.e. вне потоков, которые их будут использовать); их значения поступают в потоки (сервера и клиентов) как параметры потоковых функций (трюк с подменой целочисленных значений на указатели мы рассматривали ранее).

• Сообщение продвигается от клиента к серверу и обратно к клиенту; в ходе пересылки объем сообщения нарастает: оно образуется конкатенацией полей, добавляемых последовательно клиентом, сервером и снова клиентом.

• В результате полного цикла обмена сообщением в теле самого сообщения формируется текст, содержащий 5 последовательных полей — идентификатор потока клиента (обращающегося с сообщением) и 4 абсолютные временные метки (в миллисекундах): передачи сообщения клиентом, приема сообщения сервером (начало обработки), ответа на сообщение сервером (конец обработки), приема ответа клиентом.

Поделиться с друзьями: