On Thu, 2022-07-28 at 16:31 +1000, Nicholas Piggin wrote:
<snip>
>
> +/*
> + * Bitfields in the atomic value:
> + *
> + *     0: locked bit
> + * 16-31: tail cpu (+1)
> + */
> +#define _Q_SET_MASK(type)	(((1U << _Q_ ## type ## _BITS) - 1)\
> +				      << _Q_ ## type ## _OFFSET)
> +#define _Q_LOCKED_OFFSET	0
> +#define _Q_LOCKED_BITS		1
> +#define _Q_LOCKED_MASK		_Q_SET_MASK(LOCKED)
> +#define _Q_LOCKED_VAL		(1U << _Q_LOCKED_OFFSET)
> +
> +#define _Q_TAIL_CPU_OFFSET	16
> +#define _Q_TAIL_CPU_BITS	(32 - _Q_TAIL_CPU_OFFSET)
> +#define _Q_TAIL_CPU_MASK	_Q_SET_MASK(TAIL_CPU)
> +
Just to state the obvious, this is:

#define _Q_LOCKED_OFFSET	0
#define _Q_LOCKED_BITS		1
#define _Q_LOCKED_MASK		0x00000001
#define _Q_LOCKED_VAL		1

#define _Q_TAIL_CPU_OFFSET	16
#define _Q_TAIL_CPU_BITS	16
#define _Q_TAIL_CPU_MASK	0xffff0000

> +#if CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)
> +#error "qspinlock does not support such large CONFIG_NR_CPUS"
> +#endif
> +
>  #endif /* _ASM_POWERPC_QSPINLOCK_TYPES_H */
> diff --git a/arch/powerpc/lib/qspinlock.c b/arch/powerpc/lib/qspinlock.c
> index 8dbce99a373c..5ebb88d95636 100644
> --- a/arch/powerpc/lib/qspinlock.c
> +++ b/arch/powerpc/lib/qspinlock.c
> @@ -1,12 +1,172 @@
>  // SPDX-License-Identifier: GPL-2.0-or-later
> +#include <linux/atomic.h>
> +#include <linux/bug.h>
> +#include <linux/compiler.h>
>  #include <linux/export.h>
> -#include <linux/processor.h>
> +#include <linux/percpu.h>
> +#include <linux/smp.h>
>  #include <asm/qspinlock.h>
>
> -void queued_spin_lock_slowpath(struct qspinlock *lock)
> +#define MAX_NODES	4
> +
> +struct qnode {
> +	struct qnode	*next;
> +	struct qspinlock *lock;
> +	u8		locked; /* 1 if lock acquired */
> +};
> +
> +struct qnodes {
> +	int		count;
> +	struct qnode	nodes[MAX_NODES];
> +};

I think it could be worth commenting why qnodes::count is used instead of
_Q_TAIL_IDX_OFFSET.

> +
> +static DEFINE_PER_CPU_ALIGNED(struct qnodes, qnodes);
> +
> +static inline int encode_tail_cpu(void)

I think the generic version that takes smp_processor_id() as a parameter
is clearer - at least with this function name.

> +{
> +	return (smp_processor_id() + 1) << _Q_TAIL_CPU_OFFSET;
> +}
> +
> +static inline int get_tail_cpu(int val)

It seems like there should be a "decode" function to pair up with the
"encode" function (rough sketch further down).

> +{
> +	return (val >> _Q_TAIL_CPU_OFFSET) - 1;
> +}
> +
> +/* Take the lock by setting the bit, no other CPUs may concurrently lock it. */

Does that comment mean it is not necessary to use an atomic_or here?

> +static __always_inline void lock_set_locked(struct qspinlock *lock)

nit: could just be called set_locked()

> +{
> +	atomic_or(_Q_LOCKED_VAL, &lock->val);
> +	__atomic_acquire_fence();
> +}
> +
> +/* Take lock, clearing tail, cmpxchg with val (which must not be locked) */
> +static __always_inline int trylock_clear_tail_cpu(struct qspinlock *lock, int val)
> +{
> +	int newval = _Q_LOCKED_VAL;
> +
> +	if (atomic_cmpxchg_acquire(&lock->val, val, newval) == val)
> +		return 1;
> +	else
> +		return 0;

same optional style nit:

	return (atomic_cmpxchg_acquire(&lock->val, val, newval) == val);

> +}
> +
> +/*
> + * Publish our tail, replacing previous tail. Return previous value.
> + *
> + * This provides a release barrier for publishing node, and an acquire barrier
> + * for getting the old node.
> + */
> +static __always_inline int publish_tail_cpu(struct qspinlock *lock, int tail)

Did you change from the xchg_tail() name in the generic version because
of the release and acquire barriers this provides? Does "publish"
generally imply the old value will be returned?
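Re the encode/decode comments above, the shape I had in mind is roughly
the below - an untested sketch only, and decode_tail_cpu() is just a
suggested name:

	/* hypothetical sketch: parameterised encode plus a matching decode */
	static inline int encode_tail_cpu(int cpu)
	{
		return (cpu + 1) << _Q_TAIL_CPU_OFFSET;
	}

	static inline int decode_tail_cpu(int val)
	{
		return (val >> _Q_TAIL_CPU_OFFSET) - 1;
	}

with callers passing smp_processor_id() explicitly, the way the generic
version does.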
> {
> -	while (!queued_spin_trylock(lock))
> +	for (;;) {
> +		int val = atomic_read(&lock->val);
> +		int newval = (val & ~_Q_TAIL_CPU_MASK) | tail;
> +		int old;
> +
> +		old = atomic_cmpxchg(&lock->val, val, newval);
> +		if (old == val)
> +			return old;
> +	}
> +}
> +
> +static struct qnode *get_tail_qnode(struct qspinlock *lock, int val)
> +{
> +	int cpu = get_tail_cpu(val);
> +	struct qnodes *qnodesp = per_cpu_ptr(&qnodes, cpu);
> +	int idx;
> +
> +	for (idx = 0; idx < MAX_NODES; idx++) {
> +		struct qnode *qnode = &qnodesp->nodes[idx];
> +		if (qnode->lock == lock)
> +			return qnode;
> +	}

In case anyone else is confused by this, Nick explained each cpu can
only queue on a unique spinlock once, regardless of "idx" level.

> +
> +	BUG();
> +}
> +
> +static inline void queued_spin_lock_mcs_queue(struct qspinlock *lock)
> +{
> +	struct qnodes *qnodesp;
> +	struct qnode *next, *node;
> +	int val, old, tail;
> +	int idx;
> +
> +	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
> +
> +	qnodesp = this_cpu_ptr(&qnodes);
> +	if (unlikely(qnodesp->count == MAX_NODES)) {

The comparison is >= in the generic version - I guess we have no nested
NMIs here, so this is safe?

> +		while (!queued_spin_trylock(lock))
> +			cpu_relax();
> +		return;
> +	}
> +
> +	idx = qnodesp->count++;
> +	/*
> +	 * Ensure that we increment the head node->count before initialising
> +	 * the actual node. If the compiler is kind enough to reorder these
> +	 * stores, then an IRQ could overwrite our assignments.
> +	 */
> +	barrier();
> +	node = &qnodesp->nodes[idx];
> +	node->next = NULL;
> +	node->lock = lock;
> +	node->locked = 0;
> +
> +	tail = encode_tail_cpu();
> +
> +	old = publish_tail_cpu(lock, tail);
> +
> +	/*
> +	 * If there was a previous node; link it and wait until reaching the
> +	 * head of the waitqueue.
> +	 */
> +	if (old & _Q_TAIL_CPU_MASK) {
> +		struct qnode *prev = get_tail_qnode(lock, old);
> +
> +		/* Link @node into the waitqueue. */
> +		WRITE_ONCE(prev->next, node);
> +
> +		/* Wait for mcs node lock to be released */
> +		while (!node->locked)
> +			cpu_relax();
> +
> +		smp_rmb(); /* acquire barrier for the mcs lock */
> +	}
> +
> +	/* We're at the head of the waitqueue, wait for the lock. */
> +	while ((val = atomic_read(&lock->val)) & _Q_LOCKED_VAL)
> +		cpu_relax();
> +
> +	/* If we're the last queued, must clean up the tail. */
> +	if ((val & _Q_TAIL_CPU_MASK) == tail) {
> +		if (trylock_clear_tail_cpu(lock, val))
> +			goto release;
> +		/* Another waiter must have enqueued */
> +	}
> +
> +	/* We must be the owner, just set the lock bit and acquire */
> +	lock_set_locked(lock);
> +
> +	/* contended path; must wait for next != NULL (MCS protocol) */
> +	while (!(next = READ_ONCE(node->next)))
> +		cpu_relax();
> +
> +	/*
> +	 * Unlock the next mcs waiter node. Release barrier is not required
> +	 * here because the acquirer is only accessing the lock word, and
> +	 * the acquire barrier we took the lock with orders that update vs
> +	 * this store to locked. The corresponding barrier is the smp_rmb()
> +	 * acquire barrier for mcs lock, above.
> +	 */
> +	WRITE_ONCE(next->locked, 1);
> +
> +release:
> +	qnodesp->count--; /* release the node */
> +}
> +
> +void queued_spin_lock_slowpath(struct qspinlock *lock)
> +{
> +	queued_spin_lock_mcs_queue(lock);
> }
> EXPORT_SYMBOL(queued_spin_lock_slowpath);
>
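One last thing, mostly to check my own understanding of the lock word
transitions - a worked example with a made-up cpu number:

	/*
	 * cpu 4 queued last while some other cpu holds the lock:
	 *   lock->val == ((4 + 1) << _Q_TAIL_CPU_OFFSET) | _Q_LOCKED_VAL
	 *             == 0x00050001
	 * owner releases, cpu 4 is at the head and the tail is still ours:
	 *   lock->val == 0x00050000
	 * trylock_clear_tail_cpu() then succeeds and clears the tail:
	 *   lock->val == _Q_LOCKED_VAL == 0x00000001
	 */

Is that the intended flow?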