On Tue, Jun 16, 2026 at 4:36 PM Dumitru Ceara <[email protected]> wrote:
> On 6/16/26 11:48 AM, Ales Musil via dev wrote:
> > Add a Single-Producer, Single-Consumer lock-free ring buffer
> > to lib/ for use as a building block in removing pinctrl_mutex
> > contention from packet-in processing.
> >
> > The ring buffer uses a pre-allocated array with atomic read
> > and write indices for synchronization. Data is copied in
> > and out by value (memcpy), making it well suited for small,
> > fixed-size structs. Capacity must be a power of two; the
> > implementation uses unsigned 32-bit wraparound arithmetic
> > for correct index handling.
> >
> > Provide SPSC_RING_FOR_EACH_POP() for idiomatic drain loops.
> >
> > Include unit tests covering basic push/pop, FIFO ordering,
> > full ring behavior, index wraparound, the FOR_EACH_POP
> > macro, and multi-field struct elements.
> >
> > Assisted-by: Claude Opus 4.6, OpenCode
> > Signed-off-by: Ales Musil <[email protected]>
> > ---
>
> Hi Ales,
>
> Thanks for the patch!
>

Thank you for the review, Dumitru.

>
> > v2: Rebase on top of latest main.
> >     Remove the SPSC_RING_INIT macro.
> >     Remove the spsc_ring_len function.
> > ---
> >  lib/automake.mk        |   2 +
> >  lib/spsc-ring.c        |  79 +++++++++++++++
> >  lib/spsc-ring.h        |  68 +++++++++++++
> >  tests/automake.mk      |   1 +
> >  tests/ovn.at           |  14 +++
> >  tests/test-spsc-ring.c | 224 +++++++++++++++++++++++++++++++++++++++++
> >  6 files changed, 388 insertions(+)
> >  create mode 100644 lib/spsc-ring.c
> >  create mode 100644 lib/spsc-ring.h
> >  create mode 100644 tests/test-spsc-ring.c
> >
> > diff --git a/lib/automake.mk b/lib/automake.mk
> > index 25741cbdf..e1c644bdd 100644
> > --- a/lib/automake.mk
> > +++ b/lib/automake.mk
> > @@ -50,6 +50,8 @@ lib_libovn_la_SOURCES = \
> >         lib/lb.h \
> >         lib/sparse-array.c \
> >         lib/sparse-array.h \
> > +       lib/spsc-ring.c \
> > +       lib/spsc-ring.h \
> >         lib/stopwatch-names.h \
> >         lib/vec.c \
> >         lib/vec.h \
> > diff --git a/lib/spsc-ring.c b/lib/spsc-ring.c
> > new file mode 100644
> > index 000000000..d174f56b0
> > --- /dev/null
> > +++ b/lib/spsc-ring.c
> > @@ -0,0 +1,79 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +#include <string.h>
> > +
> > +#include "spsc-ring.h"
> > +#include "util.h"
> > +
> > +void
> > +spsc_ring_init(struct spsc_ring *r, uint32_t capacity, size_t esize)
> > +{
> > +    ovs_assert(IS_POW2(capacity));
>
> This is not documented anywhere AFAICT. Maybe we could add a comment
> above?
>

It is mentioned in the .h file, but I have also added a comment here.

>
> > +    r->buffer = xmalloc(capacity * esize);
> > +    r->esize = esize;
> > +    r->mask = capacity - 1;
> > +    atomic_init(&r->read, 0);
> > +    atomic_init(&r->write, 0);
> > +}
> > +
> > +void
> > +spsc_ring_destroy(struct spsc_ring *r)
> > +{
> > +    free(r->buffer);
> > +    r->buffer = NULL;
> > +}
> > +
> > +/* Producer: copy 'data' into the next available slot.
> > + * Returns true on success, false if the ring is full. */
> > +bool
> > +spsc_ring_push(struct spsc_ring *r, const void *data)
> > +{
> > +    uint32_t wr, rd;
> > +
> > +    atomic_read(&r->write, &wr);
> > +    atomic_read(&r->read, &rd);
> > +
> > +    if (wr - rd > r->mask) {
> > +        return false;
> > +    }
> > +
> > +    memcpy((uint8_t *) r->buffer + (wr & r->mask) * r->esize,
> > +           data, r->esize);
> > +    atomic_store(&r->write, wr + 1);
> > +    return true;
> > +}
> > +
> > +/* Consumer: copy the oldest slot's data into 'data'.
> > + * Returns true on success, false if the ring is empty. */
> > +bool
> > +spsc_ring_pop(struct spsc_ring *r, void *data)
> > +{
> > +    uint32_t rd, wr;
> > +
> > +    atomic_read(&r->read, &rd);
> > +    atomic_read(&r->write, &wr);
> > +
> > +    if (rd == wr) {
> > +        return false;
> > +    }
> > +
> > +    memcpy(data,
> > +           (uint8_t *) r->buffer + (rd & r->mask) * r->esize,
> > +           r->esize);
> > +    atomic_store(&r->read, rd + 1);
> > +    return true;
> > +}
> > diff --git a/lib/spsc-ring.h b/lib/spsc-ring.h
> > new file mode 100644
> > index 000000000..542733609
> > --- /dev/null
> > +++ b/lib/spsc-ring.h
> > @@ -0,0 +1,68 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#ifndef SPSC_RING_H
> > +#define SPSC_RING_H
> > +
> > +#include <stdbool.h>
> > +#include <stdint.h>
> > +
> > +#include "openvswitch/compiler.h"
>
> Jacob had mentioned this in the v1 review, I agree, we can remove it.
>

Yeah, my bad, I did miss that.

>
> > +#include "openvswitch/util.h"
> > +#include "ovs-atomic.h"
> > +
> > +/* Single-Producer, Single-Consumer lock-free ring buffer.
> > + *
> > + * Thread-safety
> > + * =============
> > + *
> > + * Exactly one thread (the producer) may call spsc_ring_push().
> > + * Exactly one thread (the consumer) may call spsc_ring_pop().
> > + * These two threads may be different and may operate concurrently
> > + * without any external synchronization.
> > + *
> > + * Memory ordering: sequential consistency (the default for OVS
> > + * atomic_read/atomic_store) on the read and write indices ensures
> > + * that slot data written by the producer is visible to the consumer
> > + * after it observes the updated write index (and vice versa for
> > + * read index updates freeing slots for reuse).
> > + *
> > + * Slot data is copied in and out by value (memcpy), so this is
> > + * best suited for small, fixed-size structs.
> > + *
> > + * Capacity must be a power of two. The ring uses unsigned 32-bit
> > + * wraparound arithmetic on read/write indices, which is correct as
> > + * long as capacity is much smaller than 2^32.
> > + */
> > +struct spsc_ring {
> > +    void *buffer;           /* Pre-allocated slot array. */
> > +    size_t esize;           /* Size of each element in bytes. */
> > +    uint32_t mask;          /* capacity - 1 (for power-of-two modulo). */
> > +    atomic_uint32_t read;   /* Next slot to consume (advanced by consumer). */
> > +    atomic_uint32_t write;  /* Next slot to fill (advanced by producer). */
> > +};
> > +
> > +void spsc_ring_init(struct spsc_ring *, uint32_t capacity, size_t esize);
> > +void spsc_ring_destroy(struct spsc_ring *);
> > +bool spsc_ring_push(struct spsc_ring *, const void *data);
> > +bool spsc_ring_pop(struct spsc_ring *, void *data);
> > +
> > +/* Pop each element into VAR until the ring is empty. */
> > +#define SPSC_RING_FOR_EACH_POP(RING, VAR)                      \
> > +    for (bool ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR));     \
> > +         ITER_VAR(VAR);                                       \
>
> Nit: misaligned \
>
> > +         ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR)))
> > +
> > +#endif /* lib/spsc-ring.h */
> > diff --git a/tests/automake.mk b/tests/automake.mk
> > index 76ef4fda0..fd3e2ccae 100644
> > --- a/tests/automake.mk
> > +++ b/tests/automake.mk
> > @@ -280,6 +280,7 @@ tests_ovstest_SOURCES = \
> >         tests/test-utils.h \
> >         tests/test-ovn.c \
> >         tests/test-sparse-array.c \
> > +       tests/test-spsc-ring.c \
> >         tests/test-vector.c \
> >         controller/test-lflow-cache.c \
> >         controller/test-vif-plug.c \
> > diff --git a/tests/ovn.at b/tests/ovn.at
> > index 522c1c90d..0acb22d68 100644
> > --- a/tests/ovn.at
> > +++ b/tests/ovn.at
> > @@ -2451,6 +2451,20 @@ check ovstest test-sparse-array add
> >  check ovstest test-sparse-array remove-replace
> >  AT_CLEANUP
> >
> > +AT_SETUP([SPSC ring buffer])
> > +check ovstest test-spsc-ring basic
> > +
> > +check ovstest test-spsc-ring fifo
> > +
> > +check ovstest test-spsc-ring full
> > +
> > +check ovstest test-spsc-ring wraparound
> > +
> > +check ovstest test-spsc-ring for-each-pop
> > +
> > +check ovstest test-spsc-ring struct
>
> Nit: the empty lines bother me more than they should but they bother me
> a bit. :D And the test-sparse-array tests above don't add empty lines
> in between the various checks (although the test-vector ones do have
> empty lines). Anyhow, I'd remove them.
>
> > +AT_CLEANUP
> > +
> >  AT_SETUP([Parse MAC])
> >  AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:xx], [1])
> >  AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:06], [0], [dnl
> > diff --git a/tests/test-spsc-ring.c b/tests/test-spsc-ring.c
> > new file mode 100644
> > index 000000000..960a917be
> > --- /dev/null
> > +++ b/tests/test-spsc-ring.c
> > @@ -0,0 +1,224 @@
> > +/* Copyright (c) 2026, Red Hat, Inc.
> > + *
> > + * Licensed under the Apache License, Version 2.0 (the "License");
> > + * you may not use this file except in compliance with the License.
> > + * You may obtain a copy of the License at:
> > + *
> > + *     http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +#include <config.h>
> > +#include <stdint.h>
> > +
> > +#include "tests/ovstest.h"
> > +#include "lib/ovn-util.h"
> > +#include "lib/spsc-ring.h"
> > +
> > +/* Basic push and pop with data integrity check. */
> > +static void
> > +test_basic(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 16, sizeof(uint32_t));
> > +
> > +    uint32_t val;
> > +
> > +    /* Pop on empty ring returns false. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    /* Push one element and pop it. */
> > +    val = 42;
> > +    ovs_assert(spsc_ring_push(&ring, &val));
> > +    val = 0;
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 42);
> > +
> > +    /* Ring is empty again. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* FIFO ordering: push a sequence, pop and verify order. */
> > +static void
> > +test_fifo(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 64, sizeof(uint32_t));
> > +
> > +    for (uint32_t i = 0; i < 50; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    for (uint32_t i = 0; i < 50; i++) {
> > +        uint32_t val;
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +
> > +    uint32_t val;
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Fill ring to capacity, verify push fails, pop one, push succeeds. */
> > +static void
> > +test_full(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 8, sizeof(uint32_t));
> > +
> > +    /* Fill all 8 slots. */
> > +    for (uint32_t i = 0; i < 8; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    /* 9th push must fail. */
> > +    uint32_t overflow = 99;
> > +    ovs_assert(!spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Pop one element. */
> > +    uint32_t val;
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 0);
> > +
> > +    /* Now push succeeds. */
> > +    ovs_assert(spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Drain and verify: 1..7, then 99. */
> > +    for (uint32_t i = 1; i <= 7; i++) {
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +    ovs_assert(spsc_ring_pop(&ring, &val));
> > +    ovs_assert(val == 99);
> > +
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Wraparound: push/pop many times to wrap head/tail past UINT32_MAX. */
> > +static void
> > +test_wraparound(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 4, sizeof(uint32_t));
> > +
> > +    /* Push and pop one-at-a-time many times to advance read/write.
> > +     * After 2^32 iterations the indices would wrap, but we can't run
> > +     * that many. Instead, manually set read/write near UINT32_MAX to
> > +     * test the wraparound arithmetic. */
> > +    atomic_store(&ring.read, UINT32_MAX - 2);
> > +    atomic_store(&ring.write, UINT32_MAX - 2);
> > +
> > +    /* Push 4 elements (fills the ring, wrapping tail past UINT32_MAX). */
> > +    for (uint32_t i = 0; i < 4; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    /* Ring is full. */
> > +    uint32_t overflow = 99;
> > +    ovs_assert(!spsc_ring_push(&ring, &overflow));
> > +
> > +    /* Pop all 4, verify FIFO order. */
> > +    for (uint32_t i = 0; i < 4; i++) {
> > +        uint32_t val;
> > +        ovs_assert(spsc_ring_pop(&ring, &val));
> > +        ovs_assert(val == i);
> > +    }
> > +
> > +    /* Empty. */
> > +    uint32_t val;
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Test SPSC_RING_FOR_EACH_POP macro. */
> > +static void
> > +test_for_each_pop(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 32, sizeof(uint32_t));
> > +
> > +    for (uint32_t i = 0; i < 20; i++) {
> > +        ovs_assert(spsc_ring_push(&ring, &i));
> > +    }
> > +
> > +    uint32_t count = 0;
> > +    uint32_t val;
> > +    SPSC_RING_FOR_EACH_POP (&ring, val) {
> > +        ovs_assert(val == count);
> > +        count++;
> > +    }
> > +    ovs_assert(count == 20);
> > +
> > +    /* Ring is empty after iteration. */
> > +    ovs_assert(!spsc_ring_pop(&ring, &val));
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +/* Test with a multi-field struct to verify memcpy correctness. */
> > +struct test_element {
> > +    uint64_t key;
> > +    uint32_t value;
> > +    uint8_t tag;
> > +};
> > +
> > +static void
> > +test_struct(struct ovs_cmdl_context *ctx OVS_UNUSED)
> > +{
> > +    struct spsc_ring ring;
> > +    spsc_ring_init(&ring, 16, sizeof(struct test_element));
> > +
> > +    for (uint64_t i = 0; i < 10; i++) {
> > +        struct test_element elem = {
> > +            .key = i * 1000,
> > +            .value = (uint32_t) i,
> > +            .tag = (uint8_t) (i & 0xff),
> > +        };
> > +        ovs_assert(spsc_ring_push(&ring, &elem));
> > +    }
> > +
> > +    struct test_element out;
> > +    uint64_t count = 0;
> > +    SPSC_RING_FOR_EACH_POP (&ring, out) {
> > +        ovs_assert(out.key == count * 1000);
> > +        ovs_assert(out.value == (uint32_t) count);
> > +        ovs_assert(out.tag == (uint8_t) (count & 0xff));
>
> Nit: Arguably, this is not enough to verify memcpy correctness. The
> padding (24 bits in this case) also gets copied but we're not really
> checking that here.
>
> Any reason to use a struct with padding?
>

Ah, not really, the comment is just wrong. As discussed offline, I have
adjusted the comment.
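
For reference, a minimal sketch of the padding point raised above. It is
not part of the patch: the helper names test_element_tail_padding() and
test_element_bytes_equal() are hypothetical, and the struct is simply
repeated from the quoted test so that the snippet stands alone.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Same fields as 'struct test_element' in the quoted test. */
struct test_element {
    uint64_t key;
    uint32_t value;
    uint8_t tag;
};

/* Hypothetical helper: number of trailing padding bytes after 'tag',
 * the last member of the struct. */
static size_t
test_element_tail_padding(void)
{
    return sizeof(struct test_element)
           - (offsetof(struct test_element, tag) + sizeof(uint8_t));
}

/* Hypothetical helper: compares the whole object representation,
 * padding included, which the per-field checks do not cover. */
static bool
test_element_bytes_equal(const struct test_element *a,
                         const struct test_element *b)
{
    return !memcmp(a, b, sizeof *a);
}
```

On ABIs where the struct occupies 16 bytes, the first helper returns 3,
which matches the 24 bits mentioned in the review. The byte-wise compare
is only meaningful between an element and a memcpy'd copy of that same
object, because padding bytes take unspecified values when members are
assigned individually.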
>
> > +        count++;
> > +    }
> > +    ovs_assert(count == 10);
> > +
> > +    spsc_ring_destroy(&ring);
> > +}
> > +
> > +static void
> > +test_spsc_ring_main(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
> > +{
> > +    ovn_set_program_name(argv[0]);
> > +    static const struct ovs_cmdl_command commands[] = {
> > +        {"basic", NULL, 0, 0, test_basic, OVS_RO},
> > +        {"fifo", NULL, 0, 0, test_fifo, OVS_RO},
> > +        {"full", NULL, 0, 0, test_full, OVS_RO},
> > +        {"wraparound", NULL, 0, 0, test_wraparound, OVS_RO},
> > +        {"for-each-pop", NULL, 0, 0, test_for_each_pop, OVS_RO},
> > +        {"struct", NULL, 0, 0, test_struct, OVS_RO},
> > +        {NULL, NULL, 0, 0, NULL, OVS_RO},
> > +    };
> > +    struct ovs_cmdl_context ctx;
> > +    ctx.argc = argc - 1;
> > +    ctx.argv = argv + 1;
> > +    ovs_cmdl_run_command(&ctx, commands);
> > +}
> > +
> > +OVSTEST_REGISTER("test-spsc-ring", test_spsc_ring_main);
>
> With the minor comments I had above addressed, this looks correct to me:
>
> Acked-by: Dumitru Ceara <[email protected]>
>
> Regards,
> Dumitru
>

With the nits fixed, applied to main and 26.03.

Regards,
Ales
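
For anyone reading the thread later, the index arithmetic exercised by
test_wraparound can be sketched on its own as below. This is an
illustration under stated assumptions, not code from the patch: the
ring_idx_*() helper names are hypothetical, and the explicit uint32_t
cast is an addition here that keeps the subtraction modulo 2^32
regardless of integer promotion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: the ring is full when 'mask + 1' slots are in use.
 * The cast keeps the difference modulo 2^32 even after 'wr' has wrapped. */
static bool
ring_idx_full(uint32_t wr, uint32_t rd, uint32_t mask)
{
    return (uint32_t) (wr - rd) > mask;
}

/* Hypothetical helper: the ring is empty when both indices are equal. */
static bool
ring_idx_empty(uint32_t wr, uint32_t rd)
{
    return wr == rd;
}

/* Hypothetical helper: slot for a free-running index.  Requires a
 * power-of-two capacity so that '& mask' equals '% capacity'. */
static uint32_t
ring_idx_slot(uint32_t idx, uint32_t mask)
{
    return idx & mask;
}
```

With a capacity of 4 and both indices starting at UINT32_MAX - 2, as in
the test, four pushes leave the write index at 1. The difference is still
4, so the ring reports full, and the slots used are 1, 2, 3 and 0.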
