Hi Waiman,

On Sun, Dec 25, 2016 at 03:26:01PM -0500, Waiman Long wrote:
> A number of cmpxchg calls in qspinlock_paravirt.h were replaced by more
> relaxed versions to improve performance on architectures that use LL/SC.
> 
> All the locking related cmpxchg's are replaced with the _acquire
> variants:
>  - pv_queued_spin_steal_lock()
>  - trylock_clear_pending()
> 
> The cmpxchg's related to hashing are replaced by either the _release
> or the _relaxed variants. See the inline comment for details.
> 
> Signed-off-by: Waiman Long <[email protected]>
> 
>  v1->v2:
>   - Add comments in changelog and code for the rationale of the change.
> 
> ---
>  kernel/locking/qspinlock_paravirt.h | 50 
> ++++++++++++++++++++++++-------------
>  1 file changed, 33 insertions(+), 17 deletions(-)
> 
> diff --git a/kernel/locking/qspinlock_paravirt.h 
> b/kernel/locking/qspinlock_paravirt.h
> index e3b5520..c31d1ab 100644
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -72,7 +72,7 @@ static inline bool pv_queued_spin_steal_lock(struct 
> qspinlock *lock)
>       struct __qspinlock *l = (void *)lock;
>  
>       if (!(atomic_read(&lock->val) & _Q_LOCKED_PENDING_MASK) &&
> -         (cmpxchg(&l->locked, 0, _Q_LOCKED_VAL) == 0)) {
> +         (cmpxchg_acquire(&l->locked, 0, _Q_LOCKED_VAL) == 0)) {
>               qstat_inc(qstat_pv_lock_stealing, true);
>               return true;
>       }
> @@ -101,16 +101,16 @@ static __always_inline void clear_pending(struct 
> qspinlock *lock)
>  
>  /*
>   * The pending bit check in pv_queued_spin_steal_lock() isn't a memory
> - * barrier. Therefore, an atomic cmpxchg() is used to acquire the lock
> - * just to be sure that it will get it.
> + * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the
> + * lock to provide the proper memory barrier.
>   */
>  static __always_inline int trylock_clear_pending(struct qspinlock *lock)
>  {
>       struct __qspinlock *l = (void *)lock;
>  
>       return !READ_ONCE(l->locked) &&
> -            (cmpxchg(&l->locked_pending, _Q_PENDING_VAL, _Q_LOCKED_VAL)
> -                     == _Q_PENDING_VAL);
> +            (cmpxchg_acquire(&l->locked_pending, _Q_PENDING_VAL,
> +                             _Q_LOCKED_VAL) == _Q_PENDING_VAL);
>  }
>  #else /* _Q_PENDING_BITS == 8 */
>  static __always_inline void set_pending(struct qspinlock *lock)
> @@ -138,7 +138,7 @@ static __always_inline int trylock_clear_pending(struct 
> qspinlock *lock)
>                */
>               old = val;
>               new = (val & ~_Q_PENDING_MASK) | _Q_LOCKED_VAL;
> -             val = atomic_cmpxchg(&lock->val, old, new);
> +             val = atomic_cmpxchg_acquire(&lock->val, old, new);
>  
>               if (val == old)
>                       return 1;
> @@ -209,9 +209,15 @@ static struct qspinlock **pv_hash(struct qspinlock 
> *lock, struct pv_node *node)
>       struct pv_hash_entry *he;
>       int hopcnt = 0;
>  
> +     /*
> +      * Synchronizing with the node state variable will control who does
> +      * the hashing - the lock holder or lock waiter. The control
> +      * dependency will ensure that node value is written after the lock
> +      * value. So we don't need other ordering guarantee.
> +      */

By this comment, you mean that
        
        cmpxchg_relaxed(&he->lock, NULL, lock);
          r1 = ll he->lock;
          <compare part>
          sc he->lock, lock // succeeded

        if (!r1)
                WRITE_ONCE(he->node, node);


the sc and WRITE_ONCE() cannot be reordered because of the control
dependency? I don't think this is true. Yes, the sc must execute before
the WRITE_ONCE(), but their memory/cache effects may still be observed
out of order by other CPUs. IOW, the following may happen:


        CPU 0                   CPU 1
        ===================     =======================
        {x = 0, y = 0}          if (!cmpxchg_relaxed(&y, 0, 1))
                                        WRITE_ONCE(x, 1);
        r1 = READ_ONCE(x);

        smp_rmb();

        r2 = READ_ONCE(y);

