On Thu, Aug 30, 2012 at 12:03:01PM -0700, Paul E. McKenney wrote:
> From: "Paul E. McKenney" <[email protected]>
> 
> Currently, _rcu_barrier() relies on preempt_disable() to prevent
> any CPU from going offline, which in turn depends on CPU hotplug's
> use of __stop_machine().
> 
> This patch therefore makes _rcu_barrier() use get_online_cpus() to
> block CPU-hotplug operations.  This has the added benefit of removing
> the need for _rcu_barrier() to adopt callbacks:  Because CPU-hotplug
> operations are excluded, there can be no callbacks to adopt.  This
> commit simplifies the code accordingly.
> 
> Signed-off-by: Paul E. McKenney <[email protected]>
> Signed-off-by: Paul E. McKenney <[email protected]>

Impressive simplification!

Reviewed-by: Josh Triplett <[email protected]>

> ---
>  kernel/rcutree.c       |   83 ++++++-----------------------------------------
>  kernel/rcutree.h       |    3 --
>  kernel/rcutree_trace.c |    4 +-
>  3 files changed, 13 insertions(+), 77 deletions(-)
> 
> diff --git a/kernel/rcutree.c b/kernel/rcutree.c
> index f280e54..9854a00 100644
> --- a/kernel/rcutree.c
> +++ b/kernel/rcutree.c
> @@ -1390,17 +1390,6 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
>       int i;
>       struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
>  
> -     /*
> -      * If there is an rcu_barrier() operation in progress, then
> -      * only the task doing that operation is permitted to adopt
> -      * callbacks.  To do otherwise breaks rcu_barrier() and friends
> -      * by causing them to fail to wait for the callbacks in the
> -      * orphanage.
> -      */
> -     if (rsp->rcu_barrier_in_progress &&
> -         rsp->rcu_barrier_in_progress != current)
> -             return;
> -
>       /* Do the accounting first. */
>       rdp->qlen_lazy += rsp->qlen_lazy;
>       rdp->qlen += rsp->qlen;
> @@ -1455,9 +1444,8 @@ static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>   * The CPU has been completely removed, and some other CPU is reporting
>   * this fact from process context.  Do the remainder of the cleanup,
>   * including orphaning the outgoing CPU's RCU callbacks, and also
> - * adopting them, if there is no _rcu_barrier() instance running.
> - * There can only be one CPU hotplug operation at a time, so no other
> - * CPU can be attempting to update rcu_cpu_kthread_task.
> + * adopting them.  There can only be one CPU hotplug operation at a time,
> + * so no other CPU can be attempting to update rcu_cpu_kthread_task.
>   */
>  static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
>  {
> @@ -1519,10 +1507,6 @@ static void rcu_cleanup_dead_cpu(int cpu, struct 
> rcu_state *rsp)
>  
>  #else /* #ifdef CONFIG_HOTPLUG_CPU */
>  
> -static void rcu_adopt_orphan_cbs(struct rcu_state *rsp)
> -{
> -}
> -
>  static void rcu_cleanup_dying_cpu(struct rcu_state *rsp)
>  {
>  }
> @@ -2326,13 +2310,10 @@ static void rcu_barrier_func(void *type)
>  static void _rcu_barrier(struct rcu_state *rsp)
>  {
>       int cpu;
> -     unsigned long flags;
>       struct rcu_data *rdp;
> -     struct rcu_data rd;
>       unsigned long snap = ACCESS_ONCE(rsp->n_barrier_done);
>       unsigned long snap_done;
>  
> -     init_rcu_head_on_stack(&rd.barrier_head);
>       _rcu_barrier_trace(rsp, "Begin", -1, snap);
>  
>       /* Take mutex to serialize concurrent rcu_barrier() requests. */
> @@ -2372,70 +2353,30 @@ static void _rcu_barrier(struct rcu_state *rsp)
>       /*
>        * Initialize the count to one rather than to zero in order to
>        * avoid a too-soon return to zero in case of a short grace period
> -      * (or preemption of this task).  Also flag this task as doing
> -      * an rcu_barrier().  This will prevent anyone else from adopting
> -      * orphaned callbacks, which could cause otherwise failure if a
> -      * CPU went offline and quickly came back online.  To see this,
> -      * consider the following sequence of events:
> -      *
> -      * 1.   We cause CPU 0 to post an rcu_barrier_callback() callback.
> -      * 2.   CPU 1 goes offline, orphaning its callbacks.
> -      * 3.   CPU 0 adopts CPU 1's orphaned callbacks.
> -      * 4.   CPU 1 comes back online.
> -      * 5.   We cause CPU 1 to post an rcu_barrier_callback() callback.
> -      * 6.   Both rcu_barrier_callback() callbacks are invoked, awakening
> -      *      us -- but before CPU 1's orphaned callbacks are invoked!!!
> +      * (or preemption of this task).  Exclude CPU-hotplug operations
> +      * to ensure that no offline CPU has callbacks queued.
>        */
>       init_completion(&rsp->barrier_completion);
>       atomic_set(&rsp->barrier_cpu_count, 1);
> -     raw_spin_lock_irqsave(&rsp->onofflock, flags);
> -     rsp->rcu_barrier_in_progress = current;
> -     raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> +     get_online_cpus();
>  
>       /*
> -      * Force every CPU with callbacks to register a new callback
> -      * that will tell us when all the preceding callbacks have
> -      * been invoked.  If an offline CPU has callbacks, wait for
> -      * it to either come back online or to finish orphaning those
> -      * callbacks.
> +      * Force each CPU with callbacks to register a new callback.
> +      * When that callback is invoked, we will know that all of the
> +      * corresponding CPU's preceding callbacks have been invoked.
>        */
> -     for_each_possible_cpu(cpu) {
> -             preempt_disable();
> +     for_each_online_cpu(cpu) {
>               rdp = per_cpu_ptr(rsp->rda, cpu);
> -             if (cpu_is_offline(cpu)) {
> -                     _rcu_barrier_trace(rsp, "Offline", cpu,
> -                                        rsp->n_barrier_done);
> -                     preempt_enable();
> -                     while (cpu_is_offline(cpu) && ACCESS_ONCE(rdp->qlen))
> -                             schedule_timeout_interruptible(1);
> -             } else if (ACCESS_ONCE(rdp->qlen)) {
> +             if (ACCESS_ONCE(rdp->qlen)) {
>                       _rcu_barrier_trace(rsp, "OnlineQ", cpu,
>                                          rsp->n_barrier_done);
>                       smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
> -                     preempt_enable();
>               } else {
>                       _rcu_barrier_trace(rsp, "OnlineNQ", cpu,
>                                          rsp->n_barrier_done);
> -                     preempt_enable();
>               }
>       }
> -
> -     /*
> -      * Now that all online CPUs have rcu_barrier_callback() callbacks
> -      * posted, we can adopt all of the orphaned callbacks and place
> -      * an rcu_barrier_callback() callback after them.  When that is done,
> -      * we are guaranteed to have an rcu_barrier_callback() callback
> -      * following every callback that could possibly have been
> -      * registered before _rcu_barrier() was called.
> -      */
> -     raw_spin_lock_irqsave(&rsp->onofflock, flags);
> -     rcu_adopt_orphan_cbs(rsp);
> -     rsp->rcu_barrier_in_progress = NULL;
> -     raw_spin_unlock_irqrestore(&rsp->onofflock, flags);
> -     atomic_inc(&rsp->barrier_cpu_count);
> -     smp_mb__after_atomic_inc(); /* Ensure atomic_inc() before callback. */
> -     rd.rsp = rsp;
> -     rsp->call(&rd.barrier_head, rcu_barrier_callback);
> +     put_online_cpus();
>  
>       /*
>        * Now that we have an rcu_barrier_callback() callback on each
> @@ -2456,8 +2397,6 @@ static void _rcu_barrier(struct rcu_state *rsp)
>  
>       /* Other rcu_barrier() invocations can now safely proceed. */
>       mutex_unlock(&rsp->barrier_mutex);
> -
> -     destroy_rcu_head_on_stack(&rd.barrier_head);
>  }
>  
>  /**
> diff --git a/kernel/rcutree.h b/kernel/rcutree.h
> index 4d29169..94dfdf1 100644
> --- a/kernel/rcutree.h
> +++ b/kernel/rcutree.h
> @@ -398,9 +398,6 @@ struct rcu_state {
>       struct rcu_head **orphan_donetail;      /* Tail of above. */
>       long qlen_lazy;                         /* Number of lazy callbacks. */
>       long qlen;                              /* Total number of callbacks. */
> -     struct task_struct *rcu_barrier_in_progress;
> -                                             /* Task doing rcu_barrier(), */
> -                                             /*  or NULL if no barrier. */
>       struct mutex barrier_mutex;             /* Guards barrier fields. */
>       atomic_t barrier_cpu_count;             /* # CPUs waiting on. */
>       struct completion barrier_completion;   /* Wake at barrier end. */
> diff --git a/kernel/rcutree_trace.c b/kernel/rcutree_trace.c
> index abffb48..6a2e52a 100644
> --- a/kernel/rcutree_trace.c
> +++ b/kernel/rcutree_trace.c
> @@ -51,8 +51,8 @@ static int show_rcubarrier(struct seq_file *m, void *unused)
>       struct rcu_state *rsp;
>  
>       for_each_rcu_flavor(rsp)
> -             seq_printf(m, "%s: %c bcc: %d nbd: %lu\n",
> -                        rsp->name, rsp->rcu_barrier_in_progress ? 'B' : '.',
> +             seq_printf(m, "%s: bcc: %d nbd: %lu\n",
> +                        rsp->name,
>                          atomic_read(&rsp->barrier_cpu_count),
>                          rsp->n_barrier_done);
>       return 0;
> -- 
> 1.7.8
> 
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to