Signed-off-by: Bill Fischofer <[email protected]>
---
 .../linux-generic/include/odp_buffer_internal.h    |  12 +-
 .../linux-generic/include/odp_packet_io_queue.h    |   5 +-
 .../linux-generic/include/odp_queue_internal.h     | 177 +++++++-
 .../linux-generic/include/odp_schedule_internal.h  |   4 +-
 platform/linux-generic/odp_classification.c        |   2 +-
 platform/linux-generic/odp_packet_io.c             |  10 +-
 platform/linux-generic/odp_pool.c                  |   3 +
 platform/linux-generic/odp_queue.c                 | 443 ++++++++++++++++++++-
 platform/linux-generic/odp_schedule.c              |  74 +++-
 platform/linux-generic/pktio/loop.c                |   2 +-
 test/validation/scheduler/scheduler.c              |   8 +
 11 files changed, 694 insertions(+), 46 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index ae799dd..ca4d314 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,16 +103,23 @@ typedef union odp_buffer_bits_t {
 
 /* forward declaration */
 struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
-       struct odp_buffer_hdr_t *next;       /* next buf in a list */
+       struct odp_buffer_hdr_t *next;       /* next buf in a list--keep 1st */
+       union {                              /* Multi-use secondary link */
+               struct odp_buffer_hdr_t *prev;
+               struct odp_buffer_hdr_t *link;
+       };
        odp_buffer_bits_t        handle;     /* handle */
        union {
                uint32_t all;
                struct {
                        uint32_t zeroized:1; /* Zeroize buf data on free */
                        uint32_t hdrdata:1;  /* Data is in buffer hdr */
+                       uint32_t sustain:1;  /* Sustain order */
                };
        } flags;
        int16_t                  allocator;  /* allocating thread id */
@@ -131,6 +138,9 @@ typedef struct odp_buffer_hdr_t {
        uint32_t                 segcount;   /* segment count */
        uint32_t                 segsize;    /* segment size */
        void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+       uint64_t                 order;      /* sequence for ordered queues */
+       queue_entry_t           *origin_qe;  /* ordered queue origin */
+       queue_entry_t           *target_qe;  /* ordered queue target */
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_packet_io_queue.h 
b/platform/linux-generic/include/odp_packet_io_queue.h
index c3b8309..12e2b9f 100644
--- a/platform/linux-generic/include/odp_packet_io_queue.h
+++ b/platform/linux-generic/include/odp_packet_io_queue.h
@@ -27,10 +27,11 @@ extern "C" {
 _ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= QUEUE_MULTI_MAX,
                   "ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
 
-int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int 
sustain);
 odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
 
-int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
+int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+                   int sustain);
 int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
 
 
diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index 61d0c43..163172c 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@ extern "C" {
 #include <odp_align_internal.h>
 #include <odp/packet_io.h>
 #include <odp/align.h>
+#include <odp/hints.h>
 
 
 #define USE_TICKETLOCK
@@ -45,11 +46,11 @@ extern "C" {
 /* forward declaration */
 union queue_entry_u;
 
-typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
+typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *, int);
 typedef        odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
 
 typedef int (*enq_multi_func_t)(union queue_entry_u *,
-                               odp_buffer_hdr_t **, int);
+                               odp_buffer_hdr_t **, int, int);
 typedef        int (*deq_multi_func_t)(union queue_entry_u *,
                                odp_buffer_hdr_t **, int);
 
@@ -77,6 +78,10 @@ struct queue_entry_s {
        odp_pktio_t       pktin;
        odp_pktio_t       pktout;
        char              name[ODP_QUEUE_NAME_LEN];
+       uint64_t          order_in;
+       uint64_t          order_out;
+       odp_buffer_hdr_t *reorder_head;
+       odp_buffer_hdr_t *reorder_tail;
 };
 
 typedef union queue_entry_u {
@@ -87,12 +92,20 @@ typedef union queue_entry_u {
 
 queue_entry_t *get_qentry(uint32_t queue_id);
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
+
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num,
+                   int sustain);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+                    int sustain);
+int queue_pktout_enq_multi(queue_entry_t *queue,
+                          odp_buffer_hdr_t *buf_hdr[], int num, int sustain);
+
 int queue_enq_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
 int queue_enq_multi_dummy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
                          int num);
@@ -104,6 +117,12 @@ void queue_unlock(queue_entry_t *queue);
 
 int queue_sched_atomic(odp_queue_t handle);
 
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+                 odp_pool_t pool, int enq_called);
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order);
+void sched_enq_called(void);
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr);
+
 static inline uint32_t queue_to_id(odp_queue_t handle)
 {
        return _odp_typeval(handle) - 1;
@@ -127,6 +146,11 @@ static inline int queue_is_atomic(queue_entry_t *qe)
        return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
 }
 
+static inline int queue_is_ordered(queue_entry_t *qe)
+{
+       return qe->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED;
+}
+
 static inline odp_queue_t queue_handle(queue_entry_t *qe)
 {
        return qe->s.handle;
@@ -137,6 +161,151 @@ static inline int queue_prio(queue_entry_t *qe)
        return qe->s.param.sched.prio;
 }
 
