Add a Single-Producer, Single-Consumer lock-free ring buffer
to lib/ for use as a building block in removing pinctrl_mutex
contention from packet-in processing.

The ring buffer uses a pre-allocated array with atomic read
and write indices for synchronization.  Data is copied in
and out by value (memcpy), making it well suited for small,
fixed-size structs.  Capacity must be a power of two; the
implementation uses unsigned 32-bit wraparound arithmetic
for correct index handling.

Provide SPSC_RING_INIT() for compile-time capacity checking
and SPSC_RING_FOR_EACH_POP() for idiomatic drain loops.

Include unit tests covering basic push/pop, FIFO ordering,
full ring behavior, index wraparound, the FOR_EACH_POP
macro, spsc_ring_len(), and multi-field struct elements.

Assisted-by: Claude Opus 4.6, OpenCode
Signed-off-by: Ales Musil <[email protected]>
---
 lib/automake.mk        |   2 +
 lib/spsc-ring.c        |  91 +++++++++++++++
 lib/spsc-ring.h        |  80 +++++++++++++
 tests/automake.mk      |   1 +
 tests/ovn.at           |  16 +++
 tests/test-spsc-ring.c | 256 +++++++++++++++++++++++++++++++++++++++++
 6 files changed, 446 insertions(+)
 create mode 100644 lib/spsc-ring.c
 create mode 100644 lib/spsc-ring.h
 create mode 100644 tests/test-spsc-ring.c

diff --git a/lib/automake.mk b/lib/automake.mk
index 25741cbdf..e1c644bdd 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -50,6 +50,8 @@ lib_libovn_la_SOURCES = \
        lib/lb.h \
        lib/sparse-array.c \
        lib/sparse-array.h \
+       lib/spsc-ring.c \
+       lib/spsc-ring.h \
        lib/stopwatch-names.h \
        lib/vec.c \
        lib/vec.h \
