On 04/04/17 21:48, Brian Brooks wrote:
> Add queue getters and setters to provide an abstraction over more than one
> internal queue data structure.
>
> Use buffer handles instead of pointers to internal objects in pktio and tm code.
>
> Increase the running time of odp_sched_latency to get more stable numbers
> across multiple runs. This is not done for the SP scheduler because it is too
> slow.
>
> Use an explicit scheduler group in odp_sched_latency.
>
> Add scalable queue and scheduler implementation.
>
> Signed-off-by: Brian Brooks <[email protected]>
> Signed-off-by: Kevin Wang <[email protected]>
> Signed-off-by: Honnappa Nagarahalli <[email protected]>
> Signed-off-by: Ola Liljedahl <[email protected]>
> ---
> platform/linux-generic/Makefile.am | 21 +-
> .../include/odp/api/plat/schedule_types.h | 20 +-
> .../linux-generic/include/odp_queue_internal.h | 122 +-
> platform/linux-generic/include/odp_schedule_if.h | 166 +-
> .../include/odp_schedule_ordered_internal.h | 150 ++
> platform/linux-generic/m4/odp_schedule.m4 | 55 +-
> platform/linux-generic/odp_classification.c | 4 +-
> platform/linux-generic/odp_packet_io.c | 88 +-
> platform/linux-generic/odp_queue.c | 2 +-
> platform/linux-generic/odp_queue_scalable.c | 883 +++++++++
> platform/linux-generic/odp_schedule_if.c | 36 +-
> platform/linux-generic/odp_schedule_scalable.c | 1922 ++++++++++++++++++++
> .../linux-generic/odp_schedule_scalable_ordered.c | 285 +++
> platform/linux-generic/odp_traffic_mngr.c | 7 +-
> platform/linux-generic/pktio/loop.c | 10 +-
> test/common_plat/performance/odp_sched_latency.c | 68 +-
> 16 files changed, 3754 insertions(+), 85 deletions(-)
> create mode 100644
> platform/linux-generic/include/odp_schedule_ordered_internal.h
> create mode 100644 platform/linux-generic/odp_queue_scalable.c
> create mode 100644 platform/linux-generic/odp_schedule_scalable.c
> create mode 100644 platform/linux-generic/odp_schedule_scalable_ordered.c
>
> diff --git a/platform/linux-generic/Makefile.am
> b/platform/linux-generic/Makefile.am
> index 70683cac..8c263b99 100644
> --- a/platform/linux-generic/Makefile.am
> +++ b/platform/linux-generic/Makefile.am
> @@ -151,6 +151,8 @@ noinst_HEADERS = \
> ${srcdir}/include/odp_debug_internal.h \
> ${srcdir}/include/odp_forward_typedefs_internal.h \
> ${srcdir}/include/odp_internal.h \
> + ${srcdir}/include/odp_llqueue.h \
> + ${srcdir}/include/odp_llsc.h \
> ${srcdir}/include/odp_name_table_internal.h \
> ${srcdir}/include/odp_packet_internal.h \
> ${srcdir}/include/odp_packet_io_internal.h \
> @@ -219,13 +221,9 @@ __LIB__libodp_linux_la_SOURCES = \
> pktio/ring.c \
> odp_pkt_queue.c \
> odp_pool.c \
> - odp_queue.c \
> odp_rwlock.c \
> odp_rwlock_recursive.c \
> - odp_schedule.c \
> odp_schedule_if.c \
> - odp_schedule_sp.c \
> - odp_schedule_iquery.c \
> odp_shared_memory.c \
> odp_sorted_list.c \
> odp_spinlock.c \
> @@ -250,6 +248,21 @@ __LIB__libodp_linux_la_SOURCES = \
> arch/@ARCH_DIR@/odp_cpu_arch.c \
> arch/@ARCH_DIR@/odp_sysinfo_parse.c
>
> +if ODP_SCHEDULE_SP
Can these ifs be removed here? I commented on that in the previous patch set.
Maxim.
> +__LIB__libodp_linux_la_SOURCES += odp_schedule_sp.c
> +endif
> +
> +if ODP_SCHEDULE_IQUERY
> +__LIB__libodp_linux_la_SOURCES += odp_schedule_iquery.c
> +endif
> +
> +if ODP_SCHEDULE_SCALABLE
> +__LIB__libodp_linux_la_SOURCES += odp_queue_scalable.c odp_schedule_scalable.c \
> + odp_schedule_scalable_ordered.c
> +else
> +__LIB__libodp_linux_la_SOURCES += odp_queue.c odp_schedule.c
> +endif
> +
> if HAVE_PCAP
> __LIB__libodp_linux_la_SOURCES += pktio/pcap.c
> endif
> diff --git a/platform/linux-generic/include/odp/api/plat/schedule_types.h
> b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> index 535fd6d0..373065c9 100644
> --- a/platform/linux-generic/include/odp/api/plat/schedule_types.h
> +++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> @@ -18,6 +18,8 @@
> extern "C" {
> #endif
>
> +#include <odp/api/std_types.h>
> +
> /** @addtogroup odp_scheduler
> * @{
> */
> @@ -27,6 +29,20 @@ extern "C" {
>
> typedef int odp_schedule_prio_t;
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +#define ODP_SCHED_PRIO_NUM 8
> +
> +#define ODP_SCHED_PRIO_HIGHEST 0
> +
> +#define ODP_SCHED_PRIO_LOWEST (ODP_SCHED_PRIO_NUM - 1)
> +
> +#define ODP_SCHED_PRIO_DEFAULT (ODP_SCHED_PRIO_NUM / 2)
> +
> +#define ODP_SCHED_PRIO_NORMAL ODP_SCHED_PRIO_DEFAULT
> +
> +#else
> +
> #define ODP_SCHED_PRIO_HIGHEST 0
>
> #define ODP_SCHED_PRIO_NORMAL 4
> @@ -35,6 +51,8 @@ typedef int odp_schedule_prio_t;
>
> #define ODP_SCHED_PRIO_DEFAULT ODP_SCHED_PRIO_NORMAL
>
> +#endif
> +
> typedef int odp_schedule_sync_t;
>
> #define ODP_SCHED_SYNC_PARALLEL 0
> @@ -44,7 +62,7 @@ typedef int odp_schedule_sync_t;
> typedef int odp_schedule_group_t;
>
> /* These must be kept in sync with thread_globals_t in odp_thread.c */
> -#define ODP_SCHED_GROUP_INVALID -1
> +#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1)
> #define ODP_SCHED_GROUP_ALL 0
> #define ODP_SCHED_GROUP_WORKER 1
> #define ODP_SCHED_GROUP_CONTROL 2
> diff --git a/platform/linux-generic/include/odp_queue_internal.h
> b/platform/linux-generic/include/odp_queue_internal.h
> index 560f826e..b9387340 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -19,16 +19,24 @@ extern "C" {
> #endif
>
> #include <odp/api/queue.h>
> -#include <odp_forward_typedefs_internal.h>
> -#include <odp_schedule_if.h>
> -#include <odp_buffer_internal.h>
> -#include <odp_align_internal.h>
> +#include <odp/api/std_types.h>
> +#include <odp/api/buffer.h>
> #include <odp/api/packet_io.h>
> #include <odp/api/align.h>
> #include <odp/api/hints.h>
> #include <odp/api/ticketlock.h>
> +
> #include <odp_config_internal.h>
>
> +#include <odp_align_internal.h>
> +#include <odp_buffer_internal.h>
> +#include <odp_forward_typedefs_internal.h>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#include <odp_llsc.h>
> +#include <odp_schedule_ordered_internal.h>
> +#endif
> +#include <odp_schedule_if.h>
> +
> #define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
>
> #define QUEUE_STATUS_FREE 0
> @@ -37,8 +45,6 @@ extern "C" {
> #define QUEUE_STATUS_NOTSCHED 3
> #define QUEUE_STATUS_SCHED 4
>
> -
> -/* forward declaration */
> union queue_entry_u;
>
> typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
> @@ -49,6 +55,37 @@ typedef int (*enq_multi_func_t)(union queue_entry_u *,
> typedef int (*deq_multi_func_t)(union queue_entry_u *,
> odp_buffer_hdr_t **, int);
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#define BUFFER_HDR_INVALID ((odp_buffer_hdr_t *)(void *)ODP_EVENT_INVALID)
> +
> +struct queue_entry_s {
> + sched_elem_t sched_elem;
> + odp_shm_t shm;
> + odp_shm_t rwin_shm;
> +
> + odp_ticketlock_t lock ODP_ALIGNED_CACHE;
> + int status;
> +
> + enq_func_t enqueue ODP_ALIGNED_CACHE;
> + deq_func_t dequeue;
> + enq_multi_func_t enqueue_multi;
> + deq_multi_func_t dequeue_multi;
> +
> + uint32_t index;
> + odp_queue_t handle;
> + odp_queue_type_t type;
> + odp_queue_param_t param;
> + odp_pktin_queue_t pktin;
> + odp_pktout_queue_t pktout;
> + char name[ODP_QUEUE_NAME_LEN];
> +};
> +
> +int _odp_queue_deq(sched_elem_t *q, odp_event_t *evp, int num);
> +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num);
> +
> +#else
> +#define BUFFER_HDR_INVALID NULL
> +
> struct queue_entry_s {
> odp_ticketlock_t lock ODP_ALIGNED_CACHE;
>
> @@ -77,6 +114,8 @@ struct queue_entry_s {
> char name[ODP_QUEUE_NAME_LEN];
> };
>
> +#endif
> +
> union queue_entry_u {
> struct queue_entry_s s;
> uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
> @@ -94,6 +133,12 @@ int queue_deq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num);
> void queue_lock(queue_entry_t *queue);
> void queue_unlock(queue_entry_t *queue);
>
> +int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +int queue_pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> + int num);
> +
> +int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +
> static inline uint32_t queue_to_id(odp_queue_t handle)
> {
> return _odp_typeval(handle) - 1;
> @@ -107,6 +152,71 @@ static inline queue_entry_t *queue_to_qentry(odp_queue_t
> handle)
> return get_qentry(queue_id);
> }
>
> +static inline odp_queue_t queue_get_handle(queue_entry_t *queue)
> +{
> + return queue->s.handle;
> +}
> +
> +static inline odp_pktout_queue_t queue_get_pktout(queue_entry_t *queue)
> +{
> + return queue->s.pktout;
> +}
> +
> +static inline void queue_set_pktout(queue_entry_t *queue,
> + odp_pktio_t pktio,
> + int index)
> +{
> + queue->s.pktout.pktio = pktio;
> + queue->s.pktout.index = index;
> +}
> +
> +static inline odp_pktin_queue_t queue_get_pktin(queue_entry_t *queue)
> +{
> + return queue->s.pktin;
> +}
> +
> +static inline void queue_set_pktin(queue_entry_t *queue,
> + odp_pktio_t pktio,
> + int index)
> +{
> + queue->s.pktin.pktio = pktio;
> + queue->s.pktin.index = index;
> +}
> +
> +static inline void queue_set_enq_func(queue_entry_t *queue, enq_func_t func)
> +{
> + queue->s.enqueue = func;
> +}
> +
> +static inline void queue_set_enq_multi_func(queue_entry_t *queue,
> + enq_multi_func_t func)
> +{
> + queue->s.enqueue_multi = func;
> +}
> +
> +static inline void queue_set_deq_func(queue_entry_t *queue, deq_func_t func)
> +{
> + queue->s.dequeue = func;
> +}
> +
> +static inline void queue_set_deq_multi_func(queue_entry_t *queue,
> + deq_multi_func_t func)
> +{
> + queue->s.dequeue_multi = func;
> +}
> +
> +static inline void queue_set_type(queue_entry_t *queue, odp_queue_type_t type)
> +{
> + queue->s.type = type;
> +}
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue)
> +{
> + return queue->s.sched_elem.rwin;
> +}
> +#endif
> +
> #ifdef __cplusplus
> }
> #endif
> diff --git a/platform/linux-generic/include/odp_schedule_if.h
> b/platform/linux-generic/include/odp_schedule_if.h
> index 530d157f..41998833 100644
> --- a/platform/linux-generic/include/odp_schedule_if.h
> +++ b/platform/linux-generic/include/odp_schedule_if.h
> @@ -4,6 +4,12 @@
> * SPDX-License-Identifier: BSD-3-Clause
> */
>
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> #ifndef ODP_SCHEDULE_IF_H_
> #define ODP_SCHEDULE_IF_H_
>
> @@ -11,18 +17,168 @@
> extern "C" {
> #endif
>
> +#include <odp/api/align.h>
> #include <odp/api/queue.h>
> -#include <odp_queue_internal.h>
> #include <odp/api/schedule.h>
> +#include <odp/api/ticketlock.h>
> +
> +#include <odp_forward_typedefs_internal.h>
> +#ifdef ODP_SCHEDULE_SCALABLE
> +#include <odp_config_internal.h>
> +#include <odp_llqueue.h>
> +#include <odp_schedule_ordered_internal.h>
> +
> +#include <limits.h>
> +#endif
> +
> +/* Number of ordered locks per queue */
> +#define SCHEDULE_ORDERED_LOCKS_PER_QUEUE 2
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +typedef struct {
> + union {
> + struct {
> + struct llqueue llq;
> + uint32_t prio;
> + };
> + char line[ODP_CACHE_LINE_SIZE];
> + };
> +} sched_queue_t ODP_ALIGNED_CACHE;
> +
> +#define TICKET_INVALID (uint16_t)(~0U)
> +
> +typedef struct {
> + int32_t numevts;
> + uint16_t wrr_budget;
> + uint8_t cur_ticket;
> + uint8_t nxt_ticket;
> +} qschedstate_t ODP_ALIGNED(sizeof(uint64_t));
> +
> +typedef uint32_t ringidx_t;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> +#define SPLIT_PC ODP_ALIGNED_CACHE
> +#else
> +#define SPLIT_PC
> +#endif
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_READWRITE
> +#define SPLIT_RW ODP_ALIGNED_CACHE
> +#else
> +#define SPLIT_RW
> +#endif
> +
> +#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1)
> +
> +typedef struct {
> + /* (ll)node must be first */
> + struct llnode node; /* 8: 0.. 7 */
> + sched_queue_t *schedq; /* 8: 8..15 */
> +#ifdef CONFIG_QSCHST_LOCK
> + odp_ticketlock_t qschlock;
> +#endif
> + qschedstate_t qschst; /* 8: 16..23 */
> + uint16_t pop_deficit; /* 2: 24..25 */
> + uint16_t qschst_type; /* 2: 26..27 */
> +
> + ringidx_t prod_read SPLIT_PC; /* 4: 28..31 */
> +
> + ringidx_t prod_write SPLIT_RW;/* 4: 32..35 */
> + ringidx_t prod_mask; /* 4: 36..39 */
> + odp_buffer_hdr_t **prod_ring; /* 8: 40..47 */
> +
> + ringidx_t cons_write SPLIT_PC;/* 4: 48..51 */
> +
> + ringidx_t cons_read SPLIT_RW; /* 4: 52..55 */
> +
> + reorder_window_t *rwin; /* 8: 56..63 */
> + void *user_ctx; /* 8: 64..71 */
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + odp_buffer_hdr_t **cons_ring;
> + ringidx_t cons_mask;
> + uint16_t cons_type;
> +#else
> +#define cons_mask prod_mask
> +#define cons_ring prod_ring
> +#define cons_type qschst_type
> +#endif
> +} sched_elem_t ODP_ALIGNED_CACHE;
> +
> +/*
> + * Scheduler group related declarations.
> + */
> +typedef bitset_t sched_group_mask_t;
> +
> +/* Number of scheduling groups */
> +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT)
> +
> +typedef struct {
> + /* Threads currently associated with the sched group */
> + bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE;
> + bitset_t thr_wanted;
> + /* Used to spread queues over schedq's */
> + uint32_t xcount[ODP_SCHED_PRIO_NUM];
> + /* Number of schedq's per prio */
> + uint32_t xfactor;
> +
> + char name[ODP_SCHED_GROUP_NAME_LEN];
> + /* shm handle for the group */
> + odp_shm_t shm;
> +
> + /* ODP_SCHED_PRIO_NUM * xfactor.
> + * Must be the last member in this struct.
> + */
> + sched_queue_t schedq[1] ODP_ALIGNED_CACHE;
> +} sched_group_t;
> +
> +/*
> + * Per thread state
> + */
> +/* Number of reorder contexts per thread */
> +#define TS_RVEC_SIZE 16
> +typedef struct {
> + /* Atomic queue currently being processed or NULL */
> + sched_elem_t *atomq;
> + /* Current reorder context or NULL */
> + reorder_context_t *rctx;
> + uint8_t pause;
> + uint8_t out_of_order;
> + uint8_t tidx;
> + uint8_t pad;
> + uint32_t dequeued; /* Number of events dequeued from atomic queue */
> + uint16_t pktin_next;/* Next pktin tag to poll */
> + uint16_t pktin_poll_cnts;
> + uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */
> + uint16_t num_schedq;
> + uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */
> +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM)
> + sched_queue_t *schedq_list[SCHEDQ_PER_THREAD];
> + /* Current sched_group membership */
> + sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM];
> + /* Future sched_group membership. */
> + sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM];
> + bitset_t priv_rvec_free;
> + /* Bitset of free entries in rvec[] */
> + bitset_t rvec_free ODP_ALIGNED_CACHE;
> + /* Reordering contexts to allocate from */
> + reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE;
> +} sched_scalable_thread_state_t ODP_ALIGNED_CACHE;
> +
> +void sched_update_enq(sched_elem_t *q, uint32_t actual);
> +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual);
> +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio);
> +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio);
> +
> +#endif /* ODP_SCHEDULE_SCALABLE */
>
> typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue,
> int in_queue_idx[]);
> typedef int (*schedule_thr_add_fn_t)(odp_schedule_group_t group, int thr);
> typedef int (*schedule_thr_rem_fn_t)(odp_schedule_group_t group, int thr);
> typedef int (*schedule_num_grps_fn_t)(void);
> -typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
> - const odp_schedule_param_t *sched_param
> - );
> +typedef int (*schedule_init_queue_fn_t)(
> + uint32_t queue_index, const odp_schedule_param_t *sched_param);
> typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
> typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
> typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);
> @@ -64,6 +220,7 @@ extern const schedule_fn_t *sched_fn;
> int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
> void sched_cb_pktio_stop_finalize(int pktio_index);
> int sched_cb_num_pktio(void);
> +#ifndef ODP_SCHEDULE_SCALABLE
> int sched_cb_num_queues(void);
> int sched_cb_queue_prio(uint32_t queue_index);
> int sched_cb_queue_grp(uint32_t queue_index);
> @@ -73,6 +230,7 @@ odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
> void sched_cb_queue_destroy_finalize(uint32_t queue_index);
> int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num);
> int sched_cb_queue_empty(uint32_t queue_index);
> +#endif
>
> /* API functions */
> typedef struct {
> diff --git a/platform/linux-generic/include/odp_schedule_ordered_internal.h
> b/platform/linux-generic/include/odp_schedule_ordered_internal.h
> new file mode 100644
> index 00000000..f086dae4
> --- /dev/null
> +++ b/platform/linux-generic/include/odp_schedule_ordered_internal.h
> @@ -0,0 +1,150 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> +#ifndef ODP_SCHEDULE_ORDERED_INTERNAL_H_
> +#define ODP_SCHEDULE_ORDERED_INTERNAL_H_
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> +
> +#include <odp/api/shared_memory.h>
> +
> +#include <odp_internal.h>
> +
> +#include <odp_align_internal.h>
> +#include <odp_bitset.h>
> +#include <odp_llsc.h>
> +
> +/* High level functioning of reordering
> + * Data structures -
> + * Reorder Window - Every ordered queue is associated with a reorder window.
> + * Reorder window stores reorder contexts from threads that
> + * have completed processing out-of-order.
> + * Reorder Context - Reorder context consists of events that a thread
> + * wants to enqueue while processing a batch of events
> + * from an ordered queue.
> + *
> + * Algorithm -
> + * 1) Thread identifies the ordered queue.
> + * 2) It 'reserves a slot in the reorder window and dequeues the
> + * events' atomically. Atomicity is achieved by using a ticket-lock
> + * like design where the reorder window slot is the ticket.
> + * 3a) Upon order-release/next schedule call, the thread
> + * checks if its slot (ticket) equals the head of the reorder window.
> + * If yes, enqueues the events to the destination queue till
> + * i) the reorder window is empty or
> + * ii) there is a gap in the reorder window
> + * If no, the reorder context is stored in the reorder window at
> + * the reserved slot.
> + * 3b) Upon the first enqueue, the thread checks if its slot (ticket)
> + * equals the head of the reorder window.
> + * If yes, enqueues the events immediately to the destination queue
> + * If no, these (and subsequent) events are stored in the reorder context
> + * (in the application given order)
> + */
> +
> +/* Head and change indicator variables are used to synchronise between
> + * concurrent insert operations in the reorder window. A thread performing
> + * an in-order insertion must be notified about the newly inserted
> + * reorder contexts so that it doesn't halt the retire process too early.
> + * A thread performing an out-of-order insertion must correspondingly
> + * notify the thread doing in-order insertion of the new waiting reorder
> + * context, which may need to be handled by that thread.
> + *
> + * Also, an out-of-order insertion may become an in-order insertion if the
> + * thread doing an in-order insertion completes before this thread completes.
> + * We need a point of synchronisation where this knowledge and potential state
> + * change can be transferred between threads.
> + */
> +typedef struct hc {
> + /* First missing context */
> + uint32_t head;
> + /* Change indicator */
> + uint32_t chgi;
> +} hc_t ODP_ALIGNED(sizeof(uint64_t));
> +
> +/* Number of reorder contexts in the reorder window.
> + * Should be at least one per CPU.
> + */
> +#define RWIN_SIZE 32
> +ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a power of 2");
> +
> +#define NUM_OLOCKS 2
> +
> +typedef struct reorder_context reorder_context_t;
> +
> +typedef struct reorder_window {
> + /* head and change indicator */
> + hc_t hc;
> + uint32_t winmask;
> + uint32_t tail;
> + uint32_t turn;
> + uint32_t olock[NUM_OLOCKS];
> + uint16_t lock_count;
> + /* Reorder contexts in this window */
> + reorder_context_t *ring[RWIN_SIZE];
> +} reorder_window_t;
> +
> +/* Number of events that can be stored in a reorder context.
> + * This size is chosen so that there is no space left unused at the end
> + * of the last cache line (for 64b architectures and 64b handles).
> + */
> +#define RC_EVT_SIZE 18
> +
> +struct reorder_context {
> + /* Reorder window to which this context belongs */
> + reorder_window_t *rwin;
> + /* Pointer to TS->rvec_free */
> + bitset_t *rvec_free;
> + /* Our slot number in the reorder window */
> + uint32_t sn;
> + uint8_t olock_flags;
> + /* Our index in thread_state rvec array */
> + uint8_t idx;
> + /* Use to link reorder contexts together */
> + uint8_t next_idx;
> + /* Current reorder context to save events in */
> + uint8_t cur_idx;
> + /* Number of events stored in this reorder context */
> + uint8_t numevts;
> + /* Events stored in this context */
> + odp_buffer_hdr_t *events[RC_EVT_SIZE];
> + queue_entry_t *destq[RC_EVT_SIZE];
> +} ODP_ALIGNED_CACHE;
> +
> +reorder_window_t *rwin_alloc(int rwin_id, odp_shm_t *shm, unsigned lock_count);
> +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn);
> +void rwin_insert(reorder_window_t *rwin,
> + reorder_context_t *rctx,
> + uint32_t sn,
> + void (*callback)(reorder_context_t *));
> +void rctx_init(reorder_context_t *rctx, uint16_t idx,
> + reorder_window_t *rwin, uint32_t sn);
> +void rctx_free(const reorder_context_t *rctx);
> +void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin,
> + uint32_t lock_index);
> +void olock_release(const reorder_context_t *rctx);
> +void rctx_retire(reorder_context_t *first);
> +void rctx_release(reorder_context_t *rctx);
> +
> +#else
> +
> +#define SUSTAIN_ORDER 1
> +
> +int schedule_ordered_queue_enq(uint32_t queue_index, void *p_buf_hdr,
> + int sustain, int *ret);
> +int schedule_ordered_queue_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
> + int num, int sustain, int *ret);
> +#endif
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif
> diff --git a/platform/linux-generic/m4/odp_schedule.m4
> b/platform/linux-generic/m4/odp_schedule.m4
> index 91c19f21..d862b8b2 100644
> --- a/platform/linux-generic/m4/odp_schedule.m4
> +++ b/platform/linux-generic/m4/odp_schedule.m4
> @@ -1,13 +1,44 @@
> -AC_ARG_ENABLE([schedule-sp],
> - [ --enable-schedule-sp enable strict priority scheduler],
> - [if test x$enableval = xyes; then
> - schedule_sp_enabled=yes
> - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> - fi])
> +# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and adds
> +# -DODP_SCHEDULE_SP to CFLAGS.
> +AC_ARG_ENABLE(
> + [schedule_sp],
> + [AC_HELP_STRING([--enable-schedule-sp],
> + [enable strict priority scheduler])],
> + [if test "x$enableval" = xyes; then
> + schedule_sp=true
> + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> + else
> + schedule_sp=false
> + fi],
> + [schedule_sp=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue])
>
> -AC_ARG_ENABLE([schedule-iquery],
> - [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler],
> - [if test x$enableval = xyes; then
> - schedule_iquery_enabled=yes
> - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> - fi])
> +# Checks for --enable-schedule-iquery and defines ODP_SCHEDULE_IQUERY and adds
> +# -DODP_SCHEDULE_IQUERY to CFLAGS.
> +AC_ARG_ENABLE(
> + [schedule_iquery],
> + [AC_HELP_STRING([--enable-schedule-iquery],
> + [enable interests query (sparse bitmap) scheduler])],
> + [if test "x$enableval" = xyes; then
> + schedule_iquery=true
> + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> + else
> + schedule_iquery=false
> + fi],
> + [schedule_iquery=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery = xtrue])
> +
> +# Checks for --enable-schedule-scalable and defines ODP_SCHEDULE_SCALABLE and
> +# adds -DODP_SCHEDULE_SCALABLE to CFLAGS.
> +AC_ARG_ENABLE(
> + [schedule_scalable],
> + [AC_HELP_STRING([--enable-schedule-scalable],
> + [enable scalable scheduler])],
> + [if test "x$enableval" = xyes; then
> + schedule_scalable=true
> + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE"
> + else
> + schedule_scalable=false
> + fi],
> + [schedule_scalable=false])
> +AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable = xtrue])
> diff --git a/platform/linux-generic/odp_classification.c
> b/platform/linux-generic/odp_classification.c
> index 5d96b00b..8fb5c32f 100644
> --- a/platform/linux-generic/odp_classification.c
> +++ b/platform/linux-generic/odp_classification.c
> @@ -282,7 +282,7 @@ odp_queue_t odp_cos_queue(odp_cos_t cos_id)
> if (!cos->s.queue)
> return ODP_QUEUE_INVALID;
>
> - return cos->s.queue->s.handle;
> + return queue_get_handle(cos->s.queue);
> }
>
> int odp_cos_drop_set(odp_cos_t cos_id, odp_cls_drop_t drop_policy)
> @@ -849,7 +849,7 @@ int cls_classify_packet(pktio_entry_t *entry, const
> uint8_t *base,
>
> *pool = cos->s.pool;
> pkt_hdr->p.input_flags.dst_queue = 1;
> - pkt_hdr->dst_queue = cos->s.queue->s.handle;
> + pkt_hdr->dst_queue = queue_get_handle(cos->s.queue);
>
> return 0;
> }
> diff --git a/platform/linux-generic/odp_packet_io.c
> b/platform/linux-generic/odp_packet_io.c
> index 5e783d83..a267ee97 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -5,23 +5,25 @@
> */
> #include <odp_posix_extensions.h>
>
> -#include <odp/api/packet_io.h>
> -#include <odp_packet_io_internal.h>
> -#include <odp_packet_io_queue.h>
> #include <odp/api/packet.h>
> -#include <odp_packet_internal.h>
> -#include <odp_internal.h>
> +#include <odp/api/packet_io.h>
> #include <odp/api/spinlock.h>
> #include <odp/api/ticketlock.h>
> #include <odp/api/shared_memory.h>
> -#include <odp_packet_socket.h>
> +#include <odp/api/time.h>
> +
> +#include <odp_internal.h>
> #include <odp_config_internal.h>
> -#include <odp_queue_internal.h>
> -#include <odp_schedule_if.h>
> -#include <odp_classification_internal.h>
> #include <odp_debug_internal.h>
> +
> +#include <odp_classification_internal.h>
> +#include <odp_queue_internal.h>
> #include <odp_packet_io_ipc_internal.h>
> -#include <odp/api/time.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_packet_io_queue.h>
> +#include <odp_packet_internal.h>
> +#include <odp_packet_socket.h>
> +#include <odp_schedule_if.h>
>
> #include <string.h>
> #include <inttypes.h>
> @@ -470,7 +472,6 @@ int odp_pktio_start(odp_pktio_t hdl)
> return -1;
> }
> }
> -
> sched_fn->pktio_start(pktio_to_id(hdl), num, index);
> }
>
> @@ -552,7 +553,6 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue,
> odp_packet_t packets[num];
> odp_packet_hdr_t *pkt_hdr;
> odp_buffer_hdr_t *buf_hdr;
> - odp_buffer_t buf;
> int i;
> int pkts;
> int num_rx = 0;
> @@ -562,9 +562,11 @@ static inline int pktin_recv_buf(odp_pktin_queue_t queue,
> for (i = 0; i < pkts; i++) {
> pkt = packets[i];
> pkt_hdr = odp_packet_hdr(pkt);
> - buf = _odp_packet_to_buffer(pkt);
> - buf_hdr = buf_hdl_to_hdr(buf);
> -
> +#ifdef ODP_SCHEDULE_SCALABLE
> + buf_hdr = _odp_packet_to_buf_hdr_ptr(pkt);
> +#else
> + buf_hdr = buf_hdl_to_hdr(_odp_packet_to_buffer(pkt));
> +#endif
> if (pkt_hdr->p.input_flags.dst_queue) {
> queue_entry_t *dst_queue;
> int ret;
> @@ -582,11 +584,17 @@ static inline int pktin_recv_buf(odp_pktin_queue_t
> queue,
>
> int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr)
> {
> - odp_packet_t pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
> + odp_packet_t pkt;
> +
> +#ifdef ODP_SCHEDULE_SCALABLE
> + pkt = _odp_packet_from_buffer((odp_buffer_t)buf_hdr);
> +#else
> + pkt = _odp_packet_from_buffer(buf_hdr->handle.handle);
> +#endif
> int len = 1;
> int nbr;
>
> - nbr = odp_pktout_send(qentry->s.pktout, &pkt, len);
> + nbr = odp_pktout_send(queue_get_pktout(qentry), &pkt, len);
> return (nbr == len ? 0 : -1);
> }
>
> @@ -604,9 +612,13 @@ int pktout_enq_multi(queue_entry_t *qentry,
> odp_buffer_hdr_t *buf_hdr[],
> int i;
>
> for (i = 0; i < num; ++i)
> +#ifdef ODP_SCHEDULE_SCALABLE
> + pkt_tbl[i] = _odp_packet_from_buffer((odp_buffer_t)buf_hdr[i]);
> +#else
> pkt_tbl[i] = _odp_packet_from_buffer(buf_hdr[i]->handle.handle);
> +#endif
>
> - nbr = odp_pktout_send(qentry->s.pktout, pkt_tbl, num);
> + nbr = odp_pktout_send(queue_get_pktout(qentry), pkt_tbl, num);
> return nbr;
> }
>
> @@ -632,13 +644,14 @@ odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
> int pkts;
>
> buf_hdr = queue_deq(qentry);
> - if (buf_hdr != NULL)
> + if (buf_hdr != BUFFER_HDR_INVALID)
> return buf_hdr;
>
> - pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
> + pkts = pktin_recv_buf(queue_get_pktin(qentry),
> + hdr_tbl, QUEUE_MULTI_MAX);
>
> if (pkts <= 0)
> - return NULL;
> + return BUFFER_HDR_INVALID;
>
> if (pkts > 1)
> queue_enq_multi(qentry, &hdr_tbl[1], pkts - 1);
> @@ -669,7 +682,8 @@ int pktin_deq_multi(queue_entry_t *qentry,
> odp_buffer_hdr_t *buf_hdr[], int num)
> if (nbr == num)
> return nbr;
>
> - pkts = pktin_recv_buf(qentry->s.pktin, hdr_tbl, QUEUE_MULTI_MAX);
> + pkts = pktin_recv_buf(queue_get_pktin(qentry),
> + hdr_tbl, QUEUE_MULTI_MAX);
> if (pkts <= 0)
> return nbr;
>
> @@ -684,7 +698,6 @@ int pktin_deq_multi(queue_entry_t *qentry,
> odp_buffer_hdr_t *buf_hdr[], int num)
> queue_enq_multi(qentry, hdr_tbl, j);
> return nbr;
> }
> -
> int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[])
> {
> odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> @@ -1266,13 +1279,14 @@ int odp_pktin_queue_config(odp_pktio_t pktio,
> queue_entry_t *qentry;
>
> qentry = queue_to_qentry(queue);
> - qentry->s.pktin.index = i;
> - qentry->s.pktin.pktio = pktio;
> -
> - qentry->s.enqueue = pktin_enqueue;
> - qentry->s.dequeue = pktin_dequeue;
> - qentry->s.enqueue_multi = pktin_enq_multi;
> - qentry->s.dequeue_multi = pktin_deq_multi;
> + queue_set_pktin(qentry, pktio, i);
> +
> + queue_set_enq_func(qentry, pktin_enqueue);
> + queue_set_deq_func(qentry, pktin_dequeue);
> + queue_set_enq_multi_func(qentry,
> + pktin_enq_multi);
> + queue_set_deq_multi_func(qentry,
> + pktin_deq_multi);
> }
>
> entry->s.in_queue[i].queue = queue;
> @@ -1390,14 +1404,12 @@ int odp_pktout_queue_config(odp_pktio_t pktio,
> }
>
> qentry = queue_to_qentry(queue);
> - qentry->s.pktout.index = i;
> - qentry->s.pktout.pktio = pktio;
> -
> - /* Override default enqueue / dequeue functions */
> - qentry->s.enqueue = pktout_enqueue;
> - qentry->s.dequeue = pktout_dequeue;
> - qentry->s.enqueue_multi = pktout_enq_multi;
> - qentry->s.dequeue_multi = pktout_deq_multi;
> + queue_set_pktout(qentry, pktio, i);
> +
> + queue_set_enq_func(qentry, pktout_enqueue);
> + queue_set_deq_func(qentry, pktout_dequeue);
> + queue_set_enq_multi_func(qentry, pktout_enq_multi);
> + queue_set_deq_multi_func(qentry, pktout_deq_multi);
>
> entry->s.out_queue[i].queue = queue;
> }
> diff --git a/platform/linux-generic/odp_queue.c
> b/platform/linux-generic/odp_queue.c
> index fcf4bf5b..42b58c44 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -291,11 +291,11 @@ void sched_cb_queue_destroy_finalize(uint32_t
> queue_index)
> int odp_queue_destroy(odp_queue_t handle)
> {
> queue_entry_t *queue;
> - queue = queue_to_qentry(handle);
>
> if (handle == ODP_QUEUE_INVALID)
> return -1;
>
> + queue = queue_to_qentry(handle);
> LOCK(&queue->s.lock);
> if (queue->s.status == QUEUE_STATUS_FREE) {
> UNLOCK(&queue->s.lock);
> diff --git a/platform/linux-generic/odp_queue_scalable.c
> b/platform/linux-generic/odp_queue_scalable.c
> new file mode 100644
> index 00000000..6bbf2846
> --- /dev/null
> +++ b/platform/linux-generic/odp_queue_scalable.c
> @@ -0,0 +1,883 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> +#include <odp/api/hints.h>
> +#include <odp/api/plat/ticketlock_inlines.h>
> +#include <odp/api/queue.h>
> +#include <odp/api/schedule.h>
> +#include <odp/api/shared_memory.h>
> +#include <odp/api/sync.h>
> +#include <odp/api/traffic_mngr.h>
> +
> +#include <odp_internal.h>
> +#include <odp_config_internal.h>
> +#include <odp_debug_internal.h>
> +
> +#include <odp_buffer_inlines.h>
> +#include <odp_packet_io_internal.h>
> +#include <odp_packet_io_queue.h>
> +#include <odp_pool_internal.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +
> +#include <string.h>
> +#include <inttypes.h>
> +
> +/* Number of queue table entries withheld from max_queues reported by
> + * odp_queue_capability(), reserved for internal use.
> + */
> +#define NUM_INTERNAL_QUEUES 64
> +
> +/* Minimum of two values; each argument is evaluated exactly once
> + * (GCC/Clang statement expression).
> + */
> +#define MIN(a, b) \
> + ({ \
> + __typeof__(a) tmp_a = (a); \
> + __typeof__(b) tmp_b = (b); \
> + tmp_a < tmp_b ? tmp_a : tmp_b; \
> + })
> +
> +/* Per-entry queue lock is a ticket lock (inline lock/unlock variants) */
> +#define LOCK(a) _odp_ticketlock_lock(a)
> +#define UNLOCK(a) _odp_ticketlock_unlock(a)
> +#define LOCK_INIT(a) odp_ticketlock_init(a)
> +
> +/* Calling thread's scheduler state, defined in odp_schedule_scalable.c.
> + * May be NULL in a thread where it was never set (queue_enq_multi()
> + * checks for that).
> + */
> +extern __thread sched_scalable_thread_state_t *sched_ts;
> +
> +/* All queue entries; lives in the "odp_queues" shm block reserved by
> + * odp_queue_init_global().
> + */
> +typedef struct queue_table_t {
> + queue_entry_t queue[ODP_CONFIG_QUEUES];
> +} queue_table_t;
> +
> +static queue_table_t *queue_tbl;
> +
> +/* Map a zero-based table index to its queue handle. Handles are the
> + * index plus one, presumably so that no valid queue maps to the invalid
> + * handle value - TODO confirm against ODP_QUEUE_INVALID.
> + */
> +static inline odp_queue_t queue_from_id(uint32_t queue_id)
> +{
> + uint32_t handle_val = queue_id + 1;
> +
> + return _odp_cast_scalar(odp_queue_t, handle_val);
> +}
> +
> +/* Table entry at index 'queue_id'; the index is not range-checked */
> +queue_entry_t *get_qentry(uint32_t queue_id)
> +{
> + return queue_tbl->queue + queue_id;
> +}
> +
> +/* Try to disable further enqueues on ring 'q'.
> + *
> + * Succeeds only if the producer side is empty (prod_write == prod_read).
> + * It then advances prod_write by the full ring size, so the free-space
> + * computation in the enqueue paths yields 0 and nothing more is stored.
> + * odp_queue_destroy() undoes this by subtracting the ring size again.
> + *
> + * Returns 0 when enqueue was disabled, -1 when the ring is not empty.
> + */
> +static int _odp_queue_disable_enq(sched_elem_t *q)
> +{
> + ringidx_t old_read, old_write, new_write;
> + uint32_t size;
> +
> + /* Producers update prod_write concurrently (CAS in _odp_queue_enq());
> + * read it atomically so the initial load is not a data race.
> + */
> + old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
> + size = q->prod_mask + 1;
> + do {
> + /* Need __atomic_load to avoid compiler reordering */
> + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> + if (old_write != old_read) {
> + /* Queue is not empty, cannot claim all elements
> + * Cannot disable enqueue.
> + */
> + return -1;
> + }
> + /* Claim all elements in ring */
> + new_write = old_write + size;
> + } while (!__atomic_compare_exchange_n(&q->prod_write,
> + &old_write, /* Updated on failure */
> + new_write,
> + true,
> + __ATOMIC_RELAXED,
> + __ATOMIC_RELAXED));
> + /* All remaining elements claimed, no one else can enqueue */
> + return 0;
> +}
> +
> +/* Initialise table entry 'queue' (table index 'queue_idx') as a new queue
> + * named 'name' (NULL is stored as an empty name) with parameters 'param'.
> + *
> + * Reserves a shm block for the event ring, installs the default
> + * enqueue/dequeue handlers and resets ring and scheduler state. For
> + * scheduled queues it looks up the target schedq from the queue's group
> + * and priority; for ordered scheduled queues it also allocates a reorder
> + * window.
> + *
> + * Returns 0 on success, -1 on failure. If the reorder window cannot be
> + * allocated, the ring shm is released again. odp_queue_create() calls
> + * this with the entry lock held.
> + */
> +static int queue_init(int queue_idx, queue_entry_t *queue, const char *name,
> + const odp_queue_param_t *param)
> +{
> + ringidx_t ring_idx;
> + sched_elem_t *sched_elem;
> + odp_shm_t shm;
> + uint32_t ring_size;
> + odp_buffer_hdr_t **ring;
> +
> + sched_elem = &queue->s.sched_elem;
> + /* The ring is indexed with 'ring_size - 1' as a mask, so a requested
> + * size goes through ROUNDUP_POWER2_U32; otherwise the configured
> + * default size is used.
> + */
> + ring_size = param->ring_size > 0 ?
> + ROUNDUP_POWER2_U32(param->ring_size) :
> + ODP_CONFIG_QUEUE_SIZE;
> + /* Copy at most ODP_QUEUE_NAME_LEN - 1 chars and always terminate */
> + strncpy(queue->s.name, name ? name : "", ODP_QUEUE_NAME_LEN - 1);
> + queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
> + memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
> +
> + /* Event ring: ring_size buffer header pointers.
> + * NOTE(review): the block is reserved under the caller's queue name,
> + * which may be NULL or equal to another shm block's name - confirm
> + * this is intended.
> + */
> + shm = odp_shm_reserve(name,
> + ring_size * sizeof(odp_buffer_hdr_t *),
> + ODP_CACHE_LINE_SIZE,
> + ODP_SHM_PROC);
> + if (ODP_SHM_INVALID == shm)
> + return -1;
> +
> + ring = (odp_buffer_hdr_t **)odp_shm_addr(shm);
> +
> + for (ring_idx = 0; ring_idx < ring_size; ring_idx++)
> + ring[ring_idx] = NULL;
> +
> + queue->s.shm = shm;
> + queue->s.type = queue->s.param.type;
> + /* Default enqueue/dequeue handlers */
> + queue->s.enqueue = queue_enq;
> + queue->s.dequeue = queue_deq;
> + queue->s.enqueue_multi = queue_enq_multi;
> + queue->s.dequeue_multi = queue_deq_multi;
> + queue->s.pktin = PKTIN_INVALID;
> +
> + /* node.next == NULL means "not linked on a schedq" */
> + sched_elem->node.next = NULL;
> +#ifdef CONFIG_QSCHST_LOCK
> + LOCK_INIT(&sched_elem->qschlock);
> +#endif
> + /* Scheduler state: no events, no tickets handed out yet */
> + sched_elem->qschst.numevts = 0;
> + sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT;
> + sched_elem->qschst.cur_ticket = 0;
> + sched_elem->qschst.nxt_ticket = 0;
> + sched_elem->pop_deficit = 0;
> + if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> + sched_elem->qschst_type = queue->s.param.sched.sync;
> + else
> + sched_elem->qschst_type = ODP_NO_SCHED_QUEUE;
> + /* 2nd cache line - enqueue */
> + sched_elem->prod_read = 0;
> + sched_elem->prod_write = 0;
> + sched_elem->prod_ring = ring;
> + sched_elem->prod_mask = ring_size - 1;
> + /* 3rd cache line - dequeue */
> + sched_elem->cons_read = 0;
> + sched_elem->cons_write = 0;
> + sched_elem->rwin = NULL;
> + sched_elem->schedq = NULL;
> + sched_elem->user_ctx = queue->s.param.context;
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + /* Separate consumer-side copies of the ring parameters */
> + sched_elem->cons_ring = ring;
> + sched_elem->cons_mask = ring_size - 1;
> + sched_elem->cons_type = sched_elem->qschst_type;
> +#endif
> +
> + /* Queue initialized successfully, add it to the sched group */
> + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) {
> + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
> + sched_elem->rwin =
> + rwin_alloc(queue_idx, &queue->s.rwin_shm,
> + queue->s.param.sched.lock_count);
> + if (sched_elem->rwin == NULL) {
> + ODP_ERR("Reorder window not created\n");
> + goto rwin_create_failed;
> + }
> + }
> + sched_elem->schedq =
> + schedq_from_sched_group(param->sched.group,
> + param->sched.prio);
> + }
> +
> + return 0;
> +
> +rwin_create_failed:
> + /* Undo the ring reservation made above */
> + odp_shm_free(queue->s.shm);
> + return -1;
> +}
> +
> +/* Reserve and zero the shared queue table, then give each entry its
> + * lock, table index and handle. Returns 0 on success, -1 if the table
> + * could not be mapped.
> + */
> +int odp_queue_init_global(void)
> +{
> + odp_shm_t tbl_shm;
> +
> + ODP_DBG("Queue init ... ");
> +
> + tbl_shm = odp_shm_reserve("odp_queues", sizeof(queue_table_t),
> + sizeof(queue_entry_t), 0);
> + queue_tbl = odp_shm_addr(tbl_shm);
> + if (queue_tbl == NULL)
> + return -1;
> +
> + memset(queue_tbl, 0, sizeof(queue_table_t));
> +
> + for (uint32_t idx = 0; idx < ODP_CONFIG_QUEUES; idx++) {
> + queue_entry_t *qe = get_qentry(idx);
> +
> + LOCK_INIT(&qe->s.lock);
> + qe->s.index = idx;
> + qe->s.handle = queue_from_id(idx);
> + }
> +
> + ODP_DBG("done\n");
> + ODP_DBG("Queue init global\n");
> + ODP_DBG(" struct queue_entry_s size %zu\n",
> + sizeof(struct queue_entry_s));
> + ODP_DBG(" queue_entry_t size %zu\n",
> + sizeof(queue_entry_t));
> + ODP_DBG("\n");
> +
> + return 0;
> +}
> +
> +/* Release the shared queue table. Every entry that is not FREE is
> + * reported (it is not cleaned up). Returns 0 on success, -1 if any queue
> + * was left allocated or the table shm could not be freed.
> + */
> +int odp_queue_term_global(void)
> +{
> + int rc = 0;
> + queue_entry_t *queue;
> + int i;
> +
> + for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> + queue = &queue_tbl->queue[i];
> + if (__atomic_load_n(&queue->s.status,
> + __ATOMIC_RELAXED) != QUEUE_STATUS_FREE) {
> + ODP_ERR("Not destroyed queue: %s\n", queue->s.name);
> + rc = -1;
> + }
> + }
> +
> + if (odp_shm_free(odp_shm_lookup("odp_queues")) < 0) {
> + /* Newline-terminated like the other ODP_ERR messages */
> + ODP_ERR("shm free failed for odp_queues\n");
> + rc = -1;
> + }
> +
> + return rc;
> +}
> +
> +/* Fill in queue capabilities. NUM_INTERNAL_QUEUES table entries are
> + * withheld from the application-visible queue count. Always returns 0.
> + */
> +int odp_queue_capability(odp_queue_capability_t *capa)
> +{
> + memset(capa, 0, sizeof(*capa));
> +
> + /* Reserve some queues for internal use */
> + capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES;
> + capa->max_ordered_locks = SCHEDULE_ORDERED_LOCKS_PER_QUEUE;
> +
> + /* Scheduler-dependent limits */
> + capa->max_sched_groups = sched_fn->num_grps();
> + capa->sched_prios = odp_schedule_num_prio();
> +
> + return 0;
> +}
> +
> +/* Queue type recorded at creation */
> +odp_queue_type_t odp_queue_type(odp_queue_t handle)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> +
> + return qe->s.type;
> +}
> +
> +/* Sync type from the creation parameters */
> +odp_schedule_sync_t odp_queue_sched_type(odp_queue_t handle)
> +{
> + const odp_queue_param_t *prm = &queue_to_qentry(handle)->s.param;
> +
> + return prm->sched.sync;
> +}
> +
> +/* Scheduling priority from the creation parameters */
> +odp_schedule_prio_t odp_queue_sched_prio(odp_queue_t handle)
> +{
> + const odp_queue_param_t *prm = &queue_to_qentry(handle)->s.param;
> +
> + return prm->sched.prio;
> +}
> +
> +/* Scheduler group from the creation parameters */
> +odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle)
> +{
> + const odp_queue_param_t *prm = &queue_to_qentry(handle)->s.param;
> +
> + return prm->sched.group;
> +}
> +
> +/* Ordered lock count of an ordered queue, or -1 for any other sync type */
> +int odp_queue_lock_count(odp_queue_t handle)
> +{
> + queue_entry_t *queue = queue_to_qentry(handle);
> +
> + if (queue->s.param.sched.sync != ODP_SCHED_SYNC_ORDERED)
> + return -1;
> +
> + return (int)queue->s.param.sched.lock_count;
> +}
> +
> +/* Claim the first FREE queue table entry and initialise it from 'param'
> + * (odp_queue_param_init() defaults when NULL). Returns the new handle,
> + * or ODP_QUEUE_INVALID if no entry is free or initialisation of the
> + * claimed entry fails.
> + */
> +odp_queue_t odp_queue_create(const char *name,
> + const odp_queue_param_t *param)
> +{
> + odp_queue_param_t default_param;
> + int queue_idx;
> +
> + if (param == NULL) {
> + odp_queue_param_init(&default_param);
> + param = &default_param;
> + }
> +
> + for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES; queue_idx++) {
> + queue_entry_t *queue = &queue_tbl->queue[queue_idx];
> + odp_queue_t handle;
> +
> + /* Cheap unlocked pre-check; repeated below under the lock */
> + if (queue->s.status != QUEUE_STATUS_FREE)
> + continue;
> +
> + LOCK(&queue->s.lock);
> + if (queue->s.status != QUEUE_STATUS_FREE) {
> + /* Lost the race for this entry, try the next one */
> + UNLOCK(&queue->s.lock);
> + continue;
> + }
> + if (queue_init(queue_idx, queue, name, param)) {
> + UNLOCK(&queue->s.lock);
> + return ODP_QUEUE_INVALID;
> + }
> + queue->s.status = QUEUE_STATUS_READY;
> + handle = queue->s.handle;
> + UNLOCK(&queue->s.lock);
> + return handle;
> + }
> +
> + return ODP_QUEUE_INVALID;
> +}
> +
> +/* Destroy a queue.
> + *
> + * Fails (returns -1) if the handle is invalid, the queue is not READY,
> + * or the queue still holds events; in the last case the queue is left
> + * usable. Otherwise it waits until the scheduler state shows no events
> + * and no outstanding tickets, releases the ring shm (and the reorder
> + * window shm of ordered scheduled queues), marks the entry FREE and
> + * returns 0.
> + */
> +int odp_queue_destroy(odp_queue_t handle)
> +{
> + queue_entry_t *queue;
> + sched_elem_t *q;
> +
> + if (handle == ODP_QUEUE_INVALID)
> + return -1;
> +
> + queue = queue_to_qentry(handle);
> + LOCK(&queue->s.lock);
> + if (queue->s.status != QUEUE_STATUS_READY) {
> + UNLOCK(&queue->s.lock);
> + return -1;
> + }
> + q = &queue->s.sched_elem;
> +
> +#ifdef CONFIG_QSCHST_LOCK
> + LOCK(&q->qschlock);
> +#endif
> + if (_odp_queue_disable_enq(q)) {
> + /* Producer side not empty */
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&q->qschlock);
> +#endif
> + UNLOCK(&queue->s.lock);
> + return -1;
> + }
> + /* Enqueue is now disabled */
> + if (q->cons_read != q->cons_write) {
> + /* Consumer side is not empty
> + * Roll back previous change, enable enqueue again.
> + */
> + uint32_t size;
> +
> + size = q->prod_mask + 1;
> + __atomic_fetch_sub(&q->prod_write, size, __ATOMIC_RELAXED);
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&q->qschlock);
> +#endif
> + UNLOCK(&queue->s.lock);
> + return -1;
> + }
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&q->qschlock);
> +#endif
> + /* Producer and consumer sides empty, enqueue disabled
> + * Now wait until schedq state is empty and no outstanding tickets
> + */
> + while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED) != 0 ||
> + __atomic_load_n(&q->qschst.cur_ticket, __ATOMIC_RELAXED) !=
> + __atomic_load_n(&q->qschst.nxt_ticket, __ATOMIC_RELAXED)) {
> + SEVL();
> + while (WFE() && LDXR32((uint32_t *)&q->qschst.numevts,
> + __ATOMIC_RELAXED) != 0)
> + DOZE();
> + }
> +
> + /* Adjust the spread factor for the queues in the schedule group */
> + if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> + sched_group_xcount_dec(queue->s.param.sched.group,
> + queue->s.param.sched.prio);
> +
> + if (odp_shm_free(queue->s.shm) < 0) {
> + UNLOCK(&queue->s.lock);
> + return -1;
> + }
> + /* queue_init() reserves a reorder window only for SCHED queues with
> + * ORDERED sync. A plain queue can carry sync == ORDERED in its params
> + * without rwin_shm ever being reserved, so check the type as well.
> + */
> + if (queue->s.type == ODP_QUEUE_TYPE_SCHED &&
> + queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
> + if (odp_shm_free(queue->s.rwin_shm) < 0) {
> + ODP_ERR("Failed to free reorder window shm\n");
> + UNLOCK(&queue->s.lock);
> + return -1;
> + }
> + }
> + queue->s.status = QUEUE_STATUS_FREE;
> + UNLOCK(&queue->s.lock);
> + return 0;
> +}
> +
> +/* Store the user context pointer of a queue, with a full memory barrier
> + * before and after the store. 'len' is ignored. Always returns 0.
> + * NOTE(review): sched_elem.user_ctx is copied from param.context only in
> + * queue_init() and is not refreshed here - confirm that is intended.
> + */
> +int odp_queue_context_set(odp_queue_t handle, void *context,
> + uint32_t len ODP_UNUSED)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> +
> + odp_mb_full();
> + qe->s.param.context = context;
> + odp_mb_full();
> +
> + return 0;
> +}
> +
> +/* User context pointer last stored for the queue */
> +void *odp_queue_context(odp_queue_t handle)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> +
> + return qe->s.param.context;
> +}
> +
> +/* Return the handle of the first live queue whose name equals 'name', or
> + * ODP_QUEUE_INVALID if there is none.
> + */
> +odp_queue_t odp_queue_lookup(const char *name)
> +{
> + uint32_t i;
> +
> + for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> + queue_entry_t *queue = &queue_tbl->queue[i];
> + odp_queue_t handle = ODP_QUEUE_INVALID;
> +
> + /* Cheap unlocked pre-check to skip unused entries */
> + if (queue->s.status == QUEUE_STATUS_FREE ||
> + queue->s.status == QUEUE_STATUS_DESTROYED)
> + continue;
> +
> + LOCK(&queue->s.lock);
> + /* The queue may have been destroyed between the unlocked check
> + * and taking the lock; re-check so a stale name never matches.
> + */
> + if (queue->s.status != QUEUE_STATUS_FREE &&
> + queue->s.status != QUEUE_STATUS_DESTROYED &&
> + strcmp(name, queue->s.name) == 0)
> + handle = queue->s.handle;
> + UNLOCK(&queue->s.lock);
> +
> + if (handle != ODP_QUEUE_INVALID)
> + return handle;
> + }
> +
> + return ODP_QUEUE_INVALID;
> +}
> +
> +#ifndef CONFIG_QSCHST_LOCK
> +/* Multi-producer enqueue of up to 'num' buffers into ring 'q'.
> + * Returns the number of buffers stored (0 when the ring is full).
> + *
> + * Producers reserve slots by CAS on prod_write and fill them. They then
> + * publish in reservation order: each waits until cons_write equals the
> + * start of its own reservation, then store-releases cons_write to its end.
> + */
> +static inline int _odp_queue_enq(sched_elem_t *q,
> + odp_buffer_hdr_t *buf_hdr[],
> + int num)
> +{
> + ringidx_t old_read;
> + ringidx_t old_write;
> + ringidx_t new_write;
> + int actual;
> + uint32_t mask;
> + odp_buffer_hdr_t **ring;
> +
> + mask = q->prod_mask;
> + ring = q->prod_ring;
> +
> + /* Load producer ring state (read & write index) */
> + old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
> + do {
> + /* Consumer does store-release prod_read, we need
> + * load-acquire.
> + */
> + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> +
> + /* Free slots = ring size - occupied. This is also 0 after
> + * _odp_queue_disable_enq() has claimed the whole ring.
> + */
> + actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
> + if (odp_unlikely(actual <= 0))
> + return 0;
> +
> + new_write = old_write + actual;
> + } while (!__atomic_compare_exchange_n(&q->prod_write,
> + &old_write, /* Updated on failure */
> + new_write,
> + true,
> + __ATOMIC_RELAXED,
> + __ATOMIC_RELAXED));
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->cons_write, 0, 0);
> +#endif
> + /* Store our event(s) in the ring */
> + do {
> + ring[old_write & mask] = *buf_hdr++;
> + } while (++old_write != new_write);
> + /* Back to the start of our reservation */
> + old_write -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->node, 1, 0);
> +#endif
> + /* Wait for our turn to signal consumers */
> + if (odp_unlikely(__atomic_load_n(&q->cons_write,
> + __ATOMIC_RELAXED) != old_write)) {
> + SEVL();
> + while (WFE() && LDXR32(&q->cons_write,
> + __ATOMIC_RELAXED) != old_write)
> + DOZE();
> + }
> +
> + /* Signal consumers that events are available (release events)
> + * Enable other producers to continue
> + */
> + /* Wait for writes (to ring slots) to complete */
> + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
> +
> + return actual;
> +}
> +
> +#else
> +
> +/* Single-producer enqueue of up to 'num' buffers into ring 'q'. Built
> + * only when CONFIG_QSCHST_LOCK is defined; queue_enq_multi() calls it with
> + * q->qschlock held, so prod_write is updated with plain accesses.
> + * Returns the number of buffers stored (0 when the ring is full).
> + */
> +static inline int _odp_queue_enq_sp(sched_elem_t *q,
> + odp_buffer_hdr_t *buf_hdr[],
> + int num)
> +{
> + ringidx_t old_read;
> + ringidx_t old_write;
> + ringidx_t new_write;
> + int actual;
> + uint32_t mask;
> + odp_buffer_hdr_t **ring;
> +
> + mask = q->prod_mask;
> + ring = q->prod_ring;
> +
> + /* Load producer ring state (read & write index) */
> + old_write = q->prod_write;
> + /* Consumer does store-release prod_read, we need load-acquire */
> + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> + actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
> + if (odp_unlikely(actual <= 0))
> + return 0;
> +
> + new_write = old_write + actual;
> + q->prod_write = new_write;
> +
> + /* Store our event(s) in the ring */
> + do {
> + ring[old_write & mask] = *buf_hdr++;
> + } while (++old_write != new_write);
> + old_write -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->node, 1, 0);
> +#endif
> +
> + /* Signal consumers that events are available (release events)
> + * Enable other producers to continue
> + * NOTE(review): this function is compiled only when
> + * CONFIG_QSCHST_LOCK is defined, so the #else branch below is
> + * never built.
> + */
> +#ifdef CONFIG_QSCHST_LOCK
> + q->cons_write = new_write;
> +#else
> + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false);
> +#endif
> +
> + return actual;
> +}
> +#endif
> +
> +/* Enqueue up to 'num' buffers on 'queue'; returns the number accepted.
> + *
> + * If the calling thread is out of order (sched_ts->out_of_order), the
> + * events do not go to the ring. They are recorded, together with their
> + * destination queue, in the thread's chain of reorder contexts. A new
> + * context is taken from the thread's free set whenever the current one
> + * holds RC_EVT_SIZE events. If none is available, the count stored so far
> + * is returned.
> + *
> + * Otherwise the events are written to the ring, and for queues with a
> + * schedq the scheduler state is updated with the number stored.
> + */
> +int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> + int num)
> +{
> + int actual;
> + int i;
> + sched_scalable_thread_state_t *ts;
> + reorder_context_t *first;
> + reorder_context_t *cur;
> + bitset_t next_idx;
> +
> +#ifdef CONFIG_QSCHST_LOCK
> + LOCK(&queue->s.sched_elem.qschlock);
> +#endif
> + ts = sched_ts;
> + if (ts && odp_unlikely(ts->out_of_order)) {
> + first = ts->rctx;
> + ODP_ASSERT(ts->rctx != NULL);
> + cur = &first[(int)first->cur_idx - (int)first->idx];
> + for (i = 0; i < num; i++) {
> + if (odp_unlikely(cur->numevts == RC_EVT_SIZE)) {
> + /* No more space in current reorder context
> + * Try to allocate another.
> + */
> + if (odp_unlikely(
> + bitset_is_null(ts->priv_rvec_free))) {
> + ts->priv_rvec_free =
> + atom_bitset_xchg(
> + &ts->rvec_free,
> + 0,
> + __ATOMIC_RELAXED);
> + if (odp_unlikely(bitset_is_null(
> + ts->priv_rvec_free))) {
> + /* Out of reorder contexts.
> + * Return the number of events
> + * stored so far. The braces
> + * matter: without them, under
> + * CONFIG_QSCHST_LOCK only the
> + * unlock is conditional and the
> + * return always executes.
> + */
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&queue->s.sched_elem.
> + qschlock);
> +#endif
> + return i;
> + }
> + }
> + next_idx = bitset_ffs(ts->priv_rvec_free) - 1;
> + ts->priv_rvec_free =
> + bitset_clr(ts->priv_rvec_free,
> + next_idx);
> + /* Link current to next (for eventual
> + * retiring)
> + */
> + cur->next_idx = next_idx;
> + /* Link first to next (for next call to
> + * queue_enq_multi())
> + */
> + first->cur_idx = next_idx;
> + /* Update current to next */
> + cur = &ts->rvec[next_idx];
> + rctx_init(cur, next_idx, NULL, 0);
> + /* The last rctx (so far) */
> + cur->next_idx = first->idx;
> + }
> + cur->events[cur->numevts] = buf_hdr[i];
> + cur->destq[cur->numevts] = queue;
> + cur->numevts++;
> + }
> + /* All events stored. */
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&queue->s.sched_elem.qschlock);
> +#endif
> + return num;
> + }
> +
> +#ifdef CONFIG_QSCHST_LOCK
> + actual = _odp_queue_enq_sp(&queue->s.sched_elem, buf_hdr, num);
> +#else
> + actual = _odp_queue_enq(&queue->s.sched_elem, buf_hdr, num);
> +#endif
> +
> + if (odp_likely(queue->s.sched_elem.schedq != NULL && actual != 0)) {
> + /* Perform scheduler related updates. */
> +#ifdef CONFIG_QSCHST_LOCK
> + sched_update_enq_sp(&queue->s.sched_elem, actual);
> +#else
> + sched_update_enq(&queue->s.sched_elem, actual);
> +#endif
> + }
> +
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&queue->s.sched_elem.qschlock);
> +#endif
> + return actual;
> +}
> +
> +/* Enqueue one buffer; 0 on success, -1 if it was not accepted */
> +int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +{
> + int stored = queue_enq_multi(queue, &buf_hdr, 1);
> +
> + if (odp_likely(stored == 1))
> + return 0;
> + return -1;
> +}
> +
> +/* Enqueue up to QUEUE_MULTI_MAX events (extra ones are ignored) via the
> + * queue's enqueue_multi handler; returns the handler's result.
> + */
> +int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
> +{
> + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
> + queue_entry_t *qe = queue_to_qentry(handle);
> + int cnt = num > QUEUE_MULTI_MAX ? QUEUE_MULTI_MAX : num;
> + int k;
> +
> + for (k = 0; k < cnt; k++)
> + buf_hdr[k] = (odp_buffer_hdr_t *)(void *)ev[k];
> +
> + return qe->s.enqueue_multi(qe, buf_hdr, cnt);
> +}
> +
> +/* Enqueue one event via the queue's enqueue handler */
> +int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> + odp_buffer_hdr_t *buf_hdr = (odp_buffer_hdr_t *)(void *)ev;
> +
> + return qe->s.enqueue(qe, buf_hdr);
> +}
> +
> +/* Single-consumer dequeue of up to 'num' events from ring 'q' into 'evp'.
> + * Returns the number dequeued (0 if the ring is empty).
> + *
> + * cons_read is read and written with plain accesses, so this must not run
> + * concurrently with another consumer of the same ring (presumably the
> + * caller guarantees exclusive access - TODO confirm at call sites).
> + */
> +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num)
> +{
> + int actual;
> + int i;
> + ringidx_t old_read;
> + ringidx_t old_write;
> + ringidx_t new_read;
> + uint32_t mask;
> + odp_event_t *ring;
> +
> + /* Load consumer ring state (read & write index). */
> + old_read = q->cons_read;
> + /* Producer does store-release cons_write, we need load-acquire */
> + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
> + actual = MIN(num, (int)(old_write - old_read));
> +
> + if (odp_unlikely(actual <= 0))
> + return 0;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->node, 1, 0);
> +#endif
> + new_read = old_read + actual;
> + q->cons_read = new_read;
> +
> + mask = q->cons_mask;
> + ring = (odp_event_t *)q->cons_ring;
> + do {
> + *evp++ = ring[old_read & mask];
> + } while (++old_read != new_read);
> +
> + /* Signal producers that empty slots are available
> + * (release ring slots). Enable other consumers to continue.
> + */
> +#ifdef CONFIG_QSCHST_LOCK
> + q->prod_read = new_read;
> +#else
> + /* Wait for loads (from ring slots) to complete. */
> + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
> +#endif
> + /* Prefetch events after we have release ring buffer slots
> + * (evp is walked back over the 'actual' events just written)
> + */
> + for (i = 0; i < actual; i++) {
> + odp_event_t evt = *--evp;
> +
> + __builtin_prefetch(
> + odp_buffer_addr(odp_buffer_from_event(evt)), 0, 0);
> + }
> + return actual;
> +}
> +
> +/* Multi-consumer dequeue of up to 'num' entries from ring 'q' into 'evp'
> + * (the entries are buffer header pointers stored through an odp_event_t
> + * array). Returns the number dequeued (0 if the ring is empty).
> + *
> + * Consumers reserve entries by CAS on cons_read and copy them out,
> + * prefetching each buffer's data. They then release the slots in
> + * reservation order: each waits for prod_read to reach the start of its
> + * reservation before store-releasing prod_read to its end.
> + *
> + * NOTE(review): a non-static 'inline' definition is only a C99 inline
> + * definition unless this translation unit also has a non-inline or
> + * 'extern' declaration; confirm such a prototype exists so an external
> + * definition is emitted.
> + */
> +inline int _odp_queue_deq(sched_elem_t *q, odp_event_t *evp, int num)
> +{
> + int actual;
> + ringidx_t old_read;
> + ringidx_t old_write;
> + ringidx_t new_read;
> + uint32_t mask;
> + odp_buffer_hdr_t **ring;
> + odp_buffer_hdr_t **p_buf_hdr;
> +
> + mask = q->cons_mask;
> + ring = q->cons_ring;
> +
> + /* Load consumer ring state (read & write index) */
> + old_read = __atomic_load_n(&q->cons_read, __ATOMIC_RELAXED);
> + do {
> + /* Need __atomic_load to avoid compiler reordering
> + * Producer does store-release cons_write, we need
> + * load-acquire.
> + */
> + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE);
> + /* Prefetch ring buffer array */
> + __builtin_prefetch(&q->cons_ring[old_read & mask], 0, 0);
> +
> + actual = MIN(num, (int)(old_write - old_read));
> + if (odp_unlikely(actual <= 0))
> + return 0;
> +
> + /* Attempt to free ring slot(s) */
> + new_read = old_read + actual;
> + } while (!__atomic_compare_exchange_n(&q->cons_read,
> + &old_read, /* Updated on failure */
> + new_read,
> + true,
> + __ATOMIC_RELAXED,
> + __ATOMIC_RELAXED));
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->prod_read, 0, 0);
> +#endif
> + p_buf_hdr = (odp_buffer_hdr_t **)evp;
> + do {
> + odp_buffer_hdr_t *p_buf;
> +
> + p_buf = ring[old_read & mask];
> + __builtin_prefetch(
> + odp_buffer_addr(
> + odp_buffer_from_event((odp_event_t)p_buf)),
> + 0, 0);
> + *p_buf_hdr++ = p_buf;
> + } while (++old_read != new_read);
> + /* Back to the start of our reservation */
> + old_read -= actual;
> +
> +#ifdef ODP_CONFIG_USE_SPLIT_PRODCONS
> + __builtin_prefetch(&q->node, 1, 0);
> +#endif
> + /* Wait for our turn to signal producers */
> + if (odp_unlikely(__atomic_load_n(&q->prod_read, __ATOMIC_RELAXED) !=
> + old_read)) {
> + SEVL();
> + while (WFE() && LDXR32(&q->prod_read,
> + __ATOMIC_RELAXED) != old_read)
> + DOZE();
> + }
> +
> + /* Signal producers that empty slots are available
> + * (release ring slots)
> + * Enable other consumers to continue
> + */
> + /* Wait for loads (from ring slots) to complete */
> + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true);
> +
> + return actual;
> +}
> +
> +/* Dequeue up to 'num' buffer headers from the queue's ring into
> + * 'buf_hdr'; returns the number dequeued.
> + */
> +int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> + int num)
> +{
> + return _odp_queue_deq(&queue->s.sched_elem, (odp_event_t *)buf_hdr,
> + num);
> +}
> +
> +/* Dequeue one buffer header, or BUFFER_HDR_INVALID if the ring is empty */
> +odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
> +{
> + odp_buffer_hdr_t *buf_hdr;
> + int got;
> +
> + got = _odp_queue_deq(&queue->s.sched_elem, (odp_event_t *)&buf_hdr, 1);
> + return got == 1 ? buf_hdr : BUFFER_HDR_INVALID;
> +}
> +
> +/* Dequeue up to 'num' events via the queue's dequeue_multi handler */
> +int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> +
> + return qe->s.dequeue_multi(qe, (odp_buffer_hdr_t **)events, num);
> +}
> +
> +/* Dequeue one event via the queue's dequeue handler */
> +odp_event_t odp_queue_deq(odp_queue_t handle)
> +{
> + queue_entry_t *qe = queue_to_qentry(handle);
> + odp_buffer_hdr_t *buf_hdr = qe->s.dequeue(qe);
> +
> + return (odp_event_t)buf_hdr;
> +}
> +
> +/* Zero 'params', then set the defaults: a plain queue with MT enqueue and
> + * dequeue, and default-priority parallel scheduling in the ALL group.
> + */
> +void odp_queue_param_init(odp_queue_param_t *params)
> +{
> + memset(params, 0, sizeof(*params));
> +
> + params->type = ODP_QUEUE_TYPE_PLAIN;
> + params->enq_mode = ODP_QUEUE_OP_MT;
> + params->deq_mode = ODP_QUEUE_OP_MT;
> +
> + params->sched.group = ODP_SCHED_GROUP_ALL;
> + params->sched.sync = ODP_SCHED_SYNC_PARALLEL;
> + params->sched.prio = ODP_SCHED_PRIO_DEFAULT;
> +}
> +
> +/* Copy the name pointer and parameters of a live queue into 'info'.
> + * Returns 0 on success, or -1 for a NULL 'info', an out-of-range handle
> + * or a queue that is FREE/DESTROYED.
> + */
> +int odp_queue_info(odp_queue_t handle, odp_queue_info_t *info)
> +{
> + queue_entry_t *queue;
> + uint32_t queue_id;
> + int status;
> +
> + if (odp_unlikely(info == NULL)) {
> + ODP_ERR("Unable to store info, NULL ptr given\n");
> + return -1;
> + }
> +
> + queue_id = queue_to_id(handle);
> + if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) {
> + ODP_ERR("Invalid queue handle:%" PRIu64 "\n",
> + odp_queue_to_u64(handle));
> + return -1;
> + }
> +
> + queue = get_qentry(queue_id);
> + LOCK(&queue->s.lock);
> + status = queue->s.status;
> + if (odp_likely(status != QUEUE_STATUS_FREE &&
> + status != QUEUE_STATUS_DESTROYED)) {
> + info->name = queue->s.name;
> + info->param = queue->s.param;
> + UNLOCK(&queue->s.lock);
> + return 0;
> + }
> + UNLOCK(&queue->s.lock);
> + ODP_ERR("Invalid queue status:%d\n", status);
> + return -1;
> +}
> +
> +/* Printable 64-bit value of a queue handle */
> +uint64_t odp_queue_to_u64(odp_queue_t hdl)
> +{
> + uint64_t val = _odp_pri(hdl);
> +
> + return val;
> +}
> diff --git a/platform/linux-generic/odp_schedule_if.c
> b/platform/linux-generic/odp_schedule_if.c
> index a9ede98d..bf92333e 100644
> --- a/platform/linux-generic/odp_schedule_if.c
> +++ b/platform/linux-generic/odp_schedule_if.c
> @@ -6,24 +6,38 @@
>
> #include <odp_schedule_if.h>
>
> -extern const schedule_fn_t schedule_sp_fn;
> -extern const schedule_api_t schedule_sp_api;
> +/* Build-time scheduler selection: exactly one branch below defines the
> + * sched_fn and sched_api pointers. ODP_SCHEDULE_SCALABLE is tested first,
> + * then ODP_SCHEDULE_IQUERY, then ODP_SCHEDULE_SP. If none is defined,
> + * the default scheduler is used.
> + */
> +#if defined(ODP_SCHEDULE_SCALABLE)
>
> -extern const schedule_fn_t schedule_default_fn;
> -extern const schedule_api_t schedule_default_api;
> +extern const schedule_fn_t schedule_scalable_fn;
> +extern const schedule_api_t schedule_scalable_api;
>
> -extern const schedule_fn_t schedule_iquery_fn;
> -extern const schedule_api_t schedule_iquery_api;
> +const schedule_fn_t *sched_fn = &schedule_scalable_fn;
> +const schedule_api_t *sched_api = &schedule_scalable_api;
>
> -#ifdef ODP_SCHEDULE_SP
> -const schedule_fn_t *sched_fn = &schedule_sp_fn;
> -const schedule_api_t *sched_api = &schedule_sp_api;
> #elif defined(ODP_SCHEDULE_IQUERY)
> -const schedule_fn_t *sched_fn = &schedule_iquery_fn;
> +
> +extern const schedule_fn_t schedule_iquery_fn;
> +extern const schedule_api_t schedule_iquery_api;
> +
> +const schedule_fn_t *sched_fn = &schedule_iquery_fn;
> const schedule_api_t *sched_api = &schedule_iquery_api;
> -#else
> +
> +#elif defined(ODP_SCHEDULE_SP)
> +
> +extern const schedule_fn_t schedule_sp_fn;
> +extern const schedule_api_t schedule_sp_api;
> +
> +const schedule_fn_t *sched_fn = &schedule_sp_fn;
> +const schedule_api_t *sched_api = &schedule_sp_api;
> +
> +#else /* default scheduler */
> +
> +extern const schedule_fn_t schedule_default_fn;
> +extern const schedule_api_t schedule_default_api;
> +
> const schedule_fn_t *sched_fn = &schedule_default_fn;
> const schedule_api_t *sched_api = &schedule_default_api;
> +
> #endif
>
> uint64_t odp_schedule_wait_time(uint64_t ns)
> diff --git a/platform/linux-generic/odp_schedule_scalable.c
> b/platform/linux-generic/odp_schedule_scalable.c
> new file mode 100644
> index 00000000..f9ec656c
> --- /dev/null
> +++ b/platform/linux-generic/odp_schedule_scalable.c
> @@ -0,0 +1,1922 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> +#include <odp/api/align.h>
> +#include <odp/api/atomic.h>
> +#include <odp/api/cpu.h>
> +#include <odp/api/hints.h>
> +#include <odp/api/schedule.h>
> +#include <odp/api/shared_memory.h>
> +#include <odp/api/sync.h>
> +#include <odp/api/thread.h>
> +#include <odp/api/thrmask.h>
> +#include <odp/api/time.h>
> +
> +#include <odp_internal.h>
> +#include <odp_config_internal.h>
> +#include <odp_debug_internal.h>
> +
> +#include <odp_align_internal.h>
> +#include <odp_buffer_inlines.h>
> +#include <odp_llqueue.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +#include <odp_llsc.h>
> +#include <odp_bitset.h>
> +#include <odp_atomic16.h>
> +#include <odp_packet_io_internal.h>
> +
> +#include <limits.h>
> +#include <stdbool.h>
> +#include <string.h>
> +
> +#include <odp/api/plat/ticketlock_inlines.h>
> +#define LOCK(a) _odp_ticketlock_lock((a))
> +#define UNLOCK(a) _odp_ticketlock_unlock((a))
> +
> +/* pktin queue tags (32 bits):
> + * bits 16..30 pktio index (TAG_2_PKTIO keeps only 15 bits of it),
> + * bit 15 TAG_USED, bits 0..14 queue index, bit 31 TAG_BUSY.
> + * TAG_IS_READY() is true for a tag that is used and not busy.
> + */
> +#define TAG_EMPTY 0U
> +#define TAG_USED (1U << 15)
> +#define TAG_BUSY (1U << 31)
> +#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED)
> +#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF)
> +#define TAG_2_QUEUE(t) ((t) & 0x7FFF)
> +#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED)
> +/* Upper bound on pktin queues over all pktio entries */
> +#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES)
> +/* Number of thread_state[] slots: one per bit of an atomic bitset */
> +#define MAXTHREADS ATOM_BITSET_SIZE
> +
> +/* pktin polling bookkeeping (presumably maintained by the pktin code
> + * later in this file - confirm): a per-pktio count and one
> + * cache-aligned tag slot per possible pktin queue.
> + */
> +static uint32_t pktin_num;
> +static uint32_t pktin_hi;
> +static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES];
> +static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE;
> +
> +/* Atomically raise '*var' to 'v' when 'v' is larger (atomic max), using a
> + * CAS loop in which 'mo' is the memory order of a successful update. This
> + * is a statement macro and yields no value.
> + * NOTE(review): 'mo' is also passed as the CAS failure order, which must
> + * not be __ATOMIC_RELEASE or __ATOMIC_ACQ_REL. Identifiers starting with
> + * '__' are reserved for the implementation, so a compiler builtin of the
> + * same name could clash - consider renaming.
> + */
> +#define __atomic_fetch_max(var, v, mo) do { \
> + /* Evaluate 'v' once */ \
> + __typeof__(v) tmp_v = (v); \
> + __typeof__(*var) old_var = \
> + __atomic_load_n((var), __ATOMIC_RELAXED); \
> + while (tmp_v > old_var) { \
> + /* Attempt to store 'v' in '*var' */ \
> + if (__atomic_compare_exchange_n((var), &old_var, \
> + tmp_v, true, (mo), \
> + (mo))) \
> + break; \
> + /* Else failure, try again (with updated value of \
> + * old_var). \
> + */ \
> + } \
> + /* v <= old_var, nothing to do */ \
> + } while (0)
> +
> +/* Compile-time checks on the priority numbering and queue count */
> +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1),
> + "lowest_prio_does_not_match_with_num_prios");
> +
> +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
> + (ODP_SCHED_PRIO_NORMAL < (ODP_SCHED_PRIO_NUM - 1)),
> + "normal_prio_is_not_between_highest_and_lowest");
> +
> +ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
> + "Number_of_queues_is_not_power_of_two");
> +
> +/*
> + * Scheduler group related variables.
> + */
> +/* Currently used scheduler groups
> + * NOTE(review): the name 'sg_free' suggests a mask of free groups, which
> + * contradicts "currently used" - confirm which is meant.
> + */
> +static sched_group_mask_t sg_free;
> +static sched_group_t *sg_vec[MAX_SCHED_GROUP];
> +/* Group lock for MT-safe APIs */
> +odp_spinlock_t sched_grp_lock;
> +
> +#define SCHED_GROUP_JOIN 0
> +#define SCHED_GROUP_LEAVE 1
> +
> +/*
> + * Per thread state
> + */
> +static sched_scalable_thread_state_t thread_state[MAXTHREADS];
> +/* The calling thread's entry in thread_state[]; set by thread_state_init() */
> +__thread sched_scalable_thread_state_t *sched_ts;
> +
> +/*
> + * Per-thread state initialisation.
> + */
> +/* Reset thread_state[tidx] and make it the calling thread's state
> + * (sched_ts). All TS_RVEC_SIZE reorder contexts start out free, and the
> + * sched group semaphore starts out set. Always returns 0.
> + */
> +static int thread_state_init(int tidx)
> +{
> + sched_scalable_thread_state_t *st;
> + uint32_t r;
> +
> + ODP_ASSERT(tidx < MAXTHREADS);
> + st = &thread_state[tidx];
> +
> + /* No scheduling context held */
> + st->atomq = NULL;
> + st->rctx = NULL;
> + st->ticket = TICKET_INVALID;
> + st->pause = false;
> + st->out_of_order = false;
> + st->tidx = tidx;
> +
> + /* Counters and polling position */
> + st->dequeued = 0;
> + st->pktin_next = 0;
> + st->pktin_poll_cnts = 0;
> +
> + /* Every reorder context is free; none cached privately yet */
> + st->priv_rvec_free = 0;
> + st->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
> +
> + /* No schedqs yet; start with sched group semaphore changed */
> + st->num_schedq = 0;
> + st->sg_sem = 1;
> + memset(st->sg_actual, 0, sizeof(st->sg_actual));
> +
> + for (r = 0; r < TS_RVEC_SIZE; r++) {
> + reorder_context_t *rc = &st->rvec[r];
> +
> + rc->rvec_free = &st->rvec_free;
> + rc->idx = r;
> + }
> +
> + sched_ts = st;
> + return 0;
> +}
> +
> +/* Insert 'schedq' into the thread's schedq list, which is kept ordered by
> + * ascending prio value (lower value = higher priority = nearer the
> + * front). A new entry goes in front of existing entries with the same
> + * prio. Aborts if the list already holds SCHEDQ_PER_THREAD entries.
> + */
> +static void insert_schedq_in_list(sched_scalable_thread_state_t *ts,
> + sched_queue_t *schedq)
> +{
> + sched_queue_t *carry = schedq;
> + uint32_t pos = 0;
> +
> + /* Walk the list, and wherever the carried entry belongs in front of
> + * the one at 'pos', swap them and carry the displaced entry onward.
> + */
> + while (pos < ts->num_schedq) {
> + sched_queue_t *here = ts->schedq_list[pos];
> +
> + if (carry->prio <= here->prio) {
> + ts->schedq_list[pos] = carry;
> + carry = here;
> + }
> + pos++;
> + }
> + if (ts->num_schedq == SCHEDQ_PER_THREAD)
> + ODP_ABORT("Too many schedqs\n");
> + ts->schedq_list[ts->num_schedq++] = carry;
> +}
> +
> +/* Remove 'schedq' from the thread's schedq list, keeping the order of the
> + * remaining entries. Aborts if 'schedq' is not in the list.
> + */
> +static void remove_schedq_from_list(sched_scalable_thread_state_t *ts,
> + sched_queue_t *schedq)
> +{
> + uint32_t pos;
> +
> + for (pos = 0; pos < ts->num_schedq; pos++) {
> + if (ts->schedq_list[pos] != schedq)
> + continue;
> + /* Shift the tail one slot down to close the gap */
> + memmove(&ts->schedq_list[pos], &ts->schedq_list[pos + 1],
> + (ts->num_schedq - pos - 1) *
> + sizeof(ts->schedq_list[0]));
> + ts->num_schedq--;
> + return;
> + }
> + ODP_ABORT("Cannot find schedq\n");
> +}
> +
> +/*******************************************************************************
> + * Scheduler queues
> + ******************************************************************************/
> +/* Fallback, used only if no other definition exists: given a pointer to
> + * 'member' inside a 'type' object, return a pointer to that object.
> + * NOTE(review): 'pointer' is not parenthesised in the expansion, so pass
> + * only simple expressions.
> + */
> +#ifndef odp_container_of
> +#define odp_container_of(pointer, type, member) \
> + ((type *)(void *)(((char *)pointer) - offsetof(type, member)))
> +#endif
> +
> +/* Give 'schedq' priority 'prio' and initialise its linked-list queue */
> +static inline void schedq_init(sched_queue_t *schedq, uint32_t prio)
> +{
> + schedq->prio = prio;
> + llqueue_init(&schedq->llq);
> +}
> +
> +/* Return the element at the head of 'schedq' without removing it, or
> + * NULL when llq_head() reports no head node.
> + */
> +static inline sched_elem_t *schedq_peek(sched_queue_t *schedq)
> +{
> + struct llnode *ptr;
> +
> + ptr = llq_head(&schedq->llq);
> + /* container_of on NULL is pointer arithmetic on a null pointer and
> + * yields NULL only if 'node' is at offset 0; handle empty explicitly.
> + */
> + if (ptr == NULL)
> + return NULL;
> + return odp_container_of(ptr, sched_elem_t, node);
> +}
> +
> +/* Conditionally dequeue 'elem' from 'schedq'; the condition and the
> + * return value are those of llq_dequeue_cond().
> + */
> +static inline odp_bool_t schedq_cond_pop(sched_queue_t *schedq,
> + sched_elem_t *elem)
> +{
> + odp_bool_t popped = llq_dequeue_cond(&schedq->llq, &elem->node);
> +
> + return popped;
> +}
> +
> +/* Append 'elem' to the tail of 'schedq' */
> +static inline void schedq_push(sched_queue_t *schedq, sched_elem_t *elem)
> +{
> + struct llnode *nd = &elem->node;
> +
> + llq_enqueue(&schedq->llq, nd);
> +}
> +
> +/* Conditionally rotate 'elem' within 'schedq'; semantics and return value
> + * are those of llq_cond_rotate().
> + */
> +static inline odp_bool_t schedq_cond_rotate(sched_queue_t *schedq,
> + sched_elem_t *elem)
> +{
> + odp_bool_t rotated = llq_cond_rotate(&schedq->llq, &elem->node);
> +
> + return rotated;
> +}
> +
> +/* True while 'elem' is linked on a schedq (node.next is non-NULL) */
> +static inline bool schedq_elem_on_queue(sched_elem_t *elem)
> +{
> + if (elem->node.next == NULL)
> + return false;
> + return true;
> +}
> +
> +/*******************************************************************************
> + * Shared metadata between scheduler and queue
> + ******************************************************************************/
> +
/* Account for 'actual' events having been enqueued to queue q (multi-producer
 * version). On an empty-to-non-empty (E -> NE) transition, push q onto its
 * schedq so the scheduler can find it.
 *
 * q->qschst (event count plus cur/nxt ticket pair) is updated with a CAS
 * loop. A ticket taken from nxt_ticket then serialises the schedq update
 * against other threads holding tickets: each waits until cur_ticket equals
 * its ticket and passes the turn on by storing ticket + 1.
 */
