On 13/04/2026 16:21, Eelco Chaudron wrote:
> External email: Use caution opening links or attachments
> 
> 
> On 1 Apr 2026, at 11:13, Eli Britstein wrote:
> 
>> From: Gaetan Rivet <[email protected]>
>>
>> Add a way to schedule executions with the RCU using memory embedded
>> within the object being scheduled, if applicable.
>>
>> This way, freeing a high volume of objects does not require many small
>> allocations, potentially increasing heap fragmentation and memory
>> pressure.
> 
> Thanks, Gaetan, for following up on this patch. This embedded version looks
> way nicer. I have a few comments below.
> 
> Cheers,
> 
> Eelco
> 
>> Signed-off-by: Gaetan Rivet <[email protected]>
>> Co-authored-by: Eli Britstein <[email protected]>
>> Signed-off-by: Eli Britstein <[email protected]>
>> ---
>>   lib/guarded-list.c |  10 ++++
>>   lib/guarded-list.h |   2 +
>>   lib/ovs-rcu.c      | 110 ++++++++++++++++++++++---------------
>>   lib/ovs-rcu.h      |  39 ++++++++++++++
>>   tests/test-rcu.c   | 131 +++++++++++++++++++++++++++++++++++++++++++++
>>   5 files changed, 249 insertions(+), 43 deletions(-)
>>
>> diff --git a/lib/guarded-list.c b/lib/guarded-list.c
>> index 2186d074e..bb77fb55f 100644
>> --- a/lib/guarded-list.c
>> +++ b/lib/guarded-list.c
>> @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list,
>>       return retval;
>>   }
>>
>> +void
>> +guarded_list_push_back_all(struct guarded_list *list,
>> +                           struct ovs_list *nodes, size_t n)
>> +{
> 
> guarded_list_push_back_all() trusts the 'n' parameter matches the actual
> list length, which could lead to incorrect counts if caller passes wrong
> value. I would prefer verifying which is probably as expensive as
> computing it internally.
> 

Ack, I think though it's a shame to have an unbounded list of nodes that
we push one by one under lock only to keep the guarded list under a max 
count, while this max count is not otherwise used.

Would you be ok to change to an ovs_list + mutex, and keep the single-op 
move to the flushed nodes?

>> +    ovs_mutex_lock(&list->mutex);
>> +    ovs_list_push_back_all(&list->list, nodes);
>> +    list->n += n;
>> +    ovs_mutex_unlock(&list->mutex);
>> +}
>> +
>>   struct ovs_list *
>>   guarded_list_pop_front(struct guarded_list *list)
>>   {
>> diff --git a/lib/guarded-list.h b/lib/guarded-list.h
>> index 80ce22c12..b575dc425 100644
>> --- a/lib/guarded-list.h
>> +++ b/lib/guarded-list.h
>> @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *);
>>
>>   size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *,
>>                                 size_t max);
>> +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *,
>> +                                size_t n);
>>   struct ovs_list *guarded_list_pop_front(struct guarded_list *);
>>   size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *);
>>
>> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
>> index 49afcc55c..54e6c469d 100644
>> --- a/lib/ovs-rcu.c
>> +++ b/lib/ovs-rcu.c
>> @@ -38,7 +38,7 @@ struct ovsrcu_cb {
>>   };
>>
>>   struct ovsrcu_cbset {
>> -    struct ovs_list list_node;
>> +    struct ovsrcu_node rcu_node;
>>       struct ovsrcu_cb *cbs;
>>       size_t n_allocated;
>>       int n_cbs;
>> @@ -49,6 +49,8 @@ struct ovsrcu_perthread {
>>
>>       uint64_t seqno;
>>       struct ovsrcu_cbset *cbset;
>> +    struct ovs_list pending;    /* Thread-local list of ovsrcu_node. */
>> +    size_t n_pending;
> 
> We might not need this, based on my earlier comment regarding
> guarded_list_push_back_all().
> 

Ack.

>>       char name[16];              /* This thread's name. */
>>   };
>>
>> @@ -58,15 +60,15 @@ static pthread_key_t perthread_key;
>>   static struct ovs_list ovsrcu_threads;
>>   static struct ovs_mutex ovsrcu_threads_mutex;
>>
>> -static struct guarded_list flushed_cbsets;
>> -static struct seq *flushed_cbsets_seq;
>> +static struct guarded_list flushed_nodes;
>> +static struct seq *flushed_nodes_seq;
>>
>>   static struct latch postpone_exit;
>>   static struct ovs_barrier postpone_barrier;
>>
>>   static void ovsrcu_init_module(void);
>> -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
>> -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
>> +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool);
>> +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *);
>>   static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>>   static bool ovsrcu_call_postponed(void);
>>   static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
>> @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void)
>>           perthread = xmalloc(sizeof *perthread);
>>           perthread->seqno = seq_read(global_seqno);
>>           perthread->cbset = NULL;
>> +        ovs_list_init(&perthread->pending);
>> +        perthread->n_pending = 0;
>>           ovs_strlcpy(perthread->name, name[0] ? name : "main",
>>                       sizeof perthread->name);
>>
>> @@ -153,9 +157,7 @@ ovsrcu_quiesce(void)
>>
>>       perthread = ovsrcu_perthread_get();
>>       perthread->seqno = seq_read(global_seqno);
>> -    if (perthread->cbset) {
>> -        ovsrcu_flush_cbset(perthread);
>> -    }
>> +    ovsrcu_flush_nodes(perthread);
>>       seq_change(global_seqno);
>>
>>       ovsrcu_quiesced();
>> @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void)
>>       perthread = ovsrcu_perthread_get();
>>       if (!seq_try_lock()) {
>>           perthread->seqno = seq_read(global_seqno);
>> -        if (perthread->cbset) {
>> -            ovsrcu_flush_cbset__(perthread, true);
>> -        }
>> +        ovsrcu_flush_nodes__(perthread, true);
>>           seq_change_protected(global_seqno);
>>           seq_unlock();
>>           ovsrcu_quiesced();
>> @@ -264,10 +264,10 @@ ovsrcu_exit(void)
>>       /* Repeatedly:
>>        *
>>        *    - Wait for a grace period.  One important side effect is to push 
>> the
>> -     *      running thread's cbset into 'flushed_cbsets' so that the next 
>> call
>> +     *      running thread's nodes into 'flushed_nodes' so that the next 
>> call
>>        *      has something to call.
>>        *
>> -     *    - Call all the callbacks in 'flushed_cbsets'.  If there aren't 
>> any,
>> +     *    - Call all the callbacks in 'flushed_nodes'.  If there aren't any,
>>        *      we're done, otherwise the callbacks themselves might have 
>> requested
>>        *      more deferred callbacks so we go around again.
>>        *
>> @@ -282,6 +282,32 @@ ovsrcu_exit(void)
>>       }
>>   }
>>
>> +static void
>> +ovsrcu_run_cbset(void *aux)
> 
> Maybe the name should be more explicit, for example;
>    ovsrcu_cbset_execute_and_free()
> 
> You might be missing OVS_NO_SANITIZE_FUNCTION attribute causing the upstream
> regression failures.
> 

Ack.

>> +{
>> +    struct ovsrcu_cbset *cbset = aux;
>> +    struct ovsrcu_cb *cb;
>> +
>> +    for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>> +        cb->function(cb->aux);
>> +    }
>> +
> 
> Should this be something like this, as &cbset->cbs[cbset->n_cbs] is
> undefined behaviour when cbset->cbs is NULL.
> 
>        ovs_assert(cbset->n_cbs <= cbset->n_allocated);
> 
>        if (cbset->cbs != NULL && cbset->n_cbs > 0) {
>            for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>                cb->function(cb->aux);
>            }
>        }
> 
> But I guess this would even be cleaner:
> 
>    ovs_assert(cbset->n_cbs <= cbset->n_allocated);
> 
>    for (size_t i = 0; i < cbset->n_cbs; i++) {
>        cbset->cbs[i].function(cbset->cbs[i].aux);
>    }
> 

Ack.

>> +    free(cbset->cbs);
>> +    free(cbset);
>> +}
>> +
>> +void
>> +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
>> +                           struct ovsrcu_node *rcu_node)
>> +{
>> +    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
>> +
>> +    rcu_node->cb = function;
>> +    rcu_node->aux = aux;
>> +    ovs_list_push_back(&perthread->pending, &rcu_node->list_node);
>> +    perthread->n_pending++;
>> +}
>> +
>>   /* Registers 'function' to be called, passing 'aux' as argument, after the
>>    * next grace period.
>>    *
>> @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
>>           cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
>>           cbset->n_allocated = MIN_CBS;
>>           cbset->n_cbs = 0;
>> +        ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node);
>>       }
>>
>>       if (cbset->n_cbs == cbset->n_allocated) {
>> @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void 
>> *aux)
>>   static bool OVS_NO_SANITIZE_FUNCTION
>>   ovsrcu_call_postponed(void)
>>   {
>> -    struct ovsrcu_cbset *cbset;
>> -    struct ovs_list cbsets;
>> +    struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes);
>> +    struct ovsrcu_node *node;
>>
>> -    guarded_list_pop_all(&flushed_cbsets, &cbsets);
>> -    if (ovs_list_is_empty(&cbsets)) {
>> +    guarded_list_pop_all(&flushed_nodes, &nodes);
>> +    if (ovs_list_is_empty(&nodes)) {
>>           return false;
>>       }
>>
>>       ovsrcu_synchronize();
>>
>> -    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
>> -        struct ovsrcu_cb *cb;
>> -
>> -        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
>> -            cb->function(cb->aux);
>> -        }
>> -        free(cbset->cbs);
>> -        free(cbset);
>> +    LIST_FOR_EACH_POP (node, list_node, &nodes) {
>> +        node->cb(node->aux);
>>       }
>>
>>       return true;
>> @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>>       pthread_detach(pthread_self());
>>
>>       while (!latch_is_set(&postpone_exit)) {
>> -        uint64_t seqno = seq_read(flushed_cbsets_seq);
>> +        uint64_t cb_seqno = seq_read(flushed_nodes_seq);
>>           if (!ovsrcu_call_postponed()) {
>> -            seq_wait(flushed_cbsets_seq, seqno);
>> +            seq_wait(flushed_nodes_seq, cb_seqno);
>>               latch_wait(&postpone_exit);
>>               poll_block();
>>           }
>> @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>>   }
>>
>>   static void
>> -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
>> +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected)
>>   {
>> -    struct ovsrcu_cbset *cbset = perthread->cbset;
>> +    if (ovs_list_is_empty(&perthread->pending)) {
>> +        return;
>> +    }
>>
>> -    if (cbset) {
>> -        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, 
>> SIZE_MAX);
>> -        perthread->cbset = NULL;
>> +    perthread->cbset = NULL;
>> +    guarded_list_push_back_all(&flushed_nodes, &perthread->pending,
>> +                               perthread->n_pending);
>> +    ovs_list_init(&perthread->pending);
> 
> Don't think there is a need to call init. ovs_list_push_back_all() calls
> ovs_list_splice() which leaves the source list empty, making this
> ovs_list_init() call redundant.
> 

Ack.

>> +    perthread->n_pending = 0;
>>
>> -        if (protected) {
>> -            seq_change_protected(flushed_cbsets_seq);
>> -        } else {
>> -            seq_change(flushed_cbsets_seq);
>> -        }
>> +    if (protected) {
>> +        seq_change_protected(flushed_nodes_seq);
>> +    } else {
>> +        seq_change(flushed_nodes_seq);
>>       }
>>   }
>>
>>   static void
>> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
>> +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread)
>>   {
>> -    ovsrcu_flush_cbset__(perthread, false);
>> +    ovsrcu_flush_nodes__(perthread, false);
>>   }
>>
>>   static void
>>   ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>>   {
>> -    if (perthread->cbset) {
>> -        ovsrcu_flush_cbset(perthread);
>> +    if (!ovs_list_is_empty(&perthread->pending)) {
>> +        ovsrcu_flush_nodes(perthread);
>>       }
>>
>>       ovs_mutex_lock(&ovsrcu_threads_mutex);
>> @@ -438,8 +462,8 @@ ovsrcu_init_module(void)
>>           ovs_list_init(&ovsrcu_threads);
>>           ovs_mutex_init(&ovsrcu_threads_mutex);
>>
>> -        guarded_list_init(&flushed_cbsets);
>> -        flushed_cbsets_seq = seq_create();
>> +        guarded_list_init(&flushed_nodes);
>> +        flushed_nodes_seq = seq_create();
>>
>>           ovsthread_once_done(&once);
>>       }
>> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
>> index a1c15c126..efd43a1a2 100644
>> --- a/lib/ovs-rcu.h
>> +++ b/lib/ovs-rcu.h
>> @@ -125,6 +125,22 @@
>>    *         ovs_mutex_unlock(&mutex);
>>    *     }
>>    *
>> + * As an alternative to ovsrcu_postpone(), the same deferred execution can 
>> be
>> + * achieved using ovsrcu_postpone_embedded():
>> + *
>> + *      struct deferrable {
>> + *          struct ovsrcu_node rcu_node;
>> + *      };
>> + *
>> + *      void
>> + *      deferred_free(struct deferrable *d)
>> + *      {
>> + *          ovsrcu_postpone_embedded(free, d, rcu_node);
>> + *      }
>> + *
>> + * Using embedded fields can be preferred sometimes to avoid the small
>> + * allocations done in ovsrcu_postpone().
>> + *
>>    * In some rare cases an object may not be addressable with a pointer, but 
>> only
>>    * through an array index (e.g. because it's provided by another library). 
>>  It
>>    * is still possible to have RCU semantics by using the ovsrcu_index type.
>> @@ -173,6 +189,8 @@
>>   #include "compiler.h"
>>   #include "ovs-atomic.h"
>>
>> +#include "openvswitch/list.h"
>> +
>>   #if __GNUC__
>>   #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
>>   #define OVSRCU_INITIALIZER(VALUE) { VALUE }
>> @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), 
>> void *aux);
>>        (void) sizeof(*(ARG)),                                     \
>>        ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>>
>> +struct ovsrcu_node {
>> +    struct ovs_list list_node;
>> +    void (*cb)(void *aux);
>> +    void *aux;
>> +};
>> +
>> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
>> + * contains an 'ovsrcu_node' as a field named MEMBER. The function
> 
> Missing double space after period: "MEMBER. The function" should be
> "MEMBER.  The function".
> 

Ack.

>> + * is called following the next grace period.  See 'Usage' above for an
>> + * example.
> 
> Should this comment also mention that the same ovsrcu_node must not be
> scheduled multiple times?  This restriction is not obvious and violating
> it could cause subtle bugs (use-after-free when the callback executes
> twice and frees the object twice).
> 

Yes it's an important point, will add it.
If we are ok with spending a bit of memory, maybe we can maintain a flag 
within the node to assert no double-free?

It sounds like a user error though, so not sure it's worth making all 
nodes bigger to detect it.

>> + */
>> +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
>> +                                struct ovsrcu_node *node);
>> +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER)             \
>> +    (/* Verify that ARG is appropriate for FUNCTION. */             \
>> +     (void) sizeof((FUNCTION)(ARG), 1),                             \
>> +     /* Verify that ARG is a pointer type. */                       \
>> +     (void) sizeof(*(ARG)),                                         \
>> +     ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG,  \
>> +                                &(ARG)->MEMBER))
>> +
>>   /* An array index protected by RCU semantics.  This is an easier 
>> alternative to
>>    * an RCU protected pointer to a malloc'd int. */
>>   typedef struct { atomic_int v; } ovsrcu_index;
>> diff --git a/tests/test-rcu.c b/tests/test-rcu.c
>> index bb17092bf..26150e7d9 100644
>> --- a/tests/test-rcu.c
>> +++ b/tests/test-rcu.c
>> @@ -17,11 +17,16 @@
>>   #include <config.h>
>>   #undef NDEBUG
>>   #include "fatal-signal.h"
>> +#include "ovs-atomic.h"
>>   #include "ovs-rcu.h"
>>   #include "ovs-thread.h"
>>   #include "ovstest.h"
>> +#include "seq.h"
>> +#include "timeval.h"
>>   #include "util.h"
>>
>> +#include "openvswitch/poll-loop.h"
>> +
>>   static void *
>>   quiescer_main(void *aux OVS_UNUSED)
>>   {
>> @@ -67,10 +72,136 @@ test_rcu_barrier(void)
>>       ovs_assert(count == 10);
>>   }
>>
>> +struct element {
>> +    struct ovsrcu_node rcu_node;
>> +    struct seq *trigger;
>> +    atomic_bool wait;
>> +};
>> +
>> +static void
>> +trigger_cb(void *e_)
>> +{
>> +    struct element *e = (struct element *) e_;
>> +
>> +    seq_change(e->trigger);
> 
> Maybe add a counter in struct element to verify the callback
> executes exactly once, catching potential double-execution bugs.
> 

Ack.

>> +}
>> +
>> +static void *
>> +wait_main(void *aux)
>> +{
>> +    struct element *e = aux;
>> +
>> +    for (;;) {
>> +        bool wait;
>> +
>> +        atomic_read(&e->wait, &wait);
>> +        if (!wait) {
>> +            break;
>> +        }
>> +    }
>> +
>> +    seq_wait(e->trigger, seq_read(e->trigger));
>> +    poll_block();
>> +
>> +    return NULL;
>> +}
>> +
>> +static void
>> +test_rcu_postpone_embedded(bool multithread)
>> +{
>> +    long long int timeout;
>> +    pthread_t waiter;
>> +    struct element e;
>> +    uint64_t seqno;
>> +
>> +    atomic_init(&e.wait, true);
>> +
>> +    if (multithread) {
>> +        waiter = ovs_thread_create("waiter", wait_main, &e);
>> +    }
>> +
>> +    e.trigger = seq_create();
>> +    seqno = seq_read(e.trigger);
>> +
>> +    ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node);
>> +
>> +    /* Check that GC holds out until all threads are quiescent. */
>> +    timeout = time_msec();
>> +    if (multithread) {
>> +        timeout += 200;
>> +    }
>> +    while (time_msec() <= timeout) {
>> +        ovs_assert(seq_read(e.trigger) == seqno);
>> +    }
>> +
>> +    atomic_store(&e.wait, false);
>> +
>> +    seq_wait(e.trigger, seqno);
>> +    poll_timer_wait_until(time_msec() + 200);
>> +    poll_block();
>> +
>> +    /* Verify that GC executed. */
>> +    ovs_assert(seq_read(e.trigger) != seqno);
>> +    seq_destroy(e.trigger);
>> +
>> +    if (multithread) {
>> +        xpthread_join(waiter, NULL);
>> +    }
>> +}
>> +
>> +#define N_ORDER_CBS 5
>> +
>> +struct order_element {
>> +    struct ovsrcu_node rcu_node;
>> +    int id;
>> +    int *log;
>> +    int *log_idx;
>> +};
>> +
>> +static void
>> +order_cb(void *aux)
>> +{
>> +    struct order_element *e = aux;
>> +    e->log[(*e->log_idx)++] = e->id;
>> +}
>> +
>> +static void
>> +test_rcu_ordering(void)
>> +{
> 
> The documentation states "All functions postponed by a single thread are
> guaranteed to execute in the order they were postponed", the test should
> verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded()
> calls.
> 

Currently, ordering is only guaranteed within each 'scheduling type' 
(cbset or embedded).

This is a simpler implementation. I can either

1. Modify the doc to specify this point.

or

2. Modify the scheduling such that postponing into a cbset will trigger
creating a new cbset if an embedded node is already at the tail of the
thread-local node list: this way, each cbset executes in the right
order, at the cost of potentially many under-utilized cbsets (a waste of 
memory).

I am assuming you will prefer (2.), if not let me know.

>> +    struct order_element elems[N_ORDER_CBS];
>> +    int log[N_ORDER_CBS];
>> +    int log_idx = 0;
>> +
>> +    for (int i = 0; i < N_ORDER_CBS; i++) {
>> +        elems[i].id = i;
>> +        elems[i].log = log;
>> +        elems[i].log_idx = &log_idx;
>> +        ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node);
>> +    }
>> +
>> +    ovsrcu_barrier();
>> +
>> +    ovs_assert(log_idx == N_ORDER_CBS);
>> +    for (int i = 0; i < N_ORDER_CBS; i++) {
>> +        if (log[i] != i) {
>> +            ovs_abort(0, "RCU embedded callback ordering violated: "
>> +                      "expected cb %d at position %d, got %d",
>> +                      i, i, log[i]);
>> +        }
>> +    }
>> +}
>> +
>>   static void
>>   test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) {
>> +    const bool multithread = true;
>> +
>> +    /* Execute single-threaded check before spawning additional threads. */
>> +    test_rcu_postpone_embedded(!multithread);
>> +    test_rcu_postpone_embedded(multithread);
>> +
>>       test_rcu_quiesce();
>>       test_rcu_barrier();
>> +    test_rcu_ordering();
> 
> test_rcu_ordering() should be tested in both single-threaded and
> multi-threaded modes like test_rcu_postpone_embedded() to verify
> ordering guarantees hold in both RCU execution paths.
> 

Ack.

>>   }
>>
>>   OVSTEST_REGISTER("test-rcu", test_rcu);
> 

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to