There are a number of 'interesting' problems with FUTEX_UNLOCK_PI, all
caused by holding hb->lock while doing the rt_mutex_unlock()
equivalent.

This patch doesn't attempt to fix any of the actual problems, but
instead reworks the code to not hold hb->lock across the unlock,
paving the way to actually fix the problems later.

The current reason we hold hb->lock across the unlock is that it
serializes against FUTEX_LOCK_PI and prevents new waiters from coming
in. This ensures the rt_mutex_next_owner() value is stable and can be
written into the user-space futex value before doing the unlock, such
that the unlock will indeed end up at new_owner.
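
In sketch form (simplified from the removed lines in the diff below;
error handling and pi_state refcounting omitted), the current
wake_futex_pi() ordering is:

        spin_lock(&hb->lock);                           /* futex_unlock_pi() */
        raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
        new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
        /* publish new_owner in the user-space futex word */
        cmpxchg_futex_value_locked(&curval, uaddr, uval, newval);
        raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
        deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
        spin_unlock(&hb->lock);
        wake_up_q(&wake_q);
        if (deboost)
                rt_mutex_adjust_prio(current);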

This patch recognises that holding rt_mutex::wait_lock results in the
very same guarantee: no new waiters can come in while we hold that
lock -- after all, waiters would need this lock to queue themselves.

It therefore restructures the code to keep rt_mutex::wait_lock held.
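
Again in sketch form (simplified from the added lines in the diff
below; refcounting and error handling omitted), the new ordering
becomes:

        spin_lock(&hb->lock);                           /* futex_unlock_pi() */
        raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
        spin_unlock(&hb->lock);         /* wait_lock now blocks new waiters */
        new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
        cmpxchg_futex_value_locked(&curval, uaddr, uval, newval);
        ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
        raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
        if (ret > 0) {                                  /* wake and deboost */
                wake_up_q(&wake_q);
                rt_mutex_adjust_prio(current);
        }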

This (of course) is not entirely straightforward either; see the
comment in rt_mutex_slowunlock(): doing the unlock itself might drop
wait_lock, letting new waiters in. To cure this,
rt_mutex_futex_unlock() becomes a variant of rt_mutex_slowunlock()
that returns -EAGAIN instead. This ensures the FUTEX_UNLOCK_PI code
aborts and restarts the entire operation.
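
From futex_unlock_pi()'s point of view this ends up as a plain retry
loop, sketched here from the diff below (the prologue re-reading the
futex word and taking hb->lock is elided):

retry:
        /* re-read the futex word, take hb->lock, look up top_waiter ... */
        ret = wake_futex_pi(uaddr, uval, top_waiter, hb); /* drops hb->lock */
        if (ret == -EAGAIN) {
                put_futex_key(&key);
                goto retry;
        }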

Signed-off-by: Peter Zijlstra (Intel) <pet...@infradead.org>
---
 kernel/futex.c                  |   63 +++++++++++++++++--------------
 kernel/locking/rtmutex.c        |   81 ++++++++++++++++++++++++++++++++++++----
 kernel/locking/rtmutex_common.h |    4 -
 3 files changed, 110 insertions(+), 38 deletions(-)

--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1294,24 +1294,21 @@ static void mark_wake_futex(struct wake_
 static int wake_futex_pi(u32 __user *uaddr, u32 uval, struct futex_q *top_waiter,
                         struct futex_hash_bucket *hb)
 {
-       struct task_struct *new_owner;
        struct futex_pi_state *pi_state = top_waiter->pi_state;
        u32 uninitialized_var(curval), newval;
+       struct task_struct *new_owner;
        WAKE_Q(wake_q);
-       bool deboost;
        int ret = 0;
 
-       if (!pi_state)
-               return -EINVAL;
+       raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
 
+       WARN_ON_ONCE(!atomic_inc_not_zero(&pi_state->refcount));
        /*
-        * If current does not own the pi_state then the futex is
-        * inconsistent and user space fiddled with the futex value.
+        * Now that we hold wait_lock, no new waiters can happen on the
+        * rt_mutex and new owner is stable. Drop hb->lock.
         */
-       if (pi_state->owner != current)
-               return -EINVAL;
+       spin_unlock(&hb->lock);
 
-       raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
        new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
 
        /*
@@ -1334,6 +1331,7 @@ static int wake_futex_pi(u32 __user *uad
 
        if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval)) {
                ret = -EFAULT;
+
        } else if (curval != uval) {
                /*
                 * If a unconditional UNLOCK_PI operation (user space did not
@@ -1346,10 +1344,9 @@ static int wake_futex_pi(u32 __user *uad
                else
                        ret = -EINVAL;
        }
-       if (ret) {
-               raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
-               return ret;
-       }
+
+       if (ret)
+               goto out_unlock;
 
        raw_spin_lock(&pi_state->owner->pi_lock);
        WARN_ON(list_empty(&pi_state->list));
@@ -1362,22 +1359,20 @@ static int wake_futex_pi(u32 __user *uad
        pi_state->owner = new_owner;
        raw_spin_unlock(&new_owner->pi_lock);
 
-       raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
+       ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
 
-       deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
+out_unlock:
+       raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
 
-       /*
-        * First unlock HB so the waiter does not spin on it once he got woken
-        * up. Second wake up the waiter before the priority is adjusted. If we
-        * deboost first (and lose our higher priority), then the task might get
-        * scheduled away before the wake up can take place.
-        */
-       spin_unlock(&hb->lock);
-       wake_up_q(&wake_q);
-       if (deboost)
+       if (ret > 0) {
+               wake_up_q(&wake_q);
                rt_mutex_adjust_prio(current);
+               ret = 0;
+       }
 
-       return 0;
+       put_pi_state(pi_state);
+
+       return ret;
 }
 
 /*
@@ -2656,6 +2651,20 @@ static int futex_unlock_pi(u32 __user *u
         */
        top_waiter = futex_top_waiter(hb, &key);
        if (top_waiter) {
+
+               if (!top_waiter->pi_state)
+                       goto out_unlock;
+
+               /*
+                * If current does not own the pi_state then the futex is
+                * inconsistent and user space fiddled with the futex value.
+                */
+               if (top_waiter->pi_state->owner != current)
+                       goto out_unlock;
+
+               /*
+                * wake_futex_pi() _will_ drop hb->lock
+                */
                ret = wake_futex_pi(uaddr, uval, top_waiter, hb);
                /*
                 * In case of success wake_futex_pi dropped the hash
@@ -2674,7 +2683,6 @@ static int futex_unlock_pi(u32 __user *u
                 * setting the FUTEX_WAITERS bit. Try again.
                 */
                if (ret == -EAGAIN) {
-                       spin_unlock(&hb->lock);
                        put_futex_key(&key);
                        goto retry;
                }
@@ -2682,7 +2690,7 @@ static int futex_unlock_pi(u32 __user *u
                 * wake_futex_pi has detected invalid state. Tell user
                 * space.
                 */
-               goto out_unlock;
+               goto out_putkey;
        }
 
        /*
@@ -2707,7 +2715,6 @@ static int futex_unlock_pi(u32 __user *u
        return ret;
 
 pi_faulted:
-       spin_unlock(&hb->lock);
        put_futex_key(&key);
 
        ret = fault_in_user_writeable(uaddr);
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -1489,19 +1489,84 @@ void __sched rt_mutex_unlock(struct rt_m
 EXPORT_SYMBOL_GPL(rt_mutex_unlock);
 
 /**
- * rt_mutex_futex_unlock - Futex variant of rt_mutex_unlock
+ * rt_mutex_futex_unlock - Futex variant of rt_mutex_slowunlock
  * @lock: the rt_mutex to be unlocked
+ * @wake_q: wake queue
  *
- * Returns: true/false indicating whether priority adjustment is
- * required or not.
+ * This cannot just be rt_mutex_slowunlock() since that does the wait_lock
+ * dance, and the futex code is tricky (ha!) and uses the wait_lock
+ * to hold off new waiters, so dropping this lock invalidates prior state
+ * and we need to redo all of it.
+ *
+ * Returns:
+ *  -EAGAIN: if we need to retry the whole operation;
+ *        1: if we need to wake and deboost;
+ *        0: if we're done.
  */
-bool __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
-                                  struct wake_q_head *wqh)
+int __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
+                                 struct wake_q_head *wake_q)
 {
-       if (likely(rt_mutex_cmpxchg_release(lock, current, NULL)))
-               return false;
+       lockdep_assert_held(&lock->wait_lock);
+
+       debug_rt_mutex_unlock(lock);
+
+       /*
+        * We must be careful here if the fast path is enabled. If we
+        * have no waiters queued we cannot set owner to NULL here
+        * because of:
+        *
+        * foo->lock->owner = NULL;
+        *                      rtmutex_lock(foo->lock);   <- fast path
+        *                      free = atomic_dec_and_test(foo->refcnt);
+        *                      rtmutex_unlock(foo->lock); <- fast path
+        *                      if (free)
+        *                              kfree(foo);
+        * raw_spin_unlock(foo->lock->wait_lock);
+        *
+        * So for the fastpath enabled kernel:
+        *
+        * Nothing can set the waiters bit as long as we hold
+        * lock->wait_lock. So we do the following sequence:
+        *
+        *      owner = rt_mutex_owner(lock);
+        *      clear_rt_mutex_waiters(lock);
+        *      raw_spin_unlock(&lock->wait_lock);
+        *      if (cmpxchg(&lock->owner, owner, 0) == owner)
+        *              return;
+        *      goto retry;
+        *
+        * The fastpath disabled variant is simple as all access to
+        * lock->owner is serialized by lock->wait_lock:
+        *
+        *      lock->owner = NULL;
+        *      raw_spin_unlock(&lock->wait_lock);
+        */
+       if (!rt_mutex_has_waiters(lock)) {
+               unsigned long flags;
+               int ret = 0;
+
+               local_save_flags(flags);
+
+               /* Drops lock->wait_lock ! */
+               if (unlock_rt_mutex_safe(lock, flags))
+                       ret = -EAGAIN;
+
+               /* Relock the rtmutex and try again */
+               raw_spin_lock_irq(&lock->wait_lock);
+
+               return ret;
+       }
+
+       /*
+        * The wakeup next waiter path does not suffer from the above
+        * race. See the comments there.
+        *
+        * Queue the next waiter for wakeup once we release the wait_lock.
+        */
+       mark_wakeup_next_waiter(wake_q, lock);
 
-       return rt_mutex_slowunlock(lock, wqh);
+       /* Do wakeups and deboost. */
+       return 1;
 }
 
 /**
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -109,8 +109,8 @@ extern int rt_mutex_finish_proxy_lock(st
                                      struct hrtimer_sleeper *to,
                                      struct rt_mutex_waiter *waiter);
 extern int rt_mutex_timed_futex_lock(struct rt_mutex *l, struct hrtimer_sleeper *to);
-extern bool rt_mutex_futex_unlock(struct rt_mutex *lock,
-                                 struct wake_q_head *wqh);
+extern int rt_mutex_futex_unlock(struct rt_mutex *lock,
+                                struct wake_q_head *wqh);
 extern void rt_mutex_adjust_prio(struct task_struct *task);
 
 #ifdef CONFIG_DEBUG_RT_MUTEXES

