From: Petri Savolainen <petri.savolai...@linaro.org>

Change from linked list of bursts to a ring implementation.
Queues have maximum size but code is simpler and performance
is a bit better. This step helps in a potential future step to
implement queues with a lockless ring.

Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org>
---
/** Email created from pull request 492 (psavol:master-sched-optim)
 ** https://github.com/Linaro/odp/pull/492
 ** Patch: https://github.com/Linaro/odp/pull/492.patch
 ** Base sha: 5a58bbf2bb331fd7dde2ebbc0430634ace6900fb
 ** Merge commit sha: b29563293c1bca56419d2dc355a8e64d961e024a
 **/
 .../linux-generic/include/odp_buffer_internal.h    |  34 ++---
 platform/linux-generic/include/odp_pool_internal.h |  28 ++++
 .../linux-generic/include/odp_queue_internal.h     |   5 +-
 platform/linux-generic/odp_packet.c                |   3 +-
 platform/linux-generic/odp_pool.c                  |  22 +--
 platform/linux-generic/odp_queue.c                 | 159 +++++++++------------
 6 files changed, 120 insertions(+), 131 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index b52669387..b7af785aa 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -33,19 +33,31 @@ extern "C" {
 #include <odp_schedule_if.h>
 #include <stddef.h>
 
-#define BUFFER_BURST_SIZE    CONFIG_BURST_SIZE
-
 typedef struct seg_entry_t {
        void     *hdr;
        uint8_t  *data;
        uint32_t  len;
 } seg_entry_t;
 
+typedef union buffer_index_t {
+       uint32_t u32;
+
+       struct {
+               uint32_t pool   :8;
+               uint32_t buffer :24;
+       };
+} buffer_index_t;
+
+/* Check that pool index fit into bit field */
+ODP_STATIC_ASSERT(ODP_CONFIG_POOLS    <= (0xFF + 1), "TOO_MANY_POOLS");
+
+/* Check that buffer index fit into bit field */
+ODP_STATIC_ASSERT(CONFIG_POOL_MAX_NUM <= (0xFFFFFF + 1), "TOO_LARGE_POOL");
+
 /* Common buffer header */
 struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
-
-       /* Buffer index in the pool */
-       uint32_t  index;
+       /* Combined pool and buffer index */
+       buffer_index_t index;
 
        /* Total segment count */
        uint16_t  segcount;
@@ -73,16 +85,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
        /* Segments */
        seg_entry_t seg[CONFIG_PACKET_SEGS_PER_HDR];
 
-       /* Burst counts */
-       uint8_t   burst_num;
-       uint8_t   burst_first;
-
-       /* Next buf in a list */
-       struct odp_buffer_hdr_t *next;
-
-       /* Burst table */
-       struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
-
        /* --- Mostly read only data --- */
 
        /* User context pointer or u64 */
@@ -115,8 +117,6 @@ struct ODP_ALIGNED_CACHE odp_buffer_hdr_t {
 ODP_STATIC_ASSERT(CONFIG_PACKET_SEGS_PER_HDR < 256,
                  "CONFIG_PACKET_SEGS_PER_HDR_TOO_LARGE");
 
-ODP_STATIC_ASSERT(BUFFER_BURST_SIZE < 256, "BUFFER_BURST_SIZE_TOO_LARGE");
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_pool_internal.h 
b/platform/linux-generic/include/odp_pool_internal.h
index 8284bcd7d..276032e7b 100644
--- a/platform/linux-generic/include/odp_pool_internal.h
+++ b/platform/linux-generic/include/odp_pool_internal.h
@@ -104,6 +104,34 @@ static inline odp_buffer_hdr_t 
*buf_hdl_to_hdr(odp_buffer_t buf)
        return (odp_buffer_hdr_t *)(uintptr_t)buf;
 }
 
+static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
+                                                  uint32_t buffer_idx)
+{
+       uint32_t block_offset;
+       odp_buffer_hdr_t *buf_hdr;
+
+       block_offset = buffer_idx * pool->block_size;
+
+       /* clang requires cast to uintptr_t */
+       buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
+
+       return buf_hdr;
+}
+
+static inline odp_buffer_hdr_t *buf_hdr_from_index_u32(uint32_t u32)
+{
+       buffer_index_t index;
+       uint32_t pool_idx, buffer_idx;
+       pool_t *pool;
+
+       index.u32  = u32;
+       pool_idx   = index.pool;
+       buffer_idx = index.buffer;
+       pool       = pool_entry(pool_idx);
+
+       return buf_hdr_from_index(pool, buffer_idx);
+}
+
 int buffer_alloc_multi(pool_t *pool, odp_buffer_hdr_t *buf_hdr[], int num);
 void buffer_free_multi(odp_buffer_hdr_t *buf_hdr[], int num_free);
 
diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index 6fe3fb462..68dddbb0b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -29,6 +29,7 @@ extern "C" {
 #include <odp/api/hints.h>
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
+#include <odp_ring_st_internal.h>
 
 #define QUEUE_STATUS_FREE         0
 #define QUEUE_STATUS_DESTROYED    1
@@ -38,9 +39,7 @@ extern "C" {
 
 struct queue_entry_s {
        odp_ticketlock_t  ODP_ALIGNED_CACHE lock;
-
-       odp_buffer_hdr_t *head;
-       odp_buffer_hdr_t *tail;
+       ring_st_t         ring_st;
        int               status;
 
        queue_enq_fn_t       ODP_ALIGNED_CACHE enqueue;
diff --git a/platform/linux-generic/odp_packet.c 
b/platform/linux-generic/odp_packet.c
index 1c1b45ff1..6f8f2d039 100644
--- a/platform/linux-generic/odp_packet.c
+++ b/platform/linux-generic/odp_packet.c
@@ -1814,7 +1814,8 @@ void odp_packet_print_data(odp_packet_t pkt, uint32_t 
offset,
        len += snprintf(&str[len], n - len,
                        "  pool index    %" PRIu32 "\n", pool->pool_idx);
        len += snprintf(&str[len], n - len,
-                       "  buf index     %" PRIu32 "\n", hdr->buf_hdr.index);
+                       "  buf index     %" PRIu32 "\n",
+                       hdr->buf_hdr.index.buffer);
        len += snprintf(&str[len], n - len,
                        "  segcount      %" PRIu16 "\n", hdr->buf_hdr.segcount);
        len += snprintf(&str[len], n - len,
diff --git a/platform/linux-generic/odp_pool.c 
b/platform/linux-generic/odp_pool.c
index e5ba8982a..a9c57f4cb 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -80,20 +80,6 @@ static inline pool_t *pool_from_buf(odp_buffer_t buf)
        return buf_hdr->pool_ptr;
 }
 
-static inline odp_buffer_hdr_t *buf_hdr_from_index(pool_t *pool,
-                                                  uint32_t buffer_idx)
-{
-       uint32_t block_offset;
-       odp_buffer_hdr_t *buf_hdr;
-
-       block_offset = buffer_idx * pool->block_size;
-
-       /* clang requires cast to uintptr_t */
-       buf_hdr = (odp_buffer_hdr_t *)(uintptr_t)&pool->base_addr[block_offset];
-
-       return buf_hdr;
-}
-
 int odp_pool_init_global(void)
 {
        uint32_t i;
@@ -296,7 +282,9 @@ static void init_buffers(pool_t *pool)
                memset(buf_hdr, 0, (uintptr_t)data - (uintptr_t)buf_hdr);
 
                /* Initialize buffer metadata */
-               buf_hdr->index = i;
+               buf_hdr->index.u32    = 0;
+               buf_hdr->index.pool   = pool->pool_idx;
+               buf_hdr->index.buffer = i;
                buf_hdr->type = type;
                buf_hdr->event_type = type;
                buf_hdr->pool_ptr = pool;
@@ -782,7 +770,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
                ring  = &pool->ring->hdr;
                mask  = pool->ring_mask;
                for (i = 0; i < num; i++)
-                       buf_index[i] = buf_hdr[i]->index;
+                       buf_index[i] = buf_hdr[i]->index.buffer;
 
                ring_enq_multi(ring, mask, buf_index, num);
 
@@ -822,7 +810,7 @@ static inline void buffer_free_to_pool(pool_t *pool,
        }
 
        for (i = 0; i < num; i++)
-               cache->buf_index[cache_num + i] = buf_hdr[i]->index;
+               cache->buf_index[cache_num + i] = buf_hdr[i]->index.buffer;
 
        cache->num = cache_num + num;
 }
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index 21135df63..eda872321 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -39,8 +39,15 @@
 static int queue_init(queue_entry_t *queue, const char *name,
                      const odp_queue_param_t *param);
 
+typedef struct ODP_ALIGNED_CACHE {
+       /* Storage space for ring data */
+       uint32_t data[CONFIG_QUEUE_SIZE];
+} ring_data_t;
+
 typedef struct queue_table_t {
-       queue_entry_t  queue[ODP_CONFIG_QUEUES];
+       queue_entry_t queue[ODP_CONFIG_QUEUES];
+       ring_data_t   ring_data[ODP_CONFIG_QUEUES];
+
 } queue_table_t;
 
 static queue_table_t *queue_tbl;
@@ -143,8 +150,10 @@ static int queue_capability(odp_queue_capability_t *capa)
        capa->max_sched_groups  = sched_fn->num_grps();
        capa->sched_prios       = odp_schedule_num_prio();
        capa->plain.max_num     = capa->max_queues;
+       capa->plain.max_size    = CONFIG_QUEUE_SIZE;
        capa->plain.nonblocking = ODP_BLOCKING;
        capa->sched.max_num     = capa->max_queues;
+       capa->sched.max_size    = CONFIG_QUEUE_SIZE;
        capa->sched.nonblocking = ODP_BLOCKING;
 
        return 0;
@@ -192,6 +201,9 @@ static odp_queue_t queue_create(const char *name,
                param = &default_param;
        }
 
+       if (param->size > CONFIG_QUEUE_SIZE)
+               return ODP_QUEUE_INVALID;
+
        for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
                queue = &queue_tbl->queue[i];
 
@@ -263,7 +275,7 @@ static int queue_destroy(odp_queue_t handle)
                ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
                return -1;
        }
-       if (queue->s.head != NULL) {
+       if (ring_st_is_empty(&queue->s.ring_st) == 0) {
                UNLOCK(&queue->s.lock);
                ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
                return -1;
@@ -326,81 +338,71 @@ static odp_queue_t queue_lookup(const char *name)
        return ODP_QUEUE_INVALID;
 }
 
+static inline void buffer_index_from_buf(uint32_t buffer_index[],
+                                        odp_buffer_hdr_t *buf_hdr[], int num)
+{
+       int i;
+
+       for (i = 0; i < num; i++)
+               buffer_index[i] = buf_hdr[i]->index.u32;
+}
+
+static inline void buffer_index_to_buf(odp_buffer_hdr_t *buf_hdr[],
+                                      uint32_t buffer_index[], int num)
+{
+       int i;
+
+       for (i = 0; i < num; i++) {
+               buf_hdr[i] = buf_hdr_from_index_u32(buffer_index[i]);
+               odp_prefetch(buf_hdr[i]);
+       }
+}
+
 static inline int enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
                            int num)
 {
        int sched = 0;
-       int i, ret;
+       int ret;
        queue_entry_t *queue;
-       odp_buffer_hdr_t *hdr, *tail, *next_hdr;
+       int num_enq;
+       ring_st_t *ring_st;
+       uint32_t buf_idx[num];
 
        queue = qentry_from_int(q_int);
+       ring_st = &queue->s.ring_st;
+
        if (sched_fn->ord_enq_multi(q_int, (void **)buf_hdr, num, &ret))
                return ret;
 
-       /* Optimize the common case of single enqueue */
-       if (num == 1) {
-               tail = buf_hdr[0];
-               hdr  = tail;
-               hdr->burst_num = 0;
-               hdr->next = NULL;
-       } else {
-               int next;
-
-               /* Start from the last buffer header */
-               tail = buf_hdr[num - 1];
-               hdr  = tail;
-               hdr->next = NULL;
-               next = num - 2;
-
-               while (1) {
-                       /* Build a burst. The buffer header carrying
-                        * a burst is the last buffer of the burst. */
-                       for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
-                            i++, next--)
-                               hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
-                                       buf_hdr[next];
-
-                       hdr->burst_num   = i;
-                       hdr->burst_first = BUFFER_BURST_SIZE - i;
-
-                       if (odp_likely(next < 0))
-                               break;
-
-                       /* Get another header and link it */
-                       next_hdr  = hdr;
-                       hdr       = buf_hdr[next];
-                       hdr->next = next_hdr;
-                       next--;
-               }
-       }
+       buffer_index_from_buf(buf_idx, buf_hdr, num);
 
        LOCK(&queue->s.lock);
+
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
                ODP_ERR("Bad queue status\n");
                return -1;
        }
 
-       /* Empty queue */
-       if (queue->s.head == NULL)
-               queue->s.head = hdr;
-       else
-               queue->s.tail->next = hdr;
+       num_enq = ring_st_enq_multi(ring_st, buf_idx, num);
 
-       queue->s.tail = tail;
+       if (odp_unlikely(num_enq == 0)) {
+               UNLOCK(&queue->s.lock);
+               return 0;
+       }
 
        if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
                queue->s.status = QUEUE_STATUS_SCHED;
-               sched = 1; /* retval: schedule queue */
+               sched = 1;
        }
