Чтение онлайн

ЖАНРЫ

UNIX: взаимодействие процессов

Стивенс Уильям Ричард

Шрифт:

ПРИМЕЧАНИЕ

Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета. 

Пробуждение любого процесса, заблокированного в вызове mq_receive

75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.

78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.

Функция mq_receive

В

листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.

Проверка полноты очереди

30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).

ПРИМЕЧАНИЕ

Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.

В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.

Листинг 5.27.Функция mq_receive: первая половина

//my_pxmsg_mmap/mq_receive.с

1 #include "unpipc.h"

2 #include "mqueue.h"

3 ssize_t

4 mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)

5 {

6 int n;

7 long index;

8 int8_t *mptr;

9 ssize_t len;

10 struct mymq_hdr *mqhdr;

11 struct mymq_attr *attr;

12 struct mymsg_hdr *msghdr;

13 struct mymq_info *mqinfo;

14 mqinfo = mqd;

15 if (mqinfo->mqi_magic != MQI_MAGIC) {

16 errno = EBADF;

17 return(-1);

18 }

19 mqhdr = mqinfo->mqi_hdr; /* указатель struct */

20 mptr = (int8_t *) mqhdr; /* указатель на байт */

21 attr = &mqhdr->mqh_attr;

22 if ((n = pthread_mutex_lock(&mqhdr->mqh_lock)) != 0) {

23 errno = n;

24 return(-1);

25 }

26 if (maxlen < attr->mq_msgsize) {

27 errno = EMSGSIZE;

28 goto err;

29 }

30 if (attr->mq_curmsgs = 0) { /* очередь пуста */

31 if (mqinfo->mqi_flags & O_NONBLOCK) {

32 errno = EAGAIN;

33 goto err;

34 }

35 /*
ожидаем помещения сообщения в очередь */

36 mqhdr->mqh_nwait++;

37 while (attr->mq_curmsgs == 0)

38 pthread_cond_wait(&mqhdr->mqh_wait, &mqhdr->mqh_lock);

39 mqhdr->mqh_nwait--;

40 }

Листинг 5.28. Функция mq_receive: вторая половина

//my_pxmsg_mmap/mq_receive.c

41
if ((index = mqhdr->mqh_head) == 0)

42 err_dump("mymq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);

43 msghdr = (struct mymsg_hdr *) &mptr[index];

44 mqhdr->mqh_head = msghdr->msg_next; /* новое начало списка */

45 len = msghdr->msg_len;

46 memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */

47 if (priop != NULL)

48 *priop = msghdr->msg_prio;

49 /* только что считанное сообщение становится первым в списке пустых */

50 msghdr->msg_next = mqhdr->mqr_free;

51 mqhdr->mqh_free = index;

52 /* запуск любого процесса, заблокированного в вызове mq_send */

53 if (attr->mq_curmsgs == attr->mq_maxmsg)

54 pthread_cond_signal(&mqhdr->mqh_wait);

55 attr->mq_curmsgs--;

56 pthread_mutex_unlock(&mqhdr->mqh_lock);

57 return(len);

58 err:

59 pthread_mutex_unlock(&mqhdr->mqh_lock);

60 return(-1);

61 }

Возвращение сообщения вызвавшему процессу

43-51 msghdr указывает на msg_hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных. 

Разблокирование процесса, заблокированного в вызове mq_send

52-54 Если очередь была полной в момент считывания сообщения, мы вызываем pthread_cond_signal для отправки сообщения любому из процессов, заблокированных в вызове mq_send.

5.9. Резюме

Очереди сообщений Posix просты в использовании: новая очередь создается (или существующая открывается) функцией mq_open; закрываются очереди вызовом mq_close, а удаляются mq_unlink. Поместить сообщение в очередь можно функцией mq_send, а считать его оттуда можно с помощью mq_receive. Атрибуты очереди можно считать и установить с помощью функций mq_getattr и mq_setattr, а функция mq_notify позволяет зарегистрировать процесс на уведомление о помещении нового сообщения в пустую очередь. Каждое сообщение в очереди обладает приоритетом (небольшое целое число), и функция mq_receive всегда возвращает старейшее сообщение с наивысшим приоритетом.

Поделиться с друзьями: