Reuse the multi enqueue and dequeue implementations for the single
enq/deq operations. This lets the implementation concentrate on
optimizing the multi operations. Single operations should not
suffer a major performance decrease, since the compiler can likely
optimize the inlined code for the single-operation case (num is
fixed to 1).

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 platform/linux-generic/include/odp_schedule_if.h |   3 -
 platform/linux-generic/odp_queue.c               | 134 +++++++----------------
 platform/linux-generic/odp_schedule.c            |   1 -
 platform/linux-generic/odp_schedule_ordered.c    |  20 ----
 platform/linux-generic/odp_schedule_sp.c         |  12 --
 5 files changed, 41 insertions(+), 129 deletions(-)

diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index 13cdfb3..df73e70 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -30,8 +30,6 @@ typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
                                       );
 typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
 typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
-typedef int (*schedule_ord_enq_fn_t)(uint32_t queue_index, void *buf_hdr,
-                                    int sustain, int *ret);
 typedef int (*schedule_ord_enq_multi_fn_t)(uint32_t queue_index,
                                           void *buf_hdr[], int num,
                                           int sustain, int *ret);
@@ -48,7 +46,6 @@ typedef struct schedule_fn_t {
        schedule_init_queue_fn_t    init_queue;
        schedule_destroy_queue_fn_t destroy_queue;
        schedule_sched_queue_fn_t   sched_queue;
-       schedule_ord_enq_fn_t       ord_enq;
        schedule_ord_enq_multi_fn_t ord_enq_multi;
        schedule_init_global_fn_t   init_global;
        schedule_term_global_fn_t   term_global;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index bec1e51..80d99e8 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -65,19 +65,6 @@ static inline int queue_is_ordered(queue_entry_t *qe)
        return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
 }
 
-static inline void queue_add(queue_entry_t *queue,
-                            odp_buffer_hdr_t *buf_hdr)
-{
-       buf_hdr->next = NULL;
-
-       if (queue->s.head)
-               queue->s.tail->next = buf_hdr;
-       else
-               queue->s.head = buf_hdr;
-
-       queue->s.tail = buf_hdr;
-}
-
 queue_entry_t *get_qentry(uint32_t queue_id)
 {
        return &queue_tbl->queue[queue_id];
@@ -396,37 +383,8 @@ odp_queue_t odp_queue_lookup(const char *name)
        return ODP_QUEUE_INVALID;
 }
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
-{
-       int ret;
-
-       if (sched_fn->ord_enq(queue->s.index, buf_hdr, sustain, &ret))
-               return ret;
-
-       LOCK(&queue->s.lock);
-
-       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
-               UNLOCK(&queue->s.lock);
-               ODP_ERR("Bad queue status\n");
-               return -1;
-       }
-
-       queue_add(queue, buf_hdr);
-
-       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-               queue->s.status = QUEUE_STATUS_SCHED;
-               UNLOCK(&queue->s.lock);
-               if (sched_fn->sched_queue(queue->s.index))
-                       ODP_ABORT("schedule_queue failed\n");
-               return 0;
-       }
-
-       UNLOCK(&queue->s.lock);
-       return 0;
-}
-
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
-                   int num, int sustain)
+static inline int enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                           int num, int sustain)
 {
        int sched = 0;
        int i, ret;
@@ -472,6 +430,24 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
        return num; /* All events enqueued */
 }
 
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+                   int sustain)
+{
+       return enq_multi(queue, buf_hdr, num, sustain);
+}
+
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
+{
+       int ret;
+
+       ret = enq_multi(queue, &buf_hdr, 1, sustain);
+
+       if (ret == 1)
+               return 0;
+       else
+               return -1;
+}
+
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
 {
        odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
@@ -504,54 +480,8 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
        return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER);
 }
 
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
-{
-       odp_buffer_hdr_t *buf_hdr;
-       uint32_t i;
-
-       LOCK(&queue->s.lock);
-
-       if (queue->s.head == NULL) {
-               /* Already empty queue */
-               if (queue->s.status == QUEUE_STATUS_SCHED)
-                       queue->s.status = QUEUE_STATUS_NOTSCHED;
-
-               UNLOCK(&queue->s.lock);
-               return NULL;
-       }
-
-       buf_hdr       = queue->s.head;
-       queue->s.head = buf_hdr->next;
-       buf_hdr->next = NULL;
-
-       /* Note that order should really be assigned on enq to an
-        * ordered queue rather than deq, however the logic is simpler
-        * to do it here and has the same effect.
-        */
-       if (queue_is_ordered(queue)) {
-               buf_hdr->origin_qe = queue;
-               buf_hdr->order = queue->s.order_in++;
-               for (i = 0; i < queue->s.param.sched.lock_count; i++) {
-                       buf_hdr->sync[i] =
-                               odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
-               }
-               buf_hdr->flags.sustain = SUSTAIN_ORDER;
-       } else {
-               buf_hdr->origin_qe = NULL;
-       }
-
-       if (queue->s.head == NULL) {
-               /* Queue is now empty */
-               queue->s.tail = NULL;
-       }
-
-       UNLOCK(&queue->s.lock);
-
-       return buf_hdr;
-}
-
-
-int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                           int num)
 {
        odp_buffer_hdr_t *hdr;
        int i;
@@ -606,6 +536,24 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
        return i;
 }
 
