Add a Single-Producer, Single-Consumer (SPSC) lock-free ring buffer to lib/ for use as a building block in removing pinctrl_mutex contention from packet-in processing.
The ring buffer uses a pre-allocated array with atomic read and write indices for synchronization. Data is copied in and out by value (memcpy), making it well suited for small, fixed-size structs. Capacity must be a power of two; the implementation uses unsigned 32-bit wraparound arithmetic for correct index handling. Provide SPSC_RING_INIT() for compile-time capacity checking and SPSC_RING_FOR_EACH_POP() for idiomatic drain loops. Include unit tests covering basic push/pop, FIFO ordering, full ring behavior, index wraparound, the FOR_EACH_POP macro, and multi-field struct elements. Assisted-by: Claude Opus 4.6, OpenCode Signed-off-by: Ales Musil <[email protected]> --- lib/automake.mk | 2 + lib/spsc-ring.c | 91 +++++++++++++++ lib/spsc-ring.h | 80 +++++++++++++ tests/automake.mk | 1 + tests/ovn.at | 16 +++ tests/test-spsc-ring.c | 256 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 446 insertions(+) create mode 100644 lib/spsc-ring.c create mode 100644 lib/spsc-ring.h create mode 100644 tests/test-spsc-ring.c diff --git a/lib/automake.mk b/lib/automake.mk index 25741cbdf..e1c644bdd 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -50,6 +50,8 @@ lib_libovn_la_SOURCES = \ lib/lb.h \ lib/sparse-array.c \ lib/sparse-array.h \ + lib/spsc-ring.c \ + lib/spsc-ring.h \ lib/stopwatch-names.h \ lib/vec.c \ lib/vec.h \ diff --git a/lib/spsc-ring.c b/lib/spsc-ring.c new file mode 100644 index 000000000..b5aa462b2 --- /dev/null +++ b/lib/spsc-ring.c @@ -0,0 +1,91 @@ +/* Copyright (c) 2026, Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <string.h> + +#include "spsc-ring.h" +#include "util.h" + +void +spsc_ring_init(struct spsc_ring *r, size_t capacity, size_t esize) +{ + ovs_assert(IS_POW2(capacity)); + r->buffer = xmalloc(capacity * esize); + r->esize = esize; + r->mask = capacity - 1; + atomic_init(&r->read, 0); + atomic_init(&r->write, 0); +} + +void +spsc_ring_destroy(struct spsc_ring *r) +{ + free(r->buffer); + r->buffer = NULL; +} + +/* Producer: copy 'data' into the next available slot. + * Returns true on success, false if the ring is full. */ +bool +spsc_ring_push(struct spsc_ring *r, const void *data) +{ + uint32_t wr, rd; + + atomic_read(&r->write, &wr); + atomic_read(&r->read, &rd); + + if (wr - rd > r->mask) { + return false; + } + + memcpy((uint8_t *) r->buffer + (wr & r->mask) * r->esize, + data, r->esize); + atomic_store(&r->write, wr + 1); + return true; +} + +/* Consumer: copy the oldest slot's data into 'data'. + * Returns true on success, false if the ring is empty. */ +bool +spsc_ring_pop(struct spsc_ring *r, void *data) +{ + uint32_t rd, wr; + + atomic_read(&r->read, &rd); + atomic_read(&r->write, &wr); + + if (rd == wr) { + return false; + } + + memcpy(data, + (uint8_t *) r->buffer + (rd & r->mask) * r->esize, + r->esize); + atomic_store(&r->read, rd + 1); + return true; +} + +/* Returns the number of elements currently in the ring. */ +uint32_t +spsc_ring_len(const struct spsc_ring *r) +{ + uint32_t rd, wr; + + atomic_read(&CONST_CAST(struct spsc_ring *, r)->read, &rd); + atomic_read(&CONST_CAST(struct spsc_ring *, r)->write, &wr); + + return wr - rd; +} diff --git a/lib/spsc-ring.h b/lib/spsc-ring.h new file mode 100644 index 000000000..a990962ad --- /dev/null +++ b/lib/spsc-ring.h @@ -0,0 +1,80 @@ +/* Copyright (c) 2026, Red Hat, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SPSC_RING_H +#define SPSC_RING_H + +#include <stdbool.h> +#include <stdint.h> + +#include "ovs-atomic.h" +#include "openvswitch/compiler.h" +#include "openvswitch/util.h" + +/* Single-Producer, Single-Consumer lock-free ring buffer. + * + * Thread-safety + * ============= + * + * Exactly one thread (the producer) may call spsc_ring_push(). + * Exactly one thread (the consumer) may call spsc_ring_pop(). + * These two threads may be different and may operate concurrently + * without any external synchronization. + * + * Memory ordering: sequential consistency (the default for OVS + * atomic_read/atomic_store) on the read and write indices ensures + * that slot data written by the producer is visible to the consumer + * after it observes the updated write index (and vice versa for + * read index updates freeing slots for reuse). + * + * Slot data is copied in and out by value (memcpy), so this is + * best suited for small, fixed-size structs. + * + * Capacity must be a power of two. The ring uses unsigned 32-bit + * wraparound arithmetic on read/write indices, which is correct as + * long as capacity does not exceed 2^31. + */ +struct spsc_ring { + void *buffer; /* Pre-allocated slot array. */ + size_t esize; /* Size of each element in bytes. */ + uint32_t mask; /* capacity - 1 (for power-of-two modulo). */ + atomic_uint32_t read; /* Next slot to consume (advanced by consumer). 
*/ + atomic_uint32_t write; /* Next slot to fill (advanced by producer). */ +}; + +void spsc_ring_init(struct spsc_ring *, size_t capacity, size_t esize); +void spsc_ring_destroy(struct spsc_ring *); +bool spsc_ring_push(struct spsc_ring *, const void *data); +bool spsc_ring_pop(struct spsc_ring *, void *data); +uint32_t spsc_ring_len(const struct spsc_ring *); + +/* Initialize a ring with CAPACITY slots of TYPE. + * CAPACITY must be a power of two (compile-time assertion). */ +#define SPSC_RING_INIT(RING, CAPACITY, TYPE) \ + do { \ + BUILD_ASSERT_DECL(IS_POW2(CAPACITY)); \ + spsc_ring_init(RING, CAPACITY, sizeof(TYPE)); \ + } while (0) + +/* Pop each element into VAR until the ring is empty. + * + * Uses ITER_VAR() to derive a unique loop variable name from VAR, + * avoiding name clashes in nested loops. */ +#define SPSC_RING_FOR_EACH_POP(RING, VAR) \ + for (bool ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR)); \ + ITER_VAR(VAR); \ + ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR))) + +#endif /* lib/spsc-ring.h */ diff --git a/tests/automake.mk b/tests/automake.mk index 76ef4fda0..fd3e2ccae 100644 --- a/tests/automake.mk +++ b/tests/automake.mk @@ -280,6 +280,7 @@ tests_ovstest_SOURCES = \ tests/test-utils.h \ tests/test-ovn.c \ tests/test-sparse-array.c \ + tests/test-spsc-ring.c \ tests/test-vector.c \ controller/test-lflow-cache.c \ controller/test-vif-plug.c \ diff --git a/tests/ovn.at b/tests/ovn.at index 522c1c90d..c70aa2dfe 100644 --- a/tests/ovn.at +++ b/tests/ovn.at @@ -2451,6 +2451,22 @@ check ovstest test-sparse-array add check ovstest test-sparse-array remove-replace AT_CLEANUP +AT_SETUP([SPSC ring buffer]) +check ovstest test-spsc-ring basic + +check ovstest test-spsc-ring fifo + +check ovstest test-spsc-ring full + +check ovstest test-spsc-ring wraparound + +check ovstest test-spsc-ring for-each-pop + +check ovstest test-spsc-ring len + +check ovstest test-spsc-ring struct +AT_CLEANUP + AT_SETUP([Parse MAC]) AT_CHECK([ovstest test-ovn parse-eth-addr 
01:02:03:04:05:xx], [1]) AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:06], [0], [dnl diff --git a/tests/test-spsc-ring.c b/tests/test-spsc-ring.c new file mode 100644 index 000000000..51375bbf5 --- /dev/null +++ b/tests/test-spsc-ring.c @@ -0,0 +1,256 @@ +/* Copyright (c) 2026, Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include <stdint.h> + +#include "tests/ovstest.h" +#include "lib/ovn-util.h" +#include "lib/spsc-ring.h" + +/* Basic push and pop with data integrity check. */ +static void +test_basic(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 16, uint32_t); + + uint32_t val; + + /* Pop on empty ring returns false. */ + ovs_assert(!spsc_ring_pop(&ring, &val)); + + /* Push one element and pop it. */ + val = 42; + ovs_assert(spsc_ring_push(&ring, &val)); + val = 0; + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == 42); + + /* Ring is empty again. */ + ovs_assert(!spsc_ring_pop(&ring, &val)); + + spsc_ring_destroy(&ring); +} + +/* FIFO ordering: push a sequence, pop and verify order. 
*/ +static void +test_fifo(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 64, uint32_t); + + for (uint32_t i = 0; i < 50; i++) { + ovs_assert(spsc_ring_push(&ring, &i)); + } + + for (uint32_t i = 0; i < 50; i++) { + uint32_t val; + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == i); + } + + uint32_t val; + ovs_assert(!spsc_ring_pop(&ring, &val)); + + spsc_ring_destroy(&ring); +} + +/* Fill ring to capacity, verify push fails, pop one, push succeeds. */ +static void +test_full(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 8, uint32_t); + + /* Fill all 8 slots. */ + for (uint32_t i = 0; i < 8; i++) { + ovs_assert(spsc_ring_push(&ring, &i)); + } + + /* 9th push must fail. */ + uint32_t overflow = 99; + ovs_assert(!spsc_ring_push(&ring, &overflow)); + + /* Pop one element. */ + uint32_t val; + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == 0); + + /* Now push succeeds. */ + ovs_assert(spsc_ring_push(&ring, &overflow)); + + /* Drain and verify: 1..7, then 99. */ + for (uint32_t i = 1; i <= 7; i++) { + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == i); + } + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == 99); + + ovs_assert(!spsc_ring_pop(&ring, &val)); + + spsc_ring_destroy(&ring); +} + +/* Wraparound: push/pop while the read/write indices wrap past UINT32_MAX. */ +static void +test_wraparound(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 4, uint32_t); + + /* Push and pop one-at-a-time many times to advance read/write. + * After 2^32 iterations the indices would wrap, but we can't run + * that many. Instead, manually set read/write near UINT32_MAX to + * test the wraparound arithmetic. */ + atomic_store(&ring.read, UINT32_MAX - 2); + atomic_store(&ring.write, UINT32_MAX - 2); + + /* Push 4 elements (fills the ring, wrapping write past UINT32_MAX). 
*/ + for (uint32_t i = 0; i < 4; i++) { + ovs_assert(spsc_ring_push(&ring, &i)); + } + + /* Ring is full. */ + uint32_t overflow = 99; + ovs_assert(!spsc_ring_push(&ring, &overflow)); + + /* Pop all 4, verify FIFO order. */ + for (uint32_t i = 0; i < 4; i++) { + uint32_t val; + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(val == i); + } + + /* Empty. */ + uint32_t val; + ovs_assert(!spsc_ring_pop(&ring, &val)); + + spsc_ring_destroy(&ring); +} + +/* Test SPSC_RING_FOR_EACH_POP macro. */ +static void +test_for_each_pop(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 32, uint32_t); + + for (uint32_t i = 0; i < 20; i++) { + ovs_assert(spsc_ring_push(&ring, &i)); + } + + uint32_t count = 0; + uint32_t val; + SPSC_RING_FOR_EACH_POP (&ring, val) { + ovs_assert(val == count); + count++; + } + ovs_assert(count == 20); + + /* Ring is empty after iteration. */ + ovs_assert(!spsc_ring_pop(&ring, &val)); + + spsc_ring_destroy(&ring); +} + +/* Verify spsc_ring_len() returns the correct count. */ +static void +test_len(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 16, uint32_t); + + /* Empty ring has length 0. */ + ovs_assert(spsc_ring_len(&ring) == 0); + + /* Push 5 elements, length should be 5. */ + for (uint32_t i = 0; i < 5; i++) { + ovs_assert(spsc_ring_push(&ring, &i)); + } + ovs_assert(spsc_ring_len(&ring) == 5); + + /* Pop 2, length should be 3. */ + uint32_t val; + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(spsc_ring_pop(&ring, &val)); + ovs_assert(spsc_ring_len(&ring) == 3); + + /* Pop remaining 3, length should be 0. */ + for (uint32_t i = 0; i < 3; i++) { + ovs_assert(spsc_ring_pop(&ring, &val)); + } + ovs_assert(spsc_ring_len(&ring) == 0); + + spsc_ring_destroy(&ring); +} + +/* Test with a multi-field struct to verify memcpy correctness. 
*/ +struct test_element { + uint64_t key; + uint32_t value; + uint8_t tag; +}; + +static void +test_struct(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + struct spsc_ring ring; + SPSC_RING_INIT(&ring, 16, struct test_element); + + for (uint64_t i = 0; i < 10; i++) { + struct test_element elem = { + .key = i * 1000, + .value = (uint32_t) i, + .tag = (uint8_t) (i & 0xff), + }; + ovs_assert(spsc_ring_push(&ring, &elem)); + } + + struct test_element out; + uint64_t count = 0; + SPSC_RING_FOR_EACH_POP (&ring, out) { + ovs_assert(out.key == count * 1000); + ovs_assert(out.value == (uint32_t) count); + ovs_assert(out.tag == (uint8_t) (count & 0xff)); + count++; + } + ovs_assert(count == 10); + + spsc_ring_destroy(&ring); +} + +static void +test_spsc_ring_main(int argc OVS_UNUSED, char *argv[] OVS_UNUSED) +{ + ovn_set_program_name(argv[0]); + static const struct ovs_cmdl_command commands[] = { + {"basic", NULL, 0, 0, test_basic, OVS_RO}, + {"fifo", NULL, 0, 0, test_fifo, OVS_RO}, + {"full", NULL, 0, 0, test_full, OVS_RO}, + {"wraparound", NULL, 0, 0, test_wraparound, OVS_RO}, + {"for-each-pop", NULL, 0, 0, test_for_each_pop, OVS_RO}, + {"len", NULL, 0, 0, test_len, OVS_RO}, + {"struct", NULL, 0, 0, test_struct, OVS_RO}, + {NULL, NULL, 0, 0, NULL, OVS_RO}, + }; + struct ovs_cmdl_context ctx; + ctx.argc = argc - 1; + ctx.argv = argv + 1; + ovs_cmdl_run_command(&ctx, commands); +} + +OVSTEST_REGISTER("test-spsc-ring", test_spsc_ring_main); -- 2.54.0 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