void sched_update_enq(sched_elem_t *q, uint32_t actual)
{
	qschedstate_t oss, nss;
	uint32_t ticket;

	oss = q->qschst;
	/* Update event counter, optionally taking a ticket. */
	do {
		ticket = TICKET_INVALID;
		nss = oss;
		nss.numevts += actual;
		if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0))
			/* E -> NE transition */
			if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
			    oss.cur_ticket == oss.nxt_ticket)
				/* Parallel or ordered queues: always take
				 * ticket.
				 * Atomic queue: only take ticket if one is
				 * immediately available.
				 * Otherwise ticket already taken => queue
				 * processed by some thread.
				 */
				ticket = nss.nxt_ticket++;
		/* Else queue already was non-empty. */
	/* Attempt to update numevts counter and optionally take ticket.
	 * On failure __atomic_compare_exchange() reloads oss with the current
	 * value, so the loop recomputes nss from fresh state.
	 */
	} while (!__atomic_compare_exchange(
		       &q->qschst, &oss, &nss,
		       true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

	if (odp_unlikely(ticket != TICKET_INVALID)) {
		/* Wait for our turn to update schedq. */
		if (odp_unlikely(
			    __atomic_load_n(&q->qschst.cur_ticket,
					    __ATOMIC_ACQUIRE) != ticket)) {
			SEVL();
			while (WFE() &&
			       LDXR8(&q->qschst.cur_ticket,
				     __ATOMIC_ACQUIRE) != ticket)
				DOZE();
		}
		/* Enqueue at end of scheduler queue */
		/* We are here because of empty-to-non-empty transition
		 * This means queue must be pushed to schedq if possible
		 * but we can't do that if it already is on the schedq
		 */
		if (odp_likely(!schedq_elem_on_queue(q) &&
			       q->pop_deficit == 0)) {
			/* Queue not already on schedq and no pop deficit means
			 * we can push queue to schedq */
			schedq_push(q->schedq, q);
		} else {
			/* Missed push => cancels one missed pop
			 * NOTE(review): relies on the invariant that a queue
			 * still on its schedq at an E -> NE transition has a
			 * recorded pop deficit; otherwise this would wrap.
			 */
			q->pop_deficit--;
		}
		/* Pass the turn to the next ticket holder */
		atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
				     /*readonly=*/false);
	}
	/* Else queue was not empty or atomic queue already busy. */
}
> +
/* Variant of sched_update_enq() with the same E -> NE handling, but
 * q->qschst is read and written with plain accesses (no CAS), and there is
 * no wait for the ticket's turn before touching the schedq.
 * NOTE(review): presumably only valid when the caller is the sole thread
 * updating q->qschst (the _sp suffix suggests single producer) - confirm
 * at the call sites.
 */
