Программирование в стандарте POSIX

       

Очереди сообщений


Мы переходим к рассмотрению средств локального межпроцессного взаимодействия, относящихся к необязательной части стандарта POSIX-2001, именуемой "X/Open-расширение системного интерфейса" (XSI). Будут описаны очереди сообщений, семафоры и разделяемые сегменты памяти.

Остановимся сначала на понятиях и структурах, общих для всех трех упомянутых средств.

Каждая очередь сообщений, набор семафоров и разделяемый сегмент однозначно идентифицируются положительным целым числом, которое обычно обозначается, соответственно, как msqid, semid и shmid и возвращается в качестве результатов функций msgget(), semget() и shmget().

При получении идентификаторов средств межпроцессного взаимодействия используется еще одна сущность - ключ, а для его генерации предназначена функция ftok() (см. листинг 8.22). Аргумент path должен задавать маршрутное имя существующего файла, к которому вызывающий процесс может применить функцию stat(). В качестве значения аргумента id, по соображениям мобильности, рекомендуется использовать однобайтный символ. Гарантируется, что функция ftok() сгенерирует один и тот же ключ для заданной пары (файл, символ) и разные ключи для разных пар.

#include <sys/ipc.h> key_t ftok (const char *path, int id);

Листинг 8.22. Описание функции ftok(). (html, txt)

С идентификатором средства межпроцессного взаимодействия ассоциирована структура данных, содержащая информацию о допустимых и выполненных операциях. Соответствующие декларации сосредоточены в заголовочных файлах <sys/msg.h>, <sys/sem.h> и <sys/shm.h>.

В упомянутую структуру входит подструктура ipc_perm с данными о владельцах и режимом доступа, описанная в файле <sys/ipc.h> и содержащая по крайней мере следующие поля.

uid_t uid; /* Идентификатор владельца */ gid_t gid; /* Идентификатор владеющей группы */ uid_t cuid; /* Идентификатор пользователя, создавшего данное средство межпроцессного взаимодействия */ gid_t cgid; /* Идентификатор создавшей группы */ mode_t mode; /* Режим доступа на чтение/запись */


Управление доступом к описываемым средствам межпроцессного взаимодействия осуществляется аналогично файловому, только наряду (и наравне) с владельцами (пользователем и группой) рассматриваются те, кто эти средства создал (создатели).

Опросить статус присутствующих в данный момент в системе (т. е. активных) средств межпроцессного взаимодействия позволяет служебная программа ipcs:

ipcs [-qms] [-a | -bcopt]

По умолчанию выдается краткая информация обо всех средствах - очередях сообщений, семафорах и разделяемых сегментах памяти. Если нужно ограничиться их отдельными видами, следует воспользоваться опциями -q, -s и/или -m, соответственно.

Следующие опции управляют форматом выдачи. Задание опции -a равносильно указанию всех опций формата. Опция -b предписывает выдавать лимиты на размер (максимальное количество байт в сообщениях очереди и т.п.), -c - имена пользователя и группы создателя средства, -o - информацию об использовании (количество сообщений в очереди, их суммарный размер и т.п.), -p - информацию о процессах (идентификаторы последнего отправителя, получателя и т.п.), -t - информацию о времени (последняя управляющая операция, последняя отправка сообщения и т.п.).

Для удаления из системы активных средств межпроцессного взаимодействия предназначена служебная программа ipcrm (разумеется, подверженная контролю прав доступа). Удаляемые средства могут задаваться идентификаторами или ключами:

ipcrm [-q msgid | -Q msgkey | -s semid | -S semkey | -m shmid | -M shmkey ] ...

На этом мы завершаем изложение общих вопросов, относящихся к средствам межпроцессного взаимодействия, и переходим к рассмотрению специфических возможностей каждого из них.

Механизм очередей сообщений позволяет процессам взаимодействовать, обмениваясь данными. Данные передаются между процессами дискретными порциями, называемыми сообщениями. Процессы выполняют над сообщениями две основные операции - прием и отправку. Процессы, отправляющие или принимающие сообщение, могут приостанавливаться, если требуемую операцию невозможно выполнить немедленно.В частности, могут быть отложены попытки отправить сообщение в заполненную до отказа очередь, получить сообщение из пустой очереди и т.п. ("операции с блокировкой"). Если же указано, что приостанавливать процесс нельзя, "операции без блокировки" либо выполняются немедленно, либо завершаются неудачей.

