The WakeOvfIsDeferred code path in __call_rcu_nocb_wake() attempts to
wake rcuog when the callback count exceeds qhimark and callbacks aren't
done with their GP (newly queued or awaiting GP). However, extensive
testing shows this wake is always redundant or useless.

In the flooding case, rcuog is always waiting for a GP to finish, so
waking up the rcuog thread is pointless. The timer wakeup only adds
overhead: rcuog simply wakes up and goes back to sleep, achieving
nothing.

This path also unnecessarily adds a full memory barrier and extra
timer expiry modifications.

The root cause is that WakeOvfIsDeferred fires when
!rcu_segcblist_ready_cbs() (GP not complete), but waking rcuog cannot
accelerate GP completion.

This commit therefore removes this path.

Tested with rcutorture scenarios: TREE01, TREE05, TREE08 (all NOCB
configurations) - all pass. Also stress tested using a kernel module
that floods call_rcu() to trigger the overload conditions, which
confirmed the observations above.

Reviewed-by: Frederic Weisbecker <[email protected]>
Signed-off-by: Joel Fernandes <[email protected]>
---
 kernel/rcu/tree.c      |  2 +-
 kernel/rcu/tree.h      |  3 +--
 kernel/rcu/tree_nocb.h | 49 ++++++++++++++----------------------------
 3 files changed, 18 insertions(+), 36 deletions(-)

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 293bbd9ac3f4..2921ffb19939 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3769,7 +3769,7 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
        }
        rcu_nocb_unlock(rdp);
        if (wake_nocb)
-               wake_nocb_gp(rdp, false);
+               wake_nocb_gp(rdp);
        smp_store_release(&rdp->barrier_seq_snap, gseq);
 }
 
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 2265b9c2906e..7dfc57e9adb1 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -301,7 +301,6 @@ struct rcu_data {
 #define RCU_NOCB_WAKE_BYPASS   1
 #define RCU_NOCB_WAKE_LAZY     2
 #define RCU_NOCB_WAKE          3
-#define RCU_NOCB_WAKE_FORCE    4
 
 #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
                                        /* For jiffies_till_first_fqs and */
@@ -500,7 +499,7 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp);
 static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
 static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
 static void rcu_init_one_nocb(struct rcu_node *rnp);
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force);
+static bool wake_nocb_gp(struct rcu_data *rdp);
 static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
                                  unsigned long j, bool lazy);
 static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
index e6cd56603cad..f525e4f7985b 100644
--- a/kernel/rcu/tree_nocb.h
+++ b/kernel/rcu/tree_nocb.h
@@ -192,7 +192,7 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 
 static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
                           struct rcu_data *rdp,
-                          bool force, unsigned long flags)
+                          unsigned long flags)
        __releases(rdp_gp->nocb_gp_lock)
 {
        bool needwake = false;
@@ -209,7 +209,7 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
                timer_delete(&rdp_gp->nocb_timer);
        }
 
-       if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
+       if (READ_ONCE(rdp_gp->nocb_gp_sleep)) {
                WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
                needwake = true;
        }
@@ -225,13 +225,13 @@ static bool __wake_nocb_gp(struct rcu_data *rdp_gp,
 /*
  * Kick the GP kthread for this NOCB group.
  */
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+static bool wake_nocb_gp(struct rcu_data *rdp)
 {
        unsigned long flags;
        struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
        raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
-       return __wake_nocb_gp(rdp_gp, rdp, force, flags);
+       return __wake_nocb_gp(rdp_gp, rdp, flags);
 }
 
 #ifdef CONFIG_RCU_LAZY
@@ -518,10 +518,8 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, 
struct rcu_head *rhp,
 }
 
 /*
- * Awaken the no-CBs grace-period kthread if needed, either due to it
- * legitimately being asleep or due to overload conditions.
- *
- * If warranted, also wake up the kthread servicing this CPUs queues.
+ * Awaken the no-CBs grace-period kthread if needed due to it legitimately
+ * being asleep.
  */
 static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
                                 unsigned long flags)
