Signed-off-by: Bill Fischofer <[email protected]>
---
 .../linux-generic/include/odp_buffer_internal.h    |  3 ++
 .../linux-generic/include/odp_queue_internal.h     |  3 ++
 platform/linux-generic/odp_queue.c                 | 48 ++++++++++++++++++++++
 platform/linux-generic/odp_schedule.c              |  7 ++++
 4 files changed, 61 insertions(+)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index ca4d314..6badeba 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -140,7 +140,10 @@ typedef struct odp_buffer_hdr_t {
        void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
        uint64_t                 order;      /* sequence for ordered queues */
        queue_entry_t           *origin_qe;  /* ordered queue origin */
+       union {
                queue_entry_t   *target_qe;  /* ordered queue target */
+               uint64_t         sync;       /* for ordered synchronization */
+       };
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 163172c..4cee9b6 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -82,6 +82,8 @@ struct queue_entry_s {
        uint64_t          order_out;
        odp_buffer_hdr_t *reorder_head;
        odp_buffer_hdr_t *reorder_tail;
+       odp_atomic_u64_t  sync_in;
+       odp_atomic_u64_t  sync_out;
 };
 
 typedef union queue_entry_u {
@@ -120,6 +122,7 @@ int queue_sched_atomic(odp_queue_t handle);
 int release_order(queue_entry_t *origin_qe, uint64_t order,
                  odp_pool_t pool, int enq_called);
 void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync);
 void sched_enq_called(void);
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
 
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 09b0398..1bd0de6 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -123,6 +123,8 @@ int odp_queue_init_global(void)
                /* init locks */
                queue_entry_t *queue = get_qentry(i);
                LOCK_INIT(&queue->s.lock);
+               odp_atomic_init_u64(&queue->s.sync_in, 0);
+               odp_atomic_init_u64(&queue->s.sync_out, 0);
                queue->s.handle = queue_from_id(i);
        }
 
@@ -599,6 +601,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
        if (queue_is_ordered(queue)) {
                buf_hdr->origin_qe = queue;
                buf_hdr->order = queue->s.order_in++;
+               buf_hdr->sync  = odp_atomic_fetch_inc_u64(&queue->s.sync_in);
                buf_hdr->flags.sustain = 0;
        } else {
                buf_hdr->origin_qe = NULL;
@@ -646,6 +649,8 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
                if (queue_is_ordered(queue)) {
                        buf_hdr[i]->origin_qe = queue;
                        buf_hdr[i]->order     = queue->s.order_in++;
+                       buf_hdr[i]->sync =
+                               odp_atomic_fetch_inc_u64(&queue->s.sync_in);
                        buf_hdr[i]->flags.sustain = 0;
                } else {
                        buf_hdr[i]->origin_qe = NULL;
@@ -960,3 +965,46 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
        UNLOCK(&origin_qe->s.lock);
        return 0;
 }
+
+/* This routine is a no-op in linux-generic */
+int odp_schedule_order_lock_init(odp_schedule_order_lock_t *lock ODP_UNUSED,
+                                odp_queue_t queue ODP_UNUSED)
+{
+       return 0;
+}
+
+void odp_schedule_order_lock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+       queue_entry_t *origin_qe;
+       uint64_t *sync;
+
+       get_sched_sync(&origin_qe, &sync);
+       if (!origin_qe)
+               return;
+
+       /* Wait until we are in order. Note that sync_out will be incremented
+        * both by unlocks as well as order resolution, so we're OK if only
+        * some events in the ordered flow need to lock.
+        */
+       while (*sync > odp_atomic_load_u64(&origin_qe->s.sync_out))
+               odp_spin();
+}
+
+void odp_schedule_order_unlock(odp_schedule_order_lock_t *lock ODP_UNUSED)
+{
+       queue_entry_t *origin_qe;
+       uint64_t *sync;
+
+       get_sched_sync(&origin_qe, &sync);
+       if (!origin_qe)
+               return;
+
+       /* Get a new sync order for reusability, and release the lock. Note
+        * that this must be done in this sequence to prevent race conditions
+        * where the next waiter could lock and unlock before we're able to
+        * get a new sync order since that would cause order inversion on
+        * subsequent locks we may perform in this ordered context.
+        */
+       *sync = odp_atomic_fetch_inc_u64(&origin_qe->s.sync_in);
+       odp_atomic_fetch_inc_u64(&origin_qe->s.sync_out);
+}
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index e80cec1..51f7adc 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -84,6 +84,7 @@ typedef struct {
        queue_entry_t *qe;
        queue_entry_t *origin_qe;
        uint64_t order;
+       uint64_t sync;
        odp_pool_t pool;
        int enq_called;
        int num;
@@ -796,6 +797,12 @@ void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
        *order     = sched_local.order;
 }
 
+void get_sched_sync(queue_entry_t **origin_qe, uint64_t **sync)
+{
+       *origin_qe = sched_local.origin_qe;
+       *sync      = &sched_local.sync;
+}
+
 void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
 {
        if (buf_hdr)
-- 
2.1.4

_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to