From: Petri Savolainen <petri.savolai...@linaro.org>

Change plain queue implementation to use ring_mpmc instead
of ticket lock and ring_st ring. Performance and scalability
improve, especially on 64-bit ARM.

Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org>
---
/** Email created from pull request 683 (psavol:master-queue-lockless-enqdeq-3)
 ** https://github.com/Linaro/odp/pull/683
 ** Patch: https://github.com/Linaro/odp/pull/683.patch
 ** Base sha: 989df5d2f97ab4711328b11282dcc743f5740e00
 ** Merge commit sha: 28073c54671148efdd01c9cf38c1a235d5a133f0
 **/
 .../include/odp_queue_basic_internal.h        | 30 +++++++-----
 platform/linux-generic/odp_queue_basic.c      | 48 +++++++------------
 2 files changed, 37 insertions(+), 41 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h
index 15e49772c..46b747955 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -22,6 +22,7 @@ extern "C" {
 #include <odp/api/hints.h>
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
+#include <odp_ring_mpmc_internal.h>
 #include <odp_ring_st_internal.h>
 #include <odp_ring_spsc_internal.h>
 #include <odp_queue_lf.h>
@@ -33,22 +34,29 @@ extern "C" {
 #define QUEUE_STATUS_SCHED        4
 
 struct queue_entry_s {
-       odp_ticketlock_t  ODP_ALIGNED_CACHE lock;
-       union {
-               ring_st_t         ring_st;
-               ring_spsc_t       ring_spsc;
-       };
-       int               status;
-
+       /* The first cache line is read only */
        queue_enq_fn_t       ODP_ALIGNED_CACHE enqueue;
        queue_deq_fn_t       dequeue;
        queue_enq_multi_fn_t enqueue_multi;
        queue_deq_multi_fn_t dequeue_multi;
-       queue_deq_multi_fn_t orig_dequeue_multi;
+       uint32_t             *ring_data;
+       uint32_t             ring_mask;
+       uint32_t             index;
+       odp_queue_t          handle;
+       odp_queue_type_t     type;
+
+       /* MPMC ring (2 cache lines). */
+       ring_mpmc_t          ring_mpmc;
 
-       uint32_t          index;
-       odp_queue_t       handle;
-       odp_queue_type_t  type;
+       odp_ticketlock_t     lock;
+       union {
+               ring_st_t    ring_st;
+               ring_spsc_t  ring_spsc;
+       };
+
+       int                  status;
+
+       queue_deq_multi_fn_t orig_dequeue_multi;
        odp_queue_param_t param;
        odp_pktin_queue_t pktin;
        odp_pktout_queue_t pktout;
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index e5d915643..8b9a70bbc 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -400,8 +400,10 @@ static int queue_destroy(odp_queue_t handle)
 
        if (queue->s.spsc)
                empty = ring_spsc_is_empty(&queue->s.ring_spsc);
-       else
+       else if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
                empty = ring_st_is_empty(&queue->s.ring_st);
+       else
+               empty = ring_mpmc_is_empty(&queue->s.ring_mpmc);
 
        if (!empty) {
                UNLOCK(queue);
@@ -490,28 +492,19 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle,
 {
        queue_entry_t *queue;
        int ret, num_enq;
-       ring_st_t *ring_st;
+       ring_mpmc_t *ring_mpmc;
        uint32_t buf_idx[num];
 
        queue = qentry_from_handle(handle);
-       ring_st = &queue->s.ring_st;
+       ring_mpmc = &queue->s.ring_mpmc;
 
        if (sched_fn->ord_enq_multi(handle, (void **)buf_hdr, num, &ret))
                return ret;
 
        buffer_index_from_buf(buf_idx, buf_hdr, num);
 
-       LOCK(queue);
-
-       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-               UNLOCK(queue);
-               ODP_ERR("Bad queue status\n");
-               return -1;
-       }
-
-       num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
-
-       UNLOCK(queue);
+       num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->s.ring_data,
+                                     queue->s.ring_mask, buf_idx, num);
 
        return num_enq;
 }
@@ -521,23 +514,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle,
 {
        int num_deq;
        queue_entry_t *queue;
-       ring_st_t *ring_st;
+       ring_mpmc_t *ring_mpmc;
        uint32_t buf_idx[num];
 
        queue = qentry_from_handle(handle);
-       ring_st = &queue->s.ring_st;
+       ring_mpmc = &queue->s.ring_mpmc;
 
-       LOCK(queue);
-
-       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-               /* Bad queue, or queue has been destroyed. */
-               UNLOCK(queue);
-               return -1;
-       }
-
-       num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
-
-       UNLOCK(queue);
+       num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->s.ring_data,
+                                     queue->s.ring_mask, buf_idx, num);
 
        if (num_deq == 0)
                return 0;
@@ -883,13 +867,17 @@ static int queue_init(queue_entry_t *queue, const char *name,
                        queue->s.dequeue            = plain_queue_deq;
                        queue->s.dequeue_multi      = plain_queue_deq_multi;
                        queue->s.orig_dequeue_multi = plain_queue_deq_multi;
+
+                       queue->s.ring_data = &queue_glb->ring_data[offset];
+                       queue->s.ring_mask = queue_size - 1;
+                       ring_mpmc_init(&queue->s.ring_mpmc);
+
                } else {
                        queue->s.enqueue            = sched_queue_enq;
                        queue->s.enqueue_multi      = sched_queue_enq_multi;
+                       ring_st_init(&queue->s.ring_st,
+                                    &queue_glb->ring_data[offset], queue_size);
                }
-
-               ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset],
-                            queue_size);
        }
 
        return 0;

Reply via email to