diff --git a/lib/spsc-ring.c b/lib/spsc-ring.c
new file mode 100644
index 000000000..b5aa462b2
--- /dev/null
+++ b/lib/spsc-ring.c
@@ -0,0 +1,91 @@
+/* Copyright (c) 2026, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <string.h>
+
+#include "spsc-ring.h"
+#include "util.h"
+
+/* Initializes 'r' as an empty ring with room for 'capacity' elements of
+ * 'esize' bytes each.
+ *
+ * 'capacity' must be a power of two no larger than 2^31.  The indices are
+ * free-running 32-bit counters, so "write - read" can only tell a full
+ * ring from an empty one while the capacity stays within that bound.
+ * 'capacity * esize' must fit in size_t.  Violating any of these
+ * requirements fails an ovs_assert().
+ *
+ * The slot array is allocated with xmalloc(); release it with
+ * spsc_ring_destroy(). */
+void
+spsc_ring_init(struct spsc_ring *r, size_t capacity, size_t esize)
+{
+    ovs_assert(IS_POW2(capacity));
+    /* A larger capacity would be truncated when stored in the 32-bit
+     * 'mask', and 2^32 slots would make a full ring look empty. */
+    ovs_assert(capacity <= (UINT32_C(1) << 31));
+    /* Keep the allocation size computation below from wrapping around. */
+    ovs_assert(esize <= SIZE_MAX / capacity);
+
+    r->buffer = xmalloc(capacity * esize);
+    r->esize = esize;
+    r->mask = capacity - 1;
+    atomic_init(&r->read, 0);
+    atomic_init(&r->write, 0);
+}
+
+/* Frees the slot array owned by 'r' and clears the pointer to it.  'r'
+ * itself is not freed. */
+void
+spsc_ring_destroy(struct spsc_ring *r)
+{
+    void *slots = r->buffer;
+
+    r->buffer = NULL;
+    free(slots);
+}
+
+/* Producer: copy 'data' into the next available slot.
+ * Returns true on success, false if the ring is full.
+ *
+ * 'data' must point to at least 'r->esize' readable bytes; exactly that
+ * many are copied.
+ *
+ * The ring is full when the two free-running indices differ by more than
+ * 'mask', that is, by the whole capacity.  The difference is computed in
+ * uint32_t, so it stays correct after the indices wrap.  The slot is
+ * filled before the incremented 'write' index is stored, so the new index
+ * is never published ahead of the data it covers. */
+bool
+spsc_ring_push(struct spsc_ring *r, const void *data)
+{
+    uint32_t wr, rd;
+
+    atomic_read(&r->write, &wr);
+    atomic_read(&r->read, &rd);
+
+    if (wr - rd > r->mask) {
+        return false;
+    }
+
+    memcpy((uint8_t *) r->buffer + (wr & r->mask) * r->esize,
+           data, r->esize);
+    atomic_store(&r->write, wr + 1);
+    return true;
+}
+
+/* Consumer: copy the oldest slot's data into 'data'.
+ * Returns true on success, false if the ring is empty.
+ *
+ * 'data' must point to at least 'r->esize' writable bytes.  It is left
+ * untouched when the ring is empty.
+ *
+ * The ring is empty when the 'read' and 'write' indices are equal.  The
+ * slot is copied out before the incremented 'read' index is stored, so
+ * the slot is not handed back to the producer while it is still being
+ * read. */
+bool
+spsc_ring_pop(struct spsc_ring *r, void *data)
+{
+    uint32_t rd, wr;
+
+    atomic_read(&r->read, &rd);
+    atomic_read(&r->write, &wr);
+
+    if (rd == wr) {
+        return false;
+    }
+
+    memcpy(data,
+           (uint8_t *) r->buffer + (rd & r->mask) * r->esize,
+           r->esize);
+    atomic_store(&r->read, rd + 1);
+    return true;
+}
+
+/* Reports how many elements the ring holds.  The two indices are sampled
+ * one after the other, so the result is only a snapshot if the producer
+ * or the consumer runs concurrently. */
+uint32_t
+spsc_ring_len(const struct spsc_ring *r)
+{
+    /* The cast drops 'const' so the indices can be handed to
+     * atomic_read(). */
+    struct spsc_ring *ring = CONST_CAST(struct spsc_ring *, r);
+    uint32_t head, tail;
+
+    atomic_read(&ring->read, &head);
+    atomic_read(&ring->write, &tail);
+
+    return tail - head;
+}
diff --git a/lib/spsc-ring.h b/lib/spsc-ring.h
new file mode 100644
index 000000000..a990962ad
--- /dev/null
+++ b/lib/spsc-ring.h
@@ -0,0 +1,80 @@
+/* Copyright (c) 2026, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SPSC_RING_H
+#define SPSC_RING_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#include "ovs-atomic.h"
+#include "openvswitch/compiler.h"
+#include "openvswitch/util.h"
+
+/* Single-Producer, Single-Consumer lock-free ring buffer.
+ *
+ * Thread-safety
+ * =============
+ *
+ * Exactly one thread (the producer) may call spsc_ring_push().
+ * Exactly one thread (the consumer) may call spsc_ring_pop().
+ * These two threads may be different and may operate concurrently
+ * without any external synchronization.
+ *
+ * Memory ordering: sequential consistency (the default for OVS
+ * atomic_read/atomic_store) on the read and write indices ensures
+ * that slot data written by the producer is visible to the consumer
+ * after it observes the updated write index (and vice versa for
+ * read index updates freeing slots for reuse).
+ *
+ * Slot data is copied in and out by value (memcpy), so this is
+ * best suited for small, fixed-size structs.
+ *
+ * Capacity must be a power of two.  The read/write indices are
+ * free-running 32-bit counters that are only masked when a slot is
+ * addressed; their difference, computed with unsigned wraparound
+ * arithmetic, is the number of queued elements.  This is correct as
+ * long as capacity is at most 2^31: with 2^32 slots a full ring would
+ * be indistinguishable from an empty one, and anything larger does not
+ * fit in the 32-bit mask.
+ */
+struct spsc_ring {
+    void *buffer;           /* Pre-allocated slot array. */
+    size_t esize;           /* Size of each element in bytes. */
+    uint32_t mask;          /* capacity - 1 (for power-of-two modulo). */
+    atomic_uint32_t read;   /* Next slot to consume (advanced by consumer). */
+    atomic_uint32_t write;  /* Next slot to fill (advanced by producer). */
+};
+
+void spsc_ring_init(struct spsc_ring *, size_t capacity, size_t esize);
+void spsc_ring_destroy(struct spsc_ring *);
+bool spsc_ring_push(struct spsc_ring *, const void *data);
+bool spsc_ring_pop(struct spsc_ring *, void *data);
+uint32_t spsc_ring_len(const struct spsc_ring *);
+
+/* Initialize a ring with CAPACITY slots of TYPE.
+ * CAPACITY must be a power of two (compile-time assertion).
+ *
+ * Because the check happens at build time, CAPACITY has to be a constant
+ * expression; a capacity that is only known at run time must be passed
+ * to spsc_ring_init() directly.  RING is a pointer to the
+ * 'struct spsc_ring' to set up and TYPE is the element type, whose
+ * sizeof becomes the element size. */
+#define SPSC_RING_INIT(RING, CAPACITY, TYPE)         \
+    do {                                              \
+        BUILD_ASSERT_DECL(IS_POW2(CAPACITY));         \
+        spsc_ring_init(RING, CAPACITY, sizeof(TYPE)); \
+    } while (0)
+
+/* Pop each element into VAR until the ring is empty.
+ *
+ * Uses ITER_VAR() to derive a unique loop variable name from VAR,
+ * avoiding name clashes in nested loops.
+ *
+ * VAR must be an existing variable large enough to hold one element;
+ * every iteration overwrites it with the next element.  An element is
+ * removed from the ring before the loop body runs, so leaving the loop
+ * early (e.g. with 'break') keeps the current element consumed and the
+ * remaining ones queued.  RING is evaluated once per pop.  As this
+ * expands to spsc_ring_pop() calls, it is for the consumer side only. */
+#define SPSC_RING_FOR_EACH_POP(RING, VAR)                          \
+    for (bool ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR));         \
+         ITER_VAR(VAR);                                             \
+         ITER_VAR(VAR) = spsc_ring_pop(RING, &(VAR)))
+
+#endif /* lib/spsc-ring.h */
diff --git a/tests/automake.mk b/tests/automake.mk
index 76ef4fda0..fd3e2ccae 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -280,6 +280,7 @@ tests_ovstest_SOURCES = \
        tests/test-utils.h \
        tests/test-ovn.c \
        tests/test-sparse-array.c \
