Added support for a buffer header to carry a burst of buffer
pointers. The buffer itself is the last one a burst. Burst are
built with a enq_multi call, so single enq operations do not
benefit from it.

Signed-off-by: Petri Savolainen <[email protected]>
---
 .../linux-generic/include/odp_buffer_internal.h    |   7 ++
 platform/linux-generic/odp_queue.c                 | 124 +++++++++++++++------
 platform/linux-generic/odp_schedule_ordered.c      |  19 +++-
 3 files changed, 113 insertions(+), 37 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index 7b0ef8b..69daf94 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t {
        };
 } odp_buffer_bits_t;
 
+#define BUFFER_BURST_SIZE    8
+
 /* Common buffer header */
 struct odp_buffer_hdr_t {
        struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
@@ -111,6 +113,11 @@ struct odp_buffer_hdr_t {
                struct odp_buffer_hdr_t *link;
        };
        odp_buffer_bits_t        handle;     /* handle */
+
+       int burst_num;
+       int burst_first;
+       struct odp_buffer_hdr_t *burst[BUFFER_BURST_SIZE];
+
        union {
                uint32_t all;
                struct {
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index 80d99e8..5b962e9 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -388,20 +388,48 @@ static inline int enq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
 {
        int sched = 0;
        int i, ret;
-       odp_buffer_hdr_t *tail;
-
-       /* Chain input buffers together */
-       for (i = 0; i < num - 1; i++)
-               buf_hdr[i]->next = buf_hdr[i + 1];
-
-       tail = buf_hdr[num - 1];
-       buf_hdr[num - 1]->next = NULL;
+       odp_buffer_hdr_t *hdr, *tail, *next_hdr;
 
+       /* Ordered queues do not use bursts */
        if (sched_fn->ord_enq_multi(queue->s.index, (void **)buf_hdr, num,
                                    sustain, &ret))
                return ret;
 
-       /* Handle unordered enqueues */
+       /* Optimize the common case of single enqueue */
+       if (num == 1) {
+               tail = buf_hdr[0];
+               hdr  = tail;
+               hdr->burst_num = 0;
+       } else {
+               int next;
+
+               /* Start from the last buffer header */
+               tail = buf_hdr[num - 1];
+               hdr  = tail;
+               next = num - 2;
+
+               while (1) {
+                       /* Build a burst. The buffer header carrying
+                        * a burst is the last buffer of the burst. */
+                       for (i = 0; next >= 0 && i < BUFFER_BURST_SIZE;
+                            i++, next--)
+                               hdr->burst[BUFFER_BURST_SIZE - 1 - i] =
+                                       buf_hdr[next];
+
+                       hdr->burst_num   = i;
+                       hdr->burst_first = BUFFER_BURST_SIZE - i;
+
+                       if (odp_likely(next < 0))
+                               break;
+
+                       /* Get another header and link it */
+                       next_hdr  = hdr;
+                       hdr       = buf_hdr[next];
+                       hdr->next = next_hdr;
+                       next--;
+               }
+       }
+
        LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
@@ -411,9 +439,9 @@ static inline int enq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
 
        /* Empty queue */
        if (queue->s.head == NULL)
-               queue->s.head = buf_hdr[0];
+               queue->s.head = hdr;
        else
-               queue->s.tail->next = buf_hdr[0];
+               queue->s.tail->next = hdr;
 
        queue->s.tail = tail;
 
@@ -483,9 +511,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
                            int num)
 {
-       odp_buffer_hdr_t *hdr;
-       int i;
-       uint32_t j;
+       odp_buffer_hdr_t *hdr, *next;
+       int i, j;
+       int updated = 0;
 
        LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -506,33 +534,65 @@ static inline int deq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
                return 0;
        }
 
-       for (i = 0; i < num && hdr; i++) {
-               buf_hdr[i]       = hdr;
-               hdr              = hdr->next;
-               buf_hdr[i]->next = NULL;
-               if (queue_is_ordered(queue)) {
-                       buf_hdr[i]->origin_qe = queue;
-                       buf_hdr[i]->order     = queue->s.order_in++;
-                       for (j = 0; j < queue->s.param.sched.lock_count; j++) {
-                               buf_hdr[i]->sync[j] =
+       for (i = 0; i < num && hdr; ) {
+               int burst_num = hdr->burst_num;
+               int first     = hdr->burst_first;
+
+               /* First, get bursted buffers */
+               for (j = 0; j < burst_num && i < num; j++, i++) {
+                       buf_hdr[i] = hdr->burst[first + j];
+                       odp_prefetch(buf_hdr[i]);
+               }
+
+               if (burst_num) {
+                       hdr->burst_num   = burst_num - j;
+                       hdr->burst_first = first + j;
+               }
+
+               if (i == num)
+                       break;
+
+               /* When burst is empty, consume the current buffer header and
+                * move to the next header */
+               buf_hdr[i] = hdr;
+               next       = hdr->next;
+               hdr->next  = NULL;
+               hdr        = next;
+               updated++;
+               i++;
+       }
+
+       /* Ordered queue book keeping inside the lock */
+       if (queue_is_ordered(queue)) {
+               for (j = 0; j < i; j++) {
+                       uint32_t k;
+
+                       buf_hdr[j]->origin_qe = queue;
+                       buf_hdr[j]->order     = queue->s.order_in++;
+                       for (k = 0; k < queue->s.param.sched.lock_count; k++) {
+                               buf_hdr[j]->sync[k] =
                                        odp_atomic_fetch_inc_u64
-                                       (&queue->s.sync_in[j]);
+                                       (&queue->s.sync_in[k]);
                        }
-                       buf_hdr[i]->flags.sustain = SUSTAIN_ORDER;
-               } else {
-                       buf_hdr[i]->origin_qe = NULL;
+                       buf_hdr[j]->flags.sustain = SUSTAIN_ORDER;
                }
        }
 
-       queue->s.head = hdr;
+       /* Write head only if updated */
+       if (updated)
+               queue->s.head = hdr;
 
-       if (hdr == NULL) {
-               /* Queue is now empty */
+       /* Queue is empty */
+       if (hdr == NULL)
                queue->s.tail = NULL;
-       }
 
        UNLOCK(&queue->s.lock);
 
+       /* Init origin_qe for non-ordered queues */
+       if (!queue_is_ordered(queue))
+               for (j = 0; j < i; j++)
+                       buf_hdr[j]->origin_qe = NULL;
+
        return i;
 }
 
@@ -543,7 +603,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr[], int num)
 
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
-       odp_buffer_hdr_t *buf_hdr;
+       odp_buffer_hdr_t *buf_hdr = NULL;
        int ret;
 
        ret = deq_multi(queue, &buf_hdr, 1);
diff --git a/platform/linux-generic/odp_schedule_ordered.c 
b/platform/linux-generic/odp_schedule_ordered.c
index 92a1cc8..8412183 100644
--- a/platform/linux-generic/odp_schedule_ordered.c
+++ b/platform/linux-generic/odp_schedule_ordered.c
@@ -457,15 +457,24 @@ int schedule_ordered_queue_enq_multi(uint32_t 
queue_index, void *p_buf_hdr[],
 {
        queue_entry_t *origin_qe;
        uint64_t order;
-       int rc;
+       int i, rc;
        queue_entry_t *qe = get_qentry(queue_index);
-       odp_buffer_hdr_t *buf_hdr = p_buf_hdr[0];
+       odp_buffer_hdr_t *first_hdr = p_buf_hdr[0];
+       odp_buffer_hdr_t **buf_hdr = (odp_buffer_hdr_t **)p_buf_hdr;
+
+       /* Chain input buffers together */
+       for (i = 0; i < num - 1; i++) {
+               buf_hdr[i]->next = buf_hdr[i + 1];
+               buf_hdr[i]->burst_num = 0;
+       }
+
+       buf_hdr[num - 1]->next = NULL;
 
        /* Handle ordered enqueues commonly via links */
-       get_queue_order(&origin_qe, &order, buf_hdr);
+       get_queue_order(&origin_qe, &order, first_hdr);
        if (origin_qe) {
-               buf_hdr->link = buf_hdr->next;
-               rc = ordered_queue_enq(qe, buf_hdr, sustain,
+               first_hdr->link = first_hdr->next;
+               rc = ordered_queue_enq(qe, first_hdr, sustain,
                                       origin_qe, order);
                *ret = rc == 0 ? num : rc;
                return 1;
-- 
2.8.1

Reply via email to