Module: xenomai-forge
Branch: master
Commit: a4fb94614a70b143def5325f8795a0cca251d7ef
URL:    
http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=a4fb94614a70b143def5325f8795a0cca251d7ef

Author: Philippe Gerum <[email protected]>
Date:   Tue Nov 29 12:32:43 2011 +0100

alchemy/queue: optimize with direct copy to waiter

This patch implements direct message passing between rt_queue_write()
and rt_queue_read() when the runtime conditions allow.

Sending a message to a queue on which a thread is already pending is
quite a common case. When both the sender and the receiver threads
belong to the same process, we may copy the data sent through
rt_queue_write() directly to the receiver's buffer passed to
rt_queue_read() before unblocking the thread.

Sending the data this way, without going through any queuing, saves
one data copy plus the combined cost of allocating then releasing a
message buffer, which is significant, particularly on low-end
hardware.

---

 lib/alchemy/queue.c |  182 +++++++++++++++++++++++++++++++++++++++++----------
 lib/alchemy/queue.h |    2 +
 2 files changed, 150 insertions(+), 34 deletions(-)

diff --git a/lib/alchemy/queue.c b/lib/alchemy/queue.c
index 65c5164..3f40356 100644
--- a/lib/alchemy/queue.c
+++ b/lib/alchemy/queue.c
@@ -142,7 +142,10 @@ void *rt_queue_alloc(RT_QUEUE *queue, size_t size)
        if (msg == NULL)
                goto done;
 
-       inith(&msg->next);
+       /*
+        * XXX: no need to init the ->next holder, list_*pend() do not
+        * require this, and this ends up being costly on low end.
+        */
        msg->size = size;       /* Zero is allowed. */
        msg->refcount = 1;
        ++msg;
@@ -271,20 +274,86 @@ out:
 int rt_queue_write(RT_QUEUE *queue,
                   const void *buf, size_t size, int mode)
 {
-       void *_buf;
-       int ret;
+       struct alchemy_queue_wait *wait;
+       struct alchemy_queue_msg *msg;
+       struct alchemy_queue *qcb;
+       struct threadobj *waiter;
+       struct syncstate syns;
+       int ret = 0, nwaiters;
+       struct service svc;
+       size_t usersz;
+
+       if (size == 0)
+               return 0;
+
+       COPPERPLATE_PROTECT(svc);
+
+       qcb = get_alchemy_queue(queue, &syns, &ret);
+       if (qcb == NULL)
+               goto out;
+
+       waiter = syncobj_peek_at_pend(&qcb->sobj);
+       if (waiter && threadobj_local_p(waiter)) {
+               /*
+                * Fast path for local threads already waiting for
+                * data via rt_queue_read(): do direct copy to the
+                * reader's buffer.
+                */
+               wait = threadobj_get_wait(waiter);
+               usersz = wait->usersz;
+               if (usersz == 0)
+                       /* no buffer provided, enqueue normally. */
+                       goto enqueue;
+               if (size > usersz)
+                       size = usersz;
+               if (size > 0)
+                       memcpy(wait->userbuf, buf, size);
+               wait->usersz = size;
+               syncobj_wakeup_waiter(&qcb->sobj, waiter);
+               ret = 1;
+               goto done;
+       }
+
+enqueue:
+       nwaiters = syncobj_pend_count(&qcb->sobj);
+       if (nwaiters == 0 && (mode & Q_BROADCAST) != 0)
+               goto done;
+
+       ret = -ENOMEM;
+       if (qcb->limit && qcb->mcount >= qcb->limit)
+               goto done;
 
-       _buf = rt_queue_alloc(queue, size);
-       if (_buf == NULL)
-               return -ENOMEM;
+       msg = heapobj_alloc(&qcb->hobj, size + sizeof(*msg));
+       if (msg == NULL)
+               goto done;
 
-       if (size > 0)
-               memcpy(_buf, buf, size);
+       msg->size = size;
+       msg->refcount = 0;
+       memcpy(msg + 1, buf, size);
 
-       ret = rt_queue_send(queue, _buf, size, mode);
-       if (ret == 0 && (mode & Q_BROADCAST))
-               /* Nobody received, free the temp buffer. */
-               rt_queue_free(queue, _buf);
+       ret = 0;  /* # of tasks unblocked. */
+       if (nwaiters == 0) {
+               qcb->mcount++;
+               if (mode & Q_URGENT)
+                       list_prepend(&msg->next, &qcb->mq);
+               else
+                       list_append(&msg->next, &qcb->mq);
+               goto done;
+       }
+
+       do {
+               waiter = syncobj_post(&qcb->sobj);
+               if (waiter == NULL)
+                       break;
+               wait = threadobj_get_wait(waiter);
+               wait->msg = msg;
+               msg->refcount++;
+               ret++;
+       } while (mode & Q_BROADCAST);
+done:
+       put_alchemy_queue(qcb, &syns);
+out:
+       COPPERPLATE_UNPROTECT(svc);
 
        return ret;
 }
