Add a new implementation for ordered queues. Compared to the old
implementation it is much simpler and improves performance by ~1-4x
depending on the test case.

The implementation is based on an atomic ordered context, which only a
single thread may own at a time. Only the thread owning the ordered
context may enqueue events originating from the ordered queue. All other
threads put their enqueued events into a thread-local enqueue stash
(ordered_stash_t). All stashed enqueue operations are performed in the
original order once the thread acquires the ordered context. If the
ordered stash becomes full, the enqueue operation blocks. At the latest,
a thread blocks when its event stash (ev_stash) is empty and it tries to
release the ordered context.
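
To illustrate the idea, below is a minimal standalone sketch of the
stash-and-release flow. The names and types in it (ordered_queue_t,
thread_state_t, ordered_acquire/ordered_enq/ordered_release) are
simplified stand-ins invented for this example, not the ODP internals
added by the patch; C11 atomics replace the odp_atomic_u64_t API and
plain ints replace events.

#include <stdatomic.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdio.h>

#define STASH_MAX 8

/* Per-queue ordered context counters (cf. queue->s.ordered in the patch) */
typedef struct {
	atomic_uint_fast64_t ctx;      /* context id currently in order */
	atomic_uint_fast64_t next_ctx; /* next unallocated context id */
} ordered_queue_t;

/* Per-thread scheduler state (cf. sched_local.ordered in the patch) */
typedef struct {
	uint64_t ctx;       /* context id owned by this thread */
	int ev[STASH_MAX];  /* stashed "events" (plain ints here) */
	int stash_num;
} thread_state_t;

static int own_turn(ordered_queue_t *q, thread_state_t *t)
{
	return atomic_load_explicit(&q->ctx, memory_order_acquire) == t->ctx;
}

/* Scheduling side: reserve the next context id for this thread */
static void ordered_acquire(ordered_queue_t *q, thread_state_t *t)
{
	t->ctx = atomic_fetch_add_explicit(&q->next_ctx, 1,
					   memory_order_relaxed);
	t->stash_num = 0;
}

/* Enqueue side: enqueue directly when in order, otherwise stash */
static void ordered_enq(ordered_queue_t *q, thread_state_t *t, int ev)
{
	if (own_turn(q, t)) {
		printf("ctx %" PRIu64 ": enqueue %d directly\n", t->ctx, ev);
		return;
	}
	/* The real code blocks (wait_for_order()) when the stash is full */
	t->ev[t->stash_num++] = ev;
	printf("ctx %" PRIu64 ": stash %d\n", t->ctx, ev);
}

/* Release: wait for our turn, flush the stash, then pass the turn on */
static void ordered_release(ordered_queue_t *q, thread_state_t *t)
{
	int i;

	while (!own_turn(q, t))
		; /* busy wait, like wait_for_order() */

	for (i = 0; i < t->stash_num; i++)
		printf("ctx %" PRIu64 ": flush stashed %d\n", t->ctx, t->ev[i]);
	t->stash_num = 0;

	atomic_fetch_add_explicit(&q->ctx, 1, memory_order_release);
}

int main(void)
{
	ordered_queue_t q;
	thread_state_t a, b;

	atomic_init(&q.ctx, 0);
	atomic_init(&q.next_ctx, 0);

	/* Two logical threads interleaved on one OS thread for brevity */
	ordered_acquire(&q, &a); /* gets ctx 0 */
	ordered_acquire(&q, &b); /* gets ctx 1 */

	ordered_enq(&q, &b, 42); /* not b's turn -> stashed */
	ordered_enq(&q, &a, 7);  /* a's turn -> direct */

	ordered_release(&q, &a); /* nothing to flush, ctx becomes 1 */
	ordered_release(&q, &b); /* now b's turn: stashed 42 is flushed */

	return 0;
}

The sketch compiles standalone with a C11 compiler (e.g. cc -std=c11).
The real scheduler additionally supports multiple ordered locks and
blocks when the stash is full, as seen in schedule_ord_enq_multi() below.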

Signed-off-by: Matias Elo <matias....@nokia.com>
---

V2:
- Support multiple ordered locks (Bill)
- Refactored code

 .../linux-generic/include/odp_queue_internal.h     |   5 +
 platform/linux-generic/odp_queue.c                 |  14 +-
 platform/linux-generic/odp_schedule.c              | 173 +++++++++++++++++++--
 3 files changed, 174 insertions(+), 18 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index df36b76..b905bd8 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -56,6 +56,11 @@ struct queue_entry_s {
        odp_buffer_hdr_t *tail;
        int               status;
 
+       struct {
+               odp_atomic_u64_t  ctx; /**< Current ordered context id */
+               odp_atomic_u64_t  next_ctx; /**< Next unallocated context id */
+       } ordered ODP_ALIGNED_CACHE;
+
        enq_func_t       enqueue ODP_ALIGNED_CACHE;
        deq_func_t       dequeue;
        enq_multi_func_t enqueue_multi;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 99c91e7..4c7f497 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -73,9 +73,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
        if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
                return -1;
 
-       if (param->type == ODP_QUEUE_TYPE_SCHED)
+       if (param->type == ODP_QUEUE_TYPE_SCHED) {
                queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
 
+               if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
+                       odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
+                       odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
+               }
+       }
        queue->s.type = queue->s.param.type;
 
        queue->s.enqueue = queue_enq;
@@ -301,6 +306,13 @@ int odp_queue_destroy(odp_queue_t handle)
                ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
                return -1;
        }
+       if (queue_is_ordered(queue) &&
+           odp_atomic_load_u64(&queue->s.ordered.ctx) !=
+                           odp_atomic_load_u64(&queue->s.ordered.next_ctx)) {
+               UNLOCK(&queue->s.lock);
+               ODP_ERR("queue \"%s\" reorder incomplete\n", queue->s.name);
+               return -1;
+       }
 
        switch (queue->s.status) {
        case QUEUE_STATUS_READY:
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 5bc274f..e9cd746 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -111,11 +111,21 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 #define MAX_DEQ CONFIG_BURST_SIZE
 
 /* Maximum number of ordered locks per queue */
-#define MAX_ORDERED_LOCKS_PER_QUEUE 1
+#define MAX_ORDERED_LOCKS_PER_QUEUE 2
 
 ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
                  "Too_many_ordered_locks");
 
+/* Ordered stash size */
+#define MAX_ORDERED_STASH 512
+
+/* Storage for stashed enqueue operation arguments */
+typedef struct {
+       odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+       queue_entry_t *queue;
+       int num;
+} ordered_stash_t;
+
 /* Scheduler local data */
 typedef struct {
        int thr;
@@ -128,7 +138,15 @@ typedef struct {
        uint32_t queue_index;
        odp_queue_t queue;
        odp_event_t ev_stash[MAX_DEQ];
-       void *queue_entry;
+       struct {
+               queue_entry_t *src_queue; /**< Source queue entry */
+               uint64_t ctx; /**< Ordered context id */
+               /** Storage for stashed enqueue operations */
+               ordered_stash_t stash[MAX_ORDERED_STASH];
+               int stash_num; /**< Number of stashed enqueue operations */
+               uint8_t in_order; /**< Order status */
+       } ordered;
+
 } sched_local_t;
 
 /* Priority queue */
@@ -491,17 +509,81 @@ static void schedule_release_atomic(void)
        }
 }
 
+static inline int ordered_own_turn(queue_entry_t *queue)
+{
+       uint64_t ctx;
+
+       ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx);
+
+       return ctx == sched_local.ordered.ctx;
+}
+
+static inline void wait_for_order(queue_entry_t *queue)
+{
+       /* Busy loop to synchronize ordered processing */
+       while (1) {
+               if (ordered_own_turn(queue))
+                       break;
+               odp_cpu_pause();
+       }
+}
+
+/**
+ * Perform stashed enqueue operations
+ *
+ * Should be called only when already in order.
+ */
+static inline void ordered_stash_release(void)
+{
+       int i;
+
+       for (i = 0; i < sched_local.ordered.stash_num; i++) {
+               queue_entry_t *queue;
+               odp_buffer_hdr_t **buf_hdr;
+               int num;
+
+               queue = sched_local.ordered.stash[i].queue;
+               buf_hdr = sched_local.ordered.stash[i].buf_hdr;
+               num = sched_local.ordered.stash[i].num;
+
+               queue_enq_multi(queue, buf_hdr, num);
+       }
+       sched_local.ordered.stash_num = 0;
+}
+
+static inline void release_ordered(void)
+{
+       queue_entry_t *queue;
+
+       queue = sched_local.ordered.src_queue;
+
+       wait_for_order(queue);
+
+       sched_local.ordered.src_queue = NULL;
+       sched_local.ordered.in_order = 0;
+
+       ordered_stash_release();
+
+       /* Next thread can continue processing */
+       odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1);
+}
+
 static void schedule_release_ordered(void)
 {
-       /* Process ordered queue as atomic */
-       schedule_release_atomic();
-       sched_local.queue_entry = NULL;
+       queue_entry_t *queue;
+
+       queue = sched_local.ordered.src_queue;
+
+       if (odp_unlikely(!queue || sched_local.num))
+               return;
+
+       release_ordered();
 }
 
 static inline void schedule_release_context(void)
 {
-       if (sched_local.queue_entry != NULL)
-               schedule_release_ordered();
+       if (sched_local.ordered.src_queue != NULL)
+               release_ordered();
        else
                schedule_release_atomic();
 }
@@ -524,13 +606,41 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
                                  int num, int *ret)
 {
-       (void)queue_index;
-       (void)buf_hdr;
-       (void)num;
-       (void)ret;
+       int i;
+       uint32_t stash_num = sched_local.ordered.stash_num;
+       queue_entry_t *dst_queue = get_qentry(queue_index);
+       queue_entry_t *src_queue = sched_local.ordered.src_queue;
 
-       /* didn't consume the events */
-       return 0;
+       if (!sched_local.ordered.src_queue || sched_local.ordered.in_order)
+               return 0;
+
+       if (ordered_own_turn(src_queue)) {
+               /* Own turn, so can do enqueue directly. */
+               sched_local.ordered.in_order = 1;
+               ordered_stash_release();
+               return 0;
+       }
+
+       if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
+               /* If the local stash is full, wait until it is our turn and
+                * then release the stash and do enqueue directly. */
+               wait_for_order(src_queue);
+
+               sched_local.ordered.in_order = 1;
+
+               ordered_stash_release();
+               return 0;
+       }
+
+       sched_local.ordered.stash[stash_num].queue = dst_queue;
+       sched_local.ordered.stash[stash_num].num = num;
+       for (i = 0; i < num; i++)
+               sched_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
+
+       sched_local.ordered.stash_num++;
+
+       *ret = num;
+       return 1;
 }
 
 /*
@@ -658,9 +768,21 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                        ret = copy_events(out_ev, max_num);
 
                        if (ordered) {
-                               /* Operate as atomic */
-                               sched_local.queue_index = qi;
-                               sched_local.queue_entry = get_qentry(qi);
+                               uint64_t ctx;
+                               queue_entry_t *queue;
+                               odp_atomic_u64_t *next_ctx;
+
+                               queue = get_qentry(qi);
+                               next_ctx = &queue->s.ordered.next_ctx;
+
+                               ctx = odp_atomic_fetch_inc_u64(next_ctx);
+
+                               sched_local.ordered.ctx = ctx;
+                               sched_local.ordered.src_queue = queue;
+
+                               /* Continue scheduling ordered queues */
+                               ring_enq(ring, PRIO_QUEUE_MASK, qi);
+
                        } else if (sched_cb_queue_is_atomic(qi)) {
                                /* Hold queue during atomic access */
                                sched_local.queue_index = qi;
@@ -785,8 +907,16 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
        return schedule_loop(out_queue, wait, events, num);
 }
 
-static void order_lock(void)
+static inline void order_lock(void)
 {
+       queue_entry_t *queue;
+
+       queue = sched_local.ordered.src_queue;
+
+       if (!queue)
+               return;
+
+       wait_for_order(queue);
 }
 
 static void order_unlock(void)
@@ -795,6 +925,15 @@ static void order_unlock(void)
 
 static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
 {
+       queue_entry_t *queue;
+
+       queue = sched_local.ordered.src_queue;
+       if (!queue || lock_index >= queue->s.param.sched.lock_count) {
+               ODP_ERR("Invalid ordered lock usage\n");
+               return;
+       }
+
+       wait_for_order(queue);
 }
 
 static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
-- 
2.7.4
