Use a userspace lock, based on futex with proxy-lock APIs, for waiting and priority boosting. This makes for a simpler implementation.
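The uwl_* ("urcu wait lock") calls used below come from urcu-wait-lock-impl.h, which is introduced by a separate patch in this series. To keep this patch reviewable on its own, here is a minimal sketch of the semantics it relies on. The function names mirror the calls used below (including the uwl_onwer() spelling), but the word layout and the plain FUTEX_WAIT/FUTEX_WAKE fallback are assumptions about that header, not its actual contents; for the boosting part one would expect FUTEX_LOCK_PI/FUTEX_UNLOCK_PI, which is also why the futex word holds the owner's kernel TID (hence gettid() instead of pthread_self() for rcu_reader.tid):

#include <stdint.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <urcu/uatomic.h>

/* Hypothetical sketch, not the actual urcu-wait-lock-impl.h.
 * Assumed layout: word == owner TID (0 when unlocked), ORed with
 * FUTEX_WAITERS while somebody sleeps on it. */

static inline long sys_futex(int32_t *uaddr, int op, int32_t val)
{
	return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

/* Writer: take the lock on a reader's behalf ("proxy lock"). */
static inline void uwl_proxy_lock(int32_t *lock, pid_t tid)
{
	uatomic_set(lock, tid);
}

/* Writer: release an uncontended proxy lock, no wakeup needed. */
static inline void uwl_proxy_unlock(int32_t *lock)
{
	uatomic_set(lock, 0);
}

/* TID of the current holder, 0 when unlocked. */
static inline pid_t uwl_onwer(int32_t *lock)
{
	return uatomic_read(lock) & FUTEX_TID_MASK;
}

/* Reader: drop the lock unless the writer already proxy-unlocked it;
 * wake the writer only if it is actually sleeping. */
static inline void uwl_unlock_if_not_proxy_unlocked(int32_t *lock, pid_t tid)
{
	int32_t old = uatomic_cmpxchg(lock, tid, 0);

	if (old == (int32_t)(tid | FUTEX_WAITERS)) {
		uatomic_set(lock, 0);
		sys_futex(lock, FUTEX_WAKE, 1);
	}
}

/* Writer: sleep until the owning reader releases the lock, then own it. */
static inline void uwl_lock(int32_t *lock, pid_t self)
{
	int32_t val = uatomic_cmpxchg(lock, 0, self);

	while (val != 0) {
		/* Flag a waiter so the unlock path issues FUTEX_WAKE. */
		if ((val & FUTEX_WAITERS) ||
		    uatomic_cmpxchg(lock, val, val | FUTEX_WAITERS) == val)
			sys_futex(lock, FUTEX_WAIT, val | FUTEX_WAITERS);
		val = uatomic_cmpxchg(lock, 0, self);
	}
}

/* Release without waking anyone: only valid when no waiter can exist. */
static inline void uwl_set_unlock(int32_t *lock)
{
	uatomic_set(lock, 0);
}

/* Regular release: clear the word, wake one waiter if flagged. */
static inline void uwl_unlock(int32_t *lock)
{
	if (uatomic_xchg(lock, 0) & FUTEX_WAITERS)
		sys_futex(lock, FUTEX_WAKE, 1);
}

Under these assumed semantics, synchronize_rcu() proxy-locks every registered reader's wait word, immediately proxy-unlocks offline readers (and itself), spins up to RCU_QS_ACTIVE_ATTEMPTS rounds waiting for the remaining readers to release their word at a quiescent state or offline transition, and only sleeps in uwl_lock() for readers still running after that. The subsequent uwl_set_unlock() skips the wake syscall because the writer is the only possible waiter while rcu_gp_lock is held.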
Signed-off-by: Lai Jiangshan <[email protected]>
---
 urcu-qsbr.c             |  207 +++++++++++------------------------------
 urcu/static/urcu-qsbr.h |   63 ++++-----------
 2 files changed, 65 insertions(+), 205 deletions(-)

diff --git a/urcu-qsbr.c b/urcu-qsbr.c
index 87cf41d..c9b1d87 100644
--- a/urcu-qsbr.c
+++ b/urcu-qsbr.c
@@ -40,18 +40,12 @@
 #include "urcu/static/urcu-qsbr.h"
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #include "urcu-qsbr.h"
+#include "urcu-wait-lock-impl.h"
 
 void __attribute__((destructor)) rcu_exit(void);
 
 static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;
 
-int32_t gp_futex;
-
-/*
- * Global grace period counter.
- */
-unsigned long rcu_gp_ctr = RCU_GP_ONLINE;
-
 /*
  * Written to only by each individual reader. Read by both the reader and the
  * writers.
@@ -98,185 +92,82 @@ static void mutex_unlock(pthread_mutex_t *mutex)
 	}
 }
 
-/*
- * synchronize_rcu() waiting. Single thread.
- */
-static void wait_gp(void)
+void __urcu_read_unlock_special(void)
 {
-	/* Read reader_gp before read futex */
-	cmm_smp_rmb();
-	if (uatomic_read(&gp_futex) == -1)
-		futex_noasync(&gp_futex, FUTEX_WAIT, -1,
-			      NULL, NULL, 0);
+	if (uwl_onwer(&rcu_reader.wait) == rcu_reader.tid) {
+		uwl_unlock_if_not_proxy_unlocked(&rcu_reader.wait,
+						 rcu_reader.tid);
+	}
 }
 
-static void update_counter_and_wait(void)
+void synchronize_rcu(void)
 {
 	CDS_LIST_HEAD(qsreaders);
-	int wait_loops = 0;
+	int wait_loops;
 	struct rcu_reader *index, *tmp;
+	pid_t self = syscall(SYS_gettid);
 
-#if (CAA_BITS_PER_LONG < 64)
-	/* Switch parity: 0 -> 1, 1 -> 0 */
-	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
-#else	/* !(CAA_BITS_PER_LONG < 64) */
-	/* Increment current G.P. */
-	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
-#endif	/* !(CAA_BITS_PER_LONG < 64) */
+	mutex_lock(&rcu_gp_lock);
 
-	/*
-	 * Must commit rcu_gp_ctr update to memory before waiting for
-	 * quiescent state. Failure to do so could result in the writer
-	 * waiting forever while new readers are always accessing data
-	 * (no progress). Enforce compiler-order of store to rcu_gp_ctr
-	 * before load rcu_reader ctr.
-	 */
-	cmm_barrier();
+	if (cds_list_empty(&registry))
+		goto out;
 
-	/*
-	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
-	 * model easier to understand. It does not have a big performance impact
-	 * anyway, given this is the write-side.
-	 */
-	cmm_smp_mb();
+	cds_list_for_each_entry(index, &registry, node)
+		uwl_proxy_lock(&index->wait, index->tid);
 
-	/*
-	 * Wait for each thread rcu_reader_qs_gp count to become 0.
-	 */
-	for (;;) {
-		wait_loops++;
-		if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-			uatomic_dec(&gp_futex);
-			/* Write futex before read reader_gp */
-			cmm_smp_mb();
-		}
+	cmm_smp_mb();	/* write rcu_reader.wait before read offline */
+
+	cds_list_for_each_entry_safe(index, tmp, &registry, node) {
+		if (_CMM_LOAD_SHARED(index->offline) == 1 ||
+				index == &rcu_reader) {
+			uwl_proxy_unlock(&index->wait);
+			cds_list_move(&index->node, &qsreaders);
+		}
+	}
 
+	for (wait_loops = 0; wait_loops < RCU_QS_ACTIVE_ATTEMPTS; wait_loops++) {
 		cds_list_for_each_entry_safe(index, tmp, &registry, node) {
-			if (!rcu_gp_ongoing(&index->ctr))
+			if (_CMM_LOAD_SHARED(index->wait) == 0)
 				cds_list_move(&index->node, &qsreaders);
 		}
 		if (cds_list_empty(&registry)) {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-				/* Read reader_gp before write futex */
-				cmm_smp_mb();
-				uatomic_set(&gp_futex, 0);
-			}
-			break;
-		} else {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-				wait_gp();
-			} else {
+			cmm_smp_mb();
+			goto done;
+		}
+
 #ifndef HAS_INCOHERENT_CACHES
-				caa_cpu_relax();
+		caa_cpu_relax();
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-				cmm_smp_mb();
+		cmm_smp_mb();
 #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
-			}
-		}
 	}
-	/* put back the reader list in the registry */
-	cds_list_splice(&qsreaders, &registry);
-}
-
-/*
- * Using a two-subphases algorithm for architectures with smaller than 64-bit
- * long-size to ensure we do not encounter an overflow bug.
- */
+	/* avoid contending with short read-side sections; spare the syscall */
+	usleep(2000);
+
-#if (CAA_BITS_PER_LONG < 64)
-void synchronize_rcu(void)
-{
-	unsigned long was_online;
+	cds_list_for_each_entry_safe(index, tmp, &registry, node) {
+		/* reader still running, we need to wait for it */
+		uwl_lock(&index->wait, self);
 
-	was_online = rcu_reader.ctr;
+		/*
+		 * Use uwl_set_unlock() instead of uwl_unlock()
+		 * to avoid syscall overhead. There is no contention
+		 * at this point, so it is safe.
+		 */
+		uwl_set_unlock(&index->wait);
+		(void)(uwl_unlock);
 
-	/* All threads should read qparity before accessing data structure
-	 * where new ptr points to.
-	 */
-	/* Write new ptr before changing the qparity */
-	cmm_smp_mb();
-
-	/*
-	 * Mark the writer thread offline to make sure we don't wait for
-	 * our own quiescent state. This allows using synchronize_rcu()
-	 * in threads registered as readers.
-	 */
-	if (was_online)
-		CMM_STORE_SHARED(rcu_reader.ctr, 0);
-
-	mutex_lock(&rcu_gp_lock);
-
-	if (cds_list_empty(&registry))
-		goto out;
-
-	/*
-	 * Wait for previous parity to be empty of readers.
-	 */
-	update_counter_and_wait();	/* 0 -> 1, wait readers in parity 0 */
-
-	/*
-	 * Must finish waiting for quiescent state for parity 0 before
-	 * committing next rcu_gp_ctr update to memory. Failure to
-	 * do so could result in the writer waiting forever while new
-	 * readers are always accessing data (no progress). Enforce
-	 * compiler-order of load rcu_reader ctr before store to
-	 * rcu_gp_ctr.
-	 */
-	cmm_barrier();
-
-	/*
-	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
-	 * model easier to understand. It does not have a big performance impact
-	 * anyway, given this is the write-side.
-	 */
-	cmm_smp_mb();
-
-	/*
-	 * Wait for previous parity to be empty of readers.
-	 */
-	update_counter_and_wait();	/* 1 -> 0, wait readers in parity 1 */
-out:
-	mutex_unlock(&rcu_gp_lock);
-
-	/*
-	 * Finish waiting for reader threads before letting the old ptr being
-	 * freed.
-	 */
-	if (was_online)
-		_CMM_STORE_SHARED(rcu_reader.ctr,
-				  CMM_LOAD_SHARED(rcu_gp_ctr));
-	cmm_smp_mb();
-}
-#else /* !(CAA_BITS_PER_LONG < 64) */
-void synchronize_rcu(void)
-{
-	unsigned long was_online;
-
-	was_online = rcu_reader.ctr;
+		cds_list_move(&index->node, &qsreaders);
+	}
 
-	/*
-	 * Mark the writer thread offline to make sure we don't wait for
-	 * our own quiescent state. This allows using synchronize_rcu()
-	 * in threads registered as readers.
-	 */
-	cmm_smp_mb();
-	if (was_online)
-		CMM_STORE_SHARED(rcu_reader.ctr, 0);
+done:
+	/* put back the reader list in the registry */
+	cds_list_splice(&qsreaders, &registry);
 
-	mutex_lock(&rcu_gp_lock);
-	if (cds_list_empty(&registry))
-		goto out;
-	update_counter_and_wait();
 out:
 	mutex_unlock(&rcu_gp_lock);
-
-	if (was_online)
-		_CMM_STORE_SHARED(rcu_reader.ctr,
-				  CMM_LOAD_SHARED(rcu_gp_ctr));
-	cmm_smp_mb();
 }
-#endif /* !(CAA_BITS_PER_LONG < 64) */
 
 /*
  * library wrappers to be used by non-LGPL compatible source code.
@@ -309,8 +198,8 @@ void rcu_thread_online(void)
 
 void rcu_register_thread(void)
 {
-	rcu_reader.tid = pthread_self();
-	assert(rcu_reader.ctr == 0);
+	rcu_reader.tid = syscall(SYS_gettid);
+	assert(rcu_reader.wait == 0);
 
 	mutex_lock(&rcu_gp_lock);
 	cds_list_add(&rcu_reader.node, &registry);
diff --git a/urcu/static/urcu-qsbr.h b/urcu/static/urcu-qsbr.h
index c46a7be..e389a5b 100644
--- a/urcu/static/urcu-qsbr.h
+++ b/urcu/static/urcu-qsbr.h
@@ -56,13 +56,6 @@ extern "C" {
  */
 
 /*
- * If a reader is really non-cooperative and refuses to commit its
- * rcu_reader.ctr count to memory (there is no barrier in the reader
- * per-se), kick it after a few loops waiting for it.
- */
-#define KICK_READER_LOOPS 10000
-
-/*
  * Active attempts to check for reader Q.S. before calling futex().
  */
 #define RCU_QS_ACTIVE_ATTEMPTS 100
@@ -124,76 +117,54 @@ static inline void debug_yield_init(void)
 #define RCU_GP_ONLINE		(1UL << 0)
 #define RCU_GP_CTR		(1UL << 1)
 
-/*
- * Global quiescent period counter with low-order bits unused.
- * Using a int rather than a char to eliminate false register dependencies
- * causing stalls on some architectures.
- */
-extern unsigned long rcu_gp_ctr;
-
 struct rcu_reader {
 	/* Data used by both reader and synchronize_rcu() */
-	unsigned long ctr;
+	int32_t wait;
+	int offline;
 	/* Data used for registry */
 	struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
-	pthread_t tid;
+	pid_t tid;
 };
 
 extern struct rcu_reader __thread rcu_reader;
 
-extern int32_t gp_futex;
-
-/*
- * Wake-up waiting synchronize_rcu(). Called from many concurrent threads.
- */
-static inline void wake_up_gp(void)
-{
-	if (unlikely(uatomic_read(&gp_futex) == -1)) {
-		uatomic_set(&gp_futex, 0);
-		futex_noasync(&gp_futex, FUTEX_WAKE, 1,
-			      NULL, NULL, 0);
-	}
-}
-
-static inline int rcu_gp_ongoing(unsigned long *ctr)
-{
-	unsigned long v;
-
-	v = CMM_LOAD_SHARED(*ctr);
-	return v && (v != rcu_gp_ctr);
-}
-
 static inline void _rcu_read_lock(void)
 {
-	rcu_assert(rcu_reader.ctr);
 }
 
 static inline void _rcu_read_unlock(void)
 {
 }
 
+void __urcu_read_unlock_special(void);
+
 static inline void _rcu_quiescent_state(void)
 {
 	cmm_smp_mb();
-	_CMM_STORE_SHARED(rcu_reader.ctr, _CMM_LOAD_SHARED(rcu_gp_ctr));
-	cmm_smp_mb();	/* write rcu_reader.ctr before read futex */
-	wake_up_gp();
+
+	if (unlikely(_CMM_LOAD_SHARED(rcu_reader.wait)))
+		__urcu_read_unlock_special();
 	cmm_smp_mb();
 }
 
 static inline void _rcu_thread_offline(void)
 {
 	cmm_smp_mb();
-	CMM_STORE_SHARED(rcu_reader.ctr, 0);
-	cmm_smp_mb();	/* write rcu_reader.ctr before read futex */
-	wake_up_gp();
+	CMM_STORE_SHARED(rcu_reader.offline, 1);
+
+	cmm_smp_mb();	/* write rcu_reader.offline before read lock */
+
+	if (unlikely(_CMM_LOAD_SHARED(rcu_reader.wait)))
+		__urcu_read_unlock_special();
 	cmm_barrier();	/* Ensure the compiler does not reorder us with mutex */
 }
 
 static inline void _rcu_thread_online(void)
 {
 	cmm_barrier();	/* Ensure the compiler does not reorder us with mutex */
-	_CMM_STORE_SHARED(rcu_reader.ctr, CMM_LOAD_SHARED(rcu_gp_ctr));
+	_CMM_STORE_SHARED(rcu_reader.offline, 0);
 	cmm_smp_mb();
 }
-- 
1.7.4.4