void sched_update_enq_sp(sched_elem_t *q, uint32_t actual)
{
	qschedstate_t oss, nss;
	uint32_t ticket;

	oss = q->qschst;
	/* Update event counter, optionally taking a ticket. */
	ticket = TICKET_INVALID;
	nss = oss;
	nss.numevts += actual;
	if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) {
		/* E -> NE transition */
		if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC ||
		    oss.cur_ticket == oss.nxt_ticket) {
			/* Parallel or ordered queues: always take
			 * ticket.
			 * Atomic queue: only take ticket if one is
			 * immediately available. Otherwise ticket already
			 * taken => queue owned/processed by some thread
			 */
			ticket = nss.nxt_ticket++;
		}
	}
	/* Else queue already was non-empty. */
	/* Store the updated numevts counter (and ticket, if taken) with a
	 * plain write; this variant does not use CAS.
	 */
	q->qschst = nss;

	if (odp_unlikely(ticket != TICKET_INVALID)) {
		/* Enqueue at end of scheduler queue */
		/* We are here because of empty-to-non-empty transition
		 * This means queue must be pushed to schedq if possible
		 * but we can't do that if it already is on the schedq
		 */
		if (odp_likely(!schedq_elem_on_queue(q) &&
			       q->pop_deficit == 0)) {
			/* Queue not already on schedq and no pop deficit means
			 * we can push queue to schedq */
			schedq_push(q->schedq, q);
		} else {
			/* Missed push => cancels one missed pop */
			q->pop_deficit--;
		}
		/* Release the ticket taken above */
		q->qschst.cur_ticket = ticket + 1;
	}
	/* Else queue was not empty or atomic queue already busy. */
}
> +
#ifndef CONFIG_QSCHST_LOCK
/* The scheduler is the only entity that performs the dequeue from a queue. */
static void
sched_update_deq(sched_elem_t *q,
		 uint32_t actual,
		 bool atomic) __attribute__((always_inline));
