On Sat, Aug 11, 2012 at 2:28 AM, Mathieu Desnoyers <[email protected]> wrote: > * Lai Jiangshan ([email protected]) wrote: >> Some guys would be surprised by this fact: >> There are already TWO implementations of wfqueue in urcu. >> >> The first one is in urcu/static/wfqueue.h: >> 1) enqueue: exchange the tail and then update previous->next >> 2) dequeue: wait for first node's next pointer and them shift, a dummy node >> is introduced to avoid the queue->tail become NULL when shift. >> >> The second one shares some code with the first one, and the left code >> are spreading in urcu-call-rcu-impl.h: >> 1) enqueue: share with the first one >> 2) no dequeue operation: and no shift, so it don't need dummy node, >> Although the dummy node is queued when initialization, but it is >> removed >> after the first dequeue_all operation in call_rcu_thread(). >> call_rcu_data_free() forgets to handle the dummy node if it is not >> removed. >> 3)dequeue_all: record the old head and tail, and queue->head become the >> special >> tail node.(atomic record the tail and change the tail). >> >> The second implementation's code are spreading, bad for review, and it is not >> tested by tests/test_urcu_wfq. >> >> So we need a better implementation avoid the dummy node dancing and can >> service >> both generic wfqueue APIs and dequeue_all API for call rcu. >> >> The new implementation: >> 1) enqueue: share with the first one/original implementation. >> 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1. >> no dummy node, save memory. >> 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail >> and return the old head.next. > > Hi Lai, > > Your approach is very interesting! It is indeed good for testing and > maintenance if we can keep all the queue code within the API. > > I am concerned about the following scenario in your new implementation, > I would like to know your thoughts on this. 
It could happen on > architectures reordering loads (DEC Alpha, AMD64, IA64, PA-RISC, POWER, > SPARC RMO, x86 TSO, and x86 OOStore): > > init state: list is empty > > CPU 0 CPU 1 > > ___cds_wfq_append_list() (append newtail) > oldtail = uatomic_xchg(&q->tail, newtail); (A) > CMM_STORE_SHARED(oldtail->next, head); (B) > > (B) is observable by cpu > 1, but not (A) yet > > > ___cds_wfq_dequeue_blocking() > _cds_wfq_empty(q) > return > q->head.next == NULL > && > CMM_LOAD_SHARED(q->tail) == &q->head; > -> false > (q->tail != &q->head) > node = > ___cds_wfq_node_sync_next(&q->head); > -> node is newtail > if ((next = > CMM_LOAD_SHARED(node->next)) == NULL) { > -> taken, > newtail->next is indeed NULL > * (see note below) > if > (CMM_LOAD_SHARED(q->tail) == node) { > -> not taken, > since q->tail still appears as &q->head > } > next = > ___cds_wfq_node_sync_next(node); > -> endless loop > if no other enqueue is performed. (BUG) > } > > (A) is observable by cpu 1 > > * note: I think we should add a cmm_smp_rmb() here to fix this issue. It > would force CPU 1 to necessarily see store (A) if store (B) is seen. > This would be matching the full memory barrier implied after > uatomic_xchg(). >
You are right. Can you add this line of code after merge this patch if there is no other problem. Lai > Thanks, > > Mathieu > >> >> More implementation details are in the code. >> tests/test_urcu_wfq will be update in future for testing new APIs. >> >> >> Signed-off-by: Lai Jiangshan <[email protected]> >> --- >> urcu-call-rcu-impl.h | 50 ++++++++++-------------- >> urcu/static/wfqueue.h | 104 >> ++++++++++++++++++++++++++++++++++++------------ >> urcu/wfqueue.h | 25 ++++++++++-- >> wfqueue.c | 29 ++++++++++++++ >> 4 files changed, 149 insertions(+), 59 deletions(-) >> >> diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h >> index 13b24ff..dbfb410 100644 >> --- a/urcu-call-rcu-impl.h >> +++ b/urcu-call-rcu-impl.h >> @@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg) >> { >> unsigned long cbcount; >> struct cds_wfq_node *cbs; >> - struct cds_wfq_node **cbs_tail; >> + struct cds_wfq_node *cbs_tail; >> struct call_rcu_data *crdp = (struct call_rcu_data *)arg; >> struct rcu_head *rhp; >> int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT); >> @@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg) >> cmm_smp_mb(); >> } >> for (;;) { >> - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { >> - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == >> NULL) >> - poll(NULL, 0, 1); >> - _CMM_STORE_SHARED(crdp->cbs.head, NULL); >> - cbs_tail = (struct cds_wfq_node **) >> - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); >> + cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail); >> + if (cbs) { >> synchronize_rcu(); >> cbcount = 0; >> do { >> - while (cbs->next == NULL && >> - &cbs->next != cbs_tail) >> - poll(NULL, 0, 1); >> - if (cbs == &crdp->cbs.dummy) { >> - cbs = cbs->next; >> - continue; >> - } >> rhp = (struct rcu_head *)cbs; >> - cbs = cbs->next; >> + >> + if (cbs != cbs_tail) >> + cbs = __cds_wfq_node_sync_next(cbs); >> + else >> + cbs = NULL; >> + >> rhp->func(rhp); >> cbcount++; >> } while (cbs != NULL); >> @@ -270,8 +264,7 
@@ static void *call_rcu_thread(void *arg) >> break; >> rcu_thread_offline(); >> if (!rt) { >> - if (&crdp->cbs.head >> - == _CMM_LOAD_SHARED(crdp->cbs.tail)) { >> + if (cds_wfq_empty(&crdp->cbs)) { >> call_rcu_wait(crdp); >> poll(NULL, 0, 10); >> uatomic_dec(&crdp->futex); >> @@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head, >> */ >> void call_rcu_data_free(struct call_rcu_data *crdp) >> { >> - struct cds_wfq_node *cbs; >> - struct cds_wfq_node **cbs_tail; >> - struct cds_wfq_node **cbs_endprev; >> + struct cds_wfq_node *head, *tail; >> >> if (crdp == NULL || crdp == default_call_rcu_data) { >> return; >> } >> + >> if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) { >> uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP); >> wake_call_rcu_thread(crdp); >> while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == >> 0) >> poll(NULL, 0, 1); >> } >> - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { >> - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) >> - poll(NULL, 0, 1); >> - _CMM_STORE_SHARED(crdp->cbs.head, NULL); >> - cbs_tail = (struct cds_wfq_node **) >> - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); >> + >> + if (!cds_wfq_empty(&crdp->cbs)) { >> + head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail); >> + assert(head); >> + >> /* Create default call rcu data if need be */ >> (void) get_default_call_rcu_data(); >> - cbs_endprev = (struct cds_wfq_node **) >> - uatomic_xchg(&default_call_rcu_data, cbs_tail); >> - *cbs_endprev = cbs; >> + >> + __cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail); >> + >> uatomic_add(&default_call_rcu_data->qlen, >> uatomic_read(&crdp->qlen)); >> + >> wake_call_rcu_thread(default_call_rcu_data); >> } >> >> diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h >> index 636e1af..15ea9fc 100644 >> --- a/urcu/static/wfqueue.h >> +++ b/urcu/static/wfqueue.h >> @@ -10,6 +10,7 @@ >> * dynamically with the userspace rcu library. 
>> * >> * Copyright 2010 - Mathieu Desnoyers <[email protected]> >> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]> >> * >> * This library is free software; you can redistribute it and/or >> * modify it under the terms of the GNU Lesser General Public >> @@ -29,6 +30,7 @@ >> #include <pthread.h> >> #include <assert.h> >> #include <poll.h> >> +#include <stdbool.h> >> #include <urcu/compiler.h> >> #include <urcu/uatomic.h> >> >> @@ -38,8 +40,6 @@ extern "C" { >> >> /* >> * Queue with wait-free enqueue/blocking dequeue. >> - * This implementation adds a dummy head node when the queue is empty to >> ensure >> - * we can always update the queue locklessly. >> * >> * Inspired from half-wait-free/half-blocking queue implementation done by >> * Paul E. McKenney. >> @@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q) >> { >> int ret; >> >> - _cds_wfq_node_init(&q->dummy); >> /* Set queue head and tail */ >> - q->head = &q->dummy; >> - q->tail = &q->dummy.next; >> + _cds_wfq_node_init(&q->head); >> + q->tail = &q->head; >> ret = pthread_mutex_init(&q->lock, NULL); >> assert(!ret); >> } >> >> -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, >> - struct cds_wfq_node *node) >> +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q) >> { >> - struct cds_wfq_node **old_tail; >> + /* >> + * Queue is empty if no node is pointed by q->head.next nor q->tail. >> + */ >> + return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head; >> +} >> >> +static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q, >> + struct cds_wfq_node *head, struct cds_wfq_node *tail) >> +{ >> /* >> * uatomic_xchg() implicit memory barrier orders earlier stores to data >> * structure containing node and setting node->next to NULL before >> * publication. 
>> */ >> - old_tail = uatomic_xchg(&q->tail, &node->next); >> + tail = uatomic_xchg(&q->tail, tail); >> + >> /* >> - * At this point, dequeuers see a NULL old_tail->next, which indicates >> + * At this point, dequeuers see a NULL tail->next, which indicates >> * that the queue is being appended to. The following store will append >> * "node" to the queue from a dequeuer perspective. >> */ >> - CMM_STORE_SHARED(*old_tail, node); >> + CMM_STORE_SHARED(tail->next, head); >> +} >> + >> +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, >> + struct cds_wfq_node *node) >> +{ >> + ___cds_wfq_append_list(q, node, node); >> } >> >> /* >> @@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) >> { >> struct cds_wfq_node *node, *next; >> >> - /* >> - * Queue is empty if it only contains the dummy node. >> - */ >> - if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next) >> + if (_cds_wfq_empty(q)) >> return NULL; >> - node = q->head; >> >> - next = ___cds_wfq_node_sync_next(node); >> + node = ___cds_wfq_node_sync_next(&q->head); >> + >> + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { >> + if (CMM_LOAD_SHARED(q->tail) == node) { >> + /* >> + * @node is the only node in the queue. >> + * Try to move the tail to &q->head >> + */ >> + _cds_wfq_node_init(&q->head); >> + if (uatomic_cmpxchg(&q->tail, node, &q->head) == node) >> + return node; >> + } >> + next = ___cds_wfq_node_sync_next(node); >> + } >> >> /* >> * Move queue head forward. >> */ >> - q->head = next; >> - /* >> - * Requeue dummy node if we just dequeued it. 
>> - */ >> - if (node == &q->dummy) { >> - _cds_wfq_node_init(node); >> - _cds_wfq_enqueue(q, node); >> - return ___cds_wfq_dequeue_blocking(q); >> - } >> + q->head.next = next; >> + >> + return node; >> +} >> + >> +/* dequeue all nodes, the nodes are not synchronized for the next pointer */ >> +static inline struct cds_wfq_node * >> +___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q, >> + struct cds_wfq_node **tail) >> +{ >> + struct cds_wfq_node *node; >> + >> + if (_cds_wfq_empty(q)) >> + return NULL; >> + >> + node = ___cds_wfq_node_sync_next(&q->head); >> + _cds_wfq_node_init(&q->head); >> + *tail = uatomic_xchg(&q->tail, &q->head); >> + >> return node; >> } >> >> @@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) >> return retnode; >> } >> >> +static inline struct cds_wfq_node * >> +_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q, >> + struct cds_wfq_node **tail) >> +{ >> + struct cds_wfq_node *node, *next; >> + int ret; >> + >> + ret = pthread_mutex_lock(&q->lock); >> + assert(!ret); >> + node = ___cds_wfq_dequeue_all_blocking(q, tail); >> + ret = pthread_mutex_unlock(&q->lock); >> + assert(!ret); >> + >> + /* synchronize all nodes' next pointer */ >> + next = node; >> + while (next != *tail) >> + next = ___cds_wfq_node_sync_next(next); >> + >> + return node; >> +} >> + >> #ifdef __cplusplus >> } >> #endif >> diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h >> index 03a73f1..985f540 100644 >> --- a/urcu/wfqueue.h >> +++ b/urcu/wfqueue.h >> @@ -7,6 +7,7 @@ >> * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue >> * >> * Copyright 2010 - Mathieu Desnoyers <[email protected]> >> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]> >> * >> * This library is free software; you can redistribute it and/or >> * modify it under the terms of the GNU Lesser General Public >> @@ -25,6 +26,7 @@ >> >> #include <pthread.h> >> #include <assert.h> >> +#include <stdbool.h> >> #include <urcu/compiler.h> >> >> #ifdef 
__cplusplus >> @@ -33,8 +35,6 @@ extern "C" { >> >> /* >> * Queue with wait-free enqueue/blocking dequeue. >> - * This implementation adds a dummy head node when the queue is empty to >> ensure >> - * we can always update the queue locklessly. >> * >> * Inspired from half-wait-free/half-blocking queue implementation done by >> * Paul E. McKenney. >> @@ -45,8 +45,7 @@ struct cds_wfq_node { >> }; >> >> struct cds_wfq_queue { >> - struct cds_wfq_node *head, **tail; >> - struct cds_wfq_node dummy; /* Dummy node */ >> + struct cds_wfq_node head, *tail; >> pthread_mutex_t lock; >> }; >> >> @@ -56,18 +55,36 @@ struct cds_wfq_queue { >> >> #define cds_wfq_node_init _cds_wfq_node_init >> #define cds_wfq_init _cds_wfq_init >> +#define cds_wfq_empty _cds_wfq_empty >> +#define __cds_wfq_append_list ___cds_wfq_append_list >> #define cds_wfq_enqueue _cds_wfq_enqueue >> #define __cds_wfq_dequeue_blocking ___cds_wfq_dequeue_blocking >> #define cds_wfq_dequeue_blocking _cds_wfq_dequeue_blocking >> +#define __cds_wfq_node_sync_next ___cds_wfq_node_sync_next >> +#define __cds_wfq_dequeue_all_blocking ___cds_wfq_dequeue_all_blocking >> +#define cds_wfq_dequeue_all_blocking _cds_wfq_dequeue_all_blocking >> >> #else /* !_LGPL_SOURCE */ >> >> extern void cds_wfq_node_init(struct cds_wfq_node *node); >> extern void cds_wfq_init(struct cds_wfq_queue *q); >> +extern bool cds_wfq_empty(struct cds_wfq_queue *q); >> +/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues >> */ >> +extern void __cds_wfq_append_list(struct cds_wfq_queue *q, >> + struct cds_wfq_node *head, struct cds_wfq_node *tail); >> extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node >> *node); >> /* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between >> dequeues */ >> extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue >> *q); >> extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue >> *q); >> +extern struct cds_wfq_node 
*__cds_wfq_node_sync_next(struct cds_wfq_node >> *node); >> +/* >> + * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between >> + * dequeues, and needs to synchronize the next pointer before using it. >> + */ >> +extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking( >> + struct cds_wfq_queue *q, struct cds_wfq_node **tail); >> +extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking( >> + struct cds_wfq_queue *q, struct cds_wfq_node **tail); >> >> #endif /* !_LGPL_SOURCE */ >> >> diff --git a/wfqueue.c b/wfqueue.c >> index 3337171..28a7b58 100644 >> --- a/wfqueue.c >> +++ b/wfqueue.c >> @@ -4,6 +4,7 @@ >> * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue >> * >> * Copyright 2010 - Mathieu Desnoyers <[email protected]> >> + * Copyright 2011-2012 - Lai Jiangshan <[email protected]> >> * >> * This library is free software; you can redistribute it and/or >> * modify it under the terms of the GNU Lesser General Public >> @@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q) >> { >> _cds_wfq_init(q); >> } >> >> +bool cds_wfq_empty(struct cds_wfq_queue *q) >> +{ >> + return _cds_wfq_empty(q); >> +} >> + >> +void __cds_wfq_append_list(struct cds_wfq_queue *q, >> + struct cds_wfq_node *head, struct cds_wfq_node *tail) >> +{ >> + return ___cds_wfq_append_list(q, head, tail); >> +} >> + >> void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node) >> { >> _cds_wfq_enqueue(q, node); >> @@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct >> cds_wfq_queue *q) >> { >> return _cds_wfq_dequeue_blocking(q); >> } >> + >> +struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node) >> +{ >> + return ___cds_wfq_node_sync_next(node); >> +} >> + >> +struct cds_wfq_node *__cds_wfq_dequeue_all_blocking( >> + struct cds_wfq_queue *q, struct cds_wfq_node **tail) >> +{ >> + return ___cds_wfq_dequeue_all_blocking(q, tail); >> +} >> + >> +struct cds_wfq_node *cds_wfq_dequeue_all_blocking( >> + struct
cds_wfq_queue *q, struct cds_wfq_node **tail) >> +{ >> + return _cds_wfq_dequeue_all_blocking(q, tail); >> +} >> -- >> 1.7.7 >> >> >> _______________________________________________ >> lttng-dev mailing list >> [email protected] >> http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev > > -- > Mathieu Desnoyers > Operating System Efficiency R&D Consultant > EfficiOS Inc. > http://www.efficios.com _______________________________________________ lttng-dev mailing list [email protected] http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