+       tests/test-spsc-ring.c \
        tests/test-vector.c \
        controller/test-lflow-cache.c \
        controller/test-vif-plug.c \
diff --git a/tests/ovn.at b/tests/ovn.at
index 522c1c90d..c70aa2dfe 100644
--- a/tests/ovn.at
+++ b/tests/ovn.at
@@ -2451,6 +2451,22 @@ check ovstest test-sparse-array add
 check ovstest test-sparse-array remove-replace
 AT_CLEANUP
 
+AT_SETUP([SPSC ring buffer])
+check ovstest test-spsc-ring basic
+
+check ovstest test-spsc-ring fifo
+
+check ovstest test-spsc-ring full
+
+check ovstest test-spsc-ring wraparound
+
+check ovstest test-spsc-ring for-each-pop
+
+check ovstest test-spsc-ring len
+
+check ovstest test-spsc-ring struct
+AT_CLEANUP
+
 AT_SETUP([Parse MAC])
 AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:xx], [1])
 AT_CHECK([ovstest test-ovn parse-eth-addr 01:02:03:04:05:06], [0], [dnl
diff --git a/tests/test-spsc-ring.c b/tests/test-spsc-ring.c
new file mode 100644
index 000000000..51375bbf5
--- /dev/null
+++ b/tests/test-spsc-ring.c
@@ -0,0 +1,256 @@
+/* Copyright (c) 2026, Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+
+#include "tests/ovstest.h"
+#include "lib/ovn-util.h"
+#include "lib/spsc-ring.h"
+
+/* Single-element round trip, including the empty-ring checks. */
+static void
+test_basic(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    struct spsc_ring ring;
+    uint32_t in = 42;
+    uint32_t out = 0;
+
+    SPSC_RING_INIT(&ring, 16, uint32_t);
+
+    /* Nothing can be popped from a fresh ring. */
+    ovs_assert(!spsc_ring_pop(&ring, &out));
+
+    /* One push followed by one pop hands the value back unchanged. */
+    ovs_assert(spsc_ring_push(&ring, &in));
+    ovs_assert(spsc_ring_pop(&ring, &out));
+    ovs_assert(out == 42);
+
+    /* The ring is drained again. */
+    ovs_assert(!spsc_ring_pop(&ring, &out));
+
+    spsc_ring_destroy(&ring);
+}
+
+/* Elements must come out in exactly the order they went in. */
+static void
+test_fifo(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    const uint32_t n_elems = 50;
+    struct spsc_ring ring;
+    uint32_t popped;
+
+    SPSC_RING_INIT(&ring, 64, uint32_t);
+
+    for (uint32_t seq = 0; seq < n_elems; seq++) {
+        ovs_assert(spsc_ring_push(&ring, &seq));
+    }
+
+    for (uint32_t expected = 0; expected < n_elems; expected++) {
+        ovs_assert(spsc_ring_pop(&ring, &popped));
+        ovs_assert(popped == expected);
+    }
+
+    /* Every element pushed has been consumed. */
+    ovs_assert(!spsc_ring_pop(&ring, &popped));
+
+    spsc_ring_destroy(&ring);
+}
+
+/* A full ring rejects pushes until the consumer frees a slot. */
+static void
+test_full(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    struct spsc_ring ring;
+    uint32_t extra = 99;
+    uint32_t got;
+
+    SPSC_RING_INIT(&ring, 8, uint32_t);
+
+    /* Occupy every one of the 8 slots. */
+    for (uint32_t i = 0; i < 8; i++) {
+        ovs_assert(spsc_ring_push(&ring, &i));
+    }
+
+    /* There is no room for a 9th element. */
+    ovs_assert(!spsc_ring_push(&ring, &extra));
+
+    /* Consuming the oldest element frees one slot... */
+    ovs_assert(spsc_ring_pop(&ring, &got));
+    ovs_assert(got == 0);
+
+    /* ...which the previously rejected element can now take. */
+    ovs_assert(spsc_ring_push(&ring, &extra));
+
+    /* Remaining contents: 1..7 followed by 99. */
+    uint32_t expected = 1;
+    while (expected <= 7) {
+        ovs_assert(spsc_ring_pop(&ring, &got));
+        ovs_assert(got == expected);
+        expected++;
+    }
+    ovs_assert(spsc_ring_pop(&ring, &got));
+    ovs_assert(got == 99);
+
+    ovs_assert(!spsc_ring_pop(&ring, &got));
+
+    spsc_ring_destroy(&ring);
+}
+
+/* Wraparound: the ring keeps working, and spsc_ring_len() stays correct,
+ * while the 'read' and 'write' indices cross UINT32_MAX. */
+static void
+test_wraparound(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    struct spsc_ring ring;
+    SPSC_RING_INIT(&ring, 4, uint32_t);
+
+    /* Reaching the wrap through push/pop alone would take about 2^32
+     * iterations, so place both indices just below UINT32_MAX instead.
+     * The ring is still empty because 'read' equals 'write'. */
+    atomic_store(&ring.read, UINT32_MAX - 2);
+    atomic_store(&ring.write, UINT32_MAX - 2);
+    ovs_assert(spsc_ring_len(&ring) == 0);
+
+    /* Push 4 elements.  This fills the ring and wraps 'write' past
+     * UINT32_MAX, leaving it numerically smaller than 'read'. */
+    for (uint32_t i = 0; i < 4; i++) {
+        ovs_assert(spsc_ring_push(&ring, &i));
+    }
+    ovs_assert(spsc_ring_len(&ring) == 4);
+
+    /* Ring is full. */
+    uint32_t overflow = 99;
+    ovs_assert(!spsc_ring_push(&ring, &overflow));
+
+    /* Pop all 4, verify FIFO order.  'read' wraps as well. */
+    for (uint32_t i = 0; i < 4; i++) {
+        uint32_t val;
+        ovs_assert(spsc_ring_pop(&ring, &val));
+        ovs_assert(val == i);
+    }
+
+    /* Empty. */
+    uint32_t val;
+    ovs_assert(!spsc_ring_pop(&ring, &val));
+    ovs_assert(spsc_ring_len(&ring) == 0);
+
+    spsc_ring_destroy(&ring);
+}
+
+/* SPSC_RING_FOR_EACH_POP drains the whole ring in FIFO order. */
+static void
+test_for_each_pop(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    const uint32_t n_elems = 20;
+    struct spsc_ring ring;
+    uint32_t elem;
+    uint32_t n_seen = 0;
+
+    SPSC_RING_INIT(&ring, 32, uint32_t);
+
+    for (uint32_t i = 0; i < n_elems; i++) {
+        ovs_assert(spsc_ring_push(&ring, &i));
+    }
+
+    /* The n-th element popped must carry the value n. */
+    SPSC_RING_FOR_EACH_POP (&ring, elem) {
+        ovs_assert(elem == n_seen);
+        n_seen++;
+    }
+    ovs_assert(n_seen == 20);
+
+    /* The macro left nothing behind. */
+    ovs_assert(!spsc_ring_pop(&ring, &elem));
+
+    spsc_ring_destroy(&ring);
+}
+
+/* spsc_ring_len() follows the element count through pushes and pops. */
+static void
+test_len(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    struct spsc_ring ring;
+    uint32_t scratch;
+
+    SPSC_RING_INIT(&ring, 16, uint32_t);
+
+    /* Nothing queued yet. */
+    ovs_assert(spsc_ring_len(&ring) == 0);
+
+    /* Five pushes give five elements. */
+    for (uint32_t i = 0; i < 5; i++) {
+        ovs_assert(spsc_ring_push(&ring, &i));
+    }
+    ovs_assert(spsc_ring_len(&ring) == 5);
+
+    /* Two pops leave three elements. */
+    for (uint32_t n = 0; n < 2; n++) {
+        ovs_assert(spsc_ring_pop(&ring, &scratch));
+    }
+    ovs_assert(spsc_ring_len(&ring) == 3);
+
+    /* Popping the last three brings the length back to zero. */
+    for (uint32_t n = 0; n < 3; n++) {
+        ovs_assert(spsc_ring_pop(&ring, &scratch));
+    }
+    ovs_assert(spsc_ring_len(&ring) == 0);
+
+    spsc_ring_destroy(&ring);
+}
+
+/* Test with a multi-field struct to verify memcpy correctness.
+ *
+ * The members have different sizes, so the compiler will normally add
+ * padding; the ring is initialized with sizeof(struct test_element) as
+ * its element size, so whole structs are copied in and out. */
+struct test_element {
+    uint64_t key;
+    uint32_t value;
+    uint8_t  tag;
+};
+
+/* Pushes ten multi-field elements, then drains the ring and checks every
+ * field of each element against the values it was built from. */
+static void
+test_struct(struct ovs_cmdl_context *ctx OVS_UNUSED)
+{
+    struct spsc_ring ring;
+    struct test_element popped;
+    uint64_t n_popped = 0;
+
+    SPSC_RING_INIT(&ring, 16, struct test_element);
+
+    for (uint64_t seq = 0; seq < 10; seq++) {
+        struct test_element in = {
+            .tag = (uint8_t) (seq & 0xff),
+            .value = (uint32_t) seq,
+            .key = seq * 1000,
+        };
+        ovs_assert(spsc_ring_push(&ring, &in));
+    }
+
+    SPSC_RING_FOR_EACH_POP (&ring, popped) {
+        ovs_assert(popped.key == n_popped * 1000);
+        ovs_assert(popped.value == (uint32_t) n_popped);
+        ovs_assert(popped.tag == (uint8_t) (n_popped & 0xff));
+        n_popped++;
+    }
+    ovs_assert(n_popped == 10);
+
+    spsc_ring_destroy(&ring);
+}
+
+/* Entry point for "ovstest test-spsc-ring SUBCOMMAND": runs the test
+ * named by the first argument after the program name.
+ *
+ * Both parameters are used, so they carry no OVS_UNUSED marker. */
+static void
+test_spsc_ring_main(int argc, char *argv[])
+{
+    static const struct ovs_cmdl_command commands[] = {
+        {"basic", NULL, 0, 0, test_basic, OVS_RO},
+        {"fifo", NULL, 0, 0, test_fifo, OVS_RO},
+        {"full", NULL, 0, 0, test_full, OVS_RO},
+        {"wraparound", NULL, 0, 0, test_wraparound, OVS_RO},
+        {"for-each-pop", NULL, 0, 0, test_for_each_pop, OVS_RO},
+        {"len", NULL, 0, 0, test_len, OVS_RO},
+        {"struct", NULL, 0, 0, test_struct, OVS_RO},
+        {NULL, NULL, 0, 0, NULL, OVS_RO},
+    };
+    /* Skip the program name.  The designated initializer also zeroes any
+     * member of the context that is not listed here. */
+    struct ovs_cmdl_context ctx = {
+        .argc = argc - 1,
+        .argv = argv + 1,
+    };
+
+    ovn_set_program_name(argv[0]);
+    ovs_cmdl_run_command(&ctx, commands);
+}
+
+OVSTEST_REGISTER("test-spsc-ring", test_spsc_ring_main);
-- 
2.54.0

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to