/* Account for 'actual' events having been dequeued from q by the scheduler
 * (lock-free version, built when CONFIG_QSCHST_LOCK is not defined).
 *
 * atomic == true: the calling thread owns the atomic queue and holds ticket
 * sched_ts->ticket.
 *  - numevts is decreased.
 *  - q is pushed back onto its schedq if events remain.
 *  - The ticket is released by the same CAS.
 *
 * atomic == false (parallel/ordered): numevts and wrr_budget are decreased.
 *  - On a non-empty-to-empty transition, q must be popped from its schedq.
 *  - When the WRR budget is exhausted, q is rotated to the schedq tail.
 *  - A pop that cannot be done now (q not at head) is recorded in
 *    q->pop_deficit.
 */
static inline void
sched_update_deq(sched_elem_t *q,
		 uint32_t actual, bool atomic)
{
	qschedstate_t oss, nss;
	uint32_t ticket;

	if (atomic) {
		bool pushed = false;

		/* We own this atomic queue, only we can dequeue from it and
		 * thus decrease numevts. Other threads may enqueue and thus
		 * increase numevts.
		 * This means that numevts can't unexpectedly become 0 and
		 * invalidate a push operation already performed
		 */
		oss = q->qschst;
		do {
			ODP_ASSERT(oss.cur_ticket == sched_ts->ticket);
			nss = oss;
			nss.numevts -= actual;
			/* Push at most once, even if the CAS is retried */
			if (nss.numevts > 0 && !pushed) {
				schedq_push(q->schedq, q);
				pushed = true;
			}
			/* Attempt to release ticket expecting our view of
			 * numevts to be correct
			 * Unfortunately nxt_ticket will also be included in
			 * the CAS operation
			 */
			nss.cur_ticket = sched_ts->ticket + 1;
		} while (odp_unlikely(!__atomic_compare_exchange(
					  &q->qschst,
					  &oss, &nss,
					  true,
					  __ATOMIC_RELEASE,
					  __ATOMIC_RELAXED)));
		return;
	}

	oss = q->qschst;
	do {
		ticket = TICKET_INVALID;
		nss = oss;
		nss.numevts -= actual;
		nss.wrr_budget -= actual;
		if ((oss.numevts > 0 && nss.numevts <= 0) ||
		    oss.wrr_budget <= actual) {
			/* If we have emptied parallel/ordered queue or
			 * exhausted its WRR budget, we need a ticket
			 * for a later pop.
			 */
			ticket = nss.nxt_ticket++;
			/* Reset wrr_budget as we might also push the
			 * queue to the schedq.
			 */
			nss.wrr_budget = CONFIG_WRR_WEIGHT;
		}
	/* Attempt to update numevts and optionally take ticket. */
	} while (!__atomic_compare_exchange(
		       &q->qschst, &oss, &nss,
		       true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));

	if (odp_unlikely(ticket != TICKET_INVALID)) {
		ODP_ASSERT(q->qschst_type != ODP_SCHED_SYNC_ATOMIC);
		/* Wait for our turn to update schedq. */
		if (odp_unlikely(
			    __atomic_load_n(&q->qschst.cur_ticket,
					    __ATOMIC_ACQUIRE) != ticket)) {
			SEVL();
			while (WFE() &&
			       LDXR8(&q->qschst.cur_ticket,
				     __ATOMIC_ACQUIRE) != ticket)
				DOZE();
		}
		/* We are here because of non-empty-to-empty transition or
		 * WRR budget exhausted
		 * This means the queue must be popped from the schedq, now or
		 * later
		 * If there was no NE->E transition but instead the WRR budget
		 * was exhausted, the queue needs to be moved (popped and
		 * pushed) to the tail of the schedq
		 */
		if (oss.numevts > 0 && nss.numevts <= 0) {
			/* NE->E transition, need to pop */
			if (!schedq_elem_on_queue(q) ||
			    !schedq_cond_pop(q->schedq, q)) {
				/* Queue not at head, failed to dequeue
				 * Missed a pop.
				 */
				q->pop_deficit++;
			}
		} else {
			/* WRR budget exhausted
			 * Need to move queue to tail of schedq if possible
			 */
			if (odp_likely(schedq_elem_on_queue(q))) {
				/* Queue is on schedq, try to move it to
				 * the tail
				 */
				(void)schedq_cond_rotate(q->schedq, q);
			}
			/* Else queue not on schedq or not at head of schedq
			 * No pop => no push
			 */
		}
		/* Pass the turn to the next ticket holder */
		atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
				     /*readonly=*/false);
	}
}
#endif
> +
#ifdef CONFIG_QSCHST_LOCK
static void
sched_update_deq_sc(sched_elem_t *q,
		    uint32_t actual,
		    bool atomic) __attribute__((always_inline));