Прежде чем процессы смогут обмениваться сообщениями, один из них должен создать очередь. Одновременно определяются первоначальные права на выполнение операций для различных процессов, в том числе соответствующих управляющих действий над очередями.



В частности, могут быть отложены попытки отправить сообщение в заполненную до отказа очередь, получить сообщение из пустой очереди и т.п. ("операции с блокировкой"). Если же указано, что приостанавливать процесс нельзя, "операции без блокировки" либо выполняются немедленно, либо завершаются неудачей.

Прежде чем процессы смогут обмениваться сообщениями, один из них должен создать очередь. Одновременно определяются первоначальные права на выполнение операций для различных процессов, в том числе соответствующих управляющих действий над очередями.

Для работы с очередями сообщений стандарт POSIX-2001 предусматривает следующие функции (см. листинг 8.23): msgget() (получение идентификатора очереди сообщений), msgctl() (управление очередью сообщений), msgsnd() (отправка сообщения) и msgrcv() (прием сообщения).



#include <sys/msg.h> int msgget (key_t key, int msgflg); int msgsnd (int msqid, const void *msgp, size_t msgsz, int msgflg); ssize_t msgrcv (int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); int msgctl (int msqid, int cmd, struct msqid_ds *buf);

Листинг 8.23. Описание функций для работы с очередями сообщений.

Структура msqid_ds, ассоциированная с идентификатором очереди сообщений, должна содержать по крайней мере следующие поля.

struct ipc_perm msg_perm; /* Данные о правах доступа к очереди сообщений */ msgqnum_t msg_qnum; /* Текущее количество сообщений в очереди */ msglen_t msg_qbytes; /* Максимально допустимый суммарный размер сообщений в очереди */ pid_t msg_lspid; /* Идентификатор процесса, отправившего последнее сообщение */ pid_t msg_lrpid; /* Идентификатор процесса, принявшего последнее сообщение */ time_t msg_stime; /* Время последней отправки */ time_t msg_rtime; /* Время последнего приема */ time_t msg_ctime; /* Время последнего изменения посредством msgctl() */

Перейдем к детальному рассмотрению функций для работы с очередями сообщений.

Функция msgget() возвращает идентификатор очереди сообщений, ассоциированный с ключом key.


Новая очередь, ее идентификатор и соответствующая структура msqid_ds создаются для заданного ключа, если значение аргумента key равно IPC_PRIVATE или очередь еще не ассоциирована с ключом, а в числе флагов msgflg задан IPC_CREAT.

Если необходима уверенность в том, что очередь с указанным ключом создается заново, в дополнение к флагу IPC_CREAT следует установить IPC_EXCL. Тогда попытка получить идентификатор уже существующий очереди завершится неудачей.

Структура msqid_ds для новой очереди инициализируется следующим образом.

  • Значения полей msg_perm.cuid, msg_perm.uid, msg_perm.cgid и msg_perm.gid устанавливаются равными действующим идентификаторам пользователя и группы вызывающего процесса.
  • Младшие девять бит поля msg_perm.mode устанавливаются равными младшим девяти битам значения msgflg.
  • Поля msg_qnum, msg_lspid, msg_lrpid, msg_stime и msg_rtime обнуляются.
  • В поле msg_ctime помещается текущее время, а в поле msg_qbytes - определенный в системе лимит.


Один из тонких вопросов, связанных с созданием очереди сообщений, заключается в выборе ключа. Всем процессам, которые намереваются работать с общей очередью сообщений, для получения идентификатора msqid необходимо знать ключ очереди. Задание ключа одинаковым константным значением во всех этих программах небезопасно, поскольку может оказаться так, что тот же ключ будет случайно задействован и другими программами. Как одно из возможных решений рекомендуется использование функции ftok(), вычисляющей действительно "уникальный" ключ.

В листинге 8.24 приведен простейший пример программы, где создается очередь сообщений с правами доступа, указанными в командной строке.

