From: Petri Savolainen <petri.savolai...@linaro.org> The scheduled queue dequeue function now calls the scheduler's queue destroy callback directly. Using sched_queue_deq() is simpler when the extra round of callbacks is removed.
Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org> --- /** Email created from pull request 699 (psavol:master-sched-optim-clean-ups) ** https://github.com/Linaro/odp/pull/699 ** Patch: https://github.com/Linaro/odp/pull/699.patch ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f **/ .../include/odp_queue_basic_internal.h | 1 - platform/linux-generic/odp_queue_basic.c | 20 +++------ platform/linux-generic/odp_schedule_basic.c | 11 +---- platform/linux-generic/odp_schedule_iquery.c | 35 +++++++-------- platform/linux-generic/odp_schedule_sp.c | 43 ++++++++++--------- 5 files changed, 47 insertions(+), 63 deletions(-) diff --git a/platform/linux-generic/include/odp_queue_basic_internal.h b/platform/linux-generic/include/odp_queue_basic_internal.h index 46b747955..41ca424c7 100644 --- a/platform/linux-generic/include/odp_queue_basic_internal.h +++ b/platform/linux-generic/include/odp_queue_basic_internal.h @@ -113,7 +113,6 @@ static inline queue_entry_t *qentry_from_handle(odp_queue_t handle) void queue_spsc_init(queue_entry_t *queue, uint32_t queue_size); /* Functions for schedulers */ -void sched_queue_destroy_finalize(uint32_t queue_index); void sched_queue_set_status(uint32_t queue_index, int status); int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int num, int update_status); diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c index 61cf8a56c..3f00cc118 100644 --- a/platform/linux-generic/odp_queue_basic.c +++ b/platform/linux-generic/odp_queue_basic.c @@ -353,19 +353,6 @@ static odp_queue_t queue_create(const char *name, return handle; } -void sched_queue_destroy_finalize(uint32_t queue_index) -{ - queue_entry_t *queue = qentry_from_index(queue_index); - - LOCK(queue); - - if (queue->s.status == QUEUE_STATUS_DESTROYED) { - queue->s.status = QUEUE_STATUS_FREE; - sched_fn->destroy_queue(queue_index); - } - UNLOCK(queue); -} - void 
sched_queue_set_status(uint32_t queue_index, int status) { queue_entry_t *queue = qentry_from_index(queue_index); @@ -720,7 +707,12 @@ int sched_queue_deq(uint32_t queue_index, odp_event_t ev[], int max_num, if (odp_unlikely(status < QUEUE_STATUS_READY)) { /* Bad queue, or queue has been destroyed. - * Scheduler finalizes queue destroy after this. */ + * Inform scheduler about a destroyed queue. */ + if (queue->s.status == QUEUE_STATUS_DESTROYED) { + queue->s.status = QUEUE_STATUS_FREE; + sched_fn->destroy_queue(queue_index); + } + UNLOCK(queue); return -1; } diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index 46ae7f1c1..6ed1f8b49 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -402,11 +402,6 @@ static int schedule_init_global(void) return 0; } -static inline void queue_destroy_finalize(uint32_t qi) -{ - sched_queue_destroy_finalize(qi); -} - static int schedule_term_global(void) { int ret = 0; @@ -427,9 +422,6 @@ static int schedule_term_global(void) num = sched_queue_deq(qi, events, 1, 1); - if (num < 0) - queue_destroy_finalize(qi); - if (num > 0) ODP_ERR("Queue not empty\n"); } @@ -944,10 +936,9 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], num = sched_queue_deq(qi, ev_tbl, max_deq, !pktin); - if (num < 0) { + if (odp_unlikely(num < 0)) { /* Destroyed queue. Continue scheduling the same * priority queue. */ - sched_queue_destroy_finalize(qi); continue; } diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c index 7dde77844..f76942ff3 100644 --- a/platform/linux-generic/odp_schedule_iquery.c +++ b/platform/linux-generic/odp_schedule_iquery.c @@ -209,6 +209,7 @@ struct sched_thread_local { * in the same priority level. 
*/ odp_rwlock_t lock; + int r_locked; queue_index_sparse_t indexes[NUM_SCHED_PRIO]; sparse_bitmap_iterator_t iterators[NUM_SCHED_PRIO]; @@ -292,9 +293,7 @@ static int schedule_term_global(void) if (sched->availables[i]) count = sched_queue_deq(i, events, 1, 1); - if (count < 0) - sched_queue_destroy_finalize(i); - else if (count > 0) + if (count > 0) ODP_ERR("Queue (%d) not empty\n", i); } @@ -526,7 +525,14 @@ static void destroy_sched_queue(uint32_t queue_index) return; } + if (thread_local.r_locked) + odp_rwlock_read_unlock(&thread_local.lock); + __destroy_sched_queue(G, queue_index); + + if (thread_local.r_locked) + odp_rwlock_read_lock(&thread_local.lock); + odp_rwlock_write_unlock(&G->lock); if (sched->queues[queue_index].sync == ODP_SCHED_SYNC_ORDERED && @@ -614,9 +620,6 @@ static int schedule_pktio_stop(int pktio, int pktin ODP_UNUSED) return remains; } -#define DO_SCHED_LOCK() odp_rwlock_read_lock(&thread_local.lock) -#define DO_SCHED_UNLOCK() odp_rwlock_read_unlock(&thread_local.lock) - static inline bool do_schedule_prio(int prio); static inline int pop_cache_events(odp_event_t ev[], unsigned int max) @@ -720,7 +723,9 @@ static int do_schedule(odp_queue_t *out_queue, if (odp_unlikely(thread_local.pause)) return count; - DO_SCHED_LOCK(); + odp_rwlock_read_lock(&thread_local.lock); + thread_local.r_locked = 1; + /* Schedule events */ for (prio = 0; prio < NUM_SCHED_PRIO; prio++) { /* Round robin iterate the interested queue @@ -732,11 +737,14 @@ static int do_schedule(odp_queue_t *out_queue, count = pop_cache_events(out_ev, max_num); assign_queue_handle(out_queue); - DO_SCHED_UNLOCK(); + + odp_rwlock_read_unlock(&thread_local.lock); + thread_local.r_locked = 0; return count; } - DO_SCHED_UNLOCK(); + odp_rwlock_read_unlock(&thread_local.lock); + thread_local.r_locked = 0; /* Poll packet input when there are no events */ pktio_poll_input(); @@ -1536,14 +1544,7 @@ static inline int consume_queue(int prio, unsigned int queue_index) count = 
sched_queue_deq(queue_index, cache->stash, max, 1); - if (count < 0) { - DO_SCHED_UNLOCK(); - sched_queue_destroy_finalize(queue_index); - DO_SCHED_LOCK(); - return 0; - } - - if (count == 0) + if (count <= 0) return 0; cache->top = &cache->stash[0]; diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c index 7932e1860..8ddd1e94e 100644 --- a/platform/linux-generic/odp_schedule_sp.c +++ b/platform/linux-generic/odp_schedule_sp.c @@ -223,12 +223,21 @@ static int init_local(void) static int term_global(void) { + odp_event_t event; int qi, ret = 0; for (qi = 0; qi < NUM_QUEUE; qi++) { + int report = 1; + if (sched_global->queue_cmd[qi].s.init) { - /* todo: dequeue until empty ? */ - sched_queue_destroy_finalize(qi); + while (sched_queue_deq(qi, &event, 1, 1) > 0) { + if (report) { + ODP_ERR("Queue not empty\n"); + report = 0; + } + odp_event_free(event); + } + } } @@ -564,28 +573,20 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait, qi = cmd->s.index; num = sched_queue_deq(qi, events, 1, 1); - if (num > 0) { - sched_local.cmd = cmd; - - if (from) - *from = queue_from_index(qi); - - return num; - } - - if (num < 0) { - /* Destroyed queue */ - sched_queue_destroy_finalize(qi); + if (num <= 0) { + /* Destroyed or empty queue. Remove empty queue from + * scheduling. A dequeue operation to on an already + * empty queue moves it to NOTSCHED state and + * sched_queue() will be called on next enqueue. */ continue; } - if (num == 0) { - /* Remove empty queue from scheduling. A dequeue - * operation to on an already empty queue moves - * it to NOTSCHED state and sched_queue() will - * be called on next enqueue. */ - continue; - } + sched_local.cmd = cmd; + + if (from) + *from = queue_from_index(qi); + + return num; } }