/* Lock-based counterpart of sched_update_deq() (built with
 * CONFIG_QSCHST_LOCK). It performs the same state transitions using plain
 * accesses to q->qschst; every caller in this file holds q->qschlock.
 *
 * atomic == true: the caller owns the atomic queue. Its ticket is released,
 * and q is pushed back onto its schedq if events remain.
 * atomic == false: same NE->E pop / WRR rotate handling as
 * sched_update_deq(), with missed pops recorded in q->pop_deficit.
 */
static inline void
sched_update_deq_sc(sched_elem_t *q,
		    uint32_t actual, bool atomic)
{
	qschedstate_t oss, nss;
	uint32_t ticket;

	if (atomic) {
		ODP_ASSERT(q->qschst.cur_ticket == sched_ts->ticket);
		ODP_ASSERT(q->qschst.cur_ticket != q->qschst.nxt_ticket);
		q->qschst.numevts -= actual;
		q->qschst.cur_ticket = sched_ts->ticket + 1;
		if (q->qschst.numevts > 0)
			schedq_push(q->schedq, q);
		return;
	}

	oss = q->qschst;
	ticket = TICKET_INVALID;
	nss = oss;
	nss.numevts -= actual;
	nss.wrr_budget -= actual;
	if ((oss.numevts > 0 && nss.numevts <= 0) || oss.wrr_budget <= actual) {
		/* If we emptied the queue or
		 * if we have served the maximum number of events
		 * then we need a ticket for a later pop.
		 */
		ticket = nss.nxt_ticket++;
		/* Also reset wrr_budget as we might also push the
		 * queue to the schedq.
		 */
		nss.wrr_budget = CONFIG_WRR_WEIGHT;
	}
	q->qschst = nss;

	if (ticket != TICKET_INVALID) {
		if (oss.numevts > 0 && nss.numevts <= 0) {
			/* NE->E transition, need to pop */
			if (!schedq_elem_on_queue(q) ||
			    !schedq_cond_pop(q->schedq, q)) {
				/* Queue not at head, failed to dequeue.
				 * Missed a pop.
				 */
				q->pop_deficit++;
			}
		} else {
			/* WRR budget exhausted
			 * Need to move queue to tail of schedq if possible
			 */
			if (odp_likely(schedq_elem_on_queue(q))) {
				/* Queue is on schedq, try to move it to
				 * the tail
				 */
				(void)schedq_cond_rotate(q->schedq, q);
			}
			/* Else queue not on schedq or not at head of schedq
			 * No pop => no push
			 */
		}
		/* Release the ticket taken above */
		q->qschst.cur_ticket = ticket + 1;
	}
}
#endif
> +
> +static inline void sched_update_popd_sc(sched_elem_t *elem)
> +{
> + if (elem->pop_deficit != 0 &&
> + schedq_elem_on_queue(elem) &&
> + schedq_cond_pop(elem->schedq, elem))
> + elem->pop_deficit--;
> +}
> +
#ifndef CONFIG_QSCHST_LOCK
/* Ticket-serialised wrapper around sched_update_popd_sc() for the lock-free
 * configuration. It takes a ticket, waits until cur_ticket reaches it, tries
 * to settle one pending pop, then passes the turn on to the next ticket.
 */
