Чтение онлайн

ЖАНРЫ

QNX/UNIX: Анатомия параллелизма
Шрифт:

• У захваченного мьютекса всегда есть поток-владелец, и только он может освободить его в дальнейшем. Именно поэтому мьютекс может использоваться для синхронизации потоков, но только синхронизации в смысле разграничения временной последовательности доступак фрагменту кода — к тому, что часто называют критической секцией кода. Функциональность семафора значительно выше: при возможности (почти всегда) его применения в том контексте, в котором используется и мьютекс (только нужно ли это делать?), он может применяться и для синхронизации потоков в смысле координации последовательностиих взаимодействия в качестве элемента, управляющего

порядком выполнения. Покажем это на примере. Для этого незначительно трансформируем код предыдущего теста для семафора ( файл sy21.cc):

Синхронизация потоков семафорами

#include <stdlib.h>

#include <stdio.h>

#include <string.h>

#include <inttypes.h>

#include <iostream.h>

#include <unistd.h>

#include <pthread.h>

#include <errno.h>

#include <semaphore.h>

unsigned long N = 1000;

unsigned int T = 2;

static sem_t* sem;

static bool debug = false;

static char* str; // строка диагностики

static volatile int ind = 0;

uint64_t *t;

void* threadfunc(void* data) {

unsigned long i = 0;

char tid[8];

sprintf(tid, "%X", pthread_self);

// временная метка начала во всех потоках устанавливается

// на время достижения этой точки в последнем (активном) потоке

if ((int)data == T - 1) {

uint64_t с = ClockCycles;

for (int i = 0; i < T; i++ ) t[i] = c;

}

// рабочий цикл переключений за счет синхронизации

while (i++ < N) {

sem_wait(sem + (int)data);

if (debug) str[ind++] = *tid;

sem_post(sem + ((int)data +1) % T);

}

t[(int)data] = ClockCycles - t[(int)data];

return NULL;

}

int main(int argc, char *argv[]) {

int opt, val;

while ((opt = getopt(argc, argv, "n:t:v")) != -1) {

switch(opt) {

case 'n':

if (sscanf(optarg, "%i", &val) != 1)

cout << "parse command line error" << endl, exit(EXIT_FAILURE);

if (val > 0) N — val;

break;

case 't':

if (sscanf(optarg, "%i", &val) != 1)

cout << "parse command line error" << endl, exit(EXIT_FAILURE);

if (val > 0) T = val;

break;

case 'v':

debug = true;

break;

default:

exit(EXIT_FAILURE);

}

}

if (debug) str = new char[T * N + 1];

pthread_t* tid = new pthread_t[T];

sem = new sem_t[T];

t = new uint64_t[T];

for (int i = 0; i < T; i++) {

//
все потоки, кроме последнего, будут заблокированы

// на своих семафорах сразу же после старта

if (sem_init(sem + i, 0, (i == (T - 1)) ? 1 : 0))

perror("semaphore init"), exit(EXIT_FAILURE);

if (pthread_create(tid + i, NULL, threadfunc, (void*)i
! = EOK)

perror( "thread create error"), exit(EXIT_FAILURE);

}

for (int i=0; i < T; i++)

pthread_join(tid[i], NULL);

for (int i = 0; i < T; i++) sem_destroy(sem + i);

delete [] sem;

for (int i = 0; i < T; i++)

cout << tid[i] << "\t: cycles - " << t[i] << ";\ton semaphore - " <<

t[i] / T / N << endl;

delete [] tid;

delete [] t;

if (debug) {

str[ind] = "\0"; cout << str << endl;

delete [] str;

}

exit(EXIT_SUCCESS);

}

Логически приложение изменилось следующим образом:

• Теперь у нас может быть не 2 идентичных (симметричных) потока, а произвольное их количество (ключ

– t
при запуске приложения).

• Потоки синхронизируются не на одном семафоре — введен массив семафоров по числу потоков: каждый поток блокируется на «своем» семафоре, но разблокирует его (после очередного выполнения своего фрагмента) семафор заблокированного «соседа».

• Теперь нам нет нужды использовать барьер для одновременного старта всех созданных потоков: семафоры всех создаваемых потоков инициализируются нулевым значением; стартующий поток тут же блокируется на своем семафоре, и только последний из запущенных выполняется, не блокируясь на семафоре.

Поделиться с друзьями: