* Paolo Bonzini ([email protected]) wrote: > The current futex implementation is actually busy waiting on the > list, because futex is never set to -1. This one instead piggybacks > on the flags field, so that the futex can also be used for the STOP flag.
Please see if my commit fixes the problem you refer to here. Some comments below. > > Signed-off-by: Paolo Bonzini <[email protected]> > --- > urcu-call-rcu-impl.h | 43 +++++++++++++++++++++++++------------------ > urcu-call-rcu.h | 1 + > 2 files changed, 26 insertions(+), 18 deletions(-) > > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h > index ca597d0..9824515 100644 > --- a/urcu-call-rcu-impl.h > +++ b/urcu-call-rcu-impl.h > @@ -45,8 +45,7 @@ > > struct call_rcu_data { > struct cds_wfq_queue cbs; > - unsigned long flags; > - int futex; > + int flags; > pthread_t tid; > int cpu_affinity; > struct cds_list_head list; > @@ -89,22 +88,18 @@ static long maxcpus; > > static void call_rcu_wait(struct call_rcu_data *crdp) > { > - /* Read call_rcu list before read futex */ > - cmm_smp_mb(); > - if (uatomic_read(&crdp->futex) == -1) > - futex_async(&crdp->futex, FUTEX_WAIT, -1, > - NULL, NULL, 0); > + int flags; > + for (;;) { > + flags = uatomic_read(&crdp->flags); > + if (flags & (URCU_CALL_RCU_BUSY | URCU_CALL_RCU_STOP)) > + break; > + futex_async(&crdp->flags, FUTEX_WAIT, flags, NULL, 0, 0); Hrm, not sure this works. We need to order call_rcu list access with respect to "futex" (or, here, flag) value accesses. This seems to be missing here. (uatomic_read does not imply any memory barrier). I'm tempted to just leave the mutex in place for now until we see a clearly measurable performance gain by going with the "or/and" flag approach compared to the simpler (in terms of memory ordering) mutex implementation. As far as the or/and implementations you provide in separate patches are concerned, I'm tempted to pull them for completeness' sake (they might end up being useful). Before I do that though, I'm just curious about the memory ordering guarantees they should provide: no ordering seems like a good thing for or/and: it would be similar to add/sub. Thoughts?
Thanks, Mathieu > + } > } > > static void call_rcu_wake_up(struct call_rcu_data *crdp) > { > - /* Write to call_rcu list before reading/writing futex */ > - cmm_smp_mb(); > - if (unlikely(uatomic_read(&crdp->futex) == -1)) { > - uatomic_set(&crdp->futex, 0); > - futex_async(&crdp->futex, FUTEX_WAKE, 1, > - NULL, NULL, 0); > - } > + futex_async(&crdp->flags, FUTEX_WAKE, 1, NULL, NULL, 0); > } > > /* Allocate the array if it has not already been allocated. */ > @@ -209,7 +204,16 @@ static void *call_rcu_thread(void *arg) > > thread_call_rcu_data = crdp; > for (;;) { > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { > + for (;;) { > + if (&crdp->cbs.head > + == _CMM_LOAD_SHARED(crdp->cbs.tail)) { > + uatomic_and(&crdp->flags, ~URCU_CALL_RCU_BUSY); > + /* Write flags before rechecking the list. */ > + cmm_smp_mb(); > + if (&crdp->cbs.head > + == _CMM_LOAD_SHARED(crdp->cbs.tail)) > + break; > + } > while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) > poll(NULL, 0, 1); > _CMM_STORE_SHARED(crdp->cbs.head, NULL); > @@ -229,13 +233,14 @@ static void *call_rcu_thread(void *arg) > rhp->func(rhp); > } while (cbs != NULL); > } > + /* Read call_rcu list before reading the flags. */ > + cmm_smp_mb(); > if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP) > break; > if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT) > poll(NULL, 0, 10); > else { > - if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail)) > - call_rcu_wait(crdp); > + call_rcu_wait(crdp); > poll(NULL, 0, 10); > } > } > @@ -262,7 +267,6 @@ static void call_rcu_data_init(struct call_rcu_data > **crdpp, > } > memset(crdp, '\0', sizeof(*crdp)); > cds_wfq_init(&crdp->cbs); > - crdp->futex = 0; > crdp->flags = flags; > cds_list_add(&crdp->list, &call_rcu_data_list); > crdp->cpu_affinity = cpu_affinity; > @@ -513,6 +517,9 @@ void call_rcu(struct rcu_head *head, > head->func = func; > crdp = get_call_rcu_data(); > cds_wfq_enqueue(&crdp->cbs, &head->next); > + /* Write list before writing the flags. 
*/ > + cmm_smp_mb(); > + uatomic_or(&crdp->flags, URCU_CALL_RCU_BUSY); > wake_call_rcu_thread(crdp); > } > > diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h > index e76a844..bfd56ec 100644 > --- a/urcu-call-rcu.h > +++ b/urcu-call-rcu.h > @@ -48,6 +48,7 @@ struct call_rcu_data; > #define URCU_CALL_RCU_RUNNING 0x2 > #define URCU_CALL_RCU_STOP 0x4 > #define URCU_CALL_RCU_STOPPED 0x8 > +#define URCU_CALL_RCU_BUSY 0x10 > > /* > * The rcu_head data structure is placed in the structure to be freed > -- > 1.7.4.4 > > > > _______________________________________________ > ltt-dev mailing list > [email protected] > http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev > -- Mathieu Desnoyers Operating System Efficiency R&D Consultant EfficiOS Inc. http://www.efficios.com _______________________________________________ ltt-dev mailing list [email protected] http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