static inline void sched_update_popd(sched_elem_t *elem)
{
	uint32_t ticket = __atomic_fetch_add(&elem->qschst.nxt_ticket,
					    1,
					    __ATOMIC_RELAXED);
	if (odp_unlikely(__atomic_load_n(&elem->qschst.cur_ticket,
					 __ATOMIC_ACQUIRE) != ticket)) {
		SEVL();
		while (WFE() && LDXR8(&elem->qschst.cur_ticket,
				      __ATOMIC_ACQUIRE) != ticket)
			DOZE();
	}
	sched_update_popd_sc(elem);
	atomic_store_release(&elem->qschst.cur_ticket, ticket + 1,
			     /*readonly=*/false);
}
#endif
> +
> +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t
> prio)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + uint32_t x;
> +
> + ODP_ASSERT(grp >= 0 && grp <= (odp_schedule_group_t)MAX_SCHED_GROUP);
> + ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
> + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
> +
> + sgi = grp;
> + sg = sg_vec[sgi];
> +
> + /* Use xcount to spread queues over the xfactor schedq's
> + * per priority.
> + */
> + x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
> + if (x == 0) {
> + /* First ODP queue for this priority
> + * Notify all threads in sg->thr_wanted that they
> + * should join.
> + */
> + sched_group_mask_t thrds = sg->thr_wanted;
> +
> + while (!bitset_is_null(thrds)) {
> + uint32_t thr;
> +
> + thr = bitset_ffs(thrds) - 1;
> + thrds = bitset_clr(thrds, thr);
> + /* Notify the thread about membership in this
> + * group/priority.
> + */
> + atom_bitset_set(&thread_state[thr].sg_wanted[prio],
> + sgi, __ATOMIC_RELEASE);
> + __atomic_store_n(&thread_state[thr].sg_sem, 1,
> + __ATOMIC_RELEASE);
> + }
> + }
> + return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
> +}
> +
> +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + uint32_t x;
> +
> + ODP_ASSERT(grp >= 0 && grp <= (odp_schedule_group_t)MAX_SCHED_GROUP);
> + ODP_ASSERT((sg_free & (1ULL << grp)) == 0);
> + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM);
> +
> + sgi = grp;
> + sg = sg_vec[sgi];
> + x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
> +
> + if (x == 0) {
> + /* Last ODP queue for this priority
> + * Notify all threads in sg->thr_wanted that they
> + * should leave.
> + */
> + sched_group_mask_t thrds = sg->thr_wanted;
> +
> + while (!bitset_is_null(thrds)) {
> + uint32_t thr;
> +
> + thr = bitset_ffs(thrds) - 1;
> + thrds = bitset_clr(thrds, thr);
> + /* Notify the thread about membership in this
> + * group/priority.
> + */
> + atom_bitset_clr(&thread_state[thr].sg_wanted[prio],
> + sgi, __ATOMIC_RELEASE);
> + __atomic_store_n(&thread_state[thr].sg_sem, 1,
> + __ATOMIC_RELEASE);
> + }
> + }
> +}
> +
/* Bring this thread's schedq list into line with the scheduler group
 * membership requested for it in ts->sg_wanted[] (written by other threads).
 *
 * For each priority p:
 *  - Groups newly wanted have their xfactor schedqs for p inserted into
 *    ts->schedq_list.
 *  - Groups no longer wanted have those schedqs removed.
 *  - The group's thr_actual[p] bit for this thread is set or cleared to
 *    match; schedule_group_destroy() waits on thr_actual.
 */
static void update_sg_membership(sched_scalable_thread_state_t *ts)
{
	uint32_t p;
	sched_group_mask_t sg_wanted;
	sched_group_mask_t added;
	sched_group_mask_t removed;
	uint32_t sgi;
	sched_group_t *sg;
	uint32_t x;

	for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
		sg_wanted = __atomic_load_n(&ts->sg_wanted[p],
					    __ATOMIC_ACQUIRE);
		if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) {
			/* Our sched_group membership has changed */
			added = bitset_andn(sg_wanted, ts->sg_actual[p]);
			while (!bitset_is_null(added)) {
				sgi = bitset_ffs(added) - 1;
				sg = sg_vec[sgi];
				for (x = 0; x < sg->xfactor; x++) {
					/* Include our thread index to shift
					 * (rotate) the order of schedq's
					 */
					insert_schedq_in_list
						(ts,
						 &sg->schedq[p * sg->xfactor +
						 (x + ts->tidx) % sg->xfactor]);
				}
				atom_bitset_set(&sg->thr_actual[p], ts->tidx,
						__ATOMIC_RELAXED);
				added = bitset_clr(added, sgi);
			}
			removed = bitset_andn(ts->sg_actual[p], sg_wanted);
			while (!bitset_is_null(removed)) {
				sgi = bitset_ffs(removed) - 1;
				sg = sg_vec[sgi];
				/* Same set of schedqs as inserted above; the
				 * rotation does not matter for removal.
				 */
				for (x = 0; x < sg->xfactor; x++) {
					remove_schedq_from_list
						(ts,
						 &sg->schedq[p *
							     sg->xfactor + x]);
				}
				atom_bitset_clr(&sg->thr_actual[p], ts->tidx,
						__ATOMIC_RELAXED);
				removed = bitset_clr(removed, sgi);
			}
			ts->sg_actual[p] = sg_wanted;
		}
	}
}
> +
> +/*******************************************************************************
> + * Scheduler
> +
> ******************************************************************************/
> +
> +static inline void _schedule_release_atomic(sched_scalable_thread_state_t
> *ts)
> +{
> +#ifdef CONFIG_QSCHST_LOCK
> + sched_update_deq_sc(ts->atomq, ts->dequeued, true);
> + ODP_ASSERT(ts->atomq->qschst.cur_ticket != ts->ticket);
> + ODP_ASSERT(ts->atomq->qschst.cur_ticket ==
> + ts->atomq->qschst.nxt_ticket);
> +#else
> + sched_update_deq(ts->atomq, ts->dequeued, true);
> +#endif
> + ts->atomq = NULL;
> + ts->ticket = TICKET_INVALID;
> +}
> +
> +static inline void _schedule_release_ordered(sched_scalable_thread_state_t
> *ts)
> +{
> + ts->out_of_order = false;
> + rctx_release(ts->rctx);
> + ts->rctx = NULL;
> +}
> +
> +static void pktin_poll(sched_scalable_thread_state_t *ts)
> +{
> + uint32_t i, tag, hi, npolls = 0;
> + int pktio_index, queue_index;
> +
> + hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED);
> + if (hi == 0)
> + return;
> +
> + for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) {
> + tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED);
> + if (!TAG_IS_READY(tag))
> + continue;
> + if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag,
> + tag | TAG_BUSY,
> + true,
> + __ATOMIC_ACQUIRE,
> + __ATOMIC_RELAXED))
> + continue;
> + /* Tag grabbed */
> + pktio_index = TAG_2_PKTIO(tag);
> + queue_index = TAG_2_QUEUE(tag);
> + if (odp_unlikely(sched_cb_pktin_poll(pktio_index,
> + 1, &queue_index))) {
> + /* Pktio stopped or closed
> + * Remove tag from pktin_tags
> + */
> + __atomic_store_n(&pktin_tags[i],
> + TAG_EMPTY, __ATOMIC_RELAXED);
> + __atomic_fetch_sub(&pktin_num,
> + 1, __ATOMIC_RELEASE);
> + /* Call stop_finalize when all queues
> + * of the pktio have been removed
> + */
> + if (__atomic_sub_fetch(&pktin_count[pktio_index], 1,
> + __ATOMIC_RELAXED) == 0)
> + sched_cb_pktio_stop_finalize(pktio_index);
> + } else {
> + /* We don't know whether any packets were found and enqueued
> + * Write back original tag value to release pktin queue
> + */
> + __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED);
> + /* Do not iterate through all pktin queues every time */
> + if ((ts->pktin_poll_cnts & 0xf) != 0)
> + break;
> + }
> + }
> + ODP_ASSERT(i < hi);
> + ts->pktin_poll_cnts++;
> + ts->pktin_next = i;
> +}
> +
/* Make one attempt to obtain up to num_evts events into ev[].
 *
 *  1. If this thread still owns an atomic queue (ts->atomq), keep dequeuing
 *     from it while ts->dequeued is below its WRR budget. When it is empty
 *     or the budget is used up, release it.
 *  2. Release any reorder context left over from a previous ordered dequeue.
 *  3. Apply pending scheduler group membership changes (ts->sg_sem).
 *  4. Scan ts->schedq_list in order. Handle the element at the head of each
 *     schedq according to its synchronisation type:
 *     - atomic: claim it and jump to step 1;
 *     - parallel: dequeue directly;
 *     - ordered: reserve a reorder window slot and a reorder context first.
 *  5. If nothing was dequeued, poll pktin queues and return 0.
 *
 * Returns the number of events stored in ev[]. On success, *from (if from is
 * non-NULL) receives the handle of the source queue.
 */
static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts)
{
	sched_scalable_thread_state_t *ts;
	sched_elem_t *atomq;
	int num;
	uint32_t i;

	ts = sched_ts;
	atomq = ts->atomq;

	/* Once an atomic queue has been scheduled to a thread, it will stay
	 * on that thread until empty or 'rotated' by WRR
	 */
	if (atomq != NULL) {
		ODP_ASSERT(ts->ticket != TICKET_INVALID);
#ifdef CONFIG_QSCHST_LOCK
		LOCK(&atomq->qschlock);
#endif
/* Also reached by goto from the schedq scan below, right after a new atomic
 * queue has been claimed (with CONFIG_QSCHST_LOCK its lock is held then).
 */
dequeue_atomic:
		ODP_ASSERT(ts->ticket == atomq->qschst.cur_ticket);
		ODP_ASSERT(ts->ticket != atomq->qschst.nxt_ticket);
		/* Atomic queues can be dequeued without lock since this thread
		 * has the only reference to the atomic queue being processed.
		 */
		if (ts->dequeued < atomq->qschst.wrr_budget) {
			num = _odp_queue_deq_sc(atomq, ev, num_evts);
			if (odp_likely(num != 0)) {
#ifdef CONFIG_QSCHST_LOCK
				UNLOCK(&atomq->qschlock);
#endif
				ts->dequeued += num;
				/* Allow this thread to continue to 'own' this
				 * atomic queue until all events have been
				 * processed and the thread re-invokes the
				 * scheduler.
				 */
				if (from)
					*from = queue_get_handle(
						(queue_entry_t *)atomq);
				return num;
			}
		}
		/* Atomic queue was empty or interrupted by WRR, release it. */
		_schedule_release_atomic(ts);
#ifdef CONFIG_QSCHST_LOCK
		UNLOCK(&atomq->qschlock);
#endif
	}

	/* Release any previous reorder context. */
	if (ts->rctx != NULL)
		_schedule_release_ordered(ts);

	/* Check for and perform any scheduler group updates. */
	if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) {
		/* Acquire load orders the membership reads that follow */
		(void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
		ts->sg_sem = 0;
		update_sg_membership(ts);
	}

	/* Scan our schedq list from beginning to end */
	for (i = 0; i < ts->num_schedq; i++) {
		sched_queue_t *schedq = ts->schedq_list[i];
		sched_elem_t *elem;
restart_same:
		elem = schedq_peek(schedq);
		if (odp_unlikely(elem == NULL)) {
			/* Schedq empty, look at next one. */
			continue;
		}

		if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) {
			/* Dequeue element only if it is still at head
			 * of schedq.
			 */
			if (odp_unlikely(!schedq_cond_pop(schedq, elem))) {
				/* Queue not at head of schedq anymore, some
				 * other thread popped it.
				 */
				goto restart_same;
			}
			ts->atomq = elem;
			atomq = elem;
			ts->dequeued = 0;
#ifdef CONFIG_QSCHST_LOCK
			LOCK(&atomq->qschlock);
			ts->ticket = atomq->qschst.nxt_ticket++;
			ODP_ASSERT(atomq->qschst.cur_ticket == ts->ticket);
#else
			/* Dequeued atomic queue from the schedq, only we
			 * can process it and any qschst updates are our
			 * responsibility.
			 */
			/* The ticket taken below will signal producers */
			ts->ticket = __atomic_fetch_add(
				&atomq->qschst.nxt_ticket, 1, __ATOMIC_RELAXED);
			while (__atomic_load_n(
				       &atomq->qschst.cur_ticket,
				       __ATOMIC_ACQUIRE) != ts->ticket) {
				/* No need to use WFE, spinning here seems
				 * very infrequent.
				 */
				odp_cpu_pause();
			}
#endif
			goto dequeue_atomic;
		} else if (elem->cons_type == ODP_SCHED_SYNC_PARALLEL) {
#ifdef CONFIG_QSCHST_LOCK
			LOCK(&elem->qschlock);
			num = _odp_queue_deq_sc(elem, ev, num_evts);
			if (odp_likely(num != 0)) {
				sched_update_deq_sc(elem, num, false);
				UNLOCK(&elem->qschlock);
				if (from)
					*from =
					queue_get_handle((queue_entry_t *)elem);
				return num;
			}
			UNLOCK(&elem->qschlock);
#else
			num = _odp_queue_deq(elem, ev, num_evts);
			if (odp_likely(num != 0)) {
				sched_update_deq(elem, num, false);
				if (from)
					*from =
					queue_get_handle((queue_entry_t *)elem);
				return num;
			}
#endif
		} else if (elem->cons_type == ODP_SCHED_SYNC_ORDERED) {
			reorder_window_t *rwin;
			reorder_context_t *rctx;
			uint32_t sn;
			uint32_t idx;
			int ret;

			/* The ordered queue has a reorder window so requires
			 * order restoration. We must use a reorder context to
			 * collect all outgoing events. Ensure there is at least
			 * one available reorder context.
			 */
			if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
				/* Refill the private free set by taking all
				 * bits from the shared rvec_free set.
				 */
				ts->priv_rvec_free = atom_bitset_xchg(
							&ts->rvec_free, 0,
							__ATOMIC_RELAXED);
				if (odp_unlikely(bitset_is_null(
						ts->priv_rvec_free))) {
					/* No free reorder contexts for
					 * this thread. Look at next schedq,
					 * hope we find non-ordered queue.
					 */
					continue;
				}
			}
			/* rwin_reserve and odp_queue_deq must be atomic or
			 * there will be a potential race condition.
			 * Allocate a slot in the reorder window.
			 */
			rwin = queue_get_rwin((queue_entry_t *)elem);
			ODP_ASSERT(rwin != NULL);
			if (odp_unlikely(!rwin_reserve(rwin, &sn))) {
				/* Reorder window full */
				/* Look at next schedq, find other queue */
				continue;
			}
			/* Wait for our turn to dequeue */
			if (odp_unlikely(__atomic_load_n(&rwin->turn,
							 __ATOMIC_ACQUIRE)
			    != sn)) {
				SEVL();
				while (WFE() &&
				       LDXR32(&rwin->turn, __ATOMIC_ACQUIRE)
						!= sn)
					DOZE();
			}
#ifdef CONFIG_QSCHST_LOCK
			LOCK(&elem->qschlock);
#endif
			ret = _odp_queue_deq_sc(elem, ev, num_evts);
			/* Wait for prod_read write in _odp_queue_dequeue_sc()
			 * to complete before we signal the next consumer
			 */
			atomic_store_release(&rwin->turn, sn + 1,
					     /*readonly=*/false);
			/* Find and initialise an unused reorder context. */
			idx = bitset_ffs(ts->priv_rvec_free) - 1;
			ts->priv_rvec_free =
				bitset_clr(ts->priv_rvec_free, idx);
			rctx = &ts->rvec[idx];
			/* Need to initialise reorder context or we can't
			 * release it later.
			 */
			rctx_init(rctx, idx, rwin, sn);

			/* Was dequeue successful? */
			if (odp_likely(ret != 0)) {
				/* Perform scheduler related updates */
#ifdef CONFIG_QSCHST_LOCK
				sched_update_deq_sc(elem, ret,
						    /*atomic=*/false);
				UNLOCK(&elem->qschlock);
#else
				sched_update_deq(elem, ret, /*atomic=*/false);
#endif

				/* Are we in-order or out-of-order? */
				ts->out_of_order = sn != rwin->hc.head;

				ts->rctx = rctx;
				if (from)
					*from = queue_get_handle(
						(queue_entry_t *)elem);
				return ret;
			}
#ifdef CONFIG_QSCHST_LOCK
			UNLOCK(&elem->qschlock);
#endif
			/* Since a slot was reserved in the reorder window,
			 * the reorder context needs to be released and
			 * inserted into the reorder window.
			 */
			rctx_release(rctx);
			ODP_ASSERT(ts->rctx == NULL);
		}
		/* Dequeue from parallel/ordered queue failed
		 * Check if we have a queue at the head of the schedq that needs
		 * to be popped
		 */
		if (odp_unlikely(__atomic_load_n(&elem->pop_deficit,
						 __ATOMIC_RELAXED) != 0)) {
#ifdef CONFIG_QSCHST_LOCK
			LOCK(&elem->qschlock);
			sched_update_popd_sc(elem);
			UNLOCK(&elem->qschlock);
#else
			sched_update_popd(elem);
#endif
		}
	}

	/* Nothing dequeued from any schedq: poll packet input instead */
	pktin_poll(ts);
	return 0;
}
> +
> +/******************************************************************************/
> +
> +static void schedule_order_lock(unsigned lock_index)
> +{
> + struct reorder_context *rctx = sched_ts->rctx;
> +
> + if (odp_unlikely(rctx == NULL ||
> + rctx->rwin == NULL ||
> + lock_index >= rctx->rwin->lock_count)) {
> + ODP_ERR("Invalid call to odp_schedule_order_lock\n");
> + return;
> + }
> + if (odp_unlikely(__atomic_load_n(&rctx->rwin->olock[lock_index],
> + __ATOMIC_ACQUIRE) != rctx->sn)) {
> + SEVL();
> + while (WFE() &&
> + LDXR32(&rctx->rwin->olock[lock_index],
> + __ATOMIC_ACQUIRE) != rctx->sn)
> + DOZE();
> + }
> +}
> +
> +static void schedule_order_unlock(unsigned lock_index)
> +{
> + struct reorder_context *rctx;
> +
> + rctx = sched_ts->rctx;
> + if (odp_unlikely(rctx == NULL ||
> + rctx->rwin == NULL ||
> + lock_index >= rctx->rwin->lock_count ||
> + rctx->rwin->olock[lock_index] != rctx->sn)) {
> + ODP_ERR("Invalid call to odp_schedule_order_unlock\n");
> + return;
> + }
> + atomic_store_release(&rctx->rwin->olock[lock_index],
> + rctx->sn + 1,
> + /*readonly=*/false);
> + rctx->olock_flags |= 1U << lock_index;
> +}
> +
> +static void schedule_release_atomic(void)
> +{
> + sched_scalable_thread_state_t *ts;
> +
> + ts = sched_ts;
> + if (odp_likely(ts->atomq != NULL)) {
> +#ifdef CONFIG_QSCHST_LOCK
> + sched_elem_t *atomq;
> +
> + atomq = ts->atomq;
> + LOCK(&atomq->qschlock);
> +#endif
> + _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&atomq->qschlock);
> +#endif
> + }
> +}
> +
> +static void schedule_release_ordered(void)
> +{
> + sched_scalable_thread_state_t *ts;
> +
> + ts = sched_ts;
> + if (ts->rctx != NULL)
> + _schedule_release_ordered(ts);
> +}
> +
> +static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[],
> + int num)
> +{
> + sched_scalable_thread_state_t *ts;
> + int n;
> + odp_time_t start;
> + odp_time_t delta;
> + odp_time_t deadline;
> +
> + ts = sched_ts;
> + if (odp_unlikely(ts->pause)) {
> + if (ts->atomq != NULL) {
> +#ifdef CONFIG_QSCHST_LOCK
> + sched_elem_t *atomq;
> +
> + atomq = ts->atomq;
> + LOCK(&atomq->qschlock);
> +#endif
> + _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&atomq->qschlock);
> +#endif
> + } else if (ts->rctx != NULL) {
> + _schedule_release_ordered(ts);
> + }
> + return 0;
> + }
> +
> + if (wait == ODP_SCHED_NO_WAIT)
> + return _schedule(from, ev, num);
> +
> + if (wait == ODP_SCHED_WAIT) {
> + for (;;) {
> + n = _schedule(from, ev, num);
> + if (odp_likely(n > 0))
> + return n;
> + }
> + }
> +
> + start = odp_time_local();
> +
> + n = _schedule(from, ev, num);
> + if (odp_likely(n > 0))
> + return n;
> +
> + delta = odp_time_local_from_ns(wait);
> + deadline = odp_time_sum(start, delta);
> +
> + while (odp_time_cmp(deadline, odp_time_local()) > 0) {
> + n = _schedule(from, ev, num);
> + if (odp_likely(n > 0))
> + return n;
> + }
> +
> + return 0;
> +}
> +
> +static odp_event_t schedule(odp_queue_t *from, uint64_t wait)
> +{
> + odp_event_t ev = ODP_EVENT_INVALID;
> + const int num = 1;
> + sched_scalable_thread_state_t *ts;
> + int n;
> + odp_time_t start;
> + odp_time_t delta;
> + odp_time_t deadline;
> +
> + ts = sched_ts;
> + if (odp_unlikely(ts->pause)) {
> + if (ts->atomq != NULL) {
> +#ifdef CONFIG_QSCHST_LOCK
> + sched_elem_t *atomq;
> +
> + atomq = ts->atomq;
> + LOCK(&atomq->qschlock);
> +#endif
> + _schedule_release_atomic(ts);
> +#ifdef CONFIG_QSCHST_LOCK
> + UNLOCK(&atomq->qschlock);
> +#endif
> + } else if (ts->rctx != NULL) {
> + _schedule_release_ordered(ts);
> + }
> + return ev;
> + }
> +
> + if (wait == ODP_SCHED_NO_WAIT) {
> + (void)_schedule(from, &ev, num);
> + return ev;
> + }
> +
> + if (wait == ODP_SCHED_WAIT) {
> + for (;;) {
> + n = _schedule(from, &ev, num);
> + if (odp_likely(n > 0))
> + return ev;
> + }
> + }
> +
> + start = odp_time_local();
> +
> + n = _schedule(from, &ev, num);
> + if (odp_likely(n > 0))
> + return ev;
> +
> + delta = odp_time_local_from_ns(wait);
> + deadline = odp_time_sum(start, delta);
> +
> + while (odp_time_cmp(deadline, odp_time_local()) > 0) {
> + n = _schedule(from, &ev, num);
> + if (odp_likely(n > 0))
> + return ev;
> + }
> +
> + return ev;
> +}
> +
> +static void schedule_pause(void)
> +{
> + sched_ts->pause = true;
> +}
> +
> +static void schedule_resume(void)
> +{
> + sched_ts->pause = false;
> +}
> +
/* Convert nanoseconds to a scheduler wait value. Wait values are already
 * nanoseconds here, so the input is returned unchanged.
 */
