Module: xenomai-forge Branch: next Commit: 0140904e9697b8a51b2247d2d2dac343abba663c URL: http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=0140904e9697b8a51b2247d2d2dac343abba663c
Author: Philippe Gerum <r...@xenomai.org> Date: Wed Sep 11 18:40:06 2013 +0200 cobalt/mq: pipeline incoming messages directly to pending waiters If the message queue is pended, pass any incoming message directly to the first waiter without going through the queue (aka "pipelined send"). --- kernel/cobalt/posix/mqueue.c | 241 +++++++++++++++++++++------------------- kernel/cobalt/posix/registry.c | 3 +- kernel/cobalt/posix/registry.h | 2 +- 3 files changed, 129 insertions(+), 117 deletions(-) diff --git a/kernel/cobalt/posix/mqueue.c b/kernel/cobalt/posix/mqueue.c index 354ba19..d0a22c3 100644 --- a/kernel/cobalt/posix/mqueue.c +++ b/kernel/cobalt/posix/mqueue.c @@ -84,9 +84,14 @@ struct cobalt_msg { static struct list_head cobalt_mqq; +struct cobalt_mqwait_context { + struct xnthread_wait_context wc; + struct cobalt_msg *msg; +}; + static struct mq_attr default_attr = { - mq_maxmsg:10, - mq_msgsize:8192, + .mq_maxmsg = 10, + .mq_msgsize = 8192, }; static inline struct cobalt_msg *mq_msg_alloc(cobalt_mq_t *mq) @@ -380,40 +385,36 @@ static inline int mq_close(mqd_t fd) cobalt_desc_t *desc; cobalt_mq_t *mq; spl_t s; - int err; + int ret; xnlock_get_irqsave(&nklock, s); - err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); - if (err) - goto err_unlock; + ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret) + goto out; mq = node2mq(cobalt_desc_node(desc)); - - err = -cobalt_node_put(&mq->nodebase); - if (err) - goto err_unlock; + ret = -cobalt_node_put(&mq->nodebase); + if (ret) + goto out; if (mq->target_qd == fd) mq->target = NULL; + if (cobalt_node_removed_p(&mq->nodebase)) { xnlock_put_irqrestore(&nklock, s); - mq_destroy(mq); xnfree(mq); } else xnlock_put_irqrestore(&nklock, s); - err = -cobalt_desc_destroy(desc); - if (err) - goto error; + cobalt_desc_destroy(desc); return 0; - - err_unlock: + out: xnlock_put_irqrestore(&nklock, s); - error: - return err; + + return ret; } /** @@ -512,6 +513,8 @@ static inline struct cobalt_msg *cobalt_mq_tryrcv(cobalt_mq_t **mqp, if (len < mq->attr.mq_msgsize) return ERR_PTR(-EMSGSIZE); + *mqp = mq; + if (list_empty(&mq->queued)) return ERR_PTR(-EAGAIN); @@ -521,7 +524,6 @@ static inline struct cobalt_msg *cobalt_mq_tryrcv(cobalt_mq_t **mqp, if (list_empty(&mq->queued)) xnselect_signal(&mq->read_select, 0); - *mqp = mq; mq->nodebase.refcount++; return msg; @@ -533,17 +535,19 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd, { struct xnthread *cur = xnsched_current_thread(); struct cobalt_msg *msg; + cobalt_desc_t *desc; + cobalt_mq_t *mq; + xntmode_t tmode; + xnticks_t to; spl_t s; - int rc; + int ret; xnlock_get_irqsave(&nklock, s); - for (;;) { - cobalt_desc_t *desc; - cobalt_mq_t *mq; - xnticks_t to = XN_INFINITE; - if ((rc = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC))) { - msg = ERR_PTR(-rc); + for (;;) { + ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret) { + msg = ERR_PTR(-ret); break; } @@ -554,36 +558,34 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd, if ((cobalt_desc_getflags(desc) & O_NONBLOCK)) break; + to = XN_INFINITE; + tmode = XN_RELATIVE; if (abs_timeoutp) { if ((unsigned long)abs_timeoutp->tv_nsec >= ONE_BILLION){ msg = ERR_PTR(-EINVAL); break; } to = ts2ns(abs_timeoutp) + 1; + tmode = XN_REALTIME; } mq = node2mq(cobalt_desc_node(desc)); - - if (abs_timeoutp) - xnsynch_sleep_on(&mq->senders, to, XN_REALTIME); - else - xnsynch_sleep_on(&mq->senders, to, XN_RELATIVE); + xnsynch_sleep_on(&mq->senders, to, tmode); if (xnthread_test_info(cur, XNBREAK)) { msg = ERR_PTR(-EINTR); break; } - if (xnthread_test_info(cur, XNTIMEO)) { msg = ERR_PTR(-ETIMEDOUT); break; } - if (xnthread_test_info(cur, XNRMID)) { msg = ERR_PTR(-EBADF); break; } } + xnlock_put_irqrestore(&nklock, s); return msg; @@ -592,60 +594,69 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd, static int cobalt_mq_finish_send(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg) { - int err = 0, resched = 0, removed; + struct cobalt_mqwait_context *mwc; + struct xnthread_wait_context *wc; struct cobalt_sigpending *sigp; + struct xnthread *thread; cobalt_desc_t *desc; + int ret, removed; spl_t s; xnlock_get_irqsave(&nklock, s); - if ((err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC))) + ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret) goto bad_fd; - if ((node2mq(cobalt_desc_node(desc)) != mq)) { - err = -EBADF; + if (node2mq(cobalt_desc_node(desc)) != mq) { + ret = -EBADF; goto bad_fd; } - list_add_priff(msg, &mq->queued, prio, link); - mq->nrqueued++; + /* Can we do pipelined sending? */ + if (xnsynch_pended_p(&mq->receivers)) { + thread = xnsynch_wakeup_one_sleeper(&mq->receivers); + wc = xnthread_get_wait_context(thread); + mwc = container_of(wc, struct cobalt_mqwait_context, wc); + mwc->msg = msg; + mq->nodebase.refcount++; + } else { + /* Nope, have to go through the queue. */ + list_add_priff(msg, &mq->queued, prio, link); + mq->nrqueued++; - if (list_is_singular(&mq->queued)) - resched = xnselect_signal(&mq->read_select, 1); - - if (xnsynch_wakeup_one_sleeper(&mq->receivers)) - resched = 1; - else if (mq->target && list_is_singular(&mq->queued)) { /* - * First message and no pending reader? send a signal - * if mq_notify was called. + * If first message and no pending reader, send a + * signal if notification was enabled via mq_notify(). */ - sigp = cobalt_signal_alloc(); - if (sigp) { - cobalt_copy_siginfo(SI_MESGQ, &sigp->si, &mq->si); - cobalt_signal_send(mq->target, sigp, 0); - resched = 1; + if (list_is_singular(&mq->queued)) { + xnselect_signal(&mq->read_select, 1); + if (mq->target) { + sigp = cobalt_signal_alloc(); + if (sigp) { + cobalt_copy_siginfo(SI_MESGQ, &sigp->si, &mq->si); + cobalt_signal_send(mq->target, sigp, 0); + } + mq->target = NULL; + } } - mq->target = NULL; } - - unref: +unref: cobalt_node_put(&mq->nodebase); removed = cobalt_node_removed_p(&mq->nodebase); xnlock_put_irqrestore(&nklock, s); - if (resched) - xnsched_run(); + xnsched_run(); if (removed) { mq_destroy(mq); xnfree(mq); } - return err; + return ret; - bad_fd: +bad_fd: /* * descriptor was destroyed, simply return the message to the * pool and wakeup any waiting sender. @@ -653,10 +664,9 @@ cobalt_mq_finish_send(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg) mq_msg_free(mq, msg); if (list_is_singular(&mq->avail)) - resched = xnselect_signal(&mq->write_select, 1); + xnselect_signal(&mq->write_select, 1); - if (xnsynch_wakeup_one_sleeper(&mq->senders)) - resched = 1; + xnsynch_wakeup_one_sleeper(&mq->senders); goto unref; } @@ -665,58 +675,61 @@ static struct cobalt_msg * cobalt_mq_timedrcv_inner(cobalt_mq_t **mqp, mqd_t fd, size_t len, const struct timespec *abs_timeoutp) { - xnthread_t *cur = xnsched_current_thread(); + struct cobalt_mqwait_context mwc; struct cobalt_msg *msg; + cobalt_desc_t *desc; + cobalt_mq_t *mq; + xntmode_t tmode; + xnticks_t to; spl_t s; - int rc; + int ret; xnlock_get_irqsave(&nklock, s); - for (;;) { - xnticks_t to = XN_INFINITE; - cobalt_desc_t *desc; - cobalt_mq_t *mq; - if ((rc = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC))) { - msg = ERR_PTR(-rc); - break; - } - - if ((msg = cobalt_mq_tryrcv(mqp, desc, len)) != ERR_PTR(-EAGAIN)) - break; - - if ((cobalt_desc_getflags(desc) & O_NONBLOCK)) - break; - - if (abs_timeoutp) { - if ((unsigned long)abs_timeoutp->tv_nsec >= ONE_BILLION){ - msg = ERR_PTR(-EINVAL); - break; - } - to = ts2ns(abs_timeoutp) + 1; - } - - mq = node2mq(cobalt_desc_node(desc)); - - if (abs_timeoutp) - xnsynch_sleep_on(&mq->receivers, to, XN_REALTIME); - else - xnsynch_sleep_on(&mq->receivers, to, XN_RELATIVE); + ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret) { + msg = ERR_PTR(-ret); + goto out; + } - if (xnthread_test_info(cur, XNRMID)) { - msg = ERR_PTR(-EBADF); - break; - } + msg = cobalt_mq_tryrcv(mqp, desc, len); + if (msg != ERR_PTR(-EAGAIN)) + goto out; - if (xnthread_test_info(cur, XNTIMEO)) { - msg = ERR_PTR(-ETIMEDOUT); - break; - } + if (cobalt_desc_getflags(desc) & O_NONBLOCK) + goto out; - if (xnthread_test_info(cur, XNBREAK)) { - msg = ERR_PTR(-EINTR); - break; + to = XN_INFINITE; + tmode = XN_RELATIVE; + if (abs_timeoutp) { + if (abs_timeoutp->tv_nsec >= ONE_BILLION) { + msg = ERR_PTR(-EINVAL); + goto out; } + to = ts2ns(abs_timeoutp) + 1; + tmode = XN_REALTIME; } + + mq = node2mq(cobalt_desc_node(desc)); + xnthread_prepare_wait(&mwc.wc); + ret = xnsynch_sleep_on(&mq->receivers, to, tmode); + xnthread_finish_wait(&mwc.wc, NULL); + + if (ret == 0) { + /* Revalidate the descriptor. */ + ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret) { + mq_msg_free(mq, msg); + msg = ERR_PTR(-ret); + } else + msg = mwc.msg; + } else if (ret & XNRMID) + msg = ERR_PTR(-EBADF); + else if (ret & XNTIMEO) + msg = ERR_PTR(-ETIMEDOUT); + else + msg = ERR_PTR(-EINTR); +out: xnlock_put_irqrestore(&nklock, s); return msg; @@ -725,37 +738,37 @@ cobalt_mq_timedrcv_inner(cobalt_mq_t **mqp, mqd_t fd, static int cobalt_mq_finish_rcv(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg) { - int err = 0, resched = 0, removed; cobalt_desc_t *desc; + int ret, removed; spl_t s; xnlock_get_irqsave(&nklock, s); - err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); - if (!err && node2mq(cobalt_desc_node(desc)) != mq) - err = -EBADF; + + ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC); + if (ret == 0 && node2mq(cobalt_desc_node(desc)) != mq) + ret = -EBADF; mq_msg_free(mq, msg); if (list_is_singular(&mq->avail)) - resched = xnselect_signal(&mq->write_select, 1); + xnselect_signal(&mq->write_select, 1); - if (xnsynch_wakeup_one_sleeper(&mq->senders)) - resched = 1; + if (xnsynch_pended_p(&mq->senders)) + xnsynch_wakeup_one_sleeper(&mq->senders); cobalt_node_put(&mq->nodebase); removed = cobalt_node_removed_p(&mq->nodebase); xnlock_put_irqrestore(&nklock, s); - if (resched) - xnsched_run(); + xnsched_run(); if (removed) { mq_destroy(mq); xnfree(mq); } - return err; + return ret; } /** diff --git a/kernel/cobalt/posix/registry.c b/kernel/cobalt/posix/registry.c index 8eaf914..a0e4fd4 100644 --- a/kernel/cobalt/posix/registry.c +++ b/kernel/cobalt/posix/registry.c @@ -241,7 +241,7 @@ int cobalt_desc_create(cobalt_desc_t ** descp, cobalt_node_t * node, long flags) return 0; } -int cobalt_desc_destroy(cobalt_desc_t * desc) +void cobalt_desc_destroy(cobalt_desc_t *desc) { spl_t s; @@ -249,7 +249,6 @@ int cobalt_desc_destroy(cobalt_desc_t * desc) cobalt_reg_fd_put(desc->fd); xnlock_put_irqrestore(&nklock, s); xnfree(desc); - return 0; } int cobalt_desc_get(cobalt_desc_t ** descp, int fd, unsigned magic) diff --git a/kernel/cobalt/posix/registry.h b/kernel/cobalt/posix/registry.h index 36f569f..06fabf4 100644 --- a/kernel/cobalt/posix/registry.h +++ b/kernel/cobalt/posix/registry.h @@ -76,7 +76,7 @@ int cobalt_desc_create(cobalt_desc_t **descp, cobalt_node_t *node, long flags); int cobalt_desc_get(cobalt_desc_t **descp, int fd, unsigned magic); -int cobalt_desc_destroy(cobalt_desc_t *desc); +void cobalt_desc_destroy(cobalt_desc_t *desc); #define cobalt_desc_setflags(desc, fl) ((desc)->flags = (fl)) _______________________________________________ Xenomai-git mailing list Xenomai-git@xenomai.org http://www.xenomai.org/mailman/listinfo/xenomai-git