Module: xenomai-forge
Branch: master
Commit: a4fb94614a70b143def5325f8795a0cca251d7ef
URL:    http://git.xenomai.org/?p=xenomai-forge.git;a=commit;h=a4fb94614a70b143def5325f8795a0cca251d7ef
Author: Philippe Gerum <[email protected]>
Date:   Tue Nov 29 12:32:43 2011 +0100

alchemy/queue: optimize with direct copy to waiter

This patch implements direct message passing between rt_queue_write()
and rt_queue_read() when the runtime conditions allow.

Sending a message to a queue already pended by a waiter thread is quite
a common case. When both the sender and the receiver threads belong to
the same process, we may copy the data sent through rt_queue_write()
directly into the receiver's buffer passed to rt_queue_read() before
unblocking that thread.

Sending the data over this way, without going through any queuing,
saves one data copy as well as the combined cost of allocating and then
releasing a message buffer, which is significant, particularly on
low-end hardware.
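To make the targeted scenario concrete, here is a minimal usage sketch
against the public Alchemy API, showing a reader that is already pending
in rt_queue_read() when rt_queue_write() runs from the same process,
which is the case where the direct copy to the waiter's buffer can apply.
This sketch is not part of the patch: the setup calls (rt_queue_create(),
rt_task_spawn(), rt_task_sleep(), rt_queue_delete()), the task and queue
names, priorities, buffer sizes and the sleep-based sequencing are all
illustrative assumptions, and error handling is trimmed to the essentials.

#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/queue.h>
#include <alchemy/timer.h>

static RT_QUEUE demo_q;
static RT_TASK demo_reader;

static void reader_task(void *arg)
{
	char rbuf[64];
	ssize_t n;

	(void)arg;
	/*
	 * Block until a message arrives. A local writer finding this
	 * task pending here may copy its payload straight into rbuf,
	 * skipping the intermediate queue buffer.
	 */
	n = rt_queue_read(&demo_q, rbuf, sizeof(rbuf), TM_INFINITE);
	if (n > 0)
		printf("reader: got %zd bytes: %.*s\n", n, (int)n, rbuf);
}

int main(int argc, char *argv[])
{
	static const char payload[] = "hello";
	int ret;

	ret = rt_queue_create(&demo_q, "demo-queue", 1024, Q_UNLIMITED, Q_FIFO);
	if (ret)
		return 1;

	ret = rt_task_spawn(&demo_reader, "demo-reader", 0, 50, 0,
			    reader_task, NULL);
	if (ret)
		return 1;

	/*
	 * Give the reader time to pend on the queue (1 ms, assuming
	 * the default nanosecond clock resolution).
	 */
	rt_task_sleep(1000000);

	/*
	 * The reader should now be waiting in rt_queue_read(), so this
	 * write can take the direct-copy path; on success the return
	 * value is the number of tasks readied (1 here).
	 */
	ret = rt_queue_write(&demo_q, payload, sizeof(payload), Q_NORMAL);
	printf("writer: rt_queue_write() -> %d\n", ret);

	rt_task_sleep(1000000);
	rt_queue_delete(&demo_q);

	return ret < 0 ? 1 : 0;
}

Whether the fast path is actually taken remains a runtime decision: if
the reader is not yet pending, passed a zero-sized buffer, or lives in
another process, rt_queue_write() falls back to allocating and enqueuing
a message buffer as before.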
---

 lib/alchemy/queue.c |  182 +++++++++++++++++++++++++++++++++++++++++----------
 lib/alchemy/queue.h |    2 +
 2 files changed, 150 insertions(+), 34 deletions(-)

diff --git a/lib/alchemy/queue.c b/lib/alchemy/queue.c
index 65c5164..3f40356 100644
--- a/lib/alchemy/queue.c
+++ b/lib/alchemy/queue.c
@@ -142,7 +142,10 @@ void *rt_queue_alloc(RT_QUEUE *queue, size_t size)
 	if (msg == NULL)
 		goto done;
 
-	inith(&msg->next);
+	/*
+	 * XXX: no need to init the ->next holder, list_*pend() do not
+	 * require this, and this ends up being costly on low end.
+	 */
 	msg->size = size;	/* Zero is allowed. */
 	msg->refcount = 1;
 	++msg;
@@ -271,20 +274,86 @@ out:
 int rt_queue_write(RT_QUEUE *queue, const void *buf, size_t size, int mode)
 {
-	void *_buf;
-	int ret;
+	struct alchemy_queue_wait *wait;
+	struct alchemy_queue_msg *msg;
+	struct alchemy_queue *qcb;
+	struct threadobj *waiter;
+	struct syncstate syns;
+	int ret = 0, nwaiters;
+	struct service svc;
+	size_t usersz;
+
+	if (size == 0)
+		return 0;
+
+	COPPERPLATE_PROTECT(svc);
+
+	qcb = get_alchemy_queue(queue, &syns, &ret);
+	if (qcb == NULL)
+		goto out;
+
+	waiter = syncobj_peek_at_pend(&qcb->sobj);
+	if (waiter && threadobj_local_p(waiter)) {
+		/*
+		 * Fast path for local threads already waiting for
+		 * data via rt_queue_read(): do direct copy to the
+		 * reader's buffer.
+		 */
+		wait = threadobj_get_wait(waiter);
+		usersz = wait->usersz;
+		if (usersz == 0)
+			/* no buffer provided, enqueue normally. */
+			goto enqueue;
+		if (size > usersz)
+			size = usersz;
+		if (size > 0)
+			memcpy(wait->userbuf, buf, size);
+		wait->usersz = size;
+		syncobj_wakeup_waiter(&qcb->sobj, waiter);
+		ret = 1;
+		goto done;
+	}
+
+enqueue:
+	nwaiters = syncobj_pend_count(&qcb->sobj);
+	if (nwaiters == 0 && (mode & Q_BROADCAST) != 0)
+		goto done;
+
+	ret = -ENOMEM;
+	if (qcb->limit && qcb->mcount >= qcb->limit)
+		goto done;
 
-	_buf = rt_queue_alloc(queue, size);
-	if (_buf == NULL)
-		return -ENOMEM;
+	msg = heapobj_alloc(&qcb->hobj, size + sizeof(*msg));
+	if (msg == NULL)
+		goto done;
 
-	if (size > 0)
-		memcpy(_buf, buf, size);
+	msg->size = size;
+	msg->refcount = 0;
+	memcpy(msg + 1, buf, size);
 
-	ret = rt_queue_send(queue, _buf, size, mode);
-	if (ret == 0 && (mode & Q_BROADCAST))
-		/* Nobody received, free the temp buffer. */
-		rt_queue_free(queue, _buf);
+	ret = 0;	/* # of tasks unblocked. */
+	if (nwaiters == 0) {
+		qcb->mcount++;
+		if (mode & Q_URGENT)
+			list_prepend(&msg->next, &qcb->mq);
+		else
+			list_append(&msg->next, &qcb->mq);
+		goto done;
+	}
+
+	do {
+		waiter = syncobj_post(&qcb->sobj);
+		if (waiter == NULL)
+			break;
+		wait = threadobj_get_wait(waiter);
+		wait->msg = msg;
+		msg->refcount++;
+		ret++;
+	} while (mode & Q_BROADCAST);
+done:
+	put_alchemy_queue(qcb, &syns);
+out:
+	COPPERPLATE_UNPROTECT(svc);
 
 	return ret;
 }
@@ -311,21 +380,23 @@ ssize_t rt_queue_receive_timed(RT_QUEUE *queue, void **bufp,
 		goto out;
 	}
 
-	if (!list_empty(&qcb->mq)) {
-		msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
-		msg->refcount++;
-		*bufp = msg + 1;
-		ret = (ssize_t)msg->size;
-		qcb->mcount--;
-		goto done;
-	}
+	if (list_empty(&qcb->mq))
+		goto wait;
 
+	msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
+	msg->refcount++;
+	*bufp = msg + 1;
+	ret = (ssize_t)msg->size;
+	qcb->mcount--;
+	goto done;
+wait:
 	if (alchemy_poll_mode(abs_timeout)) {
 		ret = -EWOULDBLOCK;
 		goto done;
 	}
 
 	wait = threadobj_prepare_wait(struct alchemy_queue_wait);
+	wait->usersz = 0;
 
 	ret = syncobj_pend(&qcb->sobj, abs_timeout, &syns);
 	if (ret) {
@@ -352,25 +423,68 @@ ssize_t rt_queue_read_timed(RT_QUEUE *queue,
 			    void *buf, size_t size,
 			    const struct timespec *abs_timeout)
 {
-	ssize_t rsize;
-	void *_buf;
-	int ret;
+	struct alchemy_queue_wait *wait;
+	struct alchemy_queue_msg *msg;
+	struct alchemy_queue *qcb;
+	struct syncstate syns;
+	struct service svc;
+	ssize_t ret;
+	int err = 0;
 
-	rsize = rt_queue_receive_timed(queue, &_buf, abs_timeout);
-	if (rsize < 0)
-		return rsize;
+	if (!threadobj_current_p() && !alchemy_poll_mode(abs_timeout))
+		return -EPERM;
 
-	if (size > rsize)
-		size = rsize;
+	if (size == 0)
+		return 0;
 
-	if (size > 0)
-		memcpy(buf, _buf, size);
+	COPPERPLATE_PROTECT(svc);
 
-	ret = rt_queue_free(queue, _buf);
-	if (ret)
-		return __bt(ret);
+	qcb = get_alchemy_queue(queue, &syns, &err);
+	if (qcb == NULL) {
+		ret = err;
+		goto out;
+	}
 
-	return rsize;
+	if (list_empty(&qcb->mq))
+		goto wait;
+
+	msg = list_pop_entry(&qcb->mq, struct alchemy_queue_msg, next);
+	qcb->mcount--;
+	goto transfer;
+wait:
+	if (alchemy_poll_mode(abs_timeout)) {
+		ret = -EWOULDBLOCK;
+		goto done;
+	}
+
+	wait = threadobj_prepare_wait(struct alchemy_queue_wait);
+	wait->userbuf = buf;
+	wait->usersz = size;
+	wait->msg = NULL;
+
+	ret = syncobj_pend(&qcb->sobj, abs_timeout, &syns);
+	if (ret) {
+		if (ret == -EIDRM) {
+			threadobj_finish_wait();
+			goto out;
+		}
+	} else if (wait->msg) {
+		msg = wait->msg;
+	transfer:
+		ret = (ssize_t)(msg->size > size ? size : msg->size);
+		if (ret > 0)
+			memcpy(buf, msg + 1, ret);
+		heapobj_free(&qcb->hobj, msg);
+	} else	/* A direct copy took place. */
+		ret = (ssize_t)wait->usersz;
+
+	threadobj_finish_wait();
+done:
+	put_alchemy_queue(qcb, &syns);
+out:
+	COPPERPLATE_UNPROTECT(svc);
+
+	return ret;
 }
 
 int rt_queue_flush(RT_QUEUE *queue)
diff --git a/lib/alchemy/queue.h b/lib/alchemy/queue.h
index c08e86d..bba613f 100644
--- a/lib/alchemy/queue.h
+++ b/lib/alchemy/queue.h
@@ -48,6 +48,8 @@ struct alchemy_queue_msg {
 
 struct alchemy_queue_wait {
 	struct alchemy_queue_msg *msg;
+	void *userbuf;
+	size_t usersz;
 };
 
 extern struct syncluster alchemy_queue_table;

_______________________________________________
Xenomai-git mailing list
[email protected]
https://mail.gna.org/listinfo/xenomai-git
