* Mathieu Desnoyers ([email protected]) wrote:
> * Lai Jiangshan ([email protected]) wrote:
> > Some guys would be surprised by this fact:
> > There are already TWO implementations of wfqueue in urcu.
> > 
> > The first one is in urcu/static/wfqueue.h:
> > 1) enqueue: exchange the tail and then update previous->next
> > 2) dequeue: wait for first node's next pointer and them shift, a dummy node
> >     is introduced to avoid the queue->tail become NULL when shift.
> > 
> > The second one shares some code with the first one, and the left code
> > are spreading in urcu-call-rcu-impl.h:
> > 1) enqueue: share with the first one
> > 2) no dequeue operation: and no shift, so it don't need dummy node,
> >     Although the dummy node is queued when initialization, but it is removed
> >     after the first dequeue_all operation in call_rcu_thread().
> >     call_rcu_data_free() forgets to handle the dummy node if it is not 
> > removed.
> > 3)dequeue_all: record the old head and tail, and queue->head become the 
> > special
> >     tail node.(atomic record the tail and change the tail).
> > 
> > The second implementation's code are spreading, bad for review, and it is 
> > not
> > tested by tests/test_urcu_wfq.
> > 
> > So we need a better implementation avoid the dummy node dancing and can 
> > service
> > both generic wfqueue APIs and dequeue_all API for call rcu.
> > 
> > The new implementation:
> > 1) enqueue: share with the first one/original implementation.
> > 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1.
> >     no dummy node, save memory.
> > 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail
> >     and return the old head.next.
> > 
> > More implementation details are in the code.
> > tests/test_urcu_wfq will be update in future for testing new APIs.
> 
> Hi Lai,
> 
> Some other style-related questions below,

FYI, I'm preparing a patch implementing those and some other ideas
regarding the API. I plan to post it tomorrow.

Thanks,

Mathieu