The following result is possible:

        y = 1 && r1 = 1 && r2 = 0

Or am I missing your point here? ;-)

Regards,
Boqun

>       for_each_hash_entry(he, offset, hash) {
>               hopcnt++;
> -             if (!cmpxchg(&he->lock, NULL, lock)) {
> +             if (!cmpxchg_relaxed(&he->lock, NULL, lock)) {
>                       WRITE_ONCE(he->node, node);
>                       qstat_hop(hopcnt);
>                       return &he->lock;
> @@ -309,7 +315,7 @@ static void pv_wait_node(struct mcs_spinlock *node, 
> struct mcs_spinlock *prev)
>                *     MB                             MB
>                * [L] pn->locked               [RmW] pn->state = vcpu_hashed
>                *
> -              * Matches the cmpxchg() from pv_kick_node().
> +              * Matches the cmpxchg_release() from pv_kick_node().
>                */
>               smp_store_mb(pn->state, vcpu_halted);
>  
> @@ -323,8 +329,14 @@ static void pv_wait_node(struct mcs_spinlock *node, 
> struct mcs_spinlock *prev)
>                * If pv_kick_node() changed us to vcpu_hashed, retain that
>                * value so that pv_wait_head_or_lock() knows to not also try
>                * to hash this lock.
> +              *
> +              * The smp_store_mb() and control dependency above will ensure
> +              * that state change won't happen before that. Synchronizing
> +              * with pv_kick_node() wrt hashing by this waiter or by the
> +              * lock holder is done solely by the state variable. There is
> +              * no other ordering requirement.
>                */
> -             cmpxchg(&pn->state, vcpu_halted, vcpu_running);
> +             cmpxchg_relaxed(&pn->state, vcpu_halted, vcpu_running);
>  
>               /*
>                * If the locked flag is still not set after wakeup, it is a
> @@ -360,9 +372,12 @@ static void pv_kick_node(struct qspinlock *lock, struct 
> mcs_spinlock *node)
>        * pv_wait_node(). If OTOH this fails, the vCPU was running and will
>        * observe its next->locked value and advance itself.
>        *
> -      * Matches with smp_store_mb() and cmpxchg() in pv_wait_node()
> +      * Matches with smp_store_mb() and cmpxchg_relaxed() in pv_wait_node().
> +      * A release barrier is used here to ensure that node->locked is
> +      * always set before changing the state. See comment in pv_wait_node().
>        */
> -     if (cmpxchg(&pn->state, vcpu_halted, vcpu_hashed) != vcpu_halted)
> +     if (cmpxchg_release(&pn->state, vcpu_halted, vcpu_hashed)
> +                     != vcpu_halted)
>               return;
>  
>       /*
> @@ -461,8 +476,8 @@ static void pv_kick_node(struct qspinlock *lock, struct 
> mcs_spinlock *node)
>       }
>  
>       /*
> -      * The cmpxchg() or xchg() call before coming here provides the
> -      * acquire semantics for locking. The dummy ORing of _Q_LOCKED_VAL
> +      * The cmpxchg_acquire() or xchg() call before coming here provides
> +      * the acquire semantics for locking. The dummy ORing of _Q_LOCKED_VAL
>        * here is to indicate to the compiler that the value will always
>        * be nozero to enable better code optimization.
>        */
> @@ -488,11 +503,12 @@ static void pv_kick_node(struct qspinlock *lock, struct 
> mcs_spinlock *node)
>       }
>  
>       /*
> -      * A failed cmpxchg doesn't provide any memory-ordering guarantees,
> -      * so we need a barrier to order the read of the node data in
> -      * pv_unhash *after* we've read the lock being _Q_SLOW_VAL.
> +      * A failed cmpxchg_release doesn't provide any memory-ordering
> +      * guarantees, so we need a barrier to order the read of the node
> +      * data in pv_unhash *after* we've read the lock being _Q_SLOW_VAL.
>        *
> -      * Matches the cmpxchg() in pv_wait_head_or_lock() setting _Q_SLOW_VAL.
> +      * Matches the cmpxchg_acquire() in pv_wait_head_or_lock() setting
> +      * _Q_SLOW_VAL.
>        */
>       smp_rmb();
>  
> -- 
> 1.8.3.1
> 

Attachment: signature.asc
Description: PGP signature

Reply via email to