+int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+{
+       return deq_multi(queue, buf_hdr, num);
+}
+
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
+{
+       odp_buffer_hdr_t *buf_hdr;
+       int ret;
+
+       ret = deq_multi(queue, &buf_hdr, 1);
+
+       if (ret == 1)
+               return buf_hdr;
+       else
+               return NULL;
+}
+
 int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
 {
        queue_entry_t *queue;
@@ -740,7 +688,7 @@ int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num)
        queue_entry_t *qe = get_qentry(queue_index);
        odp_buffer_hdr_t *buf_hdr[num];
 
-       ret = queue_deq_multi(qe, buf_hdr, num);
+       ret = deq_multi(qe, buf_hdr, num);
 
        if (ret > 0)
                for (i = 0; i < ret; i++)
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 78982d9..81e79c9 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -1063,7 +1063,6 @@ const schedule_fn_t schedule_default_fn = {
        .init_queue = schedule_init_queue,
        .destroy_queue = schedule_destroy_queue,
        .sched_queue = schedule_sched_queue,
-       .ord_enq = schedule_ordered_queue_enq,
        .ord_enq_multi = schedule_ordered_queue_enq_multi,
        .init_global = schedule_init_global,
        .term_global = schedule_term_global,
diff --git a/platform/linux-generic/odp_schedule_ordered.c b/platform/linux-generic/odp_schedule_ordered.c
index 8c1dd7e..92a1cc8 100644
--- a/platform/linux-generic/odp_schedule_ordered.c
+++ b/platform/linux-generic/odp_schedule_ordered.c
@@ -452,26 +452,6 @@ static int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
        return 0;
 }
 
-int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr,
-                              int sustain, int *ret)
-{
-       queue_entry_t *origin_qe;
-       uint64_t order;
-       queue_entry_t *qe = get_qentry(queue_index);
-       odp_buffer_hdr_t *buf_hdr = p_buf_hdr;
-
-       get_queue_order(&origin_qe, &order, buf_hdr);
-
-       /* Handle enqueues from ordered queues separately */
-       if (origin_qe) {
-               *ret = ordered_queue_enq(qe, buf_hdr, sustain,
-                                        origin_qe, order);
-               return 1;
-       }
-
-       return 0;
-}
-
 int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
                                     int num, int sustain, int *ret)
 {
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 2e28aa4..879eb5c 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -298,17 +298,6 @@ static int sched_queue(uint32_t qi)
        return 0;
 }
 
-static int ord_enq(uint32_t queue_index, void *buf_hdr, int sustain, int *ret)
-{
-       (void)queue_index;
-       (void)buf_hdr;
-       (void)sustain;
-       (void)ret;
-
-       /* didn't consume the events */
-       return 0;
-}
-
 static int ord_enq_multi(uint32_t queue_index, void *buf_hdr[], int num,
                         int sustain, int *ret)
 {
@@ -673,7 +662,6 @@ const schedule_fn_t schedule_sp_fn = {
        .init_queue    = init_queue,
        .destroy_queue = destroy_queue,
        .sched_queue   = sched_queue,
-       .ord_enq       = ord_enq,
        .ord_enq_multi = ord_enq_multi,
        .init_global   = init_global,
        .term_global   = term_global,
-- 
2.8.1

Reply via email to