The wait_event()/wake_up_all() logic in stop_work_alloc()/stop_work_free()
is very suboptimal because of the non-exclusive wakeups: every free wakes
every waiter. So we add the wait_queue_func_t alloc_wake() helper, which
wakes the waiter up only
a) if it actually waits for a stop_work in the "freed" cpumask, and
b) only after we already set ->stop_owner = waiter.

So if two stop_machine() calls race with each other, the loser will likely
call schedule() only once and we will have a single wakeup.

TODO: we can optimize (and simplify!) this code more. We can remove
stop_work_alloc_one() and fold it into stop_work_alloc(), so that
prepare_to_wait() will be the outer loop. Let's do this later.

TODO: the init_waitqueue_func_entry() code in stop_work_alloc_one()
is really annoying; we need a trivial new __init_wait(wait, func)
helper.

Signed-off-by: Oleg Nesterov <[email protected]>
---
 kernel/stop_machine.c |   47 +++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 572abc9..bbfc670 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -63,21 +63,60 @@ static void stop_work_free(const struct cpumask *cpumask)
 
        for_each_cpu(cpu, cpumask)
                stop_work_free_one(cpu);
-       wake_up_all(&stop_work_wq);
+       __wake_up(&stop_work_wq, TASK_ALL, 0, (void *)cpumask);
+}
+
+struct alloc_wait {
+       wait_queue_t    wait;
+       int             cpu;
+};
+
+static int alloc_wake(wait_queue_t *wait, unsigned int mode, int sync, void *key)
+{
+       struct alloc_wait *aw = container_of(wait, struct alloc_wait, wait);
+       struct cpu_stopper *stopper = &per_cpu(cpu_stopper, aw->cpu);
+       const struct cpumask *cpumask = key;
+
+       if (!cpumask_test_cpu(aw->cpu, cpumask))
+               return 0;
+       if (cmpxchg(&stopper->stop_owner, NULL, aw->wait.private) != NULL)
+               return 0;
+
+       return autoremove_wake_function(wait, mode, sync, key);
 }
 
 static struct cpu_stop_work *stop_work_alloc_one(int cpu, bool wait)
 {
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
+       struct task_struct *me = current;
+       struct alloc_wait aw;
 
-       if (cmpxchg(&stopper->stop_owner, NULL, current) == NULL)
+       if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL)
                goto done;
 
        if (!wait)
                return NULL;
 
-       __wait_event(stop_work_wq,
-               cmpxchg(&stopper->stop_owner, NULL, current) == NULL);
+       /* TODO: add __init_wait(wait, func) helper! */
+       INIT_LIST_HEAD(&aw.wait.task_list);
+       init_waitqueue_func_entry(&aw.wait, alloc_wake);
+       aw.wait.private = me;
+       aw.cpu = cpu;
+       for (;;) {
+               prepare_to_wait(&stop_work_wq, &aw.wait, TASK_UNINTERRUPTIBLE);
+               /*
+                * This can "falsely" fail if we race with alloc_wake() and
+                * stopper->stop_owner is already me, in this case schedule()
+                * won't block and the check below will notice this change.
+                */
+               if (cmpxchg(&stopper->stop_owner, NULL, me) == NULL)
+                       break;
+
+               schedule();
+               if (likely(stopper->stop_owner == me))
+                       break;
+       }
+       finish_wait(&stop_work_wq, &aw.wait);
 done:
        return &stopper->stop_work;
 }
-- 
1.5.5.1

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to