Hi everyone,

I've written a patch that adds support for POSIX message queues.  I
started this implementation because I wanted to play around with this
API in a hobby project.  The patch is not yet fully tested, but enough
of the main functionality is there to warrant some feedback on both the
implementation and the usefulness of the API.

I've specifically left out the implementation of mq_notify(2) because
OpenBSD lacks sigevent support.  I am planning to look into that if this
work is of interest.  The notification facility of the POSIX mqueue API
is one of its strengths compared to the SysV API.

Depending on the feedback, I am planning to work on the following items:

* Rethink default limits with respect to the default message size, maximum
  message size, default maximum number of messages and absolute maximum
  number of messages.
  Do we need sysctl knobs?

* Respect MQ_OPEN_MAX.  The implementation uses file descriptors internally
  so currently the fd limits apply but it may be desirable to have stricter
  default limits for message queues.

* Hook into poll() and kqfilter().  Non-standard extension.

* Include regress tests for the API and test it against real software
  that depends on this API.

* Think about how to list the currently live message queues.
  This can be done with a tool similar to ipcs(1).  It can also be done
  by exposing the message queues on the filesystem.

* Implement mq_notify(2) and the related sigevent infrastructure.  This
  can probably be done incrementally.  I am not yet able to estimate how
  much work is involved in getting this working.

* Consider implementing mq_send(3) on top of mq_timedsend(2) in userspace.
  Similarly for mq_receive(3).

* Consider using the pool(9) API for messages that are smaller than or
  equal to the default message size.

* Simplify sys_mq_open().  Clean up the rest of the code as much as possible.
  Do proper style(9) checks.

* Cross-check with POSIX to ensure semantics are properly implemented.

Overall, I think the POSIX message queue API is cleaner and easier to
use than the SysV one.  The main and obvious downside is that not much
software is using it currently.

Let me know what you think!

Thanks,
Dimitris

>From 6b2ed6b728de3c2ca8ab17fca527303b44840ba0 Mon Sep 17 00:00:00 2001
From: Dimitris Papastamos <[email protected]>
Date: Tue, 15 Sep 2015 16:54:34 +0100
Subject: [PATCH] Initial implementation of mq_*() interfaces

---
 conf/files           |   1 +
 kern/init_main.c     |   4 +
 kern/sys_mqueue.c    | 724 +++++++++++++++++++++++++++++++++++++++++++++++++++
 kern/syscalls.master |  14 +
 sys/_types.h         |   1 +
 sys/file.h           |   1 +
 sys/mqueue.h         |  33 +++
 sys/types.h          |   1 +
 8 files changed, 779 insertions(+)
 create mode 100644 kern/sys_mqueue.c
 create mode 100644 sys/mqueue.h

diff --git a/conf/files b/conf/files
index 691d7f2..f3d5bfc 100644
--- a/conf/files
+++ b/conf/files
@@ -694,6 +694,7 @@ file kern/subr_prof.c
 file kern/subr_userconf.c              boot_config
 file kern/subr_xxx.c
 file kern/sys_generic.c
+file kern/sys_mqueue.c
 file kern/sys_pipe.c
 file kern/sys_process.c                        ptrace | systrace
 file kern/sys_socket.c
diff --git a/kern/init_main.c b/kern/init_main.c
index e2505ea..cc323d9 100644
--- a/kern/init_main.c
+++ b/kern/init_main.c
@@ -67,6 +67,7 @@
 #ifdef SYSVSEM
 #include <sys/sem.h>
 #endif
+#include <sys/mqueue.h>
 #ifdef SYSVMSG
 #include <sys/msg.h>
 #endif
@@ -376,6 +377,9 @@ main(void *framep)
        seminit();
 #endif
 
+       /* Initialize POSIX message queues. */
+       mqinit();
+
 #ifdef SYSVMSG
        /* Initialize System V style message queues. */
        msginit();
