On Fri, Jul 15, 2016 at 06:35:56PM +0200, Peter Zijlstra wrote:
> On Fri, Jul 15, 2016 at 12:07:03PM +0200, Peter Zijlstra wrote:
> > > So if we are kicked by the unlock_slowpath, and the lock is stolen by
> > > someone else, we need to hash its node again and set l->locked to
> > > _Q_SLOW_VAL, then enter pv_wait.
> > 
> > Right, let me go think about this a bit.
> 
> Urgh, brain hurt.
> 
> So I _think_ the below does for it but I could easily have missed yet
> another case.
> 
> Waiman's patch has the problem that it can have two pv_hash() calls for
> the same lock in progress and I'm thinking that means we can hit the
> BUG() in pv_hash() due to that.
> 

I think Waiman's patch does have the problem of two pv_hash() calls for
the same lock in progress. As I mentioned in the first version:

http://lkml.kernel.org/g/20160527074331.GB8096@insomnia

And he tried to address this in patch #3 of this series. However,
I think the proper thing to do here is either to reorder patches 2 and 3
or to merge them; otherwise, we are introducing a bug in the middle
of this series.

Thoughts, Waiman?

That said, I find Peter's approach much simpler and easier to understand
;-)

> If we can't, it still has a problem because it's not telling us either.
> 
> 
> 
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -20,7 +20,8 @@
>   * native_queued_spin_unlock().
>   */
>  
> -#define _Q_SLOW_VAL  (3U << _Q_LOCKED_OFFSET)
> +#define _Q_HASH_VAL  (3U << _Q_LOCKED_OFFSET)
> +#define _Q_SLOW_VAL  (7U << _Q_LOCKED_OFFSET)
>  
>  /*
>   * Queue Node Adaptive Spinning
> @@ -36,14 +37,11 @@
>   */
>  #define PV_PREV_CHECK_MASK   0xff
>  
> -/*
> - * Queue node uses: vcpu_running & vcpu_halted.
> - * Queue head uses: vcpu_running & vcpu_hashed.
> - */
>  enum vcpu_state {
> -     vcpu_running = 0,
> -     vcpu_halted,            /* Used only in pv_wait_node */
> -     vcpu_hashed,            /* = pv_hash'ed + vcpu_halted */
> +     vcpu_node_running = 0,
> +     vcpu_node_halted,
> +     vcpu_head_running,

We actually don't need this extra running state, right? Nobody cares
about the difference between the two running states right now.

> +     vcpu_head_halted,
>  };
>  
>  struct pv_node {
> @@ -263,7 +261,7 @@ pv_wait_early(struct pv_node *prev, int
>       if ((loop & PV_PREV_CHECK_MASK) != 0)
>               return false;
>  
> -     return READ_ONCE(prev->state) != vcpu_running;
> +     return READ_ONCE(prev->state) & 1;
>  }
>  
>  /*
> @@ -311,20 +309,19 @@ static void pv_wait_node(struct mcs_spin
>                *
>                * Matches the cmpxchg() from pv_kick_node().
>                */
> -             smp_store_mb(pn->state, vcpu_halted);
> +             smp_store_mb(pn->state, vcpu_node_halted);
>  
> -             if (!READ_ONCE(node->locked)) {
> -                     qstat_inc(qstat_pv_wait_node, true);
> -                     qstat_inc(qstat_pv_wait_early, wait_early);
> -                     pv_wait(&pn->state, vcpu_halted);
> -             }
> +             if (READ_ONCE(node->locked))
> +                     return;
> +
> +             qstat_inc(qstat_pv_wait_node, true);
> +             qstat_inc(qstat_pv_wait_early, wait_early);
> +             pv_wait(&pn->state, vcpu_node_halted);
>  
>               /*
> -              * If pv_kick_node() changed us to vcpu_hashed, retain that
> -              * value so that pv_wait_head_or_lock() knows to not also try
> -              * to hash this lock.
> +              * If pv_kick_node() advanced us, retain that state.
>                */
> -             cmpxchg(&pn->state, vcpu_halted, vcpu_running);
> +             cmpxchg(&pn->state, vcpu_node_halted, vcpu_node_running);
>  
>               /*
>                * If the locked flag is still not set after wakeup, it is a
> @@ -362,18 +359,17 @@ static void pv_kick_node(struct qspinloc
>        *
>        * Matches with smp_store_mb() and cmpxchg() in pv_wait_node()
>        */
> -     if (cmpxchg(&pn->state, vcpu_halted, vcpu_hashed) != vcpu_halted)
> +     if (cmpxchg(&pn->state, vcpu_node_halted, vcpu_head_running) != 
> vcpu_node_halted)
>               return;
>  
>       /*
> -      * Put the lock into the hash table and set the _Q_SLOW_VAL.
> -      *
> -      * As this is the same vCPU that will check the _Q_SLOW_VAL value and
> -      * the hash table later on at unlock time, no atomic instruction is
> -      * needed.
> +      * See pv_wait_head_or_lock(). We have to hash and force the unlock
> +      * into the slow path to deliver the actual kick for waking.
>        */
> -     WRITE_ONCE(l->locked, _Q_SLOW_VAL);
> -     (void)pv_hash(lock, pn);
> +     if (cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL) == _Q_LOCKED_VAL) {
> +             (void)pv_hash(lock, pn);
> +             smp_store_release(&l->locked, _Q_SLOW_VAL);
> +     }
>  }
>  
>  /*
> @@ -388,28 +384,22 @@ pv_wait_head_or_lock(struct qspinlock *l
>  {
>       struct pv_node *pn = (struct pv_node *)node;
>       struct __qspinlock *l = (void *)lock;
> -     struct qspinlock **lp = NULL;
>       int waitcnt = 0;
>       int loop;
>  
>       /*
> -      * If pv_kick_node() already advanced our state, we don't need to
> -      * insert ourselves into the hash table anymore.
> -      */
> -     if (READ_ONCE(pn->state) == vcpu_hashed)
> -             lp = (struct qspinlock **)1;
> -
> -     /*
>        * Tracking # of slowpath locking operations
>        */
>       qstat_inc(qstat_pv_lock_slowpath, true);
>  
>       for (;; waitcnt++) {
> +             u8 locked;
> +
>               /*
>                * Set correct vCPU state to be used by queue node wait-early
>                * mechanism.
>                */
> -             WRITE_ONCE(pn->state, vcpu_running);
> +             WRITE_ONCE(pn->state, vcpu_head_running);
>  
>               /*
>                * Set the pending bit in the active lock spinning loop to
> @@ -423,33 +413,38 @@ pv_wait_head_or_lock(struct qspinlock *l
>               }
>               clear_pending(lock);
>  
> +             /*
> +              * We want to go sleep; ensure we're hashed so that
> +              * __pv_queued_spin_unlock_slow() can find us for a wakeup.
> +              */
> +             locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL);
> +             switch (locked) {
> +             /*
> +              * We're not hashed yet, either we're fresh from pv_wait_node()
> +              * or __pv_queued_spin_unlock_slow() unhashed us but we lost
> +              * the trylock to a steal and have to re-hash.
> +              */
> +             case _Q_LOCKED_VAL:
> +                     (void)pv_hash(lock, pn);
> +                     smp_store_release(&l->locked, _Q_SLOW_VAL);
> +                     break;
>  
> -             if (!lp) { /* ONCE */
> -                     lp = pv_hash(lock, pn);
> +             /*
> +              * pv_kick_node() is hashing us, wait for it.
> +              */
> +             case _Q_HASH_VAL:
> +                     while (READ_ONCE(l->locked) == _Q_HASH_VAL)
> +                             cpu_relax();
> +                     break;
>  
> -                     /*
> -                      * We must hash before setting _Q_SLOW_VAL, such that
> -                      * when we observe _Q_SLOW_VAL in 
> __pv_queued_spin_unlock()
> -                      * we'll be sure to be able to observe our hash entry.
> -                      *
> -                      *   [S] <hash>                 [Rmw] l->locked == 
> _Q_SLOW_VAL
> -                      *       MB                           RMB
> -                      * [RmW] l->locked = _Q_SLOW_VAL  [L] <unhash>
> -                      *
> -                      * Matches the smp_rmb() in __pv_queued_spin_unlock().
> -                      */
> -                     if (xchg(&l->locked, _Q_SLOW_VAL) == 0) {
> -                             /*
> -                              * The lock was free and now we own the lock.
> -                              * Change the lock value back to _Q_LOCKED_VAL
> -                              * and unhash the table.
> -                              */
> -                             WRITE_ONCE(l->locked, _Q_LOCKED_VAL);
> -                             WRITE_ONCE(*lp, NULL);
> -                             goto gotlock;
> -                     }
> +             /*
> +              * Ooh, unlocked, try and grab it.
> +              */
> +             case 0:
> +                     continue;
>               }
> -             WRITE_ONCE(pn->state, vcpu_hashed);
> +
> +             WRITE_ONCE(pn->state, vcpu_head_halted);
>               qstat_inc(qstat_pv_wait_head, true);
>               qstat_inc(qstat_pv_wait_again, waitcnt);
>               pv_wait(&l->locked, _Q_SLOW_VAL);
> @@ -480,7 +475,7 @@ __pv_queued_spin_unlock_slowpath(struct
>       struct __qspinlock *l = (void *)lock;
>       struct pv_node *node;
>  
> -     if (unlikely(locked != _Q_SLOW_VAL)) {
> +     if (unlikely(locked != _Q_SLOW_VAL && locked != _Q_HASH_VAL)) {
>               WARN(!debug_locks_silent,
>                    "pvqspinlock: lock 0x%lx has corrupted value 0x%x!\n",
>                    (unsigned long)lock, atomic_read(&lock->val));
> @@ -488,18 +483,17 @@ __pv_queued_spin_unlock_slowpath(struct
>       }
>  
>       /*
> -      * A failed cmpxchg doesn't provide any memory-ordering guarantees,
> -      * so we need a barrier to order the read of the node data in
> -      * pv_unhash *after* we've read the lock being _Q_SLOW_VAL.
> -      *
> -      * Matches the cmpxchg() in pv_wait_head_or_lock() setting _Q_SLOW_VAL.
> +      * Wait until the hash-bucket is complete.
>        */
> -     smp_rmb();
> +     while (READ_ONCE(l->locked) == _Q_HASH_VAL)
> +             cpu_relax();
>  

This does give the lock waiter a chance to block the lock holder,
right? Consider the case where the lock queue head's vcpu is preempted
before it could set the lock to _Q_SLOW_VAL: the holder would then spin
in the loop above until that vcpu runs again.

Could we do something like this here:

        if (unlikely(cmpxchg(l->locked, _Q_HASH_VAL, 0) == _Q_HASH_VAL))
                return;
                
And in pv_wait_head_or_lock()

        locked = cmpxchg(&l->locked, _Q_LOCKED_VAL, _Q_HASH_VAL);
        switch(locked) {
                case _Q_LOCKED_VAL:
                        (void)pv_hash(lock, pn);
                        locked = cmpxchg_release(&l->locked, _Q_HASH_VAL, 
_Q_SLOW_VAL);

                        /*
                         * Only the holder will change the ->locked from
                         * _Q_HASH_VAL to another value, if this
                         * happens, the holder has already released the
                         * lock without trying to wake the head, in this
                         * case, we need to unhash ourselves and there
                         * is a great chance we can get the lock.
                         */
                        if (unlikely(locked != _Q_HASH_VAL)) {
                                pv_unhash(lock, pn);
                                if (!cmpxchg_relaxed(&l->locked, 0, 
_Q_LOCKED_VAL)
                                        goto gotlock;
                        }
                        break;



I wrote these in my mail client, so I may have missed something.

Thoughts?

Regards,
Boqun

>       /*
> -      * Since the above failed to release, this must be the SLOW path.
> -      * Therefore start by looking up the blocked node and unhashing it.
> +      * Must first observe _Q_SLOW_VAL in order to observe
> +      * consistent hash bucket.
>        */
> +     smp_rmb();
> +
>       node = pv_unhash(lock);
>  
>       /*

Attachment: signature.asc
Description: PGP signature

Reply via email to