Наша схема может оказаться медленной в случае наличия в очереди большого количества сообщений, поскольку каждый раз при добавлении нового придется просматривать их значительную часть. Можно хранить отдельно индексы последних сообщений со всеми имеющимися значениями приоритета.
Пробуждение любого процесса, заблокированного в вызове mq_receive
75-77 Если очередь была пуста в момент помещения в нее нового сообщения, мы вызываем pthread_cond_signal, чтобы разблокировать любой из процессов, ожидающих сообщения.
78 Увеличиваем на единицу количество сообщений в очереди mq_curmsgs.
Функция mq_receive
В
листинге 5.27 приведен текст первой половины функции mq_receive, которая получает необходимые указатели, блокирует взаимное исключение и проверяет объем буфера вызвавшего процесса, который должен быть достаточным для помещения туда сообщения максимально возможной длины.
Проверка полноты очереди
30-40 Если очередь пуста и установлен флаг O_NONBLOCK, возвращается ошибка с кодом EAGAIN. В противном случае увеличивается значение счетчика mqh_nwait, который проверяется функцией mq_send (листинг 5.25) в случае, если очередь пуста и есть процессы, ожидающие уведомления. Затем мы ожидаем сигнала по условной переменной, который будет передан функцией mq_send (листинг 5.26).
ПРИМЕЧАНИЕ
Наша реализация mq_receive, как и реализация mq_send, упрощает ситуацию с ошибкой EINTR, возвращаемой при прерывании ожидания сигналом, перехватываемым вызвавшим процессом.
В листинге 5.28 приведен текст второй половины функции mq_receive. Мы уже знаем, что в очереди есть сообщение, которое можно будет возвратить вызвавшему процессу.
Листинг 5.27.Функция mq_receive: первая половина
//my_pxmsg_mmap/mq_receive.с
1 #include "unpipc.h"
2 #include "mqueue.h"
3 ssize_t
4 mymq_receive(mymqd_t mqd, char *ptr, size_t maxlen, unsigned int *priop)
42 err_dump("mymq_receive: curmsgs = %ld; head = 0", attr->mq_curmsgs);
43 msghdr = (struct mymsg_hdr *) &mptr[index];
44 mqhdr->mqh_head = msghdr->msg_next; /* новое начало списка */
45 len = msghdr->msg_len;
46 memcpy(ptr, msghdr + 1, len); /* копирование самого сообщения */
47 if (priop != NULL)
48 *priop = msghdr->msg_prio;
49 /* только что считанное сообщение становится первым в списке пустых */
50 msghdr->msg_next = mqhdr->mqr_free;
51 mqhdr->mqh_free = index;
52 /* запуск любого процесса, заблокированного в вызове mq_send */
53 if (attr->mq_curmsgs == attr->mq_maxmsg)
54 pthread_cond_signal(&mqhdr->mqh_wait);
55 attr->mq_curmsgs--;
56 pthread_mutex_unlock(&mqhdr->mqh_lock);
57 return(len);
58 err:
59 pthread_mutex_unlock(&mqhdr->mqh_lock);
60 return(-1);
61 }
Возвращение сообщения вызвавшему процессу
43-51 msghdr указывает на msg_hdr первого сообщения в очереди, которое мы и возвратим. Освободившееся сообщение становится первым в списке свободных.
Разблокирование процесса, заблокированного в вызове mq_send
52-54 Если очередь была полной в момент считывания сообщения, мы вызываем pthread_cond_signal для отправки сообщения любому из процессов, заблокированных в вызове mq_send.
5.9. Резюме
Очереди сообщений Posix просты в использовании: новая очередь создается (или существующая открывается) функцией mq_open; закрываются очереди вызовом mq_close, а удаляются mq_unlink. Поместить сообщение в очередь можно функцией mq_send, а считать его оттуда можно с помощью mq_receive. Атрибуты очереди можно считать и установить с помощью функций mq_getattr и mq_setattr, а функция mq_notify позволяет зарегистрировать процесс на уведомление о помещении нового сообщения в пустую очередь. Каждое сообщение в очереди обладает приоритетом (небольшое целое число), и функция mq_receive всегда возвращает старейшее сообщение с наивысшим приоритетом.