As the priority of a task may get boosted due to an acquired rtmutex,
we need to periodically check the task priority to see if it has been
boosted. For an originally non-RT task, that means unqueuing from the
MCS wait queue before doing RT spinning. The unqueuing code from
osq_lock is therefore borrowed to handle the unqueuing.

Signed-off-by: Waiman Long <[email protected]>
---
 kernel/locking/qspinlock.c    |  25 ++++-
 kernel/locking/qspinlock_rt.h | 235 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 253 insertions(+), 7 deletions(-)

diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index cb5c2e5..b25ad58 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -270,10 +270,18 @@ static __always_inline u32  __pv_wait_head_or_lock(struct 
qspinlock *lock,
 /*
  * Realtime queued spinlock is mutual exclusive with native and PV spinlocks.
  */
+#define RT_RETRY       ((u32)-1)
+
 #ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
 #include "qspinlock_rt.h"
 #else
-static __always_inline u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static __always_inline void rt_init_node(struct mcs_spinlock *node, u32 tail) 
{}
+static __always_inline bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+                                                   struct mcs_spinlock *node,
+                                                   struct mcs_spinlock *prev)
+                                               { return false; }
+static __always_inline u32  rt_spin_lock_or_retry(struct qspinlock *lock,
+                                                 struct mcs_spinlock *node)
                                                { return 0; }
 #define rt_pending(v)          0
 #define rt_enabled()           false
@@ -514,6 +522,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 
val)
        node->locked = 0;
        node->next = NULL;
        pv_init_node(node);
+       rt_init_node(node, tail);
 
        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
@@ -552,6 +561,10 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 
val)
                WRITE_ONCE(prev->next, node);
 
                pv_wait_node(node, prev);
+
+               if (rt_wait_node_or_unqueue(lock, node, prev))
+                       goto queue;     /* Retry RT locking */
+
                arch_mcs_spin_lock_contended(&node->locked);
 
                /*
@@ -591,10 +604,14 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, 
u32 val)
 
        /*
         * The RT rt_wait_head_or_lock function, if active, will acquire the
-        * lock and return a non-zero value.
+        * lock, release the MCS lock and return a non-zero value. We need to
+        * retry with RT spinning when RT_RETRY is returned.
         */
-       if ((val = rt_wait_head_or_retry(lock)))
-               goto locked;
+       val = rt_spin_lock_or_retry(lock, node);
+       if (val == RT_RETRY)
+               goto queue;
+       else if (val)
+               return;
 
        val = smp_cond_load_acquire(&lock->val.counter, !(VAL & 
_Q_LOCKED_PENDING_MASK));
 
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
index c2f2722..0c4d051 100644
--- a/kernel/locking/qspinlock_rt.h
+++ b/kernel/locking/qspinlock_rt.h
@@ -37,6 +37,12 @@
  * doing so, we can query the highest priority task that is waiting on the
  * outer lock and adjust our waiting priority accordingly. To speed up nested
  * spinlock calls, they will have a minimum RT priority of 1 to begin with.
+ *
+ * To handle priority boosting due to an acquired rt-mutex, the task->prio
+ * field is queried in each iteration of the loop. An originally non-RT task
+ * will have to break out of the MCS wait queue, just like what is done in
+ * the OSQ lock, and then retry with RT spinning if it has been boosted to
+ * RT priority.
  */
 #include <linux/sched.h>
 
@@ -45,9 +51,56 @@
 #endif
 
 /*
+ * For proper unqueuing from the MCS wait queue, we need to store the encoded
+ * tail code as well as the previous node pointer in the extra MCS nodes.
+ * Since CPUs in interrupt context won't use the per-CPU MCS nodes anymore,
+ * only one is needed for process context CPUs. As a result, we can use the
+ * additional nodes for data storage. Here, we allow 2 nodes per cpu in case
+ * we want to put softIRQ CPUs into the queue as well.
+ */
+struct rt_node {
+       struct mcs_spinlock     mcs;
+       struct mcs_spinlock     __reserved;
+       struct mcs_spinlock     *prev;
+       u32                     tail;
+};
+
+/*
  * =========================[ Helper Functions ]=========================
  */
 
