Ring buffer callbacks and contexts are fixed when each ring is added to
a manager. Callers that drain records into per-operation state, such as
a bounded destination buffer, must therefore keep mutable dispatch state
alive for the lifetime of the manager or rebuild it for each operation.

Add bounded manager and per-ring consumption APIs that take a callback
and context for one operation. The supplied callback overrides the
configured callback without changing it. This also allows managers used
only through the new APIs to be created without a setup-time callback.

Make zero-record requests return without invoking callbacks so all
bounded consumption APIs preserve their bound.

Assisted-by: Codex:gpt-5.5
Signed-off-by: Tamir Duberstein <[email protected]>
---
I discussed the desire for this API with Andrii during my time at Meta
and more recently with Alexei during Rust Week 2026.
---
 tools/lib/bpf/libbpf.h                             | 56 +++++++++++--
 tools/lib/bpf/libbpf.map                           |  2 +
 tools/lib/bpf/ringbuf.c                            | 51 ++++++++++--
 tools/testing/selftests/bpf/prog_tests/ringbuf.c   | 25 ++++++
 .../selftests/bpf/prog_tests/ringbuf_multi.c       | 92 ++++++++++++++++++++++
 5 files changed, 214 insertions(+), 12 deletions(-)

diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index b965ad571540..5a87c12bab0b 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1471,6 +1471,29 @@ LIBBPF_API int ring_buffer__add(struct ring_buffer *rb, int map_fd,
 LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__consume_n(struct ring_buffer *rb, size_t n);
+
+/**
+ * @brief **ring_buffer__consume_n_cb()** consumes up to a requested number of
+ * records across all registered ring buffer maps without event polling, using
+ * the provided callback and context for every ring buffer map.
+ *
+ * The provided callback and context override the callbacks configured for the
+ * manager's rings for the duration of this call without changing them.
+ * **ring_buffer__new()** and **ring_buffer__add()** may receive a NULL callback
+ * for rings consumed only through this function.
+ *
+ * @param rb A ring buffer manager object.
+ * @param sample_cb Non-NULL callback to invoke for each available record.
+ * @param ctx Context to pass to *sample_cb*.
+ * @param n Maximum number of records to consume across all registered ring
+ * buffer maps.
+ * @return The number of records consumed (or INT_MAX, whichever is less), or a
+ * negative error code on failure.
+ */
+LIBBPF_API int ring_buffer__consume_n_cb(struct ring_buffer *rb,
+                                        ring_buffer_sample_fn sample_cb,
+                                        void *ctx, size_t n);
+
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
 /**
@@ -1546,16 +1569,37 @@ LIBBPF_API int ring__map_fd(const struct ring *r);
 LIBBPF_API int ring__consume(struct ring *r);
 
 /**
- * @brief **ring__consume_n()** consumes up to a requested amount of items from
- * a ringbuffer without event polling.
+ * @brief **ring__consume_n()** consumes up to a requested number of records
+ * from a ring buffer without event polling.
  *
- * @param r A ringbuffer object.
- * @param n Maximum amount of items to consume.
- * @return The number of items consumed, or a negative number if any of the
- * callbacks return an error.
+ * @param r A ring buffer object.
+ * @param n Maximum number of records to consume.
+ * @return The number of records consumed (or INT_MAX, whichever is less), or a
+ * negative error code on failure.
  */
 LIBBPF_API int ring__consume_n(struct ring *r, size_t n);
 
+/**
+ * @brief **ring__consume_n_cb()** consumes up to a requested number of records
+ * from a ring buffer without event polling, using the provided callback and
+ * context.
+ *
+ * The provided callback and context override the callback configured for the
+ * ring for the duration of this call without changing it.
+ * **ring_buffer__new()** and **ring_buffer__add()** may receive a NULL callback
+ * for rings consumed only through this function.
+ *
+ * @param r A ring buffer object.
+ * @param sample_cb Non-NULL callback to invoke for each available record.
+ * @param ctx Context to pass to *sample_cb*.
+ * @param n Maximum number of records to consume.
+ * @return The number of records consumed (or INT_MAX, whichever is less), or a
+ * negative error code on failure.
+ */
+LIBBPF_API int ring__consume_n_cb(struct ring *r,
+                                 ring_buffer_sample_fn sample_cb, void *ctx,
+                                 size_t n);
+
 struct user_ring_buffer_opts {
        size_t sz; /* size of this struct, for forward/backward compatibility */
 };
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index b731df19ae69..b6d84297fce7 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -461,4 +461,6 @@ LIBBPF_1.8.0 {
                bpf_program__attach_tracing_multi;
                bpf_program__clone;
                btf__new_empty_opts;
+               ring__consume_n_cb;
+               ring_buffer__consume_n_cb;
 } LIBBPF_1.7.0;
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 00ec4837a06d..efe5e396f1d2 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -231,7 +231,8 @@ static inline int roundup_len(__u32 len)
        return (len + 7) / 8 * 8;
 }
 