#include <stdio.h> #include <sys/ipc.h> #include <sys/msg.h>

/* Программа создает очередь сообщений. */ /* В командной строке задаются имя файла для ftok() */ /* и режим доступа к очереди сообщений */

#define FTOK_CHAR 'G'

int main (int argc, char *argv []) { key_t key; int msqid; int mode = 0;

if (argc != 3) { fprintf (stderr, "Использование: %s маршрутное_имя режим_доступа\n", argv [0]); return (1); }



if ((key = ftok (argv [1], FTOK_CHAR)) == (key_t) (-1)) { perror ("FTOK"); return (2); } (void) sscanf (argv [2], "%o", (unsigned int *) &mode);

if ((msqid = msgget (key, IPC_CREAT | mode)) < 0) { perror ("MSGGET"); return (3); }

return 0; }

Листинг 8.24. Пример программы, создающей очередь сообщений.

Если после выполнения этой программы воспользоваться командой ipcs -q, то результат может выглядеть так, как показано в листинге 8.25.

------ Message Queues -------- key msqid owner perms used-bytes messages 0x47034bac 163840 galat 644 0 0

Листинг 8.25. Возможный результат опроса статуса очередей сообщений.

Удалить созданную очередь из системы, соответствующей стандарту POSIX-2001, можно командой ipcrm -q 163840.

Операции отправки/приема сообщений выполняют функции msgsnd() и msgrcv(); msgsnd() помещает сообщения в очередь, а msgrcv() читает и "достает" их оттуда.

В обоих случаях первый аргумент задает идентификатор очереди; второй является указателем на содержащую сообщение структуру. Сообщение состоит из двух частей: текста (последовательности байт) и так называемого типа (положительного целого числа). Тип, указанный во время отправки, используется впоследствии при выборе сообщения из очереди. Аргумент msgsz определяет длину сообщения; аргумент msgflg задает флаги.

В зависимости от значения, указанного в качестве аргумента msgtyp функции msgrcv(), из очереди выбирается то или иное сообщение. Если значение аргумента равно нулю, запрашивается первое сообщение в очереди, если больше нуля - первое сообщение типа msgtyp, а если меньше нуля - первое сообщение наименьшего из типов, не превышающих абсолютную величину аргумента msgtyp. Пусть, например, в очередь последовательно помещены сообщения с типами 5, 3 и 2. Тогда вызов msgrcv (msqid, msgp, size, 0, flags) выберет из очереди сообщение с типом 5, поскольку оно отправлено первым; вызов msgrcv (msqid, msgp, size, -4, flags) - последнее сообщение, так как 2 - это наименьший из возможных типов в указанном диапазоне; наконец, вызов msgrcv (msqid, msgp, size, 3, flags) - сообщение с типом 3.



Во многих приложениях взаимодействующим посредством очереди сообщений процессам требуется синхронизировать свое выполнение. Например, процесс-получатель, пытавшийся прочитать сообщение и обнаруживший, что очередь пуста (либо сообщение указанного типа отсутствует), должен иметь возможность подождать, пока процесс-отправитель не поместит в очередь требуемое сообщение. Аналогичным образом, процесс, желающий отправить сообщение в очередь, в которой нет достаточного для него места, может ожидать его освобождения в результате чтения сообщений другими процессами. Процесс, вызвавший подобного рода "операцию с блокировкой", приостанавливается до тех пор, пока либо станет возможным выполнение операции, либо будет ликвидирована очередь. С другой стороны, имеются приложения, где подобные ситуации должны приводить к немедленному (и неудачному) завершению вызова функции.

Если не указано противное, функции msgsnd() и msgrcv() выполняют операции с блокировкой, например: msgsnd (msqid, msgp, size, 0); msgrcv (msqid, msgp, size, type, 0). Чтобы выполнить операцию без блокировки, необходимо установить флаг IPC_NOWAIT: msgsnd (msqid, msgp, size, IPC_NOWAIT); msgrcv (msqid, msgp, size, type, IPC_NOWAIT).

Аргумент msgp указывает на значение структурного типа, в котором представлены тип и тело сообщения (см. листинг 8.26).

struct msgbuf { long mtype; /* Тип сообщения */ char mtext [1]; /* Текст сообщения */ };