+static u32 cmpxchg_tail_acquire(struct qspinlock *lock, u32 old, u32 new)
+{
+       struct __qspinlock *l = (void *)lock;
+
+       return cmpxchg_acquire(&l->tail, old >> _Q_TAIL_OFFSET,
+                              new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static u32 cmpxchg_tail_release(struct qspinlock *lock, u32 old, u32 new)
+{
+       struct __qspinlock *l = (void *)lock;
+
+       return cmpxchg_release(&l->tail, old >> _Q_TAIL_OFFSET,
+                              new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static inline void rt_write_prev(struct mcs_spinlock *node,
+                                struct mcs_spinlock *prev)
+{
+       WRITE_ONCE(((struct rt_node *)node)->prev, prev);
+}
+
+static inline u32 rt_read_tail(struct mcs_spinlock *node)
+{
+       return READ_ONCE(((struct rt_node *)node)->tail);
+}
+
+static inline struct mcs_spinlock *rt_read_prev(struct mcs_spinlock *node)
+{
+       return READ_ONCE(((struct rt_node *)node)->prev);
+}
+
 /*
  * Translate the priority of a task to an equivalent RT priority
  */
@@ -141,6 +194,56 @@ static bool __rt_spin_trylock(struct qspinlock *lock,
 }
 
 /*
+ * MCS wait queue unqueuing code, borrowed mostly from osq_lock.c.
+ */
+static struct mcs_spinlock *
+mcsq_wait_next(struct qspinlock *lock, struct mcs_spinlock *node,
+              struct mcs_spinlock *prev)
+{
+        struct mcs_spinlock *next = NULL;
+        u32 tail = rt_read_tail(node);
+        u32 old;
+
+       /*
+        * If there is a prev node in queue, the 'old' value will be
+        * the prev node's tail value. Otherwise, it's set to 0 since if
+        * we're the only one in queue, the queue will then become empty.
+        */
+       old = prev ? rt_read_tail(prev) : 0;
+
+       for (;;) {
+               if ((atomic_read(&lock->val) & _Q_TAIL_MASK) == tail &&
+                   cmpxchg_tail_acquire(lock, tail, old) == tail) {
+                       /*
+                        * We are at the queue tail, we moved the @lock back.
+                        * @prev will now observe @lock and will complete its
+                        * unlock()/unqueue().
+                        */
+                       break;
+               }
+
+               /*
+                * We must xchg() the @node->next value, because if we were to
+                * leave it in, a concurrent unlock()/unqueue() from
+                * @node->next might complete Step-A and think its @prev is
+                * still valid.
+                *
+                * If the concurrent unlock()/unqueue() wins the race, we'll
+                * wait for either @lock to point to us, through its Step-B, or
+                * wait for a new @node->next from its Step-C.
+                */
+               if (node->next) {
+                       next = xchg(&node->next, NULL);
+                       if (next)
+                               break;
+               }
+
+               cpu_relax();
+       }
+       return next;
+}
+
+/*
  * ====================[ Functions Used by qspinlock.c ]====================
  */
 
@@ -158,6 +261,17 @@ static inline int rt_pending(int val)
 }
 
 /*
+ * Initialize the RT fields of a MCS node.
+ */
+static inline void rt_init_node(struct mcs_spinlock *node, u32 tail)
+{
+       struct rt_node *n = (struct rt_node *)node;
+
+       n->prev = NULL;
+       n->tail = tail;
+}
+
+/*
  * Return: true if locked acquired
  *        false if queuing in the MCS wait queue is needed.
  */
@@ -167,16 +281,98 @@ static inline bool rt_spin_trylock(struct qspinlock *lock)
 }
 
 /*
+ * Return: true if it has been unqueued and needs to retry locking.
+ *        false if it becomes the wait queue head & proceeds to the next step.
+ */
+static bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+                                   struct mcs_spinlock *node,
+                                   struct mcs_spinlock *prev)
+{
+       struct mcs_spinlock *next;
+
+       rt_write_prev(node, prev);      /* Save previous node pointer */
+
+       while (!READ_ONCE(node->locked)) {
+               if (rt_task_priority(current, 0))
+                       goto unqueue;
+               cpu_relax();
+       }
+       return false;
+
+unqueue:
+       /*
+        * Step - A  -- stabilize @prev
+        *
+        * Undo our @prev->next assignment; this will make @prev's
+        * unlock()/unqueue() wait for a next pointer since @lock points
+        * to us (or later).
+        */
+       for (;;) {
+               if (prev->next == node &&
+                   cmpxchg(&prev->next, node, NULL) == node)
+                       break;
+
+               /*
+                * We can only fail the cmpxchg() racing against an unlock(),
+                * in which case we should observe @node->locked becoming
+                * true.
+                */
+               if (smp_load_acquire(&node->locked))
+                       return false;
+
+               cpu_relax();
+
+               /*
+                * Or we race against a concurrent unqueue()'s step-B, in which
+                * case its step-C will write us a new @node->prev pointer.
+                */
+               prev = rt_read_prev(node);
+       }
+
+       /*
+        * Step - B -- stabilize @next
+        *
+        * Similar to unlock(), wait for @node->next or move @lock from @node
+        * back to @prev.
+        */
+       next = mcsq_wait_next(lock, node, prev);
+
+       /*
+        * Step - C -- unlink
+        *
+        * @prev is stable because it's still waiting for a new @prev->next
+        * pointer, @next is stable because our @node->next pointer is NULL and
+        * it will wait in Step-A.
+        */
+       if (next) {
+               rt_write_prev(next, prev);
+               WRITE_ONCE(prev->next, next);
+       }
+
+       /*
+        * Release the node.
+        */
+       __this_cpu_dec(mcs_nodes[0].count);
+
+       return true;    /* Need to retry RT spinning */
+}
+
+/*
  * We need to make the non-RT tasks wait longer if RT tasks are spinning for
  * the lock. This is done to reduce the chance that a non-RT task may
  * accidently grab the lock away from the RT tasks in the short interval
  * where the pending priority may be reset after an RT task acquires the lock.
  *
- * Return: Current value of the lock.
+ * Return: RT_RETRY if it needs to retry locking.
+ *        1 if lock acquired.
  */
-static u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static u32 rt_spin_lock_or_retry(struct qspinlock *lock,
+                                struct mcs_spinlock *node)
 {
        struct __qspinlock *l = (void *)lock;
+       struct mcs_spinlock *next;
+       bool retry = false;
+       u32 tail;
 
        for (;;) {
                u16 lockpend = READ_ONCE(l->locked_pending);
@@ -187,6 +383,14 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
                        if (!lockpend)
                                break;
                }
+               /*
+                * We need to break out of the non-RT wait queue and do
+                * RT spinning if we become an RT task.
+                */
+               if (rt_task_priority(current, 0)) {
+                       retry = true;
+                       goto unlock;
+               }
 
                /*
                 * 4 cpu_relax's if RT tasks present.
@@ -198,7 +402,32 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
                }
                cpu_relax();
        }
-       return atomic_read(&lock->val);
+
+unlock:
+       /*
+        * Remove itself from the MCS wait queue (unlock).
+        */
+       tail = rt_read_tail(node);
+       if (cmpxchg_tail_release(lock, tail, 0) == tail)
+               goto release;
+
+       /*
+        * Not the queue tail: pass the MCS lock to the next node.
+        */
+       next = xchg(&node->next, NULL);
+       if (!next)
+               next = mcsq_wait_next(lock, node, NULL);
+
+       if (next)
+               WRITE_ONCE(next->locked, 1);
+
+release:
+       /*
+        * Release the node.
+        */
+       __this_cpu_dec(mcs_nodes[0].count);
+
+       return retry ? RT_RETRY : 1;
 }
 
 /*
-- 
1.8.3.1

Reply via email to