-static int64_t ringbuf_process_ring(struct ring *r, size_t n)
+static int64_t ringbuf_process_ring(struct ring *r, size_t n,
+                                   ring_buffer_sample_fn sample_cb, void *ctx)
 {
        int *len_ptr, len, err;
        /* 64-bit to avoid overflow in case of extreme application behavior */
@@ -240,6 +241,11 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
        bool got_new_data;
        void *sample;
 
+       if (!sample_cb)
+               return -EINVAL;
+       if (n == 0)
+               return 0;
+
        cons_pos = smp_load_acquire(r->consumer_pos);
        do {
                got_new_data = false;
@@ -257,7 +263,7 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
 
                        if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
                                sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
-                               err = r->sample_cb(r->ctx, sample, len);
+                               err = sample_cb(ctx, sample, len);
                                if (err < 0) {
                                        /* update consumer pos and bail out */
                                        smp_store_release(r->consumer_pos,
@@ -292,7 +298,32 @@ int ring_buffer__consume_n(struct ring_buffer *rb, size_t n)
        for (i = 0; i < rb->ring_cnt; i++) {
                struct ring *ring = rb->rings[i];
 
-               err = ringbuf_process_ring(ring, n);
+               err = ringbuf_process_ring(ring, n, ring->sample_cb, ring->ctx);
+               if (err < 0)
+                       return libbpf_err(err);
+               res += err;
+               n -= err;
+
+               if (n == 0)
+                       break;
+       }
+       return res > INT_MAX ? INT_MAX : res;
+}
+
+int ring_buffer__consume_n_cb(struct ring_buffer *rb,
+                             ring_buffer_sample_fn sample_cb, void *ctx,
+                             size_t n)
+{
+       int64_t err, res = 0;
+       int i;
+
+       if (!sample_cb)
+               return libbpf_err(-EINVAL);
+       if (n == 0)
+               return 0;
+
+       for (i = 0; i < rb->ring_cnt; i++) {
+               err = ringbuf_process_ring(rb->rings[i], n, sample_cb, ctx);
                if (err < 0)
                        return libbpf_err(err);
                res += err;
@@ -317,7 +348,8 @@ int ring_buffer__consume(struct ring_buffer *rb)
        for (i = 0; i < rb->ring_cnt; i++) {
                struct ring *ring = rb->rings[i];
 
-               err = ringbuf_process_ring(ring, INT_MAX);
+               err = ringbuf_process_ring(ring, INT_MAX, ring->sample_cb,
+                                          ring->ctx);
                if (err < 0)
                        return libbpf_err(err);
                res += err;
@@ -346,7 +378,8 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
                __u32 ring_id = rb->events[i].data.fd;
                struct ring *ring = rb->rings[ring_id];
 
-               err = ringbuf_process_ring(ring, INT_MAX);
+               err = ringbuf_process_ring(ring, INT_MAX, ring->sample_cb,
+                                          ring->ctx);
                if (err < 0)
                        return libbpf_err(err);
                res += err;
@@ -404,10 +437,16 @@ int ring__map_fd(const struct ring *r)
 }
 
 int ring__consume_n(struct ring *r, size_t n)
+{
+       return ring__consume_n_cb(r, r->sample_cb, r->ctx, n);
+}
+
+int ring__consume_n_cb(struct ring *r, ring_buffer_sample_fn sample_cb,
+                      void *ctx, size_t n)
 {
        int64_t res;
 
-       res = ringbuf_process_ring(r, n);
+       res = ringbuf_process_ring(r, n, sample_cb, ctx);
        if (res < 0)
                return libbpf_err(res);
 
diff --git a/tools/testing/selftests/bpf/prog_tests/ringbuf.c b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
index 64520684d2cb..df3af61af2e0 100644
--- a/tools/testing/selftests/bpf/prog_tests/ringbuf.c
+++ b/tools/testing/selftests/bpf/prog_tests/ringbuf.c
@@ -397,6 +397,8 @@ static int process_n_sample(void *ctx, void *data, size_t len)
        struct sample *s = data;
 
        ASSERT_EQ(s->value, SAMPLE_VALUE, "sample_value");
+       if (ctx)
+               (*(int *)ctx)++;
 
        return 0;
 }
@@ -404,6 +406,8 @@ static int process_n_sample(void *ctx, void *data, size_t len)
 static void ringbuf_n_subtest(void)
 {
        struct test_ringbuf_n_lskel *skel_n;
+       struct ring *ring;
+       int cb_cnt = 0;
        int err, i;
 
        skel_n = test_ringbuf_n_lskel__open();
@@ -438,6 +442,27 @@ static void ringbuf_n_subtest(void)
                        goto cleanup_ringbuf;
        }
 
+       ring_buffer__free(ringbuf);
+       ringbuf =
+               ring_buffer__new(skel_n->maps.ringbuf.map_fd, NULL, NULL, NULL);
+       if (!ASSERT_OK_PTR(ringbuf, "ring_buffer__new_without_cb"))
+               goto cleanup;
+
+       for (i = 0; i < N_TOT_SAMPLES; i++)
+               syscall(__NR_getpgid);
+
+       ring = ring_buffer__ring(ringbuf, 0);
+       if (!ASSERT_OK_PTR(ring, "ring_buffer__ring"))
+               goto cleanup_ringbuf;
+
+       for (i = 0; i < N_TOT_SAMPLES; i += err) {
+               err = ring__consume_n_cb(ring, process_n_sample, &cb_cnt,
+                                        N_SAMPLES);
+               if (!ASSERT_EQ(err, N_SAMPLES, "ring_consume_cb"))
+                       goto cleanup_ringbuf;
+       }
+       ASSERT_EQ(cb_cnt, N_TOT_SAMPLES, "callback_count");
+
 cleanup_ringbuf:
        ring_buffer__free(ringbuf);
 cleanup:
diff --git a/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c b/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c
index 58522195081b..c499e46fe831 100644
--- a/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c
+++ b/tools/testing/selftests/bpf/prog_tests/ringbuf_multi.c
@@ -38,12 +38,37 @@ static int process_sample(void *ctx, void *data, size_t len)
        return 0;
 }
 
+static int process_sample_override(void *ctx, void *data, size_t len)
+{
+       struct sample *s = data;
+       int *cnt = ctx;
+
+       if (!ASSERT_LT(s->seq, 2, "override_sample_seq"))
+               return -EINVAL;
+       ASSERT_EQ(s->value, s->seq ? 777L : 333L, "override_sample_value");
+       (*cnt)++;
+
+       return 0;
+}
+
+static void trigger_samples(struct test_ringbuf_multi *skel)
+{
+       skel->bss->total = 0;
+       skel->bss->target_ring = 0;
+       skel->bss->value = 333;
+       syscall(__NR_getpgid);
+       skel->bss->target_ring = 2;
+       skel->bss->value = 777;
+       syscall(__NR_getpgid);
+}
+
 void test_ringbuf_multi(void)
 {
        struct test_ringbuf_multi *skel;
        struct ring_buffer *ringbuf = NULL;
        struct ring *ring_old;
        struct ring *ring;
+       int override_cnt = 0;
        int err;
        int page_size = getpagesize();
        int proto_fd = -1;
@@ -139,6 +164,73 @@ void test_ringbuf_multi(void)
        CHECK(skel->bss->total != 2, "err_total", "exp %ld, got %ld\n",
              2L, skel->bss->total);
 
+       trigger_samples(skel);
+       err = ring_buffer__consume_n(ringbuf, 0);
+       if (!ASSERT_EQ(err, 0, "ringbuf_consume_zero_stored_cb"))
+               goto cleanup;
+       err = ring_buffer__consume_n_cb(ringbuf, NULL, NULL, 0);
+       if (!ASSERT_EQ(err, -EINVAL, "ringbuf_consume_null_cb"))
+               goto cleanup;
+       err = ring_buffer__consume_n_cb(ringbuf, process_sample_override,
+                                       &override_cnt, 0);
+       if (!ASSERT_EQ(err, 0, "ringbuf_consume_zero"))
+               goto cleanup;
+       if (!ASSERT_EQ(override_cnt, 0, "ringbuf_zero_callback_count"))
+               goto cleanup;
+       err = ring_buffer__consume_n_cb(ringbuf, process_sample_override,
+                                       &override_cnt, 1);
+       if (!ASSERT_EQ(err, 1, "ringbuf_consume_n_cb_1"))
+               goto cleanup;
+       err = ring_buffer__consume_n_cb(ringbuf, process_sample_override,
+                                       &override_cnt, 1);
+       if (!ASSERT_EQ(err, 1, "ringbuf_consume_n_cb_2"))
+               goto cleanup;
+       if (!ASSERT_EQ(override_cnt, 2, "ringbuf_callback_count"))
+               goto cleanup;
+
+       trigger_samples(skel);
+       err = ring_buffer__consume_n(ringbuf, 2);
+       if (!ASSERT_EQ(err, 2, "ringbuf_consume_stored_cb"))
+               goto cleanup;
+       if (!ASSERT_EQ(override_cnt, 2, "ringbuf_override_not_stored"))
+               goto cleanup;
+
+       trigger_samples(skel);
+       ring = ring_buffer__ring(ringbuf, 0);
+       if (!ASSERT_OK_PTR(ring, "ring_buffer__ring_idx_0_cb"))
+               goto cleanup;
+       err = ring__consume_n(ring, 0);
+       if (!ASSERT_EQ(err, 0, "ring1_consume_zero_stored_cb"))
+               goto cleanup;
+       err = ring__consume_n_cb(ring, NULL, NULL, 0);
+       if (!ASSERT_EQ(err, -EINVAL, "ring1_consume_null_cb"))
+               goto cleanup;
+       err = ring__consume_n_cb(ring, process_sample_override, &override_cnt,
+                                0);
+       if (!ASSERT_EQ(err, 0, "ring1_consume_zero"))
+               goto cleanup;
+       if (!ASSERT_EQ(override_cnt, 2, "ring1_zero_callback_count"))
+               goto cleanup;
+       err = ring__consume_n_cb(ring, process_sample_override, &override_cnt,
+                                1);
+       if (!ASSERT_EQ(err, 1, "ring1_consume_n_cb"))
+               goto cleanup;
+
+       ring = ring_buffer__ring(ringbuf, 1);
+       if (!ASSERT_OK_PTR(ring, "ring_buffer__ring_idx_1_cb"))
+               goto cleanup;
+       err = ring__consume_n_cb(ring, process_sample_override, &override_cnt,
+                                1);
+       if (!ASSERT_EQ(err, 1, "ring2_consume_n_cb"))
+               goto cleanup;
+       if (!ASSERT_EQ(override_cnt, 4, "ring_callback_count"))
+               goto cleanup;
+
+       trigger_samples(skel);
+       err = ring_buffer__consume_n(ringbuf, 2);
+       ASSERT_EQ(err, 2, "ring_consume_stored_cb");
+       ASSERT_EQ(override_cnt, 4, "ring_override_not_stored");
+
 cleanup:
        if (proto_fd >= 0)
                close(proto_fd);

---
base-commit: 30dee2c176e7954f63d1fa3e52d172f30beb9bfb
change-id: 20260611-bpf-ringbuf-callback-4127b8e72882

Best regards,
--  
Tamir Duberstein <[email protected]>


Reply via email to