Hi everyone, I've written a patch that adds support for POSIX message queues. I started this implementation because I wanted to play around with this API in a hobby project. The patch is not yet fully tested, but enough of the main functionality is in place to warrant feedback on both the implementation and the usefulness of the API. 
I've deliberately left out the implementation of mq_notify(2) because OpenBSD lacks sigevent support. I am planning to look into that if this work is of interest. The notification facility of the POSIX mqueue API is one of its strengths compared to the SysV API. Depending on the feedback, I am planning to work on the following items: * Rethink the default limits with respect to the default message size, maximum message size, default maximum number of messages and absolute maximum number of messages. Do we need sysctl knobs? * Respect MQ_OPEN_MAX. The implementation uses file descriptors internally, so currently the fd limits apply, but it may be desirable to have stricter default limits for message queues. * Hook into poll() and kqfilter(). This would be a non-standard extension. * Include regress tests for the API and test it against real software that depends on this API. * Think about how to list the currently live message queues. This can be done with a tool similar to ipcs(1). It can also be done by exposing the message queues on the filesystem. * Implement mq_notify(2) and the related sigevent infrastructure. This can perhaps be done incrementally. I am unable to estimate how much work is involved in getting this working. * Consider implementing mq_send(3) on top of mq_timedsend(2) in userspace, and similarly mq_receive(3) on top of mq_timedreceive(2). * Consider using the pool(9) API for messages that are smaller than or equal to the default message size. * Simplify sys_mq_open(). Clean up the rest of the code as much as possible. Do proper style(9) checks. * Cross-check with POSIX to ensure the semantics are properly implemented. Overall, I think the POSIX message queue API is cleaner and easier to use than the SysV one. The main and obvious downside is that not much software is using it currently. Let me know what you think! 
Thanks, Dimitris >From 6b2ed6b728de3c2ca8ab17fca527303b44840ba0 Mon Sep 17 00:00:00 2001 From: Dimitris Papastamos <[email protected]> Date: Tue, 15 Sep 2015 16:54:34 +0100 Subject: [PATCH] Initial implementation of mq_*() interfaces --- conf/files | 1 + kern/init_main.c | 4 + kern/sys_mqueue.c | 724 +++++++++++++++++++++++++++++++++++++++++++++++++++ kern/syscalls.master | 14 + sys/_types.h | 1 + sys/file.h | 1 + sys/mqueue.h | 33 +++ sys/types.h | 1 + 8 files changed, 779 insertions(+) create mode 100644 kern/sys_mqueue.c create mode 100644 sys/mqueue.h diff --git a/conf/files b/conf/files index 691d7f2..f3d5bfc 100644 --- a/conf/files +++ b/conf/files @@ -694,6 +694,7 @@ file kern/subr_prof.c file kern/subr_userconf.c boot_config file kern/subr_xxx.c file kern/sys_generic.c +file kern/sys_mqueue.c file kern/sys_pipe.c file kern/sys_process.c ptrace | systrace file kern/sys_socket.c diff --git a/kern/init_main.c b/kern/init_main.c index e2505ea..cc323d9 100644 --- a/kern/init_main.c +++ b/kern/init_main.c @@ -67,6 +67,7 @@ #ifdef SYSVSEM #include <sys/sem.h> #endif +#include <sys/mqueue.h> #ifdef SYSVMSG #include <sys/msg.h> #endif @@ -376,6 +377,9 @@ main(void *framep) seminit(); #endif + /* Initialize POSIX message queues. */ + mqinit(); + #ifdef SYSVMSG /* Initialize System V style message queues. */ msginit(); diff --git a/kern/sys_mqueue.c b/kern/sys_mqueue.c new file mode 100644 index 0000000..d85cda0 --- /dev/null +++ b/kern/sys_mqueue.c @@ -0,0 +1,724 @@ +/* + * Copyright (c) 2015 Dimitris Papastamos <[email protected]> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/param.h> +#include <sys/systm.h> +#include <sys/proc.h> +#include <sys/file.h> +#include <sys/filedesc.h> +#include <sys/pool.h> +#include <sys/malloc.h> +#include <sys/mount.h> +#include <sys/stat.h> +#include <sys/syscallargs.h> +#include <sys/time.h> +#include <sys/vnode.h> +#include <sys/mqueue.h> + +#define MQ_NAMELEN (NAME_MAX + 1) +#define MQ_DEF_MSGSIZE 1024 +/* Stored in mq_flags when message queue is marked for removal. */ +#define MQ_DYING 0x10000000 + +struct mq { + char name[MQ_NAMELEN]; + int oflag; + mode_t mode; + struct mq_attr attr; + int schan; + int rchan; + uid_t euid; + gid_t egid; + u_int refcnt; + struct rwlock lock; + TAILQ_HEAD(, mq_msg) head; + LIST_ENTRY(mq) entry; +}; + +struct mq_msg { + TAILQ_ENTRY(mq_msg) entry; + size_t len; + u_int prio; + uint8_t data[]; +}; + +int mqread(struct file *, off_t *, struct uio *, struct ucred *); +int mqwrite(struct file *, off_t *, struct uio *, struct ucred *); +int mqioctl(struct file *, u_long, caddr_t, struct proc *); +int mqpoll(struct file *, int, struct proc *); +int mqkqfilter(struct file *, struct knote *); +int mqstat(struct file *, struct stat *, struct proc *); +int mqclose(struct file *, struct proc *); + +struct fileops mqops = { + mqread, mqwrite, mqioctl, mqpoll, mqkqfilter, mqstat, mqclose +}; + +/* TODO: Rethink default limits, should there be sysctl knobs? 
*/ +u_int mq_max_msgsize = 16 * MQ_DEF_MSGSIZE; +u_int mq_def_maxmsg = 128; +u_int mq_max_maxmsg = 16 * 32; + +LIST_HEAD(, mq) mqlist; +struct rwlock mqlistlock; + +void +mqinit(void) +{ + rw_init(&mqlistlock, "mqlistlock"); + LIST_INIT(&mqlist); +} + +int +ts2timo(struct proc *p, struct timespec *ts, int *timo) +{ + struct timespec rts; + int error; + + error = clock_gettime(p, CLOCK_REALTIME, &rts); + if (error != 0) + return (error); + if (timespeccmp(&rts, ts, >=)) + return (ETIMEDOUT); + timespecsub(ts, &rts, ts); + error = timespecfix(ts); + if (error != 0) + return (error); + *timo = tstohz(ts); + return (0); +} + +struct mq * +mqlookup(char *name) +{ + struct mq *mq; + + LIST_FOREACH(mq, &mqlist, entry) + if (strncmp(mq->name, name, MQ_NAMELEN) == 0) + return (mq); + return (NULL); +} + +void +msgdestroy(struct mq_msg *msg) +{ + free(msg, M_TEMP, sizeof(*msg) + msg->len); +} + +struct mq_msg * +msgcreate(const char *udata, size_t len, u_int prio, int *error) +{ + struct mq_msg *msg; + + msg = malloc(sizeof(*msg) + len, M_TEMP, M_WAITOK); + *error = copyin(udata, msg->data, len); + if (*error != 0) { + free(msg, M_TEMP, sizeof(*msg) + len); + return (NULL); + } + msg->len = len; + msg->prio = prio; + return (msg); +} + +void +mqdestroy(struct mq *mq) +{ + struct mq_msg *msg; + + while ((msg = TAILQ_FIRST(&mq->head))) { + TAILQ_REMOVE(&mq->head, msg, entry); + msgdestroy(msg); + } + free(mq, M_TEMP, sizeof(*mq)); +} + +struct mq * +mqcreate(char *name, int oflag, mode_t mode, struct mq_attr *attr, + struct proc *p) +{ + struct mq *mq; + + mq = malloc(sizeof(*mq), M_TEMP, M_WAITOK | M_ZERO); + strlcpy(mq->name, name, sizeof(mq->name)); + mq->oflag = oflag; + mq->mode = (mode & ALLPERMS) & ~p->p_fd->fd_cmask; + mq->euid = p->p_ucred->cr_uid; + mq->egid = p->p_ucred->cr_gid; + memcpy(&mq->attr, attr, sizeof(mq->attr)); + rw_init(&mq->lock, "mqlock"); + TAILQ_INIT(&mq->head); + return (mq); +} + +int +mqaccess(struct ucred *cred, struct mq *mq, mode_t mode) +{ + 
return vaccess(VNON, mq->mode, mq->euid, mq->egid, mode, cred); +} + +/* + * TODO: Maybe it would make sense to have dummy bad ops for these + * somewhere else so we don't need to define our own. + */ +int +mqread(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred) +{ + return (EOPNOTSUPP); +} + +int +mqwrite(struct file *fp, off_t *off, struct uio *uio, struct ucred *cred) +{ + return (EOPNOTSUPP); +} + +int +mqioctl(struct file *fp, u_long cmd, caddr_t data, struct proc *p) +{ + return (EOPNOTSUPP); +} + +/* + * TODO: As a non-standard extension, implement poll + * on message queue descriptors. + */ +int +mqpoll(struct file *fp, int events, struct proc *p) +{ + return (EOPNOTSUPP); +} + +/* + * TODO: As a non-standard extension, implement kqfilter + * on message queue descriptors. + */ +int +mqkqfilter(struct file *fp, struct knote *kn) +{ + return (EOPNOTSUPP); +} + +/* + * TODO: Check if it makes sense to implement stat for + * message queue descriptors. + */ +int +mqstat(struct file *fp, struct stat *ub, struct proc *p) +{ + return (EOPNOTSUPP); +} + +int +mqclose(struct file *fp, struct proc *p) +{ + struct mq *mq; + int destroy = 0; + + mq = fp->f_data; + rw_enter_write(&mq->lock); + if (--mq->refcnt == 0 && (mq->attr.mq_flags & MQ_DYING) != 0) + destroy = 1; + rw_exit_write(&mq->lock); + if (destroy != 0) + mqdestroy(mq); + return (0); +} + +/* + * Require one slash at the start as per POSIX. + * Disallow any further slashes. + */ +int +namechk(char *name) +{ + if (*name++ != '/') + return (EINVAL); + while (*name && *name != '/') + name++; + return *name ? 
(EINVAL) : (0); +} + +mqd_t +sys_mq_open(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_open_args /* { + syscallarg(const char *) name; + syscallarg(int) oflag; + syscallarg(mode_t) mode; + syscallarg(struct mq_attr *) attr; + } */ *uap = v; + struct mq *mq, *newmq = NULL; + struct mq_attr attr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + mode_t accmode; + char *name; + int oflag, fd, error; + + oflag = SCARG(uap, oflag); + if ((oflag & O_CREAT) != 0) { + if (SCARG(uap, attr)) { + error = copyin(SCARG(uap, attr), &attr, sizeof(attr)); + if (error != 0) + return (error); + if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0) + return (EINVAL); + if (attr.mq_maxmsg > mq_max_maxmsg || + attr.mq_msgsize > mq_max_msgsize) + return (ENOSPC); + attr.mq_curmsgs = 0; + } else { + memset(&attr, 0, sizeof(attr)); + attr.mq_maxmsg = mq_def_maxmsg; + attr.mq_msgsize = MQ_DEF_MSGSIZE; + } + } + + name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL); + if (error != 0) + goto err0; + + error = namechk(name); + if (error != 0) + goto err0; + + fdplock(fdp); + error = falloc(p, &fp, &fd); + if (error != 0) + goto err1; + + fp->f_flag = FREAD; + if ((oflag & O_WRONLY) != 0 || (oflag & O_RDWR) != 0) + fp->f_flag |= FWRITE; + if ((oflag & O_NONBLOCK) != 0) + fp->f_flag |= FNONBLOCK; + + rw_enter_write(&mqlistlock); + mq = mqlookup(name); + if (mq) { + rw_enter_write(&mq->lock); + if ((oflag & O_CREAT) != 0 && (oflag & O_EXCL) != 0) { + error = EEXIST; + goto err3; + } + accmode = VREAD; + if ((fp->f_flag & FWRITE) != 0) + accmode |= VWRITE; + if (mqaccess(p->p_ucred, mq, accmode) != 0) { + error = EACCES; + goto err3; + } + } else { + if ((oflag & O_CREAT) == 0) { + error = ENOENT; + goto err2; + } + mq = mqcreate(name, oflag, SCARG(uap, mode), &attr, p); + newmq = mq; + rw_enter_write(&mq->lock); + } + + fp->f_type = DTYPE_MQUEUE; + fp->f_data = mq; + fp->f_ops = &mqops; + + fdp->fd_ofileflags[fd] |= 
UF_EXCLOSE; + + FILE_SET_MATURE(fp, p); + fdpunlock(fdp); + + mq->refcnt++; + if (newmq) + LIST_INSERT_HEAD(&mqlist, newmq, entry); + rw_exit_write(&mq->lock); + rw_exit_write(&mqlistlock); + free(name, M_TEMP, MQ_NAMELEN); + + *retval = fd; + return (0); + +err3: + rw_exit_write(&mq->lock); +err2: + rw_exit_write(&mqlistlock); + fdremove(fdp, fd); + closef(fp, p); +err1: + fdpunlock(fdp); +err0: + free(name, M_TEMP, MQ_NAMELEN); + return (error); +} + +int +sys_mq_unlink(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_unlink_args /* { + syscallarg(const char *) name; + } */ *uap = v; + struct mq *mq; + char *name; + int error; + int refcnt; + + name = malloc(MQ_NAMELEN, M_TEMP, M_WAITOK); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN, NULL); + if (error != 0) + goto err0; + + rw_enter_write(&mqlistlock); + mq = mqlookup(name); + if (!mq) { + error = ENOENT; + goto err1; + } + + rw_enter_write(&mq->lock); + if (mqaccess(p->p_ucred, mq, VWRITE) != 0) { + error = EACCES; + goto err2; + } + LIST_REMOVE(mq, entry); + mq->attr.mq_flags |= MQ_DYING; + refcnt = mq->refcnt; +err2: + rw_exit_write(&mq->lock); +err1: + rw_exit_write(&mqlistlock); +err0: + free(name, M_TEMP, MQ_NAMELEN); + + /* + * Destroy the queue if it doesn't have any + * active references. If refcnt is > 0 at this + * point, the queue will be destroyed in mqclose(). 
+ */ + if (error == 0 && refcnt == 0) + mqdestroy(mq); + return (error); +} + +int +domqsend(struct proc *p, mqd_t mqdes, const char *umsg_ptr, size_t msg_len, + unsigned int msg_prio, struct timespec *ats) +{ + struct mq *mq; + struct mq_msg *msg, *pos; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int timo; + int error; + + if (msg_prio >= MQ_PRIO_MAX) + return (EINVAL); + + fp = fd_getfile(fdp, mqdes); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + if ((fp->f_flag & FWRITE) == 0 || + mqaccess(p->p_ucred, mq, VWRITE)) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (EBADF); + } + + if (msg_len > mq->attr.mq_msgsize) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (EMSGSIZE); + } + + /* Block if there is no space in the queue. */ + while (mq->attr.mq_curmsgs == mq->attr.mq_maxmsg) { + rw_exit_write(&mq->lock); + if ((fp->f_flag & FNONBLOCK) != 0) { + FRELE(fp, p); + return (EWOULDBLOCK); + } + timo = 0; + if (ats) { + error = ts2timo(p, ats, &timo); + if (error < 0) { + FRELE(fp, p); + return (error); + } + } + error = tsleep(&mq->schan, PWAIT | PCATCH, "mqschan", timo); + if (error != 0) { + FRELE(fp, p); + return (error); + } + /* + * `mq' is always valid here. If there were references + * to the queue, then it couldn't have been destroyed. If + * it was destroyed, we would never be able to wake up + * from tsleep() without an error. 
+ */ + rw_enter_write(&mq->lock); + } + + msg = msgcreate(umsg_ptr, msg_len, msg_prio, &error); + if (!msg) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (error); + } + + mq->attr.mq_curmsgs++; + TAILQ_FOREACH(pos, &mq->head, entry) + if (msg->prio > pos->prio) + break; + if (!pos) + TAILQ_INSERT_TAIL(&mq->head, msg, entry); + else + TAILQ_INSERT_BEFORE(pos, msg, entry); + + wakeup(&mq->rchan); + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (0); +} + +int +sys_mq_send(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_send_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int) msg_prio; + } */ *uap = v; + + return domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), NULL); +} + +int +sys_mq_timedsend(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_timedsend_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int) msg_prio; + syscallarg(const struct timespec *) abstime; + } */ *uap = v; + struct timespec ts; + int error; + + error = copyin(SCARG(uap, abstime), &ts, sizeof(ts)); + if (error != 0) + return (error); + return domqsend(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), &ts); +} + +int +domqrecv(struct proc *p, mqd_t mqdes, char *umsg_ptr, size_t msg_len, + u_int *umsg_prio, struct timespec *ats) +{ + struct mq *mq; + struct mq_msg *msg; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int timo; + int error; + + fp = fd_getfile(fdp, mqdes); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + if (mqaccess(p->p_ucred, mq, VREAD)) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (EBADF); + } + + if (msg_len < mq->attr.mq_msgsize) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (EMSGSIZE); + } + + /* 
Block until there are messages to receive. */ + while (mq->attr.mq_curmsgs == 0) { + rw_exit_write(&mq->lock); + if ((fp->f_flag & FNONBLOCK) != 0) { + FRELE(fp, p); + return (EWOULDBLOCK); + } + timo = 0; + if (ats) { + error = ts2timo(p, ats, &timo); + if (error < 0) { + FRELE(fp, p); + return (error); + } + } + error = tsleep(&mq->rchan, PWAIT | PCATCH, "mqrchan", timo); + if (error != 0) { + FRELE(fp, p); + return (error); + } + /* + * `mq' is always valid here. If there were references + * to the queue, then it couldn't have been destroyed. If + * it was destroyed, we would never be able to wake up + * from tsleep() without an error. + */ + rw_enter_write(&mq->lock); + } + + msg = TAILQ_FIRST(&mq->head); + error = copyout(msg->data, umsg_ptr, msg->len); + if (error != 0) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (error); + } + + if (umsg_prio) { + error = copyout(&msg->prio, umsg_prio, sizeof(msg->prio)); + if (error != 0) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (error); + } + } + + mq->attr.mq_curmsgs--; + TAILQ_REMOVE(&mq->head, msg, entry); + wakeup(&mq->schan); + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (0); +} + +int +sys_mq_receive(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_receive_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int *) msg_prio; + } */ *uap = v; + + return domqrecv(p, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), NULL); +} + +int +sys_mq_timedreceive(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_timedreceive_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned int *) msg_prio; + syscallarg(const struct timespec *) abstime; + } */ *uap = v; + struct timespec ts; + int error; + + error = copyin(SCARG(uap, abstime), &ts, sizeof(ts)); + if (error != 0) + return (error); + return domqrecv(p, 
SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), &ts); +} + +int +sys_mq_setattr(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_setattr_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(const struct mq_attr *) mqstat; + syscallarg(struct mq_attr *) omqstat; + } */ *uap = v; + struct mq *mq; + struct mq_attr attr, oldattr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + int error; + + error = copyin(SCARG(uap, mqstat), &attr, sizeof(attr)); + if (error != 0) + return (error); + + fp = fd_getfile(fdp, SCARG(uap, mqdes)); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + /* Copy out old attributes if needed */ + if (SCARG(uap, omqstat)) { + memcpy(&oldattr, &mq->attr, sizeof(oldattr)); + oldattr.mq_flags &= (fp->f_flag & FNONBLOCK); + error = copyout(&oldattr, SCARG(uap, omqstat), + sizeof(oldattr)); + if (error != 0) { + rw_exit_write(&mq->lock); + FRELE(fp, p); + return (error); + } + } + if ((attr.mq_flags & O_NONBLOCK) != 0) + fp->f_flag |= FNONBLOCK; + else + fp->f_flag &= ~FNONBLOCK; + rw_exit_write(&mq->lock); + FRELE(fp, p); + + return (0); +} + +int +sys_mq_getattr(struct proc *p, void *v, register_t *retval) +{ + struct sys_mq_getattr_args /* { + syscallarg(mqd_t) mqdes; + syscallarg(struct mq_attr *) mqstat; + } */ *uap = v; + struct mq *mq; + struct mq_attr attr; + struct filedesc *fdp = p->p_fd; + struct file *fp; + + fp = fd_getfile(fdp, SCARG(uap, mqdes)); + if (!fp || fp->f_type != DTYPE_MQUEUE) + return (EBADF); + FREF(fp); + mq = fp->f_data; + + rw_enter_write(&mq->lock); + memcpy(&attr, &mq->attr, sizeof(attr)); + rw_exit_write(&mq->lock); + FRELE(fp, p); + attr.mq_flags &= (fp->f_flag & FNONBLOCK); + return copyout(&attr, SCARG(uap, mqstat), sizeof(attr)); +} diff --git a/kern/syscalls.master b/kern/syscalls.master index 4dca927..ec130c2 100644 --- a/kern/syscalls.master +++ b/kern/syscalls.master @@ -561,3 +561,17 @@ 
328 OBSOL __tfork51 329 STD NOLOCK { void sys___set_tcb(void *tcb); } 330 STD NOLOCK { void *sys___get_tcb(void); } +331 STD { mqd_t sys_mq_open(const char *name, int oflag, \ + mode_t mode, struct mq_attr *attr); } +332 STD { int sys_mq_unlink(const char *name); } +333 STD { int sys_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, \ + unsigned int msg_prio); } +334 STD { int sys_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, \ + unsigned int *msg_prio); } +335 STD { int sys_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, \ + unsigned int msg_prio, const struct timespec *abstime); } +336 STD { int sys_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, \ + unsigned int *msg_prio, const struct timespec *abstime); } +337 STD { int sys_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, \ + struct mq_attr *omqstat); } +338 STD { int sys_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); } diff --git a/sys/_types.h b/sys/_types.h index e058674..a384295 100644 --- a/sys/_types.h +++ b/sys/_types.h @@ -51,6 +51,7 @@ typedef __uint32_t __in_addr_t; /* base type for internet address */ typedef __uint16_t __in_port_t; /* IP port type */ typedef __uint64_t __ino_t; /* inode number */ typedef long __key_t; /* IPC key (for Sys V IPC) */ +typedef int __mqd_t; /* POSIX realtime message queue descriptor */ typedef __uint32_t __mode_t; /* permissions */ typedef __uint32_t __nlink_t; /* link count */ typedef __int64_t __off_t; /* file offset or size */ diff --git a/sys/file.h b/sys/file.h index 85f900a..662b344 100644 --- a/sys/file.h +++ b/sys/file.h @@ -70,6 +70,7 @@ struct file { #define DTYPE_KQUEUE 4 /* event queue */ /* was define DTYPE_CRYPTO 5 */ #define DTYPE_SYSTRACE 6 /* system call tracing */ +#define DTYPE_MQUEUE 7 /* message queue */ short f_type; /* descriptor type */ long f_count; /* reference count */ struct ucred *f_cred; /* credentials associated with descriptor */ diff --git a/sys/mqueue.h b/sys/mqueue.h new file mode 
100644 index 0000000..5bdeba4 --- /dev/null +++ b/sys/mqueue.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2015 Dimitris Papastamos <[email protected]> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#ifndef _SYS_MQUEUE_H_ +#define _SYS_MQUEUE_H_ + +#define MQ_PRIO_MAX 32 + +struct mq_attr { + long mq_flags; /* only 0 and O_NONBLOCK are valid */ + long mq_maxmsg; /* maximum number of messages */ + long mq_msgsize; /* maximum size of message */ + long mq_curmsgs; /* current number of messages queued */ +}; + +#ifdef _KERNEL +void mqinit(void); +#endif /* _KERNEL */ + +#endif /* _SYS_MQUEUE_H_ */ diff --git a/sys/types.h b/sys/types.h index ceb7de5..c618ffb 100644 --- a/sys/types.h +++ b/sys/types.h @@ -141,6 +141,7 @@ typedef __gid_t gid_t; /* group id */ typedef __id_t id_t; /* may contain pid, uid or gid */ typedef __ino_t ino_t; /* inode number */ typedef __key_t key_t; /* IPC key (for Sys V IPC) */ +typedef __mqd_t mqd_t; /* POSIX realtime message queue descriptor */ typedef __mode_t mode_t; /* permissions */ typedef __nlink_t nlink_t; /* link count */ typedef __rlim_t rlim_t; /* resource limit */ -- 2.5.2
