Module: xenomai-forge
Branch: next
Commit: 3ea22affcfa2ceabba3b6a85be5f79a7eeedc24e
URL:    
http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=3ea22affcfa2ceabba3b6a85be5f79a7eeedc24e

Author: Philippe Gerum <r...@xenomai.org>
Date:   Thu Oct  3 09:43:00 2013 +0200

cobalt/posix/mq: implement pipelined message allocation

Message slots released from the receive side are directly transferred
to threads waiting for them on the send side.

---

 kernel/cobalt/posix/mqueue.c |  117 +++++++++++++++++++++--------------------
 1 files changed, 60 insertions(+), 57 deletions(-)

diff --git a/kernel/cobalt/posix/mqueue.c b/kernel/cobalt/posix/mqueue.c
index 27d1c55..1cafb15 100644
--- a/kernel/cobalt/posix/mqueue.c
+++ b/kernel/cobalt/posix/mqueue.c
@@ -485,6 +485,8 @@ struct cobalt_msg *cobalt_mq_trysend(cobalt_mq_t **mqp,
        if (len > mq->attr.mq_msgsize)
                return ERR_PTR(-EMSGSIZE);
 
+       *mqp = mq;
+
        msg = mq_msg_alloc(mq);
        if (msg == NULL)
                return ERR_PTR(-EAGAIN);
@@ -492,7 +494,6 @@ struct cobalt_msg *cobalt_mq_trysend(cobalt_mq_t **mqp,
        if (list_empty(&mq->avail))
                xnselect_signal(&mq->write_select, 0);
 
-       *mqp = mq;
        mq->nodebase.refcount++;
 
        return msg;
@@ -534,7 +535,7 @@ static struct cobalt_msg *
 cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd,
                          size_t len, const struct timespec *abs_timeoutp)
 {
-       struct xnthread *cur = xnsched_current_thread();
+       struct cobalt_mqwait_context mwc;
        struct cobalt_msg *msg;
        cobalt_desc_t *desc;
        cobalt_mq_t *mq;
@@ -545,53 +546,72 @@ cobalt_mq_timedsend_inner(cobalt_mq_t **mqp, mqd_t fd,
 
        xnlock_get_irqsave(&nklock, s);
 
-       for (;;) {
-               ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
-               if (ret) {
-                       msg = ERR_PTR(-ret);
-                       break;
-               }
+       ret = cobalt_desc_get(&desc, fd, COBALT_MQ_MAGIC);
+       if (ret) {
+               msg = ERR_PTR(-ret);
+               goto out;
+       }
 
-               msg = cobalt_mq_trysend(mqp, desc, len);
-               if (msg != ERR_PTR(-EAGAIN))
-                       break;
+       msg = cobalt_mq_trysend(mqp, desc, len);
+       if (msg != ERR_PTR(-EAGAIN))
+               goto out;
 
-               if ((cobalt_desc_getflags(desc) & O_NONBLOCK))
-                       break;
+       if ((cobalt_desc_getflags(desc) & O_NONBLOCK))
+               goto out;
 
-               to = XN_INFINITE;
-               tmode = XN_RELATIVE;
-               if (abs_timeoutp) {
-                       if ((unsigned long)abs_timeoutp->tv_nsec >= 
ONE_BILLION){
-                               msg = ERR_PTR(-EINVAL);
-                               break;
-                       }
-                       to = ts2ns(abs_timeoutp) + 1;
-                       tmode = XN_REALTIME;
+       to = XN_INFINITE;
+       tmode = XN_RELATIVE;
+       if (abs_timeoutp) {
+               if ((unsigned long)abs_timeoutp->tv_nsec >= ONE_BILLION) {
+                       msg = ERR_PTR(-EINVAL);
+                       goto out;
                }
+               to = ts2ns(abs_timeoutp) + 1;
+               tmode = XN_REALTIME;
+       }
 
-               mq = node2mq(cobalt_desc_node(desc));
-               xnsynch_sleep_on(&mq->senders, to, tmode);
-
-               if (xnthread_test_info(cur, XNBREAK)) {
+       mq = node2mq(cobalt_desc_node(desc));
+       xnthread_prepare_wait(&mwc.wc);
+       ret = xnsynch_sleep_on(&mq->senders, to, tmode);
+       if (ret) {
+               if (ret & XNBREAK)
                        msg = ERR_PTR(-EINTR);
-                       break;
-               }
-               if (xnthread_test_info(cur, XNTIMEO)) {
+               else if (ret & XNTIMEO)
                        msg = ERR_PTR(-ETIMEDOUT);
-                       break;
-               }
-               if (xnthread_test_info(cur, XNRMID)) {
+               else if (ret & XNRMID)
                        msg = ERR_PTR(-EBADF);
-                       break;
-               }
-       }
-
+       } else
+               msg = mwc.msg;
+out:
        xnlock_put_irqrestore(&nklock, s);
 
        return msg;
 }
 
+static void mq_release_msg(cobalt_mq_t *mq, struct cobalt_msg *msg)
+{
+       struct cobalt_mqwait_context *mwc;
+       struct xnthread_wait_context *wc;
+       struct xnthread *thread;
+
+       /*
+        * Try passing the free message slot to a waiting sender, link
+        * it to the free queue otherwise.
+        */
+       if (xnsynch_pended_p(&mq->senders)) {
+               thread = xnsynch_wakeup_one_sleeper(&mq->senders);
+               wc = xnthread_get_wait_context(thread);
+               mwc = container_of(wc, struct cobalt_mqwait_context, wc);
+               mwc->msg = msg;
+               xnthread_complete_wait(wc);
+               mq->nodebase.refcount++;
+       } else {
+               mq_msg_free(mq, msg);
+               if (list_is_singular(&mq->avail))
+                       xnselect_signal(&mq->write_select, 1);
+       }
+}
+
 static int
 cobalt_mq_finish_send(mqd_t fd, cobalt_mq_t *mq, struct cobalt_msg *msg)
 {
@@ -659,17 +679,7 @@ unref:
        return ret;
 
 bad_fd:
-       /*
-        * descriptor was destroyed, simply return the message to the
-        * pool and wakeup any waiting sender.
-        */;
-       mq_msg_free(mq, msg);
-
-       if (list_is_singular(&mq->avail))
-               xnselect_signal(&mq->write_select, 1);
-
-       xnsynch_wakeup_one_sleeper(&mq->senders);
-
+       mq_release_msg(mq, msg);
        goto unref;
 }
 
@@ -742,21 +752,14 @@ cobalt_mq_finish_rcv(mqd_t fd, cobalt_mq_t *mq, struct 
cobalt_msg *msg)
        if (ret == 0 && node2mq(cobalt_desc_node(desc)) != mq)
                ret = -EBADF;
 
-       mq_msg_free(mq, msg);
-
-       if (list_is_singular(&mq->avail))
-               xnselect_signal(&mq->write_select, 1);
-
-       if (xnsynch_pended_p(&mq->senders))
-               xnsynch_wakeup_one_sleeper(&mq->senders);
-
+       mq_release_msg(mq, msg);
        cobalt_node_put(&mq->nodebase);
        removed = cobalt_node_removed_p(&mq->nodebase);
 
-       xnlock_put_irqrestore(&nklock, s);
-
        xnsched_run();
 
+       xnlock_put_irqrestore(&nklock, s);
+
        if (removed) {
                mq_destroy(mq);
                xnfree(mq);


_______________________________________________
Xenomai-git mailing list
Xenomai-git@xenomai.org
http://www.xenomai.org/mailman/listinfo/xenomai-git

Reply via email to