diff --git a/kern/sys_mqueue.c b/kern/sys_mqueue.c
new file mode 100644
index 0000000..d85cda0
--- /dev/null
+++ b/kern/sys_mqueue.c
@@ -0,0 +1,724 @@
+/*
+ * Copyright (c) 2015 Dimitris Papastamos <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/proc.h>
+#include <sys/file.h>
+#include <sys/filedesc.h>
+#include <sys/poll.h>
+#include <sys/pool.h>
+#include <sys/malloc.h>
+#include <sys/mount.h>
+#include <sys/stat.h>
+#include <sys/syscallargs.h>
+#include <sys/time.h>
+#include <sys/vnode.h>
+#include <sys/mqueue.h>
+
+/* Size of the buffers that hold a queue name (NAME_MAX + 1 bytes). */
+#define MQ_NAMELEN     (NAME_MAX + 1)
+/* mq_msgsize given to queues created without caller-supplied attributes. */
+#define MQ_DEF_MSGSIZE 1024
+/* Bit set in a queue's mq_flags once the queue has been unlinked. */
+#define MQ_DYING       (1 << 28)
+
+/* In-kernel state of one named message queue. */
+struct mq {
+       /* Name the queue is found by in mqlookup(). */
+       char   name[MQ_NAMELEN];
+       /* oflag value supplied by the creating mq_open() call. */
+       int    oflag;
+       /* Permission bits, already masked with the creator's umask. */
+       mode_t mode;
+       /* Limits, current message count and the MQ_DYING marker. */
+       struct mq_attr attr;
+       /* Only the addresses are used: sleep channels for senders/receivers. */
+       int    schan;
+       int    rchan;
+       /* Owner ids taken from the creator's credentials. */
+       uid_t  euid;
+       gid_t  egid;
+       /* Incremented by mq_open(), decremented by mqclose(). */
+       u_int  refcnt;
+       /* Per-queue lock. */
+       struct rwlock lock;
+       /* Queued messages; senders insert in descending priority order. */
+       TAILQ_HEAD(, mq_msg) head;
+       /* Linkage on the global mqlist. */
+       LIST_ENTRY(mq)       entry;
+};
+
+/* One queued message: a fixed header followed directly by the payload. */
+struct mq_msg {
+       TAILQ_ENTRY(mq_msg) entry;      /* linkage on the owning queue */
+       size_t   len;                   /* number of payload bytes in data[] */
+       u_int    prio;                  /* priority given by the sender */
+       uint8_t  data[];                /* payload copied in from userland */
+};
+
+/* File operations backing a DTYPE_MQUEUE descriptor. */
+int mqread(struct file *, off_t *, struct uio *, struct ucred *);
+int mqwrite(struct file *, off_t *, struct uio *, struct ucred *);
+int mqioctl(struct file *, u_long, caddr_t, struct proc *);
+int mqpoll(struct file *, int, struct proc *);
+int mqkqfilter(struct file *, struct knote *);
+int mqstat(struct file *, struct stat *, struct proc *);
+int mqclose(struct file *, struct proc *);
+
+/*
+ * Operations vector installed on the file by sys_mq_open().
+ * NOTE(review): the initializer is positional, so it relies on the member
+ * order of struct fileops -- confirm against <sys/file.h>.
+ */
+struct fileops mqops = {
+       mqread, mqwrite, mqioctl, mqpoll, mqkqfilter, mqstat, mqclose
+};
+
+/* TODO: Rethink default limits, should there be sysctl knobs? */
+/* Largest mq_msgsize that mq_open() accepts from a caller. */
+u_int mq_max_msgsize = 16 * MQ_DEF_MSGSIZE;
+/* mq_maxmsg used when a queue is created without attributes. */
+u_int mq_def_maxmsg = 128;
+/* Largest mq_maxmsg that mq_open() accepts from a caller. */
+u_int mq_max_maxmsg = 512;
+
+/* Every queue that has not been unlinked; mqlistlock is held around use. */
+LIST_HEAD(, mq) mqlist;
+struct rwlock mqlistlock;
+
+/* Set up the global queue list and the lock that goes with it. */
+void
+mqinit(void)
+{
+       LIST_INIT(&mqlist);
+       rw_init(&mqlistlock, "mqlistlock");
+}
+
+/*
+ * Convert the absolute CLOCK_REALTIME deadline `ts' into a relative
+ * timeout in ticks, stored in `*timo'.
+ *
+ * Returns 0 on success, EINVAL when tv_nsec is out of range, ETIMEDOUT
+ * when the deadline has already passed, or whatever clock_gettime() or
+ * timespecfix() report.  `ts' itself is left untouched so that callers
+ * can hand in the same deadline again when they retry after a wakeup.
+ */
+int
+ts2timo(struct proc *p, struct timespec *ts, int *timo)
+{
+       struct timespec now, left;
+       int error;
+
+       if (ts->tv_nsec < 0 || ts->tv_nsec >= 1000000000L)
+               return (EINVAL);
+
+       error = clock_gettime(p, CLOCK_REALTIME, &now);
+       if (error != 0)
+               return (error);
+       if (timespeccmp(&now, ts, >=))
+               return (ETIMEDOUT);
+
+       /* Compute the remaining time in a private copy. */
+       timespecsub(ts, &now, &left);
+       error = timespecfix(&left);
+       if (error != 0)
+               return (error);
+
+       /*
+        * The callers use a timeout of 0 to mean "no timeout", so a
+        * deadline that is still in the future must never map to 0.
+        */
+       *timo = tstohz(&left);
+       if (*timo < 1)
+               *timo = 1;
+       return (0);
+}
+
+/*
+ * Walk mqlist looking for a queue called `name'.
+ * Returns the queue, or NULL when no entry matches.
+ */
+struct mq *
+mqlookup(char *name)
+{
+       struct mq *q;
+
+       for (q = LIST_FIRST(&mqlist); q != NULL; q = LIST_NEXT(q, entry)) {
+               if (strncmp(q->name, name, MQ_NAMELEN) == 0)
+                       break;
+       }
+       return (q);
+}
+
+/* Release a message allocated by msgcreate(), header and payload. */
+void
+msgdestroy(struct mq_msg *msg)
+{
+       size_t total = sizeof(struct mq_msg) + msg->len;
+
+       free(msg, M_TEMP, total);
+}
+
+/*
+ * Allocate a message and fill it with `len' bytes copied in from the
+ * user address `udata'.  On failure NULL is returned and the copyin()
+ * error is left in `*error'; on success `*error' is 0.
+ */
+struct mq_msg *
+msgcreate(const char *udata, size_t len, u_int prio, int *error)
+{
+       const size_t total = sizeof(struct mq_msg) + len;
+       struct mq_msg *m;
+
+       m = malloc(total, M_TEMP, M_WAITOK);
+       if ((*error = copyin(udata, m->data, len)) == 0) {
+               m->len = len;
+               m->prio = prio;
+               return (m);
+       }
+       free(m, M_TEMP, total);
+       return (NULL);
+}
+
+/* Free every message still queued on `mq', then the queue itself. */
+void
+mqdestroy(struct mq *mq)
+{
+       struct mq_msg *m;
+
+       while (!TAILQ_EMPTY(&mq->head)) {
+               m = TAILQ_FIRST(&mq->head);
+               TAILQ_REMOVE(&mq->head, m, entry);
+               msgdestroy(m);
+       }
+       free(mq, M_TEMP, sizeof(struct mq));
+}
+
+/*
+ * Allocate and initialize a queue called `name' owned by the calling
+ * process.  The queue starts out zeroed, so it has no references and no
+ * messages, and it is not put on mqlist here.
+ */
+struct mq *
+mqcreate(char *name, int oflag, mode_t mode, struct mq_attr *attr,
+         struct proc *p)
+{
+       struct ucred *cred = p->p_ucred;
+       struct mq *q;
+
+       q = malloc(sizeof(struct mq), M_TEMP, M_WAITOK | M_ZERO);
+
+       rw_init(&q->lock, "mqlock");
+       TAILQ_INIT(&q->head);
+
+       strlcpy(q->name, name, sizeof(q->name));
+       q->oflag = oflag;
+       /* Keep only permission bits and apply the process umask. */
+       q->mode = (mode & ALLPERMS) & ~p->p_fd->fd_cmask;
+       q->euid = cred->cr_uid;
+       q->egid = cred->cr_gid;
+       q->attr = *attr;
+       return (q);
+}
+
+/*
+ * Check `cred' against the queue's mode and owner ids for the access
+ * bits in `mode'.  Returns the result of vaccess().
+ */
+int
+mqaccess(struct ucred *cred, struct mq *mq, mode_t mode)
+{
+       int error;
+
+       error = vaccess(VNON, mq->mode, mq->euid, mq->egid, mode, cred);
+       return (error);
+}
+
+/*
+ * TODO: Maybe it would make sense to have dummy bad ops for these
+ * somewhere else so we don't need to define our own.
+ */
+
+/* Reading from a queue descriptor is not supported. */
+int
+mqread(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred)
+{
+       const int error = EOPNOTSUPP;
+
+       return (error);
+}
+
+/* Writing to a queue descriptor is not supported. */
+int
+mqwrite(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred)
+{
+       const int error = EOPNOTSUPP;
+
+       return (error);
+}
+
+/* No ioctl requests are implemented for queue descriptors. */
+int
+mqioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p)
+{
+       const int error = EOPNOTSUPP;
+
+       return (error);
+}
+
+/*
+ * TODO: As a non-standard extension, implement poll
+ * on message queue descriptors.
+ *
+ * Until then report the descriptor as not pollable.  The value returned
+ * here is consumed as a set of poll events, not as an errno, so an error
+ * number such as EOPNOTSUPP would be misread as a mix of ready bits.
+ */
+int
+mqpoll(struct file *fp, int events, struct proc *p)
+{
+       return (POLLNVAL);
+}
+
+/*
+ * TODO: As a non-standard extension, implement kqfilter
+ * on message queue descriptors.
+ *
+ * For now every attempt to attach a knote is refused.
+ */
+int
+mqkqfilter(struct file *fp, struct knote *kn)
+{
+       const int error = EOPNOTSUPP;
+
+       return (error);
+}
+
+/*
+ * TODO: Check if it makes sense to implement stat for
+ * message queue descriptors.
+ *
+ * For now `ub' is left untouched and the request is refused.
+ */
+int
+mqstat(struct file *fp, struct stat *ub, struct proc *p)
+{
+       const int error = EOPNOTSUPP;
+
+       return (error);
+}
+
+/*
+ * Close handler for queue descriptors: drop one reference and free the
+ * queue when that was the last reference to an unlinked (MQ_DYING) queue.
+ * Always returns 0.
+ */
+int
+mqclose(struct file *fp, struct proc *p)
+{
+       struct mq *q = fp->f_data;
+       int last;
+
+       rw_enter_write(&q->lock);
+       q->refcnt--;
+       last = (q->refcnt == 0 && (q->attr.mq_flags & MQ_DYING) != 0);
+       rw_exit_write(&q->lock);
+
+       /* The queue is freed only after its lock has been released. */
+       if (last)
+               mqdestroy(q);
+       return (0);
+}
+
+/*
+ * Validate a queue name.
+ *
+ * Require one slash at the start as per POSIX, followed by at least one
+ * character.  Disallow any further slashes.
+ *
+ * Returns 0 for an acceptable name and EINVAL otherwise.
+ */
+int
+namechk(char *name)
+{
+       if (*name++ != '/')
+               return (EINVAL);
+       /* A bare "/" names nothing; reject it rather than create "". */
+       if (*name == '\0')
+               return (EINVAL);
+       while (*name && *name != '/')
+               name++;
+       return *name ? (EINVAL) : (0);
+}
+
+/*
+ * mq_open(2): open the queue called `name', creating it when O_CREAT is
+ * given and no such queue exists.
+ *
+ * The access mode in `oflag' decides which of FREAD/FWRITE the new
+ * descriptor carries; O_NONBLOCK is recorded as FNONBLOCK.  When creating,
+ * `attr' (if non-NULL) supplies mq_maxmsg and mq_msgsize, otherwise the
+ * defaults are used.  On success the descriptor is stored in `*retval'
+ * and 0 is returned.
+ *
+ * Errors: EINVAL (bad access mode, name or attribute values), ENOSPC
+ * (attributes above the limits), EEXIST (O_CREAT|O_EXCL on an existing
+ * queue), EACCES (permission denied), ENOENT (no such queue and no
+ * O_CREAT), or an error from copyin()/copyinstr()/falloc().
+ */
+mqd_t
+sys_mq_open(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_open_args /* {
+               syscallarg(const char *) name;
+               syscallarg(int) oflag;
+               syscallarg(mode_t) mode;
+               syscallarg(struct mq_attr *) attr;
+       } */ *uap = v;
+       struct mq *mq, *newmq = NULL;
+       struct mq_attr attr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       mode_t accmode = 0;
+       char *name;
+       int oflag, fflag, fd, error;
+
+       oflag = SCARG(uap, oflag);
+
+       /*
+        * Derive the descriptor flags and the permissions to check from
+        * the access mode, so that a write-only open neither needs nor
+        * gains read access.
+        */
+       switch (oflag & O_ACCMODE) {
+       case O_RDONLY:
+               fflag = FREAD;
+               break;
+       case O_WRONLY:
+               fflag = FWRITE;
+               break;
+       case O_RDWR:
+               fflag = FREAD | FWRITE;
+               break;
+       default:
+               return (EINVAL);
+       }
+       if ((fflag & FREAD) != 0)
+               accmode |= VREAD;
+       if ((fflag & FWRITE) != 0)
+               accmode |= VWRITE;
+       if ((oflag & O_NONBLOCK) != 0)
+               fflag |= FNONBLOCK;
+
+       if ((oflag & O_CREAT) != 0) {
+               if (SCARG(uap, attr)) {
+                       error = copyin(SCARG(uap, attr), &attr, sizeof(attr));
+                       if (error != 0)
+                               return (error);
+                       if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0)
+                               return (EINVAL);
+                       if (attr.mq_maxmsg > mq_max_maxmsg ||
+                           attr.mq_msgsize > mq_max_msgsize)
+                               return (ENOSPC);
+                       /*
+                        * mq_flags comes straight from userland.  Drop
+                        * everything except O_NONBLOCK so that internal
+                        * bits such as MQ_DYING cannot be injected.
+                        */
+                       attr.mq_flags &= O_NONBLOCK;
+                       attr.mq_curmsgs = 0;
+               } else {
+                       memset(&attr, 0, sizeof(attr));
+                       attr.mq_maxmsg = mq_def_maxmsg;
+                       attr.mq_msgsize = MQ_DEF_MSGSIZE;
+               }
+       }
+
+       name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK);
+       error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL);
+       if (error != 0)
+               goto err0;
+
+       error = namechk(name);
+       if (error != 0)
+               goto err0;
+
+       fdplock(fdp);
+       error = falloc(p, &fp, &fd);
+       if (error != 0)
+               goto err1;
+
+       fp->f_flag = fflag;
+
+       rw_enter_write(&mqlistlock);
+       mq = mqlookup(name);
+       if (mq) {
+               rw_enter_write(&mq->lock);
+               if ((oflag & O_CREAT) != 0 && (oflag & O_EXCL) != 0) {
+                       error = EEXIST;
+                       goto err3;
+               }
+               if (mqaccess(p->p_ucred, mq, accmode) != 0) {
+                       error = EACCES;
+                       goto err3;
+               }
+       } else {
+               if ((oflag & O_CREAT) == 0) {
+                       error = ENOENT;
+                       goto err2;
+               }
+               mq = mqcreate(name, oflag, SCARG(uap, mode), &attr, p);
+               newmq = mq;
+               rw_enter_write(&mq->lock);
+       }
+
+       fp->f_type = DTYPE_MQUEUE;
+       fp->f_data = mq;
+       fp->f_ops = &mqops;
+
+       /* Queue descriptors are always close-on-exec. */
+       fdp->fd_ofileflags[fd] |= UF_EXCLOSE;
+
+       FILE_SET_MATURE(fp, p);
+       fdpunlock(fdp);
+
+       /* A new queue becomes visible only once it holds a reference. */
+       mq->refcnt++;
+       if (newmq)
+               LIST_INSERT_HEAD(&mqlist, newmq, entry);
+       rw_exit_write(&mq->lock);
+       rw_exit_write(&mqlistlock);
+       free(name, M_TEMP, MQ_NAMELEN);
+
+       *retval = fd;
+       return (0);
+
+err3:
+       rw_exit_write(&mq->lock);
+err2:
+       rw_exit_write(&mqlistlock);
+       fdremove(fdp, fd);
+       closef(fp, p);
+err1:
+       fdpunlock(fdp);
+err0:
+       free(name, M_TEMP, MQ_NAMELEN);
+       return (error);
+}
+
+/*
+ * mq_unlink(2): remove the queue called `name' from the global list and
+ * mark it MQ_DYING.  Write permission on the queue is required.
+ *
+ * Returns 0 on success, ENOENT when no such queue exists, EACCES when
+ * permission is denied, or the error from copyinstr().
+ */
+int
+sys_mq_unlink(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_unlink_args /* {
+               syscallarg(const char *) name;
+       } */ *uap = v;
+       struct mq *victim = NULL;
+       char *kname;
+       int error;
+       int unreferenced = 0;
+
+       kname = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK);
+       error = copyinstr(SCARG(uap, name), kname, MQ_NAMELEN, NULL);
+       if (error == 0) {
+               rw_enter_write(&mqlistlock);
+               victim = mqlookup(kname);
+               if (victim == NULL) {
+                       error = ENOENT;
+               } else {
+                       rw_enter_write(&victim->lock);
+                       if (mqaccess(p->p_ucred, victim, VWRITE) != 0) {
+                               error = EACCES;
+                       } else {
+                               LIST_REMOVE(victim, entry);
+                               victim->attr.mq_flags |= MQ_DYING;
+                               unreferenced = (victim->refcnt == 0);
+                       }
+                       rw_exit_write(&victim->lock);
+               }
+               rw_exit_write(&mqlistlock);
+       }
+       free(kname, M_TEMP, MQ_NAMELEN);
+
+       /*
+        * With no open descriptors left the queue is freed right away.
+        * Otherwise mqclose() frees it when the last reference goes.
+        */
+       if (error == 0 && unreferenced)
+               mqdestroy(victim);
+       return (error);
+}
+
+/*
+ * Common body of mq_send(2) and mq_timedsend(2).
+ *
+ * Copies `msg_len' bytes from the user address `umsg_ptr' into a new
+ * message and queues it on the queue behind `mqdes', ahead of all
+ * messages of lower priority and behind those of equal priority.  While
+ * the queue is full the caller sleeps, unless the descriptor is
+ * non-blocking.  `ats', when non-NULL, is an absolute CLOCK_REALTIME
+ * deadline for that sleep; it is never modified.
+ *
+ * Returns 0 on success, EINVAL (priority too large or bad deadline),
+ * EBADF (not a queue descriptor, or not writable), EMSGSIZE, EWOULDBLOCK
+ * (full and non-blocking), ETIMEDOUT (deadline passed), or the error from
+ * tsleep()/copyin().
+ */
+int
+domqsend(struct proc *p, mqd_t mqdes, const char *umsg_ptr, size_t msg_len,
+         unsigned int msg_prio, struct timespec *ats)
+{
+       struct mq *mq;
+       struct mq_msg *msg, *pos;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       struct timespec deadline;
+       int timo;
+       int error;
+
+       if (msg_prio >= MQ_PRIO_MAX)
+               return (EINVAL);
+
+       fp = fd_getfile(fdp, mqdes);
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       if ((fp->f_flag & FWRITE) == 0 ||
+           mqaccess(p->p_ucred, mq, VWRITE)) {
+               error = EBADF;
+               goto unlock;
+       }
+
+       if (msg_len > mq->attr.mq_msgsize) {
+               error = EMSGSIZE;
+               goto unlock;
+       }
+
+       /* Block if there is no space in the queue. */
+       while (mq->attr.mq_curmsgs >= mq->attr.mq_maxmsg) {
+               rw_exit_write(&mq->lock);
+               if ((fp->f_flag & FNONBLOCK) != 0) {
+                       error = EWOULDBLOCK;
+                       goto release;
+               }
+               timo = 0;
+               if (ats) {
+                       /*
+                        * Recompute the remaining time from the absolute
+                        * deadline on every pass.  A scratch copy is
+                        * handed over so the deadline stays intact.
+                        */
+                       deadline = *ats;
+                       error = ts2timo(p, &deadline, &timo);
+                       if (error != 0)
+                               goto release;
+               }
+               error = tsleep(&mq->schan, PWAIT | PCATCH, "mqschan", timo);
+               if (error != 0) {
+                       /* tsleep() reports an expired timeout this way. */
+                       if (error == EWOULDBLOCK)
+                               error = ETIMEDOUT;
+                       goto release;
+               }
+               /*
+                * `mq' is still valid here: the descriptor reference
+                * taken above keeps the queue from being destroyed.
+                */
+               rw_enter_write(&mq->lock);
+       }
+
+       msg = msgcreate(umsg_ptr, msg_len, msg_prio, &error);
+       if (!msg)
+               goto unlock;
+
+       /* Insert before the first message of strictly lower priority. */
+       mq->attr.mq_curmsgs++;
+       TAILQ_FOREACH(pos, &mq->head, entry)
+               if (msg->prio > pos->prio)
+                       break;
+       if (!pos)
+               TAILQ_INSERT_TAIL(&mq->head, msg, entry);
+       else
+               TAILQ_INSERT_BEFORE(pos, msg, entry);
+
+       wakeup(&mq->rchan);
+       error = 0;
+unlock:
+       rw_exit_write(&mq->lock);
+release:
+       FRELE(fp, p);
+       return (error);
+}
+
+/* mq_send(2): queue a message, with no deadline on blocking. */
+int
+sys_mq_send(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_send_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int) msg_prio;
+       } */ *uap = v;
+       const char *body = SCARG(uap, msg_ptr);
+       size_t len = SCARG(uap, msg_len);
+       unsigned int prio = SCARG(uap, msg_prio);
+
+       return (domqsend(p, SCARG(uap, mqdes), body, len, prio, NULL));
+}
+
+/*
+ * mq_timedsend(2): like mq_send(2), but blocking is bounded by the
+ * absolute deadline copied in from `abstime'.
+ */
+int
+sys_mq_timedsend(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_timedsend_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int) msg_prio;
+               syscallarg(const struct timespec *) abstime;
+       } */ *uap = v;
+       struct timespec deadline;
+       int error;
+
+       error = copyin(SCARG(uap, abstime), &deadline, sizeof(deadline));
+       if (error == 0) {
+               error = domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
+                   SCARG(uap, msg_len), SCARG(uap, msg_prio), &deadline);
+       }
+       return (error);
+}
+
+/*
+ * Common body of mq_receive(2) and mq_timedreceive(2).
+ *
+ * Removes the message at the head of the queue behind `mqdes' and copies
+ * its payload to the user address `umsg_ptr' and, when `umsg_prio' is
+ * non-NULL, its priority to `umsg_prio'.  `msg_len' must be at least the
+ * queue's mq_msgsize.  While the queue is empty the caller sleeps, unless
+ * the descriptor is non-blocking.  `ats', when non-NULL, is an absolute
+ * CLOCK_REALTIME deadline for that sleep; it is never modified.
+ *
+ * Returns 0 on success, EBADF (not a queue descriptor, or not readable),
+ * EMSGSIZE, EWOULDBLOCK (empty and non-blocking), EINVAL (bad deadline),
+ * ETIMEDOUT (deadline passed), or the error from tsleep()/copyout().  If
+ * a copyout() fails the message stays on the queue.
+ *
+ * NOTE(review): the length of the received message is not passed back
+ * to the caller; POSIX mq_receive() returns it.  Doing so needs access
+ * to the syscall's retval, which this interface does not provide.
+ */
+int
+domqrecv(struct proc *p, mqd_t mqdes, char *umsg_ptr, size_t msg_len,
+         u_int *umsg_prio, struct timespec *ats)
+{
+       struct mq *mq;
+       struct mq_msg *msg;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       struct timespec deadline;
+       int timo;
+       int error;
+
+       fp = fd_getfile(fdp, mqdes);
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       if ((fp->f_flag & FREAD) == 0 ||
+           mqaccess(p->p_ucred, mq, VREAD)) {
+               error = EBADF;
+               goto unlock;
+       }
+
+       if (msg_len < mq->attr.mq_msgsize) {
+               error = EMSGSIZE;
+               goto unlock;
+       }
+
+       /* Block until there are messages to receive. */
+       while (mq->attr.mq_curmsgs == 0) {
+               rw_exit_write(&mq->lock);
+               if ((fp->f_flag & FNONBLOCK) != 0) {
+                       error = EWOULDBLOCK;
+                       goto release;
+               }
+               timo = 0;
+               if (ats) {
+                       /*
+                        * Recompute the remaining time from the absolute
+                        * deadline on every pass.  A scratch copy is
+                        * handed over so the deadline stays intact.
+                        */
+                       deadline = *ats;
+                       error = ts2timo(p, &deadline, &timo);
+                       if (error != 0)
+                               goto release;
+               }
+               error = tsleep(&mq->rchan, PWAIT | PCATCH, "mqrchan", timo);
+               if (error != 0) {
+                       /* tsleep() reports an expired timeout this way. */
+                       if (error == EWOULDBLOCK)
+                               error = ETIMEDOUT;
+                       goto release;
+               }
+               /*
+                * `mq' is still valid here: the descriptor reference
+                * taken above keeps the queue from being destroyed.
+                */
+               rw_enter_write(&mq->lock);
+       }
+
+       msg = TAILQ_FIRST(&mq->head);
+       error = copyout(msg->data, umsg_ptr, msg->len);
+       if (error != 0)
+               goto unlock;
+
+       if (umsg_prio) {
+               error = copyout(&msg->prio, umsg_prio, sizeof(msg->prio));
+               if (error != 0)
+                       goto unlock;
+       }
+
+       /* Delivered: unlink the message and release its memory. */
+       mq->attr.mq_curmsgs--;
+       TAILQ_REMOVE(&mq->head, msg, entry);
+       msgdestroy(msg);
+       wakeup(&mq->schan);
+       error = 0;
+unlock:
+       rw_exit_write(&mq->lock);
+release:
+       FRELE(fp, p);
+       return (error);
+}
+
+/* mq_receive(2): take a message, with no deadline on blocking. */
+int
+sys_mq_receive(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_receive_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int *) msg_prio;
+       } */ *uap = v;
+       char *buf = SCARG(uap, msg_ptr);
+       size_t buflen = SCARG(uap, msg_len);
+       unsigned int *prio = SCARG(uap, msg_prio);
+
+       return (domqrecv(p, SCARG(uap, mqdes), buf, buflen, prio, NULL));
+}
+
+/*
+ * mq_timedreceive(2): like mq_receive(2), but blocking is bounded by
+ * the absolute deadline copied in from `abstime'.
+ */
+int
+sys_mq_timedreceive(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_timedreceive_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(char *) msg_ptr;
+               syscallarg(size_t) msg_len;
+               syscallarg(unsigned int *) msg_prio;
+               syscallarg(const struct timespec *) abstime;
+       } */ *uap = v;
+       struct timespec deadline;
+       int error;
+
+       error = copyin(SCARG(uap, abstime), &deadline, sizeof(deadline));
+       if (error == 0) {
+               error = domqrecv(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
+                   SCARG(uap, msg_len), SCARG(uap, msg_prio), &deadline);
+       }
+       return (error);
+}
+
+/*
+ * mq_setattr(2): switch the descriptor between blocking and non-blocking
+ * mode according to O_NONBLOCK in `mqstat'->mq_flags.  All other fields
+ * of `mqstat' are ignored.  When `omqstat' is non-NULL the attributes in
+ * effect before the change are copied out to it first.
+ *
+ * Returns 0 on success, EBADF when `mqdes' is not a queue descriptor, or
+ * the error from copyin()/copyout().
+ */
+int
+sys_mq_setattr(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_setattr_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(const struct mq_attr *) mqstat;
+               syscallarg(struct mq_attr *) omqstat;
+       } */ *uap = v;
+       struct mq *mq;
+       struct mq_attr attr, oldattr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+       int error;
+
+       error = copyin(SCARG(uap, mqstat), &attr, sizeof(attr));
+       if (error != 0)
+               return (error);
+
+       fp = fd_getfile(fdp, SCARG(uap, mqdes));
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       /* Copy out old attributes if needed */
+       if (SCARG(uap, omqstat)) {
+               oldattr = mq->attr;
+               /*
+                * The blocking mode belongs to the descriptor, not to
+                * the queue, so report it from f_flag and keep internal
+                * queue flags out of what userland sees.
+                */
+               oldattr.mq_flags =
+                   (fp->f_flag & FNONBLOCK) != 0 ? O_NONBLOCK : 0;
+               error = copyout(&oldattr, SCARG(uap, omqstat),
+                               sizeof(oldattr));
+               if (error != 0) {
+                       rw_exit_write(&mq->lock);
+                       FRELE(fp, p);
+                       return (error);
+               }
+       }
+       if ((attr.mq_flags & O_NONBLOCK) != 0)
+               fp->f_flag |= FNONBLOCK;
+       else
+               fp->f_flag &= ~FNONBLOCK;
+       rw_exit_write(&mq->lock);
+       FRELE(fp, p);
+
+       return (0);
+}
+
+/*
+ * mq_getattr(2): copy the queue's attributes out to `mqstat'.  mq_flags
+ * reports O_NONBLOCK when the descriptor is in non-blocking mode and is
+ * 0 otherwise.
+ *
+ * Returns 0 on success, EBADF when `mqdes' is not a queue descriptor, or
+ * the error from copyout().
+ */
+int
+sys_mq_getattr(struct proc *p, void *v, register_t *retval)
+{
+       struct sys_mq_getattr_args /* {
+               syscallarg(mqd_t) mqdes;
+               syscallarg(struct mq_attr *) mqstat;
+       } */ *uap = v;
+       struct mq *mq;
+       struct mq_attr attr;
+       struct filedesc *fdp = p->p_fd;
+       struct file *fp;
+
+       fp = fd_getfile(fdp, SCARG(uap, mqdes));
+       if (!fp || fp->f_type != DTYPE_MQUEUE)
+               return (EBADF);
+       FREF(fp);
+       mq = fp->f_data;
+
+       rw_enter_write(&mq->lock);
+       attr = mq->attr;
+       rw_exit_write(&mq->lock);
+
+       /*
+        * Read the descriptor flags before the reference is dropped, and
+        * derive mq_flags from them alone: the blocking mode is a
+        * property of the descriptor, not of the queue.
+        */
+       attr.mq_flags = (fp->f_flag & FNONBLOCK) != 0 ? O_NONBLOCK : 0;
+       FRELE(fp, p);
+
+       return copyout(&attr, SCARG(uap, mqstat), sizeof(attr));
+}
diff --git a/kern/syscalls.master b/kern/syscalls.master
index 4dca927..ec130c2 100644
--- a/kern/syscalls.master
+++ b/kern/syscalls.master
@@ -561,3 +561,17 @@
 328    OBSOL           __tfork51
 329    STD NOLOCK      { void sys___set_tcb(void *tcb); }
 330    STD NOLOCK      { void *sys___get_tcb(void); }
