On 1 Apr 2026, at 11:13, Eli Britstein wrote:

> From: Gaetan Rivet <[email protected]>
>
> Add a way to schedule executions with the RCU using memory embedded
> within the object being scheduled, if applicable.
>
> This way, freeing a high volume of objects does not require many small
> allocations, potentially increasing heap fragmentation and memory
> pressure.

Thanks, Gaetan, for following up on this patch. This embedded version looks
way nicer. I have a few comments below.

Cheers,

Eelco

> Signed-off-by: Gaetan Rivet <[email protected]>
> Co-authored-by: Eli Britstein <[email protected]>
> Signed-off-by: Eli Britstein <[email protected]>
> ---
>  lib/guarded-list.c |  10 ++++
>  lib/guarded-list.h |   2 +
>  lib/ovs-rcu.c      | 110 ++++++++++++++++++++++---------------
>  lib/ovs-rcu.h      |  39 ++++++++++++++
>  tests/test-rcu.c   | 131 +++++++++++++++++++++++++++++++++++++++++++++
>  5 files changed, 249 insertions(+), 43 deletions(-)
>
> diff --git a/lib/guarded-list.c b/lib/guarded-list.c
> index 2186d074e..bb77fb55f 100644
> --- a/lib/guarded-list.c
> +++ b/lib/guarded-list.c
> @@ -65,6 +65,16 @@ guarded_list_push_back(struct guarded_list *list,
>      return retval;
>  }
>
> +void
> +guarded_list_push_back_all(struct guarded_list *list,
> +                           struct ovs_list *nodes, size_t n)
> +{

guarded_list_push_back_all() trusts that the 'n' parameter matches the actual
list length, which could lead to incorrect counts if the caller passes a wrong
value.  I would prefer verifying it, which is probably as expensive as
computing it internally.

> +    ovs_mutex_lock(&list->mutex);
> +    ovs_list_push_back_all(&list->list, nodes);
> +    list->n += n;
> +    ovs_mutex_unlock(&list->mutex);
> +}
> +
>  struct ovs_list *
>  guarded_list_pop_front(struct guarded_list *list)
>  {
> diff --git a/lib/guarded-list.h b/lib/guarded-list.h
> index 80ce22c12..b575dc425 100644
> --- a/lib/guarded-list.h
> +++ b/lib/guarded-list.h
> @@ -40,6 +40,8 @@ bool guarded_list_is_empty(const struct guarded_list *);
>
>  size_t guarded_list_push_back(struct guarded_list *, struct ovs_list *,
>                                size_t max);
> +void guarded_list_push_back_all(struct guarded_list *, struct ovs_list *,
> +                                size_t n);
>  struct ovs_list *guarded_list_pop_front(struct guarded_list *);
>  size_t guarded_list_pop_all(struct guarded_list *, struct ovs_list *);
>
> diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
> index 49afcc55c..54e6c469d 100644
> --- a/lib/ovs-rcu.c
> +++ b/lib/ovs-rcu.c
> @@ -38,7 +38,7 @@ struct ovsrcu_cb {
>  };
>
>  struct ovsrcu_cbset {
> -    struct ovs_list list_node;
> +    struct ovsrcu_node rcu_node;
>      struct ovsrcu_cb *cbs;
>      size_t n_allocated;
>      int n_cbs;
> @@ -49,6 +49,8 @@ struct ovsrcu_perthread {
>
>      uint64_t seqno;
>      struct ovsrcu_cbset *cbset;
> +    struct ovs_list pending;    /* Thread-local list of ovsrcu_node. */
> +    size_t n_pending;

We might not need this, based on my earlier comment regarding
guarded_list_push_back_all().

>      char name[16];              /* This thread's name. */
>  };
>
> @@ -58,15 +60,15 @@ static pthread_key_t perthread_key;
>  static struct ovs_list ovsrcu_threads;
>  static struct ovs_mutex ovsrcu_threads_mutex;
>
> -static struct guarded_list flushed_cbsets;
> -static struct seq *flushed_cbsets_seq;
> +static struct guarded_list flushed_nodes;
> +static struct seq *flushed_nodes_seq;
>
>  static struct latch postpone_exit;
>  static struct ovs_barrier postpone_barrier;
>
>  static void ovsrcu_init_module(void);
> -static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
> -static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
> +static void ovsrcu_flush_nodes__(struct ovsrcu_perthread *, bool);
> +static void ovsrcu_flush_nodes(struct ovsrcu_perthread *);
>  static void ovsrcu_unregister__(struct ovsrcu_perthread *);
>  static bool ovsrcu_call_postponed(void);
>  static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
> @@ -85,6 +87,8 @@ ovsrcu_perthread_get(void)
>          perthread = xmalloc(sizeof *perthread);
>          perthread->seqno = seq_read(global_seqno);
>          perthread->cbset = NULL;
> +        ovs_list_init(&perthread->pending);
> +        perthread->n_pending = 0;
>          ovs_strlcpy(perthread->name, name[0] ? name : "main",
>                      sizeof perthread->name);
>
> @@ -153,9 +157,7 @@ ovsrcu_quiesce(void)
>
>      perthread = ovsrcu_perthread_get();
>      perthread->seqno = seq_read(global_seqno);
> -    if (perthread->cbset) {
> -        ovsrcu_flush_cbset(perthread);
> -    }
> +    ovsrcu_flush_nodes(perthread);
>      seq_change(global_seqno);
>
>      ovsrcu_quiesced();
> @@ -171,9 +173,7 @@ ovsrcu_try_quiesce(void)
>      perthread = ovsrcu_perthread_get();
>      if (!seq_try_lock()) {
>          perthread->seqno = seq_read(global_seqno);
> -        if (perthread->cbset) {
> -            ovsrcu_flush_cbset__(perthread, true);
> -        }
> +        ovsrcu_flush_nodes__(perthread, true);
>          seq_change_protected(global_seqno);
>          seq_unlock();
>          ovsrcu_quiesced();
> @@ -264,10 +264,10 @@ ovsrcu_exit(void)
>      /* Repeatedly:
>       *
>       *    - Wait for a grace period.  One important side effect is to push the
> -     *      running thread's cbset into 'flushed_cbsets' so that the next call
> +     *      running thread's nodes into 'flushed_nodes' so that the next call
>       *      has something to call.
>       *
> -     *    - Call all the callbacks in 'flushed_cbsets'.  If there aren't any,
> +     *    - Call all the callbacks in 'flushed_nodes'.  If there aren't any,
>       *      we're done, otherwise the callbacks themselves might have requested
>       *      more deferred callbacks so we go around again.
>       *
> @@ -282,6 +282,32 @@ ovsrcu_exit(void)
>      }
>  }
>
> +static void
> +ovsrcu_run_cbset(void *aux)

Maybe the name should be more explicit, for example:
  ovsrcu_cbset_execute_and_free()

You might be missing the OVS_NO_SANITIZE_FUNCTION attribute, which could be
causing the upstream regression failures.

> +{
> +    struct ovsrcu_cbset *cbset = aux;
> +    struct ovsrcu_cb *cb;
> +
> +    for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
> +        cb->function(cb->aux);
> +    }
> +

Should this be something like the following?  &cbset->cbs[cbset->n_cbs] is
undefined behaviour when cbset->cbs is NULL.

      ovs_assert(cbset->n_cbs <= cbset->n_allocated);

      if (cbset->cbs != NULL && cbset->n_cbs > 0) {
          for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
              cb->function(cb->aux);
          }
      }

But I guess this would be even cleaner:

  ovs_assert(cbset->n_cbs <= cbset->n_allocated);

  for (size_t i = 0; i < cbset->n_cbs; i++) {
      cbset->cbs[i].function(cbset->cbs[i].aux);
  }

> +    free(cbset->cbs);
> +    free(cbset);
> +}
> +
> +void
> +ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
> +                           struct ovsrcu_node *rcu_node)
> +{
> +    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
> +
> +    rcu_node->cb = function;
> +    rcu_node->aux = aux;
> +    ovs_list_push_back(&perthread->pending, &rcu_node->list_node);
> +    perthread->n_pending++;
> +}
> +
>  /* Registers 'function' to be called, passing 'aux' as argument, after the
>   * next grace period.
>   *
> @@ -314,6 +340,7 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
>          cbset->cbs = xmalloc(MIN_CBS * sizeof *cbset->cbs);
>          cbset->n_allocated = MIN_CBS;
>          cbset->n_cbs = 0;
> +        ovsrcu_postpone_embedded(ovsrcu_run_cbset, cbset, rcu_node);
>      }
>
>      if (cbset->n_cbs == cbset->n_allocated) {
> @@ -329,24 +356,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
>  static bool OVS_NO_SANITIZE_FUNCTION
>  ovsrcu_call_postponed(void)
>  {
> -    struct ovsrcu_cbset *cbset;
> -    struct ovs_list cbsets;
> +    struct ovs_list nodes = OVS_LIST_INITIALIZER(&nodes);
> +    struct ovsrcu_node *node;
>
> -    guarded_list_pop_all(&flushed_cbsets, &cbsets);
> -    if (ovs_list_is_empty(&cbsets)) {
> +    guarded_list_pop_all(&flushed_nodes, &nodes);
> +    if (ovs_list_is_empty(&nodes)) {
>          return false;
>      }
>
>      ovsrcu_synchronize();
>
> -    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
> -        struct ovsrcu_cb *cb;
> -
> -        for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
> -            cb->function(cb->aux);
> -        }
> -        free(cbset->cbs);
> -        free(cbset);
> +    LIST_FOR_EACH_POP (node, list_node, &nodes) {
> +        node->cb(node->aux);
>      }
>
>      return true;
> @@ -358,9 +379,9 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>      pthread_detach(pthread_self());
>
>      while (!latch_is_set(&postpone_exit)) {
> -        uint64_t seqno = seq_read(flushed_cbsets_seq);
> +        uint64_t cb_seqno = seq_read(flushed_nodes_seq);
>          if (!ovsrcu_call_postponed()) {
> -            seq_wait(flushed_cbsets_seq, seqno);
> +            seq_wait(flushed_nodes_seq, cb_seqno);
>              latch_wait(&postpone_exit);
>              poll_block();
>          }
> @@ -371,33 +392,36 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
>  }
>
>  static void
> -ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
> +ovsrcu_flush_nodes__(struct ovsrcu_perthread *perthread, bool protected)
>  {
> -    struct ovsrcu_cbset *cbset = perthread->cbset;
> +    if (ovs_list_is_empty(&perthread->pending)) {
> +        return;
> +    }
>
> -    if (cbset) {
> -        guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
> -        perthread->cbset = NULL;
> +    perthread->cbset = NULL;
> +    guarded_list_push_back_all(&flushed_nodes, &perthread->pending,
> +                               perthread->n_pending);
> +    ovs_list_init(&perthread->pending);

I don't think there is a need to call init here.  ovs_list_push_back_all()
calls ovs_list_splice(), which leaves the source list empty, making this
ovs_list_init() call redundant.

> +    perthread->n_pending = 0;
>
> -        if (protected) {
> -            seq_change_protected(flushed_cbsets_seq);
> -        } else {
> -            seq_change(flushed_cbsets_seq);
> -        }
> +    if (protected) {
> +        seq_change_protected(flushed_nodes_seq);
> +    } else {
> +        seq_change(flushed_nodes_seq);
>      }
>  }
>
>  static void
> -ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
> +ovsrcu_flush_nodes(struct ovsrcu_perthread *perthread)
>  {
> -    ovsrcu_flush_cbset__(perthread, false);
> +    ovsrcu_flush_nodes__(perthread, false);
>  }
>
>  static void
>  ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
>  {
> -    if (perthread->cbset) {
> -        ovsrcu_flush_cbset(perthread);
> +    if (!ovs_list_is_empty(&perthread->pending)) {
> +        ovsrcu_flush_nodes(perthread);
>      }
>
>      ovs_mutex_lock(&ovsrcu_threads_mutex);
> @@ -438,8 +462,8 @@ ovsrcu_init_module(void)
>          ovs_list_init(&ovsrcu_threads);
>          ovs_mutex_init(&ovsrcu_threads_mutex);
>
> -        guarded_list_init(&flushed_cbsets);
> -        flushed_cbsets_seq = seq_create();
> +        guarded_list_init(&flushed_nodes);
> +        flushed_nodes_seq = seq_create();
>
>          ovsthread_once_done(&once);
>      }
> diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
> index a1c15c126..efd43a1a2 100644
> --- a/lib/ovs-rcu.h
> +++ b/lib/ovs-rcu.h
> @@ -125,6 +125,22 @@
>   *         ovs_mutex_unlock(&mutex);
>   *     }
>   *
> + * As an alternative to ovsrcu_postpone(), the same deferred execution can be
> + * achieved using ovsrcu_postpone_embedded():
> + *
> + *      struct deferrable {
> + *          struct ovsrcu_node rcu_node;
> + *      };
> + *
> + *      void
> + *      deferred_free(struct deferrable *d)
> + *      {
> + *          ovsrcu_postpone_embedded(free, d, rcu_node);
> + *      }
> + *
> + * Using embedded fields can be preferred sometimes to avoid the small
> + * allocations done in ovsrcu_postpone().
> + *
>   * In some rare cases an object may not be addressable with a pointer, but only
>   * through an array index (e.g. because it's provided by another library).  It
>   * is still possible to have RCU semantics by using the ovsrcu_index type.
> @@ -173,6 +189,8 @@
>  #include "compiler.h"
>  #include "ovs-atomic.h"
>
> +#include "openvswitch/list.h"
> +
>  #if __GNUC__
>  #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
>  #define OVSRCU_INITIALIZER(VALUE) { VALUE }
> @@ -256,6 +274,27 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
>       (void) sizeof(*(ARG)),                                     \
>       ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
>
> +struct ovsrcu_node {
> +    struct ovs_list list_node;
> +    void (*cb)(void *aux);
> +    void *aux;
> +};
> +
> +/* Calls FUNCTION passing ARG as its pointer-type argument, which
> + * contains an 'ovsrcu_node' as a field named MEMBER. The function

Missing double space after period: "MEMBER. The function" should be
"MEMBER.  The function".

> + * is called following the next grace period.  See 'Usage' above for an
> + * example.

Should this comment also mention that the same ovsrcu_node must not be
scheduled multiple times?  This restriction is not obvious and violating
it could cause subtle bugs (use-after-free when the callback executes
twice and frees the object twice).

> + */
> +void ovsrcu_postpone_embedded__(void (*function)(void *aux), void *aux,
> +                                struct ovsrcu_node *node);
> +#define ovsrcu_postpone_embedded(FUNCTION, ARG, MEMBER)             \
> +    (/* Verify that ARG is appropriate for FUNCTION. */             \
> +     (void) sizeof((FUNCTION)(ARG), 1),                             \
> +     /* Verify that ARG is a pointer type. */                       \
> +     (void) sizeof(*(ARG)),                                         \
> +     ovsrcu_postpone_embedded__((void (*)(void *))(FUNCTION), ARG,  \
> +                                &(ARG)->MEMBER))
> +
>  /* An array index protected by RCU semantics.  This is an easier alternative to
>   * an RCU protected pointer to a malloc'd int. */
>  typedef struct { atomic_int v; } ovsrcu_index;
> diff --git a/tests/test-rcu.c b/tests/test-rcu.c
> index bb17092bf..26150e7d9 100644
> --- a/tests/test-rcu.c
> +++ b/tests/test-rcu.c
> @@ -17,11 +17,16 @@
>  #include <config.h>
>  #undef NDEBUG
>  #include "fatal-signal.h"
> +#include "ovs-atomic.h"
>  #include "ovs-rcu.h"
>  #include "ovs-thread.h"
>  #include "ovstest.h"
> +#include "seq.h"
> +#include "timeval.h"
>  #include "util.h"
>
> +#include "openvswitch/poll-loop.h"
> +
>  static void *
>  quiescer_main(void *aux OVS_UNUSED)
>  {
> @@ -67,10 +72,136 @@ test_rcu_barrier(void)
>      ovs_assert(count == 10);
>  }
>
> +struct element {
> +    struct ovsrcu_node rcu_node;
> +    struct seq *trigger;
> +    atomic_bool wait;
> +};
> +
> +static void
> +trigger_cb(void *e_)
> +{
> +    struct element *e = (struct element *) e_;
> +
> +    seq_change(e->trigger);

Maybe add a counter in struct element to verify the callback
executes exactly once, catching potential double-execution bugs.

> +}
> +
> +static void *
> +wait_main(void *aux)
> +{
> +    struct element *e = aux;
> +
> +    for (;;) {
> +        bool wait;
> +
> +        atomic_read(&e->wait, &wait);
> +        if (!wait) {
> +            break;
> +        }
> +    }
> +
> +    seq_wait(e->trigger, seq_read(e->trigger));
> +    poll_block();
> +
> +    return NULL;
> +}
> +
> +static void
> +test_rcu_postpone_embedded(bool multithread)
> +{
> +    long long int timeout;
> +    pthread_t waiter;
> +    struct element e;
> +    uint64_t seqno;
> +
> +    atomic_init(&e.wait, true);
> +
> +    if (multithread) {
> +        waiter = ovs_thread_create("waiter", wait_main, &e);
> +    }
> +
> +    e.trigger = seq_create();
> +    seqno = seq_read(e.trigger);
> +
> +    ovsrcu_postpone_embedded(trigger_cb, &e, rcu_node);
> +
> +    /* Check that GC holds out until all threads are quiescent. */
> +    timeout = time_msec();
> +    if (multithread) {
> +        timeout += 200;
> +    }
> +    while (time_msec() <= timeout) {
> +        ovs_assert(seq_read(e.trigger) == seqno);
> +    }
> +
> +    atomic_store(&e.wait, false);
> +
> +    seq_wait(e.trigger, seqno);
> +    poll_timer_wait_until(time_msec() + 200);
> +    poll_block();
> +
> +    /* Verify that GC executed. */
> +    ovs_assert(seq_read(e.trigger) != seqno);
> +    seq_destroy(e.trigger);
> +
> +    if (multithread) {
> +        xpthread_join(waiter, NULL);
> +    }
> +}
> +
> +#define N_ORDER_CBS 5
> +
> +struct order_element {
> +    struct ovsrcu_node rcu_node;
> +    int id;
> +    int *log;
> +    int *log_idx;
> +};
> +
> +static void
> +order_cb(void *aux)
> +{
> +    struct order_element *e = aux;
> +    e->log[(*e->log_idx)++] = e->id;
> +}
> +
> +static void
> +test_rcu_ordering(void)
> +{

The documentation states "All functions postponed by a single thread are
guaranteed to execute in the order they were postponed", so the test should
also verify this for mixed ovsrcu_postpone() and ovsrcu_postpone_embedded()
calls.

> +    struct order_element elems[N_ORDER_CBS];
> +    int log[N_ORDER_CBS];
> +    int log_idx = 0;
> +
> +    for (int i = 0; i < N_ORDER_CBS; i++) {
> +        elems[i].id = i;
> +        elems[i].log = log;
> +        elems[i].log_idx = &log_idx;
> +        ovsrcu_postpone_embedded(order_cb, &elems[i], rcu_node);
> +    }
> +
> +    ovsrcu_barrier();
> +
> +    ovs_assert(log_idx == N_ORDER_CBS);
> +    for (int i = 0; i < N_ORDER_CBS; i++) {
> +        if (log[i] != i) {
> +            ovs_abort(0, "RCU embedded callback ordering violated: "
> +                      "expected cb %d at position %d, got %d",
> +                      i, i, log[i]);
> +        }
> +    }
> +}
> +
>  static void
>  test_rcu(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) {
> +    const bool multithread = true;
> +
> +    /* Execute single-threaded check before spawning additional threads. */
> +    test_rcu_postpone_embedded(!multithread);
> +    test_rcu_postpone_embedded(multithread);
> +
>      test_rcu_quiesce();
>      test_rcu_barrier();
> +    test_rcu_ordering();

test_rcu_ordering() should be tested in both single-threaded and
multi-threaded modes like test_rcu_postpone_embedded() to verify
ordering guarantees hold in both RCU execution paths.

>  }
>
>  OVSTEST_REGISTER("test-rcu", test_rcu);

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to