Currently, rcu_normal_wake_from_gp is only enabled by default on small systems (<= 16 CPUs) or when a user explicitly enables it.
This patch introduces an adaptive latching mechanism: * Tracks the number of in-flight synchronize_rcu() requests using a new atomic_long_t counter (rcu_sr_normal_count); * If the count reaches RCU_SR_NORMAL_LATCH_THR (64), it sets the rcu_sr_normal_latched flag, reverting new requests to the scaled wait_rcu_gp() path; * The latch is cleared only when the pending requests are fully drained (nr == 0); * Enables rcu_normal_wake_from_gp by default for all systems, relying on this dynamic throttling instead of static CPU limits. Suggested-by: Joel Fernandes <[email protected]> Signed-off-by: Uladzislau Rezki (Sony) <[email protected]> --- kernel/rcu/tree.c | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index 293bbd9ac3f4..c42d480d6e0b 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -1631,17 +1631,21 @@ static void rcu_sr_put_wait_head(struct llist_node *node) atomic_set_release(&sr_wn->inuse, 0); } -/* Enable rcu_normal_wake_from_gp automatically on small systems. */ -#define WAKE_FROM_GP_CPU_THRESHOLD 16 - -static int rcu_normal_wake_from_gp = -1; +static int rcu_normal_wake_from_gp = 1; module_param(rcu_normal_wake_from_gp, int, 0644); static struct workqueue_struct *sync_wq; +#define RCU_SR_NORMAL_LATCH_THR 64 + +/* Number of in-flight synchronize_rcu() calls queued on srs_next. */ +static atomic_long_t rcu_sr_normal_count; +static atomic_t rcu_sr_normal_latched; + static void rcu_sr_normal_complete(struct llist_node *node) { struct rcu_synchronize *rs = container_of( (struct rcu_head *) node, struct rcu_synchronize, head); + long nr; WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) && !poll_state_synchronize_rcu_full(&rs->oldstate), @@ -1649,6 +1653,15 @@ static void rcu_sr_normal_complete(struct llist_node *node) /* Finally. 
*/ complete(&rs->completion); + nr = atomic_long_dec_return(&rcu_sr_normal_count); + WARN_ON_ONCE(nr < 0); + + /* + * Unlatch: switch back to normal path when fully + * drained and if it has been latched. + */ + if (nr == 0) + (void)atomic_cmpxchg(&rcu_sr_normal_latched, 1, 0); } static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work) @@ -1794,7 +1807,14 @@ static bool rcu_sr_normal_gp_init(void) static void rcu_sr_normal_add_req(struct rcu_synchronize *rs) { + long nr; + llist_add((struct llist_node *) &rs->head, &rcu_state.srs_next); + nr = atomic_long_inc_return(&rcu_sr_normal_count); + + /* Latch: only when flooded and if unlatched. */ + if (nr >= RCU_SR_NORMAL_LATCH_THR) + (void)atomic_cmpxchg(&rcu_sr_normal_latched, 0, 1); } /* @@ -3268,7 +3288,8 @@ static void synchronize_rcu_normal(void) trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("request")); - if (READ_ONCE(rcu_normal_wake_from_gp) < 1) { + if (READ_ONCE(rcu_normal_wake_from_gp) < 1 || + atomic_read(&rcu_sr_normal_latched)) { wait_rcu_gp(call_rcu_hurry); goto trace_complete_out; } @@ -4892,12 +4913,6 @@ void __init rcu_init(void) sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM | WQ_UNBOUND, 0); WARN_ON(!sync_wq); - /* Respect if explicitly disabled via a boot parameter. */ - if (rcu_normal_wake_from_gp < 0) { - if (num_possible_cpus() <= WAKE_FROM_GP_CPU_THRESHOLD) - rcu_normal_wake_from_gp = 1; - } - /* Fill in default value for rcutree.qovld boot parameter. */ /* -After- the rcu_node ->lock fields are initialized! */ if (qovld < 0) -- 2.47.3