+; POSIX message queues.
+331    STD             { mqd_t sys_mq_open(const char *name, int oflag, \
+                           mode_t mode, struct mq_attr *attr); }
+332    STD             { int sys_mq_unlink(const char *name); }
+333    STD             { int sys_mq_send(mqd_t mqdes, const char *msg_ptr, \
+                           size_t msg_len, unsigned int msg_prio); }
+334    STD             { int sys_mq_receive(mqd_t mqdes, char *msg_ptr, \
+                           size_t msg_len, unsigned int *msg_prio); }
+335    STD             { int sys_mq_timedsend(mqd_t mqdes, \
+                           const char *msg_ptr, size_t msg_len, \
+                           unsigned int msg_prio, \
+                           const struct timespec *abstime); }
+336    STD             { int sys_mq_timedreceive(mqd_t mqdes, \
+                           char *msg_ptr, size_t msg_len, \
+                           unsigned int *msg_prio, \
+                           const struct timespec *abstime); }
+337    STD             { int sys_mq_setattr(mqd_t mqdes, \
+                           const struct mq_attr *mqstat, \
+                           struct mq_attr *omqstat); }
+338    STD             { int sys_mq_getattr(mqd_t mqdes, \
+                           struct mq_attr *mqstat); }
diff --git a/sys/_types.h b/sys/_types.h
index e058674..a384295 100644
--- a/sys/_types.h
+++ b/sys/_types.h
@@ -51,6 +51,7 @@ typedef       __uint32_t      __in_addr_t;    /* base type for internet address */
 typedef        __uint16_t      __in_port_t;    /* IP port type */
 typedef        __uint64_t      __ino_t;        /* inode number */
 typedef        long            __key_t;        /* IPC key (for Sys V IPC) */
