This really should have been many patches.

On Sun, Aug 20, 2017 at 07:25:01PM +1000, Nicholas Piggin wrote:

> @@ -108,12 +109,10 @@ static inline unsigned __read_seqcount_begin(const 
> seqcount_t *s)
>  {
>       unsigned ret;
>  
> -repeat:
>       ret = READ_ONCE(s->sequence);
> -     if (unlikely(ret & 1)) {
> -             cpu_relax();
> -             goto repeat;
> -     }
> +     if (unlikely(ret & 1))
> +             spin_until_cond( !((ret = READ_ONCE(s->sequence)) & 1) );
> +

That seems to suggest something like: cond_load(), similar to
smp_cond_load_acquire(), except without the smp/acquire parts.

>       return ret;
>  }
>  
> diff --git a/kernel/locking/mcs_spinlock.h b/kernel/locking/mcs_spinlock.h
> index 6a385aabcce7..a91a0cc46a4c 100644
> --- a/kernel/locking/mcs_spinlock.h
> +++ b/kernel/locking/mcs_spinlock.h
> @@ -27,8 +27,7 @@ struct mcs_spinlock {
>   */
>  #define arch_mcs_spin_lock_contended(l)                                      
> \
>  do {                                                                 \
> -     while (!(smp_load_acquire(l)))                                  \
> -             cpu_relax();                                            \
> +     spin_until_cond(smp_load_acquire(l));                           \
>  } while (0)

Torn on whether it makes sense to use smp_cond_load_acquire() here
instead. That avoids issuing the barrier on each loop.

>  #endif
>  
> @@ -107,8 +106,7 @@ void mcs_spin_unlock(struct mcs_spinlock **lock, struct 
> mcs_spinlock *node)
>               if (likely(cmpxchg_release(lock, node, NULL) == node))
>                       return;
>               /* Wait until the next pointer is set */
> -             while (!(next = READ_ONCE(node->next)))
> -                     cpu_relax();
> +             spin_until_cond((next = READ_ONCE(node->next)) != 0);
>       }
>  
>       /* Pass lock to next waiter. */

Hmm, you could've used that same pattern above too I suppose,

        spin_until_cond(!((ret = READ_ONCE(s->sequence)) & 1));

> diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
> index 858a07590e39..0ffa1cd7f12b 100644
> --- a/kernel/locking/mutex.c
> +++ b/kernel/locking/mutex.c
> @@ -427,6 +427,7 @@ bool mutex_spin_on_owner(struct mutex *lock, struct 
> task_struct *owner,
>       bool ret = true;
>  
>       rcu_read_lock();
> +     spin_begin();
>       while (__mutex_owner(lock) == owner) {
>               /*
>                * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -450,8 +451,9 @@ bool mutex_spin_on_owner(struct mutex *lock, struct 
> task_struct *owner,
>                       break;
>               }
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>       rcu_read_unlock();

Since we just did a trylock we expect to wait, which is why we don't do
a condition test before spin_begin(), right?

>  
>       return ret;
> @@ -532,6 +534,7 @@ mutex_optimistic_spin(struct mutex *lock, struct 
> ww_acquire_ctx *ww_ctx,
>                       goto fail;
>       }
>  
> +     spin_begin();
>       for (;;) {
>               struct task_struct *owner;
>  
> @@ -553,8 +556,9 @@ mutex_optimistic_spin(struct mutex *lock, struct 
> ww_acquire_ctx *ww_ctx,
>                * memory barriers as we'll eventually observe the right
>                * values at the cost of a few extra spins.
>                */
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>  
>       if (!waiter)
>               osq_unlock(&lock->osq);
> @@ -563,6 +567,8 @@ mutex_optimistic_spin(struct mutex *lock, struct 
> ww_acquire_ctx *ww_ctx,
>  
>  
>  fail_unlock:
> +     spin_end();
> +
>       if (!waiter)
>               osq_unlock(&lock->osq);
>  

The same, we just did a trylock before calling this.

However, I think you just created a nested spin_begin()/spin_end() pair;
looking at how PPC implements that, this doesn't look right.

> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
> index a3167941093b..9dd58bbe60b7 100644
> --- a/kernel/locking/osq_lock.c
> +++ b/kernel/locking/osq_lock.c
> @@ -53,6 +53,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
>        */
>       old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
>  
> +     spin_begin();
>       for (;;) {
>               if (atomic_read(&lock->tail) == curr &&
>                   atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
> @@ -80,8 +81,9 @@ osq_wait_next(struct optimistic_spin_queue *lock,
>                               break;
>               }
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>  
>       return next;
>  }

This, however, doesn't have a fast-path; we unconditionally do
spin_begin(). That looks sub-optimal. From looking at PPC, that drops the
HT priority even though we might not end up waiting at all.


> @@ -107,6 +109,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>       if (old == OSQ_UNLOCKED_VAL)
>               return true;
>  
> +     spin_begin();
> +
>       prev = decode_cpu(old);
>       node->prev = prev;
>       WRITE_ONCE(prev->next, node);

Why so early? You again do HMT_low() even though we might not hit the
cpu_relax().

> @@ -129,8 +133,9 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>               if (need_resched() || vcpu_is_preempted(node_cpu(node->prev)))
>                       goto unqueue;
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>       return true;
>  
>  unqueue:



> @@ -152,10 +157,12 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>                * in which case we should observe @node->locked becomming
>                * true.
>                */
> -             if (smp_load_acquire(&node->locked))
> +             if (smp_load_acquire(&node->locked)) {
> +                     spin_end();
>                       return true;
> +             }
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>  
>               /*
>                * Or we race against a concurrent unqueue()'s step-B, in which
> @@ -164,6 +171,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>               prev = READ_ONCE(node->prev);
>       }
>  
> +     spin_end();
> +

So here we did all of step-A in low-prio, that doesn't look right.

>       /*
>        * Step - B -- stabilize @next
>        *
> diff --git a/kernel/locking/qrwlock.c b/kernel/locking/qrwlock.c
> index 2655f26ec882..186ff495097d 100644
> --- a/kernel/locking/qrwlock.c
> +++ b/kernel/locking/qrwlock.c
> @@ -54,10 +54,12 @@ struct __qrwlock {
>  static __always_inline void
>  rspin_until_writer_unlock(struct qrwlock *lock, u32 cnts)
>  {
> +     spin_begin();
>       while ((cnts & _QW_WMASK) == _QW_LOCKED) {
> -             cpu_relax();
> +             spin_cpu_relax();
>               cnts = atomic_read_acquire(&lock->cnts);
>       }
> +     spin_end();
>  }

Again, unconditional :/

>  /**
> @@ -124,6 +126,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>        * Set the waiting flag to notify readers that a writer is pending,
>        * or wait for a previous writer to go away.
>        */
> +     spin_begin();
>       for (;;) {
>               struct __qrwlock *l = (struct __qrwlock *)lock;
>  
> @@ -131,7 +134,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>                  (cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING) == 0))
>                       break;
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
>       /* When no more readers, set the locked flag */
> @@ -142,8 +145,10 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>                                           _QW_LOCKED) == _QW_WAITING))
>                       break;
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
> +
>  unlock:
>       arch_spin_unlock(&lock->wait_lock);
>  }