+
        UNLOCK(&queue->s.lock);
 
        /* Add queue to scheduling */
        if (sched && sched_fn->sched_queue(queue->s.index))
                ODP_ABORT("schedule_queue failed\n");
 
-       return num; /* All events enqueued */
+       return num_enq;
 }
 
 static int queue_int_enq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -446,12 +448,15 @@ static int queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
                            int num)
 {
-       odp_buffer_hdr_t *hdr, *next;
-       int i, j;
-       int updated = 0;
        int status_sync = sched_fn->status_sync;
+       int num_deq;
+       ring_st_t *ring_st;
+       uint32_t buf_idx[num];
+
+       ring_st = &queue->s.ring_st;
 
        LOCK(&queue->s.lock);
+
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                /* Bad queue, or queue has been destroyed.
                 * Scheduler finalizes queue destroy after this. */
@@ -459,9 +464,9 @@ static inline int deq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
                return -1;
        }
 
-       hdr = queue->s.head;
+       num_deq = ring_st_deq_multi(ring_st, buf_idx, num);
 
-       if (hdr == NULL) {
+       if (num_deq == 0) {
                /* Already empty queue */
                if (queue->s.status == QUEUE_STATUS_SCHED) {
                        queue->s.status = QUEUE_STATUS_NOTSCHED;
@@ -471,51 +476,18 @@ static inline int deq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
                }
 
                UNLOCK(&queue->s.lock);
-               return 0;
-       }
-
-       for (i = 0; i < num && hdr; ) {
-               int burst_num = hdr->burst_num;
-               int first     = hdr->burst_first;
 
-               /* First, get bursted buffers */
-               for (j = 0; j < burst_num && i < num; j++, i++) {
-                       buf_hdr[i] = hdr->burst[first + j];
-                       odp_prefetch(buf_hdr[i]);
-               }
-
-               if (burst_num) {
-                       hdr->burst_num   = burst_num - j;
-                       hdr->burst_first = first + j;
-               }
-
-               if (i == num)
-                       break;
-
-               /* When burst is empty, consume the current buffer header and
-                * move to the next header */
-               buf_hdr[i] = hdr;
-               next       = hdr->next;
-               hdr->next  = NULL;
-               hdr        = next;
-               updated++;
-               i++;
+               return 0;
        }
 
-       /* Write head only if updated */
-       if (updated)
-               queue->s.head = hdr;
-
-       /* Queue is empty */
-       if (hdr == NULL)
-               queue->s.tail = NULL;
-
        if (status_sync && queue->s.type == ODP_QUEUE_TYPE_SCHED)
                sched_fn->save_context(queue->s.index);
 
        UNLOCK(&queue->s.lock);
 
-       return i;
+       buffer_index_to_buf(buf_hdr, buf_idx, num_deq);
+
+       return num_deq;
 }
 
 static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
@@ -584,8 +556,9 @@ static int queue_init(queue_entry_t *queue, const char 
*name,
        queue->s.pktin = PKTIN_INVALID;
        queue->s.pktout = PKTOUT_INVALID;
 
-       queue->s.head = NULL;
-       queue->s.tail = NULL;
+       ring_st_init(&queue->s.ring_st,
+                    queue_tbl->ring_data[queue->s.index].data,
+                    CONFIG_QUEUE_SIZE);
 
        return 0;
 }
@@ -661,7 +634,7 @@ int sched_cb_queue_empty(uint32_t queue_index)
                return -1;
        }
 
-       if (queue->s.head == NULL) {
+       if (ring_st_is_empty(&queue->s.ring_st)) {
                /* Already empty queue. Update status. */
                if (queue->s.status == QUEUE_STATUS_SCHED)
                        queue->s.status = QUEUE_STATUS_NOTSCHED;

Reply via email to