The remote is expected to swap pages only at the kernel's request (via
the swap_reader_page() callback). The kernel therefore knows exactly
when the ring-buffer geometry changes and can rearrange its own view of
that ring-buffer to allow non-consuming reads.
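
For illustration only, nothing below is part of the patch: once
iterators work on remote buffers, user-space can do a non-consuming
read through an instance "trace" file (which, unlike trace_pipe, does
not consume events). A minimal sketch, assuming a remote instance
exposed under a hypothetical path:

  #include <fcntl.h>
  #include <stdio.h>
  #include <unistd.h>

  int main(void)
  {
          /* Hypothetical path to a remote instance */
          const char *path = "/sys/kernel/tracing/instances/remote0/trace";
          char buf[4096];
          ssize_t r;
          int fd;

          fd = open(path, O_RDONLY);
          if (fd < 0) {
                  perror("open");
                  return 1;
          }

          /* Non-consuming: events remain readable for later readers */
          while ((r = read(fd, buf, sizeof(buf))) > 0)
                  fwrite(buf, 1, r, stdout);

          close(fd);
          return 0;
  }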

Signed-off-by: Vincent Donnefort <[email protected]>

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 9c6c8e206e04..cb674a9fc359 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -5347,10 +5347,54 @@ static bool rb_read_remote_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
        return rb_num_of_entries(cpu_buffer);
 }
 
+static void rb_update_remote_head(struct ring_buffer_per_cpu *cpu_buffer)
+{
+       struct buffer_page *next, *orig;
+       int retry = 3;
+
+       orig = next = cpu_buffer->head_page;
+       rb_inc_page(&next);
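+       /* Both loops below keep "next" one page ahead of the advancing page */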
+
+       /* Catch up with the writer */
+       while (cpu_buffer->head_page->page->time_stamp > next->page->time_stamp) {
+               rb_inc_page(&next);
+
+               rb_list_head_clear(cpu_buffer->head_page->list.prev);
+               rb_inc_page(&cpu_buffer->head_page);
+               rb_set_list_to_head(cpu_buffer->head_page->list.prev);
+
+               if (cpu_buffer->head_page == orig) {
+                       if (WARN_ON_ONCE(!(--retry)))
+                               return;
+               }
+       }
+
+       orig = cpu_buffer->commit_page = cpu_buffer->head_page;
+       retry = 3;
+
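+       /* Now catch the commit page up to the writer in the same way */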
+       while (cpu_buffer->commit_page->page->time_stamp < next->page->time_stamp) {
+               rb_inc_page(&next);
+               rb_inc_page(&cpu_buffer->commit_page);
+
+               if (cpu_buffer->commit_page == orig) {
+                       if (WARN_ON_ONCE(!(--retry)))
+                               return;
+               }
+       }
+}
+
 static void rb_iter_reset(struct ring_buffer_iter *iter)
 {
        struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
 
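+       /* Resync the kernel view of the remote ring-buffer before iterating */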
+       if (cpu_buffer->remote) {
+               rb_read_remote_meta_page(cpu_buffer);
+               rb_update_remote_head(cpu_buffer);
+       }
+
        /* Iterator usage is expected to have record disabled */
        iter->head_page = cpu_buffer->reader_page;
        iter->head = cpu_buffer->reader_page->read;
@@ -5503,7 +5547,7 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
 static struct buffer_page *
 __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer)
 {
-       struct buffer_page *new_reader, *prev_reader;
+       struct buffer_page *new_reader, *prev_reader, *prev_head, *new_head, *last;
 
        if (!rb_read_remote_meta_page(cpu_buffer))
                return NULL;
@@ -5527,10 +5571,34 @@ __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer)
 
        WARN_ON_ONCE(prev_reader == new_reader);
 
-       cpu_buffer->reader_page->page = new_reader->page;
-       cpu_buffer->reader_page->id = new_reader->id;
-       cpu_buffer->reader_page->read = 0;
-       cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
+       prev_head = new_reader;  /* New reader was also the previous head */
+       new_head = prev_head;
+       rb_inc_page(&new_head);
+       last = prev_head;
+       rb_dec_page(&last);
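+       /* "last" now points at the page preceding the old head */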
+
+       /* Clear the old HEAD flag */
+       rb_list_head_clear(cpu_buffer->head_page->list.prev);
+
+       prev_reader->list.next = prev_head->list.next;
+       prev_reader->list.prev = prev_head->list.prev;
+
+       /* Swap prev_reader with new_reader */
+       last->list.next = &prev_reader->list;
+       new_head->list.prev = &prev_reader->list;
+
+       new_reader->list.prev = &new_reader->list;
+       new_reader->list.next = &new_head->list;
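+       /* The reader page now sits outside the ring, linked back via ->next only */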
+
+       /* Reactivate the HEAD flag */
+       rb_set_list_to_head(&last->list);
+
+       cpu_buffer->head_page = new_head;
+       cpu_buffer->reader_page = new_reader;
+       cpu_buffer->pages = &new_head->list;
+       cpu_buffer->read_stamp = new_reader->page->time_stamp;
        cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events;
 
        return rb_page_size(cpu_buffer->reader_page) ? cpu_buffer->reader_page : NULL;
@@ -6114,7 +6182,7 @@ ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags)
        struct ring_buffer_per_cpu *cpu_buffer;
        struct ring_buffer_iter *iter;
 
-       if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->remote)
+       if (!cpumask_test_cpu(cpu, buffer->cpumask))
                return NULL;
 
        iter = kzalloc(sizeof(*iter), flags);
-- 
2.52.0.107.ga0afd4fd5b-goog