+static inline void reorder_enq(queue_entry_t *queue,
+                              uint64_t order,
+                              queue_entry_t *origin_qe,
+                              odp_buffer_hdr_t *buf_hdr,
+                              int sustain)
+{
+       odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+       odp_buffer_hdr_t *reorder_prev =
+               (odp_buffer_hdr_t *)&origin_qe->s.reorder_head;
+
+       while (reorder_buf && order >= reorder_buf->order) {
+               reorder_prev = reorder_buf;
+               reorder_buf  = reorder_buf->next;
+       }
+
+       buf_hdr->next = reorder_buf;
+       reorder_prev->next = buf_hdr;
+
+       if (!reorder_buf)
+               origin_qe->s.reorder_tail = buf_hdr;
+
+       buf_hdr->origin_qe     = origin_qe;
+       buf_hdr->target_qe     = queue;
+       buf_hdr->order         = order;
+       buf_hdr->flags.sustain = sustain;
+}
+
+static inline void order_release(queue_entry_t *origin_qe, int count)
+{
+       origin_qe->s.order_out += count;
+       odp_atomic_fetch_add_u64(&origin_qe->s.sync_out, count);
+}
+
+static inline int reorder_deq(queue_entry_t *queue,
+                             queue_entry_t *origin_qe,
+                             odp_buffer_hdr_t **reorder_buf_return,
+                             odp_buffer_hdr_t **reorder_prev_return,
+                             odp_buffer_hdr_t **placeholder_buf_return,
+                             int *release_count_return,
+                             int *placeholder_count_return)
+{
+       odp_buffer_hdr_t *reorder_buf     = origin_qe->s.reorder_head;
+       odp_buffer_hdr_t *reorder_prev    = NULL;
+       odp_buffer_hdr_t *placeholder_buf = NULL;
+       odp_buffer_hdr_t *next_buf;
+       int               deq_count = 0;
+       int               release_count = 0;
+       int               placeholder_count = 0;
+
+       while (reorder_buf &&
+              reorder_buf->order <= origin_qe->s.order_out +
+              release_count + placeholder_count) {
+               /*
+                * Elements on the reorder list fall into one of
+                * three categories:
+                *
+                * 1. Those destined for the same queue.  These
+                *    can be enq'd now if they were waiting to
+                *    be unblocked by this enq.
+                *
+                * 2. Those representing placeholders for events
+                *    whose ordering was released by a prior
+                *    odp_schedule_release_ordered() call.  These
+                *    can now just be freed.
+                *
+                * 3. Those representing events destined for another
+                *    queue. These cannot be consolidated with this
+                *    enq since they have a different target.
+                *
+                * Detecting an element with an order sequence gap, an
+                * element in category 3, or running out of elements
+                * stops the scan.
+                */
+               next_buf = reorder_buf->next;
+
+               if (odp_likely(reorder_buf->target_qe == queue)) {
+                       /* promote any chain */
+                       odp_buffer_hdr_t *reorder_link =
+                               reorder_buf->link;
+
+                       if (reorder_link) {
+                               reorder_buf->next = reorder_link;
+                               reorder_buf->link = NULL;
+                               while (reorder_link->next)
+                                       reorder_link = reorder_link->next;
+                               reorder_link->next = next_buf;
+                               reorder_prev = reorder_link;
+                       } else {
+                               reorder_prev = reorder_buf;
+                       }
+
+                       deq_count++;
+                       if (!reorder_buf->flags.sustain)
+                               release_count++;
+                       reorder_buf = next_buf;
+               } else if (!reorder_buf->target_qe) {
+                       if (reorder_prev)
+                               reorder_prev->next = next_buf;
+                       else
+                               origin_qe->s.reorder_head = next_buf;
+
+                       reorder_buf->next = placeholder_buf;
+                       placeholder_buf = reorder_buf;
+
+                       reorder_buf = next_buf;
+                       placeholder_count++;
+               } else {
+                       break;
+               }
+       }
+
+       *reorder_buf_return = reorder_buf;
+       *reorder_prev_return = reorder_prev;
+       *placeholder_buf_return = placeholder_buf;
+       *release_count_return = release_count;
+       *placeholder_count_return = placeholder_count;
+
+       return deq_count;
+}
+
+static inline int reorder_complete(odp_buffer_hdr_t *reorder_buf)
+{
+       odp_buffer_hdr_t *next_buf = reorder_buf->next;
+       uint64_t order = reorder_buf->order;
+
+       while (reorder_buf->flags.sustain &&
+              next_buf && next_buf->order == order) {
+               reorder_buf = next_buf;
+               next_buf = reorder_buf->next;
+       }
+
+       return !reorder_buf->flags.sustain;
+}
+
+static inline void get_queue_order(queue_entry_t **origin_qe, uint64_t *order,
+                                  odp_buffer_hdr_t *buf_hdr)
+{
+       if (buf_hdr && buf_hdr->origin_qe) {
+               *origin_qe = buf_hdr->origin_qe;
+               *order     = buf_hdr->order;
+       } else {
+               get_sched_order(origin_qe, order);
+       }
+}
+
 void queue_destroy_finalize(queue_entry_t *qe);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/include/odp_schedule_internal.h 
