From: Petri Savolainen <petri.savolai...@linaro.org> Use a single thread-local variable to keep track of whether a synchronization context is held and the type of that context (atomic or ordered). Performance is improved as the sync context status is located on a single (the first) cache line of sched_local_t.
Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org> --- /** Email created from pull request 699 (psavol:master-sched-optim-clean-ups) ** https://github.com/Linaro/odp/pull/699 ** Patch: https://github.com/Linaro/odp/pull/699.patch ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f **/ platform/linux-generic/odp_schedule_basic.c | 130 +++++++++++--------- 1 file changed, 72 insertions(+), 58 deletions(-) diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c index 89c0a5c42..46ae7f1c1 100644 --- a/platform/linux-generic/odp_schedule_basic.c +++ b/platform/linux-generic/odp_schedule_basic.c @@ -30,6 +30,9 @@ #include <odp_libconfig_internal.h> #include <odp/api/plat/queue_inlines.h> +/* No synchronization context */ +#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL + /* Number of priority levels */ #define NUM_PRIO 8 @@ -124,7 +127,8 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t), /* Scheduler local data */ typedef struct ODP_ALIGNED_CACHE { uint16_t thr; - uint16_t pause; + uint8_t pause; + uint8_t sync_ctx; uint16_t grp_round; uint16_t spread_round; @@ -241,9 +245,6 @@ static sched_global_t *sched; /* Thread local scheduler context */ static __thread sched_local_t sched_local; -/* Function prototypes */ -static inline void schedule_release_context(void); - static int read_config_file(sched_global_t *sched) { const char *str; @@ -311,6 +312,7 @@ static void sched_local_init(void) memset(&sched_local, 0, sizeof(sched_local_t)); sched_local.thr = odp_thread_id(); + sched_local.sync_ctx = NO_SYNC_CONTEXT; sched_local.stash.queue = ODP_QUEUE_INVALID; sched_local.stash.qi = PRIO_QUEUE_EMPTY; sched_local.ordered.src_queue = NULL_INDEX; @@ -450,17 +452,6 @@ static int schedule_init_local(void) return 0; } -static int schedule_term_local(void) -{ - if (sched_local.stash.num_ev) { - ODP_ERR("Locally pre-scheduled events exist.\n"); - 
return -1; - } - - schedule_release_context(); - return 0; -} - static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask) { odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask); @@ -565,14 +556,9 @@ static int schedule_init_queue(uint32_t queue_index, return 0; } -static inline int queue_is_atomic(uint32_t queue_index) +static inline uint8_t sched_sync_type(uint32_t queue_index) { - return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC; -} - -static inline int queue_is_ordered(uint32_t queue_index) -{ - return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED; + return sched->queue[queue_index].sync; } static void schedule_destroy_queue(uint32_t queue_index) @@ -584,7 +570,7 @@ static void schedule_destroy_queue(uint32_t queue_index) sched->queue[queue_index].prio = 0; sched->queue[queue_index].spread = 0; - if (queue_is_ordered(queue_index) && + if ((sched_sync_type(queue_index) == ODP_SCHED_SYNC_ORDERED) && odp_atomic_load_u64(&sched->order[queue_index].ctx) != odp_atomic_load_u64(&sched->order[queue_index].next_ctx)) ODP_ERR("queue reorder incomplete\n"); @@ -623,21 +609,26 @@ static void schedule_pktio_start(int pktio_index, int num_pktin, } } -static void schedule_release_atomic(void) +static inline void release_atomic(void) { - uint32_t qi = sched_local.stash.qi; + uint32_t qi = sched_local.stash.qi; + int grp = sched->queue[qi].grp; + int prio = sched->queue[qi].prio; + int spread = sched->queue[qi].spread; + ring_t *ring = &sched->prio_q[grp][prio][spread].ring; - if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev == 0) { - int grp = sched->queue[qi].grp; - int prio = sched->queue[qi].prio; - int spread = sched->queue[qi].spread; - ring_t *ring = &sched->prio_q[grp][prio][spread].ring; + /* Release current atomic queue */ + ring_enq(ring, sched->ring_mask, qi); - /* Release current atomic queue */ - ring_enq(ring, sched->ring_mask, qi); + /* We don't hold sync context anymore */ + sched_local.sync_ctx = 
NO_SYNC_CONTEXT; +} - sched_local.stash.qi = PRIO_QUEUE_EMPTY; - } +static void schedule_release_atomic(void) +{ + if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC && + sched_local.stash.num_ev == 0) + release_atomic(); } static inline int ordered_own_turn(uint32_t queue_index) @@ -709,9 +700,11 @@ static inline void release_ordered(void) } sched_local.ordered.lock_called.all = 0; - sched_local.ordered.src_queue = NULL_INDEX; sched_local.ordered.in_order = 0; + /* We don't hold sync context anymore */ + sched_local.sync_ctx = NO_SYNC_CONTEXT; + ordered_stash_release(); /* Next thread can continue processing */ @@ -720,23 +713,26 @@ static inline void release_ordered(void) static void schedule_release_ordered(void) { - uint32_t queue_index; - - queue_index = sched_local.ordered.src_queue; - - if (odp_unlikely((queue_index == NULL_INDEX) || + if (odp_unlikely((sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) || sched_local.stash.num_ev)) return; release_ordered(); } -static inline void schedule_release_context(void) +static int schedule_term_local(void) { - if (sched_local.ordered.src_queue != NULL_INDEX) - release_ordered(); - else + if (sched_local.stash.num_ev) { + ODP_ERR("Locally pre-scheduled events exist.\n"); + return -1; + } + + if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC) schedule_release_atomic(); + else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED) + schedule_release_ordered(); + + return 0; } static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max) @@ -758,13 +754,22 @@ static int schedule_ord_enq_multi(odp_queue_t dst_queue, void *buf_hdr[], int num, int *ret) { int i; - uint32_t stash_num = sched_local.ordered.stash_num; - queue_entry_t *dst_qentry = qentry_from_handle(dst_queue); - uint32_t src_queue = sched_local.ordered.src_queue; + uint32_t stash_num; + queue_entry_t *dst_qentry; + uint32_t src_queue; - if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order) + /* This check is done for every queue enqueue 
operation, also for plain + * queues. Return fast when not holding a scheduling context. */ + if (odp_likely(sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)) return 0; + if (sched_local.ordered.in_order) + return 0; + + src_queue = sched_local.ordered.src_queue; + stash_num = sched_local.ordered.stash_num; + dst_qentry = qentry_from_handle(dst_queue); + if (ordered_own_turn(src_queue)) { /* Own turn, so can do enqueue directly. */ sched_local.ordered.in_order = 1; @@ -891,7 +896,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], for (i = 0; i < num_spread;) { int num; - int ordered; + uint8_t sync_ctx, ordered; odp_queue_t handle; ring_t *ring; int pktin; @@ -921,7 +926,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], continue; } - ordered = queue_is_ordered(qi); + sync_ctx = sched_sync_type(qi); + ordered = (sync_ctx == ODP_SCHED_SYNC_ORDERED); /* When application's array is larger than max burst * size, output all events directly there. 
Also, ordered @@ -989,10 +995,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[], /* Continue scheduling ordered queues */ ring_enq(ring, ring_mask, qi); + sched_local.sync_ctx = sync_ctx; - } else if (queue_is_atomic(qi)) { + } else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) { /* Hold queue during atomic access */ sched_local.stash.qi = qi; + sched_local.sync_ctx = sync_ctx; } else { /* Continue scheduling the queue */ ring_enq(ring, ring_mask, qi); @@ -1042,7 +1050,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], return ret; } - schedule_release_context(); + /* Release schedule context */ + if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC) + release_atomic(); + else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED) + release_ordered(); if (odp_unlikely(sched_local.pause)) return 0; @@ -1141,14 +1153,10 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait, static inline void order_lock(void) { - uint32_t queue_index; - - queue_index = sched_local.ordered.src_queue; - - if (queue_index == NULL_INDEX) + if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) return; - wait_for_order(queue_index); + wait_for_order(sched_local.ordered.src_queue); } static void order_unlock(void) @@ -1160,6 +1168,9 @@ static void schedule_order_lock(uint32_t lock_index) odp_atomic_u64_t *ord_lock; uint32_t queue_index; + if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) + return; + queue_index = sched_local.ordered.src_queue; ODP_ASSERT(queue_index != NULL_INDEX && @@ -1187,6 +1198,9 @@ static void schedule_order_unlock(uint32_t lock_index) odp_atomic_u64_t *ord_lock; uint32_t queue_index; + if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) + return; + queue_index = sched_local.ordered.src_queue; ODP_ASSERT(queue_index != NULL_INDEX &&