From: Petri Savolainen <petri.savolai...@linaro.org> Change plain queue implementation to use ring_mpmc instead of ticket lock and ring_st ring. Performance and scalability improve, especially on 64-bit ARM.
Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org> --- /** Email created from pull request 683 (psavol:master-queue-lockless-enqdeq-3) ** https://github.com/Linaro/odp/pull/683 ** Patch: https://github.com/Linaro/odp/pull/683.patch ** Base sha: 989df5d2f97ab4711328b11282dcc743f5740e00 ** Merge commit sha: 28073c54671148efdd01c9cf38c1a235d5a133f0 **/ .../include/odp_queue_basic_internal.h | 30 +++++++----- platform/linux-generic/odp_queue_basic.c | 48 +++++++------------ 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h index 15e49772c..46b747955 100644 --- a/platform/linux-generic/include/odp_queue_basic_internal.h +++ b/platform/linux-generic/include/odp_queue_basic_internal.h @@ -22,6 +22,7 @@ extern "C" { #include <odp/api/hints.h> #include <odp/api/ticketlock.h> #include <odp_config_internal.h> +#include <odp_ring_mpmc_internal.h> #include <odp_ring_st_internal.h> #include <odp_ring_spsc_internal.h> #include <odp_queue_lf.h> @@ -33,22 +34,29 @@ extern "C" { #define QUEUE_STATUS_SCHED 4 struct queue_entry_s { - odp_ticketlock_t ODP_ALIGNED_CACHE lock; - union { - ring_st_t ring_st; - ring_spsc_t ring_spsc; - }; - int status; - + /* The first cache line is read only */ queue_enq_fn_t ODP_ALIGNED_CACHE enqueue; queue_deq_fn_t dequeue; queue_enq_multi_fn_t enqueue_multi; queue_deq_multi_fn_t dequeue_multi; - queue_deq_multi_fn_t orig_dequeue_multi; + uint32_t *ring_data; + uint32_t ring_mask; + uint32_t index; + odp_queue_t handle; + odp_queue_type_t type; + + /* MPMC ring (2 cache lines). 
*/ + ring_mpmc_t ring_mpmc; - uint32_t index; - odp_queue_t handle; - odp_queue_type_t type; + odp_ticketlock_t lock; + union { + ring_st_t ring_st; + ring_spsc_t ring_spsc; + }; + + int status; + + queue_deq_multi_fn_t orig_dequeue_multi; odp_queue_param_t param; odp_pktin_queue_t pktin; odp_pktout_queue_t pktout; diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index e5d915643..8b9a70bbc 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -400,8 +400,10 @@ static int queue_destroy(odp_queue_t handle) if (queue->s.spsc) empty = ring_spsc_is_empty(&queue->s.ring_spsc); - else + else if (queue->s.type == ODP_QUEUE_TYPE_SCHED) empty = ring_st_is_empty(&queue->s.ring_st); + else + empty = ring_mpmc_is_empty(&queue->s.ring_mpmc); if (!empty) { UNLOCK(queue); @@ -490,28 +492,19 @@ static inline int _plain_queue_enq_multi(odp_queue_t handle, { queue_entry_t *queue; int ret, num_enq; - ring_st_t *ring_st; + ring_mpmc_t *ring_mpmc; uint32_t buf_idx[num]; queue = qentry_from_handle(handle); - ring_st = &queue->s.ring_st; + ring_mpmc = &queue->s.ring_mpmc; if (sched_fn->ord_enq_multi(handle, (void **)buf_hdr, num, &ret)) return ret; buffer_index_from_buf(buf_idx, buf_hdr, num); - LOCK(queue); - - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { - UNLOCK(queue); - ODP_ERR("Bad queue status\n"); - return -1; - } - - num_enq = ring_st_enq_multi(ring_st, buf_idx, num); - - UNLOCK(queue); + num_enq = ring_mpmc_enq_multi(ring_mpmc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num); return num_enq; } @@ -521,23 +514,14 @@ static inline int _plain_queue_deq_multi(odp_queue_t handle, { int num_deq; queue_entry_t *queue; - ring_st_t *ring_st; + ring_mpmc_t *ring_mpmc; uint32_t buf_idx[num]; queue = qentry_from_handle(handle); - ring_st = &queue->s.ring_st; + ring_mpmc = &queue->s.ring_mpmc; - LOCK(queue); - - if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { - /* 
Bad queue, or queue has been destroyed. */ - UNLOCK(queue); - return -1; - } - - num_deq = ring_st_deq_multi(ring_st, buf_idx, num); - - UNLOCK(queue); + num_deq = ring_mpmc_deq_multi(ring_mpmc, queue->s.ring_data, + queue->s.ring_mask, buf_idx, num); if (num_deq == 0) return 0; @@ -883,13 +867,17 @@ static int queue_init(queue_entry_t *queue, const char *name, queue->s.dequeue = plain_queue_deq; queue->s.dequeue_multi = plain_queue_deq_multi; queue->s.orig_dequeue_multi = plain_queue_deq_multi; + + queue->s.ring_data = &queue_glb->ring_data[offset]; + queue->s.ring_mask = queue_size - 1; + ring_mpmc_init(&queue->s.ring_mpmc); + } else { queue->s.enqueue = sched_queue_enq; queue->s.enqueue_multi = sched_queue_enq_multi; + ring_st_init(&queue->s.ring_st, + &queue_glb->ring_data[offset], queue_size); } - - ring_st_init(&queue->s.ring_st, &queue_glb->ring_data[offset], - queue_size); } return 0;