Add new implementation for ordered queues. Compared to the old implementation, this one is much simpler and improves performance ~1-4x depending on the test case.
The implementation is based on an atomic ordered context, which only a single thread may own at a time. Only the thread owning the atomic context may enqueue events originating from the ordered queue. All other threads store their enqueued events in a thread-local enqueue stash (ordered_stash_t). The stashed enqueue operations are performed in the original order once the thread acquires the ordered context. If the ordered stash becomes full, the enqueue call blocks until it is the thread's turn. At the latest, a thread blocks when its ev_stash is empty and it tries to release the ordered context.

Signed-off-by: Matias Elo <matias....@nokia.com>
---
 .../linux-generic/include/odp_queue_internal.h |   5 +
 platform/linux-generic/odp_queue.c             |  14 +-
 platform/linux-generic/odp_schedule.c          | 171 +++++++++++++++++++--
 3 files changed, 172 insertions(+), 18 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index df36b76..b905bd8 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -56,6 +56,11 @@ struct queue_entry_s {
 	odp_buffer_hdr_t *tail;
 	int status;
 
+	struct {
+		odp_atomic_u64_t ctx; /**< Current ordered context id */
+		odp_atomic_u64_t next_ctx; /**< Next unallocated context id */
+	} ordered ODP_ALIGNED_CACHE;
+
 	enq_func_t enqueue ODP_ALIGNED_CACHE;
 	deq_func_t dequeue;
 	enq_multi_func_t enqueue_multi;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 99c91e7..4c7f497 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -73,9 +73,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
 	if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
 		return -1;
 
-	if (param->type == ODP_QUEUE_TYPE_SCHED)
+	if (param->type == ODP_QUEUE_TYPE_SCHED) {
 		queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
 
+		if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
+			odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
+			odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
+		}
+	}
 	queue->s.type = queue->s.param.type;
 
 	queue->s.enqueue = queue_enq;
@@ -301,6 +306,13 @@ int odp_queue_destroy(odp_queue_t handle)
 		ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
 		return -1;
 	}
+	if (queue_is_ordered(queue) &&
+	    odp_atomic_load_u64(&queue->s.ordered.ctx) !=
+	    odp_atomic_load_u64(&queue->s.ordered.next_ctx)) {
+		UNLOCK(&queue->s.lock);
+		ODP_ERR("queue \"%s\" reorder incomplete\n", queue->s.name);
+		return -1;
+	}
 
 	switch (queue->s.status) {
 	case QUEUE_STATUS_READY:
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 5bc274f..4b33513 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -111,11 +111,21 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 #define MAX_DEQ CONFIG_BURST_SIZE
 
 /* Maximum number of ordered locks per queue */
-#define MAX_ORDERED_LOCKS_PER_QUEUE 1
+#define MAX_ORDERED_LOCKS_PER_QUEUE 2
 
 ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
 		  "Too_many_ordered_locks");
 
+/* Ordered stash size */
+#define MAX_ORDERED_STASH 512
+
+/* Storage for stashed enqueue operation arguments */
+typedef struct {
+	odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
+	queue_entry_t *queue;
+	int num;
+} ordered_stash_t;
+
 /* Scheduler local data */
 typedef struct {
 	int thr;
@@ -128,7 +138,15 @@ typedef struct {
 	uint32_t queue_index;
 	odp_queue_t queue;
 	odp_event_t ev_stash[MAX_DEQ];
-	void *queue_entry;
+	struct {
+		queue_entry_t *src_queue; /**< Source queue entry */
+		uint64_t ctx; /**< Ordered context id */
+		/** Storage for stashed enqueue operations */
+		ordered_stash_t stash[MAX_ORDERED_STASH];
+		int stash_num; /**< Number of stashed enqueue operations */
+		uint8_t in_order; /**< Order status */
+	} ordered;
+
 } sched_local_t;
 
 /* Priority queue */
@@ -491,17 +509,81 @@ static void schedule_release_atomic(void)
 	}
 }
 
+static inline int ordered_own_turn(queue_entry_t *queue)
+{
+	uint64_t ctx;
+
+	ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx);
+
+	return ctx == sched_local.ordered.ctx;
+}
+
+static inline void wait_for_order(queue_entry_t *queue)
+{
+	/* Busy loop to synchronize ordered processing */
+	while (1) {
+		if (ordered_own_turn(queue))
+			break;
+		odp_cpu_pause();
+	}
+}
+
+/**
+ * Perform stashed enqueue operations
+ *
+ * Should be called only when already in order.
+ */
+static inline void ordered_stash_release(void)
+{
+	int i;
+
+	for (i = 0; i < sched_local.ordered.stash_num; i++) {
+		queue_entry_t *queue;
+		odp_buffer_hdr_t **buf_hdr;
+		int num;
+
+		queue = sched_local.ordered.stash[i].queue;
+		buf_hdr = sched_local.ordered.stash[i].buf_hdr;
+		num = sched_local.ordered.stash[i].num;
+
+		queue_enq_multi(queue, buf_hdr, num);
+	}
+	sched_local.ordered.stash_num = 0;
+}
+
+static inline void release_ordered(void)
+{
+	queue_entry_t *queue;
+
+	queue = sched_local.ordered.src_queue;
+
+	wait_for_order(queue);
+
+	sched_local.ordered.src_queue = NULL;
+	sched_local.ordered.in_order = 0;
+
+	ordered_stash_release();
+
+	/* Next thread can continue processing */
+	odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1);
+}
+
 static void schedule_release_ordered(void)
 {
-	/* Process ordered queue as atomic */
-	schedule_release_atomic();
-	sched_local.queue_entry = NULL;
+	queue_entry_t *queue;
+
+	queue = sched_local.ordered.src_queue;
+
+	if (odp_unlikely(!queue || sched_local.num))
+		return;
+
+	release_ordered();
 }
 
 static inline void schedule_release_context(void)
 {
-	if (sched_local.queue_entry != NULL)
-		schedule_release_ordered();
+	if (sched_local.ordered.src_queue != NULL)
+		release_ordered();
 	else
 		schedule_release_atomic();
 }
@@ -524,13 +606,41 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
 static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
 				  int num, int *ret)
 {
-	(void)queue_index;
-	(void)buf_hdr;
-	(void)num;
-	(void)ret;
+	int i;
+	uint32_t stash_num = sched_local.ordered.stash_num;
+	queue_entry_t *dst_queue = get_qentry(queue_index);
+	queue_entry_t *src_queue = sched_local.ordered.src_queue;
 
-	/* didn't consume the events */
-	return 0;
+	if (!sched_local.ordered.src_queue || sched_local.ordered.in_order)
+		return 0;
+
+	if (ordered_own_turn(src_queue)) {
+		/* Own turn, so can do enqueue directly. */
+		sched_local.ordered.in_order = 1;
+		ordered_stash_release();
+		return 0;
+	}
+
+	if (odp_unlikely(stash_num >= MAX_ORDERED_STASH)) {
+		/* If the local stash is full, wait until it is our turn and
+		 * then release the stash and do enqueue directly. */
+		wait_for_order(src_queue);
+
+		sched_local.ordered.in_order = 1;
+
+		ordered_stash_release();
+		return 0;
+	}
+
+	sched_local.ordered.stash[stash_num].queue = dst_queue;
+	sched_local.ordered.stash[stash_num].num = num;
+	for (i = 0; i < num; i++)
+		sched_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
+
+	sched_local.ordered.stash_num++;
+
+	*ret = num;
+	return 1;
 }
 
 /*
@@ -658,9 +768,21 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 			ret = copy_events(out_ev, max_num);
 
 			if (ordered) {
-				/* Operate as atomic */
-				sched_local.queue_index = qi;
-				sched_local.queue_entry = get_qentry(qi);
+				uint64_t ctx;
+				queue_entry_t *queue;
+				odp_atomic_u64_t *next_ctx;
+
+				queue = get_qentry(qi);
+				next_ctx = &queue->s.ordered.next_ctx;
+
+				ctx = odp_atomic_fetch_inc_u64(next_ctx);
+
+				sched_local.ordered.ctx = ctx;
+				sched_local.ordered.src_queue = queue;
+
+				/* Continue scheduling ordered queues */
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
+
 			} else if (sched_cb_queue_is_atomic(qi)) {
 				/* Hold queue during atomic access */
 				sched_local.queue_index = qi;
@@ -785,8 +907,16 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
 	return schedule_loop(out_queue, wait, events, num);
 }
 
-static void order_lock(void)
+static inline void order_lock(void)
 {
+	queue_entry_t *queue;
+
+	queue = sched_local.ordered.src_queue;
+
+	if (!queue)
+		return;
+
+	wait_for_order(queue);
 }
 
 static void order_unlock(void)
@@ -795,6 +925,13 @@ static void order_unlock(void)
 {
 }
 
 static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
 {
+	queue_entry_t *queue;
+
+	queue = sched_local.ordered.src_queue;
+
+	ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
+
+	wait_for_order(queue);
 }
 
 static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
-- 
2.7.4
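
For reviewers who want to experiment with the ordering scheme outside of ODP, below is a minimal standalone sketch of the same idea using C11 atomics instead of the ODP atomic API. It is an illustration only, not the patch code: the names (demo_queue_t, worker_t, demo_schedule(), demo_enqueue(), demo_release()) are hypothetical stand-ins for the queue_entry_s ordered fields, schedule_ord_enq_multi() and release_ordered() above, and "events" are plain integers printed with printf() instead of buffers passed to queue_enq_multi().

/*
 * Standalone sketch of the ordered context + stash mechanism (C11 atomics).
 * All names are hypothetical; an "enqueue" is modelled as a printf().
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_STASH 4 /* MAX_ORDERED_STASH is 512 in the patch */

/* Per-queue ordering state, mirroring queue->s.ordered */
typedef struct {
	_Atomic uint64_t ctx;      /* context currently allowed to enqueue */
	_Atomic uint64_t next_ctx; /* next context id to hand out */
} demo_queue_t;

/* Per-thread state, mirroring sched_local.ordered */
typedef struct {
	uint64_t ctx;         /* context id owned by this thread */
	int stash[MAX_STASH]; /* stashed "events" (plain ints here) */
	int stash_num;
} worker_t;

/* Scheduling a batch from the ordered queue hands out the next context id */
static void demo_schedule(demo_queue_t *q, worker_t *w)
{
	w->ctx = atomic_fetch_add(&q->next_ctx, 1);
	w->stash_num = 0;
}

static int demo_own_turn(demo_queue_t *q, worker_t *w)
{
	return atomic_load_explicit(&q->ctx, memory_order_acquire) == w->ctx;
}

static void demo_flush_stash(worker_t *w)
{
	int i;

	for (i = 0; i < w->stash_num; i++)
		printf("stashed enqueue: %d\n", w->stash[i]);
	w->stash_num = 0;
}

/* Enqueue directly when it is our turn, otherwise stash locally */
static void demo_enqueue(demo_queue_t *q, worker_t *w, int ev)
{
	if (w->stash_num == MAX_STASH) {
		/* Stash full: block until it is our turn */
		while (!demo_own_turn(q, w))
			; /* odp_cpu_pause() in the real code */
	}

	if (demo_own_turn(q, w)) {
		demo_flush_stash(w); /* keep the original order */
		printf("direct enqueue: %d\n", ev);
		return;
	}

	w->stash[w->stash_num++] = ev;
}

/* Release: wait for our turn, flush the stash, pass the turn on */
static void demo_release(demo_queue_t *q, worker_t *w)
{
	while (!demo_own_turn(q, w))
		;

	demo_flush_stash(w);

	/* The next context (thread) may now proceed */
	atomic_fetch_add_explicit(&q->ctx, 1, memory_order_release);
}

int main(void)
{
	demo_queue_t q = { .ctx = 0, .next_ctx = 0 };
	worker_t w;

	demo_schedule(&q, &w); /* single thread, so ctx 0 is in turn at once */
	demo_enqueue(&q, &w, 42);
	demo_release(&q, &w);
	return 0;
}

The ctx/next_ctx pair behaves like a ticket lock: scheduling a batch hands out a ticket with fetch-and-increment (odp_atomic_fetch_inc_u64() in the patch), and releasing the context increments ctx with release semantics, which pairs with the acquire load used to test for one's turn (odp_atomic_add_rel_u64()/odp_atomic_load_acq_u64() above).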