I added Rabin in the tags as Cc'd but as I send out single patches
manually, I forgot to add him to the real Cc.

-- Steve


On Wed, 11 Feb 2015 07:44:59 -0500
Steven Rostedt <[email protected]> wrote:

>   git://git.kernel.org/pub/scm/linux/kernel/git/rostedt/linux-trace.git
> for-next
> 
> Head SHA1: 1e0d6714aceb770b04161fbedd7765d0e1fc27bd
> 
> 
> Steven Rostedt (Red Hat) (1):
>       ring-buffer: Do not wake up a splice waiter when page is not full
> 
> ----
>  kernel/trace/ring_buffer.c | 40 +++++++++++++++++++++++++++++++++++-----
>  1 file changed, 35 insertions(+), 5 deletions(-)
> ---------------------------
> commit 1e0d6714aceb770b04161fbedd7765d0e1fc27bd
> Author: Steven Rostedt (Red Hat) <[email protected]>
> Date:   Tue Feb 10 22:14:53 2015 -0500
> 
>     ring-buffer: Do not wake up a splice waiter when page is not full
>     
>     When an application connects to the ring buffer via splice, it can only
>     read full pages. Splice does not work with partial pages. If there is
>     not enough data to fill a page, the splice command will either block
>     or return -EAGAIN (if set to nonblock).
>     
>     Code was added where if the page is not full, to just sleep again.
>     The problem is, it will get woken up again on the next event. That
>     is, when something is written into the ring buffer, if there is a waiter
>     it will wake it up. The waiter would then check the buffer, see that
>     it still does not have enough data to fill a page and go back to sleep.
>     To make matters worse, when the waiter goes back to sleep, it could
>     cause another event, which would wake it back up again to see it
>     doesn't have enough data and sleep again. This produces a tremendous
>     overhead and fills the ring buffer with noise.
>     
>     For example, recording sched_switch on an idle system for 10 seconds
>     produces 25,350,475 events!!!
>     
>     Create another wait queue for those waiters wanting full pages.
>     When an event is written, it only wakes up waiters if there's a full
>     page of data. It does not wake up the waiter if the page is not yet
>     full.
>     
>     After this change, recording sched_switch on an idle system for 10
>     seconds produces only 800 events. Getting rid of 25,349,675 useless
>     events (99.9969% of events!!), is something to take seriously.
>     
>     Cc: [email protected] # 3.16+
>     Cc: Rabin Vincent <[email protected]>
>     Fixes: e30f53aad220 "tracing: Do not busy wait in buffer splice"
>     Signed-off-by: Steven Rostedt <[email protected]>
> 
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index 96079180de3d..5040d44fe5a3 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -445,7 +445,10 @@ int ring_buffer_print_page_header(struct trace_seq *s)
>  struct rb_irq_work {
>       struct irq_work                 work;
>       wait_queue_head_t               waiters;
> +     wait_queue_head_t               full_waiters;
>       bool                            waiters_pending;
> +     bool                            full_waiters_pending;
> +     bool                            wakeup_full;
>  };
>  
>  /*
> @@ -527,6 +530,10 @@ static void rb_wake_up_waiters(struct irq_work *work)
>       struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, 
> work);
>  
>       wake_up_all(&rbwork->waiters);
> +     if (rbwork->wakeup_full) {
> +             rbwork->wakeup_full = false;
> +             wake_up_all(&rbwork->full_waiters);
> +     }
>  }
>  
>  /**
> @@ -551,9 +558,11 @@ int ring_buffer_wait(struct ring_buffer *buffer, int 
> cpu, bool full)
>        * data in any cpu buffer, or a specific buffer, put the
>        * caller on the appropriate wait queue.
>        */
> -     if (cpu == RING_BUFFER_ALL_CPUS)
> +     if (cpu == RING_BUFFER_ALL_CPUS) {
>               work = &buffer->irq_work;
> -     else {
> +             /* Full only makes sense on per cpu reads */
> +             full = false;
> +     } else {
>               if (!cpumask_test_cpu(cpu, buffer->cpumask))
>                       return -ENODEV;
>               cpu_buffer = buffer->buffers[cpu];
> @@ -562,7 +571,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int 
> cpu, bool full)
>  
>  
>       while (true) {
> -             prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
> +             if (full)
> +                     prepare_to_wait(&work->full_waiters, &wait, 
> TASK_INTERRUPTIBLE);
> +             else
> +                     prepare_to_wait(&work->waiters, &wait, 
> TASK_INTERRUPTIBLE);
>  
>               /*
>                * The events can happen in critical sections where
> @@ -584,7 +596,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int 
> cpu, bool full)
>                * that is necessary is that the wake up happens after
>                * a task has been queued. It's OK for spurious wake ups.
>                */
> -             work->waiters_pending = true;
> +             if (full)
> +                     work->full_waiters_pending = true;
> +             else
> +                     work->waiters_pending = true;
>  
>               if (signal_pending(current)) {
>                       ret = -EINTR;
> @@ -613,7 +628,10 @@ int ring_buffer_wait(struct ring_buffer *buffer, int 
> cpu, bool full)
>               schedule();
>       }
>  
> -     finish_wait(&work->waiters, &wait);
> +     if (full)
> +             finish_wait(&work->full_waiters, &wait);
> +     else
> +             finish_wait(&work->waiters, &wait);
>  
>       return ret;
>  }
> @@ -1228,6 +1246,7 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int 
> nr_pages, int cpu)
>       init_completion(&cpu_buffer->update_done);
>       init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
>       init_waitqueue_head(&cpu_buffer->irq_work.waiters);
> +     init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
>  
>       bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
>                           GFP_KERNEL, cpu_to_node(cpu));
> @@ -2799,6 +2818,8 @@ static void rb_commit(struct ring_buffer_per_cpu 
> *cpu_buffer,
>  static __always_inline void
>  rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu 
> *cpu_buffer)
>  {
> +     bool pagebusy;
> +
>       if (buffer->irq_work.waiters_pending) {
>               buffer->irq_work.waiters_pending = false;
>               /* irq_work_queue() supplies it's own memory barriers */
> @@ -2810,6 +2831,15 @@ rb_wakeups(struct ring_buffer *buffer, struct 
> ring_buffer_per_cpu *cpu_buffer)
>               /* irq_work_queue() supplies it's own memory barriers */
>               irq_work_queue(&cpu_buffer->irq_work.work);
>       }
> +
> +     pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
> +
> +     if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
> +             cpu_buffer->irq_work.wakeup_full = true;
> +             cpu_buffer->irq_work.full_waiters_pending = false;
> +             /* irq_work_queue() supplies it's own memory barriers */
> +             irq_work_queue(&cpu_buffer->irq_work.work);
> +     }
>  }
>  
>  /**

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to