On 03/03/2026 14:41, Eelco Chaudron wrote:
> External email: Use caution opening links or attachments
> 
> 
> On 9 Feb 2026, at 14:29, Eli Britstein wrote:
> 
>> From: Gaetan Rivet <[email protected]>
>>
>> Add a way to schedule executions with the RCU using memory embedded
>> within the object being scheduled, if applicable.
>>
>> This way, freeing a high volume of objects does not require many small
>> allocations, reducing memory pressure and heap fragmentation.
>>
>> This fragmentation is seen as the heap grows instead of shrinking when
>> a high volume of objects are freed using RCU.
> 
> Hi Gaetan,
> 
> Thanks for the patch. I have some comments inline below (not a complete
> review yet).
> 
> I'm curious about the design choice: why implement this as a parallel
> mechanism rather than refactoring the existing RCU implementation to handle
> both cases? Was this driven by performance concerns, or are there fundamental
> differences that require separate code paths?
> 
> Cheers,
> 
> Eelco
> 

This is historical.
At the time, I wanted to make sure the main mechanism would not be disturbed by 
my changes
so I kept them separate. As time passed, I didn't have a reason to revisit 
something that was working.
I agree that it would make sense to use a single mechanism.

>> Signed-off-by: Gaetan Rivet <[email protected]>
>> Co-authored-by: Eli Britstein <[email protected]>
>> Signed-off-by: Eli Britstein <[email protected]>
>> ---
>>   lib/ovs-rcu.c           | 125 ++++++++++++++++++++++++++++++++++------
>>   lib/ovs-rcu.h           |  39 +++++++++++++
>>   tests/automake.mk       |   1 +
>>   tests/library.at        |   4 ++
>>   tests/test-rcu-inline.c | 118 +++++++++++++++++++++++++++++++++++++
>>   5 files changed, 270 insertions(+), 17 deletions(-)
>>   create mode 100644 tests/test-rcu-inline.c
>>
>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
>> index 49afcc55c..539d17a7c 100644
>> --- a/lib/ovs-rcu.c
>> +++ b/lib/ovs-rcu.c
>> @@ -20,6 +20,7 @@
>>   #include "fatal-signal.h"
>>   #include "guarded-list.h"
>>   #include "latch.h"
>> +#include "mpsc-queue.h"
>>   #include "openvswitch/list.h"
>>   #include "ovs-thread.h"
>>   #include "openvswitch/poll-loop.h"
>> @@ -49,6 +50,7 @@ struct ovsrcu_perthread {
>>
>>       uint64_t seqno;
>>       struct ovsrcu_cbset *cbset;
>> +    bool do_inline;
> 
> Do we need this bool? It's not set atomically or under any lock, and once set
> to true it stays true forever for that thread. Would it be better to check if
> the inline_queue is empty instead of relying on this sticky flag?
> 

Some quiescent modes will delete the perthread structure
(cf. ovsrcu_quiesce_start). In this case, the flag is reset when the new
perthread structure is created for the next active period.

I'll check if the flag can be avoided.

>>       char name[16];              /* This thread's name. */
>>   };
>>
>> @@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
>>   static struct guarded_list flushed_cbsets;
>>   static struct seq *flushed_cbsets_seq;
>>
>> +static struct seq *inline_seq;
> 
> If do_inline is removed (always signal), inline_seq becomes redundant with 
> global_seqno.
> 
>> +  static struct latch postpone_exit;
>>   static struct ovs_barrier postpone_barrier;
>>
>> @@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
>>   static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>>   static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>>   static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>> -static bool ovsrcu_call_postponed(void);
>> +static bool ovsrcu_call_inline(uint64_t);
>> +static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
>>   static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>>
>>   static struct ovsrcu_perthread *
>> @@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
>>           perthread = xmalloc(sizeof *perthread);
>>           perthread->seqno = seq_read(global_seqno);
>>           perthread->cbset = NULL;
>> +        perthread->do_inline = false;
>>           ovs_strlcpy(perthread->name, name[0] ? name : "main",
>>                       sizeof perthread->name);
>>
>> @@ -112,7 +118,11 @@ static void
>>   ovsrcu_quiesced(void)
>>   {
>>       if (single_threaded()) {
>> -        ovsrcu_call_postponed();
>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>> +
>> +        ovsrcu_call_inline(seq_read(global_seqno));
>> +        guarded_list_pop_all(&flushed_cbsets, &cbsets);
>> +        ovsrcu_call_postponed(&cbsets);
>>       } else {
>>           static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
>>           if (ovsthread_once_start(&once)) {
>> @@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
>>       if (perthread->cbset) {
>>           ovsrcu_flush_cbset(perthread);
>>       }
>> +    if (perthread->do_inline) {
>> +        seq_change(inline_seq);
>> +    }
>>       seq_change(global_seqno);
>>
>>       ovsrcu_quiesced();
>> @@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
>>           if (perthread->cbset) {
>>               ovsrcu_flush_cbset__(perthread, true);
>>           }
>> +        if (perthread->do_inline) {
>> +            seq_change_protected(inline_seq);
>> +        }
>>           seq_change_protected(global_seqno);
>>           seq_unlock();
>>           ovsrcu_quiesced();
>> @@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
>>       return pthread_getspecific(perthread_key) == NULL;
>>   }
>>
>> -void
>> -ovsrcu_synchronize(void)
>> +static uint64_t
>> +ovsrcu_synchronize_(struct ovs_list *cbsets)
> 
> We use a double _ for specific variants.
> 

Ack.

>>   {
>>       unsigned int warning_threshold = 1000;
>>       uint64_t target_seqno;
>>       long long int start;
>>
>>       if (single_threaded()) {
>> -        return;
>> +        return UINT64_MAX;
>>       }
>>
>>       target_seqno = seq_read(global_seqno);
>>       ovsrcu_quiesce_start();
>>       start = time_msec();
>>
>> +    if (cbsets != NULL) {
>> +        /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start',
>> +         * as this function has the potential in single-threaded mode
>> +         * to itself execute those 'cbsets'. */
> 
> This comment doesn't make sense. We can only reach here in multi-threaded mode
> (single-threaded returns early), so the described scenario is impossible.
> 
>> +        guarded_list_pop_all(&flushed_cbsets, cbsets);
>> +    }
>> +
>>       for (;;) {
>>           uint64_t cur_seqno = seq_read(global_seqno);
>>           struct ovsrcu_perthread *perthread;
>> @@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
>>           poll_block();
>>       }
>>       ovsrcu_quiesce_end();
>> +
>> +    /* Return the 'seqno' that is safe to consider reached by all threads. 
>> */
>> +    return target_seqno;
>> +}
>> +
>> +void
>> +ovsrcu_synchronize(void)
>> +{
>> +    ovs_ignore(ovsrcu_synchronize_(NULL));
>>   }
>>
>>   /* Waits until as many postponed callbacks as possible have executed.
>> @@ -275,11 +307,57 @@ ovsrcu_exit(void)
>>        * infinite loop.  This function is just for making memory leaks 
>> easier to
>>        * spot so there's no point in breaking things on that basis. */
>>       for (int i = 0; i < 8; i++) {
>> -        ovsrcu_synchronize();
>> -        if (!ovsrcu_call_postponed()) {
>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>> +        uint64_t target = ovsrcu_synchronize_(&cbsets);
>> +        bool inline_active;
>> +        bool cbsets_active;
>> +
>> +        /* Both RCU calls must be examined for activity. */
>> +        inline_active = ovsrcu_call_inline(target);
> 
> The inline naming is confusing. Consider using "embedded" instead, like
> ovsrcu_postpone_embedded(). For consistency, this function would be
> ovsrcu_call_postpone_embedded().
> 

Ack.

>> +        cbsets_active = ovsrcu_call_postponed(&cbsets);
>> +        if (!inline_active && !cbsets_active) {
>> +            break;
>> +        }
>> +    }
>> +}
>> +
>> +static struct mpsc_queue inline_queue = 
>> MPSC_QUEUE_INITIALIZER(&inline_queue);
>> +
>> +void
>> +ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>> +                         struct ovsrcu_inline_node *rcu_node)
>> +{
>> +    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
>> +
>> +    rcu_node->seqno = perthread->seqno;
>> +    rcu_node->cb = function;
>> +    rcu_node->aux = aux;
>> +    mpsc_queue_insert(&inline_queue, &rcu_node->node);
>> +
>> +    perthread->do_inline = true;
>> +}
>> +
>> +static bool
>> +ovsrcu_call_inline(uint64_t target_seqno)
>> +{
>> +    struct mpsc_queue_node *msg;
>> +    unsigned int count = 0;
>> +
>> +    mpsc_queue_acquire(&inline_queue);
>> +    MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
>> +        struct ovsrcu_inline_node *node;
>> +
>> +        node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
>> +        if (node->seqno >= target_seqno) {
>> +            mpsc_queue_push_front(&inline_queue, msg);
>>               break;
>>           }
>> +        node->cb(node->aux);
>> +        count++;
>>       }
>> +    mpsc_queue_release(&inline_queue);
>> +
>> +    return count > 0;
>>   }
>>
>>   /* Registers 'function' to be called, passing 'aux' as argument, after the
>> @@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void 
>> *aux)
>>   }
>>
>>   static bool OVS_NO_SANITIZE_FUNCTION
>> -ovsrcu_call_postponed(void)
>> +ovsrcu_call_postponed(struct ovs_list *cbsets)
>>   {
>>       struct ovsrcu_cbset *cbset;
>> -    struct ovs_list cbsets;
>>
>> -    guarded_list_pop_all(&flushed_cbsets, &cbsets);
>> -    if (ovs_list_is_empty(&cbsets)) {
>> +    if (cbsets == NULL) {
>> +        return false;
>> +    }
>> +    if (ovs_list_is_empty(cbsets)) {
> 
> These two ifs could be combined. Also, why does cbsets need to be a parameter?
> (See earlier comment about the confusing comment in ovsrcu_synchronize_.)
> 

Ack on combined if().

The cbsets are provided as parameter due to the ovsrcu_synchronize() call
below that is removed.

It is removed to allow the inlined / embedded nodes to be called properly.
Instead of executing "sync" as part of "call_postponed":

  + call_postponed
      +--> pop cbsets
      +--> sync()
      +--> execute cbsets

becomes:

  + pop cbsets
  + sync
  + call_inlined
  + call_postponed(cbsets)

Removing the sync from inside "call_postponed" means that new cbsets might be
scheduled by another thread; this thread then enters ovsrcu_call_postponed,
pops all cbsets and starts executing them (now without a sync): it would
execute those new cbsets immediately, breaking RCU.

To avoid this situation, the cbsets are popped before entering this function.
A sync point is then executed, and the previously popped cbsets are provided
as parameter to "postponed" to be executed.

If both inline / embedded nodes and regular postponed cbsets use a single
defer mechanism, maybe this whole sync reordering can be avoided altogether,
removing this subtle bug and the generally confusing changes.

I'll try it.

>>           return false;
>>       }
>>
>> -    ovsrcu_synchronize();
>> -
>> -    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
>> +    LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
>>           struct ovsrcu_cb *cb;
>>
>>           for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>> @@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>>       pthread_detach(pthread_self());
>>
>>       while (!latch_is_set(&postpone_exit)) {
>> -        uint64_t seqno = seq_read(flushed_cbsets_seq);
>> -        if (!ovsrcu_call_postponed()) {
>> -            seq_wait(flushed_cbsets_seq, seqno);
>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>> +        uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
>> +        uint64_t target = ovsrcu_synchronize_(&cbsets);
>> +        uint64_t inline_seqno = seq_read(inline_seq);
>> +        bool inline_active;
>> +        bool cbsets_active;
>> +
>> +        /* Both RCU calls must be examined for activity. */
>> +        inline_active = ovsrcu_call_inline(target);
>> +        cbsets_active = ovsrcu_call_postponed(&cbsets);
>> +        if (!inline_active && !cbsets_active) {
>> +            seq_wait(flushed_cbsets_seq, cb_seqno);
>> +            seq_wait(inline_seq, inline_seqno);
>>               latch_wait(&postpone_exit);
>>               poll_block();
>>           }
>> @@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>>       if (perthread->cbset) {
>>           ovsrcu_flush_cbset(perthread);
>>       }
>> +    if (perthread->do_inline) {
>> +        seq_change(inline_seq);
>> +    }
>>
>>       ovs_mutex_lock(&ovsrcu_threads_mutex);
>>       ovs_list_remove(&perthread->list_node);
>> @@ -440,6 +530,7 @@ ovsrcu_init_module(void)
>>
>>           guarded_list_init(&flushed_cbsets);
>>           flushed_cbsets_seq = seq_create();
>> +        inline_seq = seq_create();
>>
>>           ovsthread_once_done(&once);
>>       }
>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
>> index a1c15c126..ed756c1c2 100644
>> --- a/lib/ovs-rcu.h
>> +++ b/lib/ovs-rcu.h
>> @@ -125,6 +125,22 @@
>>    *         ovs_mutex_unlock(&mutex);
>>    *     }
>>    *
>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can 
>> be
>> + * achieved using ovsrcu_postpone_inline():
>> + *
>> + *      struct deferrable {
>> + *          struct ovsrcu_inline_node rcu_node;
>> + *      };
>> + *
>> + *      void
>> + *      deferred_free(struct deferrable *d)
>> + *      {
>> + *          ovsrcu_postpone_inline(free, d, rcu_node);
>> + *      }
>> + *
>> + * Using inline fields can be preferred sometimes to avoid the small
>> + * allocations done in ovsrcu_postpone().
>> + *
>>    * In some rare cases an object may not be addressable with a pointer, but 
>> only
>>    * through an array index (e.g. because it's provided by another library). 
>>  It
>>    * is still possible to have RCU semantics by using the ovsrcu_index type.
>> @@ -171,6 +187,7 @@
>>    */
>>
>>   #include "compiler.h"
>> +#include "mpsc-queue.h"
>>   #include "ovs-atomic.h"
>>
>>   #if __GNUC__
>> @@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), 
>> void *aux);
>>        (void) sizeof(*(ARG)),                                     \
>>        ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>>
>> +struct ovsrcu_inline_node {
>> +    struct mpsc_queue_node node;
>> +    void (*cb)(void *aux);
>> +    void *aux;
>> +    uint64_t seqno;
>> +};
>> +
>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
>> + * contains an 'ovsrcu_inline_node' as a field named MEMBER. The function
>> + * is called following the next grace period.  See 'Usage' above for an
>> + * example.
>> + */
>> +void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>> +                              struct ovsrcu_inline_node *node);
>> +#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER)               \
>> +    (/* Verify that ARG is appropriate for FUNCTION. */             \
>> +     (void) sizeof((FUNCTION)(ARG), 1),                             \
>> +     /* Verify that ARG is a pointer type. */                       \
>> +     (void) sizeof(*(ARG)),                                         \
>> +     ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG,    \
>> +                              &(ARG)->MEMBER))
>> +
>>   /* An array index protected by RCU semantics.  This is an easier 
>> alternative to
>>    * an RCU protected pointer to a malloc'd int. */
>>   typedef struct { atomic_int v; } ovsrcu_index;
>> diff --git a/tests/automake.mk b/tests/automake.mk
>> index da569b022..da4d2e0b8 100644
>> --- a/tests/automake.mk
>> +++ b/tests/automake.mk
>> @@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
>>        tests/test-random.c \
>>        tests/test-rcu.c \
>>        tests/test-rculist.c \
>> +     tests/test-rcu-inline.c \
>>        tests/test-reconnect.c \
>>        tests/test-rstp.c \
>>        tests/test-sflow.c \
>> diff --git a/tests/library.at b/tests/library.at
>> index 82ac80a27..16820ff49 100644
>> --- a/tests/library.at
>> +++ b/tests/library.at
>> @@ -275,6 +275,10 @@ AT_SETUP([rcu])
>>   AT_CHECK([ovstest test-rcu], [0], [])
>>   AT_CLEANUP
>>
>> +AT_SETUP([rcu inline])
>> +AT_CHECK([ovstest test-rcu-inline], [0], [])
>> +AT_CLEANUP
>> +
>>   AT_SETUP([stopwatch module])
>>   AT_CHECK([ovstest test-stopwatch], [0], [......
>>   ], [ignore])
>> diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
>> new file mode 100644
>> index 000000000..c72410223
>> --- /dev/null
>> +++ b/tests/test-rcu-inline.c
>> @@ -0,0 +1,118 @@
>> +/*
>> + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & 
>> AFFILIATES.
>> + * All rights reserved.
>> + * SPDX-License-Identifier: Apache-2.0
>> + *
>> + * Licensed under the Apache License, Version 2.0 (the "License");
>> + * you may not use this file except in compliance with the License.
>> + * You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +#include <config.h>
>> +
>> +#undef NDEBUG
>> +#include "ovs-atomic.h"
>> +#include "ovs-rcu.h"
>> +#include "ovs-thread.h"
>> +#include "ovstest.h"
>> +#include "seq.h"
>> +#include "timeval.h"
>> +#include "util.h"
>> +
>> +#include "openvswitch/poll-loop.h"
>> +
>> +struct element {
>> +    struct ovsrcu_inline_node rcu_node;
>> +    struct seq *trigger;
>> +    atomic_bool wait;
>> +};
>> +
>> +static void
>> +do_inline(void *e_)
>> +{
>> +    struct element *e = (struct element *) e_;
>> +
>> +    seq_change(e->trigger);
>> +}
>> +
>> +static void *
>> +wait_main(void *aux)
>> +{
>> +    struct element *e = aux;
>> +
>> +    for (;;) {
>> +        bool wait;
>> +
>> +        atomic_read(&e->wait, &wait);
>> +        if (!wait) {
>> +            break;
>> +        }
>> +    }
>> +
>> +    seq_wait(e->trigger, seq_read(e->trigger));
>> +    poll_block();
>> +
>> +    return NULL;
>> +}
>> +
>> +static void
>> +test_rcu_inline_main(bool multithread)
>> +{
>> +    long long int timeout;
>> +    pthread_t waiter;
>> +    struct element e;
>> +    uint64_t seqno;
>> +
>> +    atomic_init(&e.wait, true);
>> +
>> +    if (multithread) {
>> +        waiter = ovs_thread_create("waiter", wait_main, &e);
>> +    }
>> +
>> +    e.trigger = seq_create();
>> +    seqno = seq_read(e.trigger);
>> +
>> +    ovsrcu_postpone_inline(do_inline, &e, rcu_node);
>> +
>> +    /* Check that GC holds out until all threads are quiescent. */
>> +    timeout = time_msec();
>> +    if (multithread) {
>> +        timeout += 200;
>> +    }
>> +    while (time_msec() <= timeout) {
>> +        ovs_assert(seq_read(e.trigger) == seqno);
>> +    }
>> +
>> +    atomic_store(&e.wait, false);
>> +
>> +    seq_wait(e.trigger, seqno);
>> +    poll_timer_wait_until(time_msec() + 200);
>> +    poll_block();
>> +
>> +    /* Verify that GC executed. */
>> +    ovs_assert(seq_read(e.trigger) != seqno);
>> +    seq_destroy(e.trigger);
>> +
>> +    if (multithread) {
>> +        xpthread_join(waiter, NULL);
>> +    }
>> +}
>> +
>> +static void
>> +test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
>> +{
>> +    const bool multithread = true;
>> +
>> +    test_rcu_inline_main(!multithread);
>> +    test_rcu_inline_main(multithread);
>> +}
>> +
>> +OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
>> --
>> 2.34.1
> 
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to