b/platform/linux-generic/include/odp_schedule_internal.h
index 4c6577d..6f9cbdc 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -15,6 +15,7 @@ extern "C" {
 
 
 #include <odp/buffer.h>
+#include <odp_buffer_internal.h>
 #include <odp/queue.h>
 #include <odp/packet_io.h>
 #include <odp_queue_internal.h>
@@ -28,9 +29,8 @@ static inline int schedule_queue(const queue_entry_t *qe)
        return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
 }
 
-
 int schedule_pktio_start(odp_pktio_t pktio, int prio);
-
+void odp_schedule_release_context(void);
 
 #ifdef __cplusplus
 }
diff --git a/platform/linux-generic/odp_classification.c 
b/platform/linux-generic/odp_classification.c
index fdb544d..6c1aff4 100644
--- a/platform/linux-generic/odp_classification.c
+++ b/platform/linux-generic/odp_classification.c
@@ -810,7 +810,7 @@ int packet_classifier(odp_pktio_t pktio, odp_packet_t pkt)
 
        /* Enqueuing the Packet based on the CoS */
        queue = cos->s.queue;
-       return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt));
+       return queue_enq(queue, odp_buf_to_hdr((odp_buffer_t)pkt), 0);
 }
 
 cos_t *pktio_select_cos(pktio_entry_t *entry, uint8_t *pkt_addr,
diff --git a/platform/linux-generic/odp_packet_io.c 
b/platform/linux-generic/odp_packet_io.c
index 135e84f..866ae38 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -482,7 +482,7 @@ int pktout_deq_multi(queue_entry_t *qentry ODP_UNUSED,
 }
 
 int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
-                 odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
+                 odp_buffer_hdr_t *buf_hdr ODP_UNUSED, int sustain ODP_UNUSED)
 {
        ODP_ABORT("attempted enqueue to a pktin queue");
        return -1;
@@ -515,14 +515,14 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
                return NULL;
 
        if (j > 1)
-               queue_enq_multi(qentry, &tmp_hdr_tbl[1], j-1);
+               queue_enq_multi(qentry, &tmp_hdr_tbl[1], j - 1, 0);
        buf_hdr = tmp_hdr_tbl[0];
        return buf_hdr;
 }
 
 int pktin_enq_multi(queue_entry_t *qentry ODP_UNUSED,
                    odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
-                   int num ODP_UNUSED)
+                   int num ODP_UNUSED, int sustain ODP_UNUSED)
 {
        ODP_ABORT("attempted enqueue to a pktin queue");
        return 0;
@@ -560,7 +560,7 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t 
*buf_hdr[], int num)
        }
 
        if (j)
-               queue_enq_multi(qentry, tmp_hdr_tbl, j);
+               queue_enq_multi(qentry, tmp_hdr_tbl, j, 0);
        return nbr;
 }
 
@@ -601,7 +601,7 @@ int pktin_poll(pktio_entry_t *entry)
        if (num_enq) {
                queue_entry_t *qentry;
                qentry = queue_to_qentry(entry->s.inq_default);
-               queue_enq_multi(qentry, hdr_tbl, num_enq);
+               queue_enq_multi(qentry, hdr_tbl, num_enq, 0);
        }
 
        return 0;
diff --git a/platform/linux-generic/odp_pool.c 
b/platform/linux-generic/odp_pool.c
index 14221fd..30d4b2b 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -514,6 +514,9 @@ odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
        /* By default, buffers inherit their pool's zeroization setting */
        buf->buf.flags.zeroized = pool->s.flags.zeroized;
 
+       /* By default, buffers are not associated with an ordered queue */
+       buf->buf.origin_qe = NULL;
+
        if (buf->buf.type == ODP_EVENT_PACKET)
                packet_init(pool, &buf->pkt, size);
 
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index 4a0df11..09b0398 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -21,17 +22,20 @@
 #include <odp_debug_internal.h>
 #include <odp/hints.h>
 #include <odp/sync.h>
+#include <odp_spin_internal.h>
 
 #ifdef USE_TICKETLOCK
 #include <odp/ticketlock.h>
 #define LOCK(a)      odp_ticketlock_lock(a)
 #define UNLOCK(a)    odp_ticketlock_unlock(a)
 #define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a)  odp_ticketlock_trylock(a)
 #else
 #include <odp/spinlock.h>
 #define LOCK(a)      odp_spinlock_lock(a)
 #define UNLOCK(a)    odp_spinlock_unlock(a)
 #define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a)  odp_spinlock_trylock(a)
 #endif
 
 #include <string.h>
@@ -73,9 +77,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
                queue->s.dequeue_multi = pktin_deq_multi;
                break;
        case ODP_QUEUE_TYPE_PKTOUT:
-               queue->s.enqueue = pktout_enqueue;
+               queue->s.enqueue = queue_pktout_enq;
                queue->s.dequeue = pktout_dequeue;
-               queue->s.enqueue_multi = pktout_enq_multi;
+               queue->s.enqueue_multi = queue_pktout_enq_multi;
                queue->s.dequeue_multi = pktout_deq_multi;
                break;
        default:
