On Tue, Aug 16, 2011 at 03:58:07PM +0800, Lai Jiangshan wrote:
> synchronize_rcu() find out ongoing readers and wait for them

Cute!

But some questions and spelling nits below.

                                                        Thanx, Paul

> Signed-off-by: Lai Jiangshan <[email protected]>
> ---
>  urcu.c             |   49 +++++++++++++++++++++++++++++++++++++++++++++++++
>  urcu/static/urcu.h |   42 ++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 91 insertions(+), 0 deletions(-)
> 
> diff --git a/urcu.c b/urcu.c
> index bd54467..c13be67 100644
> --- a/urcu.c
> +++ b/urcu.c
> @@ -419,6 +419,53 @@ static void 
> urcu_sync_unlock_if_not_proxy_unlocked(int32_t *lock, int32_t self)
>               urcu_sync_slow_unlock(lock);
>  }
> 
> +/* also implies mb() */
> +void __urcu_read_unlock_specail(void)

s/__urcu_read_unlock_specail/__urcu_read_unlock_special/, please.

> +{
> +     urcu_sync_unlock_if_not_proxy_unlocked(&rcu_reader.sync,
> +                     rcu_reader.tid);
> +}
> +
> +/*
> + * synchronize_rcu           rcu_read_unlock(outmost)
> + *
> + *
> + * sync = tid;                       ctr = 0;
> + * smp_mb_master();          smp_mb_slave();

But smp_mb_slave() expands to 'asm volatile("" : : : "memory")'
for RCU_SIGNAL, which would open up a number of vulnerabilities.

And yes, smp_mb_master() does a memory barrier on all CPUs in
the RCU_SIGNAL case, but that only guarantees that if there was
a reader pre-dating the beginning of the grace period, and if
that reader accessed any pre-grace-period state, we also see
that reader.  It does not prevent misordering of the
rcu_read_unlock() with the prior critical section, and this is
why there is an smp_mb_master() prior to exit from
synchronize_rcu().

So for this to work, I believe that you need an smp_mb_master()
between checking the index->ctr and doing the urcu_sync_proxy_unlock().

Or am I missing something that restricts URCU priority boosting
to RCU_MB (where it looks like it might work)?  Or missing some
extra synchronization in there somewhere?

> + * test ctr and wait;                test sync and wakeup;
> + */
> +
> +void synchronize_rcu(void)
> +{
> +     struct rcu_reader *index;
> +     int32_t self = syscall(SYS_gettid);
> +
> +     mutex_lock(&rcu_gp_lock);
> +
> +     if (cds_list_empty(&registry))
> +             goto out;
> +
> +     cds_list_for_each_entry(index, &registry, node)
> +             urcu_sync_proxy_lock(&index->sync, index->tid);
> +
> +     smp_mb_master(RCU_MB_GROUP);
> +
> +     cds_list_for_each_entry(index, &registry, node) {
> +             if (_CMM_LOAD_SHARED(index->ctr) == 0) {
> +                     urcu_sync_proxy_unlock(&index->sync);
> +             } else {
> +                     /* reader still running, we need to wait reader */
> +                     urcu_sync_lock(&index->sync, self);
> +                     urcu_sync_unlock(&index->sync, self);
> +             }
> +     }
> +
> +     /* ensure rcu_read_unlock() finish when we found the ctr==0 */
> +     smp_mb_master(RCU_MB_GROUP); /* ensure rcu_read_unlock() finish */
> +
> +out:
> +     mutex_unlock(&rcu_gp_lock);
> +}
>  #endif /* #else #ifndef URCU_WAIT_READER */
> 
>  /*
> @@ -437,7 +484,9 @@ void rcu_read_unlock(void)
> 
>  void rcu_register_thread(void)
>  {
> +     rcu_reader.tid = syscall(SYS_gettid);
>       rcu_reader.pthread = pthread_self();
> +     rcu_reader.sync = 0;
>       assert(rcu_reader.need_mb == 0);
>       assert(!(rcu_reader.ctr & RCU_GP_CTR_NEST_MASK));
> 
> diff --git a/urcu/static/urcu.h b/urcu/static/urcu.h
> index 32d1af8..0941fd2 100644
> --- a/urcu/static/urcu.h
> +++ b/urcu/static/urcu.h
> @@ -221,9 +221,11 @@ static inline void smp_mb_slave(int group)
>  struct rcu_reader {
>       /* Data used by both reader and synchronize_rcu() */
>       unsigned long ctr;
> +     int32_t sync;
>       char need_mb;
>       /* Data used for registry */
>       struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
> +     pid_t tid;
>       pthread_t pthread;
>  };
> 
> @@ -313,6 +315,46 @@ static inline int32_t urcu_sync_lock_onwer(int32_t *lock)
>       return _CMM_LOAD_SHARED(*lock) & ~FUTEX_WAITERS;
>  }
> 
> +void __urcu_read_unlock_specail(void);
> +
> +static inline void _rcu_read_lock(void)
> +{
> +     unsigned long tmp;
> +
> +     cmm_barrier();
> +     tmp = rcu_reader.ctr;
> +
> +     if (!tmp) {
> +             _CMM_STORE_SHARED(rcu_reader.ctr, 1);
> +             smp_mb_slave(RCU_MB_GROUP);
> +     } else {
> +             _CMM_STORE_SHARED(rcu_reader.ctr, tmp + 1);
> +             cmm_barrier();
> +     }
> +}
> +
> +static inline void _rcu_read_unlock(void)
> +{
> +     unsigned long tmp;
> +
> +     cmm_barrier();
> +     tmp = rcu_reader.ctr;
> +
> +     if (tmp == 1) {
> +             smp_mb_slave(RCU_MB_GROUP);
> +
> +             rcu_reader.ctr = 0;
> +             smp_mb_slave(RCU_MB_GROUP);
> +
> +             if (unlikely(urcu_sync_lock_onwer(&rcu_reader.sync)
> +                             == rcu_reader.tid))
> +                     __urcu_read_unlock_specail();
> +     } else {
> +             _CMM_STORE_SHARED(rcu_reader.ctr, tmp - 1);
> +             cmm_barrier();
> +     }
> +}
> +
>  #endif /* #else #ifndef URCU_WAIT_READER */
> 
>  #ifdef __cplusplus
> -- 
> 1.7.4.4
> 

_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev

Reply via email to