static uint64_t schedule_wait_time(uint64_t ns)
{
	const uint64_t wait = ns;

	return wait;
}
> +
> +static int schedule_num_prio(void)
> +{
> + return ODP_SCHED_PRIO_NUM;
> +}
> +
> +static int schedule_group_update(sched_group_t *sg,
> + uint32_t sgi,
> + const odp_thrmask_t *mask,
> + int join_leave)
> +{
> + int thr;
> + uint32_t p;
> +
> + /* Internal function, do not validate inputs */
> +
> + /* Notify relevant threads about the change */
> + thr = odp_thrmask_first(mask);
> + while (0 <= thr) {
> + /* Add thread to scheduler group's wanted thread mask */
> + if (join_leave == SCHED_GROUP_JOIN)
> + atom_bitset_set(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
> + else
> + atom_bitset_clr(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
> + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> + if (sg->xcount[p] != 0) {
> + /* This priority level has ODP queues
> + * Notify the thread about membership in
> + * this group/priority
> + */
> + if (join_leave == SCHED_GROUP_JOIN)
> + atom_bitset_set(
> + &thread_state[thr].sg_wanted[p],
> + sgi,
> + __ATOMIC_RELEASE);
> + else
> + atom_bitset_clr(
> + &thread_state[thr].sg_wanted[p],
> + sgi,
> + __ATOMIC_RELEASE);
> + __atomic_store_n(&thread_state[thr].sg_sem,
> + 1,
> + __ATOMIC_RELEASE);
> + }
> + }
> + thr = odp_thrmask_next(mask, thr);
> + }
> +
> + return 0;
> +}
> +
> +static int _schedule_group_thrmask(sched_group_t *sg, odp_thrmask_t *mask)
> +{
> + bitset_t bs;
> + uint32_t bit;
> +
> + /* Internal function, do not validate inputs */
> +
> + odp_thrmask_zero(mask);
> + bs = sg->thr_wanted;
> + while (!bitset_is_null(bs)) {
> + bit = bitset_ffs(bs) - 1;
> + bs = bitset_clr(bs, bit);
> + odp_thrmask_set(mask, bit);
> + }
> +
> + return 0;
> +}
> +
> +static odp_schedule_group_t schedule_group_create(const char *name,
> + const odp_thrmask_t *mask)
> +{
> + odp_shm_t shm;
> + uint32_t sgi;
> + sched_group_mask_t free;
> + uint32_t xfactor;
> + sched_group_t *sg;
> + uint32_t p;
> + uint32_t x;
> +
> + /* Validate inputs */
> + if (mask == NULL)
> + ODP_ABORT("mask is NULL\n");
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + /* Allocate a scheduler group */
> + free = __atomic_load_n(&sg_free, __ATOMIC_RELAXED);
> + do {
> + /* All sched_groups in use */
> + if (bitset_is_null(free))
> + goto no_free_sched_group;
> +
> + sgi = bitset_ffs(free) - 1;
> + /* All sched_groups in use */
> + if (sgi >= MAX_SCHED_GROUP)
> + goto no_free_sched_group;
> + } while (!__atomic_compare_exchange_n(&sg_free,
> + &free,
> + bitset_clr(free, sgi),
> + true,
> + __ATOMIC_ACQUIRE,
> + __ATOMIC_ACQUIRE));
> +
> + /* Compute xfactor (spread factor) from the number of threads
> + * present in the thread mask. Preferable this would be an
> + * explicit parameter.
> + */
> + xfactor = odp_thrmask_count(mask);
> + if (xfactor < 1)
> + xfactor = 1;
> +
> + shm = odp_shm_reserve(name ? name : "",
> + (sizeof(sched_group_t) +
> + (ODP_SCHED_PRIO_NUM * xfactor - 1) *
> + sizeof(sched_queue_t)),
> + ODP_CACHE_LINE_SIZE,
> + ODP_SHM_PROC);
> + if (ODP_SHM_INVALID == shm)
> + goto shm_reserve_failed;
> +
> + sg = (sched_group_t *)odp_shm_addr(shm);
> + if (sg == NULL)
> + goto shm_addr_failed;
> +
> + strncpy(sg->name, name ? name : "", ODP_SCHED_GROUP_NAME_LEN - 1);
> + sg->shm = shm;
> + sg_vec[sgi] = sg;
> + memset(sg->thr_actual, 0, sizeof(sg->thr_actual));
> + sg->thr_wanted = bitset_null();
> + sg->xfactor = xfactor;
> + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> + sg->xcount[p] = 0;
> + for (x = 0; x < xfactor; x++)
> + schedq_init(&sg->schedq[p * xfactor + x], p);
> + }
> + if (odp_thrmask_count(mask) != 0)
> + schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return (odp_schedule_group_t)(sgi);
> +
> +shm_addr_failed:
> + odp_shm_free(shm);
> +
> +shm_reserve_failed:
> + /* Free the allocated group index */
> + atom_bitset_set(&sg_free, sgi, __ATOMIC_RELAXED);
> +
> +no_free_sched_group:
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ODP_SCHED_GROUP_INVALID;
> +}
> +
> +static int schedule_group_destroy(odp_schedule_group_t group)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + uint32_t p;
> + int ret = 0;
> +
> + /* Validate inputs */
> + if (group < 0 && group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
> + ret = -1;
> + goto invalid_group;
> + }
> +
> + if (sched_ts &&
> + odp_unlikely(__atomic_load_n(&sched_ts->sg_sem,
> + __ATOMIC_RELAXED) != 0)) {
> + (void)__atomic_load_n(&sched_ts->sg_sem,
> + __ATOMIC_ACQUIRE);
> + sched_ts->sg_sem = 0;
> + update_sg_membership(sched_ts);
> + }
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + sgi = (uint32_t)group;
> + if (bitset_is_set(sg_free, sgi)) {
> + ret = -1;
> + goto group_not_found;
> + }
> +
> + sg = sg_vec[sgi];
> + /* First ensure all threads have processed group_join/group_leave
> + * requests.
> + */
> + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> + if (sg->xcount[p] != 0) {
> + bitset_t wanted = atom_bitset_load(
> + &sg->thr_wanted, __ATOMIC_RELAXED);
> +
> + SEVL();
> + while (WFE() &&
> + !bitset_is_eql(wanted,
> + bitset_ldex(&sg->thr_actual[p],
> + __ATOMIC_RELAXED)))
> + DOZE();
> + }
> + /* Else ignore because no ODP queues on this prio */
> + }
> +
> + /* Check if all threads/queues have left the group */
> + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
> + if (!bitset_is_null(sg->thr_actual[p])) {
> + ODP_ERR("Group has threads\n");
> + ret = -1;
> + goto thrd_q_present_in_group;
> + }
> + if (sg->xcount[p] != 0) {
> + ODP_ERR("Group has queues\n");
> + ret = -1;
> + goto thrd_q_present_in_group;
> + }
> + }
> +
> + odp_shm_free(sg->shm);
> + sg_vec[sgi] = NULL;
> + atom_bitset_set(&sg_free, sgi, __ATOMIC_RELEASE);
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ret;
> +
> +thrd_q_present_in_group:
> +
> +group_not_found:
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> +
> + return ret;
> +}
> +
> +static odp_schedule_group_t schedule_group_lookup(const char *name)
> +{
> + uint32_t sgi;
> + odp_schedule_group_t group;
> +
> + /* Validate inputs */
> + if (name == NULL)
> + ODP_ABORT("name or mask is NULL\n");
> +
> + group = ODP_SCHED_GROUP_INVALID;
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + /* Scan through the schedule group array */
> + for (sgi = 0; sgi < MAX_SCHED_GROUP; sgi++) {
> + if ((sg_vec[sgi] != NULL) &&
> + (strncmp(name, sg_vec[sgi]->name,
> + ODP_SCHED_GROUP_NAME_LEN) == 0)) {
> + group = (odp_schedule_group_t)sgi;
> + break;
> + }
> + }
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return group;
> +}
> +
> +static int schedule_group_join(odp_schedule_group_t group,
> + const odp_thrmask_t *mask)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + int ret;
> +
> + /* Validate inputs */
> + if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP))
> + return -1;
> +
> + if (mask == NULL)
> + ODP_ABORT("name or mask is NULL\n");
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + sgi = (uint32_t)group;
> + if (bitset_is_set(sg_free, sgi)) {
> + odp_spinlock_unlock(&sched_grp_lock);
> + return -1;
> + }
> +
> + sg = sg_vec[sgi];
> + ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ret;
> +}
> +
> +static int schedule_group_leave(odp_schedule_group_t group,
> + const odp_thrmask_t *mask)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + int ret = 0;
> +
> + /* Validate inputs */
> + if (group < 0 && group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
> + ret = -1;
> + goto invalid_group;
> + }
> +
> + if (mask == NULL)
> + ODP_ABORT("name or mask is NULL\n");
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + sgi = (uint32_t)group;
> + if (bitset_is_set(sg_free, sgi)) {
> + ret = -1;
> + goto group_not_found;
> + }
> +
> + sg = sg_vec[sgi];
> +
> + ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_LEAVE);
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ret;
> +
> +group_not_found:
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> + return ret;
> +}
> +
> +static int schedule_group_thrmask(odp_schedule_group_t group,
> + odp_thrmask_t *mask)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + int ret = 0;
> +
> + /* Validate inputs */
> + if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
> + ret = -1;
> + goto invalid_group;
> + }
> +
> + if (mask == NULL)
> + ODP_ABORT("name or mask is NULL\n");
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + sgi = (uint32_t)group;
> + if (bitset_is_set(sg_free, sgi)) {
> + ret = -1;
> + goto group_not_found;
> + }
> +
> + sg = sg_vec[sgi];
> + ret = _schedule_group_thrmask(sg, mask);
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ret;
> +
> +group_not_found:
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> + return ret;
> +}
> +
> +static int schedule_group_info(odp_schedule_group_t group,
> + odp_schedule_group_info_t *info)
> +{
> + uint32_t sgi;
> + sched_group_t *sg;
> + int ret = 0;
> +
> + /* Validate inputs */
> + if (group < 0 && group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
> + ret = -1;
> + goto invalid_group;
> + }
> +
> + if (info == NULL)
> + ODP_ABORT("name or mask is NULL\n");
> +
> + odp_spinlock_lock(&sched_grp_lock);
> +
> + sgi = (uint32_t)group;
> + if (bitset_is_set(sg_free, sgi)) {
> + ret = -1;
> + goto group_not_found;
> + }
> +
> + sg = sg_vec[sgi];
> +
> + ret = _schedule_group_thrmask(sg, &info->thrmask);
> +
> + info->name = sg->name;
> +
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> + return ret;
> +
> +group_not_found:
> + odp_spinlock_unlock(&sched_grp_lock);
> +
> +invalid_group:
> + return ret;
> +}
> +
> +static int schedule_init_global(void)
> +{
> + odp_thrmask_t mask;
> + odp_schedule_group_t tmp_all;
> + odp_schedule_group_t tmp_wrkr;
> + odp_schedule_group_t tmp_ctrl;
> + uint32_t bits;
> +
> + odp_spinlock_init(&sched_grp_lock);
> +
> + bits = MAX_SCHED_GROUP;
> + if (MAX_SCHED_GROUP == sizeof(sg_free) * CHAR_BIT)
> + sg_free = ~0ULL;
> + else
> + sg_free = (1ULL << bits) - 1;
> +
> + for (uint32_t i = 0; i < MAX_SCHED_GROUP; i++)
> + sg_vec[i] = NULL;
> + for (uint32_t i = 0; i < MAXTHREADS; i++) {
> + thread_state[i].sg_sem = 0;
> + for (uint32_t j = 0; j < ODP_SCHED_PRIO_NUM; j++)
> + thread_state[i].sg_wanted[j] = bitset_null();
> + }
> +
> + /* Create sched groups for default GROUP_ALL, GROUP_WORKER and
> + * GROUP_CONTROL groups.
> + */
> + odp_thrmask_zero(&mask);
> + tmp_all = odp_schedule_group_create("__group_all", &mask);
> + if (tmp_all != ODP_SCHED_GROUP_ALL) {
> + ODP_ERR("Could not create ODP_SCHED_GROUP_ALL()\n");
> + goto failed_create_group_all;
> + }
> +
> + tmp_wrkr = odp_schedule_group_create("__group_worker", &mask);
> + if (tmp_wrkr != ODP_SCHED_GROUP_WORKER) {
> + ODP_ERR("Could not create ODP_SCHED_GROUP_WORKER()\n");
> + goto failed_create_group_worker;
> + }
> +
> + tmp_ctrl = odp_schedule_group_create("__group_control", &mask);
> + if (tmp_ctrl != ODP_SCHED_GROUP_CONTROL) {
> + ODP_ERR("Could not create ODP_SCHED_GROUP_CONTROL()\n");
> + goto failed_create_group_control;
> + }
> +
> + return 0;
> +
> +failed_create_group_control:
> + if (tmp_ctrl != ODP_SCHED_GROUP_INVALID)
> + odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL);
> +
> +failed_create_group_worker:
> + if (tmp_wrkr != ODP_SCHED_GROUP_INVALID)
> + odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER);
> +
> +failed_create_group_all:
> + if (tmp_all != ODP_SCHED_GROUP_INVALID)
> + odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL);
> +
> + return -1;
> +}
> +
> +static int schedule_term_global(void)
> +{
> + /* Destroy sched groups for default GROUP_ALL, GROUP_WORKER and
> + * GROUP_CONTROL groups.
> + */
> + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL) != 0)
> + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_ALL\n");
> + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER) != 0)
> + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_WORKER\n");
> + if (odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL) != 0)
> + ODP_ERR("Failed to destroy ODP_SCHED_GROUP_CONTROL\n");
> +
> + return 0;
> +}
> +
> +static int schedule_init_local(void)
> +{
> + int thr_id;
> + odp_thread_type_t thr_type;
> + odp_thrmask_t mask;
> +
> + thr_id = odp_thread_id();
> + if (thread_state_init(thr_id))
> + goto failed_to_init_ts;
> +
> + /* Add this thread to default schedule groups */
> + thr_type = odp_thread_type();
> + odp_thrmask_zero(&mask);
> + odp_thrmask_set(&mask, thr_id);
> +
> + if (odp_schedule_group_join(ODP_SCHED_GROUP_ALL, &mask) != 0) {
> + ODP_ERR("Failed to join ODP_SCHED_GROUP_ALL\n");
> + goto failed_to_join_grp_all;
> + }
> + if (thr_type == ODP_THREAD_CONTROL) {
> + if (odp_schedule_group_join(ODP_SCHED_GROUP_CONTROL,
> + &mask) != 0) {
> + ODP_ERR("Failed to join ODP_SCHED_GROUP_CONTROL\n");
> + goto failed_to_join_grp_ctrl;
> + }
> + } else {
> + if (odp_schedule_group_join(ODP_SCHED_GROUP_WORKER,
> + &mask) != 0) {
> + ODP_ERR("Failed to join ODP_SCHED_GROUP_WORKER\n");
> + goto failed_to_join_grp_wrkr;
> + }
> + }
> +
> + return 0;
> +
> +failed_to_join_grp_wrkr:
> +
> +failed_to_join_grp_ctrl:
> + odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask);
> +
> +failed_to_join_grp_all:
> +failed_to_init_ts:
> +
> + return -1;
> +}
> +
> +static int schedule_term_local(void)
> +{
> + int thr_id;
> + odp_thread_type_t thr_type;
> + odp_thrmask_t mask;
> + int rc = 0;
> +
> + /* Remove this thread from default schedule groups */
> + thr_id = odp_thread_id();
> + thr_type = odp_thread_type();
> + odp_thrmask_zero(&mask);
> + odp_thrmask_set(&mask, thr_id);
> +
> + if (odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask) != 0)
> + ODP_ERR("Failed to leave ODP_SCHED_GROUP_ALL\n");
> + if (thr_type == ODP_THREAD_CONTROL) {
> + if (odp_schedule_group_leave(ODP_SCHED_GROUP_CONTROL,
> + &mask) != 0)
> + ODP_ERR("Failed to leave ODP_SCHED_GROUP_CONTROL\n");
> + } else {
> + if (odp_schedule_group_leave(ODP_SCHED_GROUP_WORKER,
> + &mask) != 0)
> + ODP_ERR("Failed to leave ODP_SCHED_GROUP_WORKER\n");
> + }
> +
> + update_sg_membership(sched_ts);
> +
> + /* Check if the thread is still part of any groups */
> + if (sched_ts->num_schedq != 0) {
> + ODP_ERR("Thread %d still part of scheduler group(s)\n",
> + sched_ts->tidx);
> + rc = -1;
> + }
> +
> + return rc;
> +}
> +
> +int queue_tm_reorder(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +{
> + (void)queue;
> + (void)buf_hdr;
> + return 0;
> +}
> +
> +static void pktio_start(int pktio_index, int num_in_queue, int
> in_queue_idx[])
> +{
> + int i;
> + uint32_t old, tag, j;
> +
> + for (i = 0; i < num_in_queue; i++) {
> + /* Try to reserve a slot */
> + if (__atomic_fetch_add(&pktin_num,
> + 1, __ATOMIC_RELAXED) >= PKTIN_MAX) {
> + __atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED);
> + ODP_ABORT("Too many pktio in queues for scheduler\n");
> + }
> + /* A slot has been reserved, now we need to find an empty one */
> + for (j = 0; ; j = (j + 1) % PKTIN_MAX) {
> + if (__atomic_load_n(&pktin_tags[j],
> + __ATOMIC_RELAXED) != TAG_EMPTY)
> + /* Slot used, continue with next */
> + continue;
> + /* Empty slot found */
> + old = TAG_EMPTY;
> + tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]);
> + if (__atomic_compare_exchange_n(&pktin_tags[j],
> + &old,
> + tag,
> + true,
> + __ATOMIC_RELEASE,
> + __ATOMIC_RELAXED)) {
> + /* Success grabbing slot,update high
> + * watermark
> + */
> + __atomic_fetch_max(&pktin_hi,
> + j + 1, __ATOMIC_RELAXED);
> + /* One more tag (queue) for this pktio
> + * instance
> + */
> + __atomic_fetch_add(&pktin_count[pktio_index],
> + 1, __ATOMIC_RELAXED);
> + /* Continue with next RX queue */
> + break;
> + }
> + /* Failed to grab slot */
> + }
> + }
> +}
> +
> +static int num_grps(void)
> +{
> + return MAX_SCHED_GROUP;
> +}
> +
> +/*
> + * Stubs for internal scheduler abstraction layer due to absence of NULL
> + * checking before calling the function pointer.
> + */
> +
> +static int thr_add(odp_schedule_group_t group, int thr)
> +{
> + /* This function is a schedule_init_local duplicate. */
> + (void)group;
> + (void)thr;
> + return 0;
> +}
> +
> +static int thr_rem(odp_schedule_group_t group, int thr)
> +{
> + /* This function is a schedule_term_local duplicate. */
> + (void)group;
> + (void)thr;
> + return 0;
> +}
> +
> +static int init_queue(uint32_t queue_index,
> + const odp_schedule_param_t *sched_param)
> +{
> + /* Not used in scalable scheduler. */
> + (void)queue_index;
> + (void)sched_param;
> + return 0;
> +}
> +
> +static void destroy_queue(uint32_t queue_index)
> +{
> + /* Not used in scalable scheduler. */
> + (void)queue_index;
> +}
> +
> +static int sched_queue(uint32_t queue_index)
> +{
> + /* Not used in scalable scheduler. */
> + (void)queue_index;
> + return 0;
> +}
> +
/*
 * Ordered-enqueue hook. Always returns 0 and leaves *ret and the buffers
 * untouched. Presumably 0 means "not handled here" so the caller performs a
 * normal enqueue; confirm against odp_schedule_if.h.
 */
