Sorry, I was preempted before I could answer here:
On Tue, Aug 06, 2024 at 04:39:53PM -0400, Steven Rostedt wrote:
> On Mon, 5 Aug 2024 18:32:25 +0100
> Vincent Donnefort <[email protected]> wrote:
>
> > +
> > +#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc)
> > \
> > + for (__pdesc = (struct rb_page_desc *)&((__trace_pdesc)->__data[0]),
> > __cpu = 0; \
> > + __cpu < (__trace_pdesc)->nr_cpus;
> > \
> > + __cpu++, __pdesc = __next_rb_page_desc(__pdesc))
> > +
> > +static inline
> > +struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc, int
> > cpu)
> > +{
> > + struct rb_page_desc *pdesc;
> > + int i;
> > +
> > + if (!trace_pdesc)
> > + return NULL;
> > +
> > + for_each_rb_page_desc(pdesc, i, trace_pdesc) {
> > + if (pdesc->cpu == cpu)
>
> Is there a reason for the linear search?
>
> Why not just:
>
> if (cpu >= trace_pdesc->nr_cpus)
> return NULL;
>
> len = struct_size(pdesc, page_va, pdesc->nr_page_va);
> pdesc = (void *)&(trace_pdesc->__data[0]);
>
> return pdesc + len * cpu;
>
> (note I don't think you need to typecast the void pointer).
I suppose we can't assume buffers will be allocated for every CPU, hence the
need to look at each buffer.
>
> > + return pdesc;
> > + }
> > +
> > + return NULL;
> > +}
> > +
> > +static inline
> > +void *rb_page_desc_page(struct rb_page_desc *pdesc, int page_id)
> > +{
> > + return page_id > pdesc->nr_page_va ? NULL : (void
> > *)pdesc->page_va[page_id];
> > +}
> > +
> > +struct ring_buffer_writer {
> > + struct trace_page_desc *pdesc;
> > + int (*get_reader_page)(int cpu);
> > +};
> > +
> > +int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu);
> > +
> > +#define ring_buffer_reader(writer) \
> > +({ \
> > + static struct lock_class_key __key; \
> > + __ring_buffer_alloc(0, RB_FL_OVERWRITE, &__key, writer);\
> > +})
> > #endif /* _LINUX_RING_BUFFER_H */
> > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> > index f4f4dda28077..a07c22254cfd 100644
> > --- a/kernel/trace/ring_buffer.c
> > +++ b/kernel/trace/ring_buffer.c
> > @@ -495,6 +495,8 @@ struct ring_buffer_per_cpu {
> > unsigned long *subbuf_ids; /* ID to subbuf VA */
> > struct trace_buffer_meta *meta_page;
> >
> > + struct ring_buffer_writer *writer;
> > +
> > /* ring buffer pages to update, > 0 to add, < 0 to remove */
> > long nr_pages_to_update;
> > struct list_head new_pages; /* new pages to add */
> > @@ -517,6 +519,8 @@ struct trace_buffer {
> >
> > struct ring_buffer_per_cpu **buffers;
> >
> > + struct ring_buffer_writer *writer;
> > +
> > struct hlist_node node;
> > u64 (*clock)(void);
> >
> > @@ -1626,6 +1630,31 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer,
> > long nr_pages, int cpu)
> >
> > cpu_buffer->reader_page = bpage;
> >
> > + if (buffer->writer) {
> > + struct rb_page_desc *pdesc =
> > rb_page_desc(buffer->writer->pdesc, cpu);
> > +
> > + if (!pdesc)
> > + goto fail_free_reader;
> > +
> > + cpu_buffer->writer = buffer->writer;
> > + cpu_buffer->meta_page = (struct trace_buffer_meta *)(void
> > *)pdesc->meta_va;
> > + cpu_buffer->subbuf_ids = pdesc->page_va;
> > + cpu_buffer->nr_pages = pdesc->nr_page_va - 1;
> > + atomic_inc(&cpu_buffer->record_disabled);
> > + atomic_inc(&cpu_buffer->resize_disabled);
> > +
> > + bpage->page = rb_page_desc_page(pdesc,
> > +
> > cpu_buffer->meta_page->reader.id);
> > + if (!bpage->page)
> > + goto fail_free_reader;
> > + /*
> > + * The meta-page can only describe which of the ring-buffer page
> > + * is the reader. There is no need to init the rest of the
> > + * ring-buffer.
> > + */
> > + return cpu_buffer;
> > + }
> > +
> > page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP |
> > __GFP_ZERO,
> > cpu_buffer->buffer->subbuf_order);
> > if (!page)
> > @@ -1663,6 +1692,10 @@ static void rb_free_cpu_buffer(struct
> > ring_buffer_per_cpu *cpu_buffer)
> >
> > irq_work_sync(&cpu_buffer->irq_work.work);
> >
> > + if (cpu_buffer->writer)
> > + /* the ring_buffer doesn't own the data pages */
> > + cpu_buffer->reader_page->page = NULL;
>
> Note, if statements are to have brackets if it's more than one line. That
> even includes comments. So the above should be written either as:
>
> if (cpu_buffer->writer) {
> /* the ring_buffer doesn't own the data pages */
> cpu_buffer->reader_page->page = NULL;
> }
>
> Or
>
> /* the ring_buffer doesn't own the data pages */
> if (cpu_buffer->writer)
> cpu_buffer->reader_page->page = NULL;
>
> For the second version, you should probably add more detail:
>
> /* ring_buffers with writer set do not own the data pages */
> if (cpu_buffer->writer)
> cpu_buffer->reader_page->page = NULL;
>
>
> > +
> > free_buffer_page(cpu_buffer->reader_page);
> >
> > if (head) {
> > @@ -1693,7 +1726,8 @@ static void rb_free_cpu_buffer(struct
> > ring_buffer_per_cpu *cpu_buffer)
> > * drop data when the tail hits the head.
> > */
> > struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned
> > flags,
> > - struct lock_class_key *key)
> > + struct lock_class_key *key,
> > + struct ring_buffer_writer *writer)
> > {
> > struct trace_buffer *buffer;
> > long nr_pages;
> > @@ -1721,6 +1755,10 @@ struct trace_buffer *__ring_buffer_alloc(unsigned
> > long size, unsigned flags,
> > buffer->flags = flags;
> > buffer->clock = trace_clock_local;
> > buffer->reader_lock_key = key;
> > + if (writer) {
> > + buffer->writer = writer;
>
> Should probably add a comment here:
>
> /* The writer is external and never done by the kernel */
>
> or something like that.
>
> > + atomic_inc(&buffer->record_disabled);
> > + }
> >
>
> -- Steve
>
[...]