Would not something like:

        do {
                spin_until_cond(!READ_ONCE(l->wmode));
        } while (try_cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING));

Be a better pattern here? Then we have a fast-path without low-medium
flips.


> diff --git a/kernel/locking/qspinlock_paravirt.h 
> b/kernel/locking/qspinlock_paravirt.h
> index 4ccfcaae5b89..88817e41fadf 100644
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -293,15 +293,19 @@ static void pv_wait_node(struct mcs_spinlock *node, 
> struct mcs_spinlock *prev)
>       bool wait_early;
>  
>       for (;;) {
> +             spin_begin();

again, unconditional..

>               for (wait_early = false, loop = SPIN_THRESHOLD; loop; loop--) {
> -                     if (READ_ONCE(node->locked))
> +                     if (READ_ONCE(node->locked)) {
> +                             spin_end();
>                               return;
> +                     }
>                       if (pv_wait_early(pp, loop)) {
>                               wait_early = true;
>                               break;
>                       }
> -                     cpu_relax();
> +                     spin_cpu_relax();
>               }
> +             spin_end();
>  
>               /*
>                * Order pn->state vs pn->locked thusly:
> @@ -417,11 +421,15 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct 
> mcs_spinlock *node)
>                * disable lock stealing before attempting to acquire the lock.
>                */
>               set_pending(lock);
> +             spin_begin();

and again...

>               for (loop = SPIN_THRESHOLD; loop; loop--) {
> -                     if (trylock_clear_pending(lock))
> +                     if (trylock_clear_pending(lock)) {
> +                             spin_end();
>                               goto gotlock;
> -                     cpu_relax();
> +                     }
> +                     spin_cpu_relax();
>               }
> +             spin_end();
>               clear_pending(lock);
>  



> diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
> index 34e727f18e49..2d0e539f1a95 100644
> --- a/kernel/locking/rwsem-xadd.c
> +++ b/kernel/locking/rwsem-xadd.c
> @@ -358,6 +358,7 @@ static noinline bool rwsem_spin_on_owner(struct 
> rw_semaphore *sem)
>               goto out;
>  
>       rcu_read_lock();
> +     spin_begin();
>       while (sem->owner == owner) {
>               /*
>                * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -373,12 +374,14 @@ static noinline bool rwsem_spin_on_owner(struct 
> rw_semaphore *sem)
>                */
>               if (!owner->on_cpu || need_resched() ||
>                               vcpu_is_preempted(task_cpu(owner))) {
> +                     spin_end();
>                       rcu_read_unlock();
>                       return false;
>               }
>  
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>       rcu_read_unlock();
>  out:
>       /*

rwsem_spin_on_owner() is different from the mutex one in that we don't
first do a trylock(), which then makes this unconditional.

So I think we want the rwsem_optimistic_spin() main loop re-ordered to
first do the trylock and _then_ wait if it fails. Not first wait and
then try.

> @@ -408,6 +411,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore 
> *sem)
>        *  2) readers own the lock as we can't determine if they are
>        *     actively running or not.
>        */
> +     spin_begin();
>       while (rwsem_spin_on_owner(sem)) {
>               /*
>                * Try to acquire the lock
> @@ -432,8 +436,9 @@ static bool rwsem_optimistic_spin(struct rw_semaphore 
> *sem)
>                * memory barriers as we'll eventually observe the right
>                * values at the cost of a few extra spins.
>                */
> -             cpu_relax();
> +             spin_cpu_relax();
>       }
> +     spin_end();
>       osq_unlock(&sem->osq);
>  done:
>       preempt_enable();

And I think you did the recursive spin_begin thing again here.

Reply via email to