static int ord_enq_multi(uint32_t queue_index, void *p_buf_hdr[],
			 int num, int *ret)
{
	(void)ret;
	(void)num;
	(void)p_buf_hdr;
	(void)queue_index;

	return 0;
}

/* Prefetch hint: ignored. */
static void schedule_prefetch(int num)
{
	(void)num;
}

/* Internal order lock hook: no-op. */
static void order_lock(void)
{
	/* Intentionally empty */
}

/* Internal order unlock hook: no-op. */
static void order_unlock(void)
{
	/* Intentionally empty */
}
> +
> +const schedule_fn_t schedule_scalable_fn = {
> + .pktio_start = pktio_start,
> + .thr_add = thr_add,
> + .thr_rem = thr_rem,
> + .num_grps = num_grps,
> + .init_queue = init_queue,
> + .destroy_queue = destroy_queue,
> + .sched_queue = sched_queue,
> + .ord_enq_multi = ord_enq_multi,
> + .init_global = schedule_init_global,
> + .term_global = schedule_term_global,
> + .init_local = schedule_init_local,
> + .term_local = schedule_term_local,
> + .order_lock = order_lock,
> + .order_unlock = order_unlock,
> +};
> +
> +const schedule_api_t schedule_scalable_api = {
> + .schedule_wait_time = schedule_wait_time,
> + .schedule = schedule,
> + .schedule_multi = schedule_multi,
> + .schedule_pause = schedule_pause,
> + .schedule_resume = schedule_resume,
> + .schedule_release_atomic = schedule_release_atomic,
> + .schedule_release_ordered = schedule_release_ordered,
> + .schedule_prefetch = schedule_prefetch,
> + .schedule_num_prio = schedule_num_prio,
> + .schedule_group_create = schedule_group_create,
> + .schedule_group_destroy = schedule_group_destroy,
> + .schedule_group_lookup = schedule_group_lookup,
> + .schedule_group_join = schedule_group_join,
> + .schedule_group_leave = schedule_group_leave,
> + .schedule_group_thrmask = schedule_group_thrmask,
> + .schedule_group_info = schedule_group_info,
> + .schedule_order_lock = schedule_order_lock,
> + .schedule_order_unlock = schedule_order_unlock,
> +};
> diff --git a/platform/linux-generic/odp_schedule_scalable_ordered.c
> b/platform/linux-generic/odp_schedule_scalable_ordered.c
> new file mode 100644
> index 00000000..ba2b6c3a
> --- /dev/null
> +++ b/platform/linux-generic/odp_schedule_scalable_ordered.c
> @@ -0,0 +1,285 @@
> +/* Copyright (c) 2017, ARM Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> +#include <odp/api/shared_memory.h>
> +#include <odp_queue_internal.h>
> +#include <odp_schedule_if.h>
> +#include <odp_schedule_ordered_internal.h>
> +#include <odp_llsc.h>
> +#include <odp_bitset.h>
> +
> +#include <string.h>
> +
> +extern __thread sched_scalable_thread_state_t *sched_ts;
> +
> +#define RWIN_NAME_SIZE 32
> +
> +reorder_window_t *rwin_alloc(int rwin_id, odp_shm_t *shm, unsigned
> lock_count)
> +{
> + char rwin_name[RWIN_NAME_SIZE];
> + reorder_window_t *rwin;
> + uint32_t i;
> +
> + strncpy(rwin_name, "rwin", RWIN_NAME_SIZE);
> + i = strlen(rwin_name);
> + snprintf(rwin_name + i,
> + (RWIN_NAME_SIZE - i), "%d", rwin_id);
> +
> + *shm = odp_shm_reserve(rwin_name,
> + sizeof(reorder_window_t),
> + ODP_CACHE_LINE_SIZE,
> + ODP_SHM_PROC);
> + if (ODP_SHM_INVALID == *shm)
> + goto shm_reserve_failed;
> +
> + rwin = (reorder_window_t *)odp_shm_addr(*shm);
> + if (rwin == NULL)
> + goto shm_addr_failed;
> +
> + rwin->hc.head = 0;
> + rwin->hc.chgi = 0;
> + rwin->winmask = RWIN_SIZE - 1;
> + rwin->tail = 0;
> + rwin->turn = 0;
> + rwin->lock_count = (uint16_t)lock_count;
> + memset(rwin->olock, 0, sizeof(rwin->olock));
> + for (i = 0; i < RWIN_SIZE; i++)
> + rwin->ring[i] = NULL;
> +
> + return rwin;
> +
> +shm_addr_failed:
> + odp_shm_free(*shm);
> +
> +shm_reserve_failed:
> + return NULL;
> +}
> +
> +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn)
> +{
> + uint32_t head;
> + uint32_t oldt;
> + uint32_t newt;
> + uint32_t winmask;
> +
> + /* Read head and tail separately */
> + oldt = rwin->tail;
> + winmask = rwin->winmask;
> + do {
> + /* Need __atomic_load to avoid compiler reordering */
> + head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED);
> + if (odp_unlikely(oldt - head >= winmask))
> + return false;
> +
> + newt = oldt + 1;
> + } while (!__atomic_compare_exchange(&rwin->tail,
> + &oldt,
> + &newt,
> + true,
> + __ATOMIC_RELAXED,
> + __ATOMIC_RELAXED));
> + *sn = oldt;
> +
> + return true;
> +}
> +
> +void rwin_insert(reorder_window_t *rwin,
> + reorder_context_t *rctx,
> + uint32_t sn,
> + void (*callback)(reorder_context_t *))
> +{
> + /* Initialise to silence scan-build */
> + hc_t old = {0, 0};
> + hc_t new;
> + uint32_t winmask;
> +
> + __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE);
> + winmask = rwin->winmask;
> + if (old.head != sn) {
> + /* We are out-of-order. Store context in reorder window,
> + * releasing its content.
> + */
> + ODP_ASSERT(rwin->ring[sn & winmask] == NULL);
> + atomic_store_release(&rwin->ring[sn & winmask],
> + rctx,
> + /*readonly=*/false);
> + rctx = NULL;
> + do {
> + hc_t new;
> +
> + new.head = old.head;
> + new.chgi = old.chgi + 1; /* Changed value */
> + /* Update head & chgi, fail if any has changed */
> + if (__atomic_compare_exchange(&rwin->hc,
> + /* Updated on fail */
> + &old,
> + &new,
> + true,
> + /* Rel our ring update */
> + __ATOMIC_RELEASE,
> + __ATOMIC_ACQUIRE))
> + /* CAS succeeded => head same (we are not
> + * in-order), chgi updated.
> + */
> + return;
> + /* CAS failed => head and/or chgi changed.
> + * We might not be out-of-order anymore.
> + */
> + } while (old.head != sn);
> + }
> +
> + /* old.head == sn => we are now in-order! */
> + ODP_ASSERT(old.head == sn);
> + /* We are in-order so our responsibility to retire contexts */
> + new.head = old.head;
> + new.chgi = old.chgi + 1;
> +
> + /* Retire our in-order context (if we still have it) */
> + if (rctx != NULL) {
> + callback(rctx);
> + new.head++;
> + }
> +
> + /* Retire in-order contexts in the ring
> + * The first context might actually be ours (if we were originally
> + * out-of-order)
> + */
> + do {
> + for (;;) {
> + rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
> + __ATOMIC_ACQUIRE);
> + if (rctx == NULL)
> + break;
> + /* We are the only thread that are in-order
> + * (until head updated) so don't have to use
> + * atomic load-and-clear (exchange)
> + */
> + rwin->ring[new.head & winmask] = NULL;
> + callback(rctx);
> + new.head++;
> + }
> + /* Update head&chgi, fail if chgi has changed (head cannot change) */
> + } while (!__atomic_compare_exchange(&rwin->hc,
> + &old, /* Updated on failure */
> + &new,
> + false, /* weak */
> + __ATOMIC_RELEASE, /* Release our ring updates */
> + __ATOMIC_ACQUIRE));
> +}
> +
> +void rctx_init(reorder_context_t *rctx, uint16_t idx,
> + reorder_window_t *rwin, uint32_t sn)
> +{
> + /* rctx->rvec_free and rctx->idx already initialised in
> + * thread_state_init function.
> + */
> + ODP_ASSERT(rctx->idx == idx);
> + rctx->rwin = rwin;
> + rctx->sn = sn;
> + rctx->olock_flags = 0;
> + /* First => no next reorder context */
> + rctx->next_idx = idx;
> + /* Where to store next event */
> + rctx->cur_idx = idx;
> + rctx->numevts = 0;
> +}
> +
> +inline void rctx_free(const reorder_context_t *rctx)
> +{
> + const reorder_context_t *const base = &rctx[-(int)rctx->idx];
> + const uint32_t first = rctx->idx;
> + uint32_t next_idx;
> +
> + next_idx = rctx->next_idx;
> +
> + ODP_ASSERT(rctx->rwin != NULL);
> + /* Set free bit */
> + if (rctx->rvec_free == &sched_ts->rvec_free)
> + /* Since it is our own reorder context, we can instead
> + * perform a non-atomic and relaxed update on our private
> + * rvec_free.
> + */
> + sched_ts->priv_rvec_free =
> + bitset_set(sched_ts->priv_rvec_free, rctx->idx);
> + else
> + atom_bitset_set(rctx->rvec_free, rctx->idx, __ATOMIC_RELEASE);
> +
> + /* Can't dereference rctx after the corresponding free bit is set */
> + while (next_idx != first) {
> + rctx = &base[next_idx];
> + next_idx = rctx->next_idx;
> + /* Set free bit */
> + if (rctx->rvec_free == &sched_ts->rvec_free)
> + sched_ts->priv_rvec_free =
> + bitset_set(sched_ts->priv_rvec_free, rctx->idx);
> + else
> + atom_bitset_set(rctx->rvec_free, rctx->idx,
> + __ATOMIC_RELEASE);
> + }
> +}
> +
> +inline void olock_unlock(const reorder_context_t *rctx, reorder_window_t
> *rwin,
> + uint32_t lock_index)
> +{
> + if ((rctx->olock_flags & (1U << lock_index)) == 0) {
> + /* Use relaxed ordering, we are not releasing any updates */
> + rwin->olock[lock_index] = rctx->sn + 1;
> + }
> +}
> +
> +void olock_release(const reorder_context_t *rctx)
> +{
> + reorder_window_t *rwin;
> +
> + rwin = rctx->rwin;
> +
> + switch (rwin->lock_count) {
> + case 2:
> + olock_unlock(rctx, rwin, 1);
> + case 1:
> + olock_unlock(rctx, rwin, 0);
> + }
> + ODP_STATIC_ASSERT(NUM_OLOCKS == 2, "Number of ordered locks != 2");
> +}
> +
> +void rctx_retire(reorder_context_t *first)
> +{
> + reorder_context_t *rctx;
> + queue_entry_t *q;
> + uint32_t i;
> + uint32_t j;
> + uint32_t num;
> + int rc;
> +
> + rctx = first;
> + do {
> + /* Process all events in this reorder context */
> + for (i = 0; i < rctx->numevts;) {
> + q = rctx->destq[i];
> + /* Find index of next different destq */
> + j = i + 1;
> + while (j < rctx->numevts && rctx->destq[j] == q)
> + j++;
> + num = j - i;
> + rc = q->s.enqueue_multi(q, &rctx->events[i], num);
> + if (odp_unlikely(rc != (int)num))
> + ODP_ERR("Failed to enqueue deferred events\n");
> + i += num;
> + }
> + /* Update rctx pointer to point to 'next_idx' element */
> + rctx += (int)rctx->next_idx - (int)rctx->idx;
> + } while (rctx != first);
> + olock_release(first);
> + rctx_free(first);
> +}
> +
> +void rctx_release(reorder_context_t *rctx)
> +{
> + /* Insert reorder context into reorder window, potentially calling the
> + * rctx_retire function for all pending reorder_contexts.
> + */
> + rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire);
> +}
> diff --git a/platform/linux-generic/odp_traffic_mngr.c
> b/platform/linux-generic/odp_traffic_mngr.c
> index 4e9358b9..3244dfe3 100644
> --- a/platform/linux-generic/odp_traffic_mngr.c
> +++ b/platform/linux-generic/odp_traffic_mngr.c
> @@ -3918,9 +3918,10 @@ odp_tm_queue_t odp_tm_queue_create(odp_tm_t odp_tm,
> tm_queue_obj->pkt = ODP_PACKET_INVALID;
> odp_ticketlock_init(&tm_wred_node->tm_wred_node_lock);
>
> - tm_queue_obj->tm_qentry.s.type = QUEUE_TYPE_TM;
> - tm_queue_obj->tm_qentry.s.enqueue = queue_tm_reenq;
> - tm_queue_obj->tm_qentry.s.enqueue_multi = queue_tm_reenq_multi;
> + queue_set_type(&tm_queue_obj->tm_qentry, QUEUE_TYPE_TM);
> + queue_set_enq_func(&tm_queue_obj->tm_qentry, queue_tm_reenq);
> + queue_set_enq_multi_func(&tm_queue_obj->tm_qentry,
> + queue_tm_reenq_multi);
>
> tm_system->queue_num_tbl[tm_queue_obj->queue_num - 1] = tm_queue_obj;
> odp_ticketlock_lock(&tm_system->tm_system_lock);
> diff --git a/platform/linux-generic/pktio/loop.c
> b/platform/linux-generic/pktio/loop.c
> index 49d8a211..1f2f16c1 100644
> --- a/platform/linux-generic/pktio/loop.c
> +++ b/platform/linux-generic/pktio/loop.c
> @@ -80,11 +80,13 @@ static int loopback_recv(pktio_entry_t *pktio_entry, int
> index ODP_UNUSED,
>
> for (i = 0; i < nbr; i++) {
> uint32_t pkt_len;
> -
> +#ifdef ODP_SCHEDULE_SCALABLE
> + pkt = _odp_packet_from_buffer((odp_buffer_t)(hdr_tbl[i]));
> +#else
> pkt = _odp_packet_from_buffer(odp_hdr_to_buf(hdr_tbl[i]));
> +#endif
> pkt_len = odp_packet_len(pkt);
>
> -
> if (pktio_cls_enabled(pktio_entry)) {
> odp_packet_t new_pkt;
> odp_pool_t new_pool;
> @@ -162,7 +164,11 @@ static int loopback_send(pktio_entry_t *pktio_entry, int
> index ODP_UNUSED,
> len = QUEUE_MULTI_MAX;
>
> for (i = 0; i < len; ++i) {
> +#ifdef ODP_SCHEDULE_SCALABLE
> + hdr_tbl[i] = _odp_packet_to_buf_hdr_ptr(pkt_tbl[i]);
> +#else
> hdr_tbl[i] = buf_hdl_to_hdr(_odp_packet_to_buffer(pkt_tbl[i]));
> +#endif
> bytes += odp_packet_len(pkt_tbl[i]);
> }
>
> diff --git a/test/common_plat/performance/odp_sched_latency.c
> b/test/common_plat/performance/odp_sched_latency.c
> index 2b28cd7b..4a933f5b 100644
> --- a/test/common_plat/performance/odp_sched_latency.c
> +++ b/test/common_plat/performance/odp_sched_latency.c
> @@ -28,7 +28,13 @@
> #define MAX_WORKERS 64 /**< Maximum number of worker threads */
> #define MAX_QUEUES 4096 /**< Maximum number of queues */
> #define EVENT_POOL_SIZE (1024 * 1024) /**< Event pool size */
> +
> +#ifdef ODP_SCHEDULE_SP
> #define TEST_ROUNDS (4 * 1024 * 1024) /**< Test rounds for each
> thread */
> +#else
> +#define TEST_ROUNDS (32 * 1024 * 1024) /**< Test rounds for each
> thread */
> +#endif
> +
> #define MAIN_THREAD 1 /**< Thread ID performing maintenance tasks */
>
> /* Default values for command line arguments */
> @@ -104,6 +110,9 @@ typedef union {
> typedef struct {
> core_stat_t core_stat[MAX_WORKERS]; /**< Core specific stats */
> odp_barrier_t barrier; /**< Barrier for thread synchronization */
> +#ifdef ODP_SCHEDULE_SCALABLE
> + odp_schedule_group_t schedule_group;
> +#endif
> odp_pool_t pool; /**< Pool for allocating test events */
> test_args_t args; /**< Parsed command line arguments */
> odp_queue_t queue[NUM_PRIOS][MAX_QUEUES]; /**< Scheduled queues */
> @@ -119,7 +128,11 @@ static void clear_sched_queues(void)
> odp_event_t ev;
>
> while (1) {
> - ev = odp_schedule(NULL, ODP_SCHED_NO_WAIT);
> + /* Need a non-zero timeout to ensure we can observe
> + * any non-empty queue made eligible for scheduling
> + * by some other thread.
> + */
> + ev = odp_schedule(NULL, 1000);
>
> if (ev == ODP_EVENT_INVALID)
> break;
> @@ -428,6 +441,20 @@ static int run_thread(void *arg ODP_UNUSED)
> return -1;
> }
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> + int err;
> + odp_thrmask_t thrmask;
> +
> + odp_thrmask_zero(&thrmask);
> + odp_thrmask_set(&thrmask, thr);
> +
> + err = odp_schedule_group_join(globals->schedule_group, &thrmask);
> + if (err != 0) {
> + LOG_ERR("odp_schedule_group_join failed\n");
> + return -1;
> + }
> +#endif
> +
> if (thr == MAIN_THREAD) {
> args = &globals->args;
>
> @@ -452,6 +479,13 @@ static int run_thread(void *arg ODP_UNUSED)
> if (test_schedule(thr, globals))
> return -1;
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> + err = odp_schedule_group_leave(globals->schedule_group, &thrmask);
> + if (err != 0) {
> + LOG_ERR("odp_schedule_group_leave failed\n");
> + return -1;
> + }
> +#endif
> return 0;
> }
>
> @@ -692,6 +726,31 @@ int main(int argc, char *argv[])
> }
> globals->pool = pool;
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> + /*
> + * Create scheduler group
> + */
> + odp_thrmask_t expected_thrmask;
> + int cpu;
> + int thr;
> +
> + odp_thrmask_zero(&expected_thrmask);
> + /* This is a odp_thrmask_from_cpumask() */
> + cpu = odp_cpumask_first(&cpumask);
> + thr = 1;
> + while (0 <= cpu) {
> + odp_thrmask_set(&expected_thrmask, thr++);
> + cpu = odp_cpumask_next(&cpumask, cpu);
> + }
> +
> + globals->schedule_group =
> + odp_schedule_group_create("sg0", &expected_thrmask);
> + if (globals->schedule_group == ODP_SCHED_GROUP_INVALID) {
> + LOG_ERR("odp_schedule_group_create failed\n");
> + return -1;
> + }
> +#endif
> +
> /*
> * Create queues for schedule test
> */
> @@ -713,7 +772,11 @@ int main(int argc, char *argv[])
> param.type = ODP_QUEUE_TYPE_SCHED;
> param.sched.prio = prio;
> param.sched.sync = args.sync_type;
> +#ifdef ODP_SCHEDULE_SCALABLE
> + param.sched.group = globals->schedule_group;
> +#else
> param.sched.group = ODP_SCHED_GROUP_ALL;
> +#endif
>
> for (j = 0; j < args.prio[i].queues; j++) {
> name[9] = '0' + j / 10;
> @@ -758,6 +821,9 @@ int main(int argc, char *argv[])
> }
> }
>
> +#ifdef ODP_SCHEDULE_SCALABLE
> + ret += odp_schedule_group_destroy(globals->schedule_group);
> +#endif
> ret += odp_shm_free(shm);
> ret += odp_pool_destroy(pool);
> ret += odp_term_local();
>