@@ -89,6 +93,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
        queue->s.head = NULL;
        queue->s.tail = NULL;
 
+       queue->s.reorder_head = NULL;
+       queue->s.reorder_tail = NULL;
+
        queue->s.pri_queue = ODP_QUEUE_INVALID;
        queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -326,33 +333,148 @@ odp_queue_t odp_queue_lookup(const char *name)
 }
 
 
-int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
 {
        int sched = 0;
+       queue_entry_t *origin_qe;
+       uint64_t order;
+       odp_buffer_hdr_t *buf_tail;
+
+       get_queue_order(&origin_qe, &order, buf_hdr);
+
+       /* Need two locks for enq operations from ordered queues */
+       if (origin_qe) {
+               LOCK(&origin_qe->s.lock);
+               while (!LOCK_TRY(&queue->s.lock)) {
+                       UNLOCK(&origin_qe->s.lock);
+                       LOCK(&origin_qe->s.lock);
+               }
+               if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+                       UNLOCK(&queue->s.lock);
+                       UNLOCK(&origin_qe->s.lock);
+                       ODP_ERR("Bad origin queue status\n");
+                       ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+                               queue->s.name, origin_qe->s.name, buf_hdr);
+                       return -1;
+               }
+       } else {
+               LOCK(&queue->s.lock);
+       }
 
-       LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
+               if (origin_qe)
+                       UNLOCK(&origin_qe->s.lock);
                ODP_ERR("Bad queue status\n");
                return -1;
        }
 
-       if (queue->s.head == NULL) {
+       /* We can only complete the enq if we're in order */
+       if (origin_qe) {
+               sched_enq_called();
+               if (order > origin_qe->s.order_out) {
+                       reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+                       /* This enq can't complete until order is restored, so
+                        * we're done here.
+                        */
+                       UNLOCK(&queue->s.lock);
+                       UNLOCK(&origin_qe->s.lock);
+                       return 0;
+               }
+
+               /* We're in order, so account for this and proceed with enq */
+               if (!sustain) {
+                       order_release(origin_qe, 1);
+                       sched_order_resolved(buf_hdr);
+               }
+
+               /* if this element is linked, restore the linked chain */
+               buf_tail = buf_hdr->link;
+
+               if (buf_tail) {
+                       buf_hdr->next = buf_tail;
+                       buf_hdr->link = NULL;
+
+                       /* find end of the chain */
+                       while (buf_tail->next)
+                               buf_tail = buf_tail->next;
+               } else {
+                       buf_tail = buf_hdr;
+               }
+       } else {
+               buf_tail = buf_hdr;
+       }
+
+       if (!queue->s.head) {
                /* Empty queue */
                queue->s.head = buf_hdr;
-               queue->s.tail = buf_hdr;
-               buf_hdr->next = NULL;
+               queue->s.tail = buf_tail;
+               buf_tail->next = NULL;
        } else {
                queue->s.tail->next = buf_hdr;
-               queue->s.tail = buf_hdr;
-               buf_hdr->next = NULL;
+               queue->s.tail = buf_tail;
+               buf_tail->next = NULL;
        }
 
        if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
                queue->s.status = QUEUE_STATUS_SCHED;
                sched = 1; /* retval: schedule queue */
        }
-       UNLOCK(&queue->s.lock);
+
+       /*
+        * If we came from an ordered queue, check to see if our successful
+        * enq has unblocked other buffers in the origin's reorder queue.
+        */
+       if (origin_qe) {
+               odp_buffer_hdr_t *reorder_buf;
+               odp_buffer_hdr_t *next_buf;
+               odp_buffer_hdr_t *reorder_prev;
+               odp_buffer_hdr_t *placeholder_buf;
+               int               deq_count, release_count, placeholder_count;
+
+               deq_count = reorder_deq(queue, origin_qe, &reorder_buf,
+                                       &reorder_prev, &placeholder_buf,
+                                       &release_count, &placeholder_count);
+
+               /* Add released buffers to the queue as well */
+               if (deq_count > 0) {
+                       queue->s.tail->next       = origin_qe->s.reorder_head;
+                       queue->s.tail             = reorder_prev;
+                       origin_qe->s.reorder_head = reorder_prev->next;
+                       reorder_prev->next        = NULL;
+               }
+
+               /* Reflect resolved orders in the output sequence */
+               order_release(origin_qe, release_count + placeholder_count);
+
+               /* Now handle any unblocked complete buffers destined for
+                * other queues.  Note that these must be complete because
+                * otherwise another thread is working on it and is
+                * responsible for resolving order when it is complete.
+                */
+               UNLOCK(&queue->s.lock);
+
+               if (reorder_buf &&
+                   reorder_buf->order <= origin_qe->s.order_out &&
+                   reorder_complete(reorder_buf))
+                       origin_qe->s.reorder_head = reorder_buf->next;
+               else
+                       reorder_buf = NULL;
+               UNLOCK(&origin_qe->s.lock);
+
+               if (reorder_buf)
+                       queue_enq_internal(reorder_buf);
+
+               /* Free all placeholder bufs that are now released */
+               while (placeholder_buf) {
+                       next_buf = placeholder_buf->next;
+                       odp_buffer_free(placeholder_buf->handle.handle);
+                       placeholder_buf = next_buf;
+               }
+       } else {
+               UNLOCK(&queue->s.lock);
+       }
 
        /* Add queue to scheduling */
        if (sched && schedule_queue(queue))
