The WakeOvfIsDeferred code path in __call_rcu_nocb_wake() attempts to wake
rcuog when the callback count exceeds qhimark and the callbacks are not yet
done with their grace period (newly queued or still awaiting a GP). However,
extensive testing shows that this wake is always redundant.
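For reference, the check being removed looks like this (quoted from the
tree_nocb.h hunk below, with explanatory comments added; this is not new
code):

	smp_mb(); /* Enqueue before timer_pending(). */
	if ((rdp->nocb_cb_sleep ||
	     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
	    !timer_pending(&rdp_gp->nocb_timer)) {
		/*
		 * Callbacks are queued but their grace period has not yet
		 * completed, so a forced deferred wake is armed here, even
		 * though rcuog is already waiting on that same grace
		 * period and has nothing more to do once woken.
		 */
		rcu_nocb_unlock(rdp);
		wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
				   TPS("WakeOvfIsDeferred"));
	}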
In the flooding case, rcuog is already waiting for a grace period to
finish, so waking it up is pointless: the timer wakeup adds overhead, and
rcuog simply wakes up and goes back to sleep, achieving nothing. The path
also imposes a full memory barrier and extra timer-expiry manipulation,
all unnecessarily. The root cause is that WakeOvfIsDeferred fires when
!rcu_segcblist_ready_cbs() (the grace period has not yet completed), but
waking rcuog cannot accelerate grace-period completion. This commit
therefore removes the path.

Tested with rcutorture scenarios TREE01, TREE05, and TREE08 (all NOCB
configurations); all pass. Also stress-tested with a kernel module that
floods call_rcu() to trigger the overload condition, and the resulting
observations confirm the findings above. (A minimal sketch of such a
flooding module is appended after the patch.)

Reviewed-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Joel Fernandes <[email protected]>
---
 kernel/rcu/tree.c      |  2 +-
 kernel/rcu/tree.h      |  3 +--
 kernel/rcu/tree_nocb.h | 49 ++++++++++++++----------------------------
 3 files changed, 18 insertions(+), 36 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 293bbd9ac3f4..2921ffb19939 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3769,7 +3769,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
 	}
 	rcu_nocb_unlock(rdp);
 	if (wake_nocb)
-		wake_nocb_gp(rdp, false);
+		wake_nocb_gp(rdp);
 	smp_store_release(&rdp->barrier_seq_snap, gseq);
 }
 
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 2265b9c2906e..7dfc57e9adb1 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -301,7 +301,6 @@ struct rcu_data {
 #define RCU_NOCB_WAKE_BYPASS	1
 #define RCU_NOCB_WAKE_LAZY	2
 #define RCU_NOCB_WAKE		3
-#define RCU_NOCB_WAKE_FORCE	4
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
 					/* For jiffies_till_first_fqs and */
@@ -500,7 +499,7 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
 static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force);
+static bool wake_nocb_gp(struct rcu_data *rdp);
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 				  unsigned long j, bool lazy);
 static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index e6cd56603cad..f525e4f7985b 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -192,7 +192,7 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 
 static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
 			   struct rcu_data *rdp,
-			   bool force, unsigned long flags)
+			   unsigned long flags)
 	__releases(rdp_gp->nocb_gp_lock)
 {
 	bool needwake = false;
@@ -209,7 +209,7 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
 		timer_delete(&rdp_gp->nocb_timer);
 	}
 
-	if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+	if (READ_ONCE(rdp_gp->nocb_gp_sleep)) {
 		WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
 		needwake = true;
 	}
@@ -225,13 +225,13 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
 /*
  * Kick the GP kthread for this NOCB group.
  */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+static bool wake_nocb_gp(struct rcu_data *rdp)
 {
 	unsigned long flags;
 	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
 	raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-	return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+	return __wake_nocb_gp(rdp_gp, rdp, flags);
 }
 
 #ifdef CONFIG_RCU_LAZY
@@ -518,10 +518,8 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
 }
 
 /*
- * Awaken the no-CBs grace-period kthread if needed, either due to it
- * legitimately being asleep or due to overload conditions.
- *
- * If warranted, also wake up the kthread servicing this CPUs queues.
+ * Awaken the no-CBs grace-period kthread if needed due to it legitimately
+ * being asleep.
  */
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 				 unsigned long flags)
@@ -533,7 +531,6 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 	long lazy_len;
 	long len;
 	struct task_struct *t;
-	struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
 	// If we are being polled or there is no kthread, just leave.
 	t = READ_ONCE(rdp->nocb_gp_kthread);
@@ -549,22 +546,22 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 	lazy_len = READ_ONCE(rdp->lazy_len);
 	if (was_alldone) {
 		rdp->qlen_last_fqs_check = len;
+		rcu_nocb_unlock(rdp);
 		// Only lazy CBs in bypass list
 		if (lazy_len && bypass_len == lazy_len) {
-			rcu_nocb_unlock(rdp);
 			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
 					   TPS("WakeLazy"));
 		} else if (!irqs_disabled_flags(flags)) {
 			/* ... if queue was empty ... */
-			rcu_nocb_unlock(rdp);
-			wake_nocb_gp(rdp, false);
+			wake_nocb_gp(rdp);
 			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
 					    TPS("WakeEmpty"));
 		} else {
-			rcu_nocb_unlock(rdp);
 			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
 					   TPS("WakeEmptyIsDeferred"));
 		}
+
+		return;
 	} else if (len > rdp->qlen_last_fqs_check + qhimark) {
 		/* ... or if many callbacks queued. */
 		rdp->qlen_last_fqs_check = len;
@@ -575,21 +572,10 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
 			rcu_advance_cbs_nowake(rdp->mynode, rdp);
 			rdp->nocb_gp_adv_time = j;
 		}
-		smp_mb(); /* Enqueue before timer_pending(). */
-		if ((rdp->nocb_cb_sleep ||
-		     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-		    !timer_pending(&rdp_gp->nocb_timer)) {
-			rcu_nocb_unlock(rdp);
-			wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
-					   TPS("WakeOvfIsDeferred"));
-		} else {
-			rcu_nocb_unlock(rdp);
-			trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
-		}
-	} else {
-		rcu_nocb_unlock(rdp);
-		trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
 	}
+
+	rcu_nocb_unlock(rdp);
+	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
 }
 
 static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
@@ -966,7 +952,6 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
 					   unsigned long flags)
 	__releases(rdp_gp->nocb_gp_lock)
 {
-	int ndw;
 	int ret;
 
 	if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
@@ -974,8 +959,7 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data *rdp_gp,
 		return false;
 	}
 
-	ndw = rdp_gp->nocb_defer_wakeup;
-	ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+	ret = __wake_nocb_gp(rdp_gp, rdp, flags);
 	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
 
 	return ret;
@@ -991,7 +975,6 @@ static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
 	trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
 
 	raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
-	smp_mb__after_spinlock(); /* Timer expire before wakeup. */
 	do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
 }
 
@@ -1272,7 +1255,7 @@ lazy_rcu_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
 		}
 		rcu_nocb_try_flush_bypass(rdp, jiffies);
 		rcu_nocb_unlock_irqrestore(rdp, flags);
-		wake_nocb_gp(rdp, false);
+		wake_nocb_gp(rdp);
 		sc->nr_to_scan -= _count;
 		count += _count;
 		if (sc->nr_to_scan <= 0)
@@ -1657,7 +1640,7 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 {
 }
 
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+static bool wake_nocb_gp(struct rcu_data *rdp)
 {
 	return false;
 }
-- 
2.34.1
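[ For reference, the stress test mentioned above used a call_rcu()
  flooding module resembling the minimal sketch below. This is not the
  exact module used for testing: the rcu_flood_* names and the iteration
  count are illustrative only. ]

	#include <linux/module.h>
	#include <linux/sched.h>
	#include <linux/slab.h>
	#include <linux/rcupdate.h>

	struct rcu_flood_node {
		struct rcu_head rh;
	};

	static void rcu_flood_cb(struct rcu_head *rh)
	{
		kfree(container_of(rh, struct rcu_flood_node, rh));
	}

	static int __init rcu_flood_init(void)
	{
		int i;

		/*
		 * Queue far more callbacks than qhimark (default 10000)
		 * in one burst, so that __call_rcu_nocb_wake() repeatedly
		 * takes the len > rdp->qlen_last_fqs_check + qhimark
		 * branch while the grace period is still in progress.
		 */
		for (i = 0; i < 500000; i++) {
			struct rcu_flood_node *n = kmalloc(sizeof(*n), GFP_KERNEL);

			if (!n)
				break;
			call_rcu(&n->rh, rcu_flood_cb);
			cond_resched(); /* Avoid soft lockups while flooding. */
		}
		return 0;
	}

	static void __exit rcu_flood_exit(void)
	{
		/* Wait for all queued callbacks before unloading. */
		rcu_barrier();
	}

	module_init(rcu_flood_init);
	module_exit(rcu_flood_exit);
	MODULE_LICENSE("GPL");
	MODULE_DESCRIPTION("call_rcu() flood to exercise NOCB overload paths");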

