On 08/22/2011 08:28 PM, Mathieu Desnoyers wrote:
> * Lai Jiangshan ([email protected]) wrote:
>> Use userspace lock based on futex with proxies APIs for waiting and boosting.
>> Simpler implmentation.
>>
>> Signed-off-by: Lai Jiangshan <[email protected]>
>> ---
>>  urcu.c             |  179 +++++++++++++++++-----------------------------------
>>  urcu/static/urcu.h |  120 ++++++++++++++---------------------
>>  2 files changed, 105 insertions(+), 194 deletions(-)
>>
>> diff --git a/urcu.c b/urcu.c
>> index 039df55..328b2fb 100644
>> --- a/urcu.c
>> +++ b/urcu.c
>> @@ -40,6 +40,7 @@
>>  #include "urcu/static/urcu.h"
>>  /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
>>  #include "urcu.h"
>> +#include "urcu-wait-lock-impl.h"
>>
>>  #ifdef RCU_MEMBARRIER
>>  static int init_done;
>> @@ -63,16 +64,6 @@ void __attribute__((destructor)) rcu_exit(void);
>>
>>  static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;
>>
>> -int32_t gp_futex;
>> -
>> -/*
>> - * Global grace period counter.
>> - * Contains the current RCU_GP_CTR_PHASE.
>> - * Also has a RCU_GP_COUNT of 1, to accelerate the reader fast path.
>> - * Written to only by writer with mutex taken. Read by both writer and readers.
>> - */
>> -unsigned long rcu_gp_ctr = RCU_GP_COUNT;
>> -
>>  /*
>>   * Written to only by each individual reader. Read by both the reader and the
>>   * writers.
>> @@ -190,147 +181,91 @@ static void smp_mb_master(int group)
>>  }
>>  #endif /* #ifdef RCU_SIGNAL */
>>
>> -/*
>> - * synchronize_rcu() waiting. Single thread.
>> - */
>> -static void wait_gp(void)
>> +void __urcu_read_unlock_specail(void)
>
> specail -> special,
>
>>  {
>> -        /* Read reader_gp before read futex */
>> -        smp_mb_master(RCU_MB_GROUP);
>> -        if (uatomic_read(&gp_futex) == -1)
>> -                futex_async(&gp_futex, FUTEX_WAIT, -1,
>> -                            NULL, NULL, 0);
>> +        if (uwl_onwer(&rcu_reader.wait) == rcu_reader.tid) {
>
> onwer -> owner,
>
>> +                uwl_unlock_if_not_proxy_unlocked(&rcu_reader.wait,
>> +                                rcu_reader.tid);
>> +        }
>>  }
>>
>> -void update_counter_and_wait(void)
>> +
>> +void synchronize_rcu(void)
>
> Hrm, so you change the 2-phase scheme into a 1-phase waiting scheme
> here, and wait explicitely to see all reader's ctl value to be "0".

No.
------> and wait explicitly until all readers call __urcu_read_unlock_special().

Querying each reader's ctr value only once is enough. Paul suggested that we
should not add overhead to short, quick read-side critical sections, so a
spin-wait and a timed wait are added before we do the real wait.

>
> One major reason why this solution is not acceptable is because slow
> synchronize_rcu() thread wrt fast reader thread could cause
> synchronize_rcu() to never progress, even if the reader threads are
> doing multiple (different) read-side critical sections back-to-back.
> The writer could see the reader thread as always in a critical section
> (if the timing is right). This is explained in:
>
> http://www.efficios.com/publications
> "User-Level Implementations of Read-Copy Update"
>
> This is one reason why I really want to have formal models of these
> changes before integrating them into liburcu. A paper on the formal
> modeling of liburcu:
>
> http://www.lttng.org/pub/thesis/desnoyers-dissertation-2009-12.pdf
> "Chapter 7 -- Paper 4: Multi-Core Systems Modeling for Formal
> Verification of Parallel Algorithms"
>
>>  {
>>          CDS_LIST_HEAD(qsreaders);
>> -        int wait_loops = 0;
>> +        int wait_loops;
>>          struct rcu_reader *index, *tmp;
>> +        pid_t self = syscall(SYS_gettid);
>>
>> -        /* Switch parity: 0 -> 1, 1 -> 0 */
>> -        CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR_PHASE);
>> +        mutex_lock(&rcu_gp_lock);
>>
>> -        /*
>> -         * Must commit rcu_gp_ctr update to memory before waiting for quiescent
>> -         * state. Failure to do so could result in the writer waiting forever
>> -         * while new readers are always accessing data (no progress). Enforce
>> -         * compiler-order of store to rcu_gp_ctr before load rcu_reader ctr.
>> -         */
>> -        cmm_barrier();
>> +        if (cds_list_empty(&registry))
>> +                goto out;
>>
>> -        /*
>> -         *
>> -         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
>> -         * model easier to understand. It does not have a big performance impact
>> -         * anyway, given this is the write-side.
>> -         */
>> -        cmm_smp_mb();
>> +        cds_list_for_each_entry(index, &registry, node)
>> +                uwl_proxy_lock(&index->wait, index->tid);
>>
>>          /*
>> -         * Wait for each thread rcu_reader.ctr count to become 0.
>> +         * synchronize_rcu               rcu_read_unlock(outmost)
>> +         *
>> +         * sync = tid;                   ctr = 0;
>> +         * smp_mb_master()(1);           smp_mb_slave()(3);
>> +         * test index->ctr and wait;     test index->wait and wakeup;
>>           */
>> -        for (;;) {
>> -                wait_loops++;
>> -                if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
>> -                        uatomic_dec(&gp_futex);
>> -                        /* Write futex before read reader_gp */
>> -                        smp_mb_master(RCU_MB_GROUP);
>> -                }
>> +        smp_mb_master(RCU_MB_GROUP); /* (1) */
>>
>> +        for (wait_loops = 0; wait_loops < RCU_QS_ACTIVE_ATTEMPTS; wait_loops++) {
>>                  cds_list_for_each_entry_safe(index, tmp, &registry, node) {
>> -                        if (!rcu_gp_ongoing(&index->ctr))
>> +                        if (_CMM_LOAD_SHARED(index->ctr) == 0) {
>> +                                uwl_proxy_unlock(&index->wait);
>>                                  cds_list_move(&index->node, &qsreaders);
>> +                        }
>>                  }
>>
>> +                if (cds_list_empty(&registry))
>> +                        goto done;
>> +
>>  #ifndef HAS_INCOHERENT_CACHES
>> -                if (cds_list_empty(&registry)) {
>> -                        if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
>> -                                /* Read reader_gp before write futex */
>> -                                smp_mb_master(RCU_MB_GROUP);
>> -                                uatomic_set(&gp_futex, 0);
>> -                        }
>> -                        break;
>> -                } else {
>> -                        if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
>> -                                wait_gp();
>> -                        else
>> -                                caa_cpu_relax();
>> -                }
>> +                caa_cpu_relax();
>>  #else /* #ifndef HAS_INCOHERENT_CACHES */
>> -                /*
>> -                 * BUSY-LOOP. Force the reader thread to commit its
>> -                 * rcu_reader.ctr update to memory if we wait for too long.
>> -                 */
>> -                if (cds_list_empty(&registry)) {
>> -                        if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
>> -                                /* Read reader_gp before write futex */
>> -                                smp_mb_master(RCU_MB_GROUP);
>> -                                uatomic_set(&gp_futex, 0);
>> -                        }
>> -                        break;
>> -                } else {
>> -                        switch (wait_loops) {
>> -                        case RCU_QS_ACTIVE_ATTEMPTS:
>> -                                wait_gp();
>> -                                break; /* only escape switch */
>> -                        case KICK_READER_LOOPS:
>> -                                smp_mb_master(RCU_MB_GROUP);
>> -                                wait_loops = 0;
>> -                                break; /* only escape switch */
>> -                        default:
>> -                                caa_cpu_relax();
>> -                        }
>> -                }
>> +                cmm_smp_mb();
>>  #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
>>          }
>> -        /* put back the reader list in the registry */
>> -        cds_list_splice(&qsreaders, &registry);
>> -}
>>
>> -void synchronize_rcu(void)
>> -{
>> -        mutex_lock(&rcu_gp_lock);
>> -
>> -        if (cds_list_empty(&registry))
>> -                goto out;
>> +        /* avoid short-time read site's competing and syscall overhead */
>> +        usleep(2000);
>>
>> -        /* All threads should read qparity before accessing data structure
>> -         * where new ptr points to. Must be done within rcu_gp_lock because it
>> -         * iterates on reader threads.*/
>> -        /* Write new ptr before changing the qparity */
>> -        smp_mb_master(RCU_MB_GROUP);
>> +        /* index->ctr > 0 */
>> +        cds_list_for_each_entry_safe(index, tmp, &registry, node) {
>> +                /* reader still running, we need to wait reader */
>> +                uwl_lock(&index->wait, self);
>>
>> -        /*
>> -         * Wait for previous parity to be empty of readers.
>> -         */
>> -        update_counter_and_wait();        /* 0 -> 1, wait readers in parity 0 */
>> +                /*
>> +                 * use uwl_set_unlock() instead of uwl_unlock()
>> +                 * to avoid a syscall overhead. There is no competing
>> +                 * now, so it is safe.
>> +                 */
>> +                uwl_set_unlock(&index->wait);
>> +                (void)(uwl_unlock);
>>
>> -        /*
>> -         * Must finish waiting for quiescent state for parity 0 before
>> -         * committing next rcu_gp_ctr update to memory. Failure to do so could
>> -         * result in the writer waiting forever while new readers are always
>> -         * accessing data (no progress). Enforce compiler-order of load
>> -         * rcu_reader ctr before store to rcu_gp_ctr.
>> -         */
>> -        cmm_barrier();
>> +                cds_list_move(&index->node, &qsreaders);
>> +        }
>>
>> +done:
>>          /*
>> -         * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
>> -         * model easier to understand. It does not have a big performance impact
>> -         * anyway, given this is the write-side.
>> +         * Ensure RCU C.S. finish when we found the ctr==0
>> +         *
>> +         * synchronize_rcu               rcu_read_unlock(outmost)
>> +         *
>> +         * see ctr == 0                  RCU read C.S.
>> +         * smp_mb_master()(2);           smp_mb_slave()(2);
>> +         * free(ptr)                     ctr = 0
>>           */
>> -        cmm_smp_mb();
>> +        smp_mb_master(RCU_MB_GROUP); /* (2) */
>>
>> -        /*
>> -         * Wait for previous parity to be empty of readers.
>> -         */
>> -        update_counter_and_wait();        /* 1 -> 0, wait readers in parity 1 */
>> +        /* put back the reader list in the registry */
>> +        cds_list_splice(&qsreaders, &registry);
>>
>> -        /* Finish waiting for reader threads before letting the old ptr being
>> -         * freed. Must be done within rcu_gp_lock because it iterates on reader
>> -         * threads. */
>> -        smp_mb_master(RCU_MB_GROUP);
>>  out:
>>          mutex_unlock(&rcu_gp_lock);
>>  }
>> @@ -352,8 +287,8 @@ void rcu_read_unlock(void)
>>  void rcu_register_thread(void)
>>  {
>>          rcu_reader.pthread = pthread_self();
>> +        rcu_reader.tid = syscall(SYS_gettid);
>>          assert(rcu_reader.need_mb == 0);
>> -        assert(!(rcu_reader.ctr & RCU_GP_CTR_NEST_MASK));
>>
>>          mutex_lock(&rcu_gp_lock);
>>          rcu_init();        /* In case gcc does not support constructor attribute */
>> diff --git a/urcu/static/urcu.h b/urcu/static/urcu.h
>> index cfcb300..b5b1af5 100644
>> --- a/urcu/static/urcu.h
>> +++ b/urcu/static/urcu.h
>> @@ -96,13 +96,6 @@ extern "C" {
>>  #endif
>>
>>  /*
>> - * If a reader is really non-cooperative and refuses to commit its
>> - * rcu_active_readers count to memory (there is no barrier in the reader
>> - * per-se), kick it after a few loops waiting for it.
>> - */
>> -#define KICK_READER_LOOPS 10000
>> -
>> -/*
>>   * Active attempts to check for reader Q.S. before calling futex().
>>   */
>>  #define RCU_QS_ACTIVE_ATTEMPTS 100
>> @@ -209,59 +202,19 @@ static inline void smp_mb_slave(int group)
>>  }
>>  #endif
>>
>> -/*
>> - * The trick here is that RCU_GP_CTR_PHASE must be a multiple of 8 so we can use
>> - * a full 8-bits, 16-bits or 32-bits bitmask for the lower order bits.
>> - */
>> -#define RCU_GP_COUNT (1UL << 0)
>> -/* Use the amount of bits equal to half of the architecture long size */
>> -#define RCU_GP_CTR_PHASE (1UL << (sizeof(unsigned long) << 2))
>> -#define RCU_GP_CTR_NEST_MASK (RCU_GP_CTR_PHASE - 1)
>> -
>> -/*
>> - * Global quiescent period counter with low-order bits unused.
>> - * Using a int rather than a char to eliminate false register dependencies
>> - * causing stalls on some architectures.
>> - */
>> -extern unsigned long rcu_gp_ctr;
>> -
>>  struct rcu_reader {
>>          /* Data used by both reader and synchronize_rcu() */
>>          unsigned long ctr;
>> +        int32_t wait;
>>          char need_mb;
>>          /* Data used for registry */
>>          struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
>>          pthread_t pthread;
>> +        pid_t tid;
>>  };
>>
>>  extern struct rcu_reader __thread rcu_reader;
>> -
>> -extern int32_t gp_futex;
>> -
>> -/*
>> - * Wake-up waiting synchronize_rcu(). Called from many concurrent threads.
>> - */
>> -static inline void wake_up_gp(void)
>> -{
>> -        if (unlikely(uatomic_read(&gp_futex) == -1)) {
>> -                uatomic_set(&gp_futex, 0);
>> -                futex_async(&gp_futex, FUTEX_WAKE, 1,
>> -                            NULL, NULL, 0);
>> -        }
>> -}
>> -
>> -static inline int rcu_gp_ongoing(unsigned long *ctr)
>> -{
>> -        unsigned long v;
>> -
>> -        /*
>> -         * Make sure both tests below are done on the same version of *value
>> -         * to insure consistency.
>> -         */
>> -        v = CMM_LOAD_SHARED(*ctr);
>> -        return (v & RCU_GP_CTR_NEST_MASK) &&
>> -                ((v ^ rcu_gp_ctr) & RCU_GP_CTR_PHASE);
>> -}
>> +void __urcu_read_unlock_specail(void);
>>
>>  static inline void _rcu_read_lock(void)
>>  {
>> @@ -269,19 +222,28 @@ static inline void _rcu_read_lock(void)
>>
>>          cmm_barrier();        /* Ensure the compiler does not reorder us with mutex */
>>          tmp = rcu_reader.ctr;
>> -        /*
>> -         * rcu_gp_ctr is
>> -         * RCU_GP_COUNT | (~RCU_GP_CTR_PHASE or RCU_GP_CTR_PHASE)
>> -         */
>> -        if (likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {
>> -                _CMM_STORE_SHARED(rcu_reader.ctr, _CMM_LOAD_SHARED(rcu_gp_ctr));
>> +
>> +        if (!tmp) {
>> +#ifndef RCU_MB
>> +                _CMM_STORE_SHARED(rcu_reader.ctr, 1);
>> +                smp_mb_slave(RCU_MB_GROUP); /* (1) */
>> +#else /* #ifndef RCU_MB */
>>                  /*
>> -                 * Set active readers count for outermost nesting level before
>> -                 * accessing the pointer. See smp_mb_master().
>> +                 * the same behavior as !RCU_MB(above), but atomically.
>> +                 *
>> +                 * When a inturrupt(signal etc..) happens between
>
> inturrupt -> interrupt
>
>> +                 * "_CMM_STORE_SHARED(rcu_reader.ctr, 1);" and smp_mb_slave()(1),
>> +                 * inturrupt handler's RCU C.S will miss a smp_mb_slave()
>> +                 * when it enters the RCU C.S.
>> +                 *
>> +                 * For !RCU_MB, cmm_barrier()(1) implies smp_mb_slave(), it is OK.
>> +                 * For RCU_MB, we should use xchg() to do it atomically.
>
> Why is xchg() needed ? Isn't a normal aligned word-sized memory write
> atomic ?

My comments are unclear. What I wanted to explain: for RCU_MB, the starting
memory barrier would be missing for the interrupt handler's RCU C.S.:

Thread1                                  Thread2

ctr = 1;
interrupt
    rcu_read_lock()
    mb() is missing before here.
    xxx dereference the ptr.
    xxx                                  synchronize_rcu() starts, sees thread1's ctr==0
    xxx                                  synchronize_rcu() returns.
    xxx                                  free ptr
    xxx use the freed ptr.
    rcu_read_unlock()
interrupt return
mb().

xchg() combines "ctr = 1" and "mb()" into one operation and executes it
atomically, so no interrupt can happen between "ctr = 1" and "mb()".

Thanks,
Lai.

>
> Thanks,
>
> Mathieu
>
>>                   */
>> -                smp_mb_slave(RCU_MB_GROUP);
>> +                uatomic_xchg(&rcu_reader.ctr, 1);
>> +#endif /* #else #ifndef RCU_MB */
>>          } else {
>> -                _CMM_STORE_SHARED(rcu_reader.ctr, tmp + RCU_GP_COUNT);
>> +                _CMM_STORE_SHARED(rcu_reader.ctr, tmp + 1);
>> +                cmm_barrier(); /* (1) */
>>          }
>>  }
>>
>> @@ -289,21 +251,35 @@ static inline void _rcu_read_unlock(void)
>>  {
>>          unsigned long tmp;
>>
>> +        cmm_barrier(); /* (2) */
>>          tmp = rcu_reader.ctr;
>> -        /*
>> -         * Finish using rcu before decrementing the pointer.
>> -         * See smp_mb_master().
>> -         */
>> -        if (likely((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT)) {
>> -                smp_mb_slave(RCU_MB_GROUP);
>> -                _CMM_STORE_SHARED(rcu_reader.ctr, rcu_reader.ctr - RCU_GP_COUNT);
>> -                /* write rcu_reader.ctr before read futex */
>> -                smp_mb_slave(RCU_MB_GROUP);
>> -                wake_up_gp();
>> +
>> +        if (tmp == 1) {
>> +#ifndef RCU_MB
>> +                smp_mb_slave(RCU_MB_GROUP); /* (2) */
>> +                _CMM_STORE_SHARED(rcu_reader.ctr, 0);
>> +                smp_mb_slave(RCU_MB_GROUP); /* (3) */
>> +#else /* #ifndef RCU_MB */
>> +                /*
>> +                 * the same behavior as !RCU_MB(above), but atomically.
>> +                 *
>> +                 * When a inturrupt(signal etc..) happens between
>> +                 * smp_mb_slave()(2) and "_CMM_STORE_SHARED(rcu_reader.ctr, 0);",
>> +                 * inturrupt handler's RCU C.S will miss a smp_mb_slave()
>> +                 * when it exits the RCU C.S.
>> +                 *
>> +                 * For !RCU_MB, cmm_barrier()(2) implies smp_mb_slave(), it is OK.
>> +                 * For RCU_MB, we should use xchg() to do it atomically.
>> +                 */
>> +                uatomic_xchg(&rcu_reader.ctr, 0);
>> +#endif /* #else #ifndef RCU_MB */
>> +
>> +                if (unlikely(_CMM_LOAD_SHARED(rcu_reader.wait)))
>> +                        __urcu_read_unlock_specail();
>>          } else {
>> -                _CMM_STORE_SHARED(rcu_reader.ctr, rcu_reader.ctr - RCU_GP_COUNT);
>> +                _CMM_STORE_SHARED(rcu_reader.ctr, tmp - 1);
>> +                cmm_barrier();
>>          }
>> -        cmm_barrier();        /* Ensure the compiler does not reorder us with mutex */
>>  }
>>
>>  #ifdef __cplusplus
>> --
>> 1.7.4.4
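
To make the window described above concrete, here is a minimal C sketch of the
two RCU_MB outermost-lock variants. It is an illustration only, not code from
the patch: a plain global "ctr" stands in for the per-thread rcu_reader.ctr,
and GCC/Clang __atomic builtins stand in for liburcu's cmm_smp_mb() and
uatomic_xchg().

unsigned long ctr;        /* stand-in for the per-thread rcu_reader.ctr */

/* Racy variant: plain store followed by a separate full barrier. */
void outermost_lock_store_then_mb(void)
{
        ctr = 1;
        /*
         * A signal delivered here runs a nested rcu_read_lock() that sees
         * ctr != 0 and therefore issues no barrier, yet the barrier below
         * has not executed either: the handler's pointer dereference is not
         * ordered after the ctr store, so synchronize_rcu() may still see
         * ctr == 0 and free memory the handler is dereferencing.
         */
        __atomic_thread_fence(__ATOMIC_SEQ_CST);        /* cmm_smp_mb() */
}

/*
 * Lai's variant: store and barrier are one operation. On x86 this compiles
 * to a single locked xchg instruction, so no signal can land between the
 * store and the implied barrier.
 */
void outermost_lock_xchg(void)
{
        (void) __atomic_exchange_n(&ctr, 1UL, __ATOMIC_SEQ_CST);
        /* plays the role of uatomic_xchg(&rcu_reader.ctr, 1) */
}

In the !RCU_MB builds the plain store remains sufficient, since smp_mb_slave()
is only a compiler barrier there, as the comment in the patch notes.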