@@ -361,18 +483,31 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr)
        return 0;
 }
 
-int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                   int num, int sustain)
 {
        int sched = 0;
-       int i;
+       int i, rc;
        odp_buffer_hdr_t *tail;
+       queue_entry_t *origin_qe;
+       uint64_t order;
 
+       /* Chain input buffers together */
        for (i = 0; i < num - 1; i++)
-               buf_hdr[i]->next = buf_hdr[i+1];
+               buf_hdr[i]->next = buf_hdr[i + 1];
 
-       tail = buf_hdr[num-1];
-       buf_hdr[num-1]->next = NULL;
+       tail = buf_hdr[num - 1];
+       buf_hdr[num - 1]->next = NULL;
 
+       /* Handle ordered enqueues commonly via links */
+       get_queue_order(&origin_qe, &order, buf_hdr[0]);
+       if (origin_qe) {
+               buf_hdr[0]->link = buf_hdr[0]->next;
+               rc = queue_enq(queue, buf_hdr[0], sustain);
+               return rc == 0 ? num : rc;
+       }
+
+       /* Handle unordered enqueues */
        LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
@@ -415,10 +550,9 @@ int odp_queue_enq_multi(odp_queue_t handle, const 
odp_event_t ev[], int num)
        for (i = 0; i < num; i++)
                buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
 
-       return queue->s.enqueue_multi(queue, buf_hdr, num);
+       return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
 }
 
-
 int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 {
        odp_buffer_hdr_t *buf_hdr;
@@ -427,9 +561,17 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
        queue   = queue_to_qentry(handle);
        buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
-       return queue->s.enqueue(queue, buf_hdr);
+       /* No chains via this entry */
+       buf_hdr->link = NULL;
+
+       return queue->s.enqueue(queue, buf_hdr, 1);
 }
 
+int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
+{
+       return buf_hdr->origin_qe->s.enqueue(buf_hdr->target_qe, buf_hdr,
+                                            buf_hdr->flags.sustain);
+}
 
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
@@ -450,6 +592,18 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
        queue->s.head = buf_hdr->next;
        buf_hdr->next = NULL;
 
+       /* Note that order should really be assigned on enq to an
+        * ordered queue rather than deq, however the logic is simpler
+        * to do it here and has the same effect.
+        */
+       if (queue_is_ordered(queue)) {
+               buf_hdr->origin_qe = queue;
+               buf_hdr->order = queue->s.order_in++;
+               buf_hdr->flags.sustain = 0;
+       } else {
+               buf_hdr->origin_qe = NULL;
+       }
+
        if (queue->s.head == NULL) {
                /* Queue is now empty */
                queue->s.tail = NULL;
@@ -489,6 +643,13 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr[], int num)
                buf_hdr[i]       = hdr;
                hdr              = hdr->next;
                buf_hdr[i]->next = NULL;
+               if (queue_is_ordered(queue)) {
+                       buf_hdr[i]->origin_qe = queue;
+                       buf_hdr[i]->order     = queue->s.order_in++;
+                       buf_hdr[i]->flags.sustain = 0;
+               } else {
+                       buf_hdr[i]->origin_qe = NULL;
+               }
        }
 
        queue->s.head = hdr;
@@ -537,6 +698,170 @@ odp_event_t odp_queue_deq(odp_queue_t handle)
        return ODP_EVENT_INVALID;
 }
 
