On 16 Mar 2026, at 3:16, Gaetan Rivet wrote:

> On 03/03/2026 14:41, Eelco Chaudron wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> On 9 Feb 2026, at 14:29, Eli Britstein wrote:
>>
>>> From: Gaetan Rivet <[email protected]>
>>>
>>> Add a way to schedule executions with the RCU using memory embedded
>>> within the object being scheduled, if applicable.
>>>
>>> This way, freeing a high volume of objects does not require many small
>>> allocations, reducing memory pressure and heap fragmentation.
>>>
>>> This fragmentation is seen as the heap grows instead of shrinking when
>>> a high volume of objects are freed using RCU.
>>
>> Hi Gaetan,
>>
>> Thanks for the patch. I have some comments inline below (not a complete
>> review yet).
>>
>> I'm curious about the design choice: why implement this as a parallel
>> mechanism rather than refactoring the existing RCU implementation to handle
>> both cases? Was this driven by performance concerns, or are there fundamental
>> differences that require separate code paths?
>>
>> Cheers,
>>
>> Eelco
>>
>
> This is historical.
> At the time, I wanted to make sure the main mechanism would not be disturbed 
> by my changes
> so I kept them separate. As time passed, I didn't have a reason to revisit 
> something that was working.
> I agree that it would make sense to use a single mechanism.

Thanks for the clarification. I suspected there might have been more behind it.

