On 04/04/17 21:48, Brian Brooks wrote:
> Add queue getters and setters to provide an abstraction over more than one
> internal queue data structure.
> 
> Use buffer handles instead of pointer to internal object in pktio and tm code.
> 
> Increase the running time of odp_sched_latency to get more stable numbers
> across multiple runs. This is not done for SP scheduler because it is too 
> slow.
> 
> Use an explicit scheduler group in odp_sched_latency.
> 
> Add scalable queue and scheduler implementation.
> 
> Signed-off-by: Brian Brooks <[email protected]>
> Signed-off-by: Kevin Wang <[email protected]>
> Signed-off-by: Honnappa Nagarahalli <[email protected]>
> Signed-off-by: Ola Liljedahl <[email protected]>
> ---
>  platform/linux-generic/Makefile.am                 |   21 +-
>  .../include/odp/api/plat/schedule_types.h          |   20 +-
>  .../linux-generic/include/odp_queue_internal.h     |  122 +-
>  platform/linux-generic/include/odp_schedule_if.h   |  166 +-
>  .../include/odp_schedule_ordered_internal.h        |  150 ++
>  platform/linux-generic/m4/odp_schedule.m4          |   55 +-
>  platform/linux-generic/odp_classification.c        |    4 +-
>  platform/linux-generic/odp_packet_io.c             |   88 +-
>  platform/linux-generic/odp_queue.c                 |    2 +-
>  platform/linux-generic/odp_queue_scalable.c        |  883 +++++++++
>  platform/linux-generic/odp_schedule_if.c           |   36 +-
>  platform/linux-generic/odp_schedule_scalable.c     | 1922 
> ++++++++++++++++++++
>  .../linux-generic/odp_schedule_scalable_ordered.c  |  285 +++
>  platform/linux-generic/odp_traffic_mngr.c          |    7 +-
>  platform/linux-generic/pktio/loop.c                |   10 +-
>  test/common_plat/performance/odp_sched_latency.c   |   68 +-
>  16 files changed, 3754 insertions(+), 85 deletions(-)
>  create mode 100644 
> platform/linux-generic/include/odp_schedule_ordered_internal.h
>  create mode 100644 platform/linux-generic/odp_queue_scalable.c
>  create mode 100644 platform/linux-generic/odp_schedule_scalable.c
>  create mode 100644 platform/linux-generic/odp_schedule_scalable_ordered.c
> 
> diff --git a/platform/linux-generic/Makefile.am 
> b/platform/linux-generic/Makefile.am
> index 70683cac..8c263b99 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -151,6 +151,8 @@ noinst_HEADERS = \
>                 ${srcdir}/include/odp_debug_internal.h \
>                 ${srcdir}/include/odp_forward_typedefs_internal.h \
>                 ${srcdir}/include/odp_internal.h \
> +               ${srcdir}/include/odp_llqueue.h \
> +               ${srcdir}/include/odp_llsc.h \
>                 ${srcdir}/include/odp_name_table_internal.h \
>                 ${srcdir}/include/odp_packet_internal.h \
>                 ${srcdir}/include/odp_packet_io_internal.h \
> @@ -219,13 +221,9 @@ __LIB__libodp_linux_la_SOURCES = \
>                          pktio/ring.c \
>                          odp_pkt_queue.c \
>                          odp_pool.c \
> -                        odp_queue.c \
>                          odp_rwlock.c \
>                          odp_rwlock_recursive.c \
> -                        odp_schedule.c \
>                          odp_schedule_if.c \
> -                        odp_schedule_sp.c \
> -                        odp_schedule_iquery.c \
>                          odp_shared_memory.c \
>                          odp_sorted_list.c \
>                          odp_spinlock.c \
> @@ -250,6 +248,21 @@ __LIB__libodp_linux_la_SOURCES = \
>                          arch/@ARCH_DIR@/odp_cpu_arch.c \
>                          arch/@ARCH_DIR@/odp_sysinfo_parse.c
>  
> +if ODP_SCHEDULE_SP

can that ifs be removed here? I commented about that in previous patch set.

Maxim.