+int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+                    int sustain)
+{
+       queue_entry_t *origin_qe;
+       uint64_t order;
+       int rc;
+
+       /* Special processing needed only if we came from an ordered queue */
+       get_queue_order(&origin_qe, &order, buf_hdr);
+       if (!origin_qe)
+               return pktout_enqueue(queue, buf_hdr);
+
+       /* Must lock origin_qe for ordered processing */
+       LOCK(&origin_qe->s.lock);
+       if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+               UNLOCK(&origin_qe->s.lock);
+               ODP_ERR("Bad origin queue status\n");
+               return -1;
+       }
+
+       /* We can only complete the enq if we're in order */
+       sched_enq_called();
+       if (order > origin_qe->s.order_out) {
+               reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+               /* This enq can't complete until order is restored, so
+                * we're done here.
+                */
+               UNLOCK(&origin_qe->s.lock);
+               return 0;
+       }
+
+       /* Perform our enq since we're in order.
+        * Note: Don't hold the origin_qe lock across an I/O operation!
+        */
+       UNLOCK(&origin_qe->s.lock);
+
+       /* Handle any chained buffers (internal calls) */
+       if (buf_hdr->link) {
+               odp_buffer_hdr_t *buf_hdrs[QUEUE_MULTI_MAX];
+               odp_buffer_hdr_t *next_buf;
+               int num = 0;
+
+               next_buf = buf_hdr->link;
+               buf_hdr->link = NULL;
+
+               while (next_buf) {
+                       buf_hdrs[num++] = next_buf;
+                       next_buf = next_buf->next;
+               }
+
+               rc = pktout_enq_multi(queue, buf_hdrs, num);
+               if (rc < num)
+                       return -1;
+       } else {
+               rc = pktout_enqueue(queue, buf_hdr);
+               if (!rc)
+                       return rc;
+       }
+
+       /* Reacquire the lock following the I/O send. Note that we're still
+        * guaranteed to be in order here since we haven't released
+        * order yet.
+        */
+       LOCK(&origin_qe->s.lock);
+       if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+               UNLOCK(&origin_qe->s.lock);
+               ODP_ERR("Bad origin queue status\n");
+               return -1;
+       }
+
+       /* Account for this ordered enq */
+       if (!sustain) {
+               order_release(origin_qe, 1);
+               sched_order_resolved(NULL);
+       }
+
+       /* Now check to see if our successful enq has unblocked other buffers
+        * in the origin's reorder queue.
+        */
+       odp_buffer_hdr_t *reorder_buf;
+       odp_buffer_hdr_t *next_buf;
+       odp_buffer_hdr_t *reorder_prev;
+       odp_buffer_hdr_t *xmit_buf;
+       odp_buffer_hdr_t *placeholder_buf;
+       int               deq_count, release_count, placeholder_count;
+
+       deq_count = reorder_deq(queue, origin_qe,
+                               &reorder_buf, &reorder_prev, &placeholder_buf,
+                               &release_count, &placeholder_count);
+
+       /* Send released buffers as well */
+       if (deq_count > 0) {
+               xmit_buf = origin_qe->s.reorder_head;
+               origin_qe->s.reorder_head = reorder_prev->next;
+               reorder_prev->next = NULL;
+               UNLOCK(&origin_qe->s.lock);
+
+               do {
+                       next_buf = xmit_buf->next;
+                       pktout_enqueue(queue, xmit_buf);
+                       xmit_buf = next_buf;
+               } while (xmit_buf);
+
+               /* Reacquire the origin_qe lock to continue */
+               LOCK(&origin_qe->s.lock);
+               if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+                       UNLOCK(&origin_qe->s.lock);
+                       ODP_ERR("Bad origin queue status\n");
+                       return -1;
+               }
+       }
+
+       /* Update the order sequence to reflect the deq'd elements */
+       order_release(origin_qe, release_count + placeholder_count);
+
+       /* Now handle sends to other queues that are ready to go */
+       if (reorder_buf &&
+           reorder_buf->order <= origin_qe->s.order_out &&
+           reorder_complete(reorder_buf))
+               origin_qe->s.reorder_head = reorder_buf->next;
+       else
+               reorder_buf = NULL;
+
+       /* We're fully done with the origin_qe at last */
+       UNLOCK(&origin_qe->s.lock);
+
+       /* Now send the next buffer to its target queue */
+       if (reorder_buf)
+               queue_enq_internal(reorder_buf);
+
+       /* Free all placeholder bufs that are now released */
+       while (placeholder_buf) {
+               next_buf = placeholder_buf->next;
+               odp_buffer_free(placeholder_buf->handle.handle);
+               placeholder_buf = next_buf;
+       }
+
+       return 0;
+}
+
+int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                          int num, int sustain)
+{
+       int i, rc;
+       queue_entry_t *origin_qe;
+       uint64_t order;
+
+       /* If we're not ordered, handle directly */
+       get_queue_order(&origin_qe, &order, buf_hdr[0]);
+       if (!origin_qe)
+               return pktout_enq_multi(queue, buf_hdr, num);
+
+       /* Chain input buffers together */
+       for (i = 0; i < num - 1; i++)
+               buf_hdr[i]->next = buf_hdr[i + 1];
+
+       buf_hdr[num - 1]->next = NULL;
+
+       /* Handle commonly via links */
+       buf_hdr[0]->link = buf_hdr[0]->next;
+       rc = queue_pktout_enq(queue, buf_hdr[0], sustain);
+       return rc == 0 ? num : rc;
+}
 
 void queue_lock(queue_entry_t *queue)
 {
@@ -553,3 +878,93 @@ void odp_queue_param_init(odp_queue_param_t *params)
 {
        memset(params, 0, sizeof(odp_queue_param_t));
 }
+
+/* These routines exist here rather than in odp_schedule
+ * because they operate on queue internal structures
+ */
+
+/* Release an ordered context (identified by origin_qe/order), either by
+ * advancing order_out directly when the caller is next in order, or by
+ * inserting a placeholder buffer into the reorder queue to hold its place.
+ * Returns 0 on success, -1 if no placeholder buffer can be allocated.
+ */
+int release_order(queue_entry_t *origin_qe, uint64_t order,
+                 odp_pool_t pool, int enq_called)
+{
+       odp_buffer_t placeholder_buf;
+       odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
+
+       /* Must lock the origin queue to process the release */
+       LOCK(&origin_qe->s.lock);
+
+       /* If we are in order we can release immediately since there can
+        * be no confusion about intermediate elements
+        */
+       if (order <= origin_qe->s.order_out) {
+               order_release(origin_qe, 1);
+
+               /* Check if this release allows us to unblock waiters.
+                * Note that we can only process complete waiters since
+                * if the sustain bit is set for a buffer this means that
+                * some other thread is working on it and will be
+                * responsible for resolving order when it is complete.
+                */
+               reorder_buf = origin_qe->s.reorder_head;
+
+               if (reorder_buf &&
+                   reorder_buf->order <= origin_qe->s.order_out &&
+                   reorder_complete(reorder_buf))
+                       origin_qe->s.reorder_head = reorder_buf->next;
+               else
+                       reorder_buf = NULL;
+
+               UNLOCK(&origin_qe->s.lock);
+               if (reorder_buf)
+                       queue_enq_internal(reorder_buf);
+               return 0;
+       }
+
+       /* If we are not in order we need a placeholder to represent our
+        * "place in line" unless we have issued enqs, in which case we
+        * already have a place in the reorder queue. If we need a
+        * placeholder, use an element from the same pool we were scheduled
+        * from; otherwise just ensure that the final element for our
+        * order is not marked sustain.
+        */
+       if (enq_called) {
+               reorder_buf = NULL;
+               next_buf    = origin_qe->s.reorder_head;
+
+               while (next_buf && next_buf->order <= order) {
+                       reorder_buf = next_buf;
+                       next_buf = next_buf->next;
+               }
+
+               if (reorder_buf && reorder_buf->order == order) {
+                       reorder_buf->flags.sustain = 0;
+                       /* Must drop the lock on this early-out path too */
+                       UNLOCK(&origin_qe->s.lock);
+                       return 0;
+               }
+       }
+
+       placeholder_buf = odp_buffer_alloc(pool);
+
+       /* Can't release if no placeholder is available */
+       if (odp_unlikely(placeholder_buf == ODP_BUFFER_INVALID)) {
+               UNLOCK(&origin_qe->s.lock);
+               return -1;
+       }
+
+       placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+
+       /* Copy info to placeholder and add it to the reorder queue */
+       placeholder_buf_hdr->origin_qe     = origin_qe;
+       placeholder_buf_hdr->order         = order;
+       placeholder_buf_hdr->flags.sustain = 0;
+
+       reorder_enq(NULL, order, origin_qe, placeholder_buf_hdr, 0);
+
+       UNLOCK(&origin_qe->s.lock);
+       return 0;
+}
diff --git a/platform/linux-generic/odp_schedule.c 
b/platform/linux-generic/odp_schedule.c
index c195ba5..fdf522f 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -82,6 +82,10 @@ typedef struct {
 
        odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
        queue_entry_t *qe;
+       queue_entry_t *origin_qe;
+       uint64_t order;
+       odp_pool_t pool;
+       int enq_called;
        int num;
        int index;
        int pause;
@@ -99,16 +103,10 @@ odp_thrmask_t *thread_sched_grp_mask(int index);
 
 static void sched_local_init(void)
 {
-       int i;
-
        memset(&sched_local, 0, sizeof(sched_local_t));
 
        sched_local.pri_queue = ODP_QUEUE_INVALID;
        sched_local.cmd_ev    = ODP_EVENT_INVALID;
-       sched_local.qe        = NULL;
-
-       for (i = 0; i < MAX_DEQ; i++)
-               sched_local.buf_hdr[i] = NULL;
 }
 
 int odp_schedule_init_global(void)
@@ -260,7 +258,7 @@ int odp_schedule_term_local(void)
                return -1;
        }
 
-       odp_schedule_release_atomic();
+       odp_schedule_release_context();
 
        sched_local_init();
        return 0;
@@ -392,6 +390,27 @@ void odp_schedule_release_atomic(void)
        }
 }
 