@@ -311,21 +380,23 @@ ssize_t rt_queue_receive_timed(RT_QUEUE *queue, void 
**bufp,
                goto out;
        }
 
-       if (!list_empty(&qcb->mq)) {
-               msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
-               msg->refcount++;
-               *bufp = msg + 1;
-               ret = (ssize_t)msg->size;
-               qcb->mcount--;
-               goto done;
-       }
+       if (list_empty(&qcb->mq))
+               goto wait;
 
+       msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
+       msg->refcount++;
+       *bufp = msg + 1;
+       ret = (ssize_t)msg->size;
+       qcb->mcount--;
+       goto done;
+wait:
        if (alchemy_poll_mode(abs_timeout)) {
                ret = -EWOULDBLOCK;
                goto done;
        }
 
        wait = threadobj_prepare_wait(struct alchemy_queue_wait);
+       wait->usersz = 0;
 
        ret = syncobj_pend(&qcb->sobj, abs_timeout, &syns);
        if (ret) {
@@ -352,25 +423,68 @@ ssize_t rt_queue_read_timed(RT_QUEUE *queue,
                            void *buf, size_t size,
                            const struct timespec *abs_timeout)
 {
-       ssize_t rsize;
-       void *_buf;
-       int ret;
+       struct alchemy_queue_wait *wait;
+       struct alchemy_queue_msg *msg;
+       struct alchemy_queue *qcb;
+       struct syncstate syns;
+       struct service svc;
+       ssize_t ret;
+       int err = 0;
 
-       rsize = rt_queue_receive_timed(queue, &_buf, abs_timeout);
-       if (rsize < 0)
-               return rsize;
+       if (!threadobj_current_p() && !alchemy_poll_mode(abs_timeout))
+               return -EPERM;
 
-       if (size > rsize)
-               size = rsize;
+       if (size == 0)
+               return 0;
 
-       if (size > 0)
-               memcpy(buf, _buf, size);
+       COPPERPLATE_PROTECT(svc);
 
-       ret = rt_queue_free(queue, _buf);
-       if (ret)
-               return __bt(ret);
+       qcb = get_alchemy_queue(queue, &syns, &err);
+       if (qcb == NULL) {
+               ret = err;
+               goto out;
+       }
 
-       return rsize;
+       if (list_empty(&qcb->mq))
+               goto wait;
+
+       msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
+       qcb->mcount--;
+       goto transfer;
+wait:
+       if (alchemy_poll_mode(abs_timeout)) {
+               ret = -EWOULDBLOCK;
+               goto done;
+       }
+
+       wait = threadobj_prepare_wait(struct alchemy_queue_wait);
+       wait->userbuf = buf;
+       wait->usersz = size;
+       wait->msg = NULL;
+
+       ret = syncobj_pend(&qcb->sobj, abs_timeout, &syns);
+       if (ret) {
+               if (ret == -EIDRM) {
+                       threadobj_finish_wait();
+                       goto out;
+               }
+       } else if (wait->msg) {
+               msg = wait->msg;
+       transfer:
+               ret = (ssize_t)(msg->size > size ? size : msg->size);
+               if (ret > 0) 
+                       memcpy(buf, msg + 1, ret);
+               heapobj_free(&qcb->hobj, msg);
+       } else  /* A direct copy took place. */
+               ret = (ssize_t)wait->usersz;
+
+       threadobj_finish_wait();
+done:
+       put_alchemy_queue(qcb, &syns);
+out:
+       COPPERPLATE_UNPROTECT(svc);
+
+       return ret;
 }
 
 int rt_queue_flush(RT_QUEUE *queue)
diff --git a/lib/alchemy/queue.h b/lib/alchemy/queue.h
index c08e86d..bba613f 100644
--- a/lib/alchemy/queue.h
+++ b/lib/alchemy/queue.h
@@ -48,6 +48,8 @@ struct alchemy_queue_msg {
 
 struct alchemy_queue_wait {
        struct alchemy_queue_msg *msg;
+       void *userbuf;
+       size_t usersz;
 };
 
 extern struct syncluster alchemy_queue_table;


_______________________________________________
Xenomai-git mailing list
[email protected]
https://mail.gna.org/listinfo/xenomai-git

Reply via email to