From: Petri Savolainen <petri.savolai...@linaro.org>

The scheduled queue dequeue function now calls the scheduler's queue
destroy callback directly. Using sched_queue_deq() is simpler when the
extra round of callbacks is removed.

Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org>
---
/** Email created from pull request 699 (psavol:master-sched-optim-clean-ups)
 ** https://github.com/Linaro/odp/pull/699
 ** Patch: https://github.com/Linaro/odp/pull/699.patch
 ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508
 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f
 **/
 .../include/odp_queue_basic_internal.h        |  1 -
 platform/linux-generic/odp_queue_basic.c      | 20 +++------
 platform/linux-generic/odp_schedule_basic.c   | 11 +----
 platform/linux-generic/odp_schedule_iquery.c  | 35 +++++++--------
 platform/linux-generic/odp_schedule_sp.c      | 43 ++++++++++---------
 5 files changed, 47 insertions(+), 63 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h 
b/platform/linux-generic/include/odp_queue_basic_internal.h
index 46b747955..41ca424c7 100644
--- a/platform/linux-generic/include/odp_queue_basic_internal.h
+++ b/platform/linux-generic/include/odp_queue_basic_internal.h
@@ -113,7 +113,6 @@ static inline queue_entry_t *qentry_from_handle(odp_queue_t 
handle)
 void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size);
 
 /* Functions for schedulers */
-void sched_queue_destroy_finalize(uint32_t queue_index);
 void sched_queue_set_status(uint32_t queue_index, int status);
 int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num,
                    int update_status);
diff --git a/platform/linux-generic/odp_queue_basic.c 
b/platform/linux-generic/odp_queue_basic.c
index 61cf8a56c..3f00cc118 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -353,19 +353,6 @@ static odp_queue_t queue_create(const char *name,
        return handle;
 }
 
-void sched_queue_destroy_finalize(uint32_t queue_index)
-{
-       queue_entry_t *queue = qentry_from_index(queue_index);
-
-       LOCK(queue);
-
-       if (queue->s.status == QUEUE_STATUS_DESTROYED) {
-               queue->s.status = QUEUE_STATUS_FREE;
-               sched_fn->destroy_queue(queue_index);
-       }
-       UNLOCK(queue);
-}
-
 void sched_queue_set_status(uint32_t queue_index, int status)
 {
        queue_entry_t *queue = qentry_from_index(queue_index);
@@ -720,7 +707,12 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t 
ev[], int max_num,
 
        if (odp_unlikely(status < QUEUE_STATUS_READY)) {
                /* Bad queue, or queue has been destroyed.
-                * Scheduler finalizes queue destroy after this. */
+                * Inform scheduler about a destroyed queue. */
+               if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+                       queue->s.status = QUEUE_STATUS_FREE;
+                       sched_fn->destroy_queue(queue_index);
+               }
+
                UNLOCK(queue);
                return -1;
        }
diff --git a/platform/linux-generic/odp_schedule_basic.c 
b/platform/linux-generic/odp_schedule_basic.c
index 46ae7f1c1..6ed1f8b49 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -402,11 +402,6 @@ static int schedule_init_global(void)
        return 0;
 }
 
-static inline void queue_destroy_finalize(uint32_t qi)
-{
-       sched_queue_destroy_finalize(qi);
-}
-
 static int schedule_term_global(void)
 {
        int ret = 0;
@@ -427,9 +422,6 @@ static int schedule_term_global(void)
 
                                        num = sched_queue_deq(qi, events, 1, 1);
 
-                                       if (num < 0)
-                                               queue_destroy_finalize(qi);
-
                                        if (num > 0)
                                                ODP_ERR("Queue not empty\n");
                                }
@@ -944,10 +936,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, 
odp_event_t out_ev[],
 
                        num = sched_queue_deq(qi, ev_tbl, max_deq, !pktin);
 
-                       if (num < 0) {
+                       if (odp_unlikely(num < 0)) {
                                /* Destroyed queue. Continue scheduling the same
                                 * priority queue. */
-                               sched_queue_destroy_finalize(qi);
                                continue;
                        }
 
diff --git a/platform/linux-generic/odp_schedule_iquery.c 
b/platform/linux-generic/odp_schedule_iquery.c
index 7dde77844..f76942ff3 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -209,6 +209,7 @@ struct sched_thread_local {
         * in the same priority level.
         */
        odp_rwlock_t lock;
+       int r_locked;
        queue_index_sparse_t indexes[NUM_SCHED_PRIO];
        sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO];
 
@@ -292,9 +293,7 @@ static int schedule_term_global(void)
                if (sched->availables[i])
                        count = sched_queue_deq(i, events, 1, 1);
 
-               if (count < 0)
-                       sched_queue_destroy_finalize(i);
-               else if (count > 0)
+               if (count > 0)
                        ODP_ERR("Queue (%d) not empty\n", i);
        }
 
