Implement ordered locks using per-lock atomic counters. Each counter value
is compared against the queue's current ordered context id to guarantee that
the locked sections are entered in order. Compared to the previous
implementation, this enables parallel processing of ordered events outside
of the lock context.
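
Conceptually, each ordered queue keeps one atomic counter per lock: a
thread spins in schedule_order_lock() until the counter reaches its own
ordered context id, and schedule_order_unlock() passes the lock on by
storing ctx + 1. A minimal standalone sketch of the scheme (using C11
atomics and illustrative helper names instead of the ODP atomic API):

  #include <stdatomic.h>
  #include <stdint.h>

  /* Illustrative per-lock counter; in the patch this is
   * queue->s.ordered.lock[lock_index] (an odp_atomic_u64_t). */
  static _Atomic uint64_t ord_lock;

  /* Acquire: spin until the counter reaches this thread's ordered
   * context id, i.e. all earlier contexts have released the lock. */
  static void ord_lock_wait(uint64_t my_ctx)
  {
          while (atomic_load_explicit(&ord_lock,
                                      memory_order_acquire) != my_ctx)
                  ; /* odp_cpu_pause() in the real implementation */
  }

  /* Release: hand the lock over to the next ordered context. */
  static void ord_lock_release(uint64_t my_ctx)
  {
          atomic_store_explicit(&ord_lock, my_ctx + 1,
                                memory_order_release);
  }

Since the counter is advanced as soon as the locked section completes,
threads holding later ordered contexts only serialize inside the locked
section and can otherwise process their events in parallel.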

Signed-off-by: Matias Elo <matias....@nokia.com>
---
 .../linux-generic/include/odp_queue_internal.h     |  2 +
 platform/linux-generic/odp_queue.c                 |  6 +++
 platform/linux-generic/odp_schedule.c              | 49 ++++++++++++++++++++--
 3 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index b905bd8..8b55de1 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -59,6 +59,8 @@ struct queue_entry_s {
        struct {
                odp_atomic_u64_t  ctx; /**< Current ordered context id */
                odp_atomic_u64_t  next_ctx; /**< Next unallocated context id */
+               /** Array of ordered locks */
+               odp_atomic_u64_t  lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
        } ordered ODP_ALIGNED_CACHE;
 
        enq_func_t       enqueue ODP_ALIGNED_CACHE;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4c7f497..d9cb9f3 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -77,8 +77,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
                queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
 
                if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
+                       unsigned i;
+
                        odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
                        odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
+
+                       for (i = 0; i < queue->s.param.sched.lock_count; i++)
+                               odp_atomic_init_u64(&queue->s.ordered.lock[i],
+                                                   0);
                }
        }
        queue->s.type = queue->s.param.type;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 4b33513..c628142 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -126,6 +126,15 @@ typedef struct {
        int num;
 } ordered_stash_t;
 
+/* Ordered lock states */
+typedef union {
+       uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS];
+       uint32_t all;
+} lock_called_t;
+
+ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
+                 "Lock_called_values_do_not_fit_in_uint32");
+
 /* Scheduler local data */
 typedef struct {
        int thr;
@@ -145,6 +154,7 @@ typedef struct {
                ordered_stash_t stash[MAX_ORDERED_STASH];
                int stash_num; /**< Number of stashed enqueue operations */
                uint8_t in_order; /**< Order status */
+               lock_called_t lock_called; /**< States of ordered locks */
        } ordered;
 
 } sched_local_t;
@@ -553,12 +563,21 @@ static inline void ordered_stash_release(void)
 
 static inline void release_ordered(void)
 {
+       unsigned i;
        queue_entry_t *queue;
 
        queue = sched_local.ordered.src_queue;
 
        wait_for_order(queue);
 
+       /* Release all ordered locks */
+       for (i = 0; i < queue->s.param.sched.lock_count; i++) {
+               if (!sched_local.ordered.lock_called.u8[i])
+                       odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
+                                                sched_local.ordered.ctx + 1);
+       }
+
+       sched_local.ordered.lock_called.all = 0;
        sched_local.ordered.src_queue = NULL;
        sched_local.ordered.in_order = 0;
 
@@ -923,19 +942,43 @@ static void order_unlock(void)
 {
 }
 
-static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
+static void schedule_order_lock(unsigned lock_index)
 {
+       odp_atomic_u64_t *ord_lock;
        queue_entry_t *queue;
 
        queue = sched_local.ordered.src_queue;
 
        ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
 
-       wait_for_order(queue);
+       ord_lock = &queue->s.ordered.lock[lock_index];
+
+       /* Busy loop to synchronize ordered processing */
+       while (1) {
+               uint64_t lock_seq;
+
+               lock_seq = odp_atomic_load_acq_u64(ord_lock);
+
+               if (lock_seq == sched_local.ordered.ctx) {
+                       sched_local.ordered.lock_called.u8[lock_index] = 1;
+                       return;
+               }
+               odp_cpu_pause();
+       }
 }
 
-static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
+static void schedule_order_unlock(unsigned lock_index)
 {
+       odp_atomic_u64_t *ord_lock;
+       queue_entry_t *queue;
+
+       queue = sched_local.ordered.src_queue;
+
+       ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
+
+       ord_lock = &queue->s.ordered.lock[lock_index];
+
+       odp_atomic_store_rel_u64(ord_lock, sched_local.ordered.ctx + 1);
 }
 
 static void schedule_pause(void)
-- 
2.7.4
