Hopefully, the remote will only swap pages on the kernel instruction (via the swap_reader_page() callback). This means we know at what point the ring-buffer geometry has changed. It is therefore possible to rearrange the kernel view of that ring-buffer to allow non-consuming read.
Signed-off-by: Vincent Donnefort <[email protected]> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 9c6c8e206e04..cb674a9fc359 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -5347,10 +5347,51 @@ static bool rb_read_remote_meta_page(struct ring_buffer_per_cpu *cpu_buffer) return rb_num_of_entries(cpu_buffer); } +static void rb_update_remote_head(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct buffer_page *next, *orig; + int retry = 3; + + orig = next = cpu_buffer->head_page; + rb_inc_page(&next); + + /* Run after the writer */ + while (cpu_buffer->head_page->page->time_stamp > next->page->time_stamp) { + rb_inc_page(&next); + + rb_list_head_clear(cpu_buffer->head_page->list.prev); + rb_inc_page(&cpu_buffer->head_page); + rb_set_list_to_head(cpu_buffer->head_page->list.prev); + + if (cpu_buffer->head_page == orig) { + if (WARN_ON_ONCE(!(--retry))) + return; + } + } + + orig = cpu_buffer->commit_page = cpu_buffer->head_page; + retry = 3; + + while (cpu_buffer->commit_page->page->time_stamp < next->page->time_stamp) { + rb_inc_page(&next); + rb_inc_page(&cpu_buffer->commit_page); + + if (cpu_buffer->commit_page == orig) { + if (WARN_ON_ONCE(!(--retry))) + return; + } + } +} + static void rb_iter_reset(struct ring_buffer_iter *iter) { struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; + if (cpu_buffer->remote) { + rb_read_remote_meta_page(cpu_buffer); + rb_update_remote_head(cpu_buffer); + } + /* Iterator usage is expected to have record disabled */ iter->head_page = cpu_buffer->reader_page; iter->head = cpu_buffer->reader_page->read; @@ -5503,7 +5544,7 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, static struct buffer_page * __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer) { - struct buffer_page *new_reader, *prev_reader; + struct buffer_page *new_reader, *prev_reader, *prev_head, *new_head, *last; if (!rb_read_remote_meta_page(cpu_buffer)) return NULL; @@ -5527,10 +5568,32 @@ __rb_get_reader_page_from_remote(struct ring_buffer_per_cpu *cpu_buffer) WARN_ON_ONCE(prev_reader == new_reader); - cpu_buffer->reader_page->page = new_reader->page; - cpu_buffer->reader_page->id = new_reader->id; - cpu_buffer->reader_page->read = 0; - cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp; + prev_head = new_reader; /* New reader was also the previous head */ + new_head = prev_head; + rb_inc_page(&new_head); + last = prev_head; + rb_dec_page(&last); + + /* Clear the old HEAD flag */ + rb_list_head_clear(cpu_buffer->head_page->list.prev); + + prev_reader->list.next = prev_head->list.next; + prev_reader->list.prev = prev_head->list.prev; + + /* Swap prev_reader with new_reader */ + last->list.next = &prev_reader->list; + new_head->list.prev = &prev_reader->list; + + new_reader->list.prev = &new_reader->list; + new_reader->list.next = &new_head->list; + + /* Reactivate the HEAD flag */ + rb_set_list_to_head(&last->list); + + cpu_buffer->head_page = new_head; + cpu_buffer->reader_page = new_reader; + cpu_buffer->pages = &new_head->list; + cpu_buffer->read_stamp = new_reader->page->time_stamp; cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events; return rb_page_size(cpu_buffer->reader_page) ? cpu_buffer->reader_page : NULL; @@ -6114,7 +6177,7 @@ ring_buffer_read_start(struct trace_buffer *buffer, int cpu, gfp_t flags) struct ring_buffer_per_cpu *cpu_buffer; struct ring_buffer_iter *iter; - if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->remote) + if (!cpumask_test_cpu(cpu, buffer->cpumask)) return NULL; iter = kzalloc(sizeof(*iter), flags); -- 2.52.0.107.ga0afd4fd5b-goog
