On 13/04/2026 16:21, Eelco Chaudron wrote:
> External email: Use caution opening links or attachments
>
>
> On 1 Apr 2026, at 11:13, Eli Britstein wrote:
>
>> From: Gaetan Rivet <[email protected]>
>>
>> Add a way to schedule executions with the RCU using memory embedded
>> within the object being scheduled, if applicable.
>>
>> This way, freeing a high volume of objects does not require many small
>> allocations, which could otherwise increase heap fragmentation and memory
>> pressure.
>
> Thanks, Gaetan, for following up on this patch. This embedded version looks
> way nicer. I have a few comments below.
>
> Cheers,
>
> Eelco
>
>> Signed-off-by: Gaetan Rivet <[email protected]>
>> Co-authored-by: Eli Britstein <[email protected]>
>> Signed-off-by: Eli Britstein <[email protected]>
>> ---
>> lib/guarded-list.c | 10 ++++
>> lib/guarded-list.h | 2 +
>> lib/ovs-rcu.c | 110 ++++++++++++++++++++++---------------
>> lib/ovs-rcu.h | 39 ++++++++++++++
>> tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++
>> 5 files changed, 249 insertions(+), 43 deletions(-)
>>
>> diff --git a/lib/guarded-list.c b/lib/guarded-list.c
>> index 2186d074e..bb77fb55f 100644
>> --- a/lib/guarded-list.c
>> +++ b/lib/guarded-list.c
>> @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list,
>> return retval;
>> }
>>
>> +void
>> +guarded_list_push_back_all(struct guarded_list *list,
>> + struct ovs_list *nodes, size_t n)
>> +{
>
> guarded_list_push_back_all() trusts that the 'n' parameter matches the actual
> list length, which could lead to incorrect counts if the caller passes a
> wrong value. I would prefer verifying it, which is probably as expensive as
> computing it internally.
>
Ack, I think though it's a shame to have an unbounded list of nodes that
we push one by one under lock only to keep the guarded list under a max
count, while this max count is not otherwise used.
Would you be ok to change to an ovs_list + mutex, and keep the single op
move to the flushed nodes?
>> + ovs_mutex_lock(&list->mutex);
>> + ovs_list_push_back_all(&list->list, nodes);
>> + list->n += n;
>> + ovs_mutex_unlock(&list->mutex);
>> +}
>> +
>> struct ovs_list *
>> guarded_list_pop_front(struct guarded_list *list)
>> {
>> diff --git a/lib/guarded-list.h b/lib/guarded-list.h
>> index 80ce22c12..b575dc425 100644
>> --- a/lib/guarded-list.h
>> +++ b/lib/guarded-list.h
>> @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *);
>>
>> size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *,
>> size_t max);
>> +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *,
>> + size_t n);
>> struct ovs_list *guarded_list_pop_front(struct guarded_list *);
>> size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *);
>>
>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
>> index 49afcc55c..54e6c469d 100644
>> --- a/lib/ovs-rcu.c
>> +++ b/lib/ovs-rcu.c
>> @@ -38,7 +38,7 @@ struct ovsrcu_cb {
>> };
>>
>> struct ovsrcu_cbset {
>> - struct ovs_list list_node;
>> + struct ovsrcu_node rcu_node;
>> struct ovsrcu_cb *cbs;
>> size_t n_allocated;
>> int n_cbs;
>> @@ -49,6 +49,8 @@ struct ovsrcu_perthread {
>>
>> uint64_t seqno;
>> struct ovsrcu_cbset *cbset;
>> + struct ovs_list pending; /* Thread-local list of ovsrcu_node. */
>> + size_t n_pending;
>
> We might not need this, based on my earlier comment regarding
> guarded_list_push_back_all().
>
Ack.
>> char name[16]; /* This thread's name. */
>> };
>>
>> @@ -58,15 +60,15 @@ static pthread_key_t perthread_key;
>> static struct ovs_list ovsrcu_threads;
>> static struct ovs_mutex ovsrcu_threads_mutex;
>>
>> -static struct guarded_list flushed_cbsets;
>> -static struct seq *flushed_cbsets_seq;
>> +static struct guarded_list flushed_nodes;
>> +static struct seq *flushed_nodes_seq;
>>
>> static struct latch postpone_exit;
>> static struct ovs_barrier postpone_barrier;
>>
>> static void ovsrcu_init_module(void);
>> -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>> -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>> +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool);
>> +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *);
>> static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>> static bool ovsrcu_call_postponed(void);
>> static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>> @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void)
>> perthread = xmalloc(sizeof *perthread);
>> perthread->seqno = seq_read(global_seqno);
>> perthread->cbset = NULL;
>> + ovs_list_init(&perthread->pending);
>> + perthread->n_pending = 0;
>> ovs_strlcpy(perthread->name, name[0] ? name : "main",
>> sizeof perthread->name);
>>
>> @@ -153,9 +157,7 @@ ovsrcu_quiesce(void)
>>
>> perthread = ovsrcu_perthread_get();
>> perthread->seqno = seq_read(global_seqno);
>> - if (perthread->cbset) {
>> - ovsrcu_flush_cbset(perthread);
>> - }
>> + ovsrcu_flush_nodes(perthread);
>> seq_change(global_seqno);
>>
>> ovsrcu_quiesced();
>> @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void)
>> perthread = ovsrcu_perthread_get();
>> if (!seq_try_lock()) {
>> perthread->seqno = seq_read(global_seqno);
>> - if (perthread->cbset) {
>> - ovsrcu_flush_cbset__(perthread, true);
>> - }
>> + ovsrcu_flush_nodes__(perthread, true);
>> seq_change_protected(global_seqno);
>> seq_unlock();
>> ovsrcu_quiesced();
>> @@ -264,10 +264,10 @@ ovsrcu_exit(void)
>> /* Repeatedly:
>> *
>> * - Wait for a grace period. One important side effect is to push
>> the
>> - * running thread's cbset into 'flushed_cbsets' so that the next
>> call
>> + * running thread's nodes into 'flushed_nodes' so that the next
>> call
>> * has something to call.
>> *
>> - * - Call all the callbacks in 'flushed_cbsets'. If there aren't
>> any,
>> + * - Call all the callbacks in 'flushed_nodes'. If there aren't any,
>> * we're done, otherwise the callbacks themselves might have
>> requested
>> * more deferred callbacks so we go around again.
>> *
>> @@ -282,6 +282,32 @@ ovsrcu_exit(void)
>> }
>> }
>>
>> +static void
>> +ovsrcu_run_cbset(void *aux)
>
> Maybe the name should be more explicit, for example;
> ovsrcu_cbset_execute_and_free()
>
> You might be missing the OVS_NO_SANITIZE_FUNCTION attribute, which may be
> causing the upstream regression failures.
>
Ack.
>> +{
>> + struct ovsrcu_cbset *cbset = aux;
>> + struct ovsrcu_cb *cb;
>> +
>> + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>> + cb->function(cb->aux);
>> + }
>> +
>
> Should this be something like this, as &cbset->cbs[cbset->n_cbs] is
> undefined behaviour when cbset->cbs is NULL.
>
> ovs_assert(cbset->n_cbs <= cbset->n_allocated);
>
> if (cbset->cbs != NULL && cbset->n_cbs > 0) {
> for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
> cb->function(cb->aux);
> }
> }
>
> But I guess this would even be cleaner:
>
> ovs_assert(cbset->n_cbs <= cbset->n_allocated);
>
> for (size_t i = 0; i < cbset->n_cbs; i++) {
> cbset->cbs[i].function(cbset->cbs[i].aux);
> }
>
Ack.
>> + free(cbset->cbs);
>> + free(cbset);
>> +}
>> +
>> +void
>> +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
>> + struct ovsrcu_node *rcu_node)
>> +{
>> + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
>> +
>> + rcu_node->cb = function;
>> + rcu_node->aux = aux;
>> + ovs_list_push_back(&perthread->pending, &rcu_node->list_node);
>> + perthread->n_pending++;
>> +}
>> +
>> /* Registers 'function' to be called, passing 'aux' as argument, after the
>> * next grace period.
>> *
>> @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
>> cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
>> cbset->n_allocated = MIN_CBS;
>> cbset->n_cbs = 0;
>> + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node);
>> }
>>
>> if (cbset->n_cbs == cbset->n_allocated) {
>> @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void
>> *aux)
>> static bool OVS_NO_SANITIZE_FUNCTION
>> ovsrcu_call_postponed(void)
>> {
>> - struct ovsrcu_cbset *cbset;
>> - struct ovs_list cbsets;
>> + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes);
>> + struct ovsrcu_node *node;
>>
>> - guarded_list_pop_all(&flushed_cbsets, &cbsets);
>> - if (ovs_list_is_empty(&cbsets)) {
>> + guarded_list_pop_all(&flushed_nodes, &nodes);
>> + if (ovs_list_is_empty(&nodes)) {
>> return false;
>> }
>>
>> ovsrcu_synchronize();
>>
>> - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
>> - struct ovsrcu_cb *cb;
>> -
>> - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>> - cb->function(cb->aux);
>> - }
>> - free(cbset->cbs);
>> - free(cbset);
>> + LIST_FOR_EACH_POP (node, list_node, &nodes) {
>> + node->cb(node->aux);
>> }
>>
>> return true;
>> @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>> pthread_detach(pthread_self());
>>
>> while (!latch_is_set(&postpone_exit)) {
>> - uint64_t seqno = seq_read(flushed_cbsets_seq);
>> + uint64_t cb_seqno = seq_read(flushed_nodes_seq);
>> if (!ovsrcu_call_postponed()) {
>> - seq_wait(flushed_cbsets_seq, seqno);
>> + seq_wait(flushed_nodes_seq, cb_seqno);
>> latch_wait(&postpone_exit);
>> poll_block();
>> }
>> @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>> }
>>
>> static void
>> -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
>> +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected)
>> {
>> - struct ovsrcu_cbset *cbset = perthread->cbset;
>> + if (ovs_list_is_empty(&perthread->pending)) {
>> + return;
>> + }
>>
>> - if (cbset) {
>> - guarded_list_push_back(&flushed_cbsets, &cbset->list_node,
>> SIZE_MAX);
>> - perthread->cbset = NULL;
>> + perthread->cbset = NULL;
>> + guarded_list_push_back_all(&flushed_nodes, &perthread->pending,
>> + perthread->n_pending);
>> + ovs_list_init(&perthread->pending);
>
> Don't think there is a need to call init. ovs_list_push_back_all() calls
> ovs_list_splice() which leaves the source list empty, making this
> ovs_list_init() call redundant.
>
Ack.
>> + perthread->n_pending = 0;
>>
>> - if (protected) {
>> - seq_change_protected(flushed_cbsets_seq);
>> - } else {
>> - seq_change(flushed_cbsets_seq);
>> - }
>> + if (protected) {
>> + seq_change_protected(flushed_nodes_seq);
>> + } else {
>> + seq_change(flushed_nodes_seq);
>> }
>> }
>>
>> static void
>> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
>> +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread)
>> {
>> - ovsrcu_flush_cbset__(perthread, false);
>> + ovsrcu_flush_nodes__(perthread, false);
>> }
>>
>> static void
>> ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>> {
>> - if (perthread->cbset) {
>> - ovsrcu_flush_cbset(perthread);
>> + if (!ovs_list_is_empty(&perthread->pending)) {
>> + ovsrcu_flush_nodes(perthread);
>> }
>>
>> ovs_mutex_lock(&ovsrcu_threads_mutex);
>> @@ -438,8 +462,8 @@ ovsrcu_init_module(void)
>> ovs_list_init(&ovsrcu_threads);
>> ovs_mutex_init(&ovsrcu_threads_mutex);
>>
>> - guarded_list_init(&flushed_cbsets);
>> - flushed_cbsets_seq = seq_create();
>> + guarded_list_init(&flushed_nodes);
>> + flushed_nodes_seq = seq_create();
>>
>> ovsthread_once_done(&once);
>> }
>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
>> index a1c15c126..efd43a1a2 100644
>> --- a/lib/ovs-rcu.h
>> +++ b/lib/ovs-rcu.h
>> @@ -125,6 +125,22 @@
>> * ovs_mutex_unlock(&mutex);
>> * }
>> *
>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can
>> be
>> + * achieved using ovsrcu_postpone_embedded():
>> + *
>> + * struct deferrable {
>> + * struct ovsrcu_node rcu_node;
>> + * };
>> + *
>> + * void
>> + * deferred_free(struct deferrable *d)
>> + * {
>> + * ovsrcu_postpone_embedded(free, d, rcu_node);
>> + * }
>> + *
>> + * Using embedded fields can be preferred sometimes to avoid the small
>> + * allocations done in ovsrcu_postpone().
>> + *
>> * In some rare cases an object may not be addressable with a pointer, but
>> only
>> * through an array index (e.g. because it's provided by another library).
>> It
>> * is still possible to have RCU semantics by using the ovsrcu_index type.
>> @@ -173,6 +189,8 @@
>> #include "compiler.h"
>> #include "ovs-atomic.h"
>>
>> +#include "openvswitch/list.h"
>> +
>> #if __GNUC__
>> #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
>> #define OVSRCU_INITIALIZER(VALUE) { VALUE }
>> @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux),
>> void *aux);
>> (void) sizeof(*(ARG)), \
>> ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>>
>> +struct ovsrcu_node {
>> + struct ovs_list list_node;
>> + void (*cb)(void *aux);
>> + void *aux;
>> +};
>> +
>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
>> + * contains an 'ovsrcu_node' as a field named MEMBER. The function
>
> Missing double space after period: "MEMBER. The function" should be
> "MEMBER.  The function".
>
Ack.
>> + * is called following the next grace period. See 'Usage' above for an
>> + * example.
>
> Should this comment also mention that the same ovsrcu_node must not be
> scheduled multiple times? This restriction is not obvious and violating
> it could cause subtle bugs (use-after-free when the callback executes
> twice and frees the object twice).
>
Yes it's an important point, will add it.
If we are ok with sparing a bit of memory, maybe we can maintain a flag
within the node to assert no double-free?
It sounds like a user error though, so not sure it's worth making all
nodes bigger to detect it.
>> + */
>> +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
>> + struct ovsrcu_node *node);
>> +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \
>> + (/* Verify that ARG is appropriate for FUNCTION. */ \
>> + (void) sizeof((FUNCTION)(ARG), 1), \
>> + /* Verify that ARG is a pointer type. */ \
>> + (void) sizeof(*(ARG)), \
>> + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \
>> + &(ARG)->MEMBER))
>> +
>> /* An array index protected by RCU semantics. This is an easier
>> alternative to
>> * an RCU protected pointer to a malloc'd int. */
>> typedef struct { atomic_int v; } ovsrcu_index;
>> diff --git a/tests/test-rcu.c b/tests/test-rcu.c
>> index bb17092bf..26150e7d9 100644
>> --- a/tests/test-rcu.c
>> +++ b/tests/test-rcu.c
>> @@ -17,11 +17,16 @@
>> #include <config.h>
>> #undef NDEBUG
>> #include "fatal-signal.h"
>> +#include "ovs-atomic.h"
>> #include "ovs-rcu.h"
>> #include "ovs-thread.h"
>> #include "ovstest.h"
>> +#include "seq.h"
>> +#include "timeval.h"
>> #include "util.h"
>>
>> +#include "openvswitch/poll-loop.h"
>> +
>> static void *
>> quiescer_main(void *aux OVS_UNUSED)
>> {
>> @@ -67,10 +72,136 @@ test_rcu_barrier(void)
>> ovs_assert(count == 10);
>> }
>>
>> +struct element {
>> + struct ovsrcu_node rcu_node;
>> + struct seq *trigger;
>> + atomic_bool wait;
>> +};
>> +
>> +static void
>> +trigger_cb(void *e_)
>> +{
>> + struct element *e = (struct element *) e_;
>> +
>> + seq_change(e->trigger);
>
> Maybe add a counter in struct element to verify the callback
> executes exactly once, catching potential double-execution bugs.
>
Ack.
>> +}
>> +
>> +static void *
>> +wait_main(void *aux)
>> +{
>> + struct element *e = aux;
>> +
>> + for (;;) {
>> + bool wait;
>> +
>> + atomic_read(&e->wait, &wait);
>> + if (!wait) {
>> + break;
>> + }
>> + }
>> +
>> + seq_wait(e->trigger, seq_read(e->trigger));
>> + poll_block();
>> +
>> + return NULL;
>> +}
>> +
>> +static void
>> +test_rcu_postpone_embedded(bool multithread)
>> +{
>> + long long int timeout;
>> + pthread_t waiter;
>> + struct element e;
>> + uint64_t seqno;
>> +
>> + atomic_init(&e.wait, true);
>> +
>> + if (multithread) {
>> + waiter = ovs_thread_create("waiter", wait_main, &e);
>> + }
>> +
>> + e.trigger = seq_create();
>> + seqno = seq_read(e.trigger);
>> +
>> + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node);
>> +
>> + /* Check that GC holds out until all threads are quiescent. */
>> + timeout = time_msec();
>> + if (multithread) {
>> + timeout += 200;
>> + }
>> + while (time_msec() <= timeout) {
>> + ovs_assert(seq_read(e.trigger) == seqno);
>> + }
>> +
>> + atomic_store(&e.wait, false);
>> +
>> + seq_wait(e.trigger, seqno);
>> + poll_timer_wait_until(time_msec() + 200);
>> + poll_block();
>> +
>> + /* Verify that GC executed. */
>> + ovs_assert(seq_read(e.trigger) != seqno);
>> + seq_destroy(e.trigger);
>> +
>> + if (multithread) {
>> + xpthread_join(waiter, NULL);
>> + }
>> +}
>> +
>> +#define N_ORDER_CBS 5
>> +
>> +struct order_element {
>> + struct ovsrcu_node rcu_node;
>> + int id;
>> + int *log;
>> + int *log_idx;
>> +};
>> +
>> +static void
>> +order_cb(void *aux)
>> +{
>> + struct order_element *e = aux;
>> + e->log[(*e->log_idx)++] = e->id;
>> +}
>> +
>> +static void
>> +test_rcu_ordering(void)
>> +{
>
> The documentation states "All functions postponed by a single thread are
> guaranteed to execute in the order they were postponed", the test should
> verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded()
> calls.
>
Currently, ordering is only guaranteed within each 'scheduling type'
(cbset or embedded).
This is a simpler implementation. I can either
1. Modify the doc to specify this point.
or
2. Modify the scheduling such that postponing into a cbset will trigger
creating a new cbset if an embedded node is already at the tail of the
thread-local node list: this way, each cbset executes in the right
order, at the cost of potentially many under-utilized cbsets (waste of
memory).
I am assuming you will prefer (2.), if not let me know.
>> + struct order_element elems[N_ORDER_CBS];
>> + int log[N_ORDER_CBS];
>> + int log_idx = 0;
>> +
>> + for (int i = 0; i < N_ORDER_CBS; i++) {
>> + elems[i].id = i;
>> + elems[i].log = log;
>> + elems[i].log_idx = &log_idx;
>> + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node);
>> + }
>> +
>> + ovsrcu_barrier();
>> +
>> + ovs_assert(log_idx == N_ORDER_CBS);
>> + for (int i = 0; i < N_ORDER_CBS; i++) {
>> + if (log[i] != i) {
>> + ovs_abort(0, "RCU embedded callback ordering violated: "
>> + "expected cb %d at position %d, got %d",
>> + i, i, log[i]);
>> + }
>> + }
>> +}
>> +
>> static void
>> test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) {
>> + const bool multithread = true;
>> +
>> + /* Execute single-threaded check before spawning additional threads. */
>> + test_rcu_postpone_embedded(!multithread);
>> + test_rcu_postpone_embedded(multithread);
>> +
>> test_rcu_quiesce();
>> test_rcu_barrier();
>> + test_rcu_ordering();
>
> test_rcu_ordering() should be tested in both single-threaded and
> multi-threaded modes like test_rcu_postpone_embedded() to verify
> ordering guarantees hold in both RCU execution paths.
>
Ack.
>> }
>>
>> OVSTEST_REGISTER("test-rcu", test_rcu);
>
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev