Add a lockless multi-producer/single-consumer (MPSC), linked-list based, intrusive, unbounded queue that does not require deferred memory management.
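
As a rough illustration of the structure summarized above, here is a self-contained C11 sketch of an intrusive MPSC queue built around a stub node. It is not the code added by this patch. The names (sk_node, sk_queue, sk_msg, sk_init, sk_insert, sk_pop, sk_demo) are hypothetical, it uses <stdatomic.h> rather than ovs-atomic.h, it has no reader lock, and it folds the "empty" and "inconsistent" states into a single NULL return.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical names for illustration; not the API added by this patch. */
struct sk_node {
    _Atomic(struct sk_node *) next;
};

struct sk_queue {
    _Atomic(struct sk_node *) head; /* Producer end: last inserted node. */
    struct sk_node *tail;           /* Consumer end: next node to remove. */
    struct sk_node stub;            /* Keeps the chain non-empty. */
};

/* Intrusive use: the node is embedded in a larger object. */
struct sk_msg {
    struct sk_node node;
    int value;
};

void
sk_init(struct sk_queue *q)
{
    atomic_init(&q->stub.next, NULL);
    atomic_init(&q->head, &q->stub);
    q->tail = &q->stub;
}

/* Producer side: one atomic exchange, then link the previous head. */
void
sk_insert(struct sk_queue *q, struct sk_node *n)
{
    struct sk_node *prev;

    atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
    prev = atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
    /* Until this store is visible, the chain is momentarily broken. */
    atomic_store_explicit(&prev->next, n, memory_order_release);
}

/* Consumer side: returns NULL when the queue is empty or inconsistent. */
struct sk_node *
sk_pop(struct sk_queue *q)
{
    struct sk_node *tail = q->tail;
    struct sk_node *next = atomic_load_explicit(&tail->next,
                                                memory_order_acquire);

    if (tail == &q->stub) {
        if (next == NULL) {
            return NULL;
        }
        q->tail = next;
        tail = next;
        next = atomic_load_explicit(&tail->next, memory_order_acquire);
    }
    if (next != NULL) {
        q->tail = next;
        return tail;
    }
    if (tail != atomic_load_explicit(&q->head, memory_order_acquire)) {
        return NULL; /* A producer has not finished linking its node. */
    }
    sk_insert(q, &q->stub); /* Close the chain behind the last element. */
    next = atomic_load_explicit(&tail->next, memory_order_acquire);
    if (next != NULL) {
        q->tail = next;
        return tail;
    }
    return NULL;
}

/* Inserts 1, 2, 3 and pops them, accumulating the values as digits. */
int
sk_demo(void)
{
    struct sk_msg msgs[3] = { { .value = 1 }, { .value = 2 }, { .value = 3 } };
    struct sk_queue q;
    struct sk_node *n;
    int digits = 0;

    sk_init(&q);
    for (size_t i = 0; i < 3; i++) {
        sk_insert(&q, &msgs[i].node);
    }
    while ((n = sk_pop(&q)) != NULL) {
        /* 'node' is the first member, so the cast recovers the object. */
        digits = digits * 10 + ((struct sk_msg *) n)->value;
    }
    assert(sk_pop(&q) == NULL);
    return digits;
}
```

Calling sk_demo() from a single thread exercises the insert and pop paths in FIFO order. With concurrent producers, a NULL return from sk_pop() may also mean that an insertion is still in flight, which is the case the patch reports separately through a dedicated poll result.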

The queue is an implementation of the structure described by Dmitri
Vyukov [1]. It adds a slightly more explicit API explaining the proper use
of the queue. Alternatives were considered, such as a Treiber stack [2] or a
Michael-Scott queue [3], but this one is faster, simpler and scalable.

[1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue

[2]: R. K. Treiber. Systems programming: Coping with parallelism.
     Technical Report RJ 5118, IBM Almaden Research Center, April 1986.

[3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and
     Blocking Concurrent Queue Algorithms.
     https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html

The queue is designed to improve the specific MPSC setup. A benchmark
accompanies the unit tests to measure the difference in this configuration.
A single reader thread polls the queue while N writers enqueue elements as
fast as possible. The mpsc-queue is compared against the regular ovs-list
as well as the guarded list. The latter usually offers a slight improvement
by batching the element removal, however the mpsc-queue is faster.

The 'Avg' column is the average of the producer threads' times:

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 1
   Benchmarking n=3000000 on 1 + 1 threads.
    type\thread:  Reader      1    Avg
     mpsc-queue:     161    161    161 ms
           list:     803    803    803 ms
   guarded list:     665    665    665 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 2
   Benchmarking n=3000000 on 1 + 2 threads.
    type\thread:  Reader      1      2    Avg
     mpsc-queue:     102    101     97     99 ms
           list:     246    212    246    229 ms
   guarded list:     264    263    214    238 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 3
   Benchmarking n=3000000 on 1 + 3 threads.
    type\thread:  Reader      1      2      3    Avg
     mpsc-queue:      92     91     92     91     91 ms
           list:     520    517    515    520    517 ms
   guarded list:     405    395    401    404    400 ms

   $ ./tests/ovstest test-mpsc-queue benchmark 3000000 4
   Benchmarking n=3000000 on 1 + 4 threads.
    type\thread:  Reader      1      2      3      4    Avg
     mpsc-queue:      77     73     73     77     75     74 ms
           list:     371    359    361    287    370    344 ms
   guarded list:     389    388    359    363    357    366 ms

Signed-off-by: Gaetan Rivet <[email protected]>
Reviewed-by: Eli Britstein <[email protected]>
---
 lib/automake.mk         |   2 +
 lib/mpsc-queue.c        | 251 ++++++++++++++
 lib/mpsc-queue.h        | 189 +++++++++++
 tests/automake.mk       |   1 +
 tests/library.at        |   5 +
 tests/test-mpsc-queue.c | 727 ++++++++++++++++++++++++++++++++++++++++
 6 files changed, 1175 insertions(+)
 create mode 100644 lib/mpsc-queue.c
 create mode 100644 lib/mpsc-queue.h
 create mode 100644 tests/test-mpsc-queue.c

diff --git a/lib/automake.mk b/lib/automake.mk
index 39901bd6d..9c6836688 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -166,6 +166,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/memory.c \
 	lib/memory.h \
 	lib/meta-flow.c \
+	lib/mpsc-queue.c \
+	lib/mpsc-queue.h \
 	lib/multipath.c \
 	lib/multipath.h \
 	lib/namemap.c \
diff --git a/lib/mpsc-queue.c b/lib/mpsc-queue.c
new file mode 100644
index 000000000..cd26ed207
--- /dev/null
+++ b/lib/mpsc-queue.c
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+
+#include "ovs-atomic.h"
+
+#include "mpsc-queue.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This is an implementation of the MPSC queue described by Dmitri Vyukov [1].
+ *
+ * One atomic exchange operation is done per insertion.
+ * Removal in most cases will not require an atomic operation and will use one
+ * atomic exchange to close the queue chain.
+ *
+ * Insertion
+ * =========
+ *
+ * The queue is implemented using a linked-list. Insertion is done at the
+ * back of the queue, by swapping the current end with the new node atomically,
+ * then pointing the previous end toward the new node. To follow Vyukov
+ * nomenclature, the end-node of the chain is called head. A producer will
+ * only manipulate the head.
+ *
+ * The head swap is atomic, however the link from the previous head to the new
+ * one is done in a separate operation. This means that the chain is
+ * momentarily broken, when the previous head still points to NULL and the
+ * current head has been inserted.
+ *
+ * Considering a series of insertions, the queue state will remain consistent
+ * and the insertion order is compatible with their precedence, thus the
+ * queue is serializable. However, because an insertion consists of two
+ * separate memory transactions, it is not linearizable.
+ *
+ * Removal
+ * =======
+ *
+ * The consumer must deal with the queue inconsistency. It will manipulate
+ * the tail of the queue and move it along the latest consumed elements.
+ * When an end of the chain of elements is found (the next pointer is NULL),
+ * the tail is compared with the head.
+ *
+ * If both point to different addresses, then the queue is in an inconsistent
+ * state: the tail cannot move forward as the next is NULL, but the head is not
+ * the last element in the chain: this can only happen if the chain is broken.
+ *
+ * In this case, the consumer must wait for the producer to finish writing the
+ * next pointer of its current tail: 'MPSC_QUEUE_RETRY' is returned.
+ *
+ * Removal is thus in most cases (when there are elements in the queue)
+ * accomplished without using atomics, until the last element of the queue.
+ * There, the head is atomically loaded.
+ * If the queue is in a consistent state, the head is moved back to the queue
+ * stub by inserting the stub in the queue: ending the queue is the same as
+ * an insertion, which is one atomic XCHG.
+ *
+ * Forward guarantees
+ * ==================
+ *
+ * Insertion and peeking are wait-free: they will execute in a known bounded
+ * number of instructions, regardless of the state of the queue.
+ *
+ * However, while removal consists of peeking and a constant write to
+ * update the tail, it can repeatedly fail until the queue becomes consistent.
+ * It is thus dependent on other threads progressing. This means that the
+ * queue forward progress is obstruction-free only. It has a potential for
+ * livelocking.
+ *
+ * The chain will remain broken as long as a producer is not finished writing
+ * its next pointer. If a producer is cancelled for example, the queue could
+ * remain broken for any future readings. This queue should either be used
+ * with cooperative threads or insertion must only be done outside cancellable
+ * sections.
+ *
+ * Performance
+ * ===========
+ *
+ * In benchmarks this structure was better than alternatives such as:
+ *
+ *  * A reversed Treiber stack [2], using 1 CAS per operation
+ *    and requiring reversal of the node list on removal.
+ *
+ *  * Michael-Scott lock-free queue [3], using 2 CAS per operation.
+ *
+ * While it is not linearizable, this queue is well-suited for message passing.
+ * If a proper hardware XCHG operation is used, it scales better than
+ * CAS-based implementations.
+ *
+ * References
+ * ==========
+ *
+ * [1]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+ *
+ * [2]: R. K. Treiber. Systems programming: Coping with parallelism.
+ *      Technical Report RJ 5118, IBM Almaden Research Center, April 1986.
+ *
+ * [3]: M. M. Michael, Simple, Fast, and Practical Non-Blocking and
+ *      Blocking Concurrent Queue Algorithms
+ *      https://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
+ */
+
+void
+mpsc_queue_init(struct mpsc_queue *queue)
+{
+    atomic_store_relaxed(&queue->head, &queue->stub);
+    atomic_store_relaxed(&queue->tail, &queue->stub);
+    atomic_store_relaxed(&queue->stub.next, NULL);
+
+    ovs_mutex_init(&queue->read_lock);
+}
+
+void
+mpsc_queue_destroy(struct mpsc_queue *queue)
+    OVS_EXCLUDED(queue->read_lock)
+{
+    ovs_mutex_destroy(&queue->read_lock);
+}
+
+enum mpsc_queue_poll_result
+mpsc_queue_poll(struct mpsc_queue *queue, struct mpsc_queue_node **node)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *tail;
+    struct mpsc_queue_node *next;
+    struct mpsc_queue_node *head;
+
+    atomic_read_relaxed(&queue->tail, &tail);
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+
+    if (tail == &queue->stub) {
+        if (next == NULL) {
+            return MPSC_QUEUE_EMPTY;
+        }
+
+        atomic_store_relaxed(&queue->tail, next);
+        tail = next;
+        atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    }
+
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+
+    atomic_read_explicit(&queue->head, &head, memory_order_acquire);
+    if (tail != head) {
+        return MPSC_QUEUE_RETRY;
+    }
+
+    mpsc_queue_insert(queue, &queue->stub);
+
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+    if (next != NULL) {
+        atomic_store_relaxed(&queue->tail, next);
+        *node = tail;
+        return MPSC_QUEUE_ITEM;
+    }
+
+    return MPSC_QUEUE_EMPTY;
+}
+
+struct mpsc_queue_node *
+mpsc_queue_pop(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock)
+{
+    enum mpsc_queue_poll_result result;
+    struct mpsc_queue_node *node;
+
+    do {
+        result = mpsc_queue_poll(queue, &node);
+        if (result == MPSC_QUEUE_EMPTY) {
+            return NULL;
+        }
+    } while (result == MPSC_QUEUE_RETRY);
+
+    return node;
+}
+
+void
+mpsc_queue_push_back(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *tail;
+
+    atomic_read_relaxed(&queue->tail, &tail);
+    atomic_store_relaxed(&node->next, tail);
+    atomic_store_relaxed(&queue->tail, node);
+}
+
+struct mpsc_queue_node *
+mpsc_queue_tail(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *tail;
+    struct mpsc_queue_node *next;
+
+    atomic_read_relaxed(&queue->tail, &tail);
+    atomic_read_explicit(&tail->next, &next, memory_order_acquire);
+
+    if (tail == &queue->stub) {
+        if (next == NULL) {
+            return NULL;
+        }
+
+        atomic_store_relaxed(&queue->tail, next);
+        tail = next;
+    }
+
+    return tail;
+}
+
+/* Get the next element of a node. */
+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
+                                        struct mpsc_queue_node *prev)
+    OVS_REQUIRES(queue->read_lock)
+{
+    struct mpsc_queue_node *next;
+
+    atomic_read_explicit(&prev->next, &next, memory_order_acquire);
+    if (next == &queue->stub) {
+        atomic_read_explicit(&next->next, &next, memory_order_acquire);
+    }
+    return next;
+}
+
+void
+mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+    struct mpsc_queue_node *prev;
+
+    atomic_store_relaxed(&node->next, NULL);
+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+    atomic_store_explicit(&prev->next, node, memory_order_release);
+}
diff --git a/lib/mpsc-queue.h b/lib/mpsc-queue.h
new file mode 100644
index 000000000..4f8c32cc6
--- /dev/null
+++ b/lib/mpsc-queue.h
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MPSC_QUEUE_H
+#define MPSC_QUEUE_H 1
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stddef.h>
+
+#include <openvswitch/thread.h>
+#include <openvswitch/util.h>
+
+#include "ovs-atomic.h"
+
+/* Multi-producer, single-consumer queue
+ * =====================================
+ *
+ * This data structure is a lockless queue implementation with
+ * the following properties:
+ *
+ *  * Multi-producer: multiple threads can write concurrently.
+ *    Insertion in the queue is thread-safe, no inter-thread
+ *    synchronization is necessary.
+ *
+ *  * Single-consumer: only a single thread can safely remove
+ *    nodes from the queue. The queue must be 'acquired' using
+ *    'mpsc_queue_acquire()' before removing nodes.
+ *
+ *  * Unbounded: the queue is backed by a linked-list and is not
+ *    limited in number of elements.
+ *
+ *  * Intrusive: queue elements are allocated as part of larger
+ *    objects. Objects are retrieved by offset manipulation.
+ *
+ *  * Per-producer FIFO: elements in the queue are kept in the
+ *    order their producer inserted them. The consumer retrieves
+ *    them in the same insertion order. When multiple
+ *    producers insert at the same time, either will proceed.
+ *
+ * This queue is well-suited for message passing between threads,
+ * where any number of threads can insert a message and a single
+ * thread is meant to receive and process it.
+ *
+ * Thread-safety
+ * =============
+ *
+ * The consumer thread must acquire the queue using 'mpsc_queue_acquire()'.
+ * Once the queue is protected against concurrent reads, the thread can call
+ * the consumer API:
+ *
+ *  * mpsc_queue_poll() to poll the queue and remove its tail if available
+ *  * mpsc_queue_pop() to remove the tail of the queue
+ *  * mpsc_queue_tail() to read the current tail
+ *  * MPSC_QUEUE_FOR_EACH() to iterate over the current elements,
+ *    without removing them.
+ *  * MPSC_QUEUE_FOR_EACH_POP() to iterate over the elements while
+ *    removing them.
+ *
+ * When a thread is finished with reading the queue, it can release the
+ * reader lock using 'mpsc_queue_release()'.
+ *
+ * Producers can always insert elements in the queue, even if no consumer
+ * acquired the reader lock. No inter-producer synchronization is needed.
+ *
+ * The consumer thread is also allowed to insert elements while it holds the
+ * reader lock.
+ *
+ * Producer threads must never be cancelled while writing to the queue.
+ * Doing so would block the consumer, which would then lose any subsequent
+ * elements in the queue. Producers should ideally be cooperatively managed or
+ * the queue insertion should be within non-cancellable sections.
+ *
+ * Queue state
+ * ===========
+ *
+ * When polling the queue, three states can be observed: 'empty', 'non-empty',
+ * and 'inconsistent'. Three polling results are defined, respectively:
+ *
+ *  * MPSC_QUEUE_EMPTY: the queue is empty.
+ *  * MPSC_QUEUE_ITEM: an item was available and has been removed.
+ *  * MPSC_QUEUE_RETRY: the queue is inconsistent.
+ *
+ * If 'MPSC_QUEUE_RETRY' is returned, then a producer has not yet finished
+ * writing to the queue and the list of nodes is not coherent. The consumer
+ * can retry shortly to check if the producer has finished.
+ *
+ * This behavior is the reason the removal function is called
+ * 'mpsc_queue_poll()'.
+ *
+ */
+
+struct mpsc_queue_node {
+    ATOMIC(struct mpsc_queue_node *) next;
+};
+
+struct mpsc_queue {
+    ATOMIC(struct mpsc_queue_node *) head;
+    ATOMIC(struct mpsc_queue_node *) tail;
+    struct mpsc_queue_node stub;
+    struct ovs_mutex read_lock;
+};
+
+#define MPSC_QUEUE_INITIALIZER(Q) { \
+    .head = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .tail = ATOMIC_VAR_INIT(&(Q)->stub), \
+    .stub = { .next = ATOMIC_VAR_INIT(NULL) }, \
+    .read_lock = OVS_MUTEX_INITIALIZER, \
+}
+
+/* Consumer API. */
+
+/* Initialize the queue. Not necessary if 'MPSC_QUEUE_INITIALIZER' was used. */
+void mpsc_queue_init(struct mpsc_queue *queue);
+/* The reader lock must be released prior to destroying the queue. */
+void mpsc_queue_destroy(struct mpsc_queue *queue);
+
+/* Acquire and release the consumer lock. */
+#define mpsc_queue_acquire(q) do { \
+        ovs_mutex_lock(&(q)->read_lock); \
+    } while (0)
+#define mpsc_queue_release(q) do { \
+        ovs_mutex_unlock(&(q)->read_lock); \
+    } while (0)
+
+enum mpsc_queue_poll_result {
+    /* Queue is empty. */
+    MPSC_QUEUE_EMPTY,
+    /* Polling the queue returned an item. */
+    MPSC_QUEUE_ITEM,
+    /* Data has been enqueued but one or more producer threads have not
+     * finished writing it. The queue is in an inconsistent state.
+     * Retrying shortly, if the producer threads are still active, will
+     * return the data.
+     */
+    MPSC_QUEUE_RETRY,
+};
+
+/* Set 'node' to a removed item from the queue if 'MPSC_QUEUE_ITEM' is
+ * returned, otherwise 'node' is not set.
+ */
+enum mpsc_queue_poll_result mpsc_queue_poll(struct mpsc_queue *queue,
+                                            struct mpsc_queue_node **node)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Pop an element if there is any in the queue. */
+struct mpsc_queue_node *mpsc_queue_pop(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Insert at the back of the queue. Only the consumer can do it.
+ */
+void mpsc_queue_push_back(struct mpsc_queue *queue,
+                          struct mpsc_queue_node *node)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Get the current queue tail. */
+struct mpsc_queue_node *mpsc_queue_tail(struct mpsc_queue *queue)
+    OVS_REQUIRES(queue->read_lock);
+
+/* Get the next element of a node. */
+struct mpsc_queue_node *mpsc_queue_next(struct mpsc_queue *queue,
+                                        struct mpsc_queue_node *prev)
+    OVS_REQUIRES(queue->read_lock);
+
+#define MPSC_QUEUE_FOR_EACH(node, queue) \
+    for (node = mpsc_queue_tail(queue); node != NULL; \
+         node = mpsc_queue_next((queue), node))
+
+#define MPSC_QUEUE_FOR_EACH_POP(node, queue) \
+    for (node = mpsc_queue_pop(queue); node != NULL; \
+         node = mpsc_queue_pop(queue))
+
+/* Producer API. */
+
+void mpsc_queue_insert(struct mpsc_queue *queue, struct mpsc_queue_node *node);
+
+#endif /* MPSC_QUEUE_H */
diff --git a/tests/automake.mk b/tests/automake.mk
index 1a528aa39..e95eb0180 100644
--- a/tests/automake.mk
+++ b/tests/automake.mk
@@ -465,6 +465,7 @@ tests_ovstest_SOURCES = \
 	tests/test-list.c \
 	tests/test-lockfile.c \
 	tests/test-multipath.c \
+	tests/test-mpsc-queue.c \
 	tests/test-netflow.c \
 	tests/test-odp.c \
 	tests/test-ofpbuf.c \
diff --git a/tests/library.at b/tests/library.at
index 1702b7556..ace6234b5 100644
--- a/tests/library.at
+++ b/tests/library.at
@@ -254,3 +254,8 @@ AT_SETUP([stopwatch module])
 AT_CHECK([ovstest test-stopwatch], [0], [......
 ], [ignore])
 AT_CLEANUP
+
+AT_SETUP([mpsc-queue module])
+AT_CHECK([ovstest test-mpsc-queue check], [0], [....
+])
+AT_CLEANUP
diff --git a/tests/test-mpsc-queue.c b/tests/test-mpsc-queue.c
new file mode 100644
index 000000000..26d48b9ff
--- /dev/null
+++ b/tests/test-mpsc-queue.c
@@ -0,0 +1,727 @@
+/*
+ * Copyright (c) 2020 NVIDIA Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <assert.h>
+#include <getopt.h>
+#include <string.h>
+
+#include <config.h>
+
+#include "command-line.h"
+#include "guarded-list.h"
+#include "mpsc-queue.h"
+#include "openvswitch/list.h"
+#include "openvswitch/util.h"
+#include "ovs-thread.h"
+#include "ovstest.h"
+#include "timeval.h"
+#include "util.h"
+
+struct element {
+    union {
+        struct mpsc_queue_node mpscq;
+        struct ovs_list list;
+    } node;
+    uint64_t mark;
+};
+
+static void
+test_mpsc_queue_mark_element(struct mpsc_queue_node *node,
+                             uint64_t mark,
+                             unsigned int *counter)
+{
+    struct element *elem;
+
+    elem = CONTAINER_OF(node, struct element, node.mpscq);
+    elem->mark = mark;
+    *counter += 1;
+}
+
+static void
+test_mpsc_queue_insert(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int counter;
+    size_t i;
+
+    memset(elements, 0, sizeof(elements));
+    mpsc_queue_init(&queue);
+    mpsc_queue_acquire(&queue);
+
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    counter = 0;
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, 1, &counter);
+    }
+
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+
+    ovs_assert(counter == ARRAY_SIZE(elements));
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        ovs_assert(elements[i].mark == 1);
+    }
+
+    printf(".");
+}
+
+static void
+test_mpsc_queue_removal_fifo(void)
+{
+    struct element elements[100];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    unsigned int counter;
+    size_t i;
+
+    memset(elements, 0, sizeof(elements));
+
+    mpsc_queue_init(&queue);
+    mpsc_queue_acquire(&queue);
+
+    for (i = 0; i < ARRAY_SIZE(elements); i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    /* Elements are in the same order in the list as they
+     * were declared / initialized.
+     */
+    counter = 0;
+    while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) {
+        test_mpsc_queue_mark_element(node, counter, &counter);
+    }
+
+    /* The list is valid once extracted from the queue,
+     * the queue can be destroyed here.
+     */
+    mpsc_queue_release(&queue);
+    mpsc_queue_destroy(&queue);
+
+    for (i = 0; i < ARRAY_SIZE(elements) - 1; i++) {
+        struct element *e1, *e2;
+
+        e1 = &elements[i];
+        e2 = &elements[i + 1];
+
+        ovs_assert(e1->mark < e2->mark);
+    }
+
+    printf(".");
+}
+
+/* Partial insert:
+ *
+ * These functions are 'mpsc_queue_insert()' split in two parts.
+ * They serve to test the behavior of the queue when forcing the potential
+ * condition of a thread starting an insertion then yielding.
+ */
+static struct mpsc_queue_node *
+mpsc_queue_insert_begin(struct mpsc_queue *queue, struct mpsc_queue_node *node)
+{
+    struct mpsc_queue_node *prev;
+
+    atomic_store_explicit(&node->next, NULL, memory_order_relaxed);
+    prev = atomic_exchange_explicit(&queue->head, node, memory_order_acq_rel);
+    return prev;
+}
+
+static void
+mpsc_queue_insert_end(struct mpsc_queue_node *prev,
+                      struct mpsc_queue_node *node)
+{
+    atomic_store_explicit(&prev->next, node, memory_order_release);
+}
+
+static void
+test_mpsc_queue_insert_partial(void)
+{
+    struct element elements[10];
+    struct mpsc_queue_node *prevs[ARRAY_SIZE(elements)];
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue, *q = &queue;
+    size_t i;
+
+    mpsc_queue_init(q);
+
+    /* Insert the first half of elements entirely,
+     * insert the second half of elements partially.
+ */ + for (i = 0; i < ARRAY_SIZE(elements); i++) { + elements[i].mark = i; + if (i > ARRAY_SIZE(elements) / 2) { + prevs[i] = mpsc_queue_insert_begin(q, &elements[i].node.mpscq); + } else { + prevs[i] = NULL; + mpsc_queue_insert(q, &elements[i].node.mpscq); + } + } + + mpsc_queue_acquire(q); + + /* Verify that when the chain is broken, iterators will stop. */ + i = 0; + MPSC_QUEUE_FOR_EACH (node, q) { + struct element *e = CONTAINER_OF(node, struct element, node.mpscq); + ovs_assert(e == &elements[i]); + i++; + } + ovs_assert(i < ARRAY_SIZE(elements)); + + for (i = 0; i < ARRAY_SIZE(elements); i++) { + if (prevs[i] != NULL) { + mpsc_queue_insert_end(prevs[i], &elements[i].node.mpscq); + } + } + + i = 0; + MPSC_QUEUE_FOR_EACH (node, q) { + struct element *e = CONTAINER_OF(node, struct element, node.mpscq); + ovs_assert(e == &elements[i]); + i++; + } + ovs_assert(i == ARRAY_SIZE(elements)); + + MPSC_QUEUE_FOR_EACH_POP (node, q) { + struct element *e = CONTAINER_OF(node, struct element, node.mpscq); + ovs_assert(e->mark == (unsigned int)(e - elements)); + } + + mpsc_queue_release(q); + mpsc_queue_destroy(q); + + printf("."); +} + +static void +test_mpsc_queue_push_back(void) +{ + struct mpsc_queue queue, *q = &queue; + struct mpsc_queue_node *node; + struct element elements[10]; + size_t i; + + mpsc_queue_init(q); + mpsc_queue_acquire(q); + + ovs_assert(mpsc_queue_pop(q) == NULL); + mpsc_queue_push_back(q, &elements[0].node.mpscq); + node = mpsc_queue_pop(q); + ovs_assert(node == &elements[0].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == NULL); + + mpsc_queue_push_back(q, &elements[0].node.mpscq); + mpsc_queue_push_back(q, &elements[1].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == NULL); + + mpsc_queue_push_back(q, &elements[1].node.mpscq); + mpsc_queue_push_back(q, &elements[0].node.mpscq); + mpsc_queue_insert(q, &elements[2].node.mpscq); + 
ovs_assert(mpsc_queue_pop(q) == &elements[0].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == &elements[1].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == &elements[2].node.mpscq); + ovs_assert(mpsc_queue_pop(q) == NULL); + + for (i = 0; i < ARRAY_SIZE(elements); i++) { + elements[i].mark = i; + mpsc_queue_insert(q, &elements[i].node.mpscq); + } + + node = mpsc_queue_pop(q); + mpsc_queue_push_back(q, node); + ovs_assert(mpsc_queue_pop(q) == node); + mpsc_queue_push_back(q, node); + + i = 0; + MPSC_QUEUE_FOR_EACH (node, q) { + struct element *e = CONTAINER_OF(node, struct element, node.mpscq); + ovs_assert(e == &elements[i]); + i++; + } + ovs_assert(i == ARRAY_SIZE(elements)); + + MPSC_QUEUE_FOR_EACH_POP (node, q) { + struct element *e = CONTAINER_OF(node, struct element, node.mpscq); + ovs_assert(e->mark == (unsigned int)(e - elements)); + } + + mpsc_queue_release(q); + mpsc_queue_destroy(q); + + printf("."); +} + +static void +run_tests(struct ovs_cmdl_context *ctx OVS_UNUSED) +{ + /* Verify basic insertion. */ + test_mpsc_queue_insert(); + /* Test partial insertion. */ + test_mpsc_queue_insert_partial(); + /* Verify removal order is respected. */ + test_mpsc_queue_removal_fifo(); + /* Verify tail-end insertion works. */ + test_mpsc_queue_push_back(); + printf("\n"); +} + +static struct element *elements; +static uint64_t *thread_working_ms; /* Measured work time. 
*/
+
+static unsigned int n_threads;
+static unsigned int n_elems;
+
+static struct ovs_barrier barrier;
+static volatile bool working;
+
+static int
+elapsed(const struct timeval *start)
+{
+    struct timeval end;
+
+    xgettimeofday(&end);
+    return timeval_to_msec(&end) - timeval_to_msec(start);
+}
+
+struct mpscq_aux {
+    struct mpsc_queue *queue;
+    atomic_uint thread_id;
+};
+
+static void *
+mpsc_queue_insert_thread(void *aux_)
+{
+    unsigned int n_elems_per_thread;
+    struct element *th_elements;
+    struct mpscq_aux *aux = aux_;
+    struct timeval start;
+    unsigned int id;
+    size_t i;
+
+    atomic_add(&aux->thread_id, 1u, &id);
+    n_elems_per_thread = n_elems / n_threads;
+    th_elements = &elements[id * n_elems_per_thread];
+
+    ovs_barrier_block(&barrier);
+    xgettimeofday(&start);
+
+    for (i = 0; i < n_elems_per_thread; i++) {
+        mpsc_queue_insert(aux->queue, &th_elements[i].node.mpscq);
+    }
+
+    thread_working_ms[id] = elapsed(&start);
+    ovs_barrier_block(&barrier);
+
+    working = false;
+
+    return NULL;
+}
+
+static void
+benchmark_mpsc_queue(void)
+{
+    struct mpsc_queue_node *node;
+    struct mpsc_queue queue;
+    struct timeval start;
+    unsigned int counter;
+    bool work_complete;
+    pthread_t *threads;
+    struct mpscq_aux aux;
+    uint64_t epoch;
+    uint64_t avg;
+    size_t i;
+
+    /* Sizes are element counts multiplied by the element size. */
+    memset(elements, 0, n_elems * sizeof *elements);
+    memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms);
+
+    mpsc_queue_init(&queue);
+
+    aux.queue = &queue;
+    atomic_store(&aux.thread_id, 0);
+
+    for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) {
+        mpsc_queue_insert(&queue, &elements[i].node.mpscq);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("sc_queue_insert",
+                                       mpsc_queue_insert_thread, &aux);
+    }
+
+    mpsc_queue_acquire(&queue);
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        while (mpsc_queue_poll(&queue, &node) ==
MPSC_QUEUE_ITEM) { + test_mpsc_queue_mark_element(node, epoch, &counter); + } + if (epoch == UINT64_MAX) { + epoch = 0; + } + epoch++; + } while (working); + + avg = 0; + for (i = 0; i < n_threads; i++) { + xpthread_join(threads[i], NULL); + avg += thread_working_ms[i]; + } + avg /= n_threads; + + /* Elements might have been inserted before threads were joined. */ + while (mpsc_queue_poll(&queue, &node) == MPSC_QUEUE_ITEM) { + test_mpsc_queue_mark_element(node, epoch, &counter); + } + + printf(" mpsc-queue: %6d", elapsed(&start)); + for (i = 0; i < n_threads; i++) { + printf(" %6" PRIu64, thread_working_ms[i]); + } + printf(" %6" PRIu64 " ms\n", avg); + + mpsc_queue_release(&queue); + mpsc_queue_destroy(&queue); + ovs_barrier_destroy(&barrier); + free(threads); + + work_complete = true; + for (i = 0; i < n_elems; i++) { + if (elements[i].mark == 0) { + printf("Element %" PRIuSIZE " was never consumed.\n", i); + work_complete = false; + } + } + ovs_assert(work_complete); + ovs_assert(counter == n_elems); +} + +struct list_aux { + struct ovs_list *list; + struct ovs_mutex *lock; + atomic_uint thread_id; +}; + +static void * +locked_list_insert_thread(void *aux_) +{ + unsigned int n_elems_per_thread; + struct element *th_elements; + struct list_aux *aux = aux_; + struct timeval start; + unsigned int id; + size_t i; + + atomic_add(&aux->thread_id, 1u, &id); + n_elems_per_thread = n_elems / n_threads; + th_elements = &elements[id * n_elems_per_thread]; + + ovs_barrier_block(&barrier); + xgettimeofday(&start); + + for (i = 0; i < n_elems_per_thread; i++) { + ovs_mutex_lock(aux->lock); + ovs_list_push_front(aux->list, &th_elements[i].node.list); + ovs_mutex_unlock(aux->lock); + } + + thread_working_ms[id] = elapsed(&start); + ovs_barrier_block(&barrier); + + working = false; + + return NULL; +} + +static void +benchmark_list(void) +{ + struct ovs_mutex lock; + struct ovs_list list; + struct element *elem; + struct timeval start; + unsigned int counter; + bool 
work_complete; + pthread_t *threads; + struct list_aux aux; + uint64_t epoch; + uint64_t avg; + size_t i; + + memset(elements, 0, n_elems * sizeof *elements); + memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms); + + ovs_mutex_init(&lock); + ovs_list_init(&list); + + aux.list = &list; + aux.lock = &lock; + atomic_store(&aux.thread_id, 0); + + ovs_mutex_lock(&lock); + for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) { + ovs_list_push_front(&list, &elements[i].node.list); + } + ovs_mutex_unlock(&lock); + + working = true; + + threads = xmalloc(n_threads * sizeof *threads); + ovs_barrier_init(&barrier, n_threads); + + for (i = 0; i < n_threads; i++) { + threads[i] = ovs_thread_create("locked_list_insert", + locked_list_insert_thread, &aux); + } + + xgettimeofday(&start); + + counter = 0; + epoch = 1; + do { + ovs_mutex_lock(&lock); + LIST_FOR_EACH_POP (elem, node.list, &list) { + elem->mark = epoch; + counter++; + } + ovs_mutex_unlock(&lock); + if (epoch == UINT64_MAX) { + epoch = 0; + } + epoch++; + } while (working); + + avg = 0; + for (i = 0; i < n_threads; i++) { + xpthread_join(threads[i], NULL); + avg += thread_working_ms[i]; + } + avg /= n_threads; + + /* Elements might have been inserted before threads were joined. 
*/ + ovs_mutex_lock(&lock); + LIST_FOR_EACH_POP (elem, node.list, &list) { + elem->mark = epoch; + counter++; + } + ovs_mutex_unlock(&lock); + + printf(" list: %6d", elapsed(&start)); + for (i = 0; i < n_threads; i++) { + printf(" %6" PRIu64, thread_working_ms[i]); + } + printf(" %6" PRIu64 " ms\n", avg); + ovs_barrier_destroy(&barrier); + free(threads); + + work_complete = true; + for (i = 0; i < n_elems; i++) { + if (elements[i].mark == 0) { + printf("Element %" PRIuSIZE " was never consumed.\n", i); + work_complete = false; + } + } + ovs_assert(work_complete); + ovs_assert(counter == n_elems); +} + +struct guarded_list_aux { + struct guarded_list *glist; + atomic_uint thread_id; +}; + +static void * +guarded_list_insert_thread(void *aux_) +{ + unsigned int n_elems_per_thread; + struct element *th_elements; + struct guarded_list_aux *aux = aux_; + struct timeval start; + unsigned int id; + size_t i; + + atomic_add(&aux->thread_id, 1u, &id); + n_elems_per_thread = n_elems / n_threads; + th_elements = &elements[id * n_elems_per_thread]; + + ovs_barrier_block(&barrier); + xgettimeofday(&start); + + for (i = 0; i < n_elems_per_thread; i++) { + guarded_list_push_back(aux->glist, &th_elements[i].node.list, n_elems); + } + + thread_working_ms[id] = elapsed(&start); + ovs_barrier_block(&barrier); + + working = false; + + return NULL; +} + +static void +benchmark_guarded_list(void) +{ + struct guarded_list_aux aux; + struct ovs_list extracted; + struct guarded_list glist; + struct element *elem; + struct timeval start; + unsigned int counter; + bool work_complete; + pthread_t *threads; + uint64_t epoch; + uint64_t avg; + size_t i; + + memset(elements, 0, n_elems * sizeof *elements); + memset(thread_working_ms, 0, n_threads * sizeof *thread_working_ms); + + guarded_list_init(&glist); + ovs_list_init(&extracted); + + aux.glist = &glist; + atomic_store(&aux.thread_id, 0); + + for (i = n_elems - (n_elems % n_threads); i < n_elems; i++) { + guarded_list_push_back(&glist, 
+                               &elements[i].node.list, n_elems);
+    }
+
+    working = true;
+
+    threads = xmalloc(n_threads * sizeof *threads);
+    ovs_barrier_init(&barrier, n_threads);
+
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("guarded_list_insert",
+                                       guarded_list_insert_thread, &aux);
+    }
+
+    xgettimeofday(&start);
+
+    counter = 0;
+    epoch = 1;
+    do {
+        guarded_list_pop_all(&glist, &extracted);
+        LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+            elem->mark = epoch;
+            counter++;
+        }
+        if (epoch == UINT64_MAX) {
+            epoch = 0;
+        }
+        epoch++;
+    } while (working);
+
+    avg = 0;
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+        avg += thread_working_ms[i];
+    }
+    avg /= n_threads;
+
+    /* Elements might have been inserted before threads were joined. */
+    guarded_list_pop_all(&glist, &extracted);
+    LIST_FOR_EACH_POP (elem, node.list, &extracted) {
+        elem->mark = epoch;
+        counter++;
+    }
+
+    printf("guarded list: %6d", elapsed(&start));
+    for (i = 0; i < n_threads; i++) {
+        printf(" %6" PRIu64, thread_working_ms[i]);
+    }
+    printf(" %6" PRIu64 " ms\n", avg);
+    ovs_barrier_destroy(&barrier);
+    free(threads);
+    guarded_list_destroy(&glist);
+
+    work_complete = true;
+    for (i = 0; i < n_elems; i++) {
+        if (elements[i].mark == 0) {
+            printf("Element %" PRIuSIZE " was never consumed.\n", i);
+            work_complete = false;
+        }
+    }
+    ovs_assert(work_complete);
+    ovs_assert(counter == n_elems);
+}
+
+static void
+run_benchmarks(struct ovs_cmdl_context *ctx)
+{
+    long int l_threads;
+    long int l_elems;
+    size_t i;
+
+    l_elems = strtol(ctx->argv[1], NULL, 10);
+    l_threads = strtol(ctx->argv[2], NULL, 10);
+    ovs_assert(l_elems > 0 && l_threads > 0);
+
+    n_elems = l_elems;
+    n_threads = l_threads;
+
+    elements = xcalloc(n_elems, sizeof *elements);
+    thread_working_ms = xcalloc(n_threads, sizeof *thread_working_ms);
+
+    printf("Benchmarking n=%u on 1 + %u threads.\n", n_elems, n_threads);
+
+    printf(" type\\thread: Reader ");
+    for (i = 0; i < n_threads; i++) {
+        printf(" %3"
+               PRIuSIZE " ", i + 1);
+    }
+    printf(" Avg\n");
+
+    benchmark_mpsc_queue();
+    benchmark_list();
+    benchmark_guarded_list();
+
+    free(thread_working_ms);
+    free(elements);
+}
+
+static const struct ovs_cmdl_command commands[] = {
+    {"check", NULL, 0, 0, run_tests, OVS_RO},
+    {"benchmark", "<nb elem> <nb threads>", 2, 2, run_benchmarks, OVS_RO},
+    {NULL, NULL, 0, 0, NULL, OVS_RO},
+};
+
+static void
+test_mpsc_queue_main(int argc, char *argv[])
+{
+    struct ovs_cmdl_context ctx = {
+        .argc = argc - optind,
+        .argv = argv + optind,
+    };
+
+    set_program_name(argv[0]);
+    ovs_cmdl_run_command(&ctx, commands);
+}
+
+OVSTEST_REGISTER("test-mpsc-queue", test_mpsc_queue_main);
--
2.31.1
