On 1 Apr 2026, at 11:13, Eli Britstein wrote: > From: Gaetan Rivet <[email protected]> > > Add a way to schedule executions with the RCU using memory embedded > within the object being scheduled, if applicable. > > This way, freeing a high volume of objects does not require many small > allocations, potentially increasing heap fragmentation and memory > pressure.
Thanks, Gaetan, for following up on this patch. This embedded version looks way nicer. I have a few comments below. Cheers, Eelco > Signed-off-by: Gaetan Rivet <[email protected]> > Co-authored-by: Eli Britstein <[email protected]> > Signed-off-by: Eli Britstein <[email protected]> > --- > lib/guarded-list.c | 10 ++++ > lib/guarded-list.h | 2 + > lib/ovs-rcu.c | 110 ++++++++++++++++++++++--------------- > lib/ovs-rcu.h | 39 ++++++++++++++ > tests/test-rcu.c | 131 +++++++++++++++++++++++++++++++++++++++++++++ > 5 files changed, 249 insertions(+), 43 deletions(-) > > diff --git a/lib/guarded-list.c b/lib/guarded-list.c > index 2186d074e..bb77fb55f 100644 > --- a/lib/guarded-list.c > +++ b/lib/guarded-list.c > @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list, > return retval; > } > > +void > +guarded_list_push_back_all(struct guarded_list *list, > + struct ovs_list *nodes, size_t n) > +{ guarded_list_push_back_all() trusts that the 'n' parameter matches the actual list length, which could lead to incorrect counts if the caller passes a wrong value. I would prefer verifying 'n', although that is probably as expensive as computing the length internally. 
> + ovs_mutex_lock(&list->mutex); > + ovs_list_push_back_all(&list->list, nodes); > + list->n += n; > + ovs_mutex_unlock(&list->mutex); > +} > + > struct ovs_list * > guarded_list_pop_front(struct guarded_list *list) > { > diff --git a/lib/guarded-list.h b/lib/guarded-list.h > index 80ce22c12..b575dc425 100644 > --- a/lib/guarded-list.h > +++ b/lib/guarded-list.h > @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *); > > size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *, > size_t max); > +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *, > + size_t n); > struct ovs_list *guarded_list_pop_front(struct guarded_list *); > size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *); > > diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c > index 49afcc55c..54e6c469d 100644 > --- a/lib/ovs-rcu.c > +++ b/lib/ovs-rcu.c > @@ -38,7 +38,7 @@ struct ovsrcu_cb { > }; > > struct ovsrcu_cbset { > - struct ovs_list list_node; > + struct ovsrcu_node rcu_node; > struct ovsrcu_cb *cbs; > size_t n_allocated; > int n_cbs; > @@ -49,6 +49,8 @@ struct ovsrcu_perthread { > > uint64_t seqno; > struct ovsrcu_cbset *cbset; > + struct ovs_list pending; /* Thread-local list of ovsrcu_node. */ > + size_t n_pending; We might not need this, based on my earlier comment regarding guarded_list_push_back_all(). > char name[16]; /* This thread's name. 
*/ > }; > > @@ -58,15 +60,15 @@ static pthread_key_t perthread_key; > static struct ovs_list ovsrcu_threads; > static struct ovs_mutex ovsrcu_threads_mutex; > > -static struct guarded_list flushed_cbsets; > -static struct seq *flushed_cbsets_seq; > +static struct guarded_list flushed_nodes; > +static struct seq *flushed_nodes_seq; > > static struct latch postpone_exit; > static struct ovs_barrier postpone_barrier; > > static void ovsrcu_init_module(void); > -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool); > -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *); > +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool); > +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *); > static void ovsrcu_unregister__(struct ovsrcu_perthread *); > static bool ovsrcu_call_postponed(void); > static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED); > @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void) > perthread = xmalloc(sizeof *perthread); > perthread->seqno = seq_read(global_seqno); > perthread->cbset = NULL; > + ovs_list_init(&perthread->pending); > + perthread->n_pending = 0; > ovs_strlcpy(perthread->name, name[0] ? name : "main", > sizeof perthread->name); > > @@ -153,9 +157,7 @@ ovsrcu_quiesce(void) > > perthread = ovsrcu_perthread_get(); > perthread->seqno = seq_read(global_seqno); > - if (perthread->cbset) { > - ovsrcu_flush_cbset(perthread); > - } > + ovsrcu_flush_nodes(perthread); > seq_change(global_seqno); > > ovsrcu_quiesced(); > @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void) > perthread = ovsrcu_perthread_get(); > if (!seq_try_lock()) { > perthread->seqno = seq_read(global_seqno); > - if (perthread->cbset) { > - ovsrcu_flush_cbset__(perthread, true); > - } > + ovsrcu_flush_nodes__(perthread, true); > seq_change_protected(global_seqno); > seq_unlock(); > ovsrcu_quiesced(); > @@ -264,10 +264,10 @@ ovsrcu_exit(void) > /* Repeatedly: > * > * - Wait for a grace period. 
One important side effect is to push > the > - * running thread's cbset into 'flushed_cbsets' so that the next > call > + * running thread's nodes into 'flushed_nodes' so that the next call > * has something to call. > * > - * - Call all the callbacks in 'flushed_cbsets'. If there aren't any, > + * - Call all the callbacks in 'flushed_nodes'. If there aren't any, > * we're done, otherwise the callbacks themselves might have > requested > * more deferred callbacks so we go around again. > * > @@ -282,6 +282,32 @@ ovsrcu_exit(void) > } > } > > +static void > +ovsrcu_run_cbset(void *aux) Maybe the name should be more explicit, for example: ovsrcu_cbset_execute_and_free(). You might also be missing the OVS_NO_SANITIZE_FUNCTION attribute, which would explain the upstream regression failures. > +{ > + struct ovsrcu_cbset *cbset = aux; > + struct ovsrcu_cb *cb; > + > + for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > + cb->function(cb->aux); > + } > + Should this be something like the following, given that &cbset->cbs[cbset->n_cbs] is undefined behaviour when cbset->cbs is NULL? ovs_assert(cbset->n_cbs <= cbset->n_allocated); if (cbset->cbs != NULL && cbset->n_cbs > 0) { for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { cb->function(cb->aux); } } But I guess this would even be cleaner: ovs_assert(cbset->n_cbs <= cbset->n_allocated); for (size_t i = 0; i < cbset->n_cbs; i++) { cbset->cbs[i].function(cbset->cbs[i].aux); } > + free(cbset->cbs); > + free(cbset); > +} > + > +void > +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, > + struct ovsrcu_node *rcu_node) > +{ > + struct ovsrcu_perthread *perthread = ovsrcu_perthread_get(); > + > + rcu_node->cb = function; > + rcu_node->aux = aux; > + ovs_list_push_back(&perthread->pending, &rcu_node->list_node); > + perthread->n_pending++; > +} > + > /* Registers 'function' to be called, passing 'aux' as argument, after the > * next grace period. 
> * > @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux) > cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs); > cbset->n_allocated = MIN_CBS; > cbset->n_cbs = 0; > + ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node); > } > > if (cbset->n_cbs == cbset->n_allocated) { > @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void > *aux) > static bool OVS_NO_SANITIZE_FUNCTION > ovsrcu_call_postponed(void) > { > - struct ovsrcu_cbset *cbset; > - struct ovs_list cbsets; > + struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes); > + struct ovsrcu_node *node; > > - guarded_list_pop_all(&flushed_cbsets, &cbsets); > - if (ovs_list_is_empty(&cbsets)) { > + guarded_list_pop_all(&flushed_nodes, &nodes); > + if (ovs_list_is_empty(&nodes)) { > return false; > } > > ovsrcu_synchronize(); > > - LIST_FOR_EACH_POP (cbset, list_node, &cbsets) { > - struct ovsrcu_cb *cb; > - > - for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) { > - cb->function(cb->aux); > - } > - free(cbset->cbs); > - free(cbset); > + LIST_FOR_EACH_POP (node, list_node, &nodes) { > + node->cb(node->aux); > } > > return true; > @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) > pthread_detach(pthread_self()); > > while (!latch_is_set(&postpone_exit)) { > - uint64_t seqno = seq_read(flushed_cbsets_seq); > + uint64_t cb_seqno = seq_read(flushed_nodes_seq); > if (!ovsrcu_call_postponed()) { > - seq_wait(flushed_cbsets_seq, seqno); > + seq_wait(flushed_nodes_seq, cb_seqno); > latch_wait(&postpone_exit); > poll_block(); > } > @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED) > } > > static void > -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected) > +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected) > { > - struct ovsrcu_cbset *cbset = perthread->cbset; > + if (ovs_list_is_empty(&perthread->pending)) { > + return; > + } > > - if (cbset) { > - guarded_list_push_back(&flushed_cbsets, 
&cbset->list_node, SIZE_MAX); > - perthread->cbset = NULL; > + perthread->cbset = NULL; > + guarded_list_push_back_all(&flushed_nodes, &perthread->pending, > + perthread->n_pending); > + ovs_list_init(&perthread->pending); Don't think there is a need to call init. ovs_list_push_back_all() calls ovs_list_splice() which leaves the source list empty, making this ovs_list_init() call redundant. > + perthread->n_pending = 0; > > - if (protected) { > - seq_change_protected(flushed_cbsets_seq); > - } else { > - seq_change(flushed_cbsets_seq); > - } > + if (protected) { > + seq_change_protected(flushed_nodes_seq); > + } else { > + seq_change(flushed_nodes_seq); > } > } > > static void > -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread) > +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread) > { > - ovsrcu_flush_cbset__(perthread, false); > + ovsrcu_flush_nodes__(perthread, false); > } > > static void > ovsrcu_unregister__(struct ovsrcu_perthread *perthread) > { > - if (perthread->cbset) { > - ovsrcu_flush_cbset(perthread); > + if (!ovs_list_is_empty(&perthread->pending)) { > + ovsrcu_flush_nodes(perthread); > } > > ovs_mutex_lock(&ovsrcu_threads_mutex); > @@ -438,8 +462,8 @@ ovsrcu_init_module(void) > ovs_list_init(&ovsrcu_threads); > ovs_mutex_init(&ovsrcu_threads_mutex); > > - guarded_list_init(&flushed_cbsets); > - flushed_cbsets_seq = seq_create(); > + guarded_list_init(&flushed_nodes); > + flushed_nodes_seq = seq_create(); > > ovsthread_once_done(&once); > } > diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h > index a1c15c126..efd43a1a2 100644 > --- a/lib/ovs-rcu.h > +++ b/lib/ovs-rcu.h > @@ -125,6 +125,22 @@ > * ovs_mutex_unlock(&mutex); > * } > * > + * As an alternative to ovsrcu_postpone(), the same deferred execution can be > + * achieved using ovsrcu_postpone_embedded(): > + * > + * struct deferrable { > + * struct ovsrcu_node rcu_node; > + * }; > + * > + * void > + * deferred_free(struct deferrable *d) > + * { > + * ovsrcu_postpone_embedded(free, d, rcu_node); 
> + * } > + * > + * Using embedded fields can be preferred sometimes to avoid the small > + * allocations done in ovsrcu_postpone(). > + * > * In some rare cases an object may not be addressable with a pointer, but > only > * through an array index (e.g. because it's provided by another library). > It > * is still possible to have RCU semantics by using the ovsrcu_index type. > @@ -173,6 +189,8 @@ > #include "compiler.h" > #include "ovs-atomic.h" > > +#include "openvswitch/list.h" > + > #if __GNUC__ > #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } > #define OVSRCU_INITIALIZER(VALUE) { VALUE } > @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void > *aux); > (void) sizeof(*(ARG)), \ > ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG)) > > +struct ovsrcu_node { > + struct ovs_list list_node; > + void (*cb)(void *aux); > + void *aux; > +}; > + > +/* Calls FUNCTION passing ARG as its pointer-type argument, which > + * contains an 'ovsrcu_node' as a field named MEMBER. The function Missing double space after the period: "MEMBER. The function" should be "MEMBER.  The function" (two spaces after the period, matching the comment style in the rest of the file). > + * is called following the next grace period. See 'Usage' above for an > + * example. Should this comment also mention that the same ovsrcu_node must not be scheduled multiple times? This restriction is not obvious and violating it could cause subtle bugs (use-after-free when the callback executes twice and frees the object twice). > + */ > +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux, > + struct ovsrcu_node *node); > +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER) \ > + (/* Verify that ARG is appropriate for FUNCTION. */ \ > + (void) sizeof((FUNCTION)(ARG), 1), \ > + /* Verify that ARG is a pointer type. */ \ > + (void) sizeof(*(ARG)), \ > + ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG, \ > + &(ARG)->MEMBER)) > + > /* An array index protected by RCU semantics. 
This is an easier alternative > to > * an RCU protected pointer to a malloc'd int. */ > typedef struct { atomic_int v; } ovsrcu_index; > diff --git a/tests/test-rcu.c b/tests/test-rcu.c > index bb17092bf..26150e7d9 100644 > --- a/tests/test-rcu.c > +++ b/tests/test-rcu.c > @@ -17,11 +17,16 @@ > #include <config.h> > #undef NDEBUG > #include "fatal-signal.h" > +#include "ovs-atomic.h" > #include "ovs-rcu.h" > #include "ovs-thread.h" > #include "ovstest.h" > +#include "seq.h" > +#include "timeval.h" > #include "util.h" > > +#include "openvswitch/poll-loop.h" > + > static void * > quiescer_main(void *aux OVS_UNUSED) > { > @@ -67,10 +72,136 @@ test_rcu_barrier(void) > ovs_assert(count == 10); > } > > +struct element { > + struct ovsrcu_node rcu_node; > + struct seq *trigger; > + atomic_bool wait; > +}; > + > +static void > +trigger_cb(void *e_) > +{ > + struct element *e = (struct element *) e_; > + > + seq_change(e->trigger); Maybe add a counter in struct element to verify the callback executes exactly once, catching potential double-execution bugs. > +} > + > +static void * > +wait_main(void *aux) > +{ > + struct element *e = aux; > + > + for (;;) { > + bool wait; > + > + atomic_read(&e->wait, &wait); > + if (!wait) { > + break; > + } > + } > + > + seq_wait(e->trigger, seq_read(e->trigger)); > + poll_block(); > + > + return NULL; > +} > + > +static void > +test_rcu_postpone_embedded(bool multithread) > +{ > + long long int timeout; > + pthread_t waiter; > + struct element e; > + uint64_t seqno; > + > + atomic_init(&e.wait, true); > + > + if (multithread) { > + waiter = ovs_thread_create("waiter", wait_main, &e); > + } > + > + e.trigger = seq_create(); > + seqno = seq_read(e.trigger); > + > + ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node); > + > + /* Check that GC holds out until all threads are quiescent. 
*/ > + timeout = time_msec(); > + if (multithread) { > + timeout += 200; > + } > + while (time_msec() <= timeout) { > + ovs_assert(seq_read(e.trigger) == seqno); > + } > + > + atomic_store(&e.wait, false); > + > + seq_wait(e.trigger, seqno); > + poll_timer_wait_until(time_msec() + 200); > + poll_block(); > + > + /* Verify that GC executed. */ > + ovs_assert(seq_read(e.trigger) != seqno); > + seq_destroy(e.trigger); > + > + if (multithread) { > + xpthread_join(waiter, NULL); > + } > +} > + > +#define N_ORDER_CBS 5 > + > +struct order_element { > + struct ovsrcu_node rcu_node; > + int id; > + int *log; > + int *log_idx; > +}; > + > +static void > +order_cb(void *aux) > +{ > + struct order_element *e = aux; > + e->log[(*e->log_idx)++] = e->id; > +} > + > +static void > +test_rcu_ordering(void) > +{ The documentation states "All functions postponed by a single thread are guaranteed to execute in the order they were postponed", the test should verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded() calls. > + struct order_element elems[N_ORDER_CBS]; > + int log[N_ORDER_CBS]; > + int log_idx = 0; > + > + for (int i = 0; i < N_ORDER_CBS; i++) { > + elems[i].id = i; > + elems[i].log = log; > + elems[i].log_idx = &log_idx; > + ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node); > + } > + > + ovsrcu_barrier(); > + > + ovs_assert(log_idx == N_ORDER_CBS); > + for (int i = 0; i < N_ORDER_CBS; i++) { > + if (log[i] != i) { > + ovs_abort(0, "RCU embedded callback ordering violated: " > + "expected cb %d at position %d, got %d", > + i, i, log[i]); > + } > + } > +} > + > static void > test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) { > + const bool multithread = true; > + > + /* Execute single-threaded check before spawning additional threads. 
*/ > + test_rcu_postpone_embedded(!multithread); > + test_rcu_postpone_embedded(multithread); > + > test_rcu_quiesce(); > test_rcu_barrier(); > + test_rcu_ordering(); test_rcu_ordering() should be tested in both single-threaded and multi-threaded modes like test_rcu_postpone_embedded() to verify ordering guarantees hold in both RCU execution paths. > } > > OVSTEST_REGISTER("test-rcu", test_rcu); _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
