+samir
On 1/15/26 12:04 AM, Uladzislau Rezki (Sony) wrote:
Currently, rcu_normal_wake_from_gp is only enabled by default
on small systems (<= 16 CPUs) or when a user explicitly enables
it.
This patch introduces an adaptive latching mechanism:
* Tracks the number of in-flight synchronize_rcu() requests
using a new atomic_long_t counter (rcu_sr_normal_count);
Is this atomic variable updated by multiple CPUs at the same
time? We have seen in the past that such updates tend to be very costly.
* If the count reaches RCU_SR_NORMAL_LATCH_THR (64), it sets
rcu_sr_normal_latched, reverting new requests to the
scaled wait_rcu_gp() path;
* The latch is cleared only when the pending requests are fully
drained(nr == 0);
* Enables rcu_normal_wake_from_gp by default for all systems,
relying on this dynamic throttling instead of static CPU
limits.
Suggested-by: Joel Fernandes <[email protected]>
Signed-off-by: Uladzislau Rezki (Sony) <[email protected]>
---
kernel/rcu/tree.c | 37 ++++++++++++++++++++++++++-----------
1 file changed, 26 insertions(+), 11 deletions(-)
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 293bbd9ac3f4..c42d480d6e0b 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1631,17 +1631,21 @@ static void rcu_sr_put_wait_head(struct llist_node
*node)
atomic_set_release(&sr_wn->inuse, 0);
}
-/* Enable rcu_normal_wake_from_gp automatically on small systems. */
-#define WAKE_FROM_GP_CPU_THRESHOLD 16
-
-static int rcu_normal_wake_from_gp = -1;
+static int rcu_normal_wake_from_gp = 1;
module_param(rcu_normal_wake_from_gp, int, 0644);
static struct workqueue_struct *sync_wq;
+#define RCU_SR_NORMAL_LATCH_THR 64
+
+/* Number of in-flight synchronize_rcu() calls queued on srs_next. */
+static atomic_long_t rcu_sr_normal_count;
+static atomic_t rcu_sr_normal_latched;
+
static void rcu_sr_normal_complete(struct llist_node *node)
{
struct rcu_synchronize *rs = container_of(
(struct rcu_head *) node, struct rcu_synchronize, head);
+ long nr;
WARN_ONCE(IS_ENABLED(CONFIG_PROVE_RCU) &&
!poll_state_synchronize_rcu_full(&rs->oldstate),
@@ -1649,6 +1653,15 @@ static void rcu_sr_normal_complete(struct llist_node
*node)
/* Finally. */
complete(&rs->completion);
+ nr = atomic_long_dec_return(&rcu_sr_normal_count);
+ WARN_ON_ONCE(nr < 0);
+
+ /*
+ * Unlatch: switch back to normal path when fully
+ * drained and if it has been latched.
+ */
+ if (nr == 0)
+ (void)atomic_cmpxchg(&rcu_sr_normal_latched, 1, 0);
}
static void rcu_sr_normal_gp_cleanup_work(struct work_struct *work)
@@ -1794,7 +1807,14 @@ static bool rcu_sr_normal_gp_init(void)
static void rcu_sr_normal_add_req(struct rcu_synchronize *rs)
{
+ long nr;
+
llist_add((struct llist_node *) &rs->head, &rcu_state.srs_next);
+ nr = atomic_long_inc_return(&rcu_sr_normal_count);
+
+ /* Latch: only when flooded and if unlatched. */
+ if (nr >= RCU_SR_NORMAL_LATCH_THR)
+ (void)atomic_cmpxchg(&rcu_sr_normal_latched, 0, 1);
}
/*
@@ -3268,7 +3288,8 @@ static void synchronize_rcu_normal(void)
trace_rcu_sr_normal(rcu_state.name, &rs.head, TPS("request"));
- if (READ_ONCE(rcu_normal_wake_from_gp) < 1) {
+ if (READ_ONCE(rcu_normal_wake_from_gp) < 1 ||
+ atomic_read(&rcu_sr_normal_latched)) {
wait_rcu_gp(call_rcu_hurry);
goto trace_complete_out;
}
@@ -4892,12 +4913,6 @@ void __init rcu_init(void)
sync_wq = alloc_workqueue("sync_wq", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
WARN_ON(!sync_wq);
- /* Respect if explicitly disabled via a boot parameter. */
- if (rcu_normal_wake_from_gp < 0) {
- if (num_possible_cpus() <= WAKE_FROM_GP_CPU_THRESHOLD)
- rcu_normal_wake_from_gp = 1;
- }
-
/* Fill in default value for rcutree.qovld boot parameter. */
/* -After- the rcu_node ->lock fields are initialized! */
if (qovld < 0)
Samir,
Could you please give this patch a try on a system with 1000+ CPUs?
Specifically, please measure the time taken to switch from SMT1 to SMT8
and from SMT8 to SMT1.
Uladzislau, is there any specific testing (other than the above) you are looking for?