On 03/03/2026 14:41, Eelco Chaudron wrote: > External email: Use caution opening links or attachments > > > On 9 Feb 2026, at 14:29, Eli Britstein wrote: > >> From: Gaetan Rivet <[email protected]> >> >> Add a way to schedule executions with the RCU using memory embedded >> within the object being scheduled, if applicable. >> >> This way, freeing a high volume of objects does not require many small >> allocations, reducing memory pressure and heap fragmentation. >> >> This fragmentation is seen as the heap grows instead of shrinking when >> a high volume of objects are freed using RCU. > > Hi Gaetan, > > Thanks for the patch. I have some comments inline below (not a complete > review yet). > > I'm curious about the design choice: why implement this as a parallel > mechanism rather than refactoring the existing RCU implementation to handle > both cases? Was this driven by performance concerns, or are there fundamental > differences that require separate code paths? > > Cheers, > > Eelco >
This is historical. At the time, I wanted to make sure the main mechanism would not be disturbed by my changes so I kept them separate. As time passed, I didn't have a reason to revisit something that was working. I agree that it would make sense to use a single mechanism. >> Signed-off-by: Gaetan Rivet <[email protected]> >> Co-authored-by: Eli Britstein <[email protected]> >> Signed-off-by: Eli Britstein <[email protected]> >> --- >> lib/ovs-rcu.c | 125 ++++++++++++++++++++++++++++++++++------ >> lib/ovs-rcu.h | 39 +++++++++++++ >> tests/automake.mk | 1 + >> tests/library.at | 4 ++ >> tests/test-rcu-inline.c | 118 +++++++++++++++++++++++++++++++++++++ >> 5 files changed, 270 insertions(+), 17 deletions(-) >> create mode 100644 tests/test-rcu-inline.c >> >> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c >> index 49afcc55c..539d17a7c 100644 >> --- a/lib/ovs-rcu.c >> +++ b/lib/ovs-rcu.c >> @@ -20,6 +20,7 @@ >> #include "fatal-signal.h" >> #include "guarded-list.h" >> #include "latch.h" >> +#include "mpsc-queue.h" >> #include "openvswitch/list.h" >> #include "ovs-thread.h" >> #include "openvswitch/poll-loop.h" >> @@ -49,6 +50,7 @@ struct ovsrcu_perthread { >> >> uint64_t seqno; >> struct ovsrcu_cbset *cbset; >> + bool do_inline; > > Do we need this bool? It's not set atomically or under any lock, and once set > to true it stays true forever for that thread. Would it be better to check if > the inline_queue is empty instead of relying on this sticky flag? > Some quiescent modes will delete the perthread structure (cf. ovsrcu_quiesce_start). In this case, this flag is reset when creating the new perthread structure during the next active period. I'll check if the flag can be avoided. >> char name[16]; /* This thread's name. 
*/ >> }; >> >> @@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex; >> static struct guarded_list flushed_cbsets; >> static struct seq *flushed_cbsets_seq; >> >> +static struct seq *inline_seq; > > If do_inline is removed (always signal), inline_seq becomes redundant with > global_seqno. > >> + static struct latch postpone_exit; >> static struct ovs_barrier postpone_barrier; >> >> @@ -68,7 +72,8 @@ static void ovsrcu_init_module(void); >> static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); >> static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); >> static void ovsrcu_unregister__(struct ovsrcu_perthread *); >> -static bool ovsrcu_call_postponed(void); >> +static bool ovsrcu_call_inline(uint64_t); >> +static bool ovsrcu_call_postponed(struct ovs_list *cbsets); >> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); >> >> static struct ovsrcu_perthread * >> @@ -85,6 +90,7 @@ ovsrcu_perthread_get(void) >> perthread = xmalloc(sizeof *perthread); >> perthread->seqno = seq_read(global_seqno); >> perthread->cbset = NULL; >> + perthread->do_inline = false; >> ovs_strlcpy(perthread->name, name[0] ? 
name : "main", >> sizeof perthread->name); >> >> @@ -112,7 +118,11 @@ static void >> ovsrcu_quiesced(void) >> { >> if (single_threaded()) { >> - ovsrcu_call_postponed(); >> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets); >> + >> + ovsrcu_call_inline(seq_read(global_seqno)); >> + guarded_list_pop_all(&flushed_cbsets, &cbsets); >> + ovsrcu_call_postponed(&cbsets); >> } else { >> static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; >> if (ovsthread_once_start(&once)) { >> @@ -156,6 +166,9 @@ ovsrcu_quiesce(void) >> if (perthread->cbset) { >> ovsrcu_flush_cbset(perthread); >> } >> + if (perthread->do_inline) { >> + seq_change(inline_seq); >> + } >> seq_change(global_seqno); >> >> ovsrcu_quiesced(); >> @@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void) >> if (perthread->cbset) { >> ovsrcu_flush_cbset__(perthread, true); >> } >> + if (perthread->do_inline) { >> + seq_change_protected(inline_seq); >> + } >> seq_change_protected(global_seqno); >> seq_unlock(); >> ovsrcu_quiesced(); >> @@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void) >> return pthread_getspecific(perthread_key) == NULL; >> } >> >> -void >> -ovsrcu_synchronize(void) >> +static uint64_t >> +ovsrcu_synchronize_(struct ovs_list *cbsets) > > We use a double _ for specific variants. > Ack. >> { >> unsigned int warning_threshold = 1000; >> uint64_t target_seqno; >> long long int start; >> >> if (single_threaded()) { >> - return; >> + return UINT64_MAX; >> } >> >> target_seqno = seq_read(global_seqno); >> ovsrcu_quiesce_start(); >> start = time_msec(); >> >> + if (cbsets != NULL) { >> + /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start', >> + * as this function has the potential in single-threaded mode >> + * to itself execute those 'cbsets'. */ > > This comment doesn't make sense. We can only reach here in multi-threaded mode > (single-threaded returns early), so the described scenario is impossible. 
> >> + guarded_list_pop_all(&flushed_cbsets, cbsets); >> + } >> + >> for (;;) { >> uint64_t cur_seqno = seq_read(global_seqno); >> struct ovsrcu_perthread *perthread; >> @@ -237,6 +260,15 @@ ovsrcu_synchronize(void) >> poll_block(); >> } >> ovsrcu_quiesce_end(); >> + >> + /* Return the 'seqno' that is safe to consider reached by all threads. >> */ >> + return target_seqno; >> +} >> + >> +void >> +ovsrcu_synchronize(void) >> +{ >> + ovs_ignore(ovsrcu_synchronize_(NULL)); >> } >> >> /* Waits until as many postponed callbacks as possible have executed. >> @@ -275,11 +307,57 @@ ovsrcu_exit(void) >> * infinite loop. This function is just for making memory leaks >> easier to >> * spot so there's no point in breaking things on that basis. */ >> for (int i = 0; i < 8; i++) { >> - ovsrcu_synchronize(); >> - if (!ovsrcu_call_postponed()) { >> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets); >> + uint64_t target = ovsrcu_synchronize_(&cbsets); >> + bool inline_active; >> + bool cbsets_active; >> + >> + /* Both RCU calls must be examined for activity. */ >> + inline_active = ovsrcu_call_inline(target); > > The inline naming is confusing. Consider using "embedded" instead, like > ovsrcu_postpone_embedded(). For consistency, this function would be > ovsrcu_call_postpone_embedded(). > Ack. 
>> + cbsets_active = ovsrcu_call_postponed(&cbsets); >> + if (!inline_active && !cbsets_active) { >> + break; >> + } >> + } >> +} >> + >> +static struct mpsc_queue inline_queue = >> MPSC_QUEUE_INITIALIZER(&inline_queue); >> + >> +void >> +ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux, >> + struct ovsrcu_inline_node *rcu_node) >> +{ >> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); >> + >> + rcu_node->seqno = perthread->seqno; >> + rcu_node->cb = function; >> + rcu_node->aux = aux; >> + mpsc_queue_insert(&inline_queue, &rcu_node->node); >> + >> + perthread->do_inline = true; >> +} >> + >> +static bool >> +ovsrcu_call_inline(uint64_t target_seqno) >> +{ >> + struct mpsc_queue_node *msg; >> + unsigned int count = 0; >> + >> + mpsc_queue_acquire(&inline_queue); >> + MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) { >> + struct ovsrcu_inline_node *node; >> + >> + node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node); >> + if (node->seqno >= target_seqno) { >> + mpsc_queue_push_front(&inline_queue, msg); >> break; >> } >> + node->cb(node->aux); >> + count++; >> } >> + mpsc_queue_release(&inline_queue); >> + >> + return count > 0; >> } >> >> /* Registers 'function' to be called, passing 'aux' as argument, after the >> @@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void >> *aux) >> } >> >> static bool OVS_NO_SANITIZE_FUNCTION >> -ovsrcu_call_postponed(void) >> +ovsrcu_call_postponed(struct ovs_list *cbsets) >> { >> struct ovsrcu_cbset *cbset; >> - struct ovs_list cbsets; >> >> - guarded_list_pop_all(&flushed_cbsets, &cbsets); >> - if (ovs_list_is_empty(&cbsets)) { >> + if (cbsets == NULL) { >> + return false; >> + } >> + if (ovs_list_is_empty(cbsets)) { > > These two ifs could be combined. Also, why does cbsets need to be a parameter? > (See earlier comment about the confusing comment in ovsrcu_synchronize_.) > Ack on combined if(). 
The cbsets are provided as parameter due to the ovsrcu_synchronize() call below that is removed. It is removed to allow the inlined / embedded nodes to be called properly. Instead of executing "sync" as part of "call_postponed": + call_postponed +--> pop cbsets +--> sync() +--> execute cbsets becomes: + pop cbsets + sync + call_inlined + call_postponed(cbsets) Removing sync from inside "call_postponed" means that new cbsets might be scheduled by another thread, then this thread enters ovsrcu_call_postponed, pops all cbsets and starts executing them (now without sync): it would execute those new cbsets immediately, breaking RCU. To avoid this situation, the cbsets are popped before entering this function. A sync point is then executed, and the previously popped cbsets are provided as parameter to "postponed" to be executed. If both inline / embedded and regular postponed cbsets use a single defer mechanism, maybe this whole sync reordering can be avoided altogether, removing this subtle bug and generally confusing changes. I'll try it. >> return false; >> } >> >> - ovsrcu_synchronize(); >> - >> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { >> + LIST_FOR_EACH_POP (cbset, list_node, cbsets) { >> struct ovsrcu_cb *cb; >> >> for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { >> @@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) >> pthread_detach(pthread_self()); >> >> while (!latch_is_set(&postpone_exit)) { >> - uint64_t seqno = seq_read(flushed_cbsets_seq); >> - if (!ovsrcu_call_postponed()) { >> - seq_wait(flushed_cbsets_seq, seqno); >> + struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets); >> + uint64_t cb_seqno = seq_read(flushed_cbsets_seq); >> + uint64_t target = ovsrcu_synchronize_(&cbsets); >> + uint64_t inline_seqno = seq_read(inline_seq); >> + bool inline_active; >> + bool cbsets_active; >> + >> + /* Both RCU calls must be examined for activity. 
*/ >> + inline_active = ovsrcu_call_inline(target); >> + cbsets_active = ovsrcu_call_postponed(&cbsets); >> + if (!inline_active && !cbsets_active) { >> + seq_wait(flushed_cbsets_seq, cb_seqno); >> + seq_wait(inline_seq, inline_seqno); >> latch_wait(&postpone_exit); >> poll_block(); >> } >> @@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread) >> if (perthread->cbset) { >> ovsrcu_flush_cbset(perthread); >> } >> + if (perthread->do_inline) { >> + seq_change(inline_seq); >> + } >> >> ovs_mutex_lock(&ovsrcu_threads_mutex); >> ovs_list_remove(&perthread->list_node); >> @@ -440,6 +530,7 @@ ovsrcu_init_module(void) >> >> guarded_list_init(&flushed_cbsets); >> flushed_cbsets_seq = seq_create(); >> + inline_seq = seq_create(); >> >> ovsthread_once_done(&once); >> } >> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h >> index a1c15c126..ed756c1c2 100644 >> --- a/lib/ovs-rcu.h >> +++ b/lib/ovs-rcu.h >> @@ -125,6 +125,22 @@ >> * ovs_mutex_unlock(&mutex); >> * } >> * >> + * As an alternative to ovsrcu_postpone(), the same deferred execution can >> be >> + * achieved using ovsrcu_postpone_inline(): >> + * >> + * struct deferrable { >> + * struct ovsrcu_inline_node rcu_node; >> + * }; >> + * >> + * void >> + * deferred_free(struct deferrable *d) >> + * { >> + * ovsrcu_postpone_inline(free, d, rcu_node); >> + * } >> + * >> + * Using inline fields can be preferred sometimes to avoid the small >> + * allocations done in ovsrcu_postpone(). >> + * >> * In some rare cases an object may not be addressable with a pointer, but >> only >> * through an array index (e.g. because it's provided by another library). >> It >> * is still possible to have RCU semantics by using the ovsrcu_index type. 
>> @@ -171,6 +187,7 @@ >> */ >> >> #include "compiler.h" >> +#include "mpsc-queue.h" >> #include "ovs-atomic.h" >> >> #if __GNUC__ >> @@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), >> void *aux); >> (void) sizeof(*(ARG)), \ >> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) >> >> +struct ovsrcu_inline_node { >> + struct mpsc_queue_node node; >> + void (*cb)(void *aux); >> + void *aux; >> + uint64_t seqno; >> +}; >> + >> +/* Calls FUNCTION passing ARG as its pointer-type argument, which >> + * contains an 'ovsrcu_inline_node' as a field named MEMBER. The function >> + * is called following the next grace period. See 'Usage' above for an >> + * example. >> + */ >> +void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux, >> + struct ovsrcu_inline_node *node); >> +#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER) \ >> + (/* Verify that ARG is appropriate for FUNCTION. */ \ >> + (void) sizeof((FUNCTION)(ARG), 1), \ >> + /* Verify that ARG is a pointer type. */ \ >> + (void) sizeof(*(ARG)), \ >> + ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG, \ >> + &(ARG)->MEMBER)) >> + >> /* An array index protected by RCU semantics. This is an easier >> alternative to >> * an RCU protected pointer to a malloc'd int. 
*/ >> typedef struct { atomic_int v; } ovsrcu_index; >> diff --git a/tests/automake.mk b/tests/automake.mk >> index da569b022..da4d2e0b8 100644 >> --- a/tests/automake.mk >> +++ b/tests/automake.mk >> @@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \ >> tests/test-random.c \ >> tests/test-rcu.c \ >> tests/test-rculist.c \ >> + tests/test-rcu-inline.c \ >> tests/test-reconnect.c \ >> tests/test-rstp.c \ >> tests/test-sflow.c \ >> diff --git a/tests/library.at b/tests/library.at >> index 82ac80a27..16820ff49 100644 >> --- a/tests/library.at >> +++ b/tests/library.at >> @@ -275,6 +275,10 @@ AT_SETUP([rcu]) >> AT_CHECK([ovstest test-rcu], [0], []) >> AT_CLEANUP >> >> +AT_SETUP([rcu inline]) >> +AT_CHECK([ovstest test-rcu-inline], [0], []) >> +AT_CLEANUP >> + >> AT_SETUP([stopwatch module]) >> AT_CHECK([ovstest test-stopwatch], [0], [...... >> ], [ignore]) >> diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c >> new file mode 100644 >> index 000000000..c72410223 >> --- /dev/null >> +++ b/tests/test-rcu-inline.c >> @@ -0,0 +1,118 @@ >> +/* >> + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & >> AFFILIATES. >> + * All rights reserved. >> + * SPDX-License-Identifier: Apache-2.0 >> + * >> + * Licensed under the Apache License, Version 2.0 (the "License"); >> + * you may not use this file except in compliance with the License. >> + * You may obtain a copy of the License at >> + * >> + * http://www.apache.org/licenses/LICENSE-2.0 >> + * >> + * Unless required by applicable law or agreed to in writing, software >> + * distributed under the License is distributed on an "AS IS" BASIS, >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> + * See the License for the specific language governing permissions and >> + * limitations under the License. 
>> + */ >> + >> +#include <config.h> >> + >> +#undef NDEBUG >> +#include "ovs-atomic.h" >> +#include "ovs-rcu.h" >> +#include "ovs-thread.h" >> +#include "ovstest.h" >> +#include "seq.h" >> +#include "timeval.h" >> +#include "util.h" >> + >> +#include "openvswitch/poll-loop.h" >> + >> +struct element { >> + struct ovsrcu_inline_node rcu_node; >> + struct seq *trigger; >> + atomic_bool wait; >> +}; >> + >> +static void >> +do_inline(void *e_) >> +{ >> + struct element *e = (struct element *) e_; >> + >> + seq_change(e->trigger); >> +} >> + >> +static void * >> +wait_main(void *aux) >> +{ >> + struct element *e = aux; >> + >> + for (;;) { >> + bool wait; >> + >> + atomic_read(&e->wait, &wait); >> + if (!wait) { >> + break; >> + } >> + } >> + >> + seq_wait(e->trigger, seq_read(e->trigger)); >> + poll_block(); >> + >> + return NULL; >> +} >> + >> +static void >> +test_rcu_inline_main(bool multithread) >> +{ >> + long long int timeout; >> + pthread_t waiter; >> + struct element e; >> + uint64_t seqno; >> + >> + atomic_init(&e.wait, true); >> + >> + if (multithread) { >> + waiter = ovs_thread_create("waiter", wait_main, &e); >> + } >> + >> + e.trigger = seq_create(); >> + seqno = seq_read(e.trigger); >> + >> + ovsrcu_postpone_inline(do_inline, &e, rcu_node); >> + >> + /* Check that GC holds out until all threads are quiescent. */ >> + timeout = time_msec(); >> + if (multithread) { >> + timeout += 200; >> + } >> + while (time_msec() <= timeout) { >> + ovs_assert(seq_read(e.trigger) == seqno); >> + } >> + >> + atomic_store(&e.wait, false); >> + >> + seq_wait(e.trigger, seqno); >> + poll_timer_wait_until(time_msec() + 200); >> + poll_block(); >> + >> + /* Verify that GC executed. 
*/ >> + ovs_assert(seq_read(e.trigger) != seqno); >> + seq_destroy(e.trigger); >> + >> + if (multithread) { >> + xpthread_join(waiter, NULL); >> + } >> +} >> + >> +static void >> +test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) >> +{ >> + const bool multithread = true; >> + >> + test_rcu_inline_main(!multithread); >> + test_rcu_inline_main(multithread); >> +} >> + >> +OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline); >> -- >> 2.34.1 > _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
