From: "Steven Rostedt (Google)" <rost...@goodmis.org>

If a reader of the ring buffer is doing a poll, and waiting for the ring
buffer to hit a specific watermark, there could be a case where it gets
into an infinite ping-pong loop.

The poll code has:

  rbwork->full_waiters_pending = true;
  if (!cpu_buffer->shortest_full ||
      cpu_buffer->shortest_full > full)
         cpu_buffer->shortest_full = full;

The writer will see full_waiters_pending and check if the ring buffer is
filled over the percentage of the shortest_full value. If it is, it calls
an irq_work to wake up all the waiters.

But the code could get into a circular loop:

        CPU 0                                   CPU 1
        -----                                   -----
 [ Poll ]
   [ shortest_full = 0 ]
   rbwork->full_waiters_pending = true;
                                          if (rbwork->full_waiters_pending &&
                                              [ buffer percent ] > shortest_full) {
                                                 rbwork->wakeup_full = true;
                                                 [ queue_irqwork ]

   cpu_buffer->shortest_full = full;

                                          [ IRQ work ]
                                          if (rbwork->wakeup_full) {
                                                cpu_buffer->shortest_full = 0;
                                                wakeup poll waiters;
  [woken]
   if ([ buffer percent ] > full)
      break;
   rbwork->full_waiters_pending = true;
                                          if (rbwork->full_waiters_pending &&
                                              [ buffer percent ] > shortest_full) {
                                                 rbwork->wakeup_full = true;
                                                 [ queue_irqwork ]

   cpu_buffer->shortest_full = full;

                                          [ IRQ work ]
                                          if (rbwork->wakeup_full) {
                                                cpu_buffer->shortest_full = 0;
                                                wakeup poll waiters;
  [woken]

 [ Wash, rinse, repeat! ]

In the poll, shortest_full needs to be set before full_waiters_pending.
Once full_waiters_pending is set, the writer compares the buffer fill
against the current shortest_full (which is still stale) to decide whether
to call the irq_work, which then resets shortest_full (expecting the
readers to update it).

Also move the setting of full_waiters_pending after the check if the ring
buffer has the required percentage filled. There's no reason to tell the
writer to wake up waiters if there are no waiters.

Cc: sta...@vger.kernel.org
Fixes: 42fb0a1e84ff5 ("tracing/ring-buffer: Have polling block on watermark")
Signed-off-by: Steven Rostedt (Google) <rost...@goodmis.org>
---
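Note: the ordering described in the changelog can be sketched in a small
userspace model. This is only an illustration, not kernel code: struct
model_rb, model_poll() and model_writer_check() are hypothetical names, the
"percent > watermark" comparison follows the simplified pseudocode in the
changelog rather than the real full_hit(), and a C11 fence stands in for
smp_mb(). The model is single-threaded, so it shows the order of the steps
but does not exercise the race itself.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical userspace model; these are not the kernel's structures. */
struct model_rb {
	int percent;                      /* how full the buffer is, 0..100 */
	int shortest_full;                /* smallest watermark requested, 0 = none */
	atomic_bool full_waiters_pending; /* tells the writer someone is waiting */
	bool wakeup_full;                 /* set by the writer for the "irq work" */
};

/* Reader side, in the order the patch establishes. Returns true if ready. */
static bool model_poll(struct model_rb *rb, int full)
{
	/* 1. Publish the watermark first (under reader_lock in the kernel). */
	if (!rb->shortest_full || rb->shortest_full > full)
		rb->shortest_full = full;

	/* 2. Already past the watermark: report ready, leave the flag alone. */
	if (rb->percent > full)
		return true;

	/* 3. Full fence, standing in for smp_mb(), then raise the flag. */
	atomic_thread_fence(memory_order_seq_cst);
	atomic_store_explicit(&rb->full_waiters_pending, true,
			      memory_order_relaxed);
	return false;
}

/* Writer side: returns true if it would queue the wakeup work. */
static bool model_writer_check(struct model_rb *rb)
{
	if (!atomic_load(&rb->full_waiters_pending))
		return false;
	if (rb->percent <= rb->shortest_full)
		return false;
	rb->wakeup_full = true;
	return true;
}
```

With the watermark stored before the flag, a writer that sees
full_waiters_pending set compares against the reader's watermark and not
a stale zero, so a buffer at 10% does not wake a reader waiting for 50%.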
 kernel/trace/ring_buffer.c | 27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index aa332ace108b..adfe603a769b 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -964,16 +964,32 @@ __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
                poll_wait(filp, &rbwork->full_waiters, poll_table);
 
                raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
-               rbwork->full_waiters_pending = true;
                if (!cpu_buffer->shortest_full ||
                    cpu_buffer->shortest_full > full)
                        cpu_buffer->shortest_full = full;
                raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
-       } else {
-               poll_wait(filp, &rbwork->waiters, poll_table);
-               rbwork->waiters_pending = true;
+               if (full_hit(buffer, cpu, full))
+                       return EPOLLIN | EPOLLRDNORM;
+               /*
+                * Only allow full_waiters_pending update to be seen after
+                * the shortest_full is set. If the writer sees the
+                * full_waiters_pending flag set, it will compare the
+                * amount in the ring buffer to shortest_full. If the amount
+                * in the ring buffer is greater than the shortest_full
+                * percent, it will call the irq_work handler to wake up
+                * this list. The irq_handler will reset shortest_full
+                * back to zero. That's done under the reader_lock, but
+                * the below smp_mb() makes sure that the update to
+                * full_waiters_pending doesn't leak up into the above.
+                */
+               smp_mb();
+               rbwork->full_waiters_pending = true;
+               return 0;
        }
 
+       poll_wait(filp, &rbwork->waiters, poll_table);
+       rbwork->waiters_pending = true;
+
        /*
         * There's a tight race between setting the waiters_pending and
         * checking if the ring buffer is empty.  Once the waiters_pending bit
@@ -989,9 +1005,6 @@ __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
         */
        smp_mb();
 
-       if (full)
-               return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
-
        if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
            (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
                return EPOLLIN | EPOLLRDNORM;
-- 
2.43.0


