From: Akihiko Odaki <[email protected]> Borrow the concept of force quiescent state from Linux to ensure readers remain fast during normal operation and to avoid stalls.
Background ========== The previous implementation had four steps to begin reclamation. 1. call_rcu_thread() would wait for the first callback. 2. call_rcu_thread() would periodically poll until a decent number of callbacks piled up or it timed out. 3. synchronize_rcu() would statr a grace period (GP). 4. wait_for_readers() would wait for the GP to end. It would also trigger the force_rcu notifier to break busy loops in a read-side critical section if drain_call_rcu() had been called. Problem ======= The separation of waiting logic across these steps led to suboptimal behavior: The GP was delayed until call_rcu_thread() stops polling. force_rcu was not consistently triggered when call_rcu_thread() detected a high number of pending callbacks or a timeout. This inconsistency sometimes led to stalls, as reported in a virtio-gpu issue where memory unmapping was blocked[1]. wait_for_readers() imposed unnecessary overhead in non-urgent cases by unconditionally executing qatomic_set(&index->waiting, true) and qemu_event_reset(&rcu_gp_event), which are necessary only for expedited synchronization. Solution ======== Move the polling in call_rcu_thread() to wait_for_readers() to prevent the delay of the GP. Additionally, reorganize wait_for_readers() to distinguish between two states: Normal State: it relies exclusively on periodic polling to detect the end of the GP and maintains the read-side fast path. Force Quiescent State: Whenever expediting synchronization, it always triggers force_rcu and executes both qatomic_set(&index->waiting, true) and qemu_event_reset(&rcu_gp_event). This avoids stalls while confining the read-side overhead to this state. This unified approach, inspired by the Linux RCU, ensures consistent and efficient RCU grace period handling and confirms resolution of the virtio-gpu issue. [1] https://lore.kernel.org/qemu-devel/[email protected]/ Signed-off-by: Akihiko Odaki <[email protected]> Link: https://lore.kernel.org/r/[email protected] Tested-by: Dmitry Osipenko <[email protected]> Signed-off-by: Paolo Bonzini <[email protected]> --- util/rcu.c | 81 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/util/rcu.c b/util/rcu.c index b703c86f15a..acac9446ea9 100644 --- a/util/rcu.c +++ b/util/rcu.c @@ -43,10 +43,14 @@ #define RCU_GP_LOCKED (1UL << 0) #define RCU_GP_CTR (1UL << 1) + +#define RCU_CALL_MIN_SIZE 30 + unsigned long rcu_gp_ctr = RCU_GP_LOCKED; QemuEvent rcu_gp_event; static int in_drain_call_rcu; +static int rcu_call_count; static QemuMutex rcu_registry_lock; static QemuMutex rcu_sync_lock; @@ -76,15 +80,29 @@ static void wait_for_readers(void) { ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders); struct rcu_reader_data *index, *tmp; + int sleeps = 0; + bool forced = false; for (;;) { - /* We want to be notified of changes made to rcu_gp_ongoing - * while we walk the list. + /* + * Force the grace period to end and wait for it if any of the + * following heuristical conditions are satisfied: + * - A decent number of callbacks piled up. + * - It timed out. + * - It is in a drain_call_rcu() call. + * + * Otherwise, periodically poll the grace period, hoping it ends + * promptly. */ - qemu_event_reset(&rcu_gp_event); + if (!forced && + (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE || + sleeps >= 5 || qatomic_read(&in_drain_call_rcu))) { + forced = true; - QLIST_FOREACH(index, ®istry, node) { - qatomic_set(&index->waiting, true); + QLIST_FOREACH(index, ®istry, node) { + notifier_list_notify(&index->force_rcu, NULL); + qatomic_set(&index->waiting, true); + } } /* Here, order the stores to index->waiting before the loads of @@ -106,8 +124,6 @@ static void wait_for_readers(void) * get some extra futex wakeups. */ qatomic_set(&index->waiting, false); - } else if (qatomic_read(&in_drain_call_rcu)) { - notifier_list_notify(&index->force_rcu, NULL); } } @@ -115,7 +131,8 @@ static void wait_for_readers(void) break; } - /* Wait for one thread to report a quiescent state and try again. + /* + * Sleep for a while and try again. * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't * wait too much time. * @@ -133,7 +150,20 @@ static void wait_for_readers(void) * rcu_registry_lock is released. */ qemu_mutex_unlock(&rcu_registry_lock); - qemu_event_wait(&rcu_gp_event); + + if (forced) { + qemu_event_wait(&rcu_gp_event); + + /* + * We want to be notified of changes made to rcu_gp_ongoing + * while we walk the list. + */ + qemu_event_reset(&rcu_gp_event); + } else { + g_usleep(10000); + sleeps++; + } + qemu_mutex_lock(&rcu_registry_lock); } @@ -173,15 +203,11 @@ void synchronize_rcu(void) } } - -#define RCU_CALL_MIN_SIZE 30 - /* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h * from liburcu. Note that head is only used by the consumer. */ static struct rcu_head dummy; static struct rcu_head *head = &dummy, **tail = &dummy.next; -static int rcu_call_count; static QemuEvent rcu_call_ready_event; static void enqueue(struct rcu_head *node) @@ -259,30 +285,27 @@ static void *call_rcu_thread(void *opaque) rcu_register_thread(); for (;;) { - int tries = 0; - int n = qatomic_read(&rcu_call_count); + int n; - /* Heuristically wait for a decent number of callbacks to pile up. + /* * Fetch rcu_call_count now, we only must process elements that were * added before synchronize_rcu() starts. */ - while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) { - g_usleep(10000); - if (n == 0) { - qemu_event_reset(&rcu_call_ready_event); - n = qatomic_read(&rcu_call_count); - if (n == 0) { -#if defined(CONFIG_MALLOC_TRIM) - malloc_trim(4 * 1024 * 1024); -#endif - qemu_event_wait(&rcu_call_ready_event); - } - } + for (;;) { + qemu_event_reset(&rcu_call_ready_event); n = qatomic_read(&rcu_call_count); + if (n) { + break; + } + +#if defined(CONFIG_MALLOC_TRIM) + malloc_trim(4 * 1024 * 1024); +#endif + qemu_event_wait(&rcu_call_ready_event); } - qatomic_sub(&rcu_call_count, n); synchronize_rcu(); + qatomic_sub(&rcu_call_count, n); bql_lock(); while (n > 0) { node = try_dequeue(); -- 2.51.1
