Module: xenomai-forge
Branch: next
Commit: 0140904e9697b8a51b2247d2d2dac343abba663c
URL:    
http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=0140904e9697b8a51b2247d2d2dac343abba663c

Author: Philippe Gerum <r...@xenomai.org>
Date:   Wed Sep 11 18:40:06 2013 +0200

cobalt/mq: pipeline incoming messages directly to pending waiters

If the message queue is pended, pass any incoming message directly to
the first waiter without going through the queue (aka "pipelined
send").

---

 kernel/cobalt/posix/mqueue.c   |  241 +++++++++++++++++++++-------------------
 kernel/cobalt/posix/registry.c |    3 +-
 kernel/cobalt/posix/registry.h |    2 +-
 3 files changed, 129 insertions(+), 117 deletions(-)

diff --git a/kernel/cobalt/posix/mqueue.c b/kernel/cobalt/posix/mqueue.c
index 354ba19..d0a22c3 100644
--- a/kernel/cobalt/posix/mqueue.c
+++ b/kernel/cobalt/posix/mqueue.c
@@ -84,9 +84,14 @@ struct cobalt_msg {
 
 static struct list_head cobalt_mqq;
 
+struct cobalt_mqwait_context {
+       struct xnthread_wait_context wc;
+       struct cobalt_msg *msg;
+};
+
 static struct mq_attr default_attr = {
-      mq_maxmsg:10,
-      mq_msgsize:8192,
+      .mq_maxmsg = 10,
+      .mq_msgsize = 8192,
 };
 
 static inline struct cobalt_msg *mq_msg_alloc(cobalt_mq_t *mq)
@@ -380,40 +385,36 @@ static inline int mq_close(mqd_t fd)
        cobalt_desc_t *desc;
        cobalt_mq_t *mq;
        spl_t s;
-       int err;
+       int ret;
 
        xnlock_get_irqsave(&nklock, s);
 
-       err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
-       if (err)
-               goto err_unlock;
+       ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+       if (ret)
+               goto out;
 
        mq = node2mq(cobalt_desc_node(desc));
-
-       err = -cobalt_node_put(&mq->nodebase);
-       if (err)
-               goto err_unlock;
+       ret = -cobalt_node_put(&mq->nodebase);
+       if (ret)
+               goto out;
 
        if (mq->target_qd == fd)
                mq->target = NULL;
+
        if (cobalt_node_removed_p(&mq->nodebase)) {
                xnlock_put_irqrestore(&nklock, s);
-
                mq_destroy(mq);
                xnfree(mq);
        } else
                xnlock_put_irqrestore(&nklock, s);
 
-       err = -cobalt_desc_destroy(desc);
-       if (err)
-               goto error;
+       cobalt_desc_destroy(desc);
 
        return 0;
-
-      err_unlock:
+ out:
        xnlock_put_irqrestore(&nklock, s);
-      error:
-       return err;
+
+       return ret;
 }
 
 /**
@@ -512,6 +513,8 @@ static inline struct cobalt_msg 
*cobalt_mq_tryrcv(cobalt_mq_t **mqp,
        if (len < mq->attr.mq_msgsize)
                return ERR_PTR(-EMSGSIZE);
 
+       *mqp = mq;
+
        if (list_empty(&mq->queued))
                return ERR_PTR(-EAGAIN);
 
@@ -521,7 +524,6 @@ static inline struct cobalt_msg 
*cobalt_mq_tryrcv(cobalt_mq_t **mqp,
        if (list_empty(&mq->queued))
                xnselect_signal(&mq->read_select, 0);
 
-       *mqp = mq;
        mq->nodebase.refcount++;
 
        return msg;
@@ -533,17 +535,19 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd,
 {
        struct xnthread *cur = xnsched_current_thread();
        struct cobalt_msg *msg;
+       cobalt_desc_t *desc;
+       cobalt_mq_t *mq;
+       xntmode_t tmode;
+       xnticks_t to;
        spl_t s;
-       int rc;
+       int ret;
 
        xnlock_get_irqsave(&nklock, s);
-       for (;;) {
-               cobalt_desc_t *desc;
-               cobalt_mq_t *mq;
-               xnticks_t to = XN_INFINITE;
 
-               if ((rc = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC))) {
-                       msg = ERR_PTR(-rc);
+       for (;;) {
+               ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+               if (ret) {
+                       msg = ERR_PTR(-ret);
                        break;
                }
 
@@ -554,36 +558,34 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd,
                if ((cobalt_desc_getflags(desc) & O_NONBLOCK))
                        break;
 
+               to = XN_INFINITE;
+               tmode = XN_RELATIVE;
                if (abs_timeoutp) {
                        if ((unsigned long)abs_timeoutp->tv_nsec >= 
ONE_BILLION){
                                msg = ERR_PTR(-EINVAL);
                                break;
                        }
                        to = ts2ns(abs_timeoutp) + 1;
+                       tmode = XN_REALTIME;
                }
 
                mq = node2mq(cobalt_desc_node(desc));
-
-               if (abs_timeoutp)
-                       xnsynch_sleep_on(&mq->senders, to, XN_REALTIME);
-               else
-                       xnsynch_sleep_on(&mq->senders, to, XN_RELATIVE);
+               xnsynch_sleep_on(&mq->senders, to, tmode);
 
                if (xnthread_test_info(cur, XNBREAK)) {
                        msg = ERR_PTR(-EINTR);
                        break;
                }
-
                if (xnthread_test_info(cur, XNTIMEO)) {
                        msg = ERR_PTR(-ETIMEDOUT);
                        break;
                }
-
                if (xnthread_test_info(cur, XNRMID)) {
                        msg = ERR_PTR(-EBADF);
                        break;
                }
        }
+
        xnlock_put_irqrestore(&nklock, s);
 
        return msg;
@@ -592,60 +594,69 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd,
 static int
 cobalt_mq_finish_send(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg)
 {
-       int err = 0, resched = 0, removed;
+       struct cobalt_mqwait_context *mwc;
+       struct xnthread_wait_context *wc;
        struct cobalt_sigpending *sigp;
+       struct xnthread *thread;
        cobalt_desc_t *desc;
+       int ret, removed;
        spl_t s;
 
        xnlock_get_irqsave(&nklock, s);
 
-       if ((err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC)))
+       ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+       if (ret)
                goto bad_fd;
 
-       if ((node2mq(cobalt_desc_node(desc)) != mq)) {
-               err = -EBADF;
+       if (node2mq(cobalt_desc_node(desc)) != mq) {
+               ret = -EBADF;
                goto bad_fd;
        }
 
-       list_add_priff(msg, &mq->queued, prio, link);
-       mq->nrqueued++;
+       /* Can we do pipelined sending? */
+       if (xnsynch_pended_p(&mq->receivers)) {
+               thread = xnsynch_wakeup_one_sleeper(&mq->receivers);
+               wc = xnthread_get_wait_context(thread);
+               mwc = container_of(wc, struct cobalt_mqwait_context, wc);
+               mwc->msg = msg;
+               mq->nodebase.refcount++;
+       } else {
+               /* Nope, have to go through the queue. */
+               list_add_priff(msg, &mq->queued, prio, link);
+               mq->nrqueued++;
 
-       if (list_is_singular(&mq->queued))
-               resched = xnselect_signal(&mq->read_select, 1);
-
-       if (xnsynch_wakeup_one_sleeper(&mq->receivers))
-               resched = 1;
-       else if (mq->target && list_is_singular(&mq->queued)) {
                /*
-                * First message and no pending reader? send a signal
-                * if mq_notify was called.
+                * If first message and no pending reader, send a
+                * signal if notification was enabled via mq_notify().
                 */
-               sigp = cobalt_signal_alloc();
-               if (sigp) {
-                       cobalt_copy_siginfo(SI_MESGQ, &sigp->si, &mq->si);
-                       cobalt_signal_send(mq->target, sigp, 0);
-                       resched = 1;
+               if (list_is_singular(&mq->queued)) {
+                       xnselect_signal(&mq->read_select, 1);
+                       if (mq->target) {
+                               sigp = cobalt_signal_alloc();
+                               if (sigp) {
+                                       cobalt_copy_siginfo(SI_MESGQ, 
&sigp->si, &mq->si);
+                                       cobalt_signal_send(mq->target, sigp, 0);
+                               }
+                               mq->target = NULL;
+                       }
                }
-               mq->target = NULL;
        }
-
-  unref:
+unref:
        cobalt_node_put(&mq->nodebase);
        removed = cobalt_node_removed_p(&mq->nodebase);
 
        xnlock_put_irqrestore(&nklock, s);
 
-       if (resched)
-               xnsched_run();
+       xnsched_run();
 
        if (removed) {
                mq_destroy(mq);
                xnfree(mq);
        }
 
-       return err;
+       return ret;
 
-  bad_fd:
+bad_fd:
        /*
         * descriptor was destroyed, simply return the message to the
         * pool and wakeup any waiting sender.
@@ -653,10 +664,9 @@ cobalt_mq_finish_send(mqd_t fd, cobalt_mq_t *mq, struct 
cobalt_msg *msg)
        mq_msg_free(mq, msg);
 
        if (list_is_singular(&mq->avail))
-               resched = xnselect_signal(&mq->write_select, 1);
+               xnselect_signal(&mq->write_select, 1);
 
-       if (xnsynch_wakeup_one_sleeper(&mq->senders))
-               resched = 1;
+       xnsynch_wakeup_one_sleeper(&mq->senders);
 
        goto unref;
 }
@@ -665,58 +675,61 @@ static struct cobalt_msg *
 cobalt_mq_timedrcv_inner(cobalt_mq_t **mqp, mqd_t fd,
                         size_t len, const struct timespec *abs_timeoutp)
 {
-       xnthread_t *cur = xnsched_current_thread();
+       struct cobalt_mqwait_context mwc;
        struct cobalt_msg *msg;
+       cobalt_desc_t *desc;
+       cobalt_mq_t *mq;
+       xntmode_t tmode;
+       xnticks_t to;
        spl_t s;
-       int rc;
+       int ret;
 
        xnlock_get_irqsave(&nklock, s);
-       for (;;) {
-               xnticks_t to = XN_INFINITE;
-               cobalt_desc_t *desc;
-               cobalt_mq_t *mq;
 
-               if ((rc = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC))) {
-                       msg = ERR_PTR(-rc);
-                       break;
-               }
-
-               if ((msg = cobalt_mq_tryrcv(mqp, desc, len)) != 
ERR_PTR(-EAGAIN))
-                       break;
-
-               if ((cobalt_desc_getflags(desc) & O_NONBLOCK))
-                       break;
-
-               if (abs_timeoutp) {
-                       if ((unsigned long)abs_timeoutp->tv_nsec >= 
ONE_BILLION){
-                               msg = ERR_PTR(-EINVAL);
-                               break;
-                       }
-                       to = ts2ns(abs_timeoutp) + 1;
-               }
-
-               mq = node2mq(cobalt_desc_node(desc));
-
-               if (abs_timeoutp)
-                       xnsynch_sleep_on(&mq->receivers, to, XN_REALTIME);
-               else
-                       xnsynch_sleep_on(&mq->receivers, to, XN_RELATIVE);
+       ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+       if (ret) {
+               msg = ERR_PTR(-ret);
+               goto out;
+       }
 
-               if (xnthread_test_info(cur, XNRMID)) {
-                       msg = ERR_PTR(-EBADF);
-                       break;
-               }
+       msg = cobalt_mq_tryrcv(mqp, desc, len);
+       if (msg != ERR_PTR(-EAGAIN))
+               goto out;
 
-               if (xnthread_test_info(cur, XNTIMEO)) {
-                       msg = ERR_PTR(-ETIMEDOUT);
-                       break;
-               }
+       if (cobalt_desc_getflags(desc) & O_NONBLOCK)
+               goto out;
 
-               if (xnthread_test_info(cur, XNBREAK)) {
-                       msg = ERR_PTR(-EINTR);
-                       break;
+       to = XN_INFINITE;
+       tmode = XN_RELATIVE;
+       if (abs_timeoutp) {
+               if (abs_timeoutp->tv_nsec >= ONE_BILLION) {
+                       msg = ERR_PTR(-EINVAL);
+                       goto out;
                }
+               to = ts2ns(abs_timeoutp) + 1;
+               tmode = XN_REALTIME;
        }
+
+       mq = node2mq(cobalt_desc_node(desc));
+       xnthread_prepare_wait(&mwc.wc);
+       ret = xnsynch_sleep_on(&mq->receivers, to, tmode);
+       xnthread_finish_wait(&mwc.wc, NULL);
+
+       if (ret == 0) {
+               /* Revalidate the descriptor. */
+               ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+               if (ret) {
+                       mq_msg_free(mq, msg);
+                       msg = ERR_PTR(-ret);
+               }  else
+                       msg = mwc.msg;
+       } else if (ret & XNRMID)
+               msg = ERR_PTR(-EBADF);
+       else if (ret & XNTIMEO)
+               msg = ERR_PTR(-ETIMEDOUT);
+       else
+               msg = ERR_PTR(-EINTR);
+out:
        xnlock_put_irqrestore(&nklock, s);
 
        return msg;
@@ -725,37 +738,37 @@ cobalt_mq_timedrcv_inner(cobalt_mq_t **mqp, mqd_t fd,
 static int
 cobalt_mq_finish_rcv(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg)
 {
-       int err = 0, resched = 0, removed;
        cobalt_desc_t *desc;
+       int ret, removed;
        spl_t s;
 
        xnlock_get_irqsave(&nklock, s);
-       err = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
-       if (!err && node2mq(cobalt_desc_node(desc)) != mq)
-               err = -EBADF;
+
+       ret = -cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+       if (ret == 0 && node2mq(cobalt_desc_node(desc)) != mq)
+               ret = -EBADF;
 
        mq_msg_free(mq, msg);
 
        if (list_is_singular(&mq->avail))
-               resched = xnselect_signal(&mq->write_select, 1);
+               xnselect_signal(&mq->write_select, 1);
 
-       if (xnsynch_wakeup_one_sleeper(&mq->senders))
-               resched = 1;
+       if (xnsynch_pended_p(&mq->senders))
+               xnsynch_wakeup_one_sleeper(&mq->senders);
 
        cobalt_node_put(&mq->nodebase);
        removed = cobalt_node_removed_p(&mq->nodebase);
 
        xnlock_put_irqrestore(&nklock, s);
 
-       if (resched)
-               xnsched_run();
+       xnsched_run();
 
        if (removed) {
                mq_destroy(mq);
                xnfree(mq);
        }
 
-       return err;
+       return ret;
 }
 
 /**
diff --git a/kernel/cobalt/posix/registry.c b/kernel/cobalt/posix/registry.c
index 8eaf914..a0e4fd4 100644
--- a/kernel/cobalt/posix/registry.c
+++ b/kernel/cobalt/posix/registry.c
@@ -241,7 +241,7 @@ int cobalt_desc_create(cobalt_desc_t ** descp, 
cobalt_node_t * node, long flags)
        return 0;
 }
 
-int cobalt_desc_destroy(cobalt_desc_t * desc)
+void cobalt_desc_destroy(cobalt_desc_t *desc)
 {
        spl_t s;
 
@@ -249,7 +249,6 @@ int cobalt_desc_destroy(cobalt_desc_t * desc)
        cobalt_reg_fd_put(desc->fd);
        xnlock_put_irqrestore(&nklock, s);
        xnfree(desc);
-       return 0;
 }
 
 int cobalt_desc_get(cobalt_desc_t ** descp, int fd, unsigned magic)
diff --git a/kernel/cobalt/posix/registry.h b/kernel/cobalt/posix/registry.h
index 36f569f..06fabf4 100644
--- a/kernel/cobalt/posix/registry.h
+++ b/kernel/cobalt/posix/registry.h
@@ -76,7 +76,7 @@ int cobalt_desc_create(cobalt_desc_t **descp, cobalt_node_t 
*node, long flags);
 
 int cobalt_desc_get(cobalt_desc_t **descp, int fd, unsigned magic);
 
-int cobalt_desc_destroy(cobalt_desc_t *desc);
+void cobalt_desc_destroy(cobalt_desc_t *desc);
 
 #define cobalt_desc_setflags(desc, fl) ((desc)->flags = (fl))
 


_______________________________________________
Xenomai-git mailing list
Xenomai-git@xenomai.org
http://www.xenomai.org/mailman/listinfo/xenomai-git

Reply via email to