> 
> > 
> > 
> > Signed-off-by: Lai Jiangshan <[email protected]>
> > ---
> >  urcu-call-rcu-impl.h  |   50 ++++++++++--------------
> >  urcu/static/wfqueue.h |  104 
> > ++++++++++++++++++++++++++++++++++++------------
> >  urcu/wfqueue.h        |   25 ++++++++++--
> >  wfqueue.c             |   29 ++++++++++++++
> >  4 files changed, 149 insertions(+), 59 deletions(-)
> > 
> > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
> > index 13b24ff..dbfb410 100644
> > --- a/urcu-call-rcu-impl.h
> > +++ b/urcu-call-rcu-impl.h
> > @@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg)
> >  {
> >     unsigned long cbcount;
> >     struct cds_wfq_node *cbs;
> > -   struct cds_wfq_node **cbs_tail;
> > +   struct cds_wfq_node *cbs_tail;
> >     struct call_rcu_data *crdp = (struct call_rcu_data *)arg;
> >     struct rcu_head *rhp;
> >     int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT);
> > @@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg)
> >             cmm_smp_mb();
> >     }
> >     for (;;) {
> > -           if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > -                   while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > -                           poll(NULL, 0, 1);
> > -                   _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > -                   cbs_tail = (struct cds_wfq_node **)
> > -                           uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > +           cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail);
> > +           if (cbs) {
> >                     synchronize_rcu();
> >                     cbcount = 0;
> >                     do {
> > -                           while (cbs->next == NULL &&
> > -                                  &cbs->next != cbs_tail)
> > -                                   poll(NULL, 0, 1);
> > -                           if (cbs == &crdp->cbs.dummy) {
> > -                                   cbs = cbs->next;
> > -                                   continue;
> > -                           }
> >                             rhp = (struct rcu_head *)cbs;
> > -                           cbs = cbs->next;
> > +
> > +                           if (cbs != cbs_tail)
> > +                                   cbs = __cds_wfq_node_sync_next(cbs);
> > +                           else
> > +                                   cbs = NULL;
> > +
> >                             rhp->func(rhp);
> >                             cbcount++;
> >                     } while (cbs != NULL);
> > @@ -270,8 +264,7 @@ static void *call_rcu_thread(void *arg)
> >                     break;
> >             rcu_thread_offline();
> >             if (!rt) {
> > -                   if (&crdp->cbs.head
> > -                       == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > +                   if (cds_wfq_empty(&crdp->cbs)) {
> >                             call_rcu_wait(crdp);
> >                             poll(NULL, 0, 10);
> >                             uatomic_dec(&crdp->futex);
> > @@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head,
> >   */
> >  void call_rcu_data_free(struct call_rcu_data *crdp)
> >  {
> > -   struct cds_wfq_node *cbs;
> > -   struct cds_wfq_node **cbs_tail;
> > -   struct cds_wfq_node **cbs_endprev;
> > +   struct cds_wfq_node *head, *tail;
> >  
> >     if (crdp == NULL || crdp == default_call_rcu_data) {
> >             return;
> >     }
> > +
> >     if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) {
> >             uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP);
> >             wake_call_rcu_thread(crdp);
> >             while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 
> > 0)
> >                     poll(NULL, 0, 1);
> >     }
> > -   if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
> > -           while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
> > -                   poll(NULL, 0, 1);
> > -           _CMM_STORE_SHARED(crdp->cbs.head, NULL);
> > -           cbs_tail = (struct cds_wfq_node **)
> > -                   uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head);
> > +
> > +   if (!cds_wfq_empty(&crdp->cbs)) {
> > +           head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail);
> > +           assert(head);
> > +
> >             /* Create default call rcu data if need be */
> >             (void) get_default_call_rcu_data();
> > -           cbs_endprev = (struct cds_wfq_node **)
> > -                   uatomic_xchg(&default_call_rcu_data, cbs_tail);
> > -           *cbs_endprev = cbs;
> > +
> > +           __cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail);
> > +
> >             uatomic_add(&default_call_rcu_data->qlen,
> >                         uatomic_read(&crdp->qlen));
> > +
> >             wake_call_rcu_thread(default_call_rcu_data);
> >     }
> >  
> > diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h
> > index 636e1af..15ea9fc 100644
> > --- a/urcu/static/wfqueue.h
> > +++ b/urcu/static/wfqueue.h
> > @@ -10,6 +10,7 @@
> >   * dynamically with the userspace rcu library.
> >   *
> >   * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> > + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
> >   *
> >   * This library is free software; you can redistribute it and/or
> >   * modify it under the terms of the GNU Lesser General Public
> > @@ -29,6 +30,7 @@
> >  #include <pthread.h>
> >  #include <assert.h>
> >  #include <poll.h>
> > +#include <stdbool.h>
> >  #include <urcu/compiler.h>
> >  #include <urcu/uatomic.h>
> >  
> > @@ -38,8 +40,6 @@ extern "C" {
> >  
> >  /*
> >   * Queue with wait-free enqueue/blocking dequeue.
> > - * This implementation adds a dummy head node when the queue is empty to 
> > ensure
> > - * we can always update the queue locklessly.
> >   *
> >   * Inspired from half-wait-free/half-blocking queue implementation done by
> >   * Paul E. McKenney.
> > @@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue 
> > *q)
> >  {
> >     int ret;
> >  
> > -   _cds_wfq_node_init(&q->dummy);
> >     /* Set queue head and tail */
> > -   q->head = &q->dummy;
> > -   q->tail = &q->dummy.next;
> > +   _cds_wfq_node_init(&q->head);
> > +   q->tail = &q->head;
> >     ret = pthread_mutex_init(&q->lock, NULL);
> >     assert(!ret);
> >  }
> >  
> > -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> > -                               struct cds_wfq_node *node)
> > +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q)
> >  {
> > -   struct cds_wfq_node **old_tail;
> > +   /*
> > +    * Queue is empty if no node is pointed by q->head.next nor q->tail.
> > +    */
> > +   return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head;
> > +}
> >  
> > +static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q,
> > +           struct cds_wfq_node *head, struct cds_wfq_node *tail)
> > +{
> >     /*
> >      * uatomic_xchg() implicit memory barrier orders earlier stores to data
> >      * structure containing node and setting node->next to NULL before
> >      * publication.
> >      */
> > -   old_tail = uatomic_xchg(&q->tail, &node->next);
> > +   tail = uatomic_xchg(&q->tail, tail);
> 
> I'd prefer to keep "old_tail" here, because it becomes clearer to anyone
> reviewing that uatomic_xchg() returns the old tail (and this extra
> clarity comes without any overhead).
> 
> > +
> >     /*
> > -    * At this point, dequeuers see a NULL old_tail->next, which indicates
> > +    * At this point, dequeuers see a NULL tail->next, which indicates
> >      * that the queue is being appended to. The following store will append
> >      * "node" to the queue from a dequeuer perspective.
> >      */
> > -   CMM_STORE_SHARED(*old_tail, node);
> > +   CMM_STORE_SHARED(tail->next, head);
> > +}
> > +
> > +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
> > +                               struct cds_wfq_node *node)
> > +{
> > +   ___cds_wfq_append_list(q, node, node);
> >  }
> 
> Why not keep ___cds_wfq_append_list() merged into _cds_wfq_enqueue() ?
> 
> This would keep the number of symbols exported minimal.
> 
> So if I get it right, one "_" prefix is the "normally used" functions
> (exposed through the LGPL symbol API).
> 
> The "__" prefix are somewhat more internal, but can also be used
> externally.
> 
> Finally, the "___" prefix seem to be quite similar to the
> double-underscores.
> 
> We might need more consistency, I'm not sure the triple-underscores are
> needed. Also, I'm not sure should export the double-underscore functions
> outside of LGPL use (in other words, maybe we should not expose them to
> !LGPL_SOURCE code). So we would emit the static inlines, but no symbols
> for those. This covers ___cds_wfq_node_sync_next(), and
> ___cds_wfq_dequeue_all_blocking (which requires the caller to use
> sync_next). Currently, all code that needs to fine-grained integration
> is within the userspace RCU tree, which defines LGPL_SOURCE.
> 
> >  
> >  /*
> > @@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> >  {
> >     struct cds_wfq_node *node, *next;
> >  
> > -   /*
> > -    * Queue is empty if it only contains the dummy node.
> > -    */
> > -   if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next)
> > +   if (_cds_wfq_empty(q))
> >             return NULL;
> > -   node = q->head;
> >  
> > -   next = ___cds_wfq_node_sync_next(node);
> > +   node = ___cds_wfq_node_sync_next(&q->head);
> > +
> > +   if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
> > +           if (CMM_LOAD_SHARED(q->tail) == node) {
> > +                   /*
> > +                    * @node is the only node in the queue.
> > +                    * Try to move the tail to &q->head
> > +                    */
> > +                   _cds_wfq_node_init(&q->head);
> > +                   if (uatomic_cmpxchg(&q->tail, node, &q->head) == node)
> > +                           return node;
> > +           }
> > +           next = ___cds_wfq_node_sync_next(node);
> > +   }
> >  
> >     /*
> >      * Move queue head forward.
> >      */
> > -   q->head = next;
> > -   /*
> > -    * Requeue dummy node if we just dequeued it.
> > -    */
> > -   if (node == &q->dummy) {
> > -           _cds_wfq_node_init(node);
> > -           _cds_wfq_enqueue(q, node);
> > -           return ___cds_wfq_dequeue_blocking(q);
> > -   }
> > +   q->head.next = next;
> > +
> > +   return node;
> > +}
> > +
> > +/* dequeue all nodes, the nodes are not synchronized for the next pointer 
> > */
> > +static inline struct cds_wfq_node *
> > +___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
> > +           struct cds_wfq_node **tail)
> > +{
> > +   struct cds_wfq_node *node;
> > +
> > +   if (_cds_wfq_empty(q))
> > +           return NULL;
> > +
> > +   node = ___cds_wfq_node_sync_next(&q->head);
> > +   _cds_wfq_node_init(&q->head);
> > +   *tail = uatomic_xchg(&q->tail, &q->head);
> > +
> >     return node;
> >  }
> >  
> > @@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
> >     return retnode;
> >  }
> >  
> > +static inline struct cds_wfq_node *
> > +_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q,
> > +           struct cds_wfq_node **tail)
> > +{
> > +   struct cds_wfq_node *node, *next;
> > +   int ret;
> > +
> > +   ret = pthread_mutex_lock(&q->lock);
> > +   assert(!ret);
> > +   node = ___cds_wfq_dequeue_all_blocking(q, tail);
> > +   ret = pthread_mutex_unlock(&q->lock);
> > +   assert(!ret);
> 
> So we take the queue lock on dequeue_all, but not on dequeue. It might
> be good to have a consistent behavior: either we lock dequeue and
> dequeue_all, or leave the lock entirely to the caller (and document it).
> 
> Thoughts ?
> 
> Thanks!
> 
> Mathieu
> 
> > +
> > +   /* synchronize all nodes' next pointer */
> > +   next = node;
> > +   while (next != *tail)
> > +           next = ___cds_wfq_node_sync_next(next);
> > +
> > +   return node;
> > +}
> > +
> >  #ifdef __cplusplus
> >  }
> >  #endif
> > diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h
> > index 03a73f1..985f540 100644
> > --- a/urcu/wfqueue.h
> > +++ b/urcu/wfqueue.h
> > @@ -7,6 +7,7 @@
> >   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
> >   *
> >   * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> > + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
> >   *
> >   * This library is free software; you can redistribute it and/or
> >   * modify it under the terms of the GNU Lesser General Public
> > @@ -25,6 +26,7 @@
> >  
> >  #include <pthread.h>
> >  #include <assert.h>
> > +#include <stdbool.h>
> >  #include <urcu/compiler.h>
> >  
> >  #ifdef __cplusplus
> > @@ -33,8 +35,6 @@ extern "C" {
> >  
> >  /*
> >   * Queue with wait-free enqueue/blocking dequeue.
> > - * This implementation adds a dummy head node when the queue is empty to 
> > ensure
> > - * we can always update the queue locklessly.
> >   *
> >   * Inspired from half-wait-free/half-blocking queue implementation done by
> >   * Paul E. McKenney.
> > @@ -45,8 +45,7 @@ struct cds_wfq_node {
> >  };
> >  
> >  struct cds_wfq_queue {
> > -   struct cds_wfq_node *head, **tail;
> > -   struct cds_wfq_node dummy;      /* Dummy node */
> > +   struct cds_wfq_node head, *tail;
> >     pthread_mutex_t lock;
> >  };
> >  
> > @@ -56,18 +55,36 @@ struct cds_wfq_queue {
> >  
> >  #define cds_wfq_node_init          _cds_wfq_node_init
> >  #define cds_wfq_init               _cds_wfq_init
> > +#define cds_wfq_empty              _cds_wfq_empty
> > +#define __cds_wfq_append_list      ___cds_wfq_append_list
> >  #define cds_wfq_enqueue            _cds_wfq_enqueue
> >  #define __cds_wfq_dequeue_blocking ___cds_wfq_dequeue_blocking
> >  #define cds_wfq_dequeue_blocking   _cds_wfq_dequeue_blocking
> > +#define __cds_wfq_node_sync_next   ___cds_wfq_node_sync_next
> > +#define __cds_wfq_dequeue_all_blocking     ___cds_wfq_dequeue_all_blocking
> > +#define cds_wfq_dequeue_all_blocking       _cds_wfq_dequeue_all_blocking
> >  
> >  #else /* !_LGPL_SOURCE */
> >  
> >  extern void cds_wfq_node_init(struct cds_wfq_node *node);
> >  extern void cds_wfq_init(struct cds_wfq_queue *q);
> > +extern bool cds_wfq_empty(struct cds_wfq_queue *q);
> > +/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues 
> > */
> > +extern void __cds_wfq_append_list(struct cds_wfq_queue *q,
> > +           struct cds_wfq_node *head, struct cds_wfq_node *tail);
> >  extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node 
> > *node);
> >  /* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between 
> > dequeues */
> >  extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct 
> > cds_wfq_queue *q);
> >  extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue 
> > *q);
> > +extern struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node 
> > *node);
> > +/*
> > + * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between
> > + * dequeues, and need synchronize next pointer berfore use it.
> > + */
> > +extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
> > +           struct cds_wfq_queue *q, struct cds_wfq_node **tail);
> > +extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
> > +           struct cds_wfq_queue *q, struct cds_wfq_node **tail);
> >  
> >  #endif /* !_LGPL_SOURCE */
> >  
> > diff --git a/wfqueue.c b/wfqueue.c
> > index 3337171..28a7b58 100644
> > --- a/wfqueue.c
> > +++ b/wfqueue.c
> > @@ -4,6 +4,7 @@
> >   * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue
> >   *
> >   * Copyright 2010 - Mathieu Desnoyers <[email protected]>
> > + * Copyright 2011-2012 - Lai Jiangshan <[email protected]>
> >   *
> >   * This library is free software; you can redistribute it and/or
> >   * modify it under the terms of the GNU Lesser General Public
> > @@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q)
> >     _cds_wfq_init(q);
> >  }
> >  
> > +bool cds_wfq_empty(struct cds_wfq_queue *q)
> > +{
> > +   return _cds_wfq_empty(q);
> > +}
> > +
> > +void __cds_wfq_append_list(struct cds_wfq_queue *q,
> > +           struct cds_wfq_node *head, struct cds_wfq_node *tail)
> > +{
> > +   return ___cds_wfq_append_list(q, head, tail);
> > +}
> > +
> >  void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node)
> >  {
> >     _cds_wfq_enqueue(q, node);
> > @@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct 
> > cds_wfq_queue *q)
> >  {
> >     return _cds_wfq_dequeue_blocking(q);
> >  }
> > +
> > +struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node)
> > +{
> > +   return ___cds_wfq_node_sync_next(node);
> > +}
> > +
> > +struct cds_wfq_node *__cds_wfq_dequeue_all_blocking(
> > +           struct cds_wfq_queue *q, struct cds_wfq_node **tail)
> > +{
> > +   return ___cds_wfq_dequeue_all_blocking(q, tail);
> > +}
> > +
> > +struct cds_wfq_node *cds_wfq_dequeue_all_blocking(
> > +           struct cds_wfq_queue *q, struct cds_wfq_node **tail)
> > +{
> > +   return _cds_wfq_dequeue_all_blocking(q, tail);
> > +}
> > -- 
> > 1.7.7
> > 
> > 
> > _______________________________________________
> > lttng-dev mailing list
> > [email protected]
> > http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev
> 
> -- 
> Mathieu Desnoyers
> Operating System Efficiency R&D Consultant
> EfficiOS Inc.
> http://www.efficios.com

-- 
Mathieu Desnoyers
Operating System Efficiency R&D Consultant
EfficiOS Inc.
http://www.efficios.com

_______________________________________________
lttng-dev mailing list
[email protected]
http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev

Reply via email to