+typedef        int             __mqd_t;        /* POSIX message queue descriptor */
 typedef        __uint32_t      __mode_t;       /* permissions */
 typedef        __uint32_t      __nlink_t;      /* link count */
 typedef        __int64_t       __off_t;        /* file offset or size */
diff --git a/sys/file.h b/sys/file.h
index 85f900a..662b344 100644
--- a/sys/file.h
+++ b/sys/file.h
@@ -70,6 +70,7 @@ struct file {
 #define        DTYPE_KQUEUE    4       /* event queue */
 /* was define  DTYPE_CRYPTO    5 */
 #define        DTYPE_SYSTRACE  6       /* system call tracing */
+#define        DTYPE_MQUEUE    7       /* POSIX message queue */
        short   f_type;         /* descriptor type */
        long    f_count;        /* reference count */
        struct  ucred *f_cred;  /* credentials associated with descriptor */
diff --git a/sys/mqueue.h b/sys/mqueue.h
new file mode 100644
index 0000000..5bdeba4
--- /dev/null
+++ b/sys/mqueue.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Dimitris Papastamos <[email protected]>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef _SYS_MQUEUE_H_
+#define _SYS_MQUEUE_H_
+
+/* Message priorities must be strictly below this value. */
+#define MQ_PRIO_MAX 32
+
+/* Attributes of a message queue as exchanged with userland. */
+struct mq_attr {
+       /* only 0 and O_NONBLOCK are valid */
+       long mq_flags;
+       /* maximum number of messages */
+       long mq_maxmsg;
+       /* maximum size of message */
+       long mq_msgsize;
+       /* current number of messages queued */
+       long mq_curmsgs;
+};
+
+#ifdef _KERNEL
+/* Called once at boot to set up the global queue list. */
+void mqinit(void);
+#endif /* _KERNEL */
+
+#endif /* _SYS_MQUEUE_H_ */
diff --git a/sys/types.h b/sys/types.h
index ceb7de5..c618ffb 100644
--- a/sys/types.h
+++ b/sys/types.h
@@ -141,6 +141,7 @@ typedef     __gid_t         gid_t;          /* group id */
 typedef        __id_t          id_t;           /* may contain pid, uid or gid */
 typedef        __ino_t         ino_t;          /* inode number */
 typedef        __key_t         key_t;          /* IPC key (for Sys V IPC) */
+typedef        __mqd_t         mqd_t;          /* POSIX message queue descriptor */
 typedef        __mode_t        mode_t;         /* permissions */
 typedef        __nlink_t       nlink_t;        /* link count */
 typedef        __rlim_t        rlim_t;         /* resource limit */
-- 
2.5.2

Reply via email to