And here is the new ABI I propose.

Thanks,
Mathieu

* Mathieu Desnoyers ([email protected]) wrote:
> This work is derived from the patch from Lai Jiangshan submitted as
> "urcu: new wfqueue implementation"
> (http://lists.lttng.org/pipermail/lttng-dev/2012-August/018379.html)
>
> Its changelog:
>
> > Some people would be surprised by this fact: there are already TWO
> > implementations of wfqueue in urcu.
> >
> > The first one is in urcu/static/wfqueue.h:
> > 1) enqueue: exchange the tail and then update previous->next
> > 2) dequeue: wait for the first node's next pointer and then shift; a
> >    dummy node is introduced to avoid queue->tail becoming NULL on
> >    shift.
> >
> > The second one shares some code with the first one; the rest of its
> > code is spread across urcu-call-rcu-impl.h:
> > 1) enqueue: shared with the first one
> > 2) no dequeue operation and no shift, so it does not need the dummy
> >    node. Although the dummy node is enqueued at initialization, it
> >    is removed after the first dequeue_all operation in
> >    call_rcu_thread(). call_rcu_data_free() forgets to handle the
> >    dummy node if it has not been removed.
> > 3) dequeue_all: record the old head and tail, and queue->head
> >    becomes the special tail node (atomically record the tail and
> >    change it).
> >
> > The second implementation's code is spread out, which is bad for
> > review, and it is not tested by tests/test_urcu_wfq.
> >
> > So we need a better implementation that avoids the dummy-node
> > dancing and can service both the generic wfqueue API and the
> > dequeue_all API for call_rcu.
> >
> > The new implementation:
> > 1) enqueue: shared with the first/original implementation.
> > 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1.
> >    No dummy node, which saves memory.
> > 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail,
> >    and return the old head.next.
> >
> > More implementation details are in the code.
> > tests/test_urcu_wfq will be updated in the future to test the new
> > APIs.
>
> The patch proposed by Lai brings a very interesting simplification to
> the single-node handling (which is kept here), and moves all queue
> handling code away from the call_rcu implementation, back into the
> wfqueue code. This has the benefit of allowing testing enhancements.
>
> I modified it so the API does not expose implementation details to the
> user (e.g. ___cds_wfq_node_sync_next). I added a "splice" operation and
> a for-loop iterator which should allow wfqueue users to use the list
> very efficiently both from LGPL/GPL code and from non-LGPL-compatible
> code.
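>
> To make the API shape concrete, here is a minimal usage sketch (not
> part of the patch; struct my_item, producer()/consumer() and the
> malloc()/free() scheme are hypothetical, for illustration only):
>
>         #include <stdlib.h>
>         #include <urcu/wfqueue.h>
>
>         struct my_item {
>                 struct cds_wfq_node node;       /* queue linkage */
>                 int value;                      /* user payload */
>         };
>
>         /* Initialized once with cds_wfq_init(&myqueue) before use. */
>         static struct cds_wfq_queue myqueue;
>
>         /* Wait-free: any number of concurrent enqueuers, no locking. */
>         static void producer(int v)
>         {
>                 struct my_item *item = malloc(sizeof(*item));
>
>                 cds_wfq_node_init(&item->node);
>                 item->value = v;
>                 cds_wfq_enqueue(&myqueue, &item->node);
>         }
>
>         /* Blocking dequeue; locking is performed within the call. */
>         static void consumer(void)
>         {
>                 struct cds_wfq_node *qnode;
>
>                 qnode = cds_wfq_dequeue_blocking(&myqueue);
>                 if (qnode) {
>                         struct my_item *item =
>                                 caa_container_of(qnode, struct my_item, node);
>
>                         /* A dequeued node may be reused or freed immediately. */
>                         free(item);
>                 }
>         }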
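>
> And, continuing the sketch above, the batch-consumption pattern that
> the new splice and iterator APIs enable, mirroring what
> call_rcu_thread does in the patch below (a single consumer is assumed
> here, which provides the mutual exclusion required on the source
> queue):
>
>         static void consume_all(void)
>         {
>                 struct cds_wfq_queue tmp;
>                 struct cds_wfq_node *qnode, *next;
>
>                 cds_wfq_init(&tmp);
>                 /* Atomically move all queued nodes onto a local queue. */
>                 __cds_wfq_splice_blocking(&tmp, &myqueue);
>                 __cds_wfq_for_each_blocking_safe(&tmp, qnode, next) {
>                         struct my_item *item =
>                                 caa_container_of(qnode, struct my_item, node);
>
>                         /*
>                          * The _safe iterator reads the next pointer
>                          * before the body runs, so freeing the node
>                          * here is OK.
>                          */
>                         free(item);
>                 }
>         }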
>
> Benchmarks performed on Intel(R) Core(TM) i7-3520M CPU @ 2.90GHz
> (dual-core, with hyperthreading).
>
> Benchmark invoked:
>
>   for a in $(seq 1 10); do ./test_urcu_wfq 1 1 10 -a 0 -a 2; done
>
> (using CPUs 0 and 2, which should correspond to two distinct cores of
> my Intel dual-core/hyperthreaded processor)
>
> Before patch:
>
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 97274297 nr_dequeues 80745742 successful enqueues 97274297 successful dequeues 80745321 end_dequeues 16528976 nr_ops 178020039
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 92300568 nr_dequeues 75019529 successful enqueues 92300568 successful dequeues 74973237 end_dequeues 17327331 nr_ops 167320097
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 93516443 nr_dequeues 75846726 successful enqueues 93516443 successful dequeues 75826578 end_dequeues 17689865 nr_ops 169363169
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 94160362 nr_dequeues 77967638 successful enqueues 94160362 successful dequeues 77967638 end_dequeues 16192724 nr_ops 172128000
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 97491956 nr_dequeues 81001191 successful enqueues 97491956 successful dequeues 81000247 end_dequeues 16491709 nr_ops 178493147
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 94101298 nr_dequeues 75650510 successful enqueues 94101298 successful dequeues 75649318 end_dequeues 18451980 nr_ops 169751808
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 94742803 nr_dequeues 75402105 successful enqueues 94742803 successful dequeues 75341859 end_dequeues 19400944 nr_ops 170144908
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 92198835 nr_dequeues 75037877 successful enqueues 92198835 successful dequeues 75027605 end_dequeues 17171230 nr_ops 167236712
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 94159560 nr_dequeues 77895972 successful enqueues 94159560 successful dequeues 77858442 end_dequeues 16301118 nr_ops 172055532
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 96059399 nr_dequeues 80115442 successful enqueues 96059399 successful dequeues 80066843 end_dequeues 15992556 nr_ops 176174841
>
> After patch:
>
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 221229322 nr_dequeues 210645491 successful enqueues 221229322 successful dequeues 210645088 end_dequeues 10584234 nr_ops 431874813
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 219803943 nr_dequeues 210377337 successful enqueues 219803943 successful dequeues 210368680 end_dequeues 9435263 nr_ops 430181280
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 237006358 nr_dequeues 237035340 successful enqueues 237006358 successful dequeues 236997050 end_dequeues 9308 nr_ops 474041698
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 235822443 nr_dequeues 235815942 successful enqueues 235822443 successful dequeues 235814020 end_dequeues 8423 nr_ops 471638385
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 235825567 nr_dequeues 235811803 successful enqueues 235825567 successful dequeues 235810526 end_dequeues 15041 nr_ops 471637370
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 221974953 nr_dequeues 210938190 successful enqueues 221974953 successful dequeues 210938190 end_dequeues 11036763 nr_ops 432913143
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 237994492 nr_dequeues 237938119 successful enqueues 237994492 successful dequeues 237930648 end_dequeues 63844 nr_ops 475932611
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 220634365 nr_dequeues 210491382 successful enqueues 220634365 successful dequeues 210490995 end_dequeues 10143370 nr_ops 431125747
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 237388065 nr_dequeues 237401251 successful enqueues 237388065 successful dequeues 237380295 end_dequeues 7770 nr_ops 474789316
> testdur 10 nr_enqueuers 1 wdelay 0 nr_dequeuers 1 rdur 0 nr_enqueues 221201436 nr_dequeues 210831162 successful enqueues 221201436 successful dequeues 210831162 end_dequeues 10370274 nr_ops 432032598
>
> Summary: both enqueue and dequeue speeds increase: around a 2.3x
> speedup for enqueue, and around 2.6x for dequeue.
>
> We can verify that, for all runs:
>
>   successful enqueues - successful dequeues = end_dequeues
>
> which ensures correctness (no node is lost).
>
> * Introduce wfqueue ABI v1 (false-sharing fix)
>
> wfqueue v0 suffers from false-sharing between head and tail. By
> cache-aligning head and tail, we get a significant speedup on
> benchmarks. But in order to do that, we need to break the ABI to
> enlarge struct cds_wfq_queue.
>
> Provide a backward-compatibility ABI for the old wfqueue by binding
> the original symbols to the old implementation (which uses the dummy
> node). Programs compiled against the old headers, which are not
> _LGPL_SOURCE, will still use the old implementation.
>
> Any code compiled against the new header will directly use the new
> ABI.
>
> This does not require any change in the way users call the API: the
> new ABI symbols are simply defined with a _1 suffix, and wrapped by
> preprocessor macros.
>
> Known limitation: users should *not* link together objects using the
> v0 and v1 APIs of wfqueue and exchange struct cds_wfq_queue structures
> between the two. The only way to run into this corner case would be to
> combine objects compiled with different versions of the urcu/wfqueue.h
> header.
>
> CC: Lai Jiangshan <[email protected]>
> CC: Paul McKenney <[email protected]>
> Signed-off-by: Mathieu Desnoyers <[email protected]>
> ---
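>
> (Not part of the patch: a minimal sketch of what the _1 symbol
> wrapping means for an application; handle_node() is a hypothetical
> user callback.)
>
>         /* app.c, compiled against the new header, without _LGPL_SOURCE */
>         #include <urcu/wfqueue.h>
>
>         extern void handle_node(struct cds_wfq_node *node); /* hypothetical */
>
>         void drain(struct cds_wfq_queue *q)
>         {
>                 struct cds_wfq_node *node;
>
>                 /*
>                  * cds_wfq_dequeue_blocking expands to
>                  * cds_wfq_dequeue_blocking_1 here, so this object
>                  * links against the new v1 symbol. An object built
>                  * against the old header keeps resolving the
>                  * unsuffixed v0 symbol, i.e. the dummy-node
>                  * implementation.
>                  */
>                 while ((node = cds_wfq_dequeue_blocking(q)) != NULL)
>                         handle_node(node);
>         }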
> diff --git a/Makefile.am b/Makefile.am
> index 2396fcf..31052ef 100644
> --- a/Makefile.am
> +++ b/Makefile.am
> @@ -53,7 +53,7 @@ lib_LTLIBRARIES = liburcu-common.la \
>  # liburcu-common contains wait-free queues (needed by call_rcu) as well
>  # as futex fallbacks.
>  #
> -liburcu_common_la_SOURCES = wfqueue.c wfstack.c $(COMPAT)
> +liburcu_common_la_SOURCES = wfqueue.c wfqueue0.c wfstack.c $(COMPAT)
>  
>  liburcu_la_SOURCES = urcu.c urcu-pointer.c $(COMPAT)
>  liburcu_la_LIBADD = liburcu-common.la
> diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
> index 13b24ff..d8537d0 100644
> --- a/urcu-call-rcu-impl.h
> +++ b/urcu-call-rcu-impl.h
> @@ -21,6 +21,7 @@
>   */
>  
>  #define _GNU_SOURCE
> +#define _LGPL_SOURCE
>  #include <stdio.h>
>  #include <pthread.h>
>  #include <signal.h>
> @@ -220,10 +221,7 @@ static void call_rcu_wake_up(struct call_rcu_data *crdp)
>  static void *call_rcu_thread(void *arg)
>  {
>          unsigned long cbcount;
> -        struct cds_wfq_node *cbs;
> -        struct cds_wfq_node **cbs_tail;
> -        struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> -        struct rcu_head *rhp;
> +        struct call_rcu_data *crdp = (struct call_rcu_data *) arg;
>          int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
>          int ret;
>  
> @@ -243,35 +241,30 @@ static void *call_rcu_thread(void *arg)
>                  cmm_smp_mb();
>          }
>          for (;;) {
> -                if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> -                        while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> -                                poll(NULL, 0, 1);
> -                        _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> -                        cbs_tail = (struct cds_wfq_node **)
> -                                uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> +                struct cds_wfq_queue cbs_tmp;
> +                struct cds_wfq_node *cbs, *tmp_cbs;
> +
> +                cds_wfq_init(&cbs_tmp);
> +                __cds_wfq_splice_blocking(&cbs_tmp, &crdp->cbs);
> +                if (!cds_wfq_empty(&cbs_tmp)) {
>                          synchronize_rcu();
>                          cbcount = 0;
> -                        do {
> -                                while (cbs->next == NULL &&
> -                                       &cbs->next != cbs_tail)
> -                                        poll(NULL, 0, 1);
> -                                if (cbs == &crdp->cbs.dummy) {
> -                                        cbs = cbs->next;
> -                                        continue;
> -                                }
> -                                rhp = (struct rcu_head *)cbs;
> -                                cbs = cbs->next;
> +                        __cds_wfq_for_each_blocking_safe(&cbs_tmp,
> +                                        cbs, tmp_cbs) {
> +                                struct rcu_head *rhp;
> +
> +                                rhp = caa_container_of(cbs,
> +                                        struct rcu_head, next);
>                                  rhp->func(rhp);
>                                  cbcount++;
> -                        } while (cbs != NULL);
> +                        }
>                          uatomic_sub(&crdp->qlen, cbcount);
>                  }
>                  if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
>                          break;
>                  rcu_thread_offline();
>                  if (!rt) {
> -                        if (&crdp->cbs.head
> -                            == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> +                        if (cds_wfq_empty(&crdp->cbs)) {
>                                  call_rcu_wait(crdp);
>                                  poll(NULL, 0, 10);
>                                  uatomic_dec(&crdp->futex);
> @@ -625,32 +618,27 @@ void call_rcu(struct rcu_head *head,
>   */
>  void call_rcu_data_free(struct call_rcu_data *crdp)
>  {
> -        struct cds_wfq_node *cbs;
> -        struct cds_wfq_node **cbs_tail;
> -        struct cds_wfq_node **cbs_endprev;
> -
>          if (crdp == NULL || crdp == default_call_rcu_data) {
>                  return;
>          }
> +
>          if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
>                  uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
>                  wake_call_rcu_thread(crdp);
>                  while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0)
>                          poll(NULL, 0, 1);
>          }
> -        if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> -                while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> -                        poll(NULL, 0, 1);
> -                _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> -                cbs_tail = (struct cds_wfq_node **)
> -                        uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> +
> +        if (!cds_wfq_empty(&crdp->cbs)) {
>                  /* Create default call rcu data if need be */
>                  (void) get_default_call_rcu_data();
> -                cbs_endprev = (struct cds_wfq_node **)
> -                        uatomic_xchg(&default_call_rcu_data, cbs_tail);
> -                *cbs_endprev = cbs;
> +
> +                __cds_wfq_splice_blocking(&default_call_rcu_data->cbs,
> +                                &crdp->cbs);
> +
>                  uatomic_add(&default_call_rcu_data->qlen,
>                              uatomic_read(&crdp->qlen));
> +
>                  wake_call_rcu_thread(default_call_rcu_data);
>          }
>  
> diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
> index 636e1af..52e452d 100644
> --- a/urcu/static/wfqueue.h
> +++ b/urcu/static/wfqueue.h
> @@ -9,7 +9,8 @@
>   * TO BE INCLUDED ONLY IN LGPL-COMPATIBLE CODE. See wfqueue.h for linking
>   * dynamically with the userspace rcu library.
>   *
> - * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2010-2012 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -29,6 +30,7 @@
>  #include <pthread.h>
>  #include <assert.h>
>  #include <poll.h>
> +#include <stdbool.h>
>  #include <urcu/compiler.h>
>  #include <urcu/uatomic.h>
>  
> @@ -38,11 +40,16 @@ extern "C" {
>  
>  /*
>   * Queue with wait-free enqueue/blocking dequeue.
> - * This implementation adds a dummy head node when the queue is empty to ensure
> - * we can always update the queue locklessly.
>   *
>   * Inspired from half-wait-free/half-blocking queue implementation done by
>   * Paul E. McKenney.
> + *
> + * Caller must ensure mutual exclusion of the queue update operations
> + * "dequeue" and "splice" (for the source queue). Queue read operations
> + * "first" and "next" need to be protected against concurrent "dequeue"
> + * and "splice" (for the source queue) by the caller. "enqueue",
> + * "splice" (destination queue), and "empty" are the only operations
> + * that can be used without any mutual exclusion.
>   */
>  
>  #define WFQ_ADAPT_ATTEMPTS        10        /* Retry if being set */
> @@ -57,31 +64,55 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q)
>  {
>          int ret;
>  
> -        _cds_wfq_node_init(&q->dummy);
>          /* Set queue head and tail */
> -        q->head = &q->dummy;
> -        q->tail = &q->dummy.next;
> -        ret = pthread_mutex_init(&q->lock, NULL);
> +        _cds_wfq_node_init(&q->dequeue.head);
> +        q->enqueue.tail = &q->dequeue.head;
> +        ret = pthread_mutex_init(&q->dequeue.lock, NULL);
>          assert(!ret);
>  }
>  
> -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> -                struct cds_wfq_node *node)
> +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
>  {
> -        struct cds_wfq_node **old_tail;
> +        /*
> +         * Queue is empty if no node is pointed to by q->head.next nor
> +         * q->tail. Even though the q->tail check is sufficient to find
> +         * out if the queue is empty, we first check q->head.next as a
> +         * common case to ensure that dequeuers do not frequently access
> +         * the enqueuer's q->tail cache line.
> +         */
> +        return CMM_LOAD_SHARED(q->dequeue.head.next) == NULL
> +                && CMM_LOAD_SHARED(q->enqueue.tail) == &q->dequeue.head;
> +}
> +
> +static inline void ___cds_wfq_append(struct cds_wfq_queue *q,
> +                struct cds_wfq_node *new_head,
> +                struct cds_wfq_node *new_tail)
> +{
> +        struct cds_wfq_node *old_tail;
>  
>          /*
> -         * uatomic_xchg() implicit memory barrier orders earlier stores to data
> -         * structure containing node and setting node->next to NULL before
> -         * publication.
> +         * Implicit memory barrier before uatomic_xchg() orders earlier
> +         * stores to the data structure containing node and setting
> +         * node->next to NULL before publication.
>           */
> -        old_tail = uatomic_xchg(&q->tail, &node->next);
> +        old_tail = uatomic_xchg(&q->enqueue.tail, new_tail);
> +
>          /*
> -         * At this point, dequeuers see a NULL old_tail->next, which indicates
> -         * that the queue is being appended to. The following store will append
> -         * "node" to the queue from a dequeuer perspective.
> +         * Implicit memory barrier after uatomic_xchg() orders store to
> +         * q->tail before store to old_tail->next.
> +         *
> +         * At this point, dequeuers see a NULL q->tail->next, which
> +         * indicates that the queue is being appended to. The following
> +         * store will append "node" to the queue from a dequeuer
> +         * perspective.
>           */
> -        CMM_STORE_SHARED(*old_tail, node);
> +        CMM_STORE_SHARED(old_tail->next, new_head);
> +}
> +
> +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> +                struct cds_wfq_node *new_tail)
> +{
> +        ___cds_wfq_append(q, new_tail, new_tail);
>  }
>  
>  /*
> @@ -100,14 +131,68 @@ ___cds_wfq_node_sync_next(struct cds_wfq_node *node)
>                  if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
>                          poll(NULL, 0, WFQ_WAIT);        /* Wait for 10ms */
>                          attempt = 0;
> -                } else
> +                } else {
>                          caa_cpu_relax();
> +                }
>          }
>  
>          return next;
>  }
>  
>  /*
> + * ___cds_wfq_first_blocking: get first node of a queue, without dequeuing.
> + *
> + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
> + * by the caller.
> + */
> +static inline struct cds_wfq_node *
> +___cds_wfq_first_blocking(struct cds_wfq_queue *q)
> +{
> +        struct cds_wfq_node *node;
> +
> +        if (_cds_wfq_empty(q))
> +                return NULL;
> +        node = ___cds_wfq_node_sync_next(&q->dequeue.head);
> +        /* Load q->head.next before loading node's content */
> +        cmm_smp_read_barrier_depends();
> +        return node;
> +}
> +
> +/*
> + * ___cds_wfq_next_blocking: get next node of a queue, without dequeuing.
> + *
> + * Mutual exclusion with "dequeue" and "splice" operations must be ensured
> + * by the caller.
> + */
> +static inline struct cds_wfq_node *
> +___cds_wfq_next_blocking(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> +{
> +        struct cds_wfq_node *next;
> +
> +        /*
> +         * Even though the following q->tail check is sufficient to find
> +         * out if we reached the end of the queue, we first check
> +         * node->next as a common case to ensure that iteration on nodes
> +         * does not frequently access the enqueuer's q->tail cache line.
> +         */
> +        if ((next = CMM_LOAD_SHARED(node->next)) != NULL) {
> +                /* Load node->next before loading next's content */
> +                cmm_smp_read_barrier_depends();
> +                return next;
> +        }
> +        /* Load node->next before q->tail */
> +        cmm_smp_rmb();
> +        if (CMM_LOAD_SHARED(q->enqueue.tail) == node)
> +                return NULL;
> +        next = ___cds_wfq_node_sync_next(node);
> +        /* Load node->next before loading next's content */
> +        cmm_smp_read_barrier_depends();
> +        return next;
> +}
> +
> +/*
> + * ___cds_wfq_dequeue_blocking: dequeue a node from the queue.
> + *
>   * It is valid to reuse and free a dequeued node immediately.
>   *
>   * No need to go on a waitqueue here, as there is no possible state in which the
> @@ -120,42 +205,104 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>  {
>          struct cds_wfq_node *node, *next;
>  
> -        /*
> -         * Queue is empty if it only contains the dummy node.
> -         */
> -        if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
> +        if (_cds_wfq_empty(q))
>                  return NULL;
> -        node = q->head;
>  
> -        next = ___cds_wfq_node_sync_next(node);
> +        node = ___cds_wfq_node_sync_next(&q->dequeue.head);
> +
> +        if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
> +                /*
> +                 * @node is probably the only node in the queue.
> +                 * Try to move the tail to &q->head.
> +                 * q->head.next is set to NULL here, and stays
> +                 * NULL if the cmpxchg succeeds. Should the
> +                 * cmpxchg fail due to a concurrent enqueue,
> +                 * q->head.next will be set to the next node.
> +                 * The implicit memory barrier before
> +                 * uatomic_cmpxchg() orders load node->next
> +                 * before loading q->tail.
> +                 * The implicit memory barrier before uatomic_cmpxchg()
> +                 * orders load q->head.next before loading node's
> +                 * content.
> +                 */
> +                _cds_wfq_node_init(&q->dequeue.head);
> +                if (uatomic_cmpxchg(&q->enqueue.tail, node,
> +                                &q->dequeue.head) == node)
> +                        return node;
> +                next = ___cds_wfq_node_sync_next(node);
> +        }
>  
>          /*
>           * Move queue head forward.
>           */
> -        q->head = next;
> +        q->dequeue.head.next = next;
> +
> +        /* Load q->head.next before loading node's content */
> +        cmm_smp_read_barrier_depends();
> +        return node;
> +}
> +
> +/*
> + * ___cds_wfq_splice_blocking: enqueue all src_q nodes at the end of dest_q.
> + *
> + * Dequeue all nodes from src_q.
> + * dest_q must be already initialized.
> + * Caller ensures mutual exclusion of dequeue and splice operations on
> + * src_q.
> + */
> +static inline void
> +___cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q)
> +{
> +        struct cds_wfq_node *head, *tail;
> +
> +        if (_cds_wfq_empty(src_q))
> +                return;
> +
> +        head = ___cds_wfq_node_sync_next(&src_q->dequeue.head);
> +        _cds_wfq_node_init(&src_q->dequeue.head);
> +
>          /*
> -         * Requeue dummy node if we just dequeued it.
> +         * Memory barrier implied before uatomic_xchg() orders store to
> +         * src_q->head before store to src_q->tail. This is required by
> +         * concurrent enqueue on src_q, which exchanges the tail before
> +         * updating the previous tail's next pointer.
>           */
> -        if (node == &q->dummy) {
> -                _cds_wfq_node_init(node);
> -                _cds_wfq_enqueue(q, node);
> -                return ___cds_wfq_dequeue_blocking(q);
> -        }
> -        return node;
> +        tail = uatomic_xchg(&src_q->enqueue.tail, &src_q->dequeue.head);
> +
> +        /*
> +         * Append the spliced content of src_q into dest_q. Does not
> +         * require mutual exclusion on dest_q (wait-free).
> +         */
> +        ___cds_wfq_append(dest_q, head, tail);
>  }
>  
> +/* Locking performed within cds_wfq calls. */
>  static inline struct cds_wfq_node *
>  _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
>  {
> -        struct cds_wfq_node *retnode;
> +        struct cds_wfq_node *retval;
> +        int ret;
> +
> +        ret = pthread_mutex_lock(&q->dequeue.lock);
> +        assert(!ret);
> +        retval = ___cds_wfq_dequeue_blocking(q);
> +        ret = pthread_mutex_unlock(&q->dequeue.lock);
> +        assert(!ret);
> +        return retval;
> +}
> +
> +static inline void
> +_cds_wfq_splice_blocking(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q)
> +{
>          int ret;
>  
> -        ret = pthread_mutex_lock(&q->lock);
> +        ret = pthread_mutex_lock(&src_q->dequeue.lock);
>          assert(!ret);
> -        retnode = ___cds_wfq_dequeue_blocking(q);
> -        ret = pthread_mutex_unlock(&q->lock);
> +        ___cds_wfq_splice_blocking(dest_q, src_q);
> +        ret = pthread_mutex_unlock(&src_q->dequeue.lock);
>          assert(!ret);
> -        return retnode;
>  }
>  
>  #ifdef __cplusplus
> diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
> index 03a73f1..446c94c 100644
> --- a/urcu/wfqueue.h
> +++ b/urcu/wfqueue.h
> @@ -6,7 +6,8 @@
>   *
>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>   *
> - * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2010-2012 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -25,7 +26,9 @@
>  
>  #include <pthread.h>
>  #include <assert.h>
> +#include <stdbool.h>
>  #include <urcu/compiler.h>
> +#include <urcu/arch.h>
>  
>  #ifdef __cplusplus
>  extern "C" {
> @@ -33,8 +36,6 @@ extern "C" {
>  
>  /*
>   * Queue with wait-free enqueue/blocking dequeue.
> - * This implementation adds a dummy head node when the queue is empty to ensure
> - * we can always update the queue locklessly.
>   *
>   * Inspired from half-wait-free/half-blocking queue implementation done by
>   * Paul E. McKenney.
> @@ -45,9 +46,13 @@ struct cds_wfq_node {
>  };
>  
>  struct cds_wfq_queue {
> -        struct cds_wfq_node *head, **tail;
> -        struct cds_wfq_node dummy;        /* Dummy node */
> -        pthread_mutex_t lock;
> +        struct {
> +                struct cds_wfq_node head;
> +                pthread_mutex_t lock;
> +        } __attribute__((aligned((CAA_CACHE_LINE_SIZE)))) dequeue;
> +        struct {
> +                struct cds_wfq_node *tail;
> +        } __attribute__((aligned((CAA_CACHE_LINE_SIZE)))) enqueue;
>  };
>  
>  #ifdef _LGPL_SOURCE
> @@ -55,22 +60,104 @@ struct cds_wfq_queue {
>  #include <urcu/static/wfqueue.h>
>  
>  #define cds_wfq_node_init                _cds_wfq_node_init
> -#define cds_wfq_init                _cds_wfq_init
> -#define cds_wfq_enqueue                _cds_wfq_enqueue
> -#define __cds_wfq_dequeue_blocking        ___cds_wfq_dequeue_blocking
> +#define cds_wfq_init                        _cds_wfq_init
> +#define cds_wfq_empty                        _cds_wfq_empty
> +#define cds_wfq_enqueue                        _cds_wfq_enqueue
> +
> +/* Locking performed within cds_wfq calls. */
>  #define cds_wfq_dequeue_blocking        _cds_wfq_dequeue_blocking
> +#define cds_wfq_splice_blocking                _cds_wfq_splice_blocking
> +#define cds_wfq_first_blocking                _cds_wfq_first_blocking
> +#define cds_wfq_next_blocking                _cds_wfq_next_blocking
> +
> +/* Locking ensured by caller */
> +#define __cds_wfq_dequeue_blocking        ___cds_wfq_dequeue_blocking
> +#define __cds_wfq_splice_blocking        ___cds_wfq_splice_blocking
> +#define __cds_wfq_first_blocking        ___cds_wfq_first_blocking
> +#define __cds_wfq_next_blocking                ___cds_wfq_next_blocking
>  
>  #else /* !_LGPL_SOURCE */
>  
> -extern void cds_wfq_node_init(struct cds_wfq_node *node);
> -extern void cds_wfq_init(struct cds_wfq_queue *q);
> -extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node);
> -/* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between dequeues */
> -extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
> -extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q);
> +extern void cds_wfq_node_init_1(struct cds_wfq_node *node);
> +extern void cds_wfq_init_1(struct cds_wfq_queue *q);
> +extern bool cds_wfq_empty_1(struct cds_wfq_queue *q);
> +extern void cds_wfq_enqueue_1(struct cds_wfq_queue *q, struct cds_wfq_node *node);
> +
> +/* Locking performed within cds_wfq calls. */
> +extern struct cds_wfq_node *cds_wfq_dequeue_blocking_1(struct cds_wfq_queue *q);
> +extern void cds_wfq_splice_blocking_1(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q);
> +
> +/*
> + * __cds_wfq_dequeue_blocking: caller ensures mutual exclusion of dequeue
> + * and splice operations.
> + */
> +extern struct cds_wfq_node *__cds_wfq_dequeue_blocking_1(struct cds_wfq_queue *q);
> +
> +/*
> + * __cds_wfq_splice_blocking: caller ensures mutual exclusion of dequeue and
> + * splice operations on src_q. dest_q must be already initialized.
> + */
> +extern void __cds_wfq_splice_blocking_1(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q);
> +
> +/*
> + * __cds_wfq_first_blocking: mutual exclusion with "dequeue" and
> + * "splice" operations must be ensured by the caller.
> + */
> +extern struct cds_wfq_node *__cds_wfq_first_blocking_1(struct cds_wfq_queue *q);
> +
> +/*
> + * __cds_wfq_next_blocking: mutual exclusion with "dequeue" and "splice"
> + * operations must be ensured by the caller.
> + */
> +extern struct cds_wfq_node *__cds_wfq_next_blocking_1(struct cds_wfq_queue *q,
> +                struct cds_wfq_node *node);
> +
> +#define cds_wfq_node_init                cds_wfq_node_init_1
> +#define cds_wfq_init                        cds_wfq_init_1
> +#define cds_wfq_empty                        cds_wfq_empty_1
> +#define cds_wfq_enqueue                        cds_wfq_enqueue_1
> +
> +/* Locking performed within cds_wfq calls. */
> +#define cds_wfq_dequeue_blocking        cds_wfq_dequeue_blocking_1
> +#define cds_wfq_splice_blocking                cds_wfq_splice_blocking_1
> +#define cds_wfq_first_blocking                cds_wfq_first_blocking_1
> +#define cds_wfq_next_blocking                cds_wfq_next_blocking_1
> +
> +/* Locking ensured by caller */
> +#define __cds_wfq_dequeue_blocking        __cds_wfq_dequeue_blocking_1
> +#define __cds_wfq_splice_blocking        __cds_wfq_splice_blocking_1
> +#define __cds_wfq_first_blocking        __cds_wfq_first_blocking_1
> +#define __cds_wfq_next_blocking                __cds_wfq_next_blocking_1
>  
>  #endif /* !_LGPL_SOURCE */
>  
> +/*
> + * __cds_wfq_for_each_blocking: iterate over all nodes in a queue,
> + * without dequeuing them.
> + *
> + * Mutual exclusion with "dequeue" and "splice" operations must be
> + * ensured by the caller.
> + */
> +#define __cds_wfq_for_each_blocking(q, node)                \
> +        for (node = __cds_wfq_first_blocking(q);        \
> +                        node != NULL;                        \
> +                        node = __cds_wfq_next_blocking(q, node))
> +
> +/*
> + * __cds_wfq_for_each_blocking_safe: iterate over all nodes in a queue,
> + * without dequeuing them. Safe against deletion.
> + *
> + * Mutual exclusion with "dequeue" and "splice" operations must be
> + * ensured by the caller.
> + */
> +#define __cds_wfq_for_each_blocking_safe(q, node, n)                        \
> +        for (node = __cds_wfq_first_blocking(q),                        \
> +                        n = (node ? __cds_wfq_next_blocking(q, node) : NULL); \
> +                        node != NULL;                                        \
> +                        node = n, n = (node ? __cds_wfq_next_blocking(q, node) : NULL))
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/wfqueue.c b/wfqueue.c
> index 3337171..b5fba7b 100644
> --- a/wfqueue.c
> +++ b/wfqueue.c
> @@ -3,7 +3,8 @@
>   *
>   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
>   *
> - * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2010-2012 - Mathieu Desnoyers <[email protected]>
> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
>   *
>   * This library is free software; you can redistribute it and/or
>   * modify it under the terms of the GNU Lesser General Public
> @@ -28,27 +29,55 @@
>   * library wrappers to be used by non-LGPL compatible source code.
>   */
>  
> -void cds_wfq_node_init(struct cds_wfq_node *node)
> +void cds_wfq_node_init_1(struct cds_wfq_node *node)
>  {
>          _cds_wfq_node_init(node);
>  }
>  
> -void cds_wfq_init(struct cds_wfq_queue *q)
> +void cds_wfq_init_1(struct cds_wfq_queue *q)
>  {
>          _cds_wfq_init(q);
>  }
>  
> -void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> +bool cds_wfq_empty_1(struct cds_wfq_queue *q)
> +{
> +        return _cds_wfq_empty(q);
> +}
> +
> +void cds_wfq_enqueue_1(struct cds_wfq_queue *q, struct cds_wfq_node *node)
>  {
>          _cds_wfq_enqueue(q, node);
>  }
>  
> -struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> +struct cds_wfq_node *cds_wfq_dequeue_blocking_1(struct cds_wfq_queue *q)
> +{
> +        return _cds_wfq_dequeue_blocking(q);
> +}
> +
> +void cds_wfq_splice_blocking_1(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q)
> +{
> +        _cds_wfq_splice_blocking(dest_q, src_q);
> +}
> +
> +struct cds_wfq_node *__cds_wfq_dequeue_blocking_1(struct cds_wfq_queue *q)
>  {
>          return ___cds_wfq_dequeue_blocking(q);
>  }
>  
> -struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> +void __cds_wfq_splice_blocking_1(struct cds_wfq_queue *dest_q,
> +                struct cds_wfq_queue *src_q)
>  {
> -        return _cds_wfq_dequeue_blocking(q);
> +        ___cds_wfq_splice_blocking(dest_q, src_q);
> +}
> +
> +struct cds_wfq_node *__cds_wfq_first_blocking_1(struct cds_wfq_queue *q)
> +{
> +        return ___cds_wfq_first_blocking(q);
> +}
> +
> +struct cds_wfq_node *__cds_wfq_next_blocking_1(struct cds_wfq_queue *q,
> +                struct cds_wfq_node *node)
> +{
> +        return ___cds_wfq_next_blocking(q, node);
>  }
>
> --
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
