On Thu, Dec 1, 2016 at 5:37 AM, Matias Elo wrote:
> Implement ordered locks using per lock atomic counters. The counter values
> are compared to the queue’s atomic context to guarantee ordered locking.
> Compared to the previous implementation this enables parallel processing of
> ordered events outside of the lock context.
>
> Signed-off-by: Matias Elo
> ---
> .../linux-generic/include/odp_queue_internal.h | 2 +
> platform/linux-generic/odp_queue.c | 6 +++
> platform/linux-generic/odp_schedule.c | 49
> --
> 3 files changed, 54 insertions(+), 3 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_queue_internal.h
> b/platform/linux-generic/include/odp_queue_internal.h
> index b905bd8..8b55de1 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -59,6 +59,8 @@ struct queue_entry_s {
> struct {
> odp_atomic_u64_t ctx; /**< Current ordered context id */
> odp_atomic_u64_t next_ctx; /**< Next unallocated context id
> */
> + /** Array of ordered locks */
> + odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
> } ordered ODP_ALIGNED_CACHE;
>
> enq_func_t enqueue ODP_ALIGNED_CACHE;
> diff --git a/platform/linux-generic/odp_queue.c
> b/platform/linux-generic/odp_queue.c
> index 4c7f497..d9cb9f3 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -77,8 +77,14 @@ static int queue_init(queue_entry_t *queue, const char
> *name,
> queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
>
> if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
> + unsigned i;
> +
> odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
> odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
> +
> + for (i = 0; i < queue->s.param.sched.lock_count; i++)
> + odp_atomic_init_u64(&queue->s.ordered.lock[i],
> + 0);
> }
> }
> queue->s.type = queue->s.param.type;
> diff --git a/platform/linux-generic/odp_schedule.c
> b/platform/linux-generic/odp_schedule.c
> index 4b33513..c628142 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -126,6 +126,15 @@ typedef struct {
> int num;
> } ordered_stash_t;
>
> +/* Ordered lock states */
> +typedef union {
> + uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS];
> + uint32_t all;
> +} lock_called_t;
> +
> +ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
> + "Lock_called_values_do_not_fit_in_uint32");
> +
> /* Scheduler local data */
> typedef struct {
> int thr;
> @@ -145,6 +154,7 @@ typedef struct {
> ordered_stash_t stash[MAX_ORDERED_STASH];
> int stash_num; /**< Number of stashed enqueue operations */
> uint8_t in_order; /**< Order status */
> + lock_called_t lock_called; /**< States of ordered locks */
> } ordered;
>
> } sched_local_t;
> @@ -553,12 +563,21 @@ static inline void ordered_stash_release(void)
>
> static inline void release_ordered(void)
> {
> + unsigned i;
> queue_entry_t *queue;
>
> queue = sched_local.ordered.src_queue;
>
> wait_for_order(queue);
>
> + /* Release all ordered locks */
> + for (i = 0; i < queue->s.param.sched.lock_count; i++) {
> + if (!sched_local.ordered.lock_called.u8[i])
> + odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
> +sched_local.ordered.ctx + 1);
> + }
> +
> + sched_local.ordered.lock_called.all = 0;
> sched_local.ordered.src_queue = NULL;
> sched_local.ordered.in_order = 0;
>
> @@ -923,19 +942,43 @@ static void order_unlock(void)
> {
> }
>
> -static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
> +static void schedule_order_lock(unsigned lock_index)
> {
> + odp_atomic_u64_t *ord_lock;
> queue_entry_t *queue;
>
> queue = sched_local.ordered.src_queue;
>
> ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
Sorry, I should have been more precise. The staleness test I was
referring to was to verify that the lock had not been previously used
in this ordered context. In the current code that's the following
assert (in odp_schedule_order.c)
sync = sched_local.sync[lock_index];
sync_out = odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]);
ODP_ASSERT(sync >= sync_out);
The test above should be open code since it's a validity check on the
call. The current code treats odp_schedule_order_lock/unlock as no-ops
if we aren't running in an ordered context as that permits queue