Листинг 8.26. Описание структурного типа для представления сообщений.

Для хранения реальных сообщений в прикладной программе следует определить аналогичную структуру, указав желаемый размер сообщения, например, так, как это сделано в листинге 8.27.

#define MAXSZTMSG 8192

struct mymsgbuf { long mtype; /* Тип сообщения */ char mtext [MAXSZTMSG]; /* Текст сообщения */ }; struct mymsgbuf msgbuf;

Листинг 8.27. Описание структуры для хранения сообщений.

В качестве аргумента msgsz обычно указывается размер текстового буфера, например: sizeof (msgbuf.text).

Если не указано противное, в случае, когда длина выбранного сообщения больше, чем msgsz, вызов msgrcv() завершается неудачей.


Если же установить флаг MSG_NOERROR, длинное сообщение обрезается до msgsz байт. Отброшенная часть пропадает, а вызывающий процесс не получает никакого уведомления о том, что сообщение обрезано.

При успешном завершении msgsnd() возвращает 0, а msgrcv() - значение, равное числу реально полученных байт; при неудаче возвращается -1.

Процессы, обладающие достаточными правами доступа, посредством функции msgctl() могут получать информацию о состоянии очереди, изменять ряд характеристик, удалять очередь.

Управляющее действие определяется значением аргумента cmd. Допустимых значений три: IPC_STAT - получить информацию о состоянии очереди, IPC_SET - переустановить характеристики очереди, IPC_RMID - удалить очередь.

Команды IPC_STAT и IPC_SET для хранения информации об очереди используют имеющуюся в прикладной программе структуру типа msqid_ds, указатель на которую содержит аргумент buf: IPC_STAT копирует в нее ассоциированную с очередью структуру данных, а IPC_SET, наоборот, в соответствии с ней обновляет ассоциированную структуру. Команда IPC_SET позволяет переустановить значения идентификаторов владельца (msg_perm.uid) и владеющей группы (msg_perm.gid), режима доступа (msg_perm.mode), максимально допустимый суммарный размер сообщений в очереди (msg_qbytes). Увеличить значение msg_qbytes может только процесс, обладающий соответствующими привилегиями.

В листинге 8.28 приведена программа, изменяющая максимально допустимый суммарный размер сообщений в очереди. Предполагается, что очередь сообщений уже создана, а ее идентификатор известен. Читателю предлагается выполнить эту программу с разными значениями максимально допустимого суммарного размера (как меньше, так и больше текущего), действуя от имени обычного и привилегированного пользователя.

#include <stdio.h> #include <sys/msg.h>

int main (int argc, char *argv []) { int msqid; struct msqid_ds msqid_ds;

if (argc != 3) { fprintf (stderr, "Использование: %s идентификатор_очереди максимальный_размер\n", argv [0]); return (1); }



(void) sscanf (argv [1], "%d", &msqid);

/* Получим исходное значение структуры данных */ if (msgctl (msqid, IPC_STAT, &msqid_ds) == -1) { perror ("IPC_STAT-1"); return (2); } printf ("Максимальный размер очереди до изменения: %ld\n", msqid_ds.msg_qbytes);

(void) sscanf (argv [2], "%d", (int *) &msqid_ds.msg_qbytes);

/* Попробуем внести изменения */ if (msgctl (msqid, IPC_SET, &msqid_ds) == -1) { perror ("IPC_SET"); }

/* Получим новое значение структуры данных */ if (msgctl (msqid, IPC_STAT, &msqid_ds) == -1) { perror ("IPC_STAT-2"); return (3); } printf ("Максимальный размер очереди после изменения: %ld\n", msqid_ds.msg_qbytes);

return 0; }

Листинг 8.28. Пример программы управления очередями сообщений.

Две программы, показанные в листингах 8.29 и 8.30, демонстрируют полный цикл работы с очередями сообщений - от создания до удаления. Программа из листинга 8.29 представляет собой родительский процесс, читающий строки со стандартного ввода и отправляющий их в виде сообщений процессу-потомку (листинг 8.30). Последний принимает сообщения и выдает их тела на стандартный вывод. Предполагается, что программа этого процесса находится в файле msq_child текущего каталога.

