On Tue, Jun 16, 2026 at 4:36 PM Dumitru Ceara <[email protected]> wrote:

> On 6/16/26 11:48 AM, Ales Musil via dev wrote:
> > Add a Single-Producer, Single-Consumer lock-free ring buffer
> > to lib/ for use as a building block in removing pinctrl_mutex
> > contention from packet-in processing.
> >
> > The ring buffer uses a pre-allocated array with atomic read
> > and write indices for synchronization.  Data is copied in
> > and out by value (memcpy), making it well suited for small,
> > fixed-size structs.  Capacity must be a power of two; the
> > implementation uses unsigned 32-bit wraparound arithmetic
> > for correct index handling.
> >
> > Provide SPSC_RING_FOR_EACH_POP() for idiomatic drain loops.
> >
> > Include unit tests covering basic push/pop, FIFO ordering,
> > full ring behavior, index wraparound, the FOR_EACH_POP
> > macro, and multi-field struct elements.
> >
> > Assisted-by: Claude Opus 4.6, OpenCode
> > Signed-off-by: Ales Musil <[email protected]>
> > ---
>
> Hi Ales,
>
> Thanks for the patch!
>

Thank you for the review, Dumitru.


>
> > v2: Rebase on top of latest main.
> >     Remove the SPSC_RING_INIT macro.
> >     Remove the spsc_ring_len function.
> > ---
> >  lib/automake.mk        |   2 +
> >  lib/spsc-ring.c        |  79 +++++++++++++++
> >  lib/spsc-ring.h        |  68 +++++++++++++
> >  tests/automake.mk      |   1 +
> >  tests/ovn.at           |  14 +++
> >  tests/test-spsc-ring.c | 224 +++++++++++++++++++++++++++++++++++++++++
> >  6 files changed, 388 insertions(+)
> >  create mode 100644 lib/spsc-ring.c
> >  create mode 100644 lib/spsc-ring.h
> >  create mode 100644 tests/test-spsc-ring.c
> >
> > diff --git a/lib/automake.mk b/lib/automake.mk
> > index 25741cbdf..e1c644bdd 100644
> > --- a/lib/automake.mk
> > +++ b/lib/automake.mk
> > @@ -50,6 +50,8 @@ lib_libovn_la_SOURCES = \
> >       lib/lb.h \
> >       lib/sparse-array.c \
> >       lib/sparse-array.h \
> > +     lib/spsc-ring.c \
> > +     lib/spsc-ring.h \
> >       lib/stopwatch-names.h \
> >       lib/vec.c \
> >       lib/vec.h \
> > diff --git a/lib/spsc-ring.c b/lib/spsc-ring.c
> > new file mode 100644
> > index 000000000..d174f56b0
> > --- /dev/null
> > +++ b/lib/spsc-ring.c
> > @@ -0,0 +1,79 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +#include <string.h>
> > +
> > +#include "spsc-ring.h"
> > +#include "util.h"
> > +
> > +void
> > +spsc_ring_init(struct spsc_ring *r, uint32_t capacity, size_t esize)
> > +{
> > +    ovs_assert(IS_POW2(capacity));
>
> This is not documented anywhere AFAICT.  Maybe we could add a comment
> above?
>

It is mentioned in the .h file, but I have also added a comment here.
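
For anyone reading along, here is a minimal sketch of why the assertion
matters.  The helper names below are hypothetical and are not part of the
patch; they only mirror the arithmetic in spsc_ring_push().  The slot is
selected with "counter & mask", which equals "counter % capacity" only when
capacity is a power of two, and the fill level is the unsigned difference
of the two free-running counters, which stays correct after the write
counter wraps past UINT32_MAX.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helpers for illustration only; not part of lib/spsc-ring. */

/* Slot index for a free-running counter.  'capacity' must be a power of
 * two, otherwise the mask skips some slots. */
static inline uint32_t
slot_index(uint32_t counter, uint32_t capacity)
{
    return counter & (capacity - 1);
}

/* Number of occupied slots.  Unsigned subtraction is modulo 2^32, so the
 * result is still right after 'wr' has wrapped and 'rd' has not. */
static inline uint32_t
ring_used(uint32_t wr, uint32_t rd)
{
    return wr - rd;
}

/* Same comparison as the patch: full when used > mask. */
static inline bool
ring_is_full(uint32_t wr, uint32_t rd, uint32_t capacity)
{
    return ring_used(wr, rd) > capacity - 1;
}
```

With a capacity of 6, for example, the mask would be 5 (binary 101) and
only slots 0, 1, 4 and 5 would ever be used, hence the IS_POW2() check.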


>
> > +    r->buffer = xmalloc(capacity * esize);
> > +    r->esize = esize;
> > +    r->mask = capacity - 1;
> > +    atomic_init(&r->read, 0);
> > +    atomic_init(&r->write, 0);
> > +}
> > +
> > +void
> > +spsc_ring_destroy(struct spsc_ring *r)
> > +{
> > +    free(r->buffer);
> > +    r->buffer = NULL;
> > +}
> > +
> > +/* Producer: copy 'data' into the next available slot.
> > + * Returns true on success, false if the ring is full. */
> > +bool
> > +spsc_ring_push(struct spsc_ring *r, const void *data)
> > +{
> > +    uint32_t wr, rd;
> > +
> > +    atomic_read(&r->write, &wr);
> > +    atomic_read(&r->read, &rd);
> > +
> > +    if (wr - rd > r->mask) {
> > +        return false;
> > +    }
> > +
> > +    memcpy((uint8_t *) r->buffer + (wr & r->mask) * r->esize,
> > +           data, r->esize);
> > +    atomic_store(&r->write, wr + 1);
> > +    return true;
> > +}
> > +
> > +/* Consumer: copy the oldest slot's data into 'data'.
> > + * Returns true on success, false if the ring is empty. */
> > +bool
> > +spsc_ring_pop(struct spsc_ring *r, void *data)
> > +{
> > +    uint32_t rd, wr;
> > +
> > +    atomic_read(&r->read, &rd);
> > +    atomic_read(&r->write, &wr);
> > +
> > +    if (rd == wr) {
> > +        return false;
> > +    }
> > +
> > +    memcpy(data,
> > +           (uint8_t *) r->buffer + (rd & r->mask) * r->esize,
> > +           r->esize);
> > +    atomic_store(&r->read, rd + 1);
> > +    return true;
> > +}
> > diff --git a/lib/spsc-ring.h b/lib/spsc-ring.h
> > new file mode 100644
> > index 000000000..542733609
> > --- /dev/null
> > +++ b/lib/spsc-ring.h
> > @@ -0,0 +1,68 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef SPSC_RING_H
> > +#define SPSC_RING_H
> > +
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +
> > +#include "openvswitch/compiler.h"
>
> Jacob had mentioned this in the v1 review, I agree, we can remove it.
>

Yeah, my bad, I missed that.


>
> > +#include "openvswitch/util.h"
> > +#include "ovs-atomic.h"
> > +
> > +/* Single-Producer, Single-Consumer lock-free ring buffer.
> > + *
> > + * Thread-safety
> > + * =============
> > + *
> > + * Exactly one thread (the producer) may call spsc_ring_push().
> > + * Exactly one thread (the consumer) may call spsc_ring_pop().
> > + * These two threads may be different and may operate concurrently
> > + * without any external synchronization.
> > + *
> > + * Memory ordering: sequential consistency (the default for OVS
> > + * atomic_read/atomic_store) on the read and write indices ensures
> > + * that slot data written by the producer is visible to the consumer
> > + * after it observes the updated write index (and vice versa for
> > + * read index updates freeing slots for reuse).
> > + *
> > + * Slot data is copied in and out by value (memcpy), so this is
> > + * best suited for small, fixed-size structs.
> > + *
> > + * Capacity must be a power of two.  The ring uses unsigned 32-bit
> > + * wraparound arithmetic on read/write indices, which is correct as
> > + * long as capacity is much smaller than 2^32.
> > + */
> > +struct spsc_ring {
> > +    void *buffer;           /* Pre-allocated slot array. */
> > +    size_t esize;           /* Size of each element in bytes. */
> > +    uint32_t mask;          /* capacity - 1 (for power-of-two modulo). */
> > +    atomic_uint32_t read;   /* Next slot to consume (advanced by consumer). */
> > +    atomic_uint32_t write;  /* Next slot to fill (advanced by producer). */
> > +};
> > +
> > +void spsc_ring_init(struct spsc_ring *, uint32_t capacity, size_t esize);
> > +void spsc_ring_destroy(struct spsc_ring *);
> > +bool spsc_ring_push(struct spsc_ring *, const void *data);
> > +bool spsc_ring_pop(struct spsc_ring *, void *data);
> > +
> > +/* Pop each element into VAR until the ring is empty. */
> > +#define SPSC_RING_FOR_EACH_POP(RING, VAR)                          \
> > +    for (bool ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR));         \
> > +         ITER_VAR(VAR);                                             \
>
> Nit: misaligned \
>
> > +         ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR)))
> > +
> > +#endif /* lib/spsc-ring.h */
> > diff --git a/tests/automake.mk b/tests/automake.mk
> > index 76ef4fda0..fd3e2ccae 100644
> > --- a/tests/automake.mk
> > +++ b/tests/automake.mk
> > @@ -280,6 +280,7 @@ tests_ovstest_SOURCES = \
> >       tests/test-utils.h \
> >       tests/test-ovn.c \
> >       tests/test-sparse-array.c \
> > +     tests/test-spsc-ring.c \
> >       tests/test-vector.c \
> >       controller/test-lflow-cache.c \
> >       controller/test-vif-plug.c \
> > diff --git a/tests/ovn.at b/tests/ovn.at
> > index 522c1c90d..0acb22d68 100644
> > --- a/tests/ovn.at
> > +++ b/tests/ovn.at
> > @@ -2451,6 +2451,20 @@ check ovstest test-sparse-array add
> >  check ovstest test-sparse-array remove-replace
> >  AT_CLEANUP
> >
> > +AT_SETUP([SPSC ring buffer])
> > +check ovstest test-spsc-ring basic
> > +
> > +check ovstest test-spsc-ring fifo
> > +
> > +check ovstest test-spsc-ring full
> > +
> > +check ovstest test-spsc-ring wraparound
> > +
> > +check ovstest test-spsc-ring for-each-pop
> > +
> > +check ovstest test-spsc-ring struct
>
> Nit: the empty lines bother me more than they should but they bother me
> a bit. :D  And the test-sparse-array tests above don't add empty lines
> in between the various checks (although the test-vector ones do have
> empty lines).  Anyhow, I'd remove them.


> > +AT_CLEANUP
> > +
> >  AT_SETUP([Parse MAC])
> >  AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:xx], [1])
> >  AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:06], [0], [dnl
> > diff --git a/tests/test-spsc-ring.c b/tests/test-spsc-ring.c
> > new file mode 100644
> > index 000000000..960a917be
> > --- /dev/null
> > +++ b/tests/test-spsc-ring.c
> > @@ -0,0 +1,224 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +#include <stdint.h>
> > +
> > +#include "tests/ovstest.h"
> > +#include "lib/ovn-util.h"
> > +#include "lib/spsc-ring.h"
> > +
> > +/* Basic push and pop with data integrity check. */
> > +static void
> > +test_basic(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 16, sizeof(uint32_t));
> > +
> > +    uint32_t val;
> > +
> > +    /* Pop on empty ring returns false. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    /* Push one element and pop it. */
> > +    val = 42;
> > +    ovs_assert(spsc_ring_push(&ring, &val));
> > +    val = 0;
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 42);
> > +
> > +    /* Ring is empty again. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* FIFO ordering: push a sequence, pop and verify order. */
> > +static void
> > +test_fifo(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 64, sizeof(uint32_t));
> > +
> > +    for (uint32_t i = 0; i < 50; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    for (uint32_t i = 0; i < 50; i++) {
> > +        uint32_t val;
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +
> > +    uint32_t val;
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Fill ring to capacity, verify push fails, pop one, push succeeds. */
> > +static void
> > +test_full(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 8, sizeof(uint32_t));
> > +
> > +    /* Fill all 8 slots. */
> > +    for (uint32_t i = 0; i < 8; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    /* 9th push must fail. */
> > +    uint32_t overflow = 99;
> > +    ovs_assert(!spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Pop one element. */
> > +    uint32_t val;
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 0);
> > +
> > +    /* Now push succeeds. */
> > +    ovs_assert(spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Drain and verify: 1..7, then 99. */
> > +    for (uint32_t i = 1; i <= 7; i++) {
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 99);
> > +
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Wraparound: push/pop many times to wrap head/tail past UINT32_MAX. */
> > +static void
> > +test_wraparound(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 4, sizeof(uint32_t));
> > +
> > +    /* Push and pop one-at-a-time many times to advance read/write.
> > +     * After 2^32 iterations the indices would wrap, but we can't run
> > +     * that many.  Instead, manually set read/write near UINT32_MAX to
> > +     * test the wraparound arithmetic. */
> > +    atomic_store(&ring.read, UINT32_MAX - 2);
> > +    atomic_store(&ring.write, UINT32_MAX - 2);
> > +
> > +    /* Push 4 elements (fills the ring, wrapping tail past UINT32_MAX). */
> > +    for (uint32_t i = 0; i < 4; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    /* Ring is full. */
> > +    uint32_t overflow = 99;
> > +    ovs_assert(!spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Pop all 4, verify FIFO order. */
> > +    for (uint32_t i = 0; i < 4; i++) {
> > +        uint32_t val;
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +
> > +    /* Empty. */
> > +    uint32_t val;
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Test SPSC_RING_FOR_EACH_POP macro. */
> > +static void
> > +test_for_each_pop(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 32, sizeof(uint32_t));
> > +
> > +    for (uint32_t i = 0; i < 20; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    uint32_t count = 0;
> > +    uint32_t val;
> > +    SPSC_RING_FOR_EACH_POP (&ring, val) {
> > +        ovs_assert(val == count);
> > +        count++;
> > +    }
> > +    ovs_assert(count == 20);
> > +
> > +    /* Ring is empty after iteration. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Test with a multi-field struct to verify memcpy correctness. */
> > +struct test_element {
> > +    uint64_t key;
> > +    uint32_t value;
> > +    uint8_t  tag;
> > +};
> > +
> > +static void
> > +test_struct(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 16, sizeof(struct test_element));
> > +
> > +    for (uint64_t i = 0; i < 10; i++) {
> > +        struct test_element elem = {
> > +            .key = i * 1000,
> > +            .value = (uint32_t) i,
> > +            .tag = (uint8_t) (i & 0xff),
> > +        };
> > +        ovs_assert(spsc_ring_push(&ring, &elem));
> > +    }
> > +
> > +    struct test_element out;
> > +    uint64_t count = 0;
> > +    SPSC_RING_FOR_EACH_POP (&ring, out) {
> > +        ovs_assert(out.key == count * 1000);
> > +        ovs_assert(out.value == (uint32_t) count);
> > +        ovs_assert(out.tag == (uint8_t) (count & 0xff));
>
> Nit: Arguably, this is not enough to verify memcpy correctness.  The
> padding (24 bits in this case) also gets copied but we're not really
> checking that here.
>
> Any reason to use a struct with padding?
>

Ah, not really; the comment is just wrong. As discussed offline, I have
adjusted the comment.
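
To illustrate the padding point, here is a rough sketch; it is not what
the test in the patch does.  trailing_padding() and full_copy_matches()
are hypothetical names.  On common 64-bit ABIs the struct is 16 bytes with
3 bytes (24 bits) of trailing padding, but the exact layout is
implementation-defined.  Comparing the whole object representation right
after memcpy() covers the padding bytes as well, which the per-field
assertions do not.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Same layout as the struct in tests/test-spsc-ring.c. */
struct test_element {
    uint64_t key;
    uint32_t value;
    uint8_t  tag;
};

/* Hypothetical helper: bytes after 'tag' that belong to no field. */
static size_t
trailing_padding(void)
{
    return sizeof(struct test_element)
           - (offsetof(struct test_element, tag) + sizeof(uint8_t));
}

/* Hypothetical helper: copy an element the way the ring does and compare
 * every byte of source and destination, padding included. */
static bool
full_copy_matches(uint64_t i)
{
    struct test_element src, dst;

    /* Fill the whole object, padding included, before setting fields. */
    memset(&src, 0xa5, sizeof src);
    src.key = i * 1000;
    src.value = (uint32_t) i;
    src.tag = (uint8_t) (i & 0xff);

    memcpy(&dst, &src, sizeof dst);
    return memcmp(&dst, &src, sizeof dst) == 0
           && dst.key == i * 1000
           && dst.value == (uint32_t) i
           && dst.tag == (uint8_t) (i & 0xff);
}
```

A packed or padding-free struct would make the field checks alone
sufficient, which is the alternative Dumitru hinted at.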


>
> > +        count++;
> > +    }
> > +    ovs_assert(count == 10);
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +static void
> > +test_spsc_ring_main(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
> > +{
> > +    ovn_set_program_name(argv[0]);
> > +    static const struct ovs_cmdl_command commands[] = {
> > +        {"basic", NULL, 0, 0, test_basic, OVS_RO},
> > +        {"fifo", NULL, 0, 0, test_fifo, OVS_RO},
> > +        {"full", NULL, 0, 0, test_full, OVS_RO},
> > +        {"wraparound", NULL, 0, 0, test_wraparound, OVS_RO},
> > +        {"for-each-pop", NULL, 0, 0, test_for_each_pop, OVS_RO},
> > +        {"struct", NULL, 0, 0, test_struct, OVS_RO},
> > +        {NULL, NULL, 0, 0, NULL, OVS_RO},
> > +    };
> > +    struct ovs_cmdl_context ctx;
> > +    ctx.argc = argc - 1;
> > +    ctx.argv = argv + 1;
> > +    ovs_cmdl_run_command(&ctx, commands);
> > +}
> > +
> > +OVSTEST_REGISTER("test-spsc-ring", test_spsc_ring_main);
>
> With the minor comments I had above addressed, this looks correct to me:
>
> Acked-by: Dumitru Ceara <[email protected]>
>
> Regards,
> Dumitru
>
>
>
With the nits fixed, I applied this to main and 26.03.

Regards,
Ales
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
