Akihiko Odaki <[email protected]> writes:
> Previously, synchronize_rcu() was a single-threaded implementation that
> is protected with a mutex. It was used only in the RCU thread and tests,
> and real users instead use call_rcu(), which relies on the RCU thread.
>
> The usage of synchronize_rcu() in tests did not accurately represent
> real use cases because it caused locking with the mutex, which never
> happened in real use cases, and it did not exercise the logic in the
> RCU thread.
>
> Add a new implementation of synchronize_rcu() which uses call_rcu() to
> represent real use cases in tests. The old synchronize_rcu() is now
> renamed to enter_qs() and only used in the RCU thread, making the mutex
> unnecessary.
>
> Signed-off-by: Akihiko Odaki <[email protected]>
> Tested-by: Dmitry Osipenko <[email protected]>
Can we have an a-b or r-b from the RCU maintainers for these changes?
> ---
> util/rcu.c | 51 +++++++++++++++++++++++++++------------------------
> 1 file changed, 27 insertions(+), 24 deletions(-)
>
> diff --git a/util/rcu.c b/util/rcu.c
> index acac9446ea98..3c4af9d213c8 100644
> --- a/util/rcu.c
> +++ b/util/rcu.c
> @@ -38,7 +38,7 @@
>
> /*
> * Global grace period counter. Bit 0 is always one in rcu_gp_ctr.
> - * Bits 1 and above are defined in synchronize_rcu.
> + * Bits 1 and above are defined in enter_qs().
> */
> #define RCU_GP_LOCKED (1UL << 0)
> #define RCU_GP_CTR (1UL << 1)
> @@ -52,7 +52,6 @@ QemuEvent rcu_gp_event;
> static int in_drain_call_rcu;
> static int rcu_call_count;
> static QemuMutex rcu_registry_lock;
> -static QemuMutex rcu_sync_lock;
>
> /*
> * Check whether a quiescent state was crossed between the beginning of
> @@ -111,7 +110,7 @@ static void wait_for_readers(void)
> *
> * If this is the last iteration, this barrier also prevents
> * frees from seeping upwards, and orders the two wait phases
> - * on architectures with 32-bit longs; see synchronize_rcu().
> + * on architectures with 32-bit longs; see enter_qs().
> */
> smp_mb_global();
>
> @@ -137,9 +136,9 @@ static void wait_for_readers(void)
> * wait too much time.
> *
> * rcu_register_thread() may add nodes to &registry; it will not
> - * wake up synchronize_rcu, but that is okay because at least another
> + * wake up enter_qs(), but that is okay because at least another
> * thread must exit its RCU read-side critical section before
> - * synchronize_rcu is done. The next iteration of the loop will
> + * enter_qs() is done. The next iteration of the loop will
> * move the new thread's rcu_reader from &registry to &qsreaders,
> * because rcu_gp_ongoing() will return false.
> *
> @@ -171,10 +170,8 @@ static void wait_for_readers(void)
> QLIST_SWAP(&registry, &qsreaders, node);
> }
>
> -void synchronize_rcu(void)
> +static void enter_qs(void)
> {
> - QEMU_LOCK_GUARD(&rcu_sync_lock);
> -
> /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
> * Pairs with smp_mb_placeholder() in rcu_read_lock().
> *
> @@ -289,7 +286,7 @@ static void *call_rcu_thread(void *opaque)
>
> /*
> * Fetch rcu_call_count now, we only must process elements that were
> - * added before synchronize_rcu() starts.
> + * added before enter_qs() starts.
> */
> for (;;) {
> qemu_event_reset(&rcu_call_ready_event);
> @@ -304,7 +301,7 @@ static void *call_rcu_thread(void *opaque)
> qemu_event_wait(&rcu_call_ready_event);
> }
>
> - synchronize_rcu();
> + enter_qs();
> qatomic_sub(&rcu_call_count, n);
> bql_lock();
> while (n > 0) {
> @@ -337,15 +334,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> }
>
>
> -struct rcu_drain {
> +typedef struct Sync {
> struct rcu_head rcu;
> - QemuEvent drain_complete_event;
> -};
> + QemuEvent complete_event;
> +} Sync;
>
> -static void drain_rcu_callback(struct rcu_head *node)
> +static void sync_rcu_callback(Sync *sync)
> {
> - struct rcu_drain *event = (struct rcu_drain *)node;
> - qemu_event_set(&event->drain_complete_event);
> + qemu_event_set(&sync->complete_event);
> +}
> +
> +void synchronize_rcu(void)
> +{
> + Sync sync;
> +
> + qemu_event_init(&sync.complete_event, false);
> + call_rcu(&sync, sync_rcu_callback, rcu);
> + qemu_event_wait(&sync.complete_event);
> + qemu_event_destroy(&sync.complete_event);
> }
>
> /*
> @@ -359,11 +365,11 @@ static void drain_rcu_callback(struct rcu_head *node)
>
> void drain_call_rcu(void)
> {
> - struct rcu_drain rcu_drain;
> + Sync sync;
> bool locked = bql_locked();
>
> - memset(&rcu_drain, 0, sizeof(struct rcu_drain));
> - qemu_event_init(&rcu_drain.drain_complete_event, false);
> + memset(&sync, 0, sizeof(sync));
> + qemu_event_init(&sync.complete_event, false);
>
> if (locked) {
> bql_unlock();
> @@ -383,8 +389,8 @@ void drain_call_rcu(void)
> */
>
> qatomic_inc(&in_drain_call_rcu);
> - call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
> - qemu_event_wait(&rcu_drain.drain_complete_event);
> + call_rcu(&sync, sync_rcu_callback, rcu);
> + qemu_event_wait(&sync.complete_event);
> qatomic_dec(&in_drain_call_rcu);
>
> if (locked) {
> @@ -427,7 +433,6 @@ static void rcu_init_complete(void)
> QemuThread thread;
>
> qemu_mutex_init(&rcu_registry_lock);
> - qemu_mutex_init(&rcu_sync_lock);
> qemu_event_init(&rcu_gp_event, true);
>
> qemu_event_init(&rcu_call_ready_event, false);
> @@ -460,7 +465,6 @@ static void rcu_init_lock(void)
> return;
> }
>
> - qemu_mutex_lock(&rcu_sync_lock);
> qemu_mutex_lock(&rcu_registry_lock);
> }
>
> @@ -471,7 +475,6 @@ static void rcu_init_unlock(void)
> }
>
> qemu_mutex_unlock(&rcu_registry_lock);
> - qemu_mutex_unlock(&rcu_sync_lock);
> }
>
> static void rcu_init_child(void)
--
Alex Bennée
Virtualisation Tech Lead @ Linaro