@@ -533,7 +531,6 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool 
was_alldone,
        long lazy_len;
        long len;
        struct task_struct *t;
-       struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;
 
        // If we are being polled or there is no kthread, just leave.
        t = READ_ONCE(rdp->nocb_gp_kthread);
@@ -549,22 +546,22 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, 
bool was_alldone,
        lazy_len = READ_ONCE(rdp->lazy_len);
        if (was_alldone) {
                rdp->qlen_last_fqs_check = len;
+               rcu_nocb_unlock(rdp);
                // Only lazy CBs in bypass list
                if (lazy_len && bypass_len == lazy_len) {
-                       rcu_nocb_unlock(rdp);
                        wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
                                           TPS("WakeLazy"));
                } else if (!irqs_disabled_flags(flags)) {
                        /* ... if queue was empty ... */
-                       rcu_nocb_unlock(rdp);
-                       wake_nocb_gp(rdp, false);
+                       wake_nocb_gp(rdp);
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("WakeEmpty"));
                } else {
-                       rcu_nocb_unlock(rdp);
                        wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
                                           TPS("WakeEmptyIsDeferred"));
                }
+
+               return;
        } else if (len > rdp->qlen_last_fqs_check + qhimark) {
                /* ... or if many callbacks queued. */
                rdp->qlen_last_fqs_check = len;
@@ -575,21 +572,10 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, 
bool was_alldone,
                        rcu_advance_cbs_nowake(rdp->mynode, rdp);
                        rdp->nocb_gp_adv_time = j;
                }
-               smp_mb(); /* Enqueue before timer_pending(). */
-               if ((rdp->nocb_cb_sleep ||
-                    !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
-                   !timer_pending(&rdp_gp->nocb_timer)) {
-                       rcu_nocb_unlock(rdp);
-                       wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
-                                          TPS("WakeOvfIsDeferred"));
-               } else {
-                       rcu_nocb_unlock(rdp);
-                       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, 
TPS("WakeNot"));
-               }
-       } else {
-               rcu_nocb_unlock(rdp);
-               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
        }
+
+       rcu_nocb_unlock(rdp);
+       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
 }
 
 static void call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *head,
@@ -966,7 +952,6 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data 
*rdp_gp,
                                           unsigned long flags)
        __releases(rdp_gp->nocb_gp_lock)
 {
-       int ndw;
        int ret;
 
        if (!rcu_nocb_need_deferred_wakeup(rdp_gp, level)) {
@@ -974,8 +959,7 @@ static bool do_nocb_deferred_wakeup_common(struct rcu_data 
*rdp_gp,
                return false;
        }
 
-       ndw = rdp_gp->nocb_defer_wakeup;
-       ret = __wake_nocb_gp(rdp_gp, rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
+       ret = __wake_nocb_gp(rdp_gp, rdp, flags);
        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
 
        return ret;
@@ -991,7 +975,6 @@ static void do_nocb_deferred_wakeup_timer(struct timer_list 
*t)
        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
 
        raw_spin_lock_irqsave(&rdp->nocb_gp_lock, flags);
-       smp_mb__after_spinlock(); /* Timer expire before wakeup. */
        do_nocb_deferred_wakeup_common(rdp, rdp, RCU_NOCB_WAKE_BYPASS, flags);
 }
 
@@ -1272,7 +1255,7 @@ lazy_rcu_shrink_scan(struct shrinker *shrink, struct 
shrink_control *sc)
                }
                rcu_nocb_try_flush_bypass(rdp, jiffies);
                rcu_nocb_unlock_irqrestore(rdp, flags);
-               wake_nocb_gp(rdp, false);
+               wake_nocb_gp(rdp);
                sc->nr_to_scan -= _count;
                count += _count;
                if (sc->nr_to_scan <= 0)
@@ -1657,7 +1640,7 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
 {
 }
 
-static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
+static bool wake_nocb_gp(struct rcu_data *rdp)
 {
        return false;
 }
-- 
2.34.1


Reply via email to