@@ -526,7 +525,14 @@ static void destroy_sched_queue(uint32_t queue_index)
                return;
        }
 
+       if (thread_local.r_locked)
+               odp_rwlock_read_unlock(&thread_local.lock);
+
        __destroy_sched_queue(G, queue_index);
+
+       if (thread_local.r_locked)
+               odp_rwlock_read_lock(&thread_local.lock);
+
        odp_rwlock_write_unlock(&G->lock);
 
        if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED &&
@@ -614,9 +620,6 @@ static int schedule_pktio_stop(int pktio, int pktin 
ODP_UNUSED)
        return remains;
 }
 
-#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock)
-#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock)
-
 static inline bool do_schedule_prio(int prio);
 
 static inline int pop_cache_events(odp_event_t ev[], unsigned int max)
@@ -720,7 +723,9 @@ static int do_schedule(odp_queue_t *out_queue,
        if (odp_unlikely(thread_local.pause))
                return count;
 
-       DO_SCHED_LOCK();
+       odp_rwlock_read_lock(&thread_local.lock);
+       thread_local.r_locked = 1;
+
        /* Schedule events */
        for (prio = 0; prio < NUM_SCHED_PRIO; prio++) {
                /* Round robin iterate the interested queue
@@ -732,11 +737,14 @@ static int do_schedule(odp_queue_t *out_queue,
 
                count = pop_cache_events(out_ev, max_num);
                assign_queue_handle(out_queue);
-               DO_SCHED_UNLOCK();
+
+               odp_rwlock_read_unlock(&thread_local.lock);
+               thread_local.r_locked = 0;
                return count;
        }
 
-       DO_SCHED_UNLOCK();
+       odp_rwlock_read_unlock(&thread_local.lock);
+       thread_local.r_locked = 0;
 
        /* Poll packet input when there are no events */
        pktio_poll_input();
@@ -1536,14 +1544,7 @@ static inline int consume_queue(int prio, unsigned int 
queue_index)
 
        count = sched_queue_deq(queue_index, cache->stash, max, 1);
 
-       if (count < 0) {
-               DO_SCHED_UNLOCK();
-               sched_queue_destroy_finalize(queue_index);
-               DO_SCHED_LOCK();
-               return 0;
-       }
-
-       if (count == 0)
+       if (count <= 0)
                return 0;
 
        cache->top = &cache->stash[0];
diff --git a/platform/linux-generic/odp_schedule_sp.c 
b/platform/linux-generic/odp_schedule_sp.c
index 7932e1860..8ddd1e94e 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -223,12 +223,21 @@ static int init_local(void)
 
 static int term_global(void)
 {
+       odp_event_t event;
        int qi, ret = 0;
 
        for (qi = 0; qi < NUM_QUEUE; qi++) {
+               int report = 1;
+
                if (sched_global->queue_cmd[qi].s.init) {
-                       /* todo: dequeue until empty ? */
-                       sched_queue_destroy_finalize(qi);
+                       while (sched_queue_deq(qi, &event, 1, 1) > 0) {
+                               if (report) {
+                                       ODP_ERR("Queue not empty\n");
+                                       report = 0;
+                               }
+                               odp_event_free(event);
+                       }
+
                }
        }
 
@@ -564,28 +573,20 @@ static int schedule_multi(odp_queue_t *from, uint64_t 
wait,
                qi  = cmd->s.index;
                num = sched_queue_deq(qi, events, 1, 1);
 
-               if (num > 0) {
-                       sched_local.cmd = cmd;
-
-                       if (from)
-                               *from = queue_from_index(qi);
-
-                       return num;
-               }
-
-               if (num < 0) {
-                       /* Destroyed queue */
-                       sched_queue_destroy_finalize(qi);
+               if (num <= 0) {
+                       /* Destroyed or empty queue. Remove empty queue from
+                        * scheduling. A dequeue operation on an already
+                        * empty queue moves it to NOTSCHED state and
+                        * sched_queue() will be called on next enqueue. */
                        continue;
                }
 
-               if (num == 0) {
-                       /* Remove empty queue from scheduling. A dequeue
-                        * operation to on an already empty queue moves
-                        * it to NOTSCHED state and sched_queue() will
-                        * be called on next enqueue. */
-                       continue;
-               }
+               sched_local.cmd = cmd;
+
+               if (from)
+                       *from = queue_from_index(qi);
+
+               return num;
        }
 }
 

Reply via email to