> +__LIB__libodp_linux_la_SOURCES += odp_schedule_sp.c
> +endif
> +
> +if ODP_SCHEDULE_IQUERY
> +__LIB__libodp_linux_la_SOURCES += odp_schedule_iquery.c
> +endif
> +
> +if ODP_SCHEDULE_SCALABLE
> +__LIB__libodp_linux_la_SOURCES += odp_queue_scalable.c 
> odp_schedule_scalable.c \
> +                               odp_schedule_scalable_ordered.c
> +else
> +__LIB__libodp_linux_la_SOURCES += odp_queue.c odp_schedule.c
> +endif
> +
>  if HAVE_PCAP
>  __LIB__libodp_linux_la_SOURCES += pktio/pcap.c
>  endif
> diff --git a/platform/linux-generic/include/odp/api/plat/schedule_types.h 
> b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> index 535fd6d0..373065c9 100644
> --- a/platform/linux-generic/include/odp/api/plat/schedule_types.h
> +++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> @@ -18,6 +18,8 @@
>  extern "C" {
>  #endif
>  
> +#include <odp/api/std_types.h>
> +
>  /** @addtogroup odp_scheduler
>   *  @{
>   */
> @@ -27,6 +29,20 @@ extern "C" {
>  
>  typedef int odp_schedule_prio_t;
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +#define ODP_SCHED_PRIO_NUM  8
> +
> +#define ODP_SCHED_PRIO_HIGHEST 0
> +
> +#define ODP_SCHED_PRIO_LOWEST (ODP_SCHED_PRIO_NUM - 1)
> +
> +#define ODP_SCHED_PRIO_DEFAULT (ODP_SCHED_PRIO_NUM / 2)
> +
> +#define ODP_SCHED_PRIO_NORMAL ODP_SCHED_PRIO_DEFAULT
> +
> +#else
> +
>  #define ODP_SCHED_PRIO_HIGHEST  0
>  
>  #define ODP_SCHED_PRIO_NORMAL   4
> @@ -35,6 +51,8 @@ typedef int odp_schedule_prio_t;
>  
>  #define ODP_SCHED_PRIO_DEFAULT  ODP_SCHED_PRIO_NORMAL
>  
> +#endif
> +
>  typedef int odp_schedule_sync_t;
>  
>  #define ODP_SCHED_SYNC_PARALLEL 0
> @@ -44,7 +62,7 @@ typedef int odp_schedule_sync_t;
>  typedef int odp_schedule_group_t;
>  
>  /* These must be kept in sync with thread_globals_t in odp_thread.c */
> -#define ODP_SCHED_GROUP_INVALID -1
> +#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1)
>  #define ODP_SCHED_GROUP_ALL     0
>  #define ODP_SCHED_GROUP_WORKER  1
>  #define ODP_SCHED_GROUP_CONTROL 2
> diff --git a/platform/linux-generic/include/odp_queue_internal.h 
> b/platform/linux-generic/include/odp_queue_internal.h
> index 560f826e..b9387340 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -19,16 +19,24 @@ extern "C" {
>  #endif
>  
>  #include <odp/api/queue.h>
> -#include <odp_forward_typedefs_internal.h>
> -#include <odp_schedule_if.h>
> -#include <odp_buffer_internal.h>
> -#include <odp_align_internal.h>
> +#include <odp/api/std_types.h>
> +#include <odp/api/buffer.h>
>  #include <odp/api/packet_io.h>
>  #include <odp/api/align.h>
>  #include <odp/api/hints.h>
>  #include <odp/api/ticketlock.h>
> +
>  #include <odp_config_internal.h>
>  
> +#include <odp_align_internal.h>
> +#include <odp_buffer_internal.h>
> +#include <odp_forward_typedefs_internal.h>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#include <odp_llsc.h>
> +#include <odp_schedule_ordered_internal.h>
> +#endif
> +#include <odp_schedule_if.h>
> +
>  #define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
>  
>  #define QUEUE_STATUS_FREE         0
> @@ -37,8 +45,6 @@ extern "C" {
>  #define QUEUE_STATUS_NOTSCHED     3
>  #define QUEUE_STATUS_SCHED        4
>  
> -
> -/* forward declaration */
>  union queue_entry_u;
>  
>  typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
> @@ -49,6 +55,37 @@ typedef int (*enq_multi_func_t)(union queue_entry_u *,
>  typedef      int (*deq_multi_func_t)(union queue_entry_u *,
>                               odp_buffer_hdr_t **, int);
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#define BUFFER_HDR_INVALID ((odp_buffer_hdr_t *)(void *)ODP_EVENT_INVALID)
> +
> +struct queue_entry_s {
> +     sched_elem_t     sched_elem;
> +     odp_shm_t        shm;
> +     odp_shm_t        rwin_shm;
> +
> +     odp_ticketlock_t lock ODP_ALIGNED_CACHE;
> +     int              status;
> +
> +     enq_func_t       enqueue ODP_ALIGNED_CACHE;
> +     deq_func_t       dequeue;
> +     enq_multi_func_t enqueue_multi;
> +     deq_multi_func_t dequeue_multi;
> +
> +     uint32_t           index;
> +     odp_queue_t        handle;
> +     odp_queue_type_t   type;
> +     odp_queue_param_t  param;
> +     odp_pktin_queue_t  pktin;
> +     odp_pktout_queue_t pktout;
> +     char               name[ODP_QUEUE_NAME_LEN];
> +};
> +
> +int _odp_queue_deq(sched_elem_t *q, odp_event_t *evp, int num);
> +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num);
> +
> +#else
> +#define BUFFER_HDR_INVALID NULL
> +
>  struct queue_entry_s {
>       odp_ticketlock_t  lock ODP_ALIGNED_CACHE;
>  
> @@ -77,6 +114,8 @@ struct queue_entry_s {
>       char              name[ODP_QUEUE_NAME_LEN];
>  };
>  
> +#endif
> +
>  union queue_entry_u {
>       struct queue_entry_s s;
>       uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
> @@ -94,6 +133,12 @@ int queue_deq_multi(queue_entry_t *queue, 
> odp_buffer_hdr_t *buf_hdr[], int num);
>  void queue_lock(queue_entry_t *queue);
>  void queue_unlock(queue_entry_t *queue);
>  
> +int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> +                        int num);
> +
> +int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +
>  static inline uint32_t queue_to_id(odp_queue_t handle)
>  {
>       return _odp_typeval(handle) - 1;
> @@ -107,6 +152,71 @@ static inline queue_entry_t *queue_to_qentry(odp_queue_t 
> handle)
>       return get_qentry(queue_id);
>  }
>  
> +static inline odp_queue_t queue_get_handle(queue_entry_t *queue)
> +{
> +     return queue->s.handle;
> +}
> +
> +static inline odp_pktout_queue_t queue_get_pktout(queue_entry_t *queue)
> +{
> +     return queue->s.pktout;
> +}
> +
> +static inline void queue_set_pktout(queue_entry_t *queue,
> +                                 odp_pktio_t pktio,
> +                                 int index)
> +{
> +     queue->s.pktout.pktio = pktio;
> +     queue->s.pktout.index = index;
> +}
> +
> +static inline odp_pktin_queue_t queue_get_pktin(queue_entry_t *queue)
> +{
> +     return queue->s.pktin;
> +}
> +
> +static inline void queue_set_pktin(queue_entry_t *queue,
> +                                odp_pktio_t pktio,
> +                                int index)
> +{
> +     queue->s.pktin.pktio = pktio;
> +     queue->s.pktin.index = index;
> +}
> +
> +static inline void queue_set_enq_func(queue_entry_t *queue, enq_func_t func)
> +{
> +     queue->s.enqueue = func;
> +}
> +
> +static inline void queue_set_enq_multi_func(queue_entry_t *queue,
> +                                         enq_multi_func_t func)
> +{
> +     queue->s.enqueue_multi = func;
> +}
> +
> +static inline void queue_set_deq_func(queue_entry_t *queue, deq_func_t func)
> +{
> +     queue->s.dequeue = func;
> +}
> +
> +static inline void queue_set_deq_multi_func(queue_entry_t *queue,
> +                                         deq_multi_func_t func)
> +{
> +     queue->s.dequeue_multi = func;
> +}
> +
> +static inline void queue_set_type(queue_entry_t *queue, odp_queue_type_t 
> type)
> +{
> +     queue->s.type = type;
> +}
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue)
> +{
> +     return queue->s.sched_elem.rwin;
> +}
> +#endif
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/platform/linux-generic/include/odp_schedule_if.h 
> b/platform/linux-generic/include/odp_schedule_if.h
> index 530d157f..41998833 100644
> --- a/platform/linux-generic/include/odp_schedule_if.h
> +++ b/platform/linux-generic/include/odp_schedule_if.h
> @@ -4,6 +4,12 @@
>   * SPDX-License-Identifier:     BSD-3-Clause
>   */
>  
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
>  #ifndef ODP_SCHEDULE_IF_H_
>  #define ODP_SCHEDULE_IF_H_
>  
> @@ -11,18 +17,168 @@
>  extern "C" {
>  #endif
>  
> +#include <odp/api/align.h>
>  #include <odp/api/queue.h>
> -#include <odp_queue_internal.h>
>  #include <odp/api/schedule.h>
> +#include <odp/api/ticketlock.h>
> +
> +#include <odp_forward_typedefs_internal.h>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#include <odp_config_internal.h>
> +#include <odp_llqueue.h>
> +#include <odp_schedule_ordered_internal.h>
> +
> +#include <limits.h>
> +#endif
> +
> +/* Number of ordered locks per queue */
> +#define SCHEDULE_ORDERED_LOCKS_PER_QUEUE 2
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +typedef struct {
> +     union {
> +             struct {
> +                     struct llqueue llq;
> +                     uint32_t prio;
> +             };
> +             char line[ODP_CACHE_LINE_SIZE];
> +     };
> +} sched_queue_t ODP_ALIGNED_CACHE;
> +
> +#define TICKET_INVALID (uint16_t)(~0U)
> +
> +typedef struct {
> +     int32_t numevts;
> +     uint16_t wrr_budget;
> +     uint8_t cur_ticket;
> +     uint8_t nxt_ticket;
> +} qschedstate_t ODP_ALIGNED(sizeof(uint64_t));
> +
> +typedef uint32_t ringidx_t;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +#define SPLIT_PC ODP_ALIGNED_CACHE
> +#else
> +#define SPLIT_PC
> +#endif
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_READWRITE
> +#define SPLIT_RW ODP_ALIGNED_CACHE
> +#else
> +#define SPLIT_RW
> +#endif
> +
> +#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1)
> +
> +typedef struct {
> +     /* (ll)node must be first */
> +     struct llnode node;           /* 8:  0.. 7 */
> +     sched_queue_t *schedq;        /* 8:  8..15 */
> +#ifdef CONFIG_QSCHST_LOCK
> +     odp_ticketlock_t qschlock;
> +#endif
> +     qschedstate_t qschst;         /* 8: 16..23 */
> +     uint16_t pop_deficit;         /* 2: 24..25 */
> +     uint16_t qschst_type;         /* 2: 26..27 */
> +
> +     ringidx_t prod_read SPLIT_PC; /* 4: 28..31 */
> +
> +     ringidx_t prod_write SPLIT_RW;/* 4: 32..35 */
> +     ringidx_t prod_mask;          /* 4: 36..39 */
> +     odp_buffer_hdr_t **prod_ring; /* 8: 40..47 */
> +
> +     ringidx_t cons_write SPLIT_PC;/* 4: 48..51 */
> +
> +     ringidx_t cons_read SPLIT_RW; /* 4: 52..55 */
> +
> +     reorder_window_t *rwin;       /* 8: 56..63 */
> +     void *user_ctx;               /* 8: 64..71 */
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     odp_buffer_hdr_t **cons_ring;
> +     ringidx_t cons_mask;
> +     uint16_t cons_type;
> +#else
> +#define cons_mask prod_mask
> +#define cons_ring prod_ring
> +#define cons_type qschst_type
> +#endif
> +} sched_elem_t ODP_ALIGNED_CACHE;
> +
> +/*
> + * Scheduler group related declarations.
> + */
> +typedef bitset_t sched_group_mask_t;
> +
> +/* Number of scheduling groups */
> +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT)
> +
> +typedef struct {
> +     /* Threads currently associated with the sched group */
> +     bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE;
> +     bitset_t thr_wanted;
> +     /* Used to spread queues over schedq's */
> +     uint32_t xcount[ODP_SCHED_PRIO_NUM];
> +     /* Number of schedq's per prio */
> +     uint32_t xfactor;
> +
> +     char name[ODP_SCHED_GROUP_NAME_LEN];
> +     /* shm handle for the group */
> +     odp_shm_t shm;
> +
> +     /* ODP_SCHED_PRIO_NUM * xfactor.
> +      * Must be the last memember in this struct.
> +      */
> +     sched_queue_t schedq[1] ODP_ALIGNED_CACHE;
> +} sched_group_t;
> +
> +/*
> + * Per thread state
> + */
> +/* Number of reorder contexts per thread */
> +#define TS_RVEC_SIZE 16
> +typedef struct {
> +     /* Atomic queue currently being processed or NULL */
> +     sched_elem_t *atomq;
> +     /* Current reorder context or NULL */
> +     reorder_context_t *rctx;
> +     uint8_t pause;
> +     uint8_t out_of_order;
> +     uint8_t tidx;
> +     uint8_t pad;
> +     uint32_t dequeued; /* Number of events dequeued from atomic queue */
> +     uint16_t pktin_next;/* Next pktin tag to poll */
> +     uint16_t pktin_poll_cnts;
> +     uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */
> +     uint16_t num_schedq;
> +     uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */
> +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM)
> +     sched_queue_t *schedq_list[SCHEDQ_PER_THREAD];
> +     /* Current sched_group membership */
> +     sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM];
> +     /* Future sched_group membership. */
> +     sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM];
> +     bitset_t priv_rvec_free;
> +     /* Bitset of free entries in rvec[] */
> +     bitset_t rvec_free ODP_ALIGNED_CACHE;
> +     /* Reordering contexts to allocate from */
> +     reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE;
> +} sched_scalable_thread_state_t ODP_ALIGNED_CACHE;
> +
> +void sched_update_enq(sched_elem_t *q, uint32_t actual);
> +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual);
> +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t 
> prio);
> +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio);
> +
> +#endif  /* ODP_SCHEDULE_SCALABLE */
>  
>  typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue,
>                                         int in_queue_idx[]);
>  typedef int (*schedule_thr_add_fn_t)(odp_schedule_group_t group, int thr);
>  typedef int (*schedule_thr_rem_fn_t)(odp_schedule_group_t group, int thr);
>  typedef int (*schedule_num_grps_fn_t)(void);
> -typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
> -                                     const odp_schedule_param_t *sched_param
> -                                    );
> +typedef int (*schedule_init_queue_fn_t)(
> +     uint32_t queue_index, const odp_schedule_param_t *sched_param);
>  typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
>  typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
>  typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);
> @@ -64,6 +220,7 @@ extern const schedule_fn_t *sched_fn;
>  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
>  void sched_cb_pktio_stop_finalize(int pktio_index);
>  int sched_cb_num_pktio(void);
> +#ifndef ODP_SCHEDULE_SCALABLE
>  int sched_cb_num_queues(void);
>  int sched_cb_queue_prio(uint32_t queue_index);
>  int sched_cb_queue_grp(uint32_t queue_index);
> @@ -73,6 +230,7 @@ odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
>  void sched_cb_queue_destroy_finalize(uint32_t queue_index);
>  int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int 
> num);
>  int sched_cb_queue_empty(uint32_t queue_index);
> +#endif
>  
>  /* API functions */
>  typedef struct {
> diff --git a/platform/linux-generic/include/odp_schedule_ordered_internal.h 
> b/platform/linux-generic/include/odp_schedule_ordered_internal.h
> new file mode 100644
> index 00000000..f086dae4
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_schedule_ordered_internal.h
> @@ -0,0 +1,150 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#ifndef ODP_SCHEDULE_ORDERED_INTERNAL_H_
> +#define ODP_SCHEDULE_ORDERED_INTERNAL_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +#include <odp/api/shared_memory.h>
> +
> +#include <odp_internal.h>
> +
> +#include <odp_align_internal.h>
> +#include <odp_bitset.h>
> +#include <odp_llsc.h>
> +
> +/* High level functioning of reordering
> + * Datastructures -
> + * Reorder Window - Every ordered queue is associated with a reorder window.
> + *                  Reorder window stores reorder contexts from threads that
> + *                  have completed processing out-of-order.
> + * Reorder Context - Reorder context consists of events that a thread
> + *                   wants to enqueue while processing a batch of events
> + *                   from an ordered queue.
> + *
> + * Algorithm -
> + * 1) Thread identifies the ordered queue.
> + * 2) It 'reserves a slot in the reorder window and dequeues the
> + *    events' atomically. Atomicity is achieved by using a ticket-lock
> + *    like design where the reorder window slot is the ticket.
> + * 3a) Upon order-release/next schedule call, the thread
> + *     checks if it's slot (ticket) equals the head of the reorder window.
> + *     If yes, enqueues the events to the destination queue till
> + *         i) the reorder window is empty or
> + *         ii) there is a gap in the reorder window
> + *     If no, the reorder context is stored in the reorder window at
> + *     the reserved slot.
> + * 3b) Upon the first enqueue, the thread checks if it's slot (ticket)
> + *     equals the head of the reorder window.
> + *     If yes, enqueues the events immediately to the destination queue
> + *     If no, these (and subsequent) events are stored in the reorder context
> + *     (in the application given order)
> + */
> +
> +/* Head and change indicator variables are used to synchronise between
> + * concurrent insert operations in the reorder window. A thread performing
> + * an in-order insertion must be notified about the newly inserted
> + * reorder contexts so that it doesn’t halt the retire process too early.
> + * A thread performing an out-of-order insertion must correspondingly
> + * notify the thread doing in-order insertion of the new waiting reorder
> + * context, which may need to be handled by that thread.
> + *
> + * Also, an out-of-order insertion may become an in-order insertion if the
> + * thread doing an in-order insertion completes before this thread completes.
> + * We need a point of synchronisation where this knowledge and potential 
> state
> + * change can be transferred between threads.
> + */
> +typedef struct hc {
> +     /* First missing context */
> +     uint32_t head;
> +     /* Change indicator */
> +     uint32_t chgi;
> +} hc_t ODP_ALIGNED(sizeof(uint64_t));
> +
> +/* Number of reorder contects in the reorder window.
> + * Should be at least one per CPU.
> + */
> +#define RWIN_SIZE 32
> +ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a power of 
> 2");
> +
> +#define NUM_OLOCKS 2
> +
> +typedef struct reorder_context reorder_context_t;
> +
> +typedef struct reorder_window {
> +     /* head and change indicator */
> +     hc_t hc;
> +     uint32_t winmask;
> +     uint32_t tail;
> +     uint32_t turn;
> +     uint32_t olock[NUM_OLOCKS];
> +     uint16_t lock_count;
> +     /* Reorder contexts in this window */
> +     reorder_context_t *ring[RWIN_SIZE];
> +} reorder_window_t;
> +
> +/* Number of events that can be stored in a reorder context.
> + * This size is chosen so that there is no space left unused at the end
> + * of the last cache line (for 64b architectures and 64b handles).
> + */
> +#define RC_EVT_SIZE 18
> +
> +struct reorder_context {
> +     /* Reorder window to which this context belongs */
> +     reorder_window_t *rwin;
> +     /* Pointer to TS->rvec_free */
> +     bitset_t *rvec_free;
> +     /* Our slot number in the reorder window */
> +     uint32_t sn;
> +     uint8_t olock_flags;
> +     /* Our index in thread_state rvec array */
> +     uint8_t idx;
> +     /* Use to link reorder contexts together */
> +     uint8_t next_idx;
> +     /* Current reorder context to save events in */
> +     uint8_t cur_idx;
> +     /* Number of events stored in this reorder context */
> +     uint8_t numevts;
> +     /* Events stored in this context */
> +     odp_buffer_hdr_t *events[RC_EVT_SIZE];
> +     queue_entry_t *destq[RC_EVT_SIZE];
> +} ODP_ALIGNED_CACHE;
> +
> +reorder_window_t *rwin_alloc(int rwin_id, odp_shm_t *shm, unsigned 
> lock_count);
> +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn);
> +void rwin_insert(reorder_window_t *rwin,
> +              reorder_context_t *rctx,
> +              uint32_t sn,
> +              void (*callback)(reorder_context_t *));
> +void rctx_init(reorder_context_t *rctx, uint16_t idx,
> +            reorder_window_t *rwin, uint32_t sn);
> +void rctx_free(const reorder_context_t *rctx);
> +void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin,
> +               uint32_t lock_index);
> +void olock_release(const reorder_context_t *rctx);
> +void rctx_retire(reorder_context_t *first);
> +void rctx_release(reorder_context_t *rctx);
> +
> +#else
> +
> +#define SUSTAIN_ORDER 1
> +
> +int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr,
> +                            int sustain, int *ret);
> +int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
> +                                  int num, int sustain, int *ret);
> +#endif
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/platform/linux-generic/m4/odp_schedule.m4 
> b/platform/linux-generic/m4/odp_schedule.m4
> index 91c19f21..d862b8b2 100644
> --- a/platform/linux-generic/m4/odp_schedule.m4
> +++ b/platform/linux-generic/m4/odp_schedule.m4
> @@ -1,13 +1,44 @@
> -AC_ARG_ENABLE([schedule-sp],
> -    [  --enable-schedule-sp    enable strict priority scheduler],
> -    [if test x$enableval = xyes; then
> -     schedule_sp_enabled=yes
> -     ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> -    fi])
> +# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and adds
> +# -DODP_SCHEDULE_SP to CFLAGS.
> +AC_ARG_ENABLE(
> +    [schedule_sp],
> +    [AC_HELP_STRING([--enable-schedule-sp],
> +                    [enable strict priority scheduler])],
> +    [if test "x$enableval" = xyes; then
> +         schedule_sp=true
> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> +     else
> +         schedule_sp=false
> +     fi],
> +    [schedule_sp=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue])
>  
> -AC_ARG_ENABLE([schedule-iquery],
> -    [  --enable-schedule-iquery    enable interests query (sparse bitmap) 
> scheduler],
> -    [if test x$enableval = xyes; then
> -     schedule_iquery_enabled=yes
> -     ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> -    fi])
> +# Checks for --enable-schedule-iquery and defines ODP_SCHEDULE_IQUERY and 
> adds
> +# -DODP_SCHEDULE_IQUERY to CFLAGS.
> +AC_ARG_ENABLE(
> +    [schedule_iquery],
> +    [AC_HELP_STRING([--enable-schedule-iquery],
> +                    [enable interests query (sparse bitmap) scheduler])],
> +    [if test "x$enableval" = xyes; then
> +         schedule_iquery=true
> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> +     else
> +         schedule_iquery=false
> +     fi],
> +    [schedule_iquery=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery = xtrue])
> +
> +# Checks for --enable-schedule-scalable and defines ODP_SCHEDULE_SCALABLE and
> +# adds -DODP_SCHEDULE_SCALABLE to CFLAGS.
> +AC_ARG_ENABLE(
> +    [schedule_scalable],
> +    [AC_HELP_STRING([--enable-schedule-scalable],
> +                    [enable scalable scheduler])],
> +    [if test "x$enableval" = xyes; then
> +         schedule_scalable=true
> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE"
> +     else
> +         schedule_scalable=false
> +     fi],
> +    [schedule_scalable=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable = xtrue])
> diff --git a/platform/linux-generic/odp_classification.c 
> b/platform/linux-generic/odp_classification.c
> index 5d96b00b..8fb5c32f 100644
> --- a/platform/linux-generic/odp_classification.c
> +++ b/platform/linux-generic/odp_classification.c
> @@ -282,7 +282,7 @@ odp_queue_t odp_cos_queue(odp_cos_t cos_id)
>       if (!cos->s.queue)
>               return ODP_QUEUE_INVALID;
>  
> -     return cos->s.queue->s.handle;
> +     return queue_get_handle(cos->s.queue);
>  }
>  
>  int odp_cos_drop_set(odp_cos_t cos_id, odp_cls_drop_t drop_policy)
> @@ -849,7 +849,7 @@ int cls_classify_packet(pktio_entry_t *entry, const 
> uint8_t *base,
>  
>       *pool = cos->s.pool;
>       pkt_hdr->p.input_flags.dst_queue = 1;
> -     pkt_hdr->dst_queue = cos->s.queue->s.handle;
> +     pkt_hdr->dst_queue = queue_get_handle(cos->s.queue);
>  
>       return 0;
>  }
> diff --git a/platform/linux-generic/odp_packet_io.c 
> b/platform/linux-generic/odp_packet_io.c
> index 5e783d83..a267ee97 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -5,23 +5,25 @@
>   */
>  #include <odp_posix_extensions.h>
>  
> -#include <odp/api/packet_io.h>
> -#include <odp_packet_io_internal.h>
> -#include <odp_packet_io_queue.h>
>  #include <odp/api/packet.h>
> -#include <odp_packet_internal.h>
> -#include <odp_internal.h>
> +#include <odp/api/packet_io.h>
>  #include <odp/api/spinlock.h>
>  #include <odp/api/ticketlock.h>
>  #include <odp/api/shared_memory.h>
> -#include <odp_packet_socket.h>
> +#include <odp/api/time.h>
> +
> +#include <odp_internal.h>
>  #include <odp_config_internal.h>
> -#include <odp_queue_internal.h>
> -#include <odp_schedule_if.h>
> -#include <odp_classification_internal.h>
>  #include <odp_debug_internal.h>
> +
> +#include <odp_classification_internal.h>
> +#include <odp_queue_internal.h>
>  #include <odp_packet_io_ipc_internal.h>
> -#include <odp/api/time.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_packet_io_queue.h>
> +#include <odp_packet_internal.h>
> +#include <odp_packet_socket.h>
> +#include <odp_schedule_if.h>
>  
>  #include <string.h>
>  #include <inttypes.h>
> @@ -470,7 +472,6 @@ int odp_pktio_start(odp_pktio_t hdl)
>                               return -1;
>                       }
>               }
> -
>               sched_fn->pktio_start(pktio_to_id(hdl), num, index);
>       }
>  
> @@ -552,7 +553,6 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue,
>       odp_packet_t packets[num];
>       odp_packet_hdr_t *pkt_hdr;
>       odp_buffer_hdr_t *buf_hdr;
> -     odp_buffer_t buf;
>       int i;
>       int pkts;
>       int num_rx = 0;
> @@ -562,9 +562,11 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue,
>       for (i = 0; i < pkts; i++) {
>               pkt = packets[i];
>               pkt_hdr = odp_packet_hdr(pkt);
> -             buf = _odp_packet_to_buffer(pkt);
> -             buf_hdr = buf_hdl_to_hdr(buf);
> -
> +#ifdef ODP_SCHEDULE_SCALABLE
> +             buf_hdr = _odp_packet_to_buf_hdr_ptr(pkt);
> +#else
> +             buf_hdr = buf_hdl_to_hdr(_odp_packet_to_buffer(pkt));
> +#endif
>               if (pkt_hdr->p.input_flags.dst_queue) {
>                       queue_entry_t *dst_queue;
>                       int ret;
> @@ -582,11 +584,17 @@ static inline int pktin_recv_buf(odp_pktin_queue_t 
> queue,
>  
>  int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
>  {
> -     odp_packet_t pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
> +     odp_packet_t pkt;
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     pkt = _odp_packet_from_buffer((odp_buffer_t)buf_hdr);
> +#else
> +     pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
> +#endif
>       int len = 1;
>       int nbr;
>  
> -     nbr = odp_pktout_send(qentry->s.pktout, &pkt, len);
> +     nbr = odp_pktout_send(queue_get_pktout(qentry), &pkt, len);
>       return (nbr == len ? 0 : -1);
>  }
>  
> @@ -604,9 +612,13 @@ int pktout_enq_multi(queue_entry_t *qentry, 
> odp_buffer_hdr_t *buf_hdr[],
>       int i;
>  
>       for (i = 0; i < num; ++i)
> +#ifdef ODP_SCHEDULE_SCALABLE
> +             pkt_tbl[i] = _odp_packet_from_buffer((odp_buffer_t)buf_hdr[i]);
> +#else
>               pkt_tbl[i] = _odp_packet_from_buffer(buf_hdr[i]->handle.handle);
> +#endif
>  
> -     nbr = odp_pktout_send(qentry->s.pktout, pkt_tbl, num);
> +     nbr = odp_pktout_send(queue_get_pktout(qentry), pkt_tbl, num);
>       return nbr;
>  }
>  
> @@ -632,13 +644,14 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
>       int pkts;
>  
>       buf_hdr = queue_deq(qentry);
> -     if (buf_hdr != NULL)
> +     if (buf_hdr != BUFFER_HDR_INVALID)
>               return buf_hdr;
>  
> -     pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
> +     pkts = pktin_recv_buf(queue_get_pktin(qentry),
> +                           hdr_tbl, QUEUE_MULTI_MAX);
>  
>       if (pkts <= 0)
> -             return NULL;
> +             return BUFFER_HDR_INVALID;
>  
>       if (pkts > 1)
>               queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1);
> @@ -669,7 +682,8 @@ int pktin_deq_multi(queue_entry_t *qentry, 
> odp_buffer_hdr_t *buf_hdr[], int num)
>       if (nbr == num)
>               return nbr;
>  
> -     pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
> +     pkts = pktin_recv_buf(queue_get_pktin(qentry),
> +                           hdr_tbl, QUEUE_MULTI_MAX);
>       if (pkts <= 0)
>               return nbr;
>  
> @@ -684,7 +698,6 @@ int pktin_deq_multi(queue_entry_t *qentry, 
> odp_buffer_hdr_t *buf_hdr[], int num)
>               queue_enq_multi(qentry, hdr_tbl, j);
>       return nbr;
>  }
> -
>  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
>  {
>       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> @@ -1266,13 +1279,14 @@ int odp_pktin_queue_config(odp_pktio_t pktio,
>                               queue_entry_t *qentry;
>  
>                               qentry = queue_to_qentry(queue);
> -                             qentry->s.pktin.index  = i;
> -                             qentry->s.pktin.pktio  = pktio;
> -
> -                             qentry->s.enqueue = pktin_enqueue;
> -                             qentry->s.dequeue = pktin_dequeue;
> -                             qentry->s.enqueue_multi = pktin_enq_multi;
> -                             qentry->s.dequeue_multi = pktin_deq_multi;
> +                             queue_set_pktin(qentry, pktio, i);
> +
> +                             queue_set_enq_func(qentry, pktin_enqueue);
> +                             queue_set_deq_func(qentry, pktin_dequeue);
> +                             queue_set_enq_multi_func(qentry,
> +                                                      pktin_enq_multi);
> +                             queue_set_deq_multi_func(qentry,
> +                                                      pktin_deq_multi);
>                       }
>  
>                       entry->s.in_queue[i].queue = queue;
> @@ -1390,14 +1404,12 @@ int odp_pktout_queue_config(odp_pktio_t pktio,
>                       }
>  
>                       qentry = queue_to_qentry(queue);
> -                     qentry->s.pktout.index  = i;
> -                     qentry->s.pktout.pktio  = pktio;
> -
> -                     /* Override default enqueue / dequeue functions */
> -                     qentry->s.enqueue       = pktout_enqueue;
> -                     qentry->s.dequeue       = pktout_dequeue;
> -                     qentry->s.enqueue_multi = pktout_enq_multi;
> -                     qentry->s.dequeue_multi = pktout_deq_multi;
> +                     queue_set_pktout(qentry, pktio, i);
> +
> +                     queue_set_enq_func(qentry, pktout_enqueue);
> +                     queue_set_deq_func(qentry, pktout_dequeue);
> +                     queue_set_enq_multi_func(qentry, pktout_enq_multi);
> +                     queue_set_deq_multi_func(qentry, pktout_deq_multi);
>  
>                       entry->s.out_queue[i].queue = queue;
>               }
> diff --git a/platform/linux-generic/odp_queue.c 
> b/platform/linux-generic/odp_queue.c
> index fcf4bf5b..42b58c44 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -291,11 +291,11 @@ void sched_cb_queue_destroy_finalize(uint32_t 
> queue_index)
>  int odp_queue_destroy(odp_queue_t handle)
>  {
>       queue_entry_t *queue;
> -     queue = queue_to_qentry(handle);
>  
>       if (handle == ODP_QUEUE_INVALID)
>               return -1;
>  
> +     queue = queue_to_qentry(handle);
>       LOCK(&queue->s.lock);
>       if (queue->s.status == QUEUE_STATUS_FREE) {
>               UNLOCK(&queue->s.lock);
> diff --git a/platform/linux-generic/odp_queue_scalable.c 
> b/platform/linux-generic/odp_queue_scalable.c
> new file mode 100644
> index 00000000..6bbf2846
> --- /dev/null
> +++ b/platform/linux-generic/odp_queue_scalable.c
> @@ -0,0 +1,883 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/api/hints.h>
> +#include <odp/api/plat/ticketlock_inlines.h>
> +#include <odp/api/queue.h>
> +#include <odp/api/schedule.h>
> +#include <odp/api/shared_memory.h>
> +#include <odp/api/sync.h>
> +#include <odp/api/traffic_mngr.h>
> +
> +#include <odp_internal.h>
> +#include <odp_config_internal.h>
> +#include <odp_debug_internal.h>
> +
> +#include <odp_buffer_inlines.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_packet_io_queue.h>
> +#include <odp_pool_internal.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +
> +#include <string.h>
> +#include <inttypes.h>
> +
> +#define NUM_INTERNAL_QUEUES 64
> +
> +#define MIN(a, b) \
> +     ({ \
> +             __typeof__(a) tmp_a = (a); \
> +             __typeof__(b) tmp_b = (b); \
> +             tmp_a < tmp_b ? tmp_a : tmp_b; \
> +     })
> +
> +#define LOCK(a)      _odp_ticketlock_lock(a)
> +#define UNLOCK(a)    _odp_ticketlock_unlock(a)
> +#define LOCK_INIT(a) odp_ticketlock_init(a)
> +
> +extern __thread sched_scalable_thread_state_t *sched_ts;
> +
> +typedef struct queue_table_t {
> +     queue_entry_t  queue[ODP_CONFIG_QUEUES];
> +} queue_table_t;
> +
> +static queue_table_t *queue_tbl;
> +
> +static inline odp_queue_t queue_from_id(uint32_t queue_id)
> +{
> +     return _odp_cast_scalar(odp_queue_t, queue_id + 1);
> +}
> +
> +queue_entry_t *get_qentry(uint32_t queue_id)
> +{
> +     return &queue_tbl->queue[queue_id];
> +}
> +
> +static int _odp_queue_disable_enq(sched_elem_t *q)
> +{
> +     ringidx_t old_read, old_write, new_write;
> +     uint32_t size;
> +
> +     old_write = q->prod_write;
> +     size = q->prod_mask + 1;
> +     do {
> +             /* Need __atomic_load to avoid compiler reordering */
> +             old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> +             if (old_write != old_read) {
> +                     /* Queue is not empty, cannot claim all elements
> +                     * Cannot disable enqueue.
> +                     */
> +                     return -1;
> +             }
> +             /* Claim all elements in ring */
> +             new_write = old_write + size;
> +     } while (!__atomic_compare_exchange_n(&q->prod_write,
> +                             &old_write, /* Updated on failure */
> +                             new_write,
> +                             true,
> +                             __ATOMIC_RELAXED,
> +                             __ATOMIC_RELAXED));
> +     /* All remaining elements claimed, noone else can enqueue */
> +     return 0;
> +}
> +
> +static int queue_init(int queue_idx, queue_entry_t *queue, const char *name,
> +                   const odp_queue_param_t *param)
> +{
> +     ringidx_t ring_idx;
> +     sched_elem_t *sched_elem;
> +     odp_shm_t shm;
> +     uint32_t ring_size;
> +     odp_buffer_hdr_t **ring;
> +
> +     sched_elem = &queue->s.sched_elem;
> +     ring_size = param->ring_size > 0 ?
> +                     ROUNDUP_POWER2_U32(param->ring_size) :
> +                     ODP_CONFIG_QUEUE_SIZE;
> +     strncpy(queue->s.name, name ? name : "", ODP_QUEUE_NAME_LEN - 1);
> +     queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
> +     memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
> +
> +     shm = odp_shm_reserve(name,
> +                           ring_size * sizeof(odp_buffer_hdr_t *),
> +                           ODP_CACHE_LINE_SIZE,
> +                           ODP_SHM_PROC);
> +     if (ODP_SHM_INVALID == shm)
> +             return -1;
> +
> +     ring = (odp_buffer_hdr_t **)odp_shm_addr(shm);
> +
> +     for (ring_idx = 0; ring_idx < ring_size; ring_idx++)
> +             ring[ring_idx] = NULL;
> +
> +     queue->s.shm = shm;
> +     queue->s.type = queue->s.param.type;
> +     queue->s.enqueue = queue_enq;
> +     queue->s.dequeue = queue_deq;
> +     queue->s.enqueue_multi = queue_enq_multi;
> +     queue->s.dequeue_multi = queue_deq_multi;
> +     queue->s.pktin = PKTIN_INVALID;
> +
> +     sched_elem->node.next = NULL;
> +#ifdef CONFIG_QSCHST_LOCK
> +     LOCK_INIT(&sched_elem->qschlock);
> +#endif
> +     sched_elem->qschst.numevts = 0;
> +     sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT;
> +     sched_elem->qschst.cur_ticket = 0;
> +     sched_elem->qschst.nxt_ticket = 0;
> +     sched_elem->pop_deficit = 0;
> +     if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> +             sched_elem->qschst_type = queue->s.param.sched.sync;
> +     else
> +             sched_elem->qschst_type = ODP_NO_SCHED_QUEUE;
> +     /* 2nd cache line - enqueue */
> +     sched_elem->prod_read = 0;
> +     sched_elem->prod_write = 0;
> +     sched_elem->prod_ring = ring;
> +     sched_elem->prod_mask = ring_size - 1;
> +     /* 3rd cache line - dequeue */
> +     sched_elem->cons_read = 0;
> +     sched_elem->cons_write = 0;
> +     sched_elem->rwin = NULL;
> +     sched_elem->schedq = NULL;
> +     sched_elem->user_ctx = queue->s.param.context;
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     sched_elem->cons_ring = ring;
> +     sched_elem->cons_mask = ring_size - 1;
> +     sched_elem->cons_type = sched_elem->qschst_type;
> +#endif
> +
> +     /* Queue initialized successfully, add it to the sched group */
> +     if (queue->s.type == ODP_QUEUE_TYPE_SCHED) {
> +             if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
> +                     sched_elem->rwin =
> +                             rwin_alloc(queue_idx, &queue->s.rwin_shm,
> +                                        queue->s.param.sched.lock_count);
> +                     if (sched_elem->rwin == NULL) {
> +                             ODP_ERR("Reorder window not created\n");
> +                             goto rwin_create_failed;
> +                     }
> +             }
> +             sched_elem->schedq =
> +                     schedq_from_sched_group(param->sched.group,
> +                                             param->sched.prio);
> +     }
> +
> +     return 0;
> +
> +rwin_create_failed:
> +     odp_shm_free(queue->s.shm);
> +     return -1;
> +}
> +
> +int odp_queue_init_global(void)
> +{
> +     uint32_t i;
> +     odp_shm_t shm;
> +
> +     ODP_DBG("Queue init ... ");
> +
> +     shm = odp_shm_reserve("odp_queues",
> +                           sizeof(queue_table_t),
> +                           sizeof(queue_entry_t), 0);
> +
> +     queue_tbl = odp_shm_addr(shm);
> +
> +     if (queue_tbl == NULL)
> +             return -1;
> +
> +     memset(queue_tbl, 0, sizeof(queue_table_t));
> +
> +     for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> +             /* init locks */
> +             queue_entry_t *queue;
> +
> +             queue = get_qentry(i);
> +             LOCK_INIT(&queue->s.lock);
> +             queue->s.index  = i;
> +             queue->s.handle = queue_from_id(i);
> +     }
> +
> +     ODP_DBG("done\n");
> +     ODP_DBG("Queue init global\n");
> +     ODP_DBG("  struct queue_entry_s size %zu\n",
> +             sizeof(struct queue_entry_s));
> +     ODP_DBG("  queue_entry_t size        %zu\n",
> +             sizeof(queue_entry_t));
> +     ODP_DBG("\n");
> +
> +     return 0;
> +}
> +
> +int odp_queue_term_global(void)
> +{
> +     int ret = 0;
> +     int rc = 0;
> +     queue_entry_t *queue;
> +     int i;
> +
> +     for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> +             queue = &queue_tbl->queue[i];
> +             if (__atomic_load_n(&queue->s.status,
> +                                 __ATOMIC_RELAXED) != QUEUE_STATUS_FREE) {
> +                     ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
> +                     rc = -1;
> +             }
> +     }
> +
> +     ret = odp_shm_free(odp_shm_lookup("odp_queues"));
> +     if (ret < 0) {
> +             ODP_ERR("shm free failed for odp_queues");
> +             rc = -1;
> +     }
> +
> +     return rc;
> +}
> +
> +int odp_queue_capability(odp_queue_capability_t *capa)
> +{
> +     memset(capa, 0, sizeof(odp_queue_capability_t));
> +
> +     /* Reserve some queues for internal use */
> +     capa->max_queues        = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES;
> +     capa->max_ordered_locks = SCHEDULE_ORDERED_LOCKS_PER_QUEUE;
> +     capa->max_sched_groups  = sched_fn->num_grps();
> +     capa->sched_prios       = odp_schedule_num_prio();
> +
> +     return 0;
> +}
> +
> +odp_queue_type_t odp_queue_type(odp_queue_t handle)
> +{
> +     return queue_to_qentry(handle)->s.type;
> +}
> +
> +odp_schedule_sync_t odp_queue_sched_type(odp_queue_t handle)
> +{
> +     return queue_to_qentry(handle)->s.param.sched.sync;
> +}
> +
> +odp_schedule_prio_t odp_queue_sched_prio(odp_queue_t handle)
> +{
> +     return queue_to_qentry(handle)->s.param.sched.prio;
> +}
> +
> +odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle)
> +{
> +     return queue_to_qentry(handle)->s.param.sched.group;
> +}
> +
> +int odp_queue_lock_count(odp_queue_t handle)
> +{
> +     queue_entry_t *queue = queue_to_qentry(handle);
> +
> +     return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ?
> +             (int)queue->s.param.sched.lock_count : -1;
> +}
> +
> +odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t 
> *param)
> +{
> +     int queue_idx;
> +     odp_queue_t handle = ODP_QUEUE_INVALID;
> +     queue_entry_t *queue;
> +     odp_queue_param_t default_param;
> +
> +     if (param == NULL) {
> +             odp_queue_param_init(&default_param);
> +             param = &default_param;
> +     }
> +
> +     for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES; queue_idx++) {
> +             queue = &queue_tbl->queue[queue_idx];
> +
> +             if (queue->s.status != QUEUE_STATUS_FREE)
> +                     continue;
> +
> +             LOCK(&queue->s.lock);
> +             if (queue->s.status == QUEUE_STATUS_FREE) {
> +                     if (queue_init(queue_idx, queue, name, param)) {
> +                             UNLOCK(&queue->s.lock);
> +                             return handle;
> +                     }
> +                     queue->s.status = QUEUE_STATUS_READY;
> +                     handle = queue->s.handle;
> +                     UNLOCK(&queue->s.lock);
> +                     break;
> +             }
> +             UNLOCK(&queue->s.lock);
> +     }
> +     return handle;
> +}
> +
> +int odp_queue_destroy(odp_queue_t handle)
> +{
> +     queue_entry_t *queue;
> +     sched_elem_t *q;
> +
> +     if (handle == ODP_QUEUE_INVALID)
> +             return -1;
> +
> +     queue = queue_to_qentry(handle);
> +     LOCK(&queue->s.lock);
> +     if (queue->s.status != QUEUE_STATUS_READY) {
> +             UNLOCK(&queue->s.lock);
> +             return -1;
> +     }
> +     q = &queue->s.sched_elem;
> +
> +#ifdef CONFIG_QSCHST_LOCK
> +     LOCK(&q->qschlock);
> +#endif
> +     if (_odp_queue_disable_enq(q)) {
> +             /* Producer side not empty */
> +#ifdef CONFIG_QSCHST_LOCK
> +             UNLOCK(&q->qschlock);
> +#endif
> +             UNLOCK(&queue->s.lock);
> +             return -1;
> +     }
> +     /* Enqueue is now disabled */
> +     if (q->cons_read != q->cons_write) {
> +             /* Consumer side is not empty
> +              * Roll back previous change, enable enqueue again.
> +              */
> +             uint32_t size;
> +
> +             size = q->prod_mask + 1;
> +             __atomic_fetch_sub(&q->prod_write, size, __ATOMIC_RELAXED);
> +#ifdef CONFIG_QSCHST_LOCK
> +             UNLOCK(&q->qschlock);
> +#endif
> +             UNLOCK(&queue->s.lock);
> +             return -1;
> +     }
> +#ifdef CONFIG_QSCHST_LOCK
> +     UNLOCK(&q->qschlock);
> +#endif
> +     /* Producer and consumer sides empty, enqueue disabled
> +      * Now wait until schedq state is empty and no outstanding tickets
> +      */
> +     while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED) != 0 ||
> +            __atomic_load_n(&q->qschst.cur_ticket, __ATOMIC_RELAXED) !=
> +            __atomic_load_n(&q->qschst.nxt_ticket, __ATOMIC_RELAXED)) {
> +             SEVL();
> +             while (WFE() && LDXR32((uint32_t *)&q->qschst.numevts,
> +                                    __ATOMIC_RELAXED) != 0)
> +                     DOZE();
> +     }
> +
> +     /* Adjust the spread factor for the queues in the schedule group */
> +     if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> +             sched_group_xcount_dec(queue->s.param.sched.group,
> +                                    queue->s.param.sched.prio);
> +
> +     if (odp_shm_free(queue->s.shm) < 0) {
> +             UNLOCK(&queue->s.lock);
> +             return -1;
> +     }
> +     if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
> +             if (odp_shm_free(queue->s.rwin_shm) < 0) {
> +                     ODP_ERR("Failed to free reorder window shm\n");
> +                     UNLOCK(&queue->s.lock);
> +                     return -1;
> +             }
> +     }
> +     queue->s.status = QUEUE_STATUS_FREE;
> +     UNLOCK(&queue->s.lock);
> +     return 0;
> +}
> +
> +int odp_queue_context_set(odp_queue_t handle, void *context,
> +                       uint32_t len ODP_UNUSED)
> +{
> +     odp_mb_full();
> +     queue_to_qentry(handle)->s.param.context = context;
> +     odp_mb_full();
> +     return 0;
> +}
> +
> +void *odp_queue_context(odp_queue_t handle)
> +{
> +     return queue_to_qentry(handle)->s.param.context;
> +}
> +
> +odp_queue_t odp_queue_lookup(const char *name)
> +{
> +     uint32_t i;
> +
> +     for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> +             queue_entry_t *queue = &queue_tbl->queue[i];
> +
> +             if (queue->s.status == QUEUE_STATUS_FREE ||
> +                 queue->s.status == QUEUE_STATUS_DESTROYED)
> +                     continue;
> +
> +             LOCK(&queue->s.lock);
> +             if (strcmp(name, queue->s.name) == 0) {
> +                     /* found it */
> +                     UNLOCK(&queue->s.lock);
> +                     return queue->s.handle;
> +             }
> +             UNLOCK(&queue->s.lock);
> +     }
> +
> +     return ODP_QUEUE_INVALID;
> +}
> +
> +#ifndef CONFIG_QSCHST_LOCK
> +static inline int _odp_queue_enq(sched_elem_t *q,
> +                              odp_buffer_hdr_t *buf_hdr[],
> +                              int num)
> +{
> +     ringidx_t old_read;
> +     ringidx_t old_write;
> +     ringidx_t new_write;
> +     int actual;
> +     uint32_t mask;
> +     odp_buffer_hdr_t **ring;
> +
> +     mask = q->prod_mask;
> +     ring = q->prod_ring;
> +
> +     /* Load producer ring state (read & write index) */
> +     old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
> +     do {
> +             /* Consumer does store-release prod_read, we need
> +              * load-acquire.
> +              */
> +             old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> +
> +             actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
> +             if (odp_unlikely(actual <= 0))
> +                     return 0;
> +
> +             new_write = old_write + actual;
> +     } while (!__atomic_compare_exchange_n(&q->prod_write,
> +                                     &old_write, /* Updated on failure */
> +                                     new_write,
> +                                     true,
> +                                     __ATOMIC_RELAXED,
> +                                     __ATOMIC_RELAXED));
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->cons_write, 0, 0);
> +#endif
> +     /* Store our event(s) in the ring */
> +     do {
> +             ring[old_write & mask] = *buf_hdr++;
> +     } while (++old_write != new_write);
> +     old_write -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->node, 1, 0);
> +#endif
> +     /* Wait for our turn to signal consumers */
> +     if (odp_unlikely(__atomic_load_n(&q->cons_write,
> +                                      __ATOMIC_RELAXED) != old_write)) {
> +             SEVL();
> +             while (WFE() && LDXR32(&q->cons_write,
> +                                    __ATOMIC_RELAXED) != old_write)
> +                     DOZE();
> +     }
> +
> +     /* Signal consumers that events are available (release events)
> +      * Enable other producers to continue
> +      */
> +     /* Wait for writes (to ring slots) to complete */
> +     atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
> +
> +     return actual;
> +}
> +
> +#else
> +
> +static inline int _odp_queue_enq_sp(sched_elem_t *q,
> +                                 odp_buffer_hdr_t *buf_hdr[],
> +                                 int num)
> +{
> +     ringidx_t old_read;
> +     ringidx_t old_write;
> +     ringidx_t new_write;
> +     int actual;
> +     uint32_t mask;
> +     odp_buffer_hdr_t **ring;
> +
> +     mask = q->prod_mask;
> +     ring = q->prod_ring;
> +
> +     /* Load producer ring state (read & write index) */
> +     old_write = q->prod_write;
> +     /* Consumer does store-release prod_read, we need load-acquire */
> +     old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> +     actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
> +     if (odp_unlikely(actual <= 0))
> +             return 0;
> +
> +     new_write = old_write + actual;
> +     q->prod_write = new_write;
> +
> +     /* Store our event(s) in the ring */
> +     do {
> +             ring[old_write & mask] = *buf_hdr++;
> +     } while (++old_write != new_write);
> +     old_write -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->node, 1, 0);
> +#endif
> +
> +     /* Signal consumers that events are available (release events)
> +      * Enable other producers to continue
> +      */
> +#ifdef CONFIG_QSCHST_LOCK
> +     q->cons_write = new_write;
> +#else
> +     atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
> +#endif
> +
> +     return actual;
> +}
> +#endif
> +
> +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
> num)
> +{
> +     int actual;
> +     int i;
> +     sched_scalable_thread_state_t *ts;
> +     reorder_context_t *first;
> +     reorder_context_t *cur;
> +     bitset_t next_idx;
> +
> +#ifdef CONFIG_QSCHST_LOCK
> +     LOCK(&queue->s.sched_elem.qschlock);
> +#endif
> +     ts = sched_ts;
> +     if (ts && odp_unlikely(ts->out_of_order)) {
> +             first = ts->rctx;
> +             ODP_ASSERT(ts->rctx != NULL);
> +             cur = &first[(int)first->cur_idx - (int)first->idx];
> +             for (i = 0; i < num; i++) {
> +                     if (odp_unlikely(cur->numevts == RC_EVT_SIZE)) {
> +                             /* No more space in current reorder context
> +                              * Try to allocate another.
> +                              */
> +                             if (odp_unlikely(
> +                                     bitset_is_null(ts->priv_rvec_free))) {
> +                                     ts->priv_rvec_free =
> +                                             atom_bitset_xchg(
> +                                                     &ts->rvec_free,
> +                                                     0,
> +                                                     __ATOMIC_RELAXED);
> +                                     if (odp_unlikely(bitset_is_null(
> +                                                     ts->priv_rvec_free)))
> +                                             /* Out of reorder contexts.
> +                                              * Return the number of events
> +                                              * stored so far.
> +                                              */
> +#ifdef CONFIG_QSCHST_LOCK
> +                                             UNLOCK(&queue->s.sched_elem.
> +                                                    qschlock);
> +#endif
> +                                             return i;
> +                             }
> +                             next_idx = bitset_ffs(ts->priv_rvec_free) - 1;
> +                             ts->priv_rvec_free =
> +                                     bitset_clr(ts->priv_rvec_free,
> +                                                next_idx);
> +                             /* Link current to next (for eventual
> +                              * retiring)
> +                              */
> +                             cur->next_idx = next_idx;
> +                             /* Link first to next (for next call to
> +                             * queue_enq_multi())
> +                             */
> +                             first->cur_idx = next_idx;
> +                             /* Update current to next */
> +                             cur = &ts->rvec[next_idx];
> +                             rctx_init(cur, next_idx, NULL, 0);
> +                             /* The last rctx (so far) */
> +                             cur->next_idx = first->idx;
> +                     }
> +                     cur->events[cur->numevts] = buf_hdr[i];
> +                     cur->destq[cur->numevts] = queue;
> +                     cur->numevts++;
> +             }
> +             /* All events stored. */
> +#ifdef CONFIG_QSCHST_LOCK
> +             UNLOCK(&queue->s.sched_elem.qschlock);
> +#endif
> +             return num;
> +     }
> +
> +#ifdef CONFIG_QSCHST_LOCK
> +     actual = _odp_queue_enq_sp(&queue->s.sched_elem, buf_hdr, num);
> +#else
> +     actual = _odp_queue_enq(&queue->s.sched_elem, buf_hdr, num);
> +#endif
> +
> +     if (odp_likely(queue->s.sched_elem.schedq != NULL && actual != 0)) {
> +             /* Perform scheduler related updates. */
> +#ifdef CONFIG_QSCHST_LOCK
> +             sched_update_enq_sp(&queue->s.sched_elem, actual);
> +#else
> +             sched_update_enq(&queue->s.sched_elem, actual);
> +#endif
> +     }
> +
> +#ifdef CONFIG_QSCHST_LOCK
> +     UNLOCK(&queue->s.sched_elem.qschlock);
> +#endif
> +     return actual;
> +}
> +
> +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +{
> +     return odp_likely(
> +             queue_enq_multi(queue, &buf_hdr, 1) == 1) ? 0 : -1;
> +}
> +
> +int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
> +{
> +     odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
> +     queue_entry_t *queue;
> +     int i;
> +
> +     if (num > QUEUE_MULTI_MAX)
> +             num = QUEUE_MULTI_MAX;
> +
> +     queue = queue_to_qentry(handle);
> +
> +     for (i = 0; i < num; i++)
> +             buf_hdr[i] = (odp_buffer_hdr_t *)(void *)ev[i];
> +
> +     return queue->s.enqueue_multi(queue, buf_hdr, num);
> +}
> +
> +int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
> +{
> +     queue_entry_t *queue;
> +
> +     queue = queue_to_qentry(handle);
> +     return queue->s.enqueue(queue, (odp_buffer_hdr_t *)(void *)ev);
> +}
> +
> +/* Single-consumer dequeue. */
> +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num)
> +{
> +     int actual;
> +     int i;
> +     ringidx_t old_read;
> +     ringidx_t old_write;
> +     ringidx_t new_read;
> +     uint32_t mask;
> +     odp_event_t *ring;
> +
> +     /* Load consumer ring state (read & write index). */
> +     old_read  = q->cons_read;
> +     /* Producer does store-release cons_write, we need load-acquire */
> +     old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
> +     actual    = MIN(num, (int)(old_write - old_read));
> +
> +     if (odp_unlikely(actual <= 0))
> +             return 0;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->node, 1, 0);
> +#endif
> +     new_read = old_read + actual;
> +     q->cons_read = new_read;
> +
> +     mask = q->cons_mask;
> +     ring = (odp_event_t *)q->cons_ring;
> +     do {
> +             *evp++ = ring[old_read & mask];
> +     } while (++old_read != new_read);
> +
> +     /* Signal producers that empty slots are available
> +      * (release ring slots). Enable other consumers to continue.
> +      */
> +#ifdef CONFIG_QSCHST_LOCK
> +     q->prod_read = new_read;
> +#else
> +     /* Wait for loads (from ring slots) to complete. */
> +     atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
> +#endif
> +     /* Prefetch events after we have release ring buffer slots */
> +     for (i = 0; i < actual; i++) {
> +             odp_event_t evt = *--evp;
> +
> +             __builtin_prefetch(
> +                     odp_buffer_addr(odp_buffer_from_event(evt)), 0, 0);
> +     }
> +     return actual;
> +}
> +
> +inline int _odp_queue_deq(sched_elem_t *q, odp_event_t *evp, int num)
> +{
> +     int actual;
> +     ringidx_t old_read;
> +     ringidx_t old_write;
> +     ringidx_t new_read;
> +     uint32_t mask;
> +     odp_buffer_hdr_t **ring;
> +     odp_buffer_hdr_t **p_buf_hdr;
> +
> +     mask = q->cons_mask;
> +     ring = q->cons_ring;
> +
> +     /* Load consumer ring state (read & write index) */
> +     old_read = __atomic_load_n(&q->cons_read, __ATOMIC_RELAXED);
> +     do {
> +             /* Need __atomic_load to avoid compiler reordering
> +              * Producer does store-release cons_write, we need
> +              * load-acquire.
> +              */
> +             old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
> +             /* Prefetch ring buffer array */
> +             __builtin_prefetch(&q->cons_ring[old_read & mask], 0, 0);
> +
> +             actual = MIN(num, (int)(old_write - old_read));
> +             if (odp_unlikely(actual <= 0))
> +                     return 0;
> +
> +             /* Attempt to free ring slot(s) */
> +             new_read = old_read + actual;
> +     } while (!__atomic_compare_exchange_n(&q->cons_read,
> +                                     &old_read, /* Updated on failure */
> +                                     new_read,
> +                                     true,
> +                                     __ATOMIC_RELAXED,
> +                                     __ATOMIC_RELAXED));
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->prod_read, 0, 0);
> +#endif
> +     p_buf_hdr = (odp_buffer_hdr_t **)evp;
> +     do {
> +             odp_buffer_hdr_t *p_buf;
> +
> +             p_buf = ring[old_read & mask];
> +             __builtin_prefetch(
> +                     odp_buffer_addr(
> +                             odp_buffer_from_event((odp_event_t)p_buf)),
> +                                                   0, 0);
> +             *p_buf_hdr++ = p_buf;
> +     } while (++old_read != new_read);
> +     old_read -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +     __builtin_prefetch(&q->node, 1, 0);
> +#endif
> +     /* Wait for our turn to signal producers */
> +     if (odp_unlikely(__atomic_load_n(&q->prod_read, __ATOMIC_RELAXED) !=
> +             old_read)) {
> +             SEVL();
> +             while (WFE() && LDXR32(&q->prod_read,
> +                                    __ATOMIC_RELAXED) != old_read)
> +                     DOZE();
> +     }
> +
> +     /* Signal producers that empty slots are available
> +      * (release ring slots)
> +      * Enable other consumers to continue
> +      */
> +     /* Wait for loads (from ring slots) to complete */
> +     atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
> +
> +     return actual;
> +}
> +
> +int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
> num)
> +{
> +     sched_elem_t *q;
> +
> +     q = &queue->s.sched_elem;
> +     return _odp_queue_deq(q, (odp_event_t *)buf_hdr, num);
> +}
> +
> +odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
> +{
> +     sched_elem_t *q;
> +     odp_buffer_hdr_t *buf_hdr;
> +
> +     q = &queue->s.sched_elem;
> +     if (_odp_queue_deq(q, (odp_event_t *)&buf_hdr, 1) == 1)
> +             return buf_hdr;
> +     else
> +             return BUFFER_HDR_INVALID;
> +}
> +
> +int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
> +{
> +     queue_entry_t *queue;
> +
> +     queue = queue_to_qentry(handle);
> +     return queue->s.dequeue_multi(queue, (odp_buffer_hdr_t **)events, num);
> +}
> +
> +odp_event_t odp_queue_deq(odp_queue_t handle)
> +{
> +     queue_entry_t *queue;
> +
> +     queue = queue_to_qentry(handle);
> +     return (odp_event_t)queue->s.dequeue(queue);
> +}
> +
> +void odp_queue_param_init(odp_queue_param_t *params)
> +{
> +     memset(params, 0, sizeof(odp_queue_param_t));
> +     params->type = ODP_QUEUE_TYPE_PLAIN;
> +     params->enq_mode = ODP_QUEUE_OP_MT;
> +     params->deq_mode = ODP_QUEUE_OP_MT;
> +     params->sched.prio = ODP_SCHED_PRIO_DEFAULT;
> +     params->sched.sync = ODP_SCHED_SYNC_PARALLEL;
> +     params->sched.group = ODP_SCHED_GROUP_ALL;
> +}
> +
> +int odp_queue_info(odp_queue_t handle, odp_queue_info_t *info)
> +{
> +     uint32_t queue_id;
> +     queue_entry_t *queue;
> +     int status;
> +
> +     if (odp_unlikely(info == NULL)) {
> +             ODP_ERR("Unable to store info, NULL ptr given\n");
> +             return -1;
> +     }
> +
> +     queue_id = queue_to_id(handle);
> +
> +     if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
> +             ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
> +                     odp_queue_to_u64(handle));
> +             return -1;
> +     }
> +
> +     queue = get_qentry(queue_id);
> +
> +     LOCK(&queue->s.lock);
> +     status = queue->s.status;
> +
> +     if (odp_unlikely(status == QUEUE_STATUS_FREE ||
> +                      status == QUEUE_STATUS_DESTROYED)) {
> +             UNLOCK(&queue->s.lock);
> +             ODP_ERR("Invalid queue status:%d\n", status);
> +             return -1;
> +     }
> +
> +     info->name = queue->s.name;
> +     info->param = queue->s.param;
> +
> +     UNLOCK(&queue->s.lock);
> +
> +     return 0;
> +}
> +
> +uint64_t odp_queue_to_u64(odp_queue_t hdl)
> +{
> +     return _odp_pri(hdl);
> +}
> diff --git a/platform/linux-generic/odp_schedule_if.c 
> b/platform/linux-generic/odp_schedule_if.c
> index a9ede98d..bf92333e 100644
> --- a/platform/linux-generic/odp_schedule_if.c
> +++ b/platform/linux-generic/odp_schedule_if.c
> @@ -6,24 +6,38 @@
>  
>  #include <odp_schedule_if.h>
>  
> -extern const schedule_fn_t schedule_sp_fn;
> -extern const schedule_api_t schedule_sp_api;
> +#if defined(ODP_SCHEDULE_SCALABLE)
>  
> -extern const schedule_fn_t schedule_default_fn;
> -extern const schedule_api_t schedule_default_api;
> +extern const schedule_fn_t  schedule_scalable_fn;
> +extern const schedule_api_t schedule_scalable_api;
>  
> -extern const schedule_fn_t schedule_iquery_fn;
> -extern const schedule_api_t schedule_iquery_api;
> +const schedule_fn_t  *sched_fn  = &schedule_scalable_fn;
> +const schedule_api_t *sched_api = &schedule_scalable_api;
>  
> -#ifdef ODP_SCHEDULE_SP
> -const schedule_fn_t *sched_fn   = &schedule_sp_fn;
> -const schedule_api_t *sched_api = &schedule_sp_api;
>  #elif defined(ODP_SCHEDULE_IQUERY)
> -const schedule_fn_t *sched_fn   = &schedule_iquery_fn;
> +
> +extern const schedule_fn_t  schedule_iquery_fn;
> +extern const schedule_api_t schedule_iquery_api;
> +
> +const schedule_fn_t  *sched_fn  = &schedule_iquery_fn;
>  const schedule_api_t *sched_api = &schedule_iquery_api;
> -#else
> +
> +#elif defined(ODP_SCHEDULE_SP)
> +
> +extern const schedule_fn_t  schedule_sp_fn;
> +extern const schedule_api_t schedule_sp_api;
> +
> +const schedule_fn_t  *sched_fn  = &schedule_sp_fn;
> +const schedule_api_t *sched_api = &schedule_sp_api;
> +
> +#else /* default scheduler */
> +
> +extern const schedule_fn_t  schedule_default_fn;
> +extern const schedule_api_t schedule_default_api;
> +
>  const schedule_fn_t  *sched_fn  = &schedule_default_fn;
>  const schedule_api_t *sched_api = &schedule_default_api;
> +
>  #endif
>  
>  uint64_t odp_schedule_wait_time(uint64_t ns)
> diff --git a/platform/linux-generic/odp_schedule_scalable.c 
> b/platform/linux-generic/odp_schedule_scalable.c
> new file mode 100644
> index 00000000..f9ec656c
> --- /dev/null
> +++ b/platform/linux-generic/odp_schedule_scalable.c
> @@ -0,0 +1,1922 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/api/align.h>
> +#include <odp/api/atomic.h>
> +#include <odp/api/cpu.h>
> +#include <odp/api/hints.h>
> +#include <odp/api/schedule.h>
> +#include <odp/api/shared_memory.h>
> +#include <odp/api/sync.h>
> +#include <odp/api/thread.h>
> +#include <odp/api/thrmask.h>
> +#include <odp/api/time.h>
> +
> +#include <odp_internal.h>
> +#include <odp_config_internal.h>
> +#include <odp_debug_internal.h>
> +
> +#include <odp_align_internal.h>
> +#include <odp_buffer_inlines.h>
> +#include <odp_llqueue.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +#include <odp_llsc.h>
> +#include <odp_bitset.h>
> +#include <odp_atomic16.h>
> +#include <odp_packet_io_internal.h>
> +
> +#include <limits.h>
> +#include <stdbool.h>
> +#include <string.h>
> +
> +#include <odp/api/plat/ticketlock_inlines.h>
> +#define LOCK(a) _odp_ticketlock_lock((a))
> +#define UNLOCK(a) _odp_ticketlock_unlock((a))
> +
> +#define TAG_EMPTY 0U
> +#define TAG_USED (1U << 15)
> +#define TAG_BUSY (1U << 31)
> +#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED)
> +#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF)
> +#define TAG_2_QUEUE(t) ((t) & 0x7FFF)
> +#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED)
> +#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES)
> +#define MAXTHREADS ATOM_BITSET_SIZE
> +
> +static uint32_t pktin_num;
> +static uint32_t pktin_hi;
> +static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES];
> +static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE;
> +
> +#define __atomic_fetch_max(var, v, mo) do { \
> +             /* Evalulate 'v' once */ \
> +             __typeof__(v) tmp_v = (v); \
> +             __typeof__(*var) old_var = \
> +                     __atomic_load_n((var), __ATOMIC_RELAXED); \
> +             while (tmp_v > old_var) { \
> +                     /* Attempt to store 'v' in '*var' */ \
> +                     if (__atomic_compare_exchange_n((var), &old_var, \
> +                                                     tmp_v, true, (mo), \
> +                                                     (mo))) \
> +                             break; \
> +                     /* Else failure, try again (with updated value of \
> +                      * old_var). \
> +                      */ \
> +             } \
> +             /* v <= old_var, nothing to do */ \
> +     } while (0)
> +
> +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1),
> +               "lowest_prio_does_not_match_with_num_prios");
> +
> +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
> +               (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 1)),
> +               "normal_prio_is_not_between_highest_and_lowest");
> +
> +ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
> +               "Number_of_queues_is_not_power_of_two");
> +
> +/*
> + * Scheduler group related variables.
> + */
> +/* Currently used scheduler groups */
> +static sched_group_mask_t sg_free;
> +static sched_group_t *sg_vec[MAX_SCHED_GROUP];
> +/* Group lock for MT-safe APIs */
> +odp_spinlock_t sched_grp_lock;
> +
> +#define SCHED_GROUP_JOIN 0
> +#define SCHED_GROUP_LEAVE 1
> +
> +/*
> + * Per thread state
> + */
> +static sched_scalable_thread_state_t thread_state[MAXTHREADS];
> +__thread sched_scalable_thread_state_t *sched_ts;
> +
> +/*
> + * Forward declarations.
> + */
> +static int thread_state_init(int tidx)
> +{
> +     sched_scalable_thread_state_t *ts;
> +     uint32_t i;
> +
> +     ODP_ASSERT(tidx < MAXTHREADS);
> +     ts = &thread_state[tidx];
> +     ts->atomq = NULL;
> +     ts->rctx = NULL;
> +     ts->pause = false;
> +     ts->out_of_order = false;
> +     ts->tidx = tidx;
> +     ts->dequeued = 0;
> +     ts->pktin_next = 0;
> +     ts->pktin_poll_cnts = 0;
> +     ts->ticket = TICKET_INVALID;
> +     ts->priv_rvec_free = 0;
> +     ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
> +     ts->num_schedq = 0;
> +     ts->sg_sem = 1; /* Start with sched group semaphore changed */
> +     memset(ts->sg_actual, 0, sizeof(ts->sg_actual));
> +     for (i = 0; i < TS_RVEC_SIZE; i++) {
> +             ts->rvec[i].rvec_free = &ts->rvec_free;
> +             ts->rvec[i].idx = i;
> +     }
> +     sched_ts = ts;
> +
> +     return 0;
> +}
> +
> +static void insert_schedq_in_list(sched_scalable_thread_state_t *ts,
> +                               sched_queue_t *schedq)
> +{
> +     /* Find slot for schedq */
> +     for (uint32_t i = 0; i < ts->num_schedq; i++) {
> +             /* Lower value is higher priority and closer to start of list */
> +             if (schedq->prio <= ts->schedq_list[i]->prio) {
> +                     /* This is the slot! */
> +                     sched_queue_t *tmp;
> +
> +                     tmp = ts->schedq_list[i];
> +                     ts->schedq_list[i] = schedq;
> +                     schedq = tmp;
> +                     /* Continue the insertion procedure with the
> +                      * new schedq.
> +                      */
> +             }
> +     }
> +     if (ts->num_schedq == SCHEDQ_PER_THREAD)
> +             ODP_ABORT("Too many schedqs\n");
> +     ts->schedq_list[ts->num_schedq++] = schedq;
> +}
> +
> +static void remove_schedq_from_list(sched_scalable_thread_state_t *ts,
> +                                 sched_queue_t *schedq)
> +{
> +     /* Find schedq */
> +     for (uint32_t i = 0; i < ts->num_schedq; i++)
> +             if (ts->schedq_list[i] == schedq) {
> +                     /* Move remaining schedqs */
> +                     for (uint32_t j = i + 1; j < ts->num_schedq; j++)
> +                             ts->schedq_list[j - 1] = ts->schedq_list[j];
> +                     ts->num_schedq--;
> +                     return;
> +             }
> +     ODP_ABORT("Cannot find schedq\n");
> +}
> +
> +/*******************************************************************************
> + * Scheduler queues
> + 
> ******************************************************************************/
> +#ifndef odp_container_of
> +#define odp_container_of(pointer, type, member) \
> +     ((type *)(void *)(((char *)pointer) - offsetof(type, member)))
> +#endif
> +
> +static inline void schedq_init(sched_queue_t *schedq, uint32_t prio)
> +{
> +     llqueue_init(&schedq->llq);
> +     schedq->prio = prio;
> +}
> +
> +static inline sched_elem_t *schedq_peek(sched_queue_t *schedq)
> +{
> +     struct llnode *ptr;
> +
> +     ptr = llq_head(&schedq->llq);
> +     return odp_container_of(ptr, sched_elem_t, node);
> +}
> +
> +static inline odp_bool_t schedq_cond_pop(sched_queue_t *schedq,
> +                                      sched_elem_t *elem)
> +{
> +     return llq_dequeue_cond(&schedq->llq, &elem->node);
> +}
> +
> +static inline void schedq_push(sched_queue_t *schedq, sched_elem_t *elem)
> +{
> +     llq_enqueue(&schedq->llq, &elem->node);
> +}
> +
> +static inline odp_bool_t schedq_cond_rotate(sched_queue_t *schedq,
> +                                         sched_elem_t *elem)
> +{
> +     return llq_cond_rotate(&schedq->llq, &elem->node);
> +}
> +
> +static inline bool schedq_elem_on_queue(sched_elem_t *elem)
> +{
> +     return elem->node.next != NULL;
> +}
> +
> +/*******************************************************************************
> + * Shared metadata btwn scheduler and queue
> + 
> ******************************************************************************/
> +
> +void sched_update_enq(sched_elem_t *q, uint32_t actual)
> +{
> +     qschedstate_t oss, nss;
> +     uint32_t ticket;
> +
> +     oss = q->qschst;
> +     /* Update event counter, optionally taking a ticket. */
> +     do {
> +             ticket = TICKET_INVALID;
> +             nss = oss;
> +             nss.numevts += actual;
> +             if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0))
> +                     /* E -> NE transition */
> +                     if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
> +                         oss.cur_ticket == oss.nxt_ticket)
> +                             /* Parallel or ordered queues: always take
> +                              * ticket.
> +                              * Atomic queue: only take ticket if one is
> +                              * immediately available.
> +                              * Otherwise ticket already taken => queue
> +                              * processed by some thread.
> +                              */
> +                             ticket = nss.nxt_ticket++;
> +             /* Else queue already was non-empty. */
> +     /* Attempt to update numevts counter and optionally take ticket. */
> +     } while (!__atomic_compare_exchange(
> +                    &q->qschst, &oss, &nss,
> +                    true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
> +
> +     if (odp_unlikely(ticket != TICKET_INVALID)) {
> +             /* Wait for our turn to update schedq. */
> +             if (odp_unlikely(
> +                         __atomic_load_n(&q->qschst.cur_ticket,
> +                                         __ATOMIC_ACQUIRE) != ticket)) {
> +                     SEVL();
> +                     while (WFE() &&
> +                            LDXR8(&q->qschst.cur_ticket,
> +                                  __ATOMIC_ACQUIRE) != ticket)
> +                             DOZE();
> +             }
> +             /* Enqueue at end of scheduler queue */
> +             /* We are here because of empty-to-non-empty transition
> +              * This means queue must be pushed to schedq if possible
> +              * but we can't do that if it already is on the schedq
> +              */
> +             if (odp_likely(!schedq_elem_on_queue(q) &&
> +                            q->pop_deficit == 0)) {
> +                     /* Queue not already on schedq and no pop deficit means
> +                      * we can push queue to schedq */
> +                     schedq_push(q->schedq, q);
> +             } else {
> +                     /* Missed push => cancels one missed pop */
> +                     q->pop_deficit--;
> +             }
> +             atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
> +                                  /*readonly=*/false);
> +     }
> +     /* Else queue was not empty or atomic queue already busy. */
> +}
> +
> +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual)
> +{
> +     qschedstate_t oss, nss;
> +     uint32_t ticket;
> +
> +     oss = q->qschst;
> +     /* Update event counter, optionally taking a ticket. */
> +     ticket = TICKET_INVALID;
> +     nss = oss;
> +     nss.numevts += actual;
> +     if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) {
> +             /* E -> NE transition */
> +             if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
> +                 oss.cur_ticket == oss.nxt_ticket) {
> +                     /* Parallel or ordered queues: always take
> +                      * ticket.
> +                      * Atomic queue: only take ticket if one is
> +                      * immediately available. Otherwise ticket already
> +                      * taken => queue owned/processed by some thread
> +                      */
> +                     ticket = nss.nxt_ticket++;
> +             }
> +     }
> +     /* Else queue already was non-empty. */
> +     /* Attempt to update numevts counter and optionally take ticket. */
> +     q->qschst = nss;
> +
> +     if (odp_unlikely(ticket != TICKET_INVALID)) {
> +             /* Enqueue at end of scheduler queue */
> +             /* We are here because of empty-to-non-empty transition
> +              * This means queue must be pushed to schedq if possible
> +              * but we can't do that if it already is on the schedq
> +              */
> +             if (odp_likely(!schedq_elem_on_queue(q) &&
> +                            q->pop_deficit == 0)) {
> +                     /* Queue not already on schedq and no pop deficit means
> +                      * we can push queue to schedq */
> +                     schedq_push(q->schedq, q);
> +             } else {
> +                     /* Missed push => cancels one missed pop */
> +                     q->pop_deficit--;
> +             }
> +             q->qschst.cur_ticket = ticket + 1;
> +     }
> +     /* Else queue was not empty or atomic queue already busy. */
> +}
> +
> +#ifndef CONFIG_QSCHST_LOCK
> +/* The scheduler is the only entity that performs the dequeue from a queue. 
> */
> +static void
> +sched_update_deq(sched_elem_t *q,
> +              uint32_t actual,
> +              bool atomic) __attribute__((always_inline));
> +static inline void
> +sched_update_deq(sched_elem_t *q,
> +              uint32_t actual, bool atomic)
> +{
> +     qschedstate_t oss, nss;
> +     uint32_t ticket;
> +
> +     if (atomic) {
> +             bool pushed = false;
> +
> +             /* We own this atomic queue, only we can dequeue from it and
> +              * thus decrease numevts. Other threads may enqueue and thus
> +              * increase numevts.
> +              * This means that numevts can't unexpectedly become 0 and
> +              * invalidate a push operation already performed
> +              */
> +             oss = q->qschst;
> +             do {
> +                     ODP_ASSERT(oss.cur_ticket == sched_ts->ticket);
> +                     nss = oss;
> +                     nss.numevts -= actual;
> +                     if (nss.numevts > 0 && !pushed) {
> +                             schedq_push(q->schedq, q);
> +                             pushed = true;
> +                     }
> +                     /* Attempt to release ticket expecting our view of
> +                      * numevts to be correct
> +                      * Unfortunately nxt_ticket will also be included in
> +                      * the CAS operation
> +                      */
> +                     nss.cur_ticket = sched_ts->ticket + 1;
> +             } while (odp_unlikely(!__atomic_compare_exchange(
> +                                                       &q->qschst,
> +                                                       &oss, &nss,
> +                                                       true,
> +                                                       __ATOMIC_RELEASE,
> +                                                       __ATOMIC_RELAXED)));
> +             return;
> +     }
> +
> +     oss = q->qschst;
> +     do {
> +             ticket = TICKET_INVALID;
> +             nss = oss;
> +             nss.numevts -= actual;
> +             nss.wrr_budget -= actual;
> +             if ((oss.numevts > 0 && nss.numevts <= 0) ||
> +                 oss.wrr_budget <= actual) {
> +                     /* If we have emptied parallel/ordered queue or
> +                      * exchausted its WRR budget, we need a ticket
> +                      * for a later pop.
> +                      */
> +                     ticket = nss.nxt_ticket++;
> +                     /* Reset wrr_budget as we might also push the
> +                      * queue to the schedq.
> +                      */
> +                     nss.wrr_budget = CONFIG_WRR_WEIGHT;
> +             }
> +     /* Attempt to update numevts and optionally take ticket. */
> +     } while (!__atomic_compare_exchange(
> +                    &q->qschst, &oss, &nss,
> +                    true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
> +
> +     if (odp_unlikely(ticket != TICKET_INVALID)) {
> +             ODP_ASSERT(q->qschst_type != ODP_SCHED_SYNC_ATOMIC);
> +             /* Wait for our turn to update schedq. */
> +             if (odp_unlikely(
> +                         __atomic_load_n(&q->qschst.cur_ticket,
> +                                         __ATOMIC_ACQUIRE) != ticket)) {
> +                     SEVL();
> +                     while (WFE() &&
> +                            LDXR8(&q->qschst.cur_ticket,
> +                                  __ATOMIC_ACQUIRE) != ticket)
> +                             DOZE();
> +             }
> +             /* We are here because of non-empty-to-empty transition or
> +              * WRR budget exhausted
> +              * This means the queue must be popped from the schedq, now or
> +              * later
> +              * If there was no NE->E transition but instead the WRR budget
> +              * was exhausted, the queue needs to be moved (popped and
> +              * pushed) to the tail of the schedq
> +              */
> +             if (oss.numevts > 0 && nss.numevts <= 0) {
> +                     /* NE->E transition, need to pop */
> +                     if (!schedq_elem_on_queue(q) ||
> +                         !schedq_cond_pop(q->schedq, q)) {
> +                             /* Queue not at head, failed to dequeue
> +                              * Missed a pop.
> +                              */
> +                             q->pop_deficit++;
> +                     }
> +             } else {
> +                     /* WRR budget exhausted
> +                      * Need to move queue to tail of schedq if possible
> +                      */
> +                     if (odp_likely(schedq_elem_on_queue(q))) {
> +                             /* Queue is on schedq, try to move it to
> +                              * the tail
> +                              */
> +                             (void)schedq_cond_rotate(q->schedq, q);
> +                     }
> +                     /* Else queue not on schedq or not at head of schedq
> +                      * No pop => no push
> +                      */
> +             }
> +             atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
> +                                  /*readonly=*/false);
> +     }
> +}
> +#endif
> +
> +#ifdef CONFIG_QSCHST_LOCK
> +static void
> +sched_update_deq_sc(sched_elem_t *q,
> +                 uint32_t actual,
> +                 bool atomic) __attribute__((always_inline));
> +static inline void
> +sched_update_deq_sc(sched_elem_t *q,
> +                 uint32_t actual, bool atomic)
> +{
> +     qschedstate_t oss, nss;
> +     uint32_t ticket;
> +
> +     if (atomic) {
> +             ODP_ASSERT(q->qschst.cur_ticket == sched_ts->ticket);
> +             ODP_ASSERT(q->qschst.cur_ticket != q->qschst.nxt_ticket);
> +             q->qschst.numevts -= actual;
> +             q->qschst.cur_ticket = sched_ts->ticket + 1;
> +             if (q->qschst.numevts > 0)
> +                     schedq_push(q->schedq, q);
> +             return;
> +     }
> +
> +     oss = q->qschst;
> +     ticket = TICKET_INVALID;
> +     nss = oss;
> +     nss.numevts -= actual;
> +     nss.wrr_budget -= actual;
> +     if ((oss.numevts > 0 && nss.numevts <= 0) || oss.wrr_budget <= actual) {
> +             /* If we emptied the queue or
> +              * if we have served the maximum number of events
> +              * then we need a ticket for a later pop.
> +              */
> +             ticket = nss.nxt_ticket++;
> +             /* Also reset wrr_budget as we might also push the
> +              * queue to the schedq.
> +              */
> +             nss.wrr_budget = CONFIG_WRR_WEIGHT;
> +     }
> +     q->qschst = nss;
> +
> +     if (ticket != TICKET_INVALID) {
> +             if (oss.numevts > 0 && nss.numevts <= 0) {
> +                     /* NE->E transition, need to pop */
> +                     if (!schedq_elem_on_queue(q) ||
> +                         !schedq_cond_pop(q->schedq, q)) {
> +                             /* Queue not at head, failed to dequeue.
> +                              * Missed a pop.
> +                              */
> +                             q->pop_deficit++;
> +                     }
> +             } else {
> +                     /* WRR budget exhausted
> +                      * Need to move queue to tail of schedq if possible
> +                      */
> +                     if (odp_likely(schedq_elem_on_queue(q))) {
> +                             /* Queue is on schedq, try to move it to
> +                              * the tail
> +                              */
> +                             (void)schedq_cond_rotate(q->schedq, q);
> +                     }
> +                     /* Else queue not on schedq or not at head of schedq
> +                      * No pop => no push
> +                      */
> +             }
> +             q->qschst.cur_ticket = ticket + 1;
> +     }
> +}
> +#endif
> +
> +static inline void sched_update_popd_sc(sched_elem_t *elem)
> +{
> +     if (elem->pop_deficit != 0 &&
> +         schedq_elem_on_queue(elem) &&
> +         schedq_cond_pop(elem->schedq, elem))
> +             elem->pop_deficit--;
> +}
> +
> +#ifndef CONFIG_QSCHST_LOCK
> +static inline void sched_update_popd(sched_elem_t *elem)
> +{
> +     uint32_t ticket = __atomic_fetch_add(&elem->qschst.nxt_ticket,
> +                                         1,
> +                                         __ATOMIC_RELAXED);
> +     if (odp_unlikely(__atomic_load_n(&elem->qschst.cur_ticket,
> +                                      __ATOMIC_ACQUIRE) != ticket)) {
> +             SEVL();
> +             while (WFE() && LDXR8(&elem->qschst.cur_ticket,
> +                                   __ATOMIC_ACQUIRE) != ticket)
> +                     DOZE();
> +     }
> +     sched_update_popd_sc(elem);
> +     atomic_store_release(&elem->qschst.cur_ticket, ticket + 1,
> +                          /*readonly=*/false);
> +}
> +#endif
> +
> +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t 
> prio)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     uint32_t x;
> +
> +     ODP_ASSERT(grp >= 0 && grp <= (odp_schedule_group_t)MAX_SCHED_GROUP);
> +     ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
> +     ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
> +
> +     sgi = grp;
> +     sg = sg_vec[sgi];
> +
> +     /* Use xcount to spread queues over the xfactor schedq's
> +      * per priority.
> +      */
> +     x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
> +     if (x == 0) {
> +             /* First ODP queue for this priority
> +              * Notify all threads in sg->thr_wanted that they
> +              * should join.
> +              */
> +             sched_group_mask_t thrds = sg->thr_wanted;
> +
> +             while (!bitset_is_null(thrds)) {
> +                     uint32_t thr;
> +
> +                     thr = bitset_ffs(thrds) - 1;
> +                     thrds = bitset_clr(thrds, thr);
> +                     /* Notify the thread about membership in this
> +                      * group/priority.
> +                      */
> +                     atom_bitset_set(&thread_state[thr].sg_wanted[prio],
> +                                     sgi, __ATOMIC_RELEASE);
> +                     __atomic_store_n(&thread_state[thr].sg_sem, 1,
> +                                      __ATOMIC_RELEASE);
> +             }
> +     }
> +     return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
> +}
> +
> +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     uint32_t x;
> +
> +     ODP_ASSERT(grp >= 0 && grp <= (odp_schedule_group_t)MAX_SCHED_GROUP);
> +     ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
> +     ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
> +
> +     sgi = grp;
> +     sg = sg_vec[sgi];
> +     x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
> +
> +     if (x == 0) {
> +             /* Last ODP queue for this priority
> +              * Notify all threads in sg->thr_wanted that they
> +              * should leave.
> +              */
> +             sched_group_mask_t thrds = sg->thr_wanted;
> +
> +             while (!bitset_is_null(thrds)) {
> +                     uint32_t thr;
> +
> +                     thr = bitset_ffs(thrds) - 1;
> +                     thrds = bitset_clr(thrds, thr);
> +                     /* Notify the thread about membership in this
> +                      * group/priority.
> +                      */
> +                     atom_bitset_clr(&thread_state[thr].sg_wanted[prio],
> +                                     sgi, __ATOMIC_RELEASE);
> +                     __atomic_store_n(&thread_state[thr].sg_sem, 1,
> +                                      __ATOMIC_RELEASE);
> +             }
> +     }
> +}
> +
> +static void update_sg_membership(sched_scalable_thread_state_t *ts)
> +{
> +     uint32_t p;
> +     sched_group_mask_t sg_wanted;
> +     sched_group_mask_t added;
> +     sched_group_mask_t removed;
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     uint32_t x;
> +
> +     for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> +             sg_wanted = __atomic_load_n(&ts->sg_wanted[p],
> +                                         __ATOMIC_ACQUIRE);
> +             if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) {
> +                     /* Our sched_group membership has changed */
> +                     added = bitset_andn(sg_wanted, ts->sg_actual[p]);
> +                     while (!bitset_is_null(added)) {
> +                             sgi = bitset_ffs(added) - 1;
> +                             sg = sg_vec[sgi];
> +                             for (x = 0; x < sg->xfactor; x++) {
> +                                     /* Include our thread index to shift
> +                                      * (rotate) the order of schedq's
> +                                      */
> +                                     insert_schedq_in_list
> +                                             (ts,
> +                                              &sg->schedq[p * sg->xfactor +
> +                                              (x + ts->tidx) % sg->xfactor]);
> +                             }
> +                             atom_bitset_set(&sg->thr_actual[p], ts->tidx,
> +                                             __ATOMIC_RELAXED);
> +                             added = bitset_clr(added, sgi);
> +                     }
> +                     removed = bitset_andn(ts->sg_actual[p], sg_wanted);
> +                     while (!bitset_is_null(removed)) {
> +                             sgi = bitset_ffs(removed) - 1;
> +                             sg = sg_vec[sgi];
> +                             for (x = 0; x < sg->xfactor; x++) {
> +                                     remove_schedq_from_list
> +                                             (ts,
> +                                              &sg->schedq[p *
> +                                              sg->xfactor + x]);
> +                             }
> +                             atom_bitset_clr(&sg->thr_actual[p], ts->tidx,
> +                                             __ATOMIC_RELAXED);
> +                             removed = bitset_clr(removed, sgi);
> +                     }
> +                     ts->sg_actual[p] = sg_wanted;
> +             }
> +     }
> +}
> +
> +/*******************************************************************************
> + * Scheduler
> + 
> ******************************************************************************/
> +
> +static inline void _schedule_release_atomic(sched_scalable_thread_state_t 
> *ts)
> +{
> +#ifdef CONFIG_QSCHST_LOCK
> +     sched_update_deq_sc(ts->atomq, ts->dequeued, true);
> +     ODP_ASSERT(ts->atomq->qschst.cur_ticket != ts->ticket);
> +     ODP_ASSERT(ts->atomq->qschst.cur_ticket ==
> +                     ts->atomq->qschst.nxt_ticket);
> +#else
> +     sched_update_deq(ts->atomq, ts->dequeued, true);
> +#endif
> +     ts->atomq = NULL;
> +     ts->ticket = TICKET_INVALID;
> +}
> +
> +static inline void _schedule_release_ordered(sched_scalable_thread_state_t 
> *ts)
> +{
> +     ts->out_of_order = false;
> +     rctx_release(ts->rctx);
> +     ts->rctx = NULL;
> +}
> +
> +static void pktin_poll(sched_scalable_thread_state_t *ts)
> +{
> +     uint32_t i, tag, hi, npolls = 0;
> +     int pktio_index, queue_index;
> +
> +     hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED);
> +     if (hi == 0)
> +             return;
> +
> +     for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) {
> +             tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED);
> +             if (!TAG_IS_READY(tag))
> +                     continue;
> +             if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag,
> +                                              tag | TAG_BUSY,
> +                                              true,
> +                                              __ATOMIC_ACQUIRE,
> +                                              __ATOMIC_RELAXED))
> +                     continue;
> +             /* Tag grabbed */
> +             pktio_index = TAG_2_PKTIO(tag);
> +             queue_index = TAG_2_QUEUE(tag);
> +             if (odp_unlikely(sched_cb_pktin_poll(pktio_index,
> +                                                  1, &queue_index))) {
> +                     /* Pktio stopped or closed
> +                      * Remove tag from pktin_tags
> +                      */
> +                     __atomic_store_n(&pktin_tags[i],
> +                                      TAG_EMPTY, __ATOMIC_RELAXED);
> +                     __atomic_fetch_sub(&pktin_num,
> +                                        1, __ATOMIC_RELEASE);
> +                     /* Call stop_finalize when all queues
> +                      * of the pktio have been removed
> +                      */
> +                     if (__atomic_sub_fetch(&pktin_count[pktio_index], 1,
> +                                            __ATOMIC_RELAXED) == 0)
> +                             sched_cb_pktio_stop_finalize(pktio_index);
> +             } else {
> +                 /* We don't know whether any packets were found and enqueued
> +                  * Write back original tag value to release pktin queue
> +                  */
> +                 __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED);
> +                 /* Do not iterate through all pktin queues every time */
> +                 if ((ts->pktin_poll_cnts & 0xf) != 0)
> +                     break;
> +             }
> +     }
> +     ODP_ASSERT(i < hi);
> +     ts->pktin_poll_cnts++;
> +     ts->pktin_next = i;
> +}
> +
> +static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
> +{
> +     sched_scalable_thread_state_t *ts;
> +     sched_elem_t *atomq;
> +     int num;
> +     uint32_t i;
> +
> +     ts = sched_ts;
> +     atomq = ts->atomq;
> +
> +     /* Once an atomic queue has been scheduled to a thread, it will stay
> +      * on that thread until empty or 'rotated' by WRR
> +      */
> +     if (atomq != NULL) {
> +             ODP_ASSERT(ts->ticket != TICKET_INVALID);
> +#ifdef CONFIG_QSCHST_LOCK
> +             LOCK(&atomq->qschlock);
> +#endif
> +dequeue_atomic:
> +             ODP_ASSERT(ts->ticket == atomq->qschst.cur_ticket);
> +             ODP_ASSERT(ts->ticket != atomq->qschst.nxt_ticket);
> +             /* Atomic queues can be dequeued without lock since this thread
> +              * has the only reference to the atomic queue being processed.
> +              */
> +             if (ts->dequeued < atomq->qschst.wrr_budget) {
> +                     num = _odp_queue_deq_sc(atomq, ev, num_evts);
> +                     if (odp_likely(num != 0)) {
> +#ifdef CONFIG_QSCHST_LOCK
> +                             UNLOCK(&atomq->qschlock);
> +#endif
> +                             ts->dequeued += num;
> +                             /* Allow this thread to continue to 'own' this
> +                              * atomic queue until all events have been
> +                              * processed and the thread re-invokes the
> +                              * scheduler.
> +                              */
> +                             if (from)
> +                                     *from = queue_get_handle(
> +                                                     (queue_entry_t *)atomq);
> +                             return num;
> +                     }
> +             }
> +             /* Atomic queue was empty or interrupted by WRR, release it. */
> +             _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> +             UNLOCK(&atomq->qschlock);
> +#endif
> +     }
> +
> +     /* Release any previous reorder context. */
> +     if (ts->rctx != NULL)
> +             _schedule_release_ordered(ts);
> +
> +     /* Check for and perform any scheduler group updates. */
> +     if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) {
> +             (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
> +             ts->sg_sem = 0;
> +             update_sg_membership(ts);
> +     }
> +
> +     /* Scan our schedq list from beginning to end */
> +     for (i = 0; i < ts->num_schedq; i++) {
> +             sched_queue_t *schedq = ts->schedq_list[i];
> +             sched_elem_t *elem;
> +restart_same:
> +             elem = schedq_peek(schedq);
> +             if (odp_unlikely(elem == NULL)) {
> +                     /* Schedq empty, look at next one. */
> +                     continue;
> +             }
> +
> +             if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) {
> +                     /* Dequeue element only if it is still at head
> +                      * of schedq.
> +                      */
> +                     if (odp_unlikely(!schedq_cond_pop(schedq, elem))) {
> +                             /* Queue not at head of schedq anymore, some
> +                              * other thread popped it.
> +                              */
> +                             goto restart_same;
> +                     }
> +                     ts->atomq = elem;
> +                     atomq = elem;
> +                     ts->dequeued = 0;
> +#ifdef CONFIG_QSCHST_LOCK
> +                     LOCK(&atomq->qschlock);
> +                     ts->ticket = atomq->qschst.nxt_ticket++;
> +                     ODP_ASSERT(atomq->qschst.cur_ticket == ts->ticket);
> +#else
> +                     /* Dequeued atomic queue from the schedq, only we
> +                      * can process it and any qschst updates are our
> +                      * responsibility.
> +                      */
> +                     /* The ticket taken below will signal producers */
> +                     ts->ticket = __atomic_fetch_add(
> +                             &atomq->qschst.nxt_ticket, 1, __ATOMIC_RELAXED);
> +                     while (__atomic_load_n(
> +                                     &atomq->qschst.cur_ticket,
> +                                     __ATOMIC_ACQUIRE) != ts->ticket) {
> +                             /* No need to use WFE, spinning here seems
> +                              * very infrequent.
> +                              */
> +                             odp_cpu_pause();
> +                     }
> +#endif
> +                     goto dequeue_atomic;
> +             } else if (elem->cons_type == ODP_SCHED_SYNC_PARALLEL) {
> +#ifdef CONFIG_QSCHST_LOCK
> +                     LOCK(&elem->qschlock);
> +                     num = _odp_queue_deq_sc(elem, ev, num_evts);
> +                     if (odp_likely(num != 0)) {
> +                             sched_update_deq_sc(elem, num, false);
> +                             UNLOCK(&elem->qschlock);
> +                             if (from)
> +                                     *from =
> +                                     queue_get_handle((queue_entry_t *)elem);
> +                             return num;
> +                     }
> +                     UNLOCK(&elem->qschlock);
> +#else
> +                     num = _odp_queue_deq(elem, ev, num_evts);
> +                     if (odp_likely(num != 0)) {
> +                             sched_update_deq(elem, num, false);
> +                             if (from)
> +                                     *from =
> +                                     queue_get_handle((queue_entry_t *)elem);
> +                             return num;
> +                     }
> +#endif
> +             } else if (elem->cons_type == ODP_SCHED_SYNC_ORDERED) {
> +                     reorder_window_t *rwin;
> +                     reorder_context_t *rctx;
> +                     uint32_t sn;
> +                     uint32_t idx;
> +                     int ret;
> +
> +                     /* The ordered queue has a reorder window so requires
> +                      * order restoration. We must use a reorder context to
> +                      * collect all outgoing events. Ensure there is at least
> +                      * one available reorder context.
> +                      */
> +                     if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
> +                             ts->priv_rvec_free = atom_bitset_xchg(
> +                                                     &ts->rvec_free, 0,
> +                                                     __ATOMIC_RELAXED);
> +                             if (odp_unlikely(bitset_is_null(
> +                                             ts->priv_rvec_free))) {
> +                                     /* No free reorder contexts for
> +                                      * this thread. Look at next schedq,
> +                                      * hope we find non-ordered queue.
> +                                      */
> +                                     continue;
> +                             }
> +                     }
> +                     /* rwin_reserve and odp_queue_deq must be atomic or
> +                      * there will be a potential race condition.
> +                      * Allocate a slot in the reorder window.
> +                      */
> +                     rwin = queue_get_rwin((queue_entry_t *)elem);
> +                     ODP_ASSERT(rwin != NULL);
> +                     if (odp_unlikely(!rwin_reserve(rwin, &sn))) {
> +                             /* Reorder window full */
> +                             /* Look at next schedq, find other queue */
> +                             continue;
> +                     }
> +                     /* Wait for our turn to dequeue */
> +                     if (odp_unlikely(__atomic_load_n(&rwin->turn,
> +                                                      __ATOMIC_ACQUIRE)
> +                         != sn)) {
> +                             SEVL();
> +                             while (WFE() &&
> +                                    LDXR32(&rwin->turn, __ATOMIC_ACQUIRE)
> +                                             != sn)
> +                                     DOZE();
> +                     }
> +#ifdef CONFIG_QSCHST_LOCK
> +                     LOCK(&elem->qschlock);
> +#endif
> +                     ret = _odp_queue_deq_sc(elem, ev, num_evts);
> +                     /* Wait for prod_read write in _odp_queue_dequeue_sc()
> +                      * to complete before we signal the next consumer
> +                      */
> +                     atomic_store_release(&rwin->turn, sn + 1,
> +                                          /*readonly=*/false);
> +                     /* Find and initialise an unused reorder context. */
> +                     idx = bitset_ffs(ts->priv_rvec_free) - 1;
> +                     ts->priv_rvec_free =
> +                             bitset_clr(ts->priv_rvec_free, idx);
> +                     rctx = &ts->rvec[idx];
> +                     /* Need to initialise reorder context or we can't
> +                      * release it later.
> +                      */
> +                     rctx_init(rctx, idx, rwin, sn);
> +
> +                     /* Was dequeue successful? */
> +                     if (odp_likely(ret != 0)) {
> +                             /* Perform scheduler related updates */
> +#ifdef CONFIG_QSCHST_LOCK
> +                             sched_update_deq_sc(elem, ret,
> +                                                 /*atomic=*/false);
> +                             UNLOCK(&elem->qschlock);
> +#else
> +                             sched_update_deq(elem, ret, /*atomic=*/false);
> +#endif
> +
> +                             /* Are we in-order or out-of-order? */
> +                             ts->out_of_order = sn != rwin->hc.head;
> +
> +                             ts->rctx = rctx;
> +                             if (from)
> +                                     *from = queue_get_handle(
> +                                             (queue_entry_t *)elem);
> +                             return ret;
> +                     }
> +#ifdef CONFIG_QSCHST_LOCK
> +                     UNLOCK(&elem->qschlock);
> +#endif
> +                     /* Since a slot was reserved in the reorder window,
> +                      * the reorder context needs to be released and
> +                      * inserted into the reorder window.
> +                      */
> +                     rctx_release(rctx);
> +                     ODP_ASSERT(ts->rctx == NULL);
> +             }
> +             /* Dequeue from parallel/ordered queue failed
> +              * Check if we have a queue at the head of the schedq that needs
> +              * to be popped
> +              */
> +             if (odp_unlikely(__atomic_load_n(&elem->pop_deficit,
> +                                              __ATOMIC_RELAXED) != 0)) {
> +#ifdef CONFIG_QSCHST_LOCK
> +                     LOCK(&elem->qschlock);
> +                     sched_update_popd_sc(elem);
> +                     UNLOCK(&elem->qschlock);
> +#else
> +                     sched_update_popd(elem);
> +#endif
> +             }
> +     }
> +
> +     pktin_poll(ts);
> +     return 0;
> +}
> +
> +/******************************************************************************/
> +
> +static void schedule_order_lock(unsigned lock_index)
> +{
> +     struct reorder_context *rctx = sched_ts->rctx;
> +
> +     if (odp_unlikely(rctx == NULL ||
> +                      rctx->rwin == NULL ||
> +                      lock_index >= rctx->rwin->lock_count)) {
> +             ODP_ERR("Invalid call to odp_schedule_order_lock\n");
> +             return;
> +     }
> +     if (odp_unlikely(__atomic_load_n(&rctx->rwin->olock[lock_index],
> +                                      __ATOMIC_ACQUIRE) != rctx->sn)) {
> +             SEVL();
> +             while (WFE() &&
> +                    LDXR32(&rctx->rwin->olock[lock_index],
> +                           __ATOMIC_ACQUIRE) != rctx->sn)
> +                     DOZE();
> +     }
> +}
> +
> +static void schedule_order_unlock(unsigned lock_index)
> +{
> +     struct reorder_context *rctx;
> +
> +     rctx = sched_ts->rctx;
> +     if (odp_unlikely(rctx == NULL ||
> +                      rctx->rwin == NULL ||
> +                      lock_index >= rctx->rwin->lock_count ||
> +                      rctx->rwin->olock[lock_index] != rctx->sn)) {
> +             ODP_ERR("Invalid call to odp_schedule_order_unlock\n");
> +             return;
> +     }
> +     atomic_store_release(&rctx->rwin->olock[lock_index],
> +                          rctx->sn + 1,
> +                          /*readonly=*/false);
> +     rctx->olock_flags |= 1U << lock_index;
> +}
> +
> +static void schedule_release_atomic(void)
> +{
> +     sched_scalable_thread_state_t *ts;
> +
> +     ts = sched_ts;
> +     if (odp_likely(ts->atomq != NULL)) {
> +#ifdef CONFIG_QSCHST_LOCK
> +             sched_elem_t *atomq;
> +
> +             atomq = ts->atomq;
> +             LOCK(&atomq->qschlock);
> +#endif
> +             _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> +             UNLOCK(&atomq->qschlock);
> +#endif
> +     }
> +}
> +
> +static void schedule_release_ordered(void)
> +{
> +     sched_scalable_thread_state_t *ts;
> +
> +     ts = sched_ts;
> +     if (ts->rctx != NULL)
> +             _schedule_release_ordered(ts);
> +}
> +
> +static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[],
> +                       int num)
> +{
> +     sched_scalable_thread_state_t *ts;
> +     int n;
> +     odp_time_t start;
> +     odp_time_t delta;
> +     odp_time_t deadline;
> +
> +     ts = sched_ts;
> +     if (odp_unlikely(ts->pause)) {
> +             if (ts->atomq != NULL) {
> +#ifdef CONFIG_QSCHST_LOCK
> +                     sched_elem_t *atomq;
> +
> +                     atomq = ts->atomq;
> +                     LOCK(&atomq->qschlock);
> +#endif
> +                     _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> +                     UNLOCK(&atomq->qschlock);
> +#endif
> +             } else if (ts->rctx != NULL) {
> +                     _schedule_release_ordered(ts);
> +             }
> +             return 0;
> +     }
> +
> +     if (wait == ODP_SCHED_NO_WAIT)
> +             return _schedule(from, ev, num);
> +
> +     if (wait == ODP_SCHED_WAIT) {
> +             for (;;) {
> +                     n = _schedule(from, ev, num);
> +                     if (odp_likely(n  > 0))
> +                             return n;
> +             }
> +     }
> +
> +     start = odp_time_local();
> +
> +     n = _schedule(from, ev, num);
> +     if (odp_likely(n > 0))
> +             return n;
> +
> +     delta = odp_time_local_from_ns(wait);
> +     deadline = odp_time_sum(start, delta);
> +
> +     while (odp_time_cmp(deadline, odp_time_local()) > 0) {
> +             n = _schedule(from, ev, num);
> +             if (odp_likely(n > 0))
> +                     return n;
> +     }
> +
> +     return 0;
> +}
> +
> +static odp_event_t schedule(odp_queue_t *from, uint64_t wait)
> +{
> +     odp_event_t ev = ODP_EVENT_INVALID;
> +     const int num = 1;
> +     sched_scalable_thread_state_t *ts;
> +     int n;
> +     odp_time_t start;
> +     odp_time_t delta;
> +     odp_time_t deadline;
> +
> +     ts = sched_ts;
> +     if (odp_unlikely(ts->pause)) {
> +             if (ts->atomq != NULL) {
> +#ifdef CONFIG_QSCHST_LOCK
> +                     sched_elem_t *atomq;
> +
> +                     atomq = ts->atomq;
> +                     LOCK(&atomq->qschlock);
> +#endif
> +                     _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> +                     UNLOCK(&atomq->qschlock);
> +#endif
> +             } else if (ts->rctx != NULL) {
> +                     _schedule_release_ordered(ts);
> +             }
> +             return ev;
> +     }
> +
> +     if (wait == ODP_SCHED_NO_WAIT) {
> +             (void)_schedule(from, &ev, num);
> +             return ev;
> +     }
> +
> +     if (wait == ODP_SCHED_WAIT) {
> +             for (;;) {
> +                     n = _schedule(from, &ev, num);
> +                     if (odp_likely(n > 0))
> +                             return ev;
> +             }
> +     }
> +
> +     start = odp_time_local();
> +
> +     n = _schedule(from, &ev, num);
> +     if (odp_likely(n > 0))
> +             return ev;
> +
> +     delta = odp_time_local_from_ns(wait);
> +     deadline = odp_time_sum(start, delta);
> +
> +     while (odp_time_cmp(deadline, odp_time_local()) > 0) {
> +             n = _schedule(from, &ev, num);
> +             if (odp_likely(n > 0))
> +                     return ev;
> +     }
> +
> +     return ev;
> +}
> +
> +static void schedule_pause(void)
> +{
> +     sched_ts->pause = true;
> +}
> +
> +static void schedule_resume(void)
> +{
> +     sched_ts->pause = false;
> +}
> +
> +static uint64_t schedule_wait_time(uint64_t ns)
> +{
> +     return ns;
> +}
> +
> +static int schedule_num_prio(void)
> +{
> +     return ODP_SCHED_PRIO_NUM;
> +}
> +
> +static int schedule_group_update(sched_group_t *sg,
> +                              uint32_t sgi,
> +                              const odp_thrmask_t *mask,
> +                              int join_leave)
> +{
> +     int thr;
> +     uint32_t p;
> +
> +     /* Internal function, do not validate inputs */
> +
> +     /* Notify relevant threads about the change */
> +     thr = odp_thrmask_first(mask);
> +     while (0 <= thr) {
> +             /* Add thread to scheduler group's wanted thread mask */
> +             if (join_leave == SCHED_GROUP_JOIN)
> +                     atom_bitset_set(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
> +             else
> +                     atom_bitset_clr(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
> +             for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> +                     if (sg->xcount[p] != 0) {
> +                             /* This priority level has ODP queues
> +                              * Notify the thread about membership in
> +                              * this group/priority
> +                              */
> +                             if (join_leave == SCHED_GROUP_JOIN)
> +                                     atom_bitset_set(
> +                                             &thread_state[thr].sg_wanted[p],
> +                                             sgi,
> +                                             __ATOMIC_RELEASE);
> +                             else
> +                                     atom_bitset_clr(
> +                                             &thread_state[thr].sg_wanted[p],
> +                                             sgi,
> +                                             __ATOMIC_RELEASE);
> +                             __atomic_store_n(&thread_state[thr].sg_sem,
> +                                              1,
> +                                              __ATOMIC_RELEASE);
> +                     }
> +             }
> +             thr = odp_thrmask_next(mask, thr);
> +     }
> +
> +     return 0;
> +}
> +
> +static int _schedule_group_thrmask(sched_group_t *sg, odp_thrmask_t *mask)
> +{
> +     bitset_t bs;
> +     uint32_t bit;
> +
> +     /* Internal function, do not validate inputs */
> +
> +     odp_thrmask_zero(mask);
> +     bs = sg->thr_wanted;
> +     while (!bitset_is_null(bs)) {
> +             bit = bitset_ffs(bs) - 1;
> +             bs = bitset_clr(bs, bit);
> +             odp_thrmask_set(mask, bit);
> +     }
> +
> +     return 0;
> +}
> +
> +static odp_schedule_group_t schedule_group_create(const char *name,
> +                                               const odp_thrmask_t *mask)
> +{
> +     odp_shm_t shm;
> +     uint32_t sgi;
> +     sched_group_mask_t free;
> +     uint32_t xfactor;
> +     sched_group_t *sg;
> +     uint32_t p;
> +     uint32_t x;
> +
> +     /* Validate inputs */
> +     if (mask == NULL)
> +             ODP_ABORT("mask is NULL\n");
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     /* Allocate a scheduler group */
> +     free = __atomic_load_n(&sg_free, __ATOMIC_RELAXED);
> +     do {
> +             /* All sched_groups in use */
> +             if (bitset_is_null(free))
> +                     goto no_free_sched_group;
> +
> +             sgi = bitset_ffs(free) - 1;
> +             /* All sched_groups in use */
> +             if (sgi >= MAX_SCHED_GROUP)
> +                     goto no_free_sched_group;
> +     } while (!__atomic_compare_exchange_n(&sg_free,
> +                                       &free,
> +                                       bitset_clr(free, sgi),
> +                                       true,
> +                                       __ATOMIC_ACQUIRE,
> +                                       __ATOMIC_ACQUIRE));
> +
> +     /* Compute xfactor (spread factor) from the number of threads
> +      * present in the thread mask. Preferable this would be an
> +      * explicit parameter.
> +      */
> +     xfactor = odp_thrmask_count(mask);
> +     if (xfactor < 1)
> +             xfactor = 1;
> +
> +     shm = odp_shm_reserve(name ? name : "",
> +                           (sizeof(sched_group_t) +
> +                           (ODP_SCHED_PRIO_NUM * xfactor - 1) *
> +                            sizeof(sched_queue_t)),
> +                           ODP_CACHE_LINE_SIZE,
> +                           ODP_SHM_PROC);
> +     if (ODP_SHM_INVALID == shm)
> +             goto shm_reserve_failed;
> +
> +     sg = (sched_group_t *)odp_shm_addr(shm);
> +     if (sg == NULL)
> +             goto shm_addr_failed;
> +
> +     strncpy(sg->name, name ? name : "", ODP_SCHED_GROUP_NAME_LEN - 1);
> +     sg->shm = shm;
> +     sg_vec[sgi] = sg;
> +     memset(sg->thr_actual, 0, sizeof(sg->thr_actual));
> +     sg->thr_wanted = bitset_null();
> +     sg->xfactor = xfactor;
> +     for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> +             sg->xcount[p] = 0;
> +             for (x = 0; x < xfactor; x++)
> +                     schedq_init(&sg->schedq[p * xfactor + x], p);
> +     }
> +     if (odp_thrmask_count(mask) != 0)
> +             schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return (odp_schedule_group_t)(sgi);
> +
> +shm_addr_failed:
> +     odp_shm_free(shm);
> +
> +shm_reserve_failed:
> +     /* Free the allocated group index */
> +     atom_bitset_set(&sg_free, sgi, __ATOMIC_RELAXED);
> +
> +no_free_sched_group:
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ODP_SCHED_GROUP_INVALID;
> +}
> +
> +static int schedule_group_destroy(odp_schedule_group_t group)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     uint32_t p;
> +     int ret = 0;
> +
> +     /* Validate inputs */
> +     if (group < 0 && group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
> +             ret = -1;
> +             goto invalid_group;
> +     }
> +
> +     if (sched_ts &&
> +         odp_unlikely(__atomic_load_n(&sched_ts->sg_sem,
> +                                      __ATOMIC_RELAXED) != 0)) {
> +             (void)__atomic_load_n(&sched_ts->sg_sem,
> +                                   __ATOMIC_ACQUIRE);
> +             sched_ts->sg_sem = 0;
> +             update_sg_membership(sched_ts);
> +     }
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     sgi = (uint32_t)group;
> +     if (bitset_is_set(sg_free, sgi)) {
> +             ret = -1;
> +             goto group_not_found;
> +     }
> +
> +     sg = sg_vec[sgi];
> +     /* First ensure all threads have processed group_join/group_leave
> +      * requests.
> +      */
> +     for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> +             if (sg->xcount[p] != 0) {
> +                     bitset_t wanted = atom_bitset_load(
> +                             &sg->thr_wanted, __ATOMIC_RELAXED);
> +
> +                     SEVL();
> +                     while (WFE() &&
> +                            !bitset_is_eql(wanted,
> +                                           bitset_ldex(&sg->thr_actual[p],
> +                                                       __ATOMIC_RELAXED)))
> +                             DOZE();
> +             }
> +             /* Else ignore because no ODP queues on this prio */
> +     }
> +
> +     /* Check if all threads/queues have left the group */
> +     for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> +             if (!bitset_is_null(sg->thr_actual[p])) {
> +                     ODP_ERR("Group has threads\n");
> +                     ret = -1;
> +                     goto thrd_q_present_in_group;
> +             }
> +             if (sg->xcount[p] != 0) {
> +                     ODP_ERR("Group has queues\n");
> +                     ret = -1;
> +                     goto thrd_q_present_in_group;
> +             }
> +     }
> +
> +     odp_shm_free(sg->shm);
> +     sg_vec[sgi] = NULL;
> +     atom_bitset_set(&sg_free, sgi, __ATOMIC_RELEASE);
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ret;
> +
> +thrd_q_present_in_group:
> +
> +group_not_found:
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> +
> +     return ret;
> +}
> +
> +static odp_schedule_group_t schedule_group_lookup(const char *name)
> +{
> +     uint32_t sgi;
> +     odp_schedule_group_t group;
> +
> +     /* Validate inputs */
> +     if (name == NULL)
> +             ODP_ABORT("name or mask is NULL\n");
> +
> +     group = ODP_SCHED_GROUP_INVALID;
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     /* Scan through the schedule group array */
> +     for (sgi = 0; sgi < MAX_SCHED_GROUP; sgi++) {
> +             if ((sg_vec[sgi] != NULL) &&
> +                 (strncmp(name, sg_vec[sgi]->name,
> +                          ODP_SCHED_GROUP_NAME_LEN) == 0)) {
> +                     group = (odp_schedule_group_t)sgi;
> +                     break;
> +             }
> +     }
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return group;
> +}
> +
> +static int schedule_group_join(odp_schedule_group_t group,
> +                            const odp_thrmask_t *mask)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     int ret;
> +
> +     /* Validate inputs */
> +     if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP))
> +             return -1;
> +
> +     if (mask == NULL)
> +             ODP_ABORT("name or mask is NULL\n");
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     sgi = (uint32_t)group;
> +     if (bitset_is_set(sg_free, sgi)) {
> +             odp_spinlock_unlock(&sched_grp_lock);
> +             return -1;
> +     }
> +
> +     sg = sg_vec[sgi];
> +     ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ret;
> +}
> +
> +static int schedule_group_leave(odp_schedule_group_t group,
> +                             const odp_thrmask_t *mask)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     int ret = 0;
> +
> +     /* Validate inputs */
> +     if (group < 0 && group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
> +             ret = -1;
> +             goto invalid_group;
> +     }
> +
> +     if (mask == NULL)
> +             ODP_ABORT("name or mask is NULL\n");
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     sgi = (uint32_t)group;
> +     if (bitset_is_set(sg_free, sgi)) {
> +             ret = -1;
> +             goto group_not_found;
> +     }
> +
> +     sg = sg_vec[sgi];
> +
> +     ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_LEAVE);
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ret;
> +
> +group_not_found:
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> +     return ret;
> +}
> +
> +static int schedule_group_thrmask(odp_schedule_group_t group,
> +                               odp_thrmask_t *mask)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     int ret = 0;
> +
> +     /* Validate inputs */
> +     if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
> +             ret = -1;
> +             goto invalid_group;
> +     }
> +
> +     if (mask == NULL)
> +             ODP_ABORT("name or mask is NULL\n");
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     sgi = (uint32_t)group;
> +     if (bitset_is_set(sg_free, sgi)) {
> +             ret = -1;
> +             goto group_not_found;
> +     }
> +
> +     sg = sg_vec[sgi];
> +     ret = _schedule_group_thrmask(sg, mask);
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ret;
> +
> +group_not_found:
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> +     return ret;
> +}
> +
> +static int schedule_group_info(odp_schedule_group_t group,
> +                            odp_schedule_group_info_t *info)
> +{
> +     uint32_t sgi;
> +     sched_group_t *sg;
> +     int ret = 0;
> +
> +     /* Validate inputs */
> +     if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
> +             ret = -1;
> +             goto invalid_group;
> +     }
> +
> +     if (info == NULL)
> +             ODP_ABORT("name or mask is NULL\n");
> +
> +     odp_spinlock_lock(&sched_grp_lock);
> +
> +     sgi = (uint32_t)group;
> +     if (bitset_is_set(sg_free, sgi)) {
> +             ret = -1;
> +             goto group_not_found;
> +     }
> +
> +     sg = sg_vec[sgi];
> +
> +     ret = _schedule_group_thrmask(sg, &info->thrmask);
> +
> +     info->name = sg->name;
> +
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +     return ret;
> +
> +group_not_found:
> +     odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> +     return ret;
> +}
> +
> +static int schedule_init_global(void)
> +{
> +     odp_thrmask_t mask;
> +     odp_schedule_group_t tmp_all;
> +     odp_schedule_group_t tmp_wrkr;
> +     odp_schedule_group_t tmp_ctrl;
> +     uint32_t bits;
> +
> +     odp_spinlock_init(&sched_grp_lock);
> +
> +     bits = MAX_SCHED_GROUP;
> +     if (MAX_SCHED_GROUP == sizeof(sg_free) * CHAR_BIT)
> +             sg_free = ~0ULL;
> +     else
> +             sg_free = (1ULL << bits) - 1;
> +
> +     for (uint32_t i = 0; i < MAX_SCHED_GROUP; i++)
> +             sg_vec[i] = NULL;
> +     for (uint32_t i = 0; i < MAXTHREADS; i++) {
> +             thread_state[i].sg_sem = 0;
> +             for (uint32_t j = 0; j < ODP_SCHED_PRIO_NUM; j++)
> +                     thread_state[i].sg_wanted[j] = bitset_null();
> +     }
> +
> +     /* Create sched groups for default GROUP_ALL, GROUP_WORKER and
> +      * GROUP_CONTROL groups.
> +      */
> +     odp_thrmask_zero(&mask);
> +     tmp_all = odp_schedule_group_create("__group_all", &mask);
> +     if (tmp_all != ODP_SCHED_GROUP_ALL) {
> +             ODP_ERR("Could not create ODP_SCHED_GROUP_ALL()\n");
> +             goto failed_create_group_all;
> +     }
> +
> +     tmp_wrkr = odp_schedule_group_create("__group_worker", &mask);
> +     if (tmp_wrkr != ODP_SCHED_GROUP_WORKER) {
> +             ODP_ERR("Could not create ODP_SCHED_GROUP_WORKER()\n");
> +             goto failed_create_group_worker;
> +     }
> +
> +     tmp_ctrl = odp_schedule_group_create("__group_control", &mask);
> +     if (tmp_ctrl != ODP_SCHED_GROUP_CONTROL) {
> +             ODP_ERR("Could not create ODP_SCHED_GROUP_CONTROL()\n");
> +             goto failed_create_group_control;
> +     }
> +
> +     return 0;
> +
> +failed_create_group_control:
> +     if (tmp_ctrl != ODP_SCHED_GROUP_INVALID)
> +             odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL);
> +
> +failed_create_group_worker:
> +     if (tmp_wrkr != ODP_SCHED_GROUP_INVALID)
> +             odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER);
> +
> +failed_create_group_all:
> +     if (tmp_all != ODP_SCHED_GROUP_INVALID)
> +             odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL);
> +
> +     return -1;
> +}
> +
> +static int schedule_term_global(void)
> +{
> +     /* Destroy sched groups for default GROUP_ALL, GROUP_WORKER and
> +      * GROUP_CONTROL groups.
> +      */
> +     if (odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL) != 0)
> +             ODP_ERR("Failed to destroy ODP_SCHED_GROUP_ALL\n");
> +     if (odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER) != 0)
> +             ODP_ERR("Failed to destroy ODP_SCHED_GROUP_WORKER\n");
> +     if (odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL) != 0)
> +             ODP_ERR("Failed to destroy ODP_SCHED_GROUP_CONTROL\n");
> +
> +     return 0;
> +}
> +
> +static int schedule_init_local(void)
> +{
> +     int thr_id;
> +     odp_thread_type_t thr_type;
> +     odp_thrmask_t mask;
> +
> +     thr_id = odp_thread_id();
> +     if (thread_state_init(thr_id))
> +             goto failed_to_init_ts;
> +
> +     /* Add this thread to default schedule groups */
> +     thr_type = odp_thread_type();
> +     odp_thrmask_zero(&mask);
> +     odp_thrmask_set(&mask, thr_id);
> +
> +     if (odp_schedule_group_join(ODP_SCHED_GROUP_ALL, &mask) != 0) {
> +             ODP_ERR("Failed to join ODP_SCHED_GROUP_ALL\n");
> +             goto failed_to_join_grp_all;
> +     }
> +     if (thr_type == ODP_THREAD_CONTROL) {
> +             if (odp_schedule_group_join(ODP_SCHED_GROUP_CONTROL,
> +                                         &mask) != 0) {
> +                     ODP_ERR("Failed to join ODP_SCHED_GROUP_CONTROL\n");
> +                     goto failed_to_join_grp_ctrl;
> +             }
> +     } else {
> +             if (odp_schedule_group_join(ODP_SCHED_GROUP_WORKER,
> +                                         &mask) != 0) {
> +                     ODP_ERR("Failed to join ODP_SCHED_GROUP_WORKER\n");
> +                     goto failed_to_join_grp_wrkr;
> +             }
> +     }
> +
> +     return 0;
> +
> +failed_to_join_grp_wrkr:
> +
> +failed_to_join_grp_ctrl:
> +     odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask);
> +
> +failed_to_join_grp_all:
> +failed_to_init_ts:
> +
> +     return -1;
> +}
> +
> +static int schedule_term_local(void)
> +{
> +     int thr_id;
> +     odp_thread_type_t thr_type;
> +     odp_thrmask_t mask;
> +     int rc = 0;
> +
> +     /* Remove this thread from default schedule groups */
> +     thr_id = odp_thread_id();
> +     thr_type = odp_thread_type();
> +     odp_thrmask_zero(&mask);
> +     odp_thrmask_set(&mask, thr_id);
> +
> +     if (odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask) != 0)
> +             ODP_ERR("Failed to leave ODP_SCHED_GROUP_ALL\n");
> +     if (thr_type == ODP_THREAD_CONTROL) {
> +             if (odp_schedule_group_leave(ODP_SCHED_GROUP_CONTROL,
> +                                          &mask) != 0)
> +                     ODP_ERR("Failed to leave ODP_SCHED_GROUP_CONTROL\n");
> +     } else {
> +             if (odp_schedule_group_leave(ODP_SCHED_GROUP_WORKER,
> +                                          &mask) != 0)
> +                     ODP_ERR("Failed to leave ODP_SCHED_GROUP_WORKER\n");
> +     }
> +
> +     update_sg_membership(sched_ts);
> +
> +     /* Check if the thread is still part of any groups */
> +     if (sched_ts->num_schedq != 0) {
> +             ODP_ERR("Thread %d still part of scheduler group(s)\n",
> +                     sched_ts->tidx);
> +             rc = -1;
> +     }
> +
> +     return rc;
> +}
> +
> +int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +{
> +     (void)queue;
> +     (void)buf_hdr;
> +     return 0;
> +}
> +
> +static void pktio_start(int pktio_index, int num_in_queue, int 
> in_queue_idx[])
> +{
> +     int i;
> +     uint32_t old, tag, j;
> +
> +     for (i = 0; i < num_in_queue; i++) {
> +             /* Try to reserve a slot */
> +             if (__atomic_fetch_add(&pktin_num,
> +                                    1, __ATOMIC_RELAXED) >= PKTIN_MAX) {
> +                     __atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED);
> +                     ODP_ABORT("Too many pktio in queues for scheduler\n");
> +             }
> +             /* A slot has been reserved, now we need to find an empty one */
> +             for (j = 0; ; j = (j + 1) % PKTIN_MAX) {
> +                     if (__atomic_load_n(&pktin_tags[j],
> +                                         __ATOMIC_RELAXED) != TAG_EMPTY)
> +                             /* Slot used, continue with next */
> +                             continue;
> +                     /* Empty slot found */
> +                     old = TAG_EMPTY;
> +                     tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]);
> +                     if (__atomic_compare_exchange_n(&pktin_tags[j],
> +                                                     &old,
> +                                                     tag,
> +                                                     true,
> +                                                     __ATOMIC_RELEASE,
> +                                                     __ATOMIC_RELAXED)) {
> +                             /* Success grabbing slot,update high
> +                              * watermark
> +                              */
> +                             __atomic_fetch_max(&pktin_hi,
> +                                                j + 1, __ATOMIC_RELAXED);
> +                             /* One more tag (queue) for this pktio
> +                              * instance
> +                              */
> +                             __atomic_fetch_add(&pktin_count[pktio_index],
> +                                                1, __ATOMIC_RELAXED);
> +                             /* Continue with next RX queue */
> +                             break;
> +                     }
> +                     /* Failed to grab slot */
> +             }
> +     }
> +}
> +
> +static int num_grps(void)
> +{
> +     return MAX_SCHED_GROUP;
> +}
> +
> +/*
> + * Stubs for internal scheduler abstraction layer due to absence of NULL
> + * checking before calling the function pointer.
> + */
> +
> +static int thr_add(odp_schedule_group_t group, int thr)
> +{
> +     /* This function is a schedule_init_local duplicate. */
> +     (void)group;
> +     (void)thr;
> +     return 0;
> +}
> +
> +static int thr_rem(odp_schedule_group_t group, int thr)
> +{
> +     /* This function is a schedule_term_local duplicate. */
> +     (void)group;
> +     (void)thr;
> +     return 0;
> +}
> +
> +static int init_queue(uint32_t queue_index,
> +                   const odp_schedule_param_t *sched_param)
> +{
> +     /* Not used in scalable scheduler. */
> +     (void)queue_index;
> +     (void)sched_param;
> +     return 0;
> +}
> +
> +static void destroy_queue(uint32_t queue_index)
> +{
> +     /* Not used in scalable scheduler. */
> +     (void)queue_index;
> +}
> +
> +static int sched_queue(uint32_t queue_index)
> +{
> +     /* Not used in scalable scheduler. */
> +     (void)queue_index;
> +     return 0;
> +}
> +
> +static int ord_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
> +                      int num, int *ret)
> +{
> +     (void)queue_index;
> +     (void)p_buf_hdr;
> +     (void)num;
> +     (void)ret;
> +     return 0;
> +}
> +
> +static void schedule_prefetch(int num)
> +{
> +     (void)num;
> +}
> +
> +static void order_lock(void)
> +{
> +}
> +
> +static void order_unlock(void)
> +{
> +}
> +
> +const schedule_fn_t schedule_scalable_fn = {
> +     .pktio_start    = pktio_start,
> +     .thr_add        = thr_add,
> +     .thr_rem        = thr_rem,
> +     .num_grps       = num_grps,
> +     .init_queue     = init_queue,
> +     .destroy_queue  = destroy_queue,
> +     .sched_queue    = sched_queue,
> +     .ord_enq_multi  = ord_enq_multi,
> +     .init_global    = schedule_init_global,
> +     .term_global    = schedule_term_global,
> +     .init_local     = schedule_init_local,
> +     .term_local     = schedule_term_local,
> +     .order_lock     = order_lock,
> +     .order_unlock   = order_unlock,
> +};
> +
> +const schedule_api_t schedule_scalable_api = {
> +     .schedule_wait_time             = schedule_wait_time,
> +     .schedule                       = schedule,
> +     .schedule_multi                 = schedule_multi,
> +     .schedule_pause                 = schedule_pause,
> +     .schedule_resume                = schedule_resume,
> +     .schedule_release_atomic        = schedule_release_atomic,
> +     .schedule_release_ordered       = schedule_release_ordered,
> +     .schedule_prefetch              = schedule_prefetch,
> +     .schedule_num_prio              = schedule_num_prio,
> +     .schedule_group_create          = schedule_group_create,
> +     .schedule_group_destroy         = schedule_group_destroy,
> +     .schedule_group_lookup          = schedule_group_lookup,
> +     .schedule_group_join            = schedule_group_join,
> +     .schedule_group_leave           = schedule_group_leave,
> +     .schedule_group_thrmask         = schedule_group_thrmask,
> +     .schedule_group_info            = schedule_group_info,
> +     .schedule_order_lock            = schedule_order_lock,
> +     .schedule_order_unlock          = schedule_order_unlock,
> +};
> diff --git a/platform/linux-generic/odp_schedule_scalable_ordered.c 
> b/platform/linux-generic/odp_schedule_scalable_ordered.c
> new file mode 100644
> index 00000000..ba2b6c3a
> --- /dev/null
> +++ b/platform/linux-generic/odp_schedule_scalable_ordered.c
> @@ -0,0 +1,285 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier:     BSD-3-Clause
> + */
> +
> +#include <odp/api/shared_memory.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +#include <odp_schedule_ordered_internal.h>
> +#include <odp_llsc.h>
> +#include <odp_bitset.h>
> +
> +#include <string.h>
> +
> +extern __thread sched_scalable_thread_state_t *sched_ts;
> +
> +#define RWIN_NAME_SIZE 32
> +
> +reorder_window_t *rwin_alloc(int rwin_id, odp_shm_t *shm, unsigned 
> lock_count)
> +{
> +     char rwin_name[RWIN_NAME_SIZE];
> +     reorder_window_t *rwin;
> +     uint32_t i;
> +
> +     strncpy(rwin_name, "rwin", RWIN_NAME_SIZE);
> +     i = strlen(rwin_name);
> +     snprintf(rwin_name + i,
> +              (RWIN_NAME_SIZE - i), "%d", rwin_id);
> +
> +     *shm = odp_shm_reserve(rwin_name,
> +                     sizeof(reorder_window_t),
> +                     ODP_CACHE_LINE_SIZE,
> +                     ODP_SHM_PROC);
> +     if (ODP_SHM_INVALID == *shm)
> +             goto shm_reserve_failed;
> +
> +     rwin = (reorder_window_t *)odp_shm_addr(*shm);
> +     if (rwin == NULL)
> +             goto shm_addr_failed;
> +
> +     rwin->hc.head = 0;
> +     rwin->hc.chgi = 0;
> +     rwin->winmask = RWIN_SIZE - 1;
> +     rwin->tail = 0;
> +     rwin->turn = 0;
> +     rwin->lock_count = (uint16_t)lock_count;
> +     memset(rwin->olock, 0, sizeof(rwin->olock));
> +     for (i = 0; i < RWIN_SIZE; i++)
> +             rwin->ring[i] = NULL;
> +
> +     return rwin;
> +
> +shm_addr_failed:
> +     odp_shm_free(*shm);
> +
> +shm_reserve_failed:
> +     return NULL;
> +}
> +
> +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn)
> +{
> +     uint32_t head;
> +     uint32_t oldt;
> +     uint32_t newt;
> +     uint32_t winmask;
> +
> +     /* Read head and tail separately */
> +     oldt = rwin->tail;
> +     winmask = rwin->winmask;
> +     do {
> +             /* Need __atomic_load to avoid compiler reordering */
> +             head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED);
> +             if (odp_unlikely(oldt - head >= winmask))
> +                     return false;
> +
> +             newt = oldt + 1;
> +     } while (!__atomic_compare_exchange(&rwin->tail,
> +                                         &oldt,
> +                                         &newt,
> +                                         true,
> +                                         __ATOMIC_RELAXED,
> +                                         __ATOMIC_RELAXED));
> +     *sn = oldt;
> +
> +     return true;
> +}
> +
> +void rwin_insert(reorder_window_t *rwin,
> +              reorder_context_t *rctx,
> +              uint32_t sn,
> +              void (*callback)(reorder_context_t *))
> +{
> +     /* Initialise to silence scan-build */
> +     hc_t old = {0, 0};
> +     hc_t new;
> +     uint32_t winmask;
> +
> +     __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE);
> +     winmask = rwin->winmask;
> +     if (old.head != sn) {
> +             /* We are out-of-order. Store context in reorder window,
> +              * releasing its content.
> +              */
> +             ODP_ASSERT(rwin->ring[sn & winmask] == NULL);
> +             atomic_store_release(&rwin->ring[sn & winmask],
> +                                  rctx,
> +                                  /*readonly=*/false);
> +             rctx = NULL;
> +             do {
> +                     hc_t new;
> +
> +                     new.head = old.head;
> +                     new.chgi = old.chgi + 1; /* Changed value */
> +                     /* Update head & chgi, fail if any has changed */
> +                     if (__atomic_compare_exchange(&rwin->hc,
> +                                                   /* Updated on fail */
> +                                                   &old,
> +                                                   &new,
> +                                                   true,
> +                                                   /* Rel our ring update */
> +                                                   __ATOMIC_RELEASE,
> +                                                   __ATOMIC_ACQUIRE))
> +                             /* CAS succeeded => head same (we are not
> +                              * in-order), chgi updated.
> +                              */
> +                             return;
> +                     /* CAS failed => head and/or chgi changed.
> +                      * We might not be out-of-order anymore.
> +                      */
> +             } while (old.head != sn);
> +     }
> +
> +     /* old.head == sn => we are now in-order! */
> +     ODP_ASSERT(old.head == sn);
> +     /* We are in-order so our responsibility to retire contexts */
> +     new.head = old.head;
> +     new.chgi = old.chgi + 1;
> +
> +     /* Retire our in-order context (if we still have it) */
> +     if (rctx != NULL) {
> +             callback(rctx);
> +             new.head++;
> +     }
> +
> +     /* Retire in-order contexts in the ring
> +      * The first context might actually be ours (if we were originally
> +      * out-of-order)
> +      */
> +     do {
> +             for (;;) {
> +                     rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
> +                                            __ATOMIC_ACQUIRE);
> +                     if (rctx == NULL)
> +                             break;
> +                     /* We are the only thread that are in-order
> +                      * (until head updated) so don't have to use
> +                      * atomic load-and-clear (exchange)
> +                      */
> +                     rwin->ring[new.head & winmask] = NULL;
> +                     callback(rctx);
> +                     new.head++;
> +             }
> +     /* Update head&chgi, fail if chgi has changed (head cannot change) */
> +     } while (!__atomic_compare_exchange(&rwin->hc,
> +                     &old, /* Updated on failure */
> +                     &new,
> +                     false, /* weak */
> +                     __ATOMIC_RELEASE, /* Release our ring updates */
> +                     __ATOMIC_ACQUIRE));
> +}
> +
> +void rctx_init(reorder_context_t *rctx, uint16_t idx,
> +            reorder_window_t *rwin, uint32_t sn)
> +{
> +     /* rctx->rvec_free and rctx->idx already initialised in
> +      * thread_state_init function.
> +      */
> +     ODP_ASSERT(rctx->idx == idx);
> +     rctx->rwin = rwin;
> +     rctx->sn = sn;
> +     rctx->olock_flags = 0;
> +     /* First => no next reorder context */
> +     rctx->next_idx = idx;
> +     /* Where to store next event */
> +     rctx->cur_idx = idx;
> +     rctx->numevts = 0;
> +}
> +
> +inline void rctx_free(const reorder_context_t *rctx)
> +{
> +     const reorder_context_t *const base = &rctx[-(int)rctx->idx];
> +     const uint32_t first = rctx->idx;
> +     uint32_t next_idx;
> +
> +     next_idx = rctx->next_idx;
> +
> +     ODP_ASSERT(rctx->rwin != NULL);
> +     /* Set free bit */
> +     if (rctx->rvec_free == &sched_ts->rvec_free)
> +             /* Since it is our own reorder context, we can instead
> +              * perform a non-atomic and relaxed update on our private
> +              * rvec_free.
> +              */
> +             sched_ts->priv_rvec_free =
> +                     bitset_set(sched_ts->priv_rvec_free, rctx->idx);
> +     else
> +             atom_bitset_set(rctx->rvec_free, rctx->idx, __ATOMIC_RELEASE);
> +
> +     /* Can't dereference rctx after the corresponding free bit is set */
> +     while (next_idx != first) {
> +             rctx = &base[next_idx];
> +             next_idx = rctx->next_idx;
> +             /* Set free bit */
> +             if (rctx->rvec_free == &sched_ts->rvec_free)
> +                     sched_ts->priv_rvec_free =
> +                             bitset_set(sched_ts->priv_rvec_free, rctx->idx);
> +             else
> +                     atom_bitset_set(rctx->rvec_free, rctx->idx,
> +                                     __ATOMIC_RELEASE);
> +     }
> +}
> +
> +inline void olock_unlock(const reorder_context_t *rctx, reorder_window_t 
> *rwin,
> +                      uint32_t lock_index)
> +{
> +     if ((rctx->olock_flags & (1U << lock_index)) == 0) {
> +             /* Use relaxed ordering, we are not releasing any updates */
> +             rwin->olock[lock_index] = rctx->sn + 1;
> +     }
> +}
> +
> +void olock_release(const reorder_context_t *rctx)
> +{
> +     reorder_window_t *rwin;
> +
> +     rwin = rctx->rwin;
> +
> +     switch (rwin->lock_count) {
> +     case 2:
> +             olock_unlock(rctx, rwin, 1);
> +     case 1:
> +             olock_unlock(rctx, rwin, 0);
> +     }
> +     ODP_STATIC_ASSERT(NUM_OLOCKS == 2, "Number of ordered locks != 2");
> +}
> +
> +void rctx_retire(reorder_context_t *first)
> +{
> +     reorder_context_t *rctx;
> +     queue_entry_t *q;
> +     uint32_t i;
> +     uint32_t j;
> +     uint32_t num;
> +     int rc;
> +
> +     rctx = first;
> +     do {
> +             /* Process all events in this reorder context */
> +             for (i = 0; i < rctx->numevts;) {
> +                     q = rctx->destq[i];
> +                     /* Find index of next different destq */
> +                     j = i + 1;
> +                     while (j < rctx->numevts && rctx->destq[j] == q)
> +                             j++;
> +                     num = j - i;
> +                     rc = q->s.enqueue_multi(q, &rctx->events[i], num);
> +                     if (odp_unlikely(rc != (int)num))
> +                             ODP_ERR("Failed to enqueue deferred events\n");
> +                     i += num;
> +             }
> +             /* Update rctx pointer to point to 'next_idx' element */
> +             rctx += (int)rctx->next_idx - (int)rctx->idx;
> +     } while (rctx != first);
> +     olock_release(first);
> +     rctx_free(first);
> +}
> +
> +void rctx_release(reorder_context_t *rctx)
> +{
> +     /* Insert reorder context into reorder window, potentially calling the
> +      * rctx_retire function for all pending reorder_contexts.
> +      */
> +     rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire);
> +}
> diff --git a/platform/linux-generic/odp_traffic_mngr.c 
> b/platform/linux-generic/odp_traffic_mngr.c
> index 4e9358b9..3244dfe3 100644
> --- a/platform/linux-generic/odp_traffic_mngr.c
> +++ b/platform/linux-generic/odp_traffic_mngr.c
> @@ -3918,9 +3918,10 @@ odp_tm_queue_t odp_tm_queue_create(odp_tm_t odp_tm,
>       tm_queue_obj->pkt = ODP_PACKET_INVALID;
>       odp_ticketlock_init(&tm_wred_node->tm_wred_node_lock);
>  
> -     tm_queue_obj->tm_qentry.s.type = QUEUE_TYPE_TM;
> -     tm_queue_obj->tm_qentry.s.enqueue = queue_tm_reenq;
> -     tm_queue_obj->tm_qentry.s.enqueue_multi = queue_tm_reenq_multi;
> +     queue_set_type(&tm_queue_obj->tm_qentry, QUEUE_TYPE_TM);
> +     queue_set_enq_func(&tm_queue_obj->tm_qentry, queue_tm_reenq);
> +     queue_set_enq_multi_func(&tm_queue_obj->tm_qentry,
> +                              queue_tm_reenq_multi);
>  
>       tm_system->queue_num_tbl[tm_queue_obj->queue_num - 1] = tm_queue_obj;
>       odp_ticketlock_lock(&tm_system->tm_system_lock);
> diff --git a/platform/linux-generic/pktio/loop.c 
> b/platform/linux-generic/pktio/loop.c
> index 49d8a211..1f2f16c1 100644
> --- a/platform/linux-generic/pktio/loop.c
> +++ b/platform/linux-generic/pktio/loop.c
> @@ -80,11 +80,13 @@ static int loopback_recv(pktio_entry_t *pktio_entry, int 
> index ODP_UNUSED,
>  
>       for (i = 0; i < nbr; i++) {
>               uint32_t pkt_len;
> -
> +#ifdef ODP_SCHEDULE_SCALABLE
> +             pkt = _odp_packet_from_buffer((odp_buffer_t)(hdr_tbl[i]));
> +#else
>               pkt = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
> +#endif
>               pkt_len = odp_packet_len(pkt);
>  
> -
>               if (pktio_cls_enabled(pktio_entry)) {
>                       odp_packet_t new_pkt;
>                       odp_pool_t new_pool;
> @@ -162,7 +164,11 @@ static int loopback_send(pktio_entry_t *pktio_entry, int 
> index ODP_UNUSED,
>               len = QUEUE_MULTI_MAX;
>  
>       for (i = 0; i < len; ++i) {
> +#ifdef ODP_SCHEDULE_SCALABLE
> +             hdr_tbl[i] = _odp_packet_to_buf_hdr_ptr(pkt_tbl[i]);
> +#else
>               hdr_tbl[i] = buf_hdl_to_hdr(_odp_packet_to_buffer(pkt_tbl[i]));
> +#endif
>               bytes += odp_packet_len(pkt_tbl[i]);
>       }
>  
> diff --git a/test/common_plat/performance/odp_sched_latency.c 
> b/test/common_plat/performance/odp_sched_latency.c
> index 2b28cd7b..4a933f5b 100644
> --- a/test/common_plat/performance/odp_sched_latency.c
> +++ b/test/common_plat/performance/odp_sched_latency.c
> @@ -28,7 +28,13 @@
>  #define MAX_WORKERS    64            /**< Maximum number of worker threads */
>  #define MAX_QUEUES     4096          /**< Maximum number of queues */
>  #define EVENT_POOL_SIZE        (1024 * 1024) /**< Event pool size */
> +
> +#ifdef ODP_SCHEDULE_SP
>  #define TEST_ROUNDS (4 * 1024 * 1024)        /**< Test rounds for each 
> thread */
> +#else
> +#define TEST_ROUNDS (32 * 1024 * 1024)       /**< Test rounds for each 
> thread */
> +#endif
> +
>  #define MAIN_THREAD     1 /**< Thread ID performing maintenance tasks */
>  
>  /* Default values for command line arguments */
> @@ -104,6 +110,9 @@ typedef union {
>  typedef struct {
>       core_stat_t      core_stat[MAX_WORKERS]; /**< Core specific stats */
>       odp_barrier_t    barrier; /**< Barrier for thread synchronization */
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     odp_schedule_group_t schedule_group;
> +#endif
>       odp_pool_t       pool;    /**< Pool for allocating test events */
>       test_args_t      args;    /**< Parsed command line arguments */
>       odp_queue_t      queue[NUM_PRIOS][MAX_QUEUES]; /**< Scheduled queues */
> @@ -119,7 +128,11 @@ static void clear_sched_queues(void)
>       odp_event_t ev;
>  
>       while (1) {
> -             ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT);
> +             /* Need a non-zero timeout to ensure we can observe
> +              * any non-empty queue made eligible for scheduling
> +              * by some other thread.
> +              */
> +             ev = odp_schedule(NULL, 1000);
>  
>               if (ev == ODP_EVENT_INVALID)
>                       break;
> @@ -428,6 +441,20 @@ static int run_thread(void *arg ODP_UNUSED)
>               return -1;
>       }
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     int err;
> +     odp_thrmask_t thrmask;
> +
> +     odp_thrmask_zero(&thrmask);
> +     odp_thrmask_set(&thrmask, thr);
> +
> +     err = odp_schedule_group_join(globals->schedule_group, &thrmask);
> +     if (err != 0) {
> +             LOG_ERR("odp_schedule_group_join failed\n");
> +             return -1;
> +     }
> +#endif
> +
>       if (thr == MAIN_THREAD) {
>               args = &globals->args;
>  
> @@ -452,6 +479,13 @@ static int run_thread(void *arg ODP_UNUSED)
>       if (test_schedule(thr, globals))
>               return -1;
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     err = odp_schedule_group_leave(globals->schedule_group, &thrmask);
> +     if (err != 0) {
> +             LOG_ERR("odp_schedule_group_leave failed\n");
> +             return -1;
> +     }
> +#endif
>       return 0;
>  }
>  
> @@ -692,6 +726,31 @@ int main(int argc, char *argv[])
>       }
>       globals->pool = pool;
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     /*
> +      * Create scheduler group
> +      */
> +     odp_thrmask_t expected_thrmask;
> +     int cpu;
> +     int thr;
> +
> +     odp_thrmask_zero(&expected_thrmask);
> +     /* This is a odp_thrmask_from_cpumask() */
> +     cpu = odp_cpumask_first(&cpumask);
> +     thr = 1;
> +     while (0 <= cpu) {
> +             odp_thrmask_set(&expected_thrmask, thr++);
> +             cpu = odp_cpumask_next(&cpumask, cpu);
> +     }
> +
> +     globals->schedule_group =
> +             odp_schedule_group_create("sg0", &expected_thrmask);
> +     if (globals->schedule_group == ODP_SCHED_GROUP_INVALID) {
> +             LOG_ERR("odp_schedule_group_create failed\n");
> +             return -1;
> +     }
> +#endif
> +
>       /*
>        * Create queues for schedule test
>        */
> @@ -713,7 +772,11 @@ int main(int argc, char *argv[])
>               param.type        = ODP_QUEUE_TYPE_SCHED;
>               param.sched.prio  = prio;
>               param.sched.sync  = args.sync_type;
> +#ifdef ODP_SCHEDULE_SCALABLE
> +             param.sched.group = globals->schedule_group;
> +#else
>               param.sched.group = ODP_SCHED_GROUP_ALL;
> +#endif
>  
>               for (j = 0; j < args.prio[i].queues; j++) {
>                       name[9]  = '0' + j / 10;
> @@ -758,6 +821,9 @@ int main(int argc, char *argv[])
>               }
>       }
>  
> +#ifdef ODP_SCHEDULE_SCALABLE
> +     ret += odp_schedule_group_destroy(globals->schedule_group);
> +#endif
>       ret += odp_shm_free(shm);
>       ret += odp_pool_destroy(pool);
>       ret += odp_term_local();
> 

Reply via email to