On 16 Mar 2026, at 3:16, Gaetan Rivet wrote:
> On 03/03/2026 14:41, Eelco Chaudron wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> On 9 Feb 2026, at 14:29, Eli Britstein wrote:
>>
>>> From: Gaetan Rivet <[email protected]>
>>>
>>> Add a way to schedule executions with the RCU using memory embedded
>>> within the object being scheduled, if applicable.
>>>
>>> This way, freeing a high volume of objects does not require many small
>>> allocations, reducing memory pressure and heap fragmentation.
>>>
>>> This fragmentation is seen as the heap grows instead of shrinking when
>>> a high volume of objects are freed using RCU.
>>
>> Hi Gaetan,
>>
>> Thanks for the patch. I have some comments inline below (not a complete
>> review yet).
>>
>> I'm curious about the design choice: why implement this as a parallel
>> mechanism rather than refactoring the existing RCU implementation to handle
>> both cases? Was this driven by performance concerns, or are there fundamental
>> differences that require separate code paths?
>>
>> Cheers,
>>
>> Eelco
>>
>
> This is historical.
> At the time, I wanted to make sure the main mechanism would not be disturbed
> by my changes
> so I kept them separate. As time passed, I didn't have a reason to revisit
> something that was working.
> I agree that it would make sense to use a single mechanism.
Thanks for the clarification. I suspected there might have been more behind it.
>>> Signed-off-by: Gaetan Rivet <[email protected]>
>>> Co-authored-by: Eli Britstein <[email protected]>
>>> Signed-off-by: Eli Britstein <[email protected]>
>>> ---
>>> lib/ovs-rcu.c | 125 ++++++++++++++++++++++++++++++++++------
>>> lib/ovs-rcu.h | 39 +++++++++++++
>>> tests/automake.mk | 1 +
>>> tests/library.at | 4 ++
>>> tests/test-rcu-inline.c | 118 +++++++++++++++++++++++++++++++++++++
>>> 5 files changed, 270 insertions(+), 17 deletions(-)
>>> create mode 100644 tests/test-rcu-inline.c
>>>
>>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
>>> index 49afcc55c..539d17a7c 100644
>>> --- a/lib/ovs-rcu.c
>>> +++ b/lib/ovs-rcu.c
>>> @@ -20,6 +20,7 @@
>>> #include "fatal-signal.h"
>>> #include "guarded-list.h"
>>> #include "latch.h"
>>> +#include "mpsc-queue.h"
>>> #include "openvswitch/list.h"
>>> #include "ovs-thread.h"
>>> #include "openvswitch/poll-loop.h"
>>> @@ -49,6 +50,7 @@ struct ovsrcu_perthread {
>>>
>>> uint64_t seqno;
>>> struct ovsrcu_cbset *cbset;
>>> + bool do_inline;
>>
>> Do we need this bool? It's not set atomically or under any lock, and once set
>> to true it stays true forever for that thread. Would it be better to check if
>> the inline_queue is empty instead of relying on this sticky flag?
>>
>
> Some quiescent modes will delete the perthread structure (cf.
> ovsrcu_quiesce_start)
> In this case, this flag is reset when creating the new perthread next active
> period.
>
> I'll check if the flag can be avoided.
Thanks for looking into this.
>>> char name[16]; /* This thread's name. */
>>> };
>>>
>>> @@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
>>> static struct guarded_list flushed_cbsets;
>>> static struct seq *flushed_cbsets_seq;
>>>
>>> +static struct seq *inline_seq;
>>
>> If do_inline is removed (always signal), inline_seq becomes redundant with
>> global_seqno.
>>
>>> + static struct latch postpone_exit;
>>> static struct ovs_barrier postpone_barrier;
>>>
>>> @@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
>>> static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>>> static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>>> static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>>> -static bool ovsrcu_call_postponed(void);
>>> +static bool ovsrcu_call_inline(uint64_t);
>>> +static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
>>> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>>>
>>> static struct ovsrcu_perthread *
>>> @@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
>>> perthread = xmalloc(sizeof *perthread);
>>> perthread->seqno = seq_read(global_seqno);
>>> perthread->cbset = NULL;
>>> + perthread->do_inline = false;
>>> ovs_strlcpy(perthread->name, name[0] ? name : "main",
>>> sizeof perthread->name);
>>>
>>> @@ -112,7 +118,11 @@ static void
>>> ovsrcu_quiesced(void)
>>> {
>>> if (single_threaded()) {
>>> - ovsrcu_call_postponed();
>>> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> +
>>> + ovsrcu_call_inline(seq_read(global_seqno));
>>> + guarded_list_pop_all(&flushed_cbsets, &cbsets);
>>> + ovsrcu_call_postponed(&cbsets);
>>> } else {
>>> static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
>>> if (ovsthread_once_start(&once)) {
>>> @@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
>>> if (perthread->cbset) {
>>> ovsrcu_flush_cbset(perthread);
>>> }
>>> + if (perthread->do_inline) {
>>> + seq_change(inline_seq);
>>> + }
>>> seq_change(global_seqno);
>>>
>>> ovsrcu_quiesced();
>>> @@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
>>> if (perthread->cbset) {
>>> ovsrcu_flush_cbset__(perthread, true);
>>> }
>>> + if (perthread->do_inline) {
>>> + seq_change_protected(inline_seq);
>>> + }
>>> seq_change_protected(global_seqno);
>>> seq_unlock();
>>> ovsrcu_quiesced();
>>> @@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
>>> return pthread_getspecific(perthread_key) == NULL;
>>> }
>>>
>>> -void
>>> -ovsrcu_synchronize(void)
>>> +static uint64_t
>>> +ovsrcu_synchronize_(struct ovs_list *cbsets)
>>
>> We use a double _ for specific variants.
>>
>
> Ack.
>
>>> {
>>> unsigned int warning_threshold = 1000;
>>> uint64_t target_seqno;
>>> long long int start;
>>>
>>> if (single_threaded()) {
>>> - return;
>>> + return UINT64_MAX;
>>> }
>>>
>>> target_seqno = seq_read(global_seqno);
>>> ovsrcu_quiesce_start();
>>> start = time_msec();
>>>
>>> + if (cbsets != NULL) {
>>> + /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start',
>>> + * as this function has the potential in single-threaded mode
>>> + * to itself execute those 'cbsets'. */
>>
>> This comment doesn't make sense. We can only reach here in multi-threaded
>> mode
>> (single-threaded returns early), so the described scenario is impossible.
>>
>>> + guarded_list_pop_all(&flushed_cbsets, cbsets);
>>> + }
>>> +
>>> for (;;) {
>>> uint64_t cur_seqno = seq_read(global_seqno);
>>> struct ovsrcu_perthread *perthread;
>>> @@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
>>> poll_block();
>>> }
>>> ovsrcu_quiesce_end();
>>> +
>>> + /* Return the 'seqno' that is safe to consider reached by all threads.
>>> */
>>> + return target_seqno;
>>> +}
>>> +
>>> +void
>>> +ovsrcu_synchronize(void)
>>> +{
>>> + ovs_ignore(ovsrcu_synchronize_(NULL));
>>> }
>>>
>>> /* Waits until as many postponed callbacks as possible have executed.
>>> @@ -275,11 +307,57 @@ ovsrcu_exit(void)
>>> * infinite loop. This function is just for making memory leaks
>>> easier to
>>> * spot so there's no point in breaking things on that basis. */
>>> for (int i = 0; i < 8; i++) {
>>> - ovsrcu_synchronize();
>>> - if (!ovsrcu_call_postponed()) {
>>> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> + uint64_t target = ovsrcu_synchronize_(&cbsets);
>>> + bool inline_active;
>>> + bool cbsets_active;
>>> +
>>> + /* Both RCU calls must be examined for activity. */
>>> + inline_active = ovsrcu_call_inline(target);
>>
>> The inline naming is confusing. Consider using "embedded" instead, like
>> ovsrcu_postpone_embedded(). For consistency, this function would be
>> ovsrcu_call_postpone_embedded().
>>
>
> Ack.
>
>>> + cbsets_active = ovsrcu_call_postponed(&cbsets);
>>> + if (!inline_active && !cbsets_active) {
>>> + break;
>>> + }
>>> + }
>>> +}
>>> +
>>> +static struct mpsc_queue inline_queue =
>>> MPSC_QUEUE_INITIALIZER(&inline_queue);
>>> +
>>> +void
>>> +ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>>> + struct ovsrcu_inline_node *rcu_node)
>>> +{
>>> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
>>> +
>>> + rcu_node->seqno = perthread->seqno;
>>> + rcu_node->cb = function;
>>> + rcu_node->aux = aux;
>>> + mpsc_queue_insert(&inline_queue, &rcu_node->node);
>>> +
>>> + perthread->do_inline = true;
>>> +}
>>> +
>>> +static bool
>>> +ovsrcu_call_inline(uint64_t target_seqno)
>>> +{
>>> + struct mpsc_queue_node *msg;
>>> + unsigned int count = 0;
>>> +
>>> + mpsc_queue_acquire(&inline_queue);
>>> + MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
>>> + struct ovsrcu_inline_node *node;
>>> +
>>> + node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
>>> + if (node->seqno >= target_seqno) {
>>> + mpsc_queue_push_front(&inline_queue, msg);
>>> break;
>>> }
>>> + node->cb(node->aux);
>>> + count++;
>>> }
>>> + mpsc_queue_release(&inline_queue);
>>> +
>>> + return count > 0;
>>> }
>>>
>>> /* Registers 'function' to be called, passing 'aux' as argument, after the
>>> @@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void
>>> *aux)
>>> }
>>>
>>> static bool OVS_NO_SANITIZE_FUNCTION
>>> -ovsrcu_call_postponed(void)
>>> +ovsrcu_call_postponed(struct ovs_list *cbsets)
>>> {
>>> struct ovsrcu_cbset *cbset;
>>> - struct ovs_list cbsets;
>>>
>>> - guarded_list_pop_all(&flushed_cbsets, &cbsets);
>>> - if (ovs_list_is_empty(&cbsets)) {
>>> + if (cbsets == NULL) {
>>> + return false;
>>> + }
>>> + if (ovs_list_is_empty(cbsets)) {
>>
>> These two ifs could be combined. Also, why does cbsets need to be a
>> parameter?
>> (See earlier comment about the confusing comment in ovsrcu_synchronize_.)
>>
>
> Ack on combined if().
>
> The cbsets are provided as parameter due to the ovsrcu_synchronize() call
> below that is removed.
>
> It is removed to allow the inlined / embedded nodes to be called properly.
> Instead of executing "sync" as part of "call_postponed":
>
> + call_postponed
> +--> pop cbsets
> +--> sync()
> +--> execute cbsets
>
> becomes:
>
> + pop cbsets
> + sync
> + call_inlined
> + call_postponed(cbsets)
>
> Removing sync from inside "call_postponed" means that new cbsets might be
> scheduled by another thread,
> then this thread enters ovsrcu_call_postponed, pops all cbsets, and starts
> executing them (now without
> sync): it would execute those new cbsets immediately, breaking RCU.
>
> To avoid this situation, the cbsets are popped before entering this function.
> A sync point is then executed, and the previously popped cbsets are provided
> as parameter to "postponed" to be executed.
>
> If both inline / embedded and regular postponed cbsets use a single defer
> mechanism, maybe this whole
> sync reordering can be avoided altogether, removing this subtle bug and the
> generally confusing changes.
>
> I'll try it.
Thanks, will wait for the v3.
>>> return false;
>>> }
>>>
>>> - ovsrcu_synchronize();
>>> -
>>> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
>>> + LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
>>> struct ovsrcu_cb *cb;
>>>
>>> for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>>> @@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>>> pthread_detach(pthread_self());
>>>
>>> while (!latch_is_set(&postpone_exit)) {
>>> - uint64_t seqno = seq_read(flushed_cbsets_seq);
>>> - if (!ovsrcu_call_postponed()) {
>>> - seq_wait(flushed_cbsets_seq, seqno);
>>> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
>>> + uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
>>> + uint64_t target = ovsrcu_synchronize_(&cbsets);
>>> + uint64_t inline_seqno = seq_read(inline_seq);
>>> + bool inline_active;
>>> + bool cbsets_active;
>>> +
>>> + /* Both RCU calls must be examined for activity. */
>>> + inline_active = ovsrcu_call_inline(target);
>>> + cbsets_active = ovsrcu_call_postponed(&cbsets);
>>> + if (!inline_active && !cbsets_active) {
>>> + seq_wait(flushed_cbsets_seq, cb_seqno);
>>> + seq_wait(inline_seq, inline_seqno);
>>> latch_wait(&postpone_exit);
>>> poll_block();
>>> }
>>> @@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>>> if (perthread->cbset) {
>>> ovsrcu_flush_cbset(perthread);
>>> }
>>> + if (perthread->do_inline) {
>>> + seq_change(inline_seq);
>>> + }
>>>
>>> ovs_mutex_lock(&ovsrcu_threads_mutex);
>>> ovs_list_remove(&perthread->list_node);
>>> @@ -440,6 +530,7 @@ ovsrcu_init_module(void)
>>>
>>> guarded_list_init(&flushed_cbsets);
>>> flushed_cbsets_seq = seq_create();
>>> + inline_seq = seq_create();
>>>
>>> ovsthread_once_done(&once);
>>> }
>>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
>>> index a1c15c126..ed756c1c2 100644
>>> --- a/lib/ovs-rcu.h
>>> +++ b/lib/ovs-rcu.h
>>> @@ -125,6 +125,22 @@
>>> * ovs_mutex_unlock(&mutex);
>>> * }
>>> *
>>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can
>>> be
>>> + * achieved using ovsrcu_postpone_inline():
>>> + *
>>> + * struct deferrable {
>>> + * struct ovsrcu_inline_node rcu_node;
>>> + * };
>>> + *
>>> + * void
>>> + * deferred_free(struct deferrable *d)
>>> + * {
>>> + * ovsrcu_postpone_inline(free, d, rcu_node);
>>> + * }
>>> + *
>>> + * Using inline fields can be preferred sometimes to avoid the small
>>> + * allocations done in ovsrcu_postpone().
>>> + *
>>> * In some rare cases an object may not be addressable with a pointer,
>>> but only
>>> * through an array index (e.g. because it's provided by another
>>> library). It
>>> * is still possible to have RCU semantics by using the ovsrcu_index type.
>>> @@ -171,6 +187,7 @@
>>> */
>>>
>>> #include "compiler.h"
>>> +#include "mpsc-queue.h"
>>> #include "ovs-atomic.h"
>>>
>>> #if __GNUC__
>>> @@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux),
>>> void *aux);
>>> (void) sizeof(*(ARG)), \
>>> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>>>
>>> +struct ovsrcu_inline_node {
>>> + struct mpsc_queue_node node;
>>> + void (*cb)(void *aux);
>>> + void *aux;
>>> + uint64_t seqno;
>>> +};
>>> +
>>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
>>> + * contains an 'ovsrcu_inline_node' as a field named MEMBER. The function
>>> + * is called following the next grace period. See 'Usage' above for an
>>> + * example.
>>> + */
>>> +void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
>>> + struct ovsrcu_inline_node *node);
>>> +#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER) \
>>> + (/* Verify that ARG is appropriate for FUNCTION. */ \
>>> + (void) sizeof((FUNCTION)(ARG), 1), \
>>> + /* Verify that ARG is a pointer type. */ \
>>> + (void) sizeof(*(ARG)), \
>>> + ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG, \
>>> + &(ARG)->MEMBER))
>>> +
>>> /* An array index protected by RCU semantics. This is an easier
>>> alternative to
>>> * an RCU protected pointer to a malloc'd int. */
>>> typedef struct { atomic_int v; } ovsrcu_index;
>>> diff --git a/tests/automake.mk b/tests/automake.mk
>>> index da569b022..da4d2e0b8 100644
>>> --- a/tests/automake.mk
>>> +++ b/tests/automake.mk
>>> @@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
>>> tests/test-random.c \
>>> tests/test-rcu.c \
>>> tests/test-rculist.c \
>>> + tests/test-rcu-inline.c \
>>> tests/test-reconnect.c \
>>> tests/test-rstp.c \
>>> tests/test-sflow.c \
>>> diff --git a/tests/library.at b/tests/library.at
>>> index 82ac80a27..16820ff49 100644
>>> --- a/tests/library.at
>>> +++ b/tests/library.at
>>> @@ -275,6 +275,10 @@ AT_SETUP([rcu])
>>> AT_CHECK([ovstest test-rcu], [0], [])
>>> AT_CLEANUP
>>>
>>> +AT_SETUP([rcu inline])
>>> +AT_CHECK([ovstest test-rcu-inline], [0], [])
>>> +AT_CLEANUP
>>> +
>>> AT_SETUP([stopwatch module])
>>> AT_CHECK([ovstest test-stopwatch], [0], [......
>>> ], [ignore])
>>> diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
>>> new file mode 100644
>>> index 000000000..c72410223
>>> --- /dev/null
>>> +++ b/tests/test-rcu-inline.c
>>> @@ -0,0 +1,118 @@
>>> +/*
>>> + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION &
>>> AFFILIATES.
>>> + * All rights reserved.
>>> + * SPDX-License-Identifier: Apache-2.0
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at
>>> + *
>>> + * http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +
>>> +#undef NDEBUG
>>> +#include "ovs-atomic.h"
>>> +#include "ovs-rcu.h"
>>> +#include "ovs-thread.h"
>>> +#include "ovstest.h"
>>> +#include "seq.h"
>>> +#include "timeval.h"
>>> +#include "util.h"
>>> +
>>> +#include "openvswitch/poll-loop.h"
>>> +
>>> +struct element {
>>> + struct ovsrcu_inline_node rcu_node;
>>> + struct seq *trigger;
>>> + atomic_bool wait;
>>> +};
>>> +
>>> +static void
>>> +do_inline(void *e_)
>>> +{
>>> + struct element *e = (struct element *) e_;
>>> +
>>> + seq_change(e->trigger);
>>> +}
>>> +
>>> +static void *
>>> +wait_main(void *aux)
>>> +{
>>> + struct element *e = aux;
>>> +
>>> + for (;;) {
>>> + bool wait;
>>> +
>>> + atomic_read(&e->wait, &wait);
>>> + if (!wait) {
>>> + break;
>>> + }
>>> + }
>>> +
>>> + seq_wait(e->trigger, seq_read(e->trigger));
>>> + poll_block();
>>> +
>>> + return NULL;
>>> +}
>>> +
>>> +static void
>>> +test_rcu_inline_main(bool multithread)
>>> +{
>>> + long long int timeout;
>>> + pthread_t waiter;
>>> + struct element e;
>>> + uint64_t seqno;
>>> +
>>> + atomic_init(&e.wait, true);
>>> +
>>> + if (multithread) {
>>> + waiter = ovs_thread_create("waiter", wait_main, &e);
>>> + }
>>> +
>>> + e.trigger = seq_create();
>>> + seqno = seq_read(e.trigger);
>>> +
>>> + ovsrcu_postpone_inline(do_inline, &e, rcu_node);
>>> +
>>> + /* Check that GC holds out until all threads are quiescent. */
>>> + timeout = time_msec();
>>> + if (multithread) {
>>> + timeout += 200;
>>> + }
>>> + while (time_msec() <= timeout) {
>>> + ovs_assert(seq_read(e.trigger) == seqno);
>>> + }
>>> +
>>> + atomic_store(&e.wait, false);
>>> +
>>> + seq_wait(e.trigger, seqno);
>>> + poll_timer_wait_until(time_msec() + 200);
>>> + poll_block();
>>> +
>>> + /* Verify that GC executed. */
>>> + ovs_assert(seq_read(e.trigger) != seqno);
>>> + seq_destroy(e.trigger);
>>> +
>>> + if (multithread) {
>>> + xpthread_join(waiter, NULL);
>>> + }
>>> +}
>>> +
>>> +static void
>>> +test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
>>> +{
>>> + const bool multithread = true;
>>> +
>>> + test_rcu_inline_main(!multithread);
>>> + test_rcu_inline_main(multithread);
>>> +}
>>> +
>>> +OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
>>> --
>>> 2.34.1
>>
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev