Чтение онлайн

ЖАНРЫ

QNX/UNIX: Анатомия параллелизма
Шрифт:

9 : cycles - 1143005896; on semaphore - 1344

10 : cycles - 1143021518, on semaphore - 1344

11 : cycles - 1143036136; on semaphore - 1344

12 : cycles - 1143053448; on semaphore - 1344

13 : cycles - 1143068415; on semaphore - 1344

14 : cycles - 1143083676; on semaphore - 1344

15 : cycles - 1143098361; on semaphore - 1344

16 : cycles - 1143114009; on semaphore - 1344

17 : cycles - 1143128525; on semaphore - 1344

18 : cycles - 1142872665; on semaphore - 1344

Есть

некоторая корреляция времени переключения контекста с размером выборки и количеством обрабатывающих потоков, но она в широком диапазоне этих параметров не превышает 8%. В данном приложении эта численная величина включает в себя: блокирование на семафоре, переключение на контекст другого потока и разблокирование семафора. Если вспомнить, что раньше мы получали оценки для принудительного (посредством
sched_yield
) переключения контекста потоков в 375 процессорных циклов, а для захвата-освобождения семафора — порядка 870, то эти цифры хорошо согласуются с полученными сейчас результатами.

Рассматриваемые примитивы служат принципиально различным целям. Мьютекс, как уже было сказано ранее, предназначен в первую очередь для регламентации доступа к участкам программного кода. Семафоры же больше предназначены для регламентации порядка доступа к определенным объектам данных. Классическими задачами этого класса являются задачи «производитель-потребитель», когда M производителей создают некоторые объекты данных (читая эти данные с реальных внешних устройств, или создавая их как результат только внутренних вычислений, или любым другим способом), а N потребителей независимо берут произведенные объекты данных на последующую обработку.

Это настолько общий и часто встречающийся класс задач, что покажем для него простейший «скелет» в виде отдельного приложения, в котором отслеживание порядка доступа потребителей будет осуществлять счетный семафор ( файл sy22.cc). Для простоты понимания приложение сделано как трансформация кода предшествующей группы тестов. В качестве имитации производства объекта данных, как и в качестве его обработки потребителем, используется пассивная пауза (

delay
) на случайную величину (производство и обработка объектов данных в коде не показаны, так как это не относится к существу рассматриваемого — нас интересуют процессы синхронизации этих операций, а не сами операции).

Кроме основной нашей цели это приложение дополнительно демонстрирует:

• Практическое использование принудительного завершения (отмены) потоков «извне» с управлением состоянием завершаемости потоков и расстановкой точек отмены, о чем мы уже говорили ранее.

• Использование атомарных (непрерываемых) операций (например,

atomic_add_value
), о которых мы будем говорить чуть позже.

• Использование реентерабельных форм функций стандартной библиотеки, безопасных в многопоточной среде (

rand_r
вместо
rand
).

Один производитель — T потребителей

#include <stdlib.h>

#include <stdio.h>

#include <iostream.h>

#include <unistd.h>

#include <pthread.h>

#include <errno.h>

#include <semaphore.h>

#include <atomic.h>

const int D = 10;

unsigned int T = 2;

static sem_t sem;

pthread_t* tid;

void* writer(void* data) {

unsigned long i = (int)(data); //
общий размер выборки

unsigned int s = 1;

while (i-- > 0) {

delay((long)rand_r(&s) * D / RAND_MAX + 1);

sem_post(&sem); // объект данных произведен

}

for (i = 0; i < T; i++) pthread_cancel(tid[i + 1]);

return NULL;

}

static char *str; // строка результирующей диагностики

static volatile unsigned ind = 0;

void* reader(void*) {

char tid[8];

sprintf(tid, "%X", pthread_self);

unsigned int s = rand;

pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);

while(true) {

sem_wait(&sem); // получен объект данных

str[atomic_add_value(&ind, 1)] = *tid;

pthread_testcancel;

delay((long)rand_r(&s) * D * T / RAND_MAX + 1);

}

return NULL;

}

int main(int argc, char *argv[]) {

unsigned long N = 1000;

int opt, val;

while ((opt = getopt(argc, argv, "n:t:")) != -1) {

switch(opt) {

case 'n':

if (sscanf(optarg, "%i", &val) != 1)

cout << "parse command line error" << endl, exit(EXIT_FAILURE);

if (val > 0) N = val;

break;

case 't':

if (sscanf(optarg, "%i", &val) != 1)

cout << "parse command line error" << endl, exit(EXIT_FAILURE);

if (val > 0) T = val;

break;

default:

exit(EXIT_FAILURE);

}

Поделиться с друзьями: