On Sun, Aug 3, 2025 at 7:27 PM Xu Kuohai <xukuo...@huaweicloud.com> wrote: > > From: Xu Kuohai <xukuo...@huawei.com> > > In overwrite mode, the producer does not wait for the consumer, so the > consumer is responsible for handling conflicts. An optimistic method > is used to resolve the conflicts: the consumer first reads consumer_pos, > producer_pos and overwrite_pos, then calculates a read window and copies > data in the window from the ring buffer. After copying, it checks the > positions to decide if the data in the copy window have been overwritten > by the producer. If so, it discards the copy and tries again. Once > successful, the consumer processes the events in the copy. > > Signed-off-by: Xu Kuohai <xukuo...@huawei.com> > --- > tools/lib/bpf/ringbuf.c | 103 +++++++++++++++++++++++++++++++++++++++- > 1 file changed, 102 insertions(+), 1 deletion(-) > > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c > index 9702b70da444..9c072af675ff 100644 > --- a/tools/lib/bpf/ringbuf.c > +++ b/tools/lib/bpf/ringbuf.c > @@ -27,10 +27,13 @@ struct ring { > ring_buffer_sample_fn sample_cb; > void *ctx; > void *data; > + void *read_buffer; > unsigned long *consumer_pos; > unsigned long *producer_pos; > + unsigned long *overwrite_pos; > unsigned long mask; > int map_fd; > + bool overwrite_mode; > }; > > struct ring_buffer { > @@ -69,6 +72,9 @@ static void ringbuf_free_ring(struct ring_buffer *rb, > struct ring *r) > r->producer_pos = NULL; > } > > + if (r->read_buffer) > + free(r->read_buffer); > + > free(r); > } > > @@ -119,6 +125,14 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, > r->sample_cb = sample_cb; > r->ctx = ctx; > r->mask = info.max_entries - 1; > + r->overwrite_mode = info.map_flags & BPF_F_OVERWRITE; > + if (unlikely(r->overwrite_mode)) { > + r->read_buffer = malloc(info.max_entries); > + if (!r->read_buffer) { > + err = -ENOMEM; > + goto err_out; > + } > + } > > /* Map writable consumer page */ > tmp = mmap(NULL, rb->page_size, 
PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, > 0); > @@ -148,6 +162,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, > goto err_out; > } > r->producer_pos = tmp; > + r->overwrite_pos = r->producer_pos + 1; /* overwrite_pos is next to > producer_pos */ > r->data = tmp + rb->page_size; > > e = &rb->events[rb->ring_cnt]; > @@ -232,7 +247,7 @@ static inline int roundup_len(__u32 len) > return (len + 7) / 8 * 8; > } > > -static int64_t ringbuf_process_ring(struct ring *r, size_t n) > +static int64_t ringbuf_process_normal_ring(struct ring *r, size_t n) > { > int *len_ptr, len, err; > /* 64-bit to avoid overflow in case of extreme application behavior */ > @@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, > size_t n) > return cnt; > } > > +static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n) > +{ > + > + int err; > + uint32_t *len_ptr, len; > + /* 64-bit to avoid overflow in case of extreme application behavior */ > + int64_t cnt = 0; > + size_t size, offset; > + unsigned long cons_pos, prod_pos, over_pos, tmp_pos; > + bool got_new_data; > + void *sample; > + bool copied; > + > + size = r->mask + 1; > + > + cons_pos = smp_load_acquire(r->consumer_pos); > + do { > + got_new_data = false; > + > + /* grab a copy of data */ > + prod_pos = smp_load_acquire(r->producer_pos); > + do { > + over_pos = READ_ONCE(*r->overwrite_pos); > + /* prod_pos may be outdated now */ > + if (over_pos < prod_pos) { > + tmp_pos = max(cons_pos, over_pos); > + /* smp_load_acquire(r->producer_pos) before > + * READ_ONCE(*r->overwrite_pos) ensures that > + * over_pos + r->mask < prod_pos never occurs, > + * so size is never larger than r->mask > + */ > + size = prod_pos - tmp_pos; > + if (!size) > + goto done; > + memcpy(r->read_buffer, > + r->data + (tmp_pos & r->mask), size); > + copied = true; > + } else { > + copied = false; > + } > + prod_pos = smp_load_acquire(r->producer_pos); > + /* retry if data is overwritten by producer */ > + } while 
(!copied || prod_pos - tmp_pos > r->mask);
This seems to allow for a situation where a call to process the ring can loop indefinitely if the producers are producing and overwriting fast enough. That seems suboptimal to me. Should there be a timeout or a maximum number of attempts, or something that returns -EBUSY or another error to the user? > + > + cons_pos = tmp_pos; > + > + for (offset = 0; offset < size; offset += roundup_len(len)) { > + len_ptr = r->read_buffer + (offset & r->mask); > + len = *len_ptr; > + > + if (len & BPF_RINGBUF_BUSY_BIT) > + goto done; > + > + got_new_data = true; > + cons_pos += roundup_len(len); > + > + if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) { > + sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ; > + err = r->sample_cb(r->ctx, sample, len); > + if (err < 0) { > + /* update consumer pos and bail out */ > + smp_store_release(r->consumer_pos, > + cons_pos); > + return err; > + } > + cnt++; > + } > + > + if (cnt >= n) > + goto done; > + } > + } while (got_new_data); > + > +done: > + smp_store_release(r->consumer_pos, cons_pos); > + return cnt; > +} > + > +static int64_t ringbuf_process_ring(struct ring *r, size_t n) > +{ > + if (likely(!r->overwrite_mode)) > + return ringbuf_process_normal_ring(r, n); > + else > + return ringbuf_process_overwrite_ring(r, n); > +} > + > /* Consume available ring buffer(s) data without event polling, up to n > * records. > * > -- > 2.43.0 > >