+void odp_schedule_release_ordered(void)
+{
+       if (sched_local.origin_qe) { /* no-op unless an ordered context is held */
+               int rc = release_order(sched_local.origin_qe,
+                                      sched_local.order,
+                                      sched_local.pool,
+                                      sched_local.enq_called);
+               if (rc == 0) /* on failure the context is retained; presumably so the caller can retry -- confirm */
+                       sched_local.origin_qe = NULL;
+       }
+}
+
+void odp_schedule_release_context(void)
+{
+       if (sched_local.origin_qe) { /* ordered context: resolve order unconditionally */
+               release_order(sched_local.origin_qe, sched_local.order,
+                             sched_local.pool, sched_local.enq_called); /* rc ignored: unlike release_ordered, the context is dropped even on failure */
+               sched_local.origin_qe = NULL;
+       } else
+               odp_schedule_release_atomic(); /* otherwise release any atomic context */
+}
 
 static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 {
@@ -409,11 +428,8 @@ static inline int copy_events(odp_event_t out_ev[], 
unsigned int max)
        return i;
 }
 
-
 /*
  * Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
  */
 static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                    unsigned int max_num, unsigned int max_deq)
@@ -431,7 +447,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t 
out_ev[],
                return ret;
        }
 
-       odp_schedule_release_atomic();
+       odp_schedule_release_context();
 
        if (odp_unlikely(sched_local.pause))
                return 0;