#include <unistd.h> #include <stdio.h> #include <limits.h> #include <string.h> #include <sys/wait.h> #include <sys/msg.h>

/* Программа копирует строки со стандартного ввода на стандартный вывод, */ /* "прокачивая" их через очередь сообщений */

#define FTOK_FILE "/home/galat" #define FTOK_CHAR "G"

#define MSGQ_MODE 0644

#define MY_PROMPT "Вводите строки\n" #define MY_MSG "Вы ввели: "

int main (void) { key_t key; int msqid; struct mymsgbuf { long mtype; char mtext [LINE_MAX]; } line_buf, msgbuf;

switch (fork ()) { case -1: perror ("FORK"); return (1); case 0: /* Чтение из очереди и выдачу на стандартный вывод */ /* реализуем в порожденном процессе. */ (void) execl ("./msq_child", "msq_child", FTOK_FILE, FTOK_CHAR, (char *) 0); perror ("EXEC"); return (2); /* execl() завершился неудачей */ }



/* Чтение со стандартного ввода и запись в очередь */ /* возложим на родительский процесс */

/* Выработаем ключ для очереди сообщений */ if ((key = ftok (FTOK_FILE, FTOK_CHAR [0])) == (key_t) (-1)) { perror ("FTOK"); return (3); }

/* Получим идентификатор очереди сообщений */ if ((msqid = msgget (key, IPC_CREAT | MSGQ_MODE)) < 0) { perror ("MSGGET"); return (4); }

/* Приступим к отправке сообщений в очередь */ msgbuf.mtype = line_buf.mtype = 1; strncpy (msgbuf.mtext, MY_PROMPT, sizeof (msgbuf.mtext)); if (msgsnd (msqid, (void *) &msgbuf, strlen (msgbuf.mtext) + 1, 0) != 0) { perror ("MSGSND-1"); return (5); } strncpy (msgbuf.mtext, MY_MSG, sizeof (msgbuf.mtext));

while (fgets (line_buf.mtext, sizeof (line_buf.mtext), stdin) != NULL) { if (msgsnd (msqid, (void *) &msgbuf, strlen (msgbuf.mtext) + 1, 0) != 0) { perror ("MSGSND-2"); break; } if (msgsnd (msqid, (void *) &line_buf, strlen (line_buf.mtext) + 1, 0) != 0) { perror ("MSGSND-3"); break; } }

/* Удалим очередь */ if (msgctl (msqid, IPC_RMID, NULL) == -1) { perror ("MSGCTL-IPC_RMID"); return (6); }

return (0); }

Листинг 8.29. Передающая часть программы работы с очередями сообщений.

#include <stdio.h> #include <limits.h> #include <sys/msg.h>

/* Программа получает сообщения из очереди */ /* и копирует их тела на стандартный вывод */

#define MSGQ_MODE 0644

int main (int argc, char *argv []) { key_t key; int msqid; struct mymsgbuf { long mtype; char mtext [LINE_MAX]; } msgbuf;

if (argc != 3) { fprintf (stderr, "Использование: %s имя_файла цепочка_символов\n", argv [0]); return (1); }

/* Выработаем ключ для очереди сообщений */ if ((key = ftok (argv [1], *argv [2])) == (key_t) (-1)) { perror ("CHILD FTOK"); return (2); }

/* Получим идентификатор очереди сообщений */ if ((msqid = msgget (key, IPC_CREAT | MSGQ_MODE)) < 0) { perror ("CHILD MSGGET"); return (3); }

/* Цикл приема сообщений и выдачи строк */ while (msgrcv (msqid, (void *) &msgbuf, sizeof (msgbuf.mtext), 0, 0) > 0) { if (fputs (msgbuf.mtext, stdout) == EOF) { break; } }

return 0; }

Листинг 8.30. Приемная часть программы работы с очередями сообщений.

Обратим внимание на способ выработки согласованного ключа, а также на то, что, вообще говоря, неизвестно, какой из процессов - родительский или порожденный - создаст очередь, а какой получит уже ассоциированный с ключом идентификатор (вызовы msgget() в обоих процессах одинаковы), но на корректность работы программы это не влияет.


Содержание раздела