From: Gaetan Rivet <[email protected]>

Add a way to schedule deferred RCU calls using memory embedded within
the object being scheduled, when applicable.

This way, freeing a high volume of objects does not require many small
allocations, reducing memory pressure and heap fragmentation.

Such fragmentation manifests as the heap growing instead of shrinking
when a high volume of objects is freed through RCU.
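For illustration, a minimal sketch of the intended usage ('struct
element' and 'element_destroy' are hypothetical names, and lib/ovs-rcu.h
is assumed to be included; the macro is the one added to lib/ovs-rcu.h
below):

    struct element {
        struct ovsrcu_inline_node rcu_node;
        int value;
    };

    static void
    element_destroy(struct element *e)
    {
        /* The queue linkage lives in 'e->rcu_node', so deferring
         * free(e) needs no additional allocation. */
        ovsrcu_postpone_inline(free, e, rcu_node);
    }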

Signed-off-by: Gaetan Rivet <[email protected]>
Signed-off-by: Eli Britstein <[email protected]>
---
 lib/ovs-rcu.c           | 125 ++++++++++++++++++++++++++++++++++------
 lib/ovs-rcu.h           |  39 +++++++++++++
 tests/automake.mk       |   1 +
 tests/library.at        |   4 ++
 tests/test-rcu-inline.c | 116 +++++++++++++++++++++++++++++++++++++
 5 files changed, 268 insertions(+), 17 deletions(-)
 create mode 100644 tests/test-rcu-inline.c

diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 49afcc55c..539d17a7c 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -20,6 +20,7 @@
 #include "fatal-signal.h"
 #include "guarded-list.h"
 #include "latch.h"
+#include "mpsc-queue.h"
 #include "openvswitch/list.h"
 #include "ovs-thread.h"
 #include "openvswitch/poll-loop.h"
@@ -49,6 +50,7 @@ struct ovsrcu_perthread {
 
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    bool do_inline;
     char name[16];              /* This thread's name. */
 };
 
@@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
 static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
+static struct seq *inline_seq;
+
 static struct latch postpone_exit;
 static struct ovs_barrier postpone_barrier;
 
@@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
 static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
-static bool ovsrcu_call_postponed(void);
+static bool ovsrcu_call_inline(uint64_t);
+static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
 
 static struct ovsrcu_perthread *
@@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
         perthread = xmalloc(sizeof *perthread);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        perthread->do_inline = false;
         ovs_strlcpy(perthread->name, name[0] ? name : "main",
                     sizeof perthread->name);
 
@@ -112,7 +118,11 @@ static void
 ovsrcu_quiesced(void)
 {
     if (single_threaded()) {
-        ovsrcu_call_postponed();
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+
+        ovsrcu_call_inline(seq_read(global_seqno));
+        guarded_list_pop_all(&flushed_cbsets, &cbsets);
+        ovsrcu_call_postponed(&cbsets);
     } else {
         static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
         if (ovsthread_once_start(&once)) {
@@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
     if (perthread->cbset) {
         ovsrcu_flush_cbset(perthread);
     }
+    if (perthread->do_inline) {
+        seq_change(inline_seq);
+    }
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
         if (perthread->cbset) {
             ovsrcu_flush_cbset__(perthread, true);
         }
+        if (perthread->do_inline) {
+            seq_change_protected(inline_seq);
+        }
         seq_change_protected(global_seqno);
         seq_unlock();
         ovsrcu_quiesced();
@@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
     return pthread_getspecific(perthread_key) == NULL;
 }
 
-void
-ovsrcu_synchronize(void)
+static uint64_t
+ovsrcu_synchronize_(struct ovs_list *cbsets)
 {
     unsigned int warning_threshold = 1000;
     uint64_t target_seqno;
     long long int start;
 
     if (single_threaded()) {
-        return;
+        return UINT64_MAX;
     }
 
     target_seqno = seq_read(global_seqno);
     ovsrcu_quiesce_start();
     start = time_msec();
 
+    if (cbsets != NULL) {
+        /* Pop the flushed 'cbsets' only after 'ovsrcu_quiesce_start', as
+         * that function may, in single-threaded mode, itself execute
+         * those 'cbsets'. */
+        guarded_list_pop_all(&flushed_cbsets, cbsets);
+    }
+
     for (;;) {
         uint64_t cur_seqno = seq_read(global_seqno);
         struct ovsrcu_perthread *perthread;
@@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
         poll_block();
     }
     ovsrcu_quiesce_end();
+
+    /* Return the 'seqno' known to have been reached by all threads. */
+    return target_seqno;
+}
+
+void
+ovsrcu_synchronize(void)
+{
+    ovs_ignore(ovsrcu_synchronize_(NULL));
 }
 
 /* Waits until as many postponed callbacks as possible have executed.
@@ -275,11 +307,57 @@ ovsrcu_exit(void)
      * infinite loop.  This function is just for making memory leaks easier to
      * spot so there's no point in breaking things on that basis. */
     for (int i = 0; i < 8; i++) {
-        ovsrcu_synchronize();
-        if (!ovsrcu_call_postponed()) {
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+        uint64_t target = ovsrcu_synchronize_(&cbsets);
+        bool inline_active;
+        bool cbsets_active;
+
+        /* Both RCU calls must be examined for activity. */
+        inline_active = ovsrcu_call_inline(target);
+        cbsets_active = ovsrcu_call_postponed(&cbsets);
+        if (!inline_active && !cbsets_active) {
+            break;
+        }
+    }
+}
+
+static struct mpsc_queue inline_queue = MPSC_QUEUE_INITIALIZER(&inline_queue);
+
+void
+ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
+                         struct ovsrcu_inline_node *rcu_node)
+{
+    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
+
+    rcu_node->seqno = perthread->seqno;
+    rcu_node->cb = function;
+    rcu_node->aux = aux;
+    mpsc_queue_insert(&inline_queue, &rcu_node->node);
+
+    perthread->do_inline = true;
+}
+
+static bool
+ovsrcu_call_inline(uint64_t target_seqno)
+{
+    struct mpsc_queue_node *msg;
+    unsigned int count = 0;
+
+    mpsc_queue_acquire(&inline_queue);
+    MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
+        struct ovsrcu_inline_node *node;
+
+        node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
+        if (node->seqno >= target_seqno) {
+            mpsc_queue_push_front(&inline_queue, msg);
             break;
         }
+        node->cb(node->aux);
+        count++;
     }
+    mpsc_queue_release(&inline_queue);
+
+    return count > 0;
 }
 
 /* Registers 'function' to be called, passing 'aux' as argument, after the
@@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
 }
 
 static bool OVS_NO_SANITIZE_FUNCTION
-ovsrcu_call_postponed(void)
+ovsrcu_call_postponed(struct ovs_list *cbsets)
 {
     struct ovsrcu_cbset *cbset;
-    struct ovs_list cbsets;
 
-    guarded_list_pop_all(&flushed_cbsets, &cbsets);
-    if (ovs_list_is_empty(&cbsets)) {
+    if (cbsets == NULL) {
+        return false;
+    }
+    if (ovs_list_is_empty(cbsets)) {
         return false;
     }
 
-    ovsrcu_synchronize();
-
-    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
+    LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
         struct ovsrcu_cb *cb;
 
         for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
@@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
     pthread_detach(pthread_self());
 
     while (!latch_is_set(&postpone_exit)) {
-        uint64_t seqno = seq_read(flushed_cbsets_seq);
-        if (!ovsrcu_call_postponed()) {
-            seq_wait(flushed_cbsets_seq, seqno);
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+        uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
+        uint64_t target = ovsrcu_synchronize_(&cbsets);
+        uint64_t inline_seqno = seq_read(inline_seq);
+        bool inline_active;
+        bool cbsets_active;
+
+        /* Both RCU calls must be examined for activity. */
+        inline_active = ovsrcu_call_inline(target);
+        cbsets_active = ovsrcu_call_postponed(&cbsets);
+        if (!inline_active && !cbsets_active) {
+            seq_wait(flushed_cbsets_seq, cb_seqno);
+            seq_wait(inline_seq, inline_seqno);
             latch_wait(&postpone_exit);
             poll_block();
         }
@@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
     if (perthread->cbset) {
         ovsrcu_flush_cbset(perthread);
     }
+    if (perthread->do_inline) {
+        seq_change(inline_seq);
+    }
 
     ovs_mutex_lock(&ovsrcu_threads_mutex);
     ovs_list_remove(&perthread->list_node);
@@ -440,6 +530,7 @@ ovsrcu_init_module(void)
 
         guarded_list_init(&flushed_cbsets);
         flushed_cbsets_seq = seq_create();
+        inline_seq = seq_create();
 
         ovsthread_once_done(&once);
     }
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index a1c15c126..ed756c1c2 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -125,6 +125,22 @@
  *         ovs_mutex_unlock(&mutex);
  *     }
  *
+ * As an alternative to ovsrcu_postpone(), the same deferred execution can be
+ * achieved using ovsrcu_postpone_inline():
+ *
+ *      struct deferrable {
+ *          struct ovsrcu_inline_node rcu_node;
+ *      };
+ *
+ *      void
+ *      deferred_free(struct deferrable *d)
+ *      {
+ *          ovsrcu_postpone_inline(free, d, rcu_node);
+ *      }
+ *
+ * Using an inline node can be preferable, as it avoids the small
+ * allocations performed by ovsrcu_postpone().
+ *
  * In some rare cases an object may not be addressable with a pointer, but only
  * through an array index (e.g. because it's provided by another library).  It
  * is still possible to have RCU semantics by using the ovsrcu_index type.
@@ -171,6 +187,7 @@
  */
 
 #include "compiler.h"
+#include "mpsc-queue.h"
 #include "ovs-atomic.h"
 
 #if __GNUC__
@@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
      (void) sizeof(*(ARG)),                                     \
      ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
 
+struct ovsrcu_inline_node {
+    struct mpsc_queue_node node;
+    void (*cb)(void *aux);
+    void *aux;
+    uint64_t seqno;
+};
+
+/* Schedules FUNCTION to be called after the next grace period, passing
+ * ARG as its pointer-type argument.  ARG must point to a structure that
+ * contains an 'ovsrcu_inline_node' as a field named MEMBER.  See 'Usage'
+ * above for an example.
+ */
+void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
+                              struct ovsrcu_inline_node *node);
+#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER)               \
+    (/* Verify that ARG is appropriate for FUNCTION. */             \
+     (void) sizeof((FUNCTION)(ARG), 1),                             \
+     /* Verify that ARG is a pointer type. */                       \
+     (void) sizeof(*(ARG)),                                         \
+     ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG,    \
+                              &(ARG)->MEMBER))
+
 /* An array index protected by RCU semantics.  This is an easier alternative to
  * an RCU protected pointer to a malloc'd int. */
 typedef struct { atomic_int v; } ovsrcu_index;
diff --git a/tests/automake.mk b/tests/automake.mk
index da569b022..da4d2e0b8 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
        tests/test-random.c \
        tests/test-rcu.c \
        tests/test-rculist.c \
+       tests/test-rcu-inline.c \
        tests/test-reconnect.c \
        tests/test-rstp.c \
        tests/test-sflow.c \
diff --git a/tests/library.at b/tests/library.at
index 82ac80a27..16820ff49 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -275,6 +275,10 @@ AT_SETUP([rcu])
 AT_CHECK([ovstest test-rcu], [0], [])
 AT_CLEANUP
 
+AT_SETUP([rcu inline])
+AT_CHECK([ovstest test-rcu-inline], [0], [])
+AT_CLEANUP
+
 AT_SETUP([stopwatch module])
 AT_CHECK([ovstest test-stopwatch], [0], [......
 ], [ignore])
diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
new file mode 100644
index 000000000..af4713af9
--- /dev/null
+++ b/tests/test-rcu-inline.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#undef NDEBUG
+#include "ovs-atomic.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "seq.h"
+#include "timeval.h"
+#include "util.h"
+
+#include "openvswitch/poll-loop.h"
+
+struct element {
+    struct ovsrcu_inline_node rcu_node;
+    struct seq *trigger;
+    atomic_bool wait;
+};
+
+static void
+do_inline(void *e_)
+{
+    struct element *e = (struct element *) e_;
+
+    seq_change(e->trigger);
+}
+
+static void *
+wait_main(void *aux)
+{
+    struct element *e = aux;
+
+    for (;;) {
+        bool wait;
+
+        atomic_read(&e->wait, &wait);
+        if (!wait) {
+            break;
+        }
+    }
+
+    seq_wait(e->trigger, seq_read(e->trigger));
+    poll_block();
+
+    return NULL;
+}
+
+static void
+test_rcu_inline_main(bool multithread)
+{
+    long long int timeout;
+    pthread_t waiter;
+    struct element e;
+    uint64_t seqno;
+
+    atomic_init(&e.wait, true);
+
+    if (multithread) {
+        waiter = ovs_thread_create("waiter", wait_main, &e);
+    }
+
+    e.trigger = seq_create();
+    seqno = seq_read(e.trigger);
+
+    ovsrcu_postpone_inline(do_inline, &e, rcu_node);
+
+    /* Check that GC is held back until all threads are quiescent. */
+    timeout = time_msec();
+    if (multithread) {
+        timeout += 200;
+    }
+    while (time_msec() <= timeout) {
+        ovs_assert(seq_read(e.trigger) == seqno);
+    }
+
+    atomic_store(&e.wait, false);
+
+    seq_wait(e.trigger, seqno);
+    poll_timer_wait_until(time_msec() + 200);
+    poll_block();
+
+    /* Verify that GC executed. */
+    ovs_assert(seq_read(e.trigger) != seqno);
+    seq_destroy(e.trigger);
+
+    if (multithread) {
+        xpthread_join(waiter, NULL);
+    }
+}
+
+static void
+test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
+{
+    const bool multithread = true;
+
+    test_rcu_inline_main(!multithread);
+    test_rcu_inline_main(multithread);
+}
+
+OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
-- 
2.34.1
