We already have sched::thread::pin(cpu*) to migrate the *current* thread to a given CPU and pin it there. This patch adds a similar function, pin(thread*, cpu*), to migrate any thread, not just the calling one.
This pin() variant is necessary for a complete implementation of pthread_setaffinity_np(), for example (see issue #520). The thread being migrated can of course be running, runnable, or sleeping, on any of the CPUs, and the code needs to handle all these cases correctly. The code also needs to handle the case of a thread being temporarily under migrate_disable(): such a thread must not be migrated, as code surrounded by migrate_disable() cannot handle being migrated in the middle. So pin() will sleep a bit, in a loop, until the thread becomes migratable again. Re-pinning of an already pin()ed thread is supported, though. Signed-off-by: Nadav Har'El <[email protected]> --- include/osv/sched.hh | 20 ++++++++++ core/sched.cc | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) diff --git a/include/osv/sched.hh b/include/osv/sched.hh index e79d022..2ff1d00 100644 --- a/include/osv/sched.hh +++ b/include/osv/sched.hh @@ -464,6 +464,26 @@ public: * thread (it cannot operate on an arbitrary thread). */ static void pin(cpu *target_cpu); + /** + * Pin the given thread to the target CPU. + * + * This will migrate the given thread to the target CPU. After this + * function returns, the thread is guaranteed to only run on the target + * CPU. + * + * A thread cannot be migrated while holding a migrate_disable() lock, + * so pin() waits until it is released. However, re-pinning an already + * pinned thread (which is not additionally holding a migrate_disable()) + * is allowed. + * + * Currently, calling this function on a dead or dying thread may cause + * a crash. + * + * For historic reasons (the previous existence of the pin(cpu*) static + * method), this is a static function taking the thread as a parameter.
+ */ + static void pin(thread *t, cpu *target_cpu); + #ifdef __OSV_CORE__ static inline thread* current() { return s_current; }; #else diff --git a/core/sched.cc b/core/sched.cc index 344b188..bf6e935 100644 --- a/core/sched.cc +++ b/core/sched.cc @@ -479,8 +479,13 @@ unsigned cpu::load() return runqueue.size(); } +// function to pin the *current* thread: void thread::pin(cpu *target_cpu) { + // Note that this code may proceed to migrate the current thread even if + // it was protected by a migrate_disable(). It is the thread's own fault + // for doing this to itself... The function to pin a different thread + // (below) waits for that different thread to leave migrate_disable(). thread &t = *current(); if (!t._pinned) { // _pinned comes with a +1 increase to _migration_counter. @@ -520,6 +525,112 @@ void thread::pin(cpu *target_cpu) // wakeme will be implicitly join()ed here. } +// function to pin another thread: +void thread::pin(thread *t, cpu *target_cpu) +{ + if (t == current()) { + thread::pin(target_cpu); + return; + } + // To work on the target thread, we need to run code on the same CPU on + // where the target thread is currently running. We start here a new + // helper thread to follow the target thread's CPU. We could have also + // re-used an existing thread (e.g., the load balancer thread). + thread helper([&] { + WITH_LOCK(irq_lock) { + for (;;) { + // This thread started on the same CPU as t, but by now t + // might have moved. If that happened, we need to move too. + while (sched::cpu::current() != t->tcpu()) { + DROP_LOCK(irq_lock) { + thread::pin(t->tcpu()); + } + } + // At this point, t is not running and it belongs to this + // CPU, and we hold the irq lock, so we can mess with t's + // data structures. An existing pin() accounts for exactly 1 + // of _migration_lock_counter; anything above that is a + // temporary migrate_disable() held by t.
+ unsigned pin_count = t->_pinned ? 1 : 0; + if (t->tcpu() == target_cpu) { + // Already on the target CPU: just (re)pin it in place. + if (!t->_pinned) { + t->_migration_lock_counter++; + t->_pinned = true; + } + return; + } + if (t->_migration_lock_counter <= pin_count) { + break; + } + // t holds a temporary migration lock and must not be migrated + // in the middle of it, so sleep a bit and retry. We do not + // touch t's counter while the irq lock is dropped: t may move + // meanwhile (e.g., by calling pin(cpu*) on itself, which + // ignores migrate_disable()), so we re-follow it at the top. + DROP_LOCK(irq_lock) { + debug("sched::thread::pin() retrying\n"); + sched::thread::sleep(std::chrono::milliseconds(1)); + } + } + t->_migration_lock_counter = 1; + t->_pinned = true; + // Racing with another CPU doing t->wake() is a complication. + // The biggest risk is that t will be woken up on the new (target) + // CPU, but read old values for some of its variables. Let's avoid + // this risk by pretending that the thread is already waking up. + // After this pretense we will have to really wake it up at the + // end (if not, we may lose a real wakeup!). That may be a + // spurious wakeup, but spurious wakeups are fine. + // Importantly, if the thread was already woken on this CPU before + // we moved it and woken it again, handle_incoming_wakeups() on + // this CPU will notice it doesn't belong to it and ignore it.
+ if (t->_detached_state->st.load(std::memory_order_relaxed) + == status::waiting) { + t->_detached_state->st.store(status::waking); + } + switch (t->_detached_state->st.load(std::memory_order_relaxed)) { + case status::prestarted: + case status::sending_lock: + case status::waking: + trace_sched_migrate(t, target_cpu->id); + t->stat_migrations.incr(); + t->suspend_timers(); + t->_runtime.export_runtime(); + t->_detached_state->_cpu = target_cpu; + t->remote_thread_local_var(::percpu_base) = target_cpu->percpu_base; + t->remote_thread_local_var(current_cpu) = target_cpu; + // May be a spurious wakeup, but that doesn't matter (see + // comment above). + if (t->_detached_state->st.load(std::memory_order_relaxed) == status::waking) { + t->_detached_state->st.store(status::waiting); + t->wake(); + } + break; + case status::queued: + current_cpu->runqueue.erase(current_cpu->runqueue.iterator_to(*t)); + trace_sched_migrate(t, target_cpu->id); + t->stat_migrations.incr(); + t->suspend_timers(); + t->_runtime.export_runtime(); + t->_detached_state->_cpu = target_cpu; + t->remote_thread_local_var(::percpu_base) = target_cpu->percpu_base; + t->remote_thread_local_var(current_cpu) = target_cpu; + // pretend the thread was waiting, so we can wake it + t->_detached_state->st.store(status::waiting); + t->wake(); + break; + default: + // Thread is in an unexpected state (for example, already + // terminated, or not started), and cannot be moved. + return; + } + } + }, sched::thread::attr().pin(t->tcpu())); + helper.start(); + helper.join(); +} + void cpu::load_balance() { notifier::fire(); -- 2.5.5 -- You received this message because you are subscribed to the Google Groups "OSv Development" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. For more options, visit https://groups.google.com/d/optout.
