Add a lockless multi-producer/multi-consumer, array-based, non-intrusive, bounded queue that will fail on overflow.
Each operation (enqueue, dequeue) uses a CAS(). As such, both producer
and consumer sides guarantee lock-free forward progress. If the queue
is full, enqueuing will fail. Conversely, if the queue is empty,
dequeuing will fail.

The bounds of the queue are restricted to powers of two, so that slot
indexes remain consistent when the unsigned position markers overflow.

Signed-off-by: Gaetan Rivet <[email protected]>
Reviewed-by: Eli Britstein <[email protected]>
---
 lib/automake.mk |   2 +
 lib/llring.c    | 163 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/llring.h    |  83 ++++++++++++++++++++++++
 3 files changed, 248 insertions(+)
 create mode 100644 lib/llring.c
 create mode 100644 lib/llring.h

diff --git a/lib/automake.mk b/lib/automake.mk
index cbdda460a..45948e519 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -156,6 +156,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/learn.h \
 	lib/learning-switch.c \
 	lib/learning-switch.h \
+	lib/llring.c \
+	lib/llring.h \
 	lib/lockfile.c \
 	lib/lockfile.h \
 	lib/mac-learning.c \
diff --git a/lib/llring.c b/lib/llring.c
new file mode 100644
index 000000000..1fb930017
--- /dev/null
+++ b/lib/llring.c
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "llring.h"
+
+#include "ovs-atomic.h"
+#include "util.h"
+
+/* A queue element.
+ * Calling 'llring_create' will allocate an array of such elements,
+ * that will hold the inserted data.
+ *
+ * 'seq' tells the state of the slot for the position 'pos' that maps
+ * to it: 'seq == pos' means the slot is free for the producer of
+ * 'pos', 'seq == pos + 1' means it holds the data enqueued at 'pos'.
+ * 'data' is only accessed by the thread that claimed the slot, and is
+ * published through the release/acquire accesses on 'seq'.
+ */
+struct llring_node {
+    atomic_uint32_t seq;
+    uint32_t data;
+};
+
+/* A ring description.
+ * The head and tail of the ring are padded to avoid false-sharing,
+ * which slightly improves multi-threaded performance, at the cost
+ * of some memory.
+ */
+struct llring {
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t head;);
+    PADDED_MEMBERS(CACHE_LINE_SIZE, atomic_uint32_t tail;);
+    uint32_t mask;
+    struct llring_node nodes[];
+};
+
+struct llring *
+llring_create(uint32_t size)
+{
+    struct llring *r;
+    uint32_t i;
+
+    if (size < 2 || !IS_POW2(size)) {
+        return NULL;
+    }
+
+    r = xmalloc(sizeof *r + size * sizeof r->nodes[0]);
+
+    r->mask = size - 1;
+    for (i = 0; i < size; i++) {
+        atomic_store_relaxed(&r->nodes[i].seq, i);
+    }
+    atomic_store_relaxed(&r->head, 0);
+    atomic_store_relaxed(&r->tail, 0);
+
+    return r;
+}
+
+void
+llring_destroy(struct llring *r)
+{
+    free(r);
+}
+
+bool
+llring_enqueue(struct llring *r, uint32_t data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->head, &pos);
+    while (true) {
+        int32_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        /* Signed distance computed modulo 2^32, so that it stays correct
+         * when 'seq' or 'pos' wraps around UINT32_MAX. */
+        diff = (int32_t) (seq - pos);
+
+        if (diff < 0) {
+            /* Current ring[head].seq is from previous ring generation,
+             * ring is full and enqueue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If head == ring[head].seq, then the slot is free,
+             * attempt to take it by moving the head, if no one moved it since.
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->head, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Someone changed the head since last read, retry. */
+            atomic_read_relaxed(&r->head, &pos);
+        }
+    }
+
+    node->data = data;
+    atomic_store_explicit(&node->seq, pos + 1, memory_order_release);
+    return true;
+}
+
+bool
+llring_dequeue(struct llring *r, uint32_t *data)
+{
+    struct llring_node *node;
+    uint32_t pos;
+
+    atomic_read_relaxed(&r->tail, &pos);
+    while (true) {
+        int32_t diff;
+        uint32_t seq;
+
+        node = &r->nodes[pos & r->mask];
+        atomic_read_explicit(&node->seq, &seq, memory_order_acquire);
+        /* Wrap-safe signed distance, as in llring_enqueue(). */
+        diff = (int32_t) (seq - (pos + 1));
+
+        if (diff < 0) {
+            /* ring[tail] has not been filled for this generation yet,
+             * ring is empty and dequeue fails. */
+            return false;
+        }
+
+        if (diff == 0) {
+            /* If ring[tail].seq == tail + 1, then the slot is filled,
+             * attempt to claim it by moving the tail, if no one moved it since.
+             */
+            if (atomic_compare_exchange_weak_explicit(&r->tail, &pos, pos + 1,
+                                                      memory_order_relaxed,
+                                                      memory_order_relaxed)) {
+                break;
+            }
+        } else {
+            /* Someone changed the tail since last read, retry. */
+            atomic_read_relaxed(&r->tail, &pos);
+        }
+    }
+
+    *data = node->data;
+    /* Advance the slot to next gen by adding r->mask + 1 to its sequence. */
+    atomic_store_explicit(&node->seq, pos + r->mask + 1, memory_order_release);
+    return true;
+}
diff --git a/lib/llring.h b/lib/llring.h
new file mode 100644
index 000000000..f97baa343
--- /dev/null
+++ b/lib/llring.h
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef LLRING_H
+#define LLRING_H 1
+
+#include <stdint.h>
+#include <stdbool.h>
+
+#include "ovs-atomic.h"
+
+/* Bounded lockless queue
+ * ======================
+ *
+ * A lockless FIFO queue bounded to a known size.
+ * Each operation (enqueue, dequeue) uses one CAS().
+ *
+ * The structure is:
+ *
+ *   Multi-producer: multiple threads can write to it
+ *   concurrently.
+ *
+ *   Multi-consumer: multiple threads can read from it
+ *   concurrently.
+ *
+ *   Bounded: the queue is backed by an array allocated
+ *   once by llring_create().  No new allocation is made on
+ *   insertion, only the used elements in the queue are
+ *   marked as such.  The capacity of the queue is the size
+ *   given at creation, which must be a power of two.
+ *
+ *   Failing: when an operation (enqueue, dequeue) cannot
+ *   be performed due to the queue being full/empty, the
+ *   operation immediately fails, instead of waiting on
+ *   a state change.
+ *
+ *   Non-intrusive: queue elements are allocated when the
+ *   ring is created.  Data is copied by value into those
+ *   allocated elements.
+ *
+ * Thread safety
+ * =============
+ *
+ * The queue is thread-safe for the MPMC case.
+ * No lock is taken by the queue. The queue guarantees
+ * lock-free forward progress for each of its operations.
+ *
+ */
+
+/* Creates a circular lockless ring able to hold 'size' elements.
+ * 'size' must be a power of two greater than or equal to 2,
+ * otherwise no ring is allocated and NULL is returned.
+ */
+struct llring;
+struct llring *llring_create(uint32_t size);
+
+/* Free a lockless ring. */
+void llring_destroy(struct llring *r);
+
+/* Copies 'data' into the next free slot of the queue.
+ * Returns false, without waiting, if the queue is full. */
+bool llring_enqueue(struct llring *r, uint32_t data);
+
+/* Copies the value of the oldest used slot of the queue to the
+ * address pointed to by 'data' and frees that slot.
+ * Returns false, without waiting, if the queue is empty.
+ */
+bool llring_dequeue(struct llring *r, uint32_t *data);
+
+#endif /* llring.h */
-- 
2.30.0

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
