From: Gaetan Rivet <[email protected]>

Add a way to schedule executions with the RCU using memory embedded
within the object being scheduled, if applicable.
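For illustration, a minimal sketch of a call site using the new API; the
'struct my_object' and 'my_object_destroy' names are hypothetical and only
mirror the usage comment added to lib/ovs-rcu.h below:

    #include <stdlib.h>
    #include "ovs-rcu.h"

    struct my_object {
        struct ovsrcu_inline_node rcu_node;  /* Deferral state lives here. */
        int data;
    };

    static void
    my_object_destroy(struct my_object *obj)
    {
        /* Runs free(obj) after the next grace period.  The embedded
         * 'rcu_node' is linked into the deferral queue, so no separate
         * callback record has to be allocated for this call. */
        ovsrcu_postpone_inline(free, obj, rcu_node);
    }

Compared to ovsrcu_postpone(free, obj), the bookkeeping memory comes from
the object itself rather than from the heap.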
This way, freeing a high volume of objects does not require many small
allocations, reducing memory pressure and heap fragmentation.  This
fragmentation is seen as the heap grows instead of shrinking when a high
volume of objects are freed using RCU.

Signed-off-by: Gaetan Rivet <[email protected]>
Signed-off-by: Eli Britstein <[email protected]>
---
 lib/ovs-rcu.c           | 125 ++++++++++++++++++++++++++++++++++------
 lib/ovs-rcu.h           |  39 +++++++++++++
 tests/automake.mk       |   1 +
 tests/library.at        |   4 ++
 tests/test-rcu-inline.c | 116 +++++++++++++++++++++++++++++++++++++
 5 files changed, 268 insertions(+), 17 deletions(-)
 create mode 100644 tests/test-rcu-inline.c

diff --git a/lib/ovs-rcu.c b/lib/ovs-rcu.c
index 49afcc55c..539d17a7c 100644
--- a/lib/ovs-rcu.c
+++ b/lib/ovs-rcu.c
@@ -20,6 +20,7 @@
 #include "fatal-signal.h"
 #include "guarded-list.h"
 #include "latch.h"
+#include "mpsc-queue.h"
 #include "openvswitch/list.h"
 #include "ovs-thread.h"
 #include "openvswitch/poll-loop.h"
@@ -49,6 +50,7 @@ struct ovsrcu_perthread {
 
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    bool do_inline;
     char name[16];              /* This thread's name. */
 };
 
@@ -61,6 +63,8 @@ static struct ovs_mutex ovsrcu_threads_mutex;
 static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
+static struct seq *inline_seq;
+
 static struct latch postpone_exit;
 static struct ovs_barrier postpone_barrier;
 
@@ -68,7 +72,8 @@ static void ovsrcu_init_module(void);
 static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
-static bool ovsrcu_call_postponed(void);
+static bool ovsrcu_call_inline(uint64_t);
+static bool ovsrcu_call_postponed(struct ovs_list *cbsets);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
 
 static struct ovsrcu_perthread *
@@ -85,6 +90,7 @@ ovsrcu_perthread_get(void)
         perthread = xmalloc(sizeof *perthread);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        perthread->do_inline = false;
         ovs_strlcpy(perthread->name, name[0] ? name : "main",
                     sizeof perthread->name);
 
@@ -112,7 +118,11 @@ static void
 ovsrcu_quiesced(void)
 {
     if (single_threaded()) {
-        ovsrcu_call_postponed();
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+
+        ovsrcu_call_inline(seq_read(global_seqno));
+        guarded_list_pop_all(&flushed_cbsets, &cbsets);
+        ovsrcu_call_postponed(&cbsets);
     } else {
         static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
         if (ovsthread_once_start(&once)) {
@@ -156,6 +166,9 @@ ovsrcu_quiesce(void)
     if (perthread->cbset) {
         ovsrcu_flush_cbset(perthread);
     }
+    if (perthread->do_inline) {
+        seq_change(inline_seq);
+    }
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -174,6 +187,9 @@ ovsrcu_try_quiesce(void)
     if (perthread->cbset) {
         ovsrcu_flush_cbset__(perthread, true);
     }
+    if (perthread->do_inline) {
+        seq_change_protected(inline_seq);
+    }
     seq_change_protected(global_seqno);
     seq_unlock();
     ovsrcu_quiesced();
@@ -189,21 +205,28 @@ ovsrcu_is_quiescent(void)
     return pthread_getspecific(perthread_key) == NULL;
 }
 
-void
-ovsrcu_synchronize(void)
+static uint64_t
+ovsrcu_synchronize_(struct ovs_list *cbsets)
 {
     unsigned int warning_threshold = 1000;
     uint64_t target_seqno;
     long long int start;
 
     if (single_threaded()) {
-        return;
+        return UINT64_MAX;
     }
 
     target_seqno = seq_read(global_seqno);
     ovsrcu_quiesce_start();
     start = time_msec();
 
+    if (cbsets != NULL) {
+        /* Move the flushed 'cbsets' after 'ovsrcu_quiesce_start',
+         * as this function has the potential in single-threaded mode
+         * to itself execute those 'cbsets'. */
+        guarded_list_pop_all(&flushed_cbsets, cbsets);
+    }
+
     for (;;) {
         uint64_t cur_seqno = seq_read(global_seqno);
         struct ovsrcu_perthread *perthread;
@@ -237,6 +260,15 @@ ovsrcu_synchronize(void)
         poll_block();
     }
     ovsrcu_quiesce_end();
+
+    /* Return the 'seqno' that is safe to consider reached by all threads. */
+    return target_seqno;
+}
+
+void
+ovsrcu_synchronize(void)
+{
+    ovs_ignore(ovsrcu_synchronize_(NULL));
 }
 
 /* Waits until as many postponed callbacks as possible have executed.
@@ -275,11 +307,57 @@ ovsrcu_exit(void)
      * infinite loop.  This function is just for making memory leaks easier to
      * spot so there's no point in breaking things on that basis. */
     for (int i = 0; i < 8; i++) {
-        ovsrcu_synchronize();
-        if (!ovsrcu_call_postponed()) {
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+        uint64_t target = ovsrcu_synchronize_(&cbsets);
+        bool inline_active;
+        bool cbsets_active;
+
+        /* Both RCU calls must be examined for activity. */
+        inline_active = ovsrcu_call_inline(target);
+        cbsets_active = ovsrcu_call_postponed(&cbsets);
+        if (!inline_active && !cbsets_active) {
+            break;
+        }
+    }
+}
+
+static struct mpsc_queue inline_queue = MPSC_QUEUE_INITIALIZER(&inline_queue);
+
+void
+ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
+                         struct ovsrcu_inline_node *rcu_node)
+{
+    struct ovsrcu_perthread *perthread = ovsrcu_perthread_get();
+
+    rcu_node->seqno = perthread->seqno;
+    rcu_node->cb = function;
+    rcu_node->aux = aux;
+    mpsc_queue_insert(&inline_queue, &rcu_node->node);
+
+    perthread->do_inline = true;
+}
+
+static bool
+ovsrcu_call_inline(uint64_t target_seqno)
+{
+    struct mpsc_queue_node *msg;
+    unsigned int count = 0;
+
+    mpsc_queue_acquire(&inline_queue);
+    MPSC_QUEUE_FOR_EACH_POP (msg, &inline_queue) {
+        struct ovsrcu_inline_node *node;
+
+        node = CONTAINER_OF(msg, struct ovsrcu_inline_node, node);
+        if (node->seqno >= target_seqno) {
+            mpsc_queue_push_front(&inline_queue, msg);
             break;
         }
+        node->cb(node->aux);
+        count++;
     }
+    mpsc_queue_release(&inline_queue);
+
+    return count > 0;
 }
 
 /* Registers 'function' to be called, passing 'aux' as argument, after the
@@ -327,19 +405,18 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
 }
 
 static bool OVS_NO_SANITIZE_FUNCTION
-ovsrcu_call_postponed(void)
+ovsrcu_call_postponed(struct ovs_list *cbsets)
 {
     struct ovsrcu_cbset *cbset;
-    struct ovs_list cbsets;
 
-    guarded_list_pop_all(&flushed_cbsets, &cbsets);
-    if (ovs_list_is_empty(&cbsets)) {
+    if (cbsets == NULL) {
+        return false;
+    }
+    if (ovs_list_is_empty(cbsets)) {
         return false;
     }
 
-    ovsrcu_synchronize();
-
-    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
+    LIST_FOR_EACH_POP (cbset, list_node, cbsets) {
         struct ovsrcu_cb *cb;
 
         for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
@@ -358,9 +435,19 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
     pthread_detach(pthread_self());
 
     while (!latch_is_set(&postpone_exit)) {
-        uint64_t seqno = seq_read(flushed_cbsets_seq);
-        if (!ovsrcu_call_postponed()) {
-            seq_wait(flushed_cbsets_seq, seqno);
+        struct ovs_list cbsets = OVS_LIST_INITIALIZER(&cbsets);
+        uint64_t cb_seqno = seq_read(flushed_cbsets_seq);
+        uint64_t target = ovsrcu_synchronize_(&cbsets);
+        uint64_t inline_seqno = seq_read(inline_seq);
+        bool inline_active;
+        bool cbsets_active;
+
+        /* Both RCU calls must be examined for activity. */
+        inline_active = ovsrcu_call_inline(target);
+        cbsets_active = ovsrcu_call_postponed(&cbsets);
+        if (!inline_active && !cbsets_active) {
+            seq_wait(flushed_cbsets_seq, cb_seqno);
+            seq_wait(inline_seq, inline_seqno);
             latch_wait(&postpone_exit);
             poll_block();
         }
@@ -399,6 +486,9 @@ ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
     if (perthread->cbset) {
         ovsrcu_flush_cbset(perthread);
     }
+    if (perthread->do_inline) {
+        seq_change(inline_seq);
+    }
 
     ovs_mutex_lock(&ovsrcu_threads_mutex);
     ovs_list_remove(&perthread->list_node);
@@ -440,6 +530,7 @@ ovsrcu_init_module(void)
 
         guarded_list_init(&flushed_cbsets);
         flushed_cbsets_seq = seq_create();
+        inline_seq = seq_create();
 
         ovsthread_once_done(&once);
     }
diff --git a/lib/ovs-rcu.h b/lib/ovs-rcu.h
index a1c15c126..ed756c1c2 100644
--- a/lib/ovs-rcu.h
+++ b/lib/ovs-rcu.h
@@ -125,6 +125,22 @@
  *         ovs_mutex_unlock(&mutex);
  *     }
  *
+ * As an alternative to ovsrcu_postpone(), the same deferred execution can be
+ * achieved using ovsrcu_postpone_inline():
+ *
+ *     struct deferrable {
+ *         struct ovsrcu_inline_node rcu_node;
+ *     };
+ *
+ *     void
+ *     deferred_free(struct deferrable *d)
+ *     {
+ *         ovsrcu_postpone_inline(free, d, rcu_node);
+ *     }
+ *
+ * Using inline fields can be preferred sometimes to avoid the small
+ * allocations done in ovsrcu_postpone().
+ *
  * In some rare cases an object may not be addressable with a pointer, but only
  * through an array index (e.g. because it's provided by another library).  It
  * is still possible to have RCU semantics by using the ovsrcu_index type.
@@ -171,6 +187,7 @@
  */
 
 #include "compiler.h"
+#include "mpsc-queue.h"
 #include "ovs-atomic.h"
 
 #if __GNUC__
@@ -256,6 +273,28 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
      (void) sizeof(*(ARG)),                                 \
      ovsrcu_postpone__((void (*)(void *))(FUNCTION), ARG))
 
+struct ovsrcu_inline_node {
+    struct mpsc_queue_node node;
+    void (*cb)(void *aux);
+    void *aux;
+    uint64_t seqno;
+};
+
+/* Calls FUNCTION passing ARG as its pointer-type argument, which
+ * contains an 'ovsrcu_inline_node' as a field named MEMBER.  The function
+ * is called following the next grace period.  See 'Usage' above for an
+ * example.
+ */
+void ovsrcu_postpone_inline__(void (*function)(void *aux), void *aux,
+                              struct ovsrcu_inline_node *node);
+#define ovsrcu_postpone_inline(FUNCTION, ARG, MEMBER)                  \
+    (/* Verify that ARG is appropriate for FUNCTION. */                \
+     (void) sizeof((FUNCTION)(ARG), 1),                                \
+     /* Verify that ARG is a pointer type. */                          \
+     (void) sizeof(*(ARG)),                                            \
+     ovsrcu_postpone_inline__((void (*)(void *))(FUNCTION), ARG,       \
+                              &(ARG)->MEMBER))
+
 /* An array index protected by RCU semantics.  This is an easier alternative to
  * an RCU protected pointer to a malloc'd int. */
 typedef struct { atomic_int v; } ovsrcu_index;
diff --git a/tests/automake.mk b/tests/automake.mk
index da569b022..da4d2e0b8 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -500,6 +500,7 @@ tests_ovstest_SOURCES = \
 	tests/test-random.c \
 	tests/test-rcu.c \
 	tests/test-rculist.c \
+	tests/test-rcu-inline.c \
 	tests/test-reconnect.c \
 	tests/test-rstp.c \
 	tests/test-sflow.c \
diff --git a/tests/library.at b/tests/library.at
index 82ac80a27..16820ff49 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -275,6 +275,10 @@ AT_SETUP([rcu])
 AT_CHECK([ovstest test-rcu], [0], [])
 AT_CLEANUP
 
+AT_SETUP([rcu inline])
+AT_CHECK([ovstest test-rcu-inline], [0], [])
+AT_CLEANUP
+
 AT_SETUP([stopwatch module])
 AT_CHECK([ovstest test-stopwatch], [0], [......
 ], [ignore])
diff --git a/tests/test-rcu-inline.c b/tests/test-rcu-inline.c
new file mode 100644
index 000000000..af4713af9
--- /dev/null
+++ b/tests/test-rcu-inline.c
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#undef NDEBUG
+#include "ovs-atomic.h"
+#include "ovs-rcu.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "seq.h"
+#include "timeval.h"
+#include "util.h"
+
+#include "openvswitch/poll-loop.h"
+
+struct element {
+    struct ovsrcu_inline_node rcu_node;
+    struct seq *trigger;
+    atomic_bool wait;
+};
+
+static void
+do_inline(void *e_)
+{
+    struct element *e = (struct element *) e_;
+
+    seq_change(e->trigger);
+}
+
+static void *
+wait_main(void *aux)
+{
+    struct element *e = aux;
+
+    for (;;) {
+        bool wait;
+
+        atomic_read(&e->wait, &wait);
+        if (!wait) {
+            break;
+        }
+    }
+
+    seq_wait(e->trigger, seq_read(e->trigger));
+    poll_block();
+
+    return NULL;
+}
+
+static void
+test_rcu_inline_main(bool multithread)
+{
+    long long int timeout;
+    pthread_t waiter;
+    struct element e;
+    uint64_t seqno;
+
+    atomic_init(&e.wait, true);
+
+    if (multithread) {
+        waiter = ovs_thread_create("waiter", wait_main, &e);
+    }
+
+    e.trigger = seq_create();
+    seqno = seq_read(e.trigger);
+
+    ovsrcu_postpone_inline(do_inline, &e, rcu_node);
+
+    /* Check that GC holds out until all threads are quiescent. */
+    timeout = time_msec();
+    if (multithread) {
+        timeout += 200;
+    }
+    while (time_msec() <= timeout) {
+        ovs_assert(seq_read(e.trigger) == seqno);
+    }
+
+    atomic_store(&e.wait, false);
+
+    seq_wait(e.trigger, seqno);
+    poll_timer_wait_until(time_msec() + 200);
+    poll_block();
+
+    /* Verify that GC executed. */
+    ovs_assert(seq_read(e.trigger) != seqno);
+    seq_destroy(e.trigger);
+
+    if (multithread) {
+        xpthread_join(waiter, NULL);
+    }
+}
+
+static void
+test_rcu_inline(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
+{
+    const bool multithread = true;
+
+    test_rcu_inline_main(!multithread);
+    test_rcu_inline_main(multithread);
+}
+
+OVSTEST_REGISTER("test-rcu-inline", test_rcu_inline);
-- 
2.34.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
