Extend scheduler CUnit test to add explicit tests that ordered queues
perform reorder processing properly.

This fixes Bug https://bugs.linaro.org/show_bug.cgi?id=1824

Signed-off-by: Bill Fischofer <[email protected]>
---
 test/validation/scheduler/scheduler.c | 138 ++++++++++++++++++++++++++++++++--
 1 file changed, 130 insertions(+), 8 deletions(-)

diff --git a/test/validation/scheduler/scheduler.c 
b/test/validation/scheduler/scheduler.c
index 89c1099..7090e75 100644
--- a/test/validation/scheduler/scheduler.c
+++ b/test/validation/scheduler/scheduler.c
@@ -44,6 +44,7 @@ typedef struct {
        int num_workers;
        odp_barrier_t barrier;
        int buf_count;
+       int buf_count_cpy;
        odp_ticketlock_t lock;
        odp_spinlock_t atomic_lock;
 } test_globals_t;
@@ -63,10 +64,12 @@ typedef struct {
 typedef struct {
        uint64_t sequence;
        uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
+       uint64_t output_sequence;
 } buf_contents;
 
 typedef struct {
        odp_buffer_t ctx_handle;
+       odp_queue_t pq_handle;
        uint64_t sequence;
        uint64_t lock_sequence[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
 } queue_context;
@@ -384,17 +387,21 @@ static void *schedule_common_(void *arg)
        odp_schedule_sync_t sync;
        test_globals_t *globals;
        queue_context *qctx;
-       buf_contents *bctx;
+       buf_contents *bctx, *bctx_cpy;
+       odp_pool_t pool;
 
        globals = args->globals;
        sync = args->sync;
 
+       pool = odp_pool_lookup(MSG_POOL_NAME);
+       CU_ASSERT_FATAL(pool != ODP_POOL_INVALID);
+
        if (args->num_workers > 1)
                odp_barrier_wait(&globals->barrier);
 
        while (1) {
                odp_event_t ev;
-               odp_buffer_t buf;
+               odp_buffer_t buf, buf_cpy;
                odp_queue_t from = ODP_QUEUE_INVALID;
                int num = 0;
                int locked;
@@ -407,7 +414,9 @@ static void *schedule_common_(void *arg)
                odp_ticketlock_unlock(&globals->lock);
 
                if (args->enable_schd_multi) {
-                       odp_event_t events[BURST_BUF_SIZE];
+                       odp_event_t events[BURST_BUF_SIZE],
+                               ev_cpy[BURST_BUF_SIZE];
+                       odp_buffer_t buf_cpy[BURST_BUF_SIZE];
                        int j;
                        num = odp_schedule_multi(&from, ODP_SCHED_NO_WAIT,
                                                 events, BURST_BUF_SIZE);
@@ -419,11 +428,33 @@ static void *schedule_common_(void *arg)
                        if (sync == ODP_SCHED_SYNC_ORDERED) {
                                uint32_t ndx;
                                uint32_t ndx_max = odp_queue_lock_count(from);
+                               int rc;
 
                                qctx = odp_queue_context(from);
+
+                               for (j = 0; j < num; j++) {
+                                       bctx = odp_buffer_addr(
+                                               odp_buffer_from_event
+                                               (events[j]));
+
+                                       buf_cpy[j] = odp_buffer_alloc(pool);
+                                       CU_ASSERT_FATAL(buf_cpy[j] !=
+                                                       ODP_BUFFER_INVALID);
+                                       bctx_cpy = odp_buffer_addr(buf_cpy[j]);
+                                       memcpy(bctx_cpy, bctx,
+                                              sizeof(buf_contents));
+                                       bctx_cpy->output_sequence =
+                                               bctx_cpy->sequence;
+                                       ev_cpy[j] =
+                                               odp_buffer_to_event(buf_cpy[j]);
+                               }
+
+                               rc = odp_queue_enq_multi(qctx->pq_handle,
+                                                        ev_cpy, num);
+                               CU_ASSERT(rc == num);
+
                                bctx = odp_buffer_addr(
                                        odp_buffer_from_event(events[0]));
-
                                for (ndx = 0; ndx < ndx_max; ndx++) {
                                        odp_schedule_order_lock(ndx);
                                        CU_ASSERT(bctx->sequence ==
@@ -444,9 +475,20 @@ static void *schedule_common_(void *arg)
                        if (sync == ODP_SCHED_SYNC_ORDERED) {
                                uint32_t ndx;
                                uint32_t ndx_max = odp_queue_lock_count(from);
+                               int rc;
 
                                qctx = odp_queue_context(from);
                                bctx = odp_buffer_addr(buf);
+                               buf_cpy = odp_buffer_alloc(pool);
+                               CU_ASSERT_FATAL(buf_cpy != ODP_BUFFER_INVALID);
+                               bctx_cpy = odp_buffer_addr(buf_cpy);
+                               memcpy(bctx_cpy, bctx, sizeof(buf_contents));
+                               bctx_cpy->output_sequence = bctx_cpy->sequence;
+
+                               rc = odp_queue_enq(qctx->pq_handle,
+                                                  odp_buffer_to_event
+                                                  (buf_cpy));
+                               CU_ASSERT(rc == 0);
 
                                for (ndx = 0; ndx < ndx_max; ndx++) {
                                        odp_schedule_order_lock(ndx);
@@ -456,6 +498,7 @@ static void *schedule_common_(void *arg)
                                        odp_schedule_order_unlock(ndx);
                                }
                        }
+
                        odp_buffer_free(buf);
                }
 
@@ -491,6 +534,52 @@ static void *schedule_common_(void *arg)
                odp_ticketlock_unlock(&globals->lock);
        }
 
+       if (args->num_workers > 1)
+               odp_barrier_wait(&globals->barrier);
+
+       if (sync == ODP_SCHED_SYNC_ORDERED &&
+           odp_ticketlock_trylock(&globals->lock) &&
+           globals->buf_count_cpy > 0) {
+               odp_event_t ev;
+               odp_queue_t pq;
+               uint64_t seq;
+               uint64_t bcount = 0;
+               int i, j;
+               char name[32];
+               uint64_t num_bufs = args->num_bufs;
+               uint64_t buf_count = globals->buf_count_cpy;
+
+               for (i = 0; i < args->num_prio; i++) {
+                       for (j = 0; j < args->num_queues; j++) {
+                               snprintf(name, sizeof(name),
+                                        "poll_%d_%d_o", i, j);
+                               pq = odp_queue_lookup(name);
+                               CU_ASSERT_FATAL(pq != ODP_QUEUE_INVALID);
+
+                               seq = 0;
+                               while (1) {
+                                       ev = odp_queue_deq(pq);
+
+                                       if (ev == ODP_EVENT_INVALID) {
+                                               CU_ASSERT(seq == num_bufs);
+                                               break;
+                                       }
+
+                                       bctx = odp_buffer_addr(
+                                               odp_buffer_from_event(ev));
+
+                                       CU_ASSERT(bctx->sequence == seq);
+                                       seq++;
+                                       bcount++;
+                                       odp_event_free(ev);
+                               }
+                       }
+               }
+               CU_ASSERT(bcount == buf_count);
+               globals->buf_count_cpy = 0;
+               odp_ticketlock_unlock(&globals->lock);
+       }
+
        return NULL;
 }
 
@@ -559,6 +648,7 @@ static void fill_queues(thread_args_t *args)
        }
 
        globals->buf_count = buf_count;
+       globals->buf_count_cpy = buf_count;
 }
 
 static void reset_queues(thread_args_t *args)
@@ -915,13 +1005,13 @@ static int create_queues(void)
        int i, j, prios, rc;
        odp_pool_param_t params;
        odp_buffer_t queue_ctx_buf;
-       queue_context *qctx;
+       queue_context *qctx, *pqctx;
        uint32_t ndx;
 
        prios = odp_schedule_num_prio();
        odp_pool_param_init(&params);
        params.buf.size = sizeof(queue_context);
-       params.buf.num  = prios * QUEUES_PER_PRIO;
+       params.buf.num  = prios * QUEUES_PER_PRIO * 2;
        params.type     = ODP_POOL_BUFFER;
 
        queue_ctx_pool = odp_pool_create(QUEUE_CTX_POOL_NAME, &params);
@@ -939,7 +1029,7 @@ static int create_queues(void)
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
                        /* Per sched sync type */
                        char name[32];
-                       odp_queue_t q;
+                       odp_queue_t q, pq;
 
                        snprintf(name, sizeof(name), "sched_%d_%d_n", i, j);
                        p.sched.sync = ODP_SCHED_SYNC_NONE;
@@ -959,6 +1049,31 @@ static int create_queues(void)
                                return -1;
                        }
 
+                       snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+                       pq = odp_queue_create(name, ODP_QUEUE_TYPE_POLL, NULL);
+                       if (pq == ODP_QUEUE_INVALID) {
+                               printf("Poll queue create failed.\n");
+                               return -1;
+                       }
+
+                       queue_ctx_buf = odp_buffer_alloc(queue_ctx_pool);
+
+                       if (queue_ctx_buf == ODP_BUFFER_INVALID) {
+                               printf("Cannot allocate poll queue ctx buf\n");
+                               return -1;
+                       }
+
+                       pqctx = odp_buffer_addr(queue_ctx_buf);
+                       pqctx->ctx_handle = queue_ctx_buf;
+                       pqctx->sequence = 0;
+
+                       rc = odp_queue_context_set(pq, pqctx);
+
+                       if (rc != 0) {
+                               printf("Cannot set poll queue context\n");
+                               return -1;
+                       }
+
                        snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
                        p.sched.sync = ODP_SCHED_SYNC_ORDERED;
                        p.sched.lock_count =
@@ -988,6 +1103,7 @@ static int create_queues(void)
 
                        qctx = odp_buffer_addr(queue_ctx_buf);
                        qctx->ctx_handle = queue_ctx_buf;
+                       qctx->pq_handle = pq;
                        qctx->sequence = 0;
 
                        for (ndx = 0;
@@ -1105,11 +1221,17 @@ static int destroy_queues(void)
                        snprintf(name, sizeof(name), "sched_%d_%d_o", i, j);
                        if (destroy_queue(name) != 0)
                                return -1;
+
+                       snprintf(name, sizeof(name), "poll_%d_%d_o", i, j);
+                       if (destroy_queue(name) != 0)
+                               return -1;
                }
        }
 
-       if (odp_pool_destroy(queue_ctx_pool) != 0)
+       if (odp_pool_destroy(queue_ctx_pool) != 0) {
+               fprintf(stderr, "error: failed to destroy queue ctx pool\n");
                return -1;
+       }
 
        return 0;
 }
-- 
2.1.4

_______________________________________________
lng-odp mailing list
[email protected]
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to