>>> Signed-off-by: Gaetan Rivet <[email protected]>
>>> Co-authored-by: Eli Britstein <[email protected]>
>>> Signed-off-by: Eli Britstein <[email protected]>
>>> ---
>>>   lib/ovs-rcu.c           | 125 ++++++++++++++++++++++++++++++++++------
>>>   lib/ovs-rcu.h           |  39 +++++++++++++
>>>   tests/automake.mk       |   1 +
>>>   tests/library.at        |   4 ++
>>>   tests/test-rcu-inline.c | 118 +++++++++++++++++++++++++++++++++++++
>>>   5 files changed, 270 insertions(+), 17 deletions(-)
>>>   create mode 100644 tests/test-rcu-inline.c
>>>
>>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
>>> index 49afcc55c..539d17a7c 100644
>>> --- a/lib/ovs-rcu.c
>>> +++ b/lib/ovs-rcu.c
>>> @@ -20,6 +20,7 @@
>>>   #include "fatal-signal.h"
>>>   #include "guarded-list.h"
>>>   #include "latch.h"
>>> +#include "mpsc-queue.h"
>>>   #include "openvswitch/list.h"
>>>   #include "ovs-thread.h"
>>>   #include "openvswitch/poll-loop.h"
>>> @@ -49,6 +50,7 @@ struct ovsrcu_perthread {
>>>
>>>       uint64_t seqno;
>>>       struct ovsrcu_cbset *cbset;
>>> +    bool do_inline;
>>
>> Do we need this bool? It's not set atomically or under any lock, and once set
>> to true it stays true forever for that thread. Would it be better to check if
>> the inline_queue is empty instead of relying on this sticky flag?
>>
>
> Some quiescent modes will delete the perthread structure (cf. 
> ovsrcu_quiesce_start)
> In this case, the flag is reset when the new perthread is created for the 
> next active period.
>
> I'll check if the flag can be avoided.

Thanks for looking into this.

>>>       char name[16];              /* This thread's name. */
>>>   };
>>>
>>> @@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
>>>   static struct guarded_list flushed_cbsets;
>>>   static struct seq *flushed_cbsets_seq;
>>>
>>> +static struct seq *inline_seq;
>>
>> If do_inline is removed (always signal), inline_seq becomes redundant with 
>> global_seqno.
>>
>>> +  static struct latch postpone_exit;
>>>   static struct ovs_barrier postpone_barrier;
>>>
>>> @@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
>>>   static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>>>   static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>>>   static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>>> -static bool ovsrcu_call_postponed(void);
>>> +static bool ovsrcu_call_inline(uint64_t);
>>> +static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
>>>   static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>>>
>>>   static struct ovsrcu_perthread *
>>> @@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
>>>           perthread = xmalloc(sizeof *perthread);
>>>           perthread->seqno = seq_read(global_seqno);
>>>           perthread->cbset = NULL;
>>> +        perthread->do_inline = false;
>>>           ovs_strlcpy(perthread->name, name[0] ? name : "main",
>>>                       sizeof perthread->name);
>>>
>>> @@ -112,7 +118,11 @@ static void
>>>   ovsrcu_quiesced(void)
>>>   {
>>>       if (single_threaded()) {
>>> -        ovsrcu_call_postponed();
>>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> +
>>> +        ovsrcu_call_inline(seq_read(global_seqno));
>>> +        guarded_list_pop_all(&flushed_cbsets, &cbsets);
>>> +        ovsrcu_call_postponed(&cbsets);
>>>       } else {
>>>           static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
>>>           if (ovsthread_once_start(&once)) {
>>> @@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
>>>       if (perthread->cbset) {
>>>           ovsrcu_flush_cbset(perthread);
>>>       }
>>> +    if (perthread->do_inline) {
>>> +        seq_change(inline_seq);
>>> +    }
>>>       seq_change(global_seqno);
>>>
>>>       ovsrcu_quiesced();
>>> @@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
>>>           if (perthread->cbset) {
>>>               ovsrcu_flush_cbset__(perthread, true);
>>>           }
>>> +        if (perthread->do_inline) {
>>> +            seq_change_protected(inline_seq);
>>> +        }
>>>           seq_change_protected(global_seqno);
>>>           seq_unlock();
>>>           ovsrcu_quiesced();
>>> @@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
>>>       return pthread_getspecific(perthread_key) == NULL;
>>>   }
>>>
>>> -void
>>> -ovsrcu_synchronize(void)
>>> +static uint64_t
>>> +ovsrcu_synchronize_(struct ovs_list *cbsets)
>>
>> We use a double _ for specific variants.
>>
>
> Ack.
>
>>>   {
>>>       unsigned int warning_threshold = 1000;
>>>       uint64_t target_seqno;
>>>       long long int start;
>>>
>>>       if (single_threaded()) {
>>> -        return;
>>> +        return UINT64_MAX;
>>>       }
>>>
>>>       target_seqno = seq_read(global_seqno);
>>>       ovsrcu_quiesce_start();
>>>       start = time_msec();
>>>
>>> +    if (cbsets != NULL) {
>>> +        /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start',
>>> +         * as this function has the potential in single-threaded mode
>>> +         * to itself execute those 'cbsets'. */
>>
>> This comment doesn't make sense. We can only reach here in multi-threaded 
>> mode
>> (single-threaded returns early), so the described scenario is impossible.
>>
>>> +        guarded_list_pop_all(&flushed_cbsets, cbsets);
>>> +    }
>>> +
>>>       for (;;) {
>>>           uint64_t cur_seqno = seq_read(global_seqno);
>>>           struct ovsrcu_perthread *perthread;
>>> @@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
>>>           poll_block();
>>>       }
>>>       ovsrcu_quiesce_end();
>>> +
>>> +    /* Return the 'seqno' that is safe to consider reached by all threads. 
>>> */
>>> +    return target_seqno;
>>> +}
>>> +
>>> +void
>>> +ovsrcu_synchronize(void)
>>> +{
>>> +    ovs_ignore(ovsrcu_synchronize_(NULL));
>>>   }
>>>
>>>   /* Waits until as many postponed callbacks as possible have executed.
>>> @@ -275,11 +307,57 @@ ovsrcu_exit(void)
>>>        * infinite loop.  This function is just for making memory leaks 
>>> easier to
>>>        * spot so there's no point in breaking things on that basis. */
>>>       for (int i = 0; i < 8; i++) {
>>> -        ovsrcu_synchronize();
>>> -        if (!ovsrcu_call_postponed()) {
>>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> +        uint64_t target = ovsrcu_synchronize_(&cbsets);
>>> +        bool inline_active;
>>> +        bool cbsets_active;
>>> +
>>> +        /* Both RCU calls must be examined for activity. */
>>> +        inline_active = ovsrcu_call_inline(target);
>>
>> The inline naming is confusing. Consider using "embedded" instead, like
>> ovsrcu_postpone_embedded(). For consistency, this function would be
>> ovsrcu_call_postpone_embedded().
>>
>
> Ack.
>
>>> +        cbsets_active = ovsrcu_call_postponed(&cbsets);
>>> +        if (!inline_active && !cbsets_active) {
>>> +            break;
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static struct mpsc_queue inline_queue = 
>>> MPSC_QUEUE_INITIALIZER(&inline_queue);
>>> +
>>> +void
>>> +ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>>> +                         struct ovsrcu_inline_node *rcu_node)
>>> +{
>>> +    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
>>> +
>>> +    rcu_node->seqno = perthread->seqno;
>>> +    rcu_node->cb = function;
>>> +    rcu_node->aux = aux;
>>> +    mpsc_queue_insert(&inline_queue, &rcu_node->node);
>>> +
>>> +    perthread->do_inline = true;
>>> +}
>>> +
>>> +static bool
>>> +ovsrcu_call_inline(uint64_t target_seqno)
>>> +{
>>> +    struct mpsc_queue_node *msg;
>>> +    unsigned int count = 0;
>>> +
>>> +    mpsc_queue_acquire(&inline_queue);
>>> +    MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
>>> +        struct ovsrcu_inline_node *node;
>>> +
>>> +        node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
>>> +        if (node->seqno >= target_seqno) {
>>> +            mpsc_queue_push_front(&inline_queue, msg);
>>>               break;
>>>           }
>>> +        node->cb(node->aux);
>>> +        count++;
>>>       }
>>> +    mpsc_queue_release(&inline_queue);
>>> +
>>> +    return count > 0;
>>>   }
>>>
>>>   /* Registers 'function' to be called, passing 'aux' as argument, after the
>>> @@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void 
>>> *aux)
>>>   }
>>>
>>>   static bool OVS_NO_SANITIZE_FUNCTION
>>> -ovsrcu_call_postponed(void)
>>> +ovsrcu_call_postponed(struct ovs_list *cbsets)
>>>   {
>>>       struct ovsrcu_cbset *cbset;
>>> -    struct ovs_list cbsets;
>>>
>>> -    guarded_list_pop_all(&flushed_cbsets, &cbsets);
>>> -    if (ovs_list_is_empty(&cbsets)) {
>>> +    if (cbsets == NULL) {
>>> +        return false;
>>> +    }
>>> +    if (ovs_list_is_empty(cbsets)) {
>>
>> These two ifs could be combined. Also, why does cbsets need to be a 
>> parameter?
>> (See earlier comment about the confusing comment in ovsrcu_synchronize_.)
>>
>
> Ack on combined if().
>
> The cbsets are provided as parameter due to the ovsrcu_synchronize() call
> below that is removed.
>
> It is removed to allow the inlined / embedded nodes to be called properly.
> Instead of executing "sync" as part of "call_postponed":
>
>   + call_postponed
>       +--> pop cbsets
>       +--> sync()
>       +--> execute cbsets
>
> becomes:
>
>   + pop cbsets
>   + sync
>   + call_inlined
>   + call_postponed(cbsets)
>
> Removing sync from inside "call_postponed" means that new cbsets might be 
> scheduled by another thread,
> then this thread enters ovsrcu_call_postponed, pops all cbsets and starts 
> executing them (now without
> sync): it would execute those new cbsets immediately, breaking RCU.
>
> To avoid this situation, the cbsets are popped before entering this function.
> A sync point is then executed, and the previously popped cbsets are provided
> as parameter to "postponed" to be executed.
>
> If both inline / embedded and regular postponed cbsets use a single defer 
> mechanism, maybe this whole
> sync reordering can be avoided altogether, removing this subtle bug and 
> these generally confusing changes.
>
> I'll try it.

Thanks, will wait for the v3.

>>>           return false;
>>>       }
>>>
>>> -    ovsrcu_synchronize();
>>> -
>>> -    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
>>> +    LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
>>>           struct ovsrcu_cb *cb;
>>>
>>>           for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>>> @@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>>>       pthread_detach(pthread_self());
>>>
>>>       while (!latch_is_set(&postpone_exit)) {
>>> -        uint64_t seqno = seq_read(flushed_cbsets_seq);
>>> -        if (!ovsrcu_call_postponed()) {
>>> -            seq_wait(flushed_cbsets_seq, seqno);
>>> +        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> +        uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
>>> +        uint64_t target = ovsrcu_synchronize_(&cbsets);
>>> +        uint64_t inline_seqno = seq_read(inline_seq);
>>> +        bool inline_active;
>>> +        bool cbsets_active;
>>> +
>>> +        /* Both RCU calls must be examined for activity. */
>>> +        inline_active = ovsrcu_call_inline(target);
>>> +        cbsets_active = ovsrcu_call_postponed(&cbsets);
>>> +        if (!inline_active && !cbsets_active) {
>>> +            seq_wait(flushed_cbsets_seq, cb_seqno);
>>> +            seq_wait(inline_seq, inline_seqno);
>>>               latch_wait(&postpone_exit);
>>>               poll_block();
>>>           }
>>> @@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>>>       if (perthread->cbset) {
>>>           ovsrcu_flush_cbset(perthread);
>>>       }
>>> +    if (perthread->do_inline) {
>>> +        seq_change(inline_seq);
>>> +    }
>>>
>>>       ovs_mutex_lock(&ovsrcu_threads_mutex);
>>>       ovs_list_remove(&perthread->list_node);
>>> @@ -440,6 +530,7 @@ ovsrcu_init_module(void)
>>>
>>>           guarded_list_init(&flushed_cbsets);
>>>           flushed_cbsets_seq = seq_create();
>>> +        inline_seq = seq_create();
>>>
>>>           ovsthread_once_done(&once);
>>>       }
>>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
>>> index a1c15c126..ed756c1c2 100644
>>> --- a/lib/ovs-rcu.h
>>> +++ b/lib/ovs-rcu.h
>>> @@ -125,6 +125,22 @@
>>>    *         ovs_mutex_unlock(&mutex);
>>>    *     }
>>>    *
>>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can 
>>> be
>>> + * achieved using ovsrcu_postpone_inline():
>>> + *
>>> + *      struct deferrable {
>>> + *          struct ovsrcu_inline_node rcu_node;
>>> + *      };
>>> + *
>>> + *      void
>>> + *      deferred_free(struct deferrable *d)
>>> + *      {
>>> + *          ovsrcu_postpone_inline(free, d, rcu_node);
>>> + *      }
>>> + *
>>> + * Using inline fields can be preferred sometimes to avoid the small
>>> + * allocations done in ovsrcu_postpone().
>>> + *
>>>    * In some rare cases an object may not be addressable with a pointer, 
>>> but only
>>>    * through an array index (e.g. because it's provided by another 
>>> library).  It
>>>    * is still possible to have RCU semantics by using the ovsrcu_index type.
>>> @@ -171,6 +187,7 @@
>>>    */
>>>
>>>   #include "compiler.h"
>>> +#include "mpsc-queue.h"
>>>   #include "ovs-atomic.h"
>>>
>>>   #if __GNUC__
>>> @@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), 
>>> void *aux);
>>>        (void) sizeof(*(ARG)),                                     \
>>>        ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>>>
>>> +struct ovsrcu_inline_node {
>>> +    struct mpsc_queue_node node;
>>> +    void (*cb)(void *aux);
>>> +    void *aux;
>>> +    uint64_t seqno;
>>> +};
>>> +
>>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
>>> + * contains an 'ovsrcu_inline_node' as a field named MEMBER. The function
>>> + * is called following the next grace period.  See 'Usage' above for an
>>> + * example.
>>> + */
>>> +void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>>> +                              struct ovsrcu_inline_node *node);
>>> +#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER)               \
>>> +    (/* Verify that ARG is appropriate for FUNCTION. */             \
>>> +     (void) sizeof((FUNCTION)(ARG), 1),                             \
>>> +     /* Verify that ARG is a pointer type. */                       \
>>> +     (void) sizeof(*(ARG)),                                         \
>>> +     ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG,    \
>>> +                              &(ARG)->MEMBER))
>>> +
>>>   /* An array index protected by RCU semantics.  This is an easier 
>>> alternative to
>>>    * an RCU protected pointer to a malloc'd int. */
>>>   typedef struct { atomic_int v; } ovsrcu_index;
>>> diff --git a/tests/automake.mk b/tests/automake.mk
>>> index da569b022..da4d2e0b8 100644
>>> --- a/tests/automake.mk
>>> +++ b/tests/automake.mk
>>> @@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
>>>        tests/test-random.c \
>>>        tests/test-rcu.c \
>>>        tests/test-rculist.c \
>>> +     tests/test-rcu-inline.c \
>>>        tests/test-reconnect.c \
>>>        tests/test-rstp.c \
>>>        tests/test-sflow.c \
>>> diff --git a/tests/library.at b/tests/library.at
>>> index 82ac80a27..16820ff49 100644
>>> --- a/tests/library.at
>>> +++ b/tests/library.at
>>> @@ -275,6 +275,10 @@ AT_SETUP([rcu])
>>>   AT_CHECK([ovstest test-rcu], [0], [])
>>>   AT_CLEANUP
>>>
>>> +AT_SETUP([rcu inline])
>>> +AT_CHECK([ovstest test-rcu-inline], [0], [])
>>> +AT_CLEANUP
>>> +
>>>   AT_SETUP([stopwatch module])
>>>   AT_CHECK([ovstest test-stopwatch], [0], [......
>>>   ], [ignore])
>>> diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
>>> new file mode 100644
>>> index 000000000..c72410223
>>> --- /dev/null
>>> +++ b/tests/test-rcu-inline.c
>>> @@ -0,0 +1,118 @@
>>> +/*
>>> + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & 
>>> AFFILIATES.
>>> + * All rights reserved.
>>> + * SPDX-License-Identifier: Apache-2.0
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at
>>> + *
>>> + * http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +
>>> +#undef NDEBUG
>>> +#include "ovs-atomic.h"
>>> +#include "ovs-rcu.h"
>>> +#include "ovs-thread.h"
>>> +#include "ovstest.h"
>>> +#include "seq.h"
>>> +#include "timeval.h"
>>> +#include "util.h"
>>> +
>>> +#include "openvswitch/poll-loop.h"
>>> +
>>> +struct element {
>>> +    struct ovsrcu_inline_node rcu_node;
>>> +    struct seq *trigger;
>>> +    atomic_bool wait;
>>> +};
>>> +
>>> +static void
>>> +do_inline(void *e_)
>>> +{
>>> +    struct element *e = (struct element *) e_;
>>> +
>>> +    seq_change(e->trigger);
>>> +}
>>> +
>>> +static void *
>>> +wait_main(void *aux)
>>> +{
>>> +    struct element *e = aux;
>>> +
>>> +    for (;;) {
>>> +        bool wait;
>>> +
>>> +        atomic_read(&e->wait, &wait);
>>> +        if (!wait) {
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    seq_wait(e->trigger, seq_read(e->trigger));
>>> +    poll_block();
>>> +
>>> +    return NULL;
>>> +}
>>> +
>>> +static void
>>> +test_rcu_inline_main(bool multithread)
>>> +{
>>> +    long long int timeout;
>>> +    pthread_t waiter;
>>> +    struct element e;
>>> +    uint64_t seqno;
>>> +
>>> +    atomic_init(&e.wait, true);
>>> +
>>> +    if (multithread) {
>>> +        waiter = ovs_thread_create("waiter", wait_main, &e);
>>> +    }
>>> +
>>> +    e.trigger = seq_create();
>>> +    seqno = seq_read(e.trigger);
>>> +
>>> +    ovsrcu_postpone_inline(do_inline, &e, rcu_node);
>>> +
>>> +    /* Check that GC holds out until all threads are quiescent. */
>>> +    timeout = time_msec();
>>> +    if (multithread) {
>>> +        timeout += 200;
>>> +    }
>>> +    while (time_msec() <= timeout) {
>>> +        ovs_assert(seq_read(e.trigger) == seqno);
>>> +    }
>>> +
>>> +    atomic_store(&e.wait, false);
>>> +
>>> +    seq_wait(e.trigger, seqno);
>>> +    poll_timer_wait_until(time_msec() + 200);
>>> +    poll_block();
>>> +
>>> +    /* Verify that GC executed. */
>>> +    ovs_assert(seq_read(e.trigger) != seqno);
>>> +    seq_destroy(e.trigger);
>>> +
>>> +    if (multithread) {
>>> +        xpthread_join(waiter, NULL);
>>> +    }
>>> +}
>>> +
>>> +static void
>>> +test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
>>> +{
>>> +    const bool multithread = true;
>>> +
>>> +    test_rcu_inline_main(!multithread);
>>> +    test_rcu_inline_main(multithread);
>>> +}
>>> +
>>> +OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
>>> --
>>> 2.34.1
>>

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to