From: Petri Savolainen <petri.savolai...@linaro.org>

Use a single thread local variable to keep track of whether
a synchronization context is held and of the context type
(atomic or ordered). Performance is improved because the sync
context status is now located on a single (the first) cache line
of sched_local_t.

Signed-off-by: Petri Savolainen <petri.savolai...@linaro.org>
---
/** Email created from pull request 699 (psavol:master-sched-optim-clean-ups)
 ** https://github.com/Linaro/odp/pull/699
 ** Patch: https://github.com/Linaro/odp/pull/699.patch
 ** Base sha: 33fbc04b6373960ec3f84de4e7e7b34c49d71508
 ** Merge commit sha: 32d7a11f22e6f2e1e378b653993c5377d4116d8f
 **/
 platform/linux-generic/odp_schedule_basic.c | 130 +++++++++++---------
 1 file changed, 72 insertions(+), 58 deletions(-)
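
/** Note (not part of the patch): a minimal standalone sketch of the
 ** thread local sync context tracking described in the commit message.
 ** Field and macro names mirror the patch; the ODP_SCHED_SYNC_* values
 ** below are placeholders, the real ones come from the ODP schedule
 ** API headers.
 **/

#include <stdint.h>

#define ODP_SCHED_SYNC_PARALLEL 0   /* placeholder values; the real ones  */
#define ODP_SCHED_SYNC_ATOMIC   1   /* are defined by the ODP schedule API */
#define ODP_SCHED_SYNC_ORDERED  2

/* "No context held" reuses the parallel value, as in the patch */
#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL

typedef struct {
	/* First cache line: hot per-thread state, including sync_ctx */
	uint16_t thr;
	uint8_t  pause;
	uint8_t  sync_ctx;   /* NO_SYNC_CONTEXT, ATOMIC or ORDERED */
	/* ... rest of sched_local_t ... */
} sched_local_t;

static __thread sched_local_t sched_local = { .sync_ctx = NO_SYNC_CONTEXT };

/* One byte compare answers both "is a context held?" and "which type?" */
static inline int holding_atomic(void)
{
	return sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC;
}

static inline int holding_ordered(void)
{
	return sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED;
}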

diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index 89c0a5c42..46ae7f1c1 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -30,6 +30,9 @@
 #include <odp_libconfig_internal.h>
 #include <odp/api/plat/queue_inlines.h>
 
+/* No synchronization context */
+#define NO_SYNC_CONTEXT ODP_SCHED_SYNC_PARALLEL
+
 /* Number of priority levels  */
 #define NUM_PRIO 8
 
@@ -124,7 +127,8 @@ ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
 /* Scheduler local data */
 typedef struct ODP_ALIGNED_CACHE {
        uint16_t thr;
-       uint16_t pause;
+       uint8_t  pause;
+       uint8_t  sync_ctx;
        uint16_t grp_round;
        uint16_t spread_round;
 
@@ -241,9 +245,6 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 static __thread sched_local_t sched_local;
 
-/* Function prototypes */
-static inline void schedule_release_context(void);
-
 static int read_config_file(sched_global_t *sched)
 {
        const char *str;
@@ -311,6 +312,7 @@ static void sched_local_init(void)
        memset(&sched_local, 0, sizeof(sched_local_t));
 
        sched_local.thr         = odp_thread_id();
+       sched_local.sync_ctx    = NO_SYNC_CONTEXT;
        sched_local.stash.queue = ODP_QUEUE_INVALID;
        sched_local.stash.qi    = PRIO_QUEUE_EMPTY;
        sched_local.ordered.src_queue = NULL_INDEX;
@@ -450,17 +452,6 @@ static int schedule_init_local(void)
        return 0;
 }
 
-static int schedule_term_local(void)
-{
-       if (sched_local.stash.num_ev) {
-               ODP_ERR("Locally pre-scheduled events exist.\n");
-               return -1;
-       }
-
-       schedule_release_context();
-       return 0;
-}
-
 static inline void grp_update_mask(int grp, const odp_thrmask_t *new_mask)
 {
        odp_thrmask_copy(&sched->sched_grp[grp].mask, new_mask);
@@ -565,14 +556,9 @@ static int schedule_init_queue(uint32_t queue_index,
        return 0;
 }
 
-static inline int queue_is_atomic(uint32_t queue_index)
+static inline uint8_t sched_sync_type(uint32_t queue_index)
 {
-       return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
-static inline int queue_is_ordered(uint32_t queue_index)
-{
-       return sched->queue[queue_index].sync == ODP_SCHED_SYNC_ORDERED;
+       return sched->queue[queue_index].sync;
 }
 
 static void schedule_destroy_queue(uint32_t queue_index)
@@ -584,7 +570,7 @@ static void schedule_destroy_queue(uint32_t queue_index)
        sched->queue[queue_index].prio   = 0;
        sched->queue[queue_index].spread = 0;
 
-       if (queue_is_ordered(queue_index) &&
+       if ((sched_sync_type(queue_index) == ODP_SCHED_SYNC_ORDERED) &&
            odp_atomic_load_u64(&sched->order[queue_index].ctx) !=
            odp_atomic_load_u64(&sched->order[queue_index].next_ctx))
                ODP_ERR("queue reorder incomplete\n");
@@ -623,21 +609,26 @@ static void schedule_pktio_start(int pktio_index, int num_pktin,
        }
 }
 
-static void schedule_release_atomic(void)
+static inline void release_atomic(void)
 {
-       uint32_t qi = sched_local.stash.qi;
+       uint32_t qi  = sched_local.stash.qi;
+       int grp      = sched->queue[qi].grp;
+       int prio     = sched->queue[qi].prio;
+       int spread   = sched->queue[qi].spread;
+       ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
 
-       if (qi != PRIO_QUEUE_EMPTY && sched_local.stash.num_ev  == 0) {
-               int grp      = sched->queue[qi].grp;
-               int prio     = sched->queue[qi].prio;
-               int spread   = sched->queue[qi].spread;
-               ring_t *ring = &sched->prio_q[grp][prio][spread].ring;
+       /* Release current atomic queue */
+       ring_enq(ring, sched->ring_mask, qi);
 
-               /* Release current atomic queue */
-               ring_enq(ring, sched->ring_mask, qi);
+       /* We don't hold sync context anymore */
+       sched_local.sync_ctx = NO_SYNC_CONTEXT;
+}
 
-               sched_local.stash.qi = PRIO_QUEUE_EMPTY;
-       }
+static void schedule_release_atomic(void)
+{
+       if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC &&
+           sched_local.stash.num_ev == 0)
+               release_atomic();
 }
 
 static inline int ordered_own_turn(uint32_t queue_index)
@@ -709,9 +700,11 @@ static inline void release_ordered(void)
        }
 
        sched_local.ordered.lock_called.all = 0;
-       sched_local.ordered.src_queue = NULL_INDEX;
        sched_local.ordered.in_order = 0;
 
+       /* We don't hold sync context anymore */
+       sched_local.sync_ctx = NO_SYNC_CONTEXT;
+
        ordered_stash_release();
 
        /* Next thread can continue processing */
@@ -720,23 +713,26 @@ static inline void release_ordered(void)
 
 static void schedule_release_ordered(void)
 {
-       uint32_t queue_index;
-
-       queue_index = sched_local.ordered.src_queue;
-
-       if (odp_unlikely((queue_index == NULL_INDEX) ||
+       if (odp_unlikely((sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED) ||
                         sched_local.stash.num_ev))
                return;
 
        release_ordered();
 }
 
-static inline void schedule_release_context(void)
+static int schedule_term_local(void)
 {
-       if (sched_local.ordered.src_queue != NULL_INDEX)
-               release_ordered();
-       else
+       if (sched_local.stash.num_ev) {
+               ODP_ERR("Locally pre-scheduled events exist.\n");
+               return -1;
+       }
+
+       if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
                schedule_release_atomic();
+       else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+               schedule_release_ordered();
+
+       return 0;
 }
 
 static inline int copy_from_stash(odp_event_t out_ev[], unsigned int max)
@@ -758,13 +754,22 @@ static int schedule_ord_enq_multi(odp_queue_t dst_queue, void *buf_hdr[],
                                  int num, int *ret)
 {
        int i;
-       uint32_t stash_num = sched_local.ordered.stash_num;
-       queue_entry_t *dst_qentry = qentry_from_handle(dst_queue);
-       uint32_t src_queue = sched_local.ordered.src_queue;
+       uint32_t stash_num;
+       queue_entry_t *dst_qentry;
+       uint32_t src_queue;
 
-       if ((src_queue == NULL_INDEX) || sched_local.ordered.in_order)
+       /* This check is done for every queue enqueue operation, also for plain
+        * queues. Return fast when not holding a scheduling context. */
+       if (odp_likely(sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED))
                return 0;
 
+       if (sched_local.ordered.in_order)
+               return 0;
+
+       src_queue  = sched_local.ordered.src_queue;
+       stash_num  = sched_local.ordered.stash_num;
+       dst_qentry = qentry_from_handle(dst_queue);
+
        if (ordered_own_turn(src_queue)) {
                /* Own turn, so can do enqueue directly. */
                sched_local.ordered.in_order = 1;
@@ -891,7 +896,7 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                for (i = 0; i < num_spread;) {
                        int num;
-                       int ordered;
+                       uint8_t sync_ctx, ordered;
                        odp_queue_t handle;
                        ring_t *ring;
                        int pktin;
@@ -921,7 +926,8 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
                                continue;
                        }
 
-                       ordered = queue_is_ordered(qi);
+                       sync_ctx = sched_sync_type(qi);
+                       ordered  = (sync_ctx == ODP_SCHED_SYNC_ORDERED);
 
                        /* When application's array is larger than max burst
                         * size, output all events directly there. Also, ordered
@@ -989,10 +995,12 @@ static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                                /* Continue scheduling ordered queues */
                                ring_enq(ring, ring_mask, qi);
+                               sched_local.sync_ctx = sync_ctx;
 
-                       } else if (queue_is_atomic(qi)) {
+                       } else if (sync_ctx == ODP_SCHED_SYNC_ATOMIC) {
                                /* Hold queue during atomic access */
                                sched_local.stash.qi = qi;
+                               sched_local.sync_ctx = sync_ctx;
                        } else {
                                /* Continue scheduling the queue */
                                ring_enq(ring, ring_mask, qi);
@@ -1042,7 +1050,11 @@ static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                return ret;
        }
 
-       schedule_release_context();
+       /* Release schedule context */
+       if (sched_local.sync_ctx == ODP_SCHED_SYNC_ATOMIC)
+               release_atomic();
+       else if (sched_local.sync_ctx == ODP_SCHED_SYNC_ORDERED)
+               release_ordered();
 
        if (odp_unlikely(sched_local.pause))
                return 0;
@@ -1141,14 +1153,10 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
 
 static inline void order_lock(void)
 {
-       uint32_t queue_index;
-
-       queue_index = sched_local.ordered.src_queue;
-
-       if (queue_index == NULL_INDEX)
+       if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
                return;
 
-       wait_for_order(queue_index);
+       wait_for_order(sched_local.ordered.src_queue);
 }
 
 static void order_unlock(void)
@@ -1160,6 +1168,9 @@ static void schedule_order_lock(uint32_t lock_index)
        odp_atomic_u64_t *ord_lock;
        uint32_t queue_index;
 
+       if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+               return;
+
        queue_index = sched_local.ordered.src_queue;
 
        ODP_ASSERT(queue_index != NULL_INDEX &&
@@ -1187,6 +1198,9 @@ static void schedule_order_unlock(uint32_t lock_index)
        odp_atomic_u64_t *ord_lock;
        uint32_t queue_index;
 
+       if (sched_local.sync_ctx != ODP_SCHED_SYNC_ORDERED)
+               return;
+
        queue_index = sched_local.ordered.src_queue;
 
        ODP_ASSERT(queue_index != NULL_INDEX &&
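
/** Note (not part of the patch): a standalone sketch of how the single
 ** sync_ctx byte is consumed on the hot paths touched above. The check in
 ** schedule_ord_enq_multi() runs on every queue enqueue and reduces to one
 ** likely-taken compare, and do_schedule() releases whichever context type
 ** is held. The release functions are stubs here and the ODP_SCHED_SYNC_*
 ** values are placeholders.
 **/

#include <stdint.h>

#define ODP_SCHED_SYNC_PARALLEL 0   /* placeholder values */
#define ODP_SCHED_SYNC_ATOMIC   1
#define ODP_SCHED_SYNC_ORDERED  2
#define NO_SYNC_CONTEXT         ODP_SCHED_SYNC_PARALLEL

static __thread uint8_t sync_ctx = NO_SYNC_CONTEXT;

/* Stubs standing in for release_atomic()/release_ordered() in the patch */
static void release_atomic(void)  { sync_ctx = NO_SYNC_CONTEXT; }
static void release_ordered(void) { sync_ctx = NO_SYNC_CONTEXT; }

/* Per-enqueue check: plain queues and non-ordered contexts return after a
 * single byte compare, without touching any ordered queue state */
static int ord_enq_check(void)
{
	if (sync_ctx != ODP_SCHED_SYNC_ORDERED)
		return 0;

	/* ... reorder/stash handling for ordered contexts ... */
	return 1;
}

/* Context release in the scheduler: branch on the same byte */
static void release_context(void)
{
	if (sync_ctx == ODP_SCHED_SYNC_ATOMIC)
		release_atomic();
	else if (sync_ctx == ODP_SCHED_SYNC_ORDERED)
		release_ordered();
}

int main(void)
{
	release_context();      /* nothing held: both branches skipped */
	return ord_enq_check(); /* returns 0: fast path taken */
}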