@@ -498,6 +514,13 @@ static int schedule(odp_queue_t *out_queue, odp_event_t 
out_ev[],
                                        ODP_ABORT("schedule failed\n");
                                continue;
                        }
+
+                       /* For ordered queues we want consecutive events to
+                        * be dispatched to separate threads, so do not cache
+                        * them locally.
+                        */
+                       if (queue_is_ordered(qe))
+                               max_deq = 1;
                        num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq);
 
                        if (num < 0) {
@@ -516,7 +539,16 @@ static int schedule(odp_queue_t *out_queue, odp_event_t 
out_ev[],
                        sched_local.qe    = qe;
                        ret = copy_events(out_ev, max_num);
 
-                       if (queue_is_atomic(qe)) {
+                       if (queue_is_ordered(qe)) {
+                               sched_local.origin_qe = qe;
+                               sched_local.order =
+                                       sched_local.buf_hdr[0]->order;
+                               sched_local.sync =
+                                       sched_local.buf_hdr[0]->sync;
+                               sched_local.enq_called = 0;
+                               if (odp_queue_enq(pri_q, ev))
+                                       ODP_ABORT("schedule failed\n");
+                       } else if (queue_is_atomic(qe)) {
                                /* Hold queue during atomic access */
                                sched_local.pri_queue = pri_q;
                                sched_local.cmd_ev    = ev;
@@ -747,3 +779,21 @@ int odp_schedule_group_thrmask(odp_schedule_group_t group,
        odp_spinlock_unlock(&sched->grp_lock);
        return ret;
 }
+
+void sched_enq_called(void)
+{
+       sched_local.enq_called = 1; /* record that an enq was issued under the current ordered context (consulted by release_order) */
+}
+
+void get_sched_order(queue_entry_t **origin_qe, uint64_t *order)
+{
+       *origin_qe = sched_local.origin_qe; /* NULL when this thread holds no ordered context */
+       *order     = sched_local.order;     /* meaningful only when *origin_qe != NULL */
+}
+
+void sched_order_resolved(odp_buffer_hdr_t *buf_hdr)
+{
+       if (buf_hdr)
+               buf_hdr->origin_qe = NULL; /* detach buf from its origin queue */
+       sched_local.origin_qe = NULL; /* this thread's ordered context is now resolved */
+}
diff --git a/platform/linux-generic/pktio/loop.c 
b/platform/linux-generic/pktio/loop.c
index 188a9ee..8cf4034 100644
--- a/platform/linux-generic/pktio/loop.c
+++ b/platform/linux-generic/pktio/loop.c
@@ -76,7 +76,7 @@ static int loopback_send(pktio_entry_t *pktio_entry, 
odp_packet_t pkt_tbl[],
                hdr_tbl[i] = odp_buf_to_hdr(_odp_packet_to_buffer(pkt_tbl[i]));
 
        qentry = queue_to_qentry(pktio_entry->s.pkt_loop.loopq);
-       return queue_enq_multi(qentry, hdr_tbl, len);
+       return queue_enq_multi(qentry, hdr_tbl, len, 0);
 }
 
 static int loopback_mtu_get(pktio_entry_t *pktio_entry ODP_UNUSED)
diff --git a/test/validation/scheduler/scheduler.c 
b/test/validation/scheduler/scheduler.c
index 1e9a669..9e40bf4 100644
--- a/test/validation/scheduler/scheduler.c
+++ b/test/validation/scheduler/scheduler.c
@@ -157,6 +157,7 @@ void scheduler_test_queue_destroy(void)
                CU_ASSERT_FATAL(u32[0] == MAGIC);
 
                odp_buffer_free(buf);
+               odp_schedule_release_ordered();
 
                CU_ASSERT_FATAL(odp_queue_destroy(queue) == 0);
        }
@@ -341,6 +342,9 @@ void scheduler_test_groups(void)
                rc = odp_schedule_group_leave(mygrp1, &mymask);
                CU_ASSERT_FATAL(rc == 0);
 
+               /* We must release order before destroying queues */
+               odp_schedule_release_ordered();
+
                /* Done with queues for this round */
                CU_ASSERT_FATAL(odp_queue_destroy(queue_grp1) == 0);
                CU_ASSERT_FATAL(odp_queue_destroy(queue_grp2) == 0);
@@ -384,6 +388,7 @@ static void *schedule_common_(void *arg)
                        CU_ASSERT(num <= BURST_BUF_SIZE);
                        if (num == 0)
                                continue;
+
                        for (j = 0; j < num; j++)
                                odp_event_free(events[j]);
                } else {
@@ -413,6 +418,9 @@ static void *schedule_common_(void *arg)
                if (sync == ODP_SCHED_SYNC_ATOMIC)
                        odp_schedule_release_atomic();
 
+               if (sync == ODP_SCHED_SYNC_ORDERED)
+                       odp_schedule_release_ordered();
+
                odp_ticketlock_lock(&globals->lock);
                globals->buf_count -= num;
 
-- 
2.1.4

_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to