On 08/17/2011 03:20 AM, Paul E. McKenney wrote:
> On Tue, Aug 16, 2011 at 03:58:07PM +0800, Lai Jiangshan wrote:
>> synchronize_rcu() find out ongoing readers and wait for them
> 
> Cute!
> 
> But some questions and spelling nits below.
> 
>                                                       Thanx, Paul
> 
>> Signed-off-by: Lai Jiangshan <[email protected]>
>> ---
>>  urcu.c             |   49 +++++++++++++++++++++++++++++++++++++++++++++++++
>>  urcu/static/urcu.h |   42 ++++++++++++++++++++++++++++++++++++++++++
>>  2 files changed, 91 insertions(+), 0 deletions(-)
>>
>> diff --git a/urcu.c b/urcu.c
>> index bd54467..c13be67 100644
>> --- a/urcu.c
>> +++ b/urcu.c
>> @@ -419,6 +419,53 @@ static void 
>> urcu_sync_unlock_if_not_proxy_unlocked(int32_t *lock, int32_t self)
>>              urcu_sync_slow_unlock(lock);
>>  }
>>
>> +/* also implies mb() */
>> +void __urcu_read_unlock_specail(void)
> 
> s/__urcu_read_unlock_specail/__urcu_read_unlock_special/, please.
> 
>> +{
>> +    urcu_sync_unlock_if_not_proxy_unlocked(&rcu_reader.sync,
>> +                    rcu_reader.tid);
>> +}
>> +
>> +/*
>> + * synchronize_rcu          rcu_read_unlock(outmost)
>> + *
>> + *
>> + * sync = tid;                      ctr = 0;
>> + * smp_mb_master();         smp_mb_slave();
> 

Sorry, I use this comments to explain why synchronize_rcu()
will not sleep endless when synchronize_rcu() find ctr != 0,

> But smp_mb_slave() expands to 'asm volatile("" : : : "memory")'
> for RCU_SIGNAL, which would open up a number of vulnerabilities.
> 
> And yes, smp_mb_master() does a memory barrier on all CPUs in
> the RCU_SIGNAL case, but that only guarantees that if there was
> a reader pre-dating the beginning of the grace period, that if
> that reader accessed any pre-grace-period state, we also see
> that reader.  Misordering of the rcu_read_unlock() with the
> prior critical section, and this is why there is an smp_mb_master()
> prior to exit from synchronize_rcu().
> 
> So for this to work, I believe that you need an smp_mb_master()
> between checking the index->ctr and doing the urcu_sync_proxy_unlock().

Calling urcu_sync_proxy_unlock() earlier is OK.
__urcu_read_unlock_special() do nothing when it found the lock is proxy 
unlocked.

If we find ctr == 0:
I did add a smp_mb_master() after we find ctr == 0(also after 
urcu_sync_proxy_unlock()).

> 
> Or am I missing something that restricts URCU priority boosting
> to RCU_MB (where it looks like it might work)?  Or missing some
> extra synchronization in there somewhere?

initial: sync=0,ctr>0

sync = tid;                     ctr = 0
smp_mb_master();                smp_mb_slave();
local_ctr = ctr;                local_sync = sync;
        assert(local_ctr==0 || local_sync == tid);

So if we find the ctr != 0(local_ctr!=0):
the reader will call __urcu_read_unlock_special(),
synchronize_rcu() can be woke up and will not sleep endless.

> 
>> + * test ctr and wait;               test sync and wakeup;
>> + */
>> +
>> +void synchronize_rcu(void)
>> +{
>> +    struct rcu_reader *index;
>> +    int32_t self = syscall(SYS_gettid);
>> +
>> +    mutex_lock(&rcu_gp_lock);
>> +
>> +    if (cds_list_empty(&registry))
>> +            goto out;
>> +
>> +    cds_list_for_each_entry(index, &registry, node)
>> +            urcu_sync_proxy_lock(&index->sync, index->tid);
>> +
>> +    smp_mb_master(RCU_MB_GROUP);
>> +
>> +    cds_list_for_each_entry(index, &registry, node) {
>> +            if (_CMM_LOAD_SHARED(index->ctr) == 0) {
>> +                    urcu_sync_proxy_unlock(&index->sync);
>> +            } else {
>> +                    /* reader still running, we need to wait reader */
>> +                    urcu_sync_lock(&index->sync, self);
>> +                    urcu_sync_unlock(&index->sync, self);
>> +            }
>> +    }
>> +
>> +    /* ensure rcu_read_unlock() finish when we found the ctr==0 */
>> +    smp_mb_master(RCU_MB_GROUP); /* ensure rcu_read_unlock() finish */

This smp_mb_master() is needed after we find ctr == 0.

>> +
>> +out:
>> +    mutex_unlock(&rcu_gp_lock);
>> +}
>>  #endif /* #else #ifndef URCU_WAIT_READER */
>>
>>  /*
>> @@ -437,7 +484,9 @@ void rcu_read_unlock(void)
>>
>>  void rcu_register_thread(void)
>>  {
>> +    rcu_reader.tid = syscall(SYS_gettid);
>>      rcu_reader.pthread = pthread_self();
>> +    rcu_reader.sync = 0;
>>      assert(rcu_reader.need_mb == 0);
>>      assert(!(rcu_reader.ctr & RCU_GP_CTR_NEST_MASK));
>>
>> diff --git a/urcu/static/urcu.h b/urcu/static/urcu.h
>> index 32d1af8..0941fd2 100644
>> --- a/urcu/static/urcu.h
>> +++ b/urcu/static/urcu.h
>> @@ -221,9 +221,11 @@ static inline void smp_mb_slave(int group)
>>  struct rcu_reader {
>>      /* Data used by both reader and synchronize_rcu() */
>>      unsigned long ctr;
>> +    int32_t sync;
>>      char need_mb;
>>      /* Data used for registry */
>>      struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
>> +    pid_t tid;
>>      pthread_t pthread;
>>  };
>>
>> @@ -313,6 +315,46 @@ static inline int32_t urcu_sync_lock_onwer(int32_t 
>> *lock)
>>      return _CMM_LOAD_SHARED(*lock) & ~FUTEX_WAITERS;
>>  }
>>
>> +void __urcu_read_unlock_specail(void);
>> +
>> +static inline void _rcu_read_lock(void)
>> +{
>> +    unsigned long tmp;
>> +
>> +    cmm_barrier();
>> +    tmp = rcu_reader.ctr;
>> +
>> +    if (!tmp) {
>> +            _CMM_STORE_SHARED(rcu_reader.ctr, 1);
>> +            smp_mb_slave(RCU_MB_GROUP);
>> +    } else {
>> +            _CMM_STORE_SHARED(rcu_reader.ctr, tmp + 1);
>> +            cmm_barrier();
>> +    }
>> +}
>> +
>> +static inline void _rcu_read_unlock(void)
>> +{
>> +    unsigned long tmp;
>> +
>> +    cmm_barrier();
>> +    tmp = rcu_reader.ctr;
>> +
>> +    if (tmp == 1) {
>> +            smp_mb_slave(RCU_MB_GROUP);
>> +
>> +            rcu_reader.ctr = 0;
>> +            smp_mb_slave(RCU_MB_GROUP);
>> +
>> +            if (unlikely(urcu_sync_lock_onwer(&rcu_reader.sync)
>> +                            == rcu_reader.tid))
>> +                    __urcu_read_unlock_specail();
>> +    } else {
>> +            _CMM_STORE_SHARED(rcu_reader.ctr, tmp - 1);
>> +            cmm_barrier();
>> +    }
>> +}
>> +
>>  #endif /* #else #ifndef URCU_WAIT_READER */
>>
>>  #ifdef __cplusplus
>> -- 
>> 1.7.4.4
>>
> 


_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev

Reply via email to