On Mon, May 8, 2017 at 4:55 AM Ola Liljedahl <[email protected]> wrote:

>
>
> On 08/05/2017, 13:20, "Bill Fischofer" <[email protected]> wrote:
>
> >On Sun, May 7, 2017 at 9:56 PM, Honnappa Nagarahalli
> ><[email protected]> wrote:
> >> On 5 May 2017 at 21:46, Bill Fischofer <[email protected]>
> >>wrote:
> >>> v5 still fails to compile on a 32-bit system using clang:
> >>>
> >>>   CC       odp_schedule_scalable.lo
> >>> odp_schedule_scalable.c:1631:13: error: implicit conversion from
> >>>       'unsigned long long' to 'sched_group_mask_t' (aka 'unsigned
> >>>int') changes
> >>>       value from 18446744073709551615 to 4294967295
> >>>       [-Werror,-Wconstant-conversion]
> >>>                 sg_free = ~0ULL;
> >>>                         ~ ^~~~~
> >>> 1 error generated.
> >>> Makefile:1019: recipe for target 'odp_schedule_scalable.lo' failed
> >>>
> >>
> >> Trying to understand what all needs to be done:
> >>
> >> Compilation:
> >> x86
> >> 1) gcc - 32b and 64b
> >> 2) clang - 32b and 64b
> >>
> >> ARM
> >> 1) gcc - 32b and 64b
> >> 2) clang - 32b and 64b
> >>
> >> Running tests:
> >> x86
> >> 1) gcc - 64b
> >>
> >> ARM
> >> 1) gcc - 64b
> >>
> >> Is the understanding correct?
> >
> >ODP is expected to compile and all validations tests pass on both
> >32-bit and 64-bit systems, and when compiled with either gcc or clang
> >on supported platforms. The APIs are compiler / architecture agnostic.
> Yes that's the goal. But we can't test that exhaustively (even trying to
> enumerate "32-bit and 64-bit systems" is in practice impossible) so we
> need to find a small set of representative targets and configurations to
> test for. Success on that selected subset should then predict success also
> on non-tested platforms. I think it would be useful if all contributors
> used the same subset of build and test targets, this should minimise
> surprises.
>

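X86 and ARM are the main architectures of interest. The issue here is
pretty straightforward. Clang requires strict observance of casting
requirements: with -Werror it rejects the implicit narrowing of ~0ULL to a
32-bit sched_group_mask_t, so the constant has to match the destination
type or be cast explicitly.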
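
For illustration, a minimal sketch of one way to avoid the warning, under
stated assumptions: uint32_t below is only a stand-in for what
sched_group_mask_t resolves to on the failing 32-bit build, and
sg_mask_all(), sg_mask_bits() and sg_mask_selfcheck() are hypothetical
helper names that are not part of the patch. The idea is to build the
all-ones value in the destination type rather than narrowing a 64-bit
constant.

```c
#include <assert.h>
#include <limits.h>
#include <stdint.h>

/* Assumption: stand-in for the 32-bit build, where the compiler reports
 * sched_group_mask_t as 'unsigned int'. */
typedef uint32_t sched_group_mask_t;

/* All-ones mask built in the destination type. No wider constant is
 * narrowed, so -Wconstant-conversion has nothing to flag. */
static inline sched_group_mask_t sg_mask_all(void)
{
	return ~(sched_group_mask_t)0;
}

/* Number of scheduling groups the mask type can represent. */
static inline unsigned sg_mask_bits(void)
{
	return (unsigned)(sizeof(sched_group_mask_t) * CHAR_BIT);
}

/* Hypothetical sanity check: every bit of the mask is set. */
static inline void sg_mask_selfcheck(void)
{
	assert(sg_mask_all() == UINT32_MAX);
}
```

With something along these lines, the failing assignment could read
sg_free = sg_mask_all(); and would stay correct if the mask type is later
widened to 64 bits.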


> >
> >>
> >>> On Thu, May 4, 2017 at 11:34 PM, Brian Brooks <[email protected]>
> >>>wrote:
> >>>> Signed-off-by: Brian Brooks <[email protected]>
> >>>> Signed-off-by: Kevin Wang <[email protected]>
> >>>> Signed-off-by: Honnappa Nagarahalli <[email protected]>
> >>>> Signed-off-by: Ola Liljedahl <[email protected]>
> >>>> ---
> >>>>  platform/linux-generic/Makefile.am                 |   15 +-
> >>>>  .../include/odp/api/plat/schedule_types.h          |    4 +-
> >>>>  .../linux-generic/include/odp_config_internal.h    |   17 +-
> >>>>  .../linux-generic/include/odp_queue_internal.h     |   70 +-
> >>>>  platform/linux-generic/include/odp_schedule_if.h   |   23 +-
> >>>>  .../linux-generic/include/odp_schedule_scalable.h  |  137 ++
> >>>>  .../include/odp_schedule_scalable_config.h         |   42 +
> >>>>  .../include/odp_schedule_scalable_ordered.h        |  131 ++
> >>>>  platform/linux-generic/m4/odp_schedule.m4          |   55 +-
> >>>>  platform/linux-generic/odp_queue_scalable.c        |  922 ++++++++++
> >>>>  platform/linux-generic/odp_schedule_if.c           |   36 +-
> >>>>  platform/linux-generic/odp_schedule_scalable.c     | 1943
> >>>>++++++++++++++++++++
> >>>>  .../linux-generic/odp_schedule_scalable_ordered.c  |  280 +++
> >>>>  13 files changed, 3628 insertions(+), 47 deletions(-)
> >>>>  create mode 100644
> >>>>platform/linux-generic/include/odp_schedule_scalable.h
> >>>>  create mode 100644
> >>>>platform/linux-generic/include/odp_schedule_scalable_config.h
> >>>>  create mode 100644
> >>>>platform/linux-generic/include/odp_schedule_scalable_ordered.h
> >>>>  create mode 100644 platform/linux-generic/odp_queue_scalable.c
> >>>>  create mode 100644 platform/linux-generic/odp_schedule_scalable.c
> >>>>  create mode 100644
> >>>>platform/linux-generic/odp_schedule_scalable_ordered.c
> >>>>
> >>>> diff --git a/platform/linux-generic/Makefile.am
> >>>>b/platform/linux-generic/Makefile.am
> >>>> index 7ad63d59..5290993a 100644
> >>>> --- a/platform/linux-generic/Makefile.am
> >>>> +++ b/platform/linux-generic/Makefile.am
> >>>> @@ -173,6 +173,9 @@ noinst_HEADERS = \
> >>>>                   ${srcdir}/include/odp_queue_internal.h \
> >>>>                   ${srcdir}/include/odp_ring_internal.h \
> >>>>                   ${srcdir}/include/odp_schedule_if.h \
> >>>> +                 ${srcdir}/include/odp_schedule_scalable.h \
> >>>> +                 ${srcdir}/include/odp_schedule_scalable_config.h \
> >>>> +                 ${srcdir}/include/odp_schedule_scalable_ordered.h \
> >>>>                   ${srcdir}/include/odp_sorted_list_internal.h \
> >>>>                   ${srcdir}/include/odp_shm_internal.h \
> >>>>                   ${srcdir}/include/odp_time_internal.h \
> >>>> @@ -227,13 +230,9 @@ __LIB__libodp_linux_la_SOURCES = \
> >>>>                           pktio/ring.c \
> >>>>                            odp_pkt_queue.c \
> >>>>                            odp_pool.c \
> >>>> -                          odp_queue.c \
> >>>>                            odp_rwlock.c \
> >>>>                            odp_rwlock_recursive.c \
> >>>> -                          odp_schedule.c \
> >>>>                            odp_schedule_if.c \
> >>>> -                          odp_schedule_sp.c \
> >>>> -                          odp_schedule_iquery.c \
> >>>>                            odp_shared_memory.c \
> >>>>                            odp_sorted_list.c \
> >>>>                            odp_spinlock.c \
> >>>> @@ -262,6 +261,14 @@ if ARCH_IS_X86
> >>>>  __LIB__libodp_linux_la_SOURCES += arch/@ARCH_DIR@/cpu_flags.c
> >>>>  endif
> >>>>
> >>>> +if ODP_SCHEDULE_SCALABLE
> >>>> +__LIB__libodp_linux_la_SOURCES += odp_queue_scalable.c
> >>>>odp_schedule_scalable.c \
> >>>> +                                 odp_schedule_scalable_ordered.c
> >>>> +else
> >>>> +__LIB__libodp_linux_la_SOURCES += odp_queue.c odp_schedule.c
> >>>>odp_schedule_sp.c \
> >>>> +                                 odp_schedule_iquery.c
> >>>> +endif
> >>>> +
> >>>>  if HAVE_PCAP
> >>>>  __LIB__libodp_linux_la_SOURCES += pktio/pcap.c
> >>>>  endif
> >>>> diff --git
> >>>>a/platform/linux-generic/include/odp/api/plat/schedule_types.h
> >>>>b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> >>>> index 535fd6d0..4e75f9ee 100644
> >>>> --- a/platform/linux-generic/include/odp/api/plat/schedule_types.h
> >>>> +++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h
> >>>> @@ -18,6 +18,8 @@
> >>>>  extern "C" {
> >>>>  #endif
> >>>>
> >>>> +#include <odp/api/std_types.h>
> >>>> +
> >>>>  /** @addtogroup odp_scheduler
> >>>>   *  @{
> >>>>   */
> >>>> @@ -44,7 +46,7 @@ typedef int odp_schedule_sync_t;
> >>>>  typedef int odp_schedule_group_t;
> >>>>
> >>>>  /* These must be kept in sync with thread_globals_t in odp_thread.c
> >>>>*/
> >>>> -#define ODP_SCHED_GROUP_INVALID -1
> >>>> +#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1)
> >>>>  #define ODP_SCHED_GROUP_ALL     0
> >>>>  #define ODP_SCHED_GROUP_WORKER  1
> >>>>  #define ODP_SCHED_GROUP_CONTROL 2
> >>>> diff --git a/platform/linux-generic/include/odp_config_internal.h
> >>>>b/platform/linux-generic/include/odp_config_internal.h
> >>>> index dadd59e7..6cc844f3 100644
> >>>> --- a/platform/linux-generic/include/odp_config_internal.h
> >>>> +++ b/platform/linux-generic/include/odp_config_internal.h
> >>>> @@ -7,9 +7,7 @@
> >>>>  #ifndef ODP_CONFIG_INTERNAL_H_
> >>>>  #define ODP_CONFIG_INTERNAL_H_
> >>>>
> >>>> -#ifdef __cplusplus
> >>>> -extern "C" {
> >>>> -#endif
> >>>> +#include <odp_schedule_scalable_config.h>
> >>>>
> >>>>  /*
> >>>>   * Maximum number of pools
> >>>> @@ -22,6 +20,13 @@ extern "C" {
> >>>>  #define ODP_CONFIG_QUEUES 1024
> >>>>
> >>>>  /*
> >>>> + * Maximum queue depth. Maximum number of elements that can be
> >>>>stored in a
> >>>> + * queue. This value is used only when the size is not explicitly
> >>>>provided
> >>>> + * during queue creation.
> >>>> + */
> >>>> +#define CONFIG_QUEUE_SIZE 4096
> >>>> +
> >>>> +/*
> >>>>   * Maximum number of ordered locks per queue
> >>>>   */
> >>>>  #define CONFIG_QUEUE_MAX_ORD_LOCKS 4
> >>>> @@ -120,7 +125,7 @@ extern "C" {
> >>>>   *
> >>>>   * This the the number of separate SHM areas that can be reserved
> >>>>concurrently
> >>>>  */
> >>>> -#define ODPDRV_CONFIG_SHM_BLOCKS 48
> >>>> +#define ODPDRV_CONFIG_SHM_BLOCKS ODP_CONFIG_SHM_BLOCKS
> >>>>
> >>>>  /* Maximum event burst size
> >>>>   *
> >>>> @@ -139,8 +144,4 @@ extern "C" {
> >>>>   */
> >>>>  #define CONFIGcplusplus
> >>>> -}
> >>>> -#endif
> >>>> -
> >>>>  #endif
> >>>> diff --git a/platform/linux-generic/include/odp_queue_internal.h
> >>>>b/platform/linux-generic/include/odp_queue_internal.h
> >>>> index 977546eb..0c95d249 100644
> >>>> --- a/platform/linux-generic/include/odp_queue_internal.h
> >>>> +++ b/platform/linux-generic/include/odp_queue_internal.h
> >>>> @@ -19,16 +19,24 @@ extern "C" {
> >>>>  #endif
> >>>>
> >>>>  #include <odp/api/queue.h>
> >>>> -#include <odp_forward_typedefs_internal.h>
> >>>> -#include <odp_schedule_if.h>
> >>>> -#include <odp_buffer_internal.h>
> >>>> -#include <odp_align_internal.h>
> >>>> +#include <odp/api/std_types.h>
> >>>> +#include <odp/api/buffer.h>
> >>>>  #include <odp/api/packet_io.h>
> >>>>  #include <odp/api/align.h>
> >>>>  #include <odp/api/hints.h>
> >>>>  #include <odp/api/ticketlock.h>
> >>>> +
> >>>>  #include <odp_config_internal.h>
> >>>>
> >>>> +#include <odp_align_internal.h>
> >>>> +#include <odp_buffer_internal.h>
> >>>> +#include <odp_forward_typedefs_internal.h>
> >>>> +#include <odp_schedule_if.h>
> >>>> +#ifdef ODP_SCHEDULE_SCALABLE
> >>>> +#include <odp_schedule_scalable.h>
> >>>> +#include <odp_schedule_scalable_ordered.h>
> >>>> +#endif
> >>>> +
> >>>>  #define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
> >>>>
> >>>>  #define QUEUE_STATUS_FREE         0
> >>>> @@ -37,9 +45,6 @@ extern "C" {
> >>>>  #define QUEUE_STATUS_NOTSCHED     3
> >>>>  #define QUEUE_STATUS_SCHED        4
> >>>>
> >>>> -#define BUFFER_HDR_INVALID NULL
> >>>> -
> >>>> -/* forward declaration */
> >>>>  union queue_entry_u;
> >>>>
> >>>>  typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
> >>>> @@ -50,6 +55,8 @@ typedef int (*enq_multi_func_t)(union
> >>>>queue_entry_u *,
> >>>>  typedef        int (*deq_multi_func_t)(union queue_entry_u *,
> >>>>                                 odp_buffer_hdr_t **, int);
> >>>>
> >>>> +#ifdef ODP_SCHEDULE_SCALABLE
> >>>> +#define BUFFER_HDR_INVALID ((odp_buffer_hdr_t *)(void *)ODP_EVENT_INVALID)
> >>>> +
> >>>> +struct queue_entry_s {
> >>>> +       sched_elem_t     sched_elem;
> >>>> +
> >>>> +       odp_ticketlock_t lock ODP_ALIGNED_CACHE;
> >>>> +       int              status;
> >>>> +
> >>>> +       enq_func_t       enqueue ODP_ALIGNED_CACHE;
> >>>> +       deq_func_t       dequeue;
> >>>> +       enq_multi_func_t enqueue_multi;
> >>>> +       deq_multi_func_t dequeue_multi;
> >>>> +
> >>>> +       uint32_t           index;
> >>>> +       odp_queue_t        handle;
> >>>> +       odp_queue_type_t   type;
> >>>> +       odp_queue_param_t  param;
> >>>> +       odp_pktin_queue_t  pktin;
> >>>> +       odp_pktout_queue_t pktout;
> >>>> +       char               name[ODP_QUEUE_NAME_LEN];
> >>>> +};
> >>>> +
> >>>> +int _odp_queue_deq(sched_elem_t *q, odp_event_t *evp, int num);
> >>>> +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num);
> >>>> +
> >>>> +/* Round up memory size to next cache line size to
> >>>> + * align all memory addresses on cache line boundary.
> >>>> + */
> >>>> +static inline void *shm_pool_alloc_align(_odp_ishm_pool_t *pool,
> >>>>uint32_t size)
> >>>> +{
> >>>> +       void *addr;
> >>>> +
> >>>> +       addr = _odp_ishm_pool_alloc(pool, ROUNDUP_CACHE_LINE(size));
> >>>> +       ODP_ASSERT(((uintptr_t)addr & (ODP_CACHE_LINE_SIZE - 1)) ==
> >>>>0);
> >>>> +
> >>>> +       return addr;
> >>>> +}
> >>>> +
> >>>> +#else
> >>>> +#define BUFFER_HDR_INVALID NULL
> >>>> +
> >>>>  struct queue_entry_s {
> >>>>         odp_ticketlock_t  lock ODP_ALIGNED_CACHE;
> >>>>
> >>>> @@ -78,6 +125,8 @@ struct queue_entry_s {
> >>>>         char              name[ODP_QUEUE_NAME_LEN];
> >>>>  };
> >>>>
> >>>> +#endif
> >>>> +
> >>>>  union queue_entry_u {
> >>>>         struct queue_entry_s s;
> >>>>         uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))];
> >>>> @@ -172,6 +221,13 @@ static inline void queue_set_type(queue_entry_t
> >>>>*queue, odp_queue_type_t type)
> >>>>         queue->s.type = type;
> >>>>  }
> >>>>
> >>>> +#ifdef ODP_SCHEDULE_SCALABLE
> >>>> +static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue)
> >>>> +{
> >>>> +      return queue->s.sched_elem.rwin;
> >>>> +}
> >>>> +#endif
> >>>> +
> >>>>  #ifdef __cplusplus
> >>>>  }
> >>>>  #endif
> >>>> diff --git a/platform/linux-generic/include/odp_schedule_if.h
> >>>>b/platform/linux-generic/include/odp_schedule_if.h
> >>>> index 530d157f..5f070ec4 100644
> >>>> --- a/platform/linux-generic/include/odp_schedule_if.h
> >>>> +++ b/platform/linux-generic/include/odp_schedule_if.h
> >>>> @@ -4,6 +4,12 @@
> >>>>   * SPDX-License-Identifier:     BSD-3-Clause
> >>>>   */
> >>>>
> >>>> +/* Copyright (c) 2017, ARM Limited
> >>>> + * All rights reserved.
> >>>> + *
> >>>> + * SPDX-License-Identifier:     BSD-3-Clause
> >>>> + */
> >>>> +
> >>>>  #ifndef ODP_SCHEDULE_IF_H_
> >>>>  #define ODP_SCHEDULE_IF_H_
> >>>>
> >>>> @@ -12,17 +18,20 @@ extern "C" {
> >>>>  #endif
> >>>>
> >>>>  #include <odp/api/queue.h>
> >>>> -#include <odp_queue_internal.h>
> >>>>  #include <odp/api/schedule.h>
> >>>>
> >>>> +#include <odp_forward_typedefs_internal.h>
> >>>> +
> >>>> +/* Number of ordered locks per queue */
> >>>> +#define SCHEDULE_ORDERED_LOCKS_PER_QUEUE 2
> >>>> +
> >>>>  typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int
> >>>>num_in_queue,
> >>>>                                           int in_queue_idx[]);
> >>>>  typedef int (*schedule_thr_add_fn_t)(odp_schedule_group_t group, int
> >>>>thr);
> >>>>  typedef int (*schedule_thr_rem_fn_t)(odp_schedule_group_t group, int
> >>>>thr);
> >>>>  typedef int (*schedule_num_grps_fn_t)(void);
> >>>> -typedef int (*schedule_init_queue_fn_t)(uint32_t queue_index,
> >>>> -                                       const odp_schedule_param_t
> >>>>*sched_param
> >>>> -                                      );
> >>>> +typedef int (*schedule_init_queue_fn_t)(
> >>>> +       uint32_t queue_index, const odp_schedule_param_t
> >>>>*sched_param);
> >>>>  typedef void (*schedule_destroy_queue_fn_t)(uint32_t queue_index);
> >>>>  typedef int (*schedule_sched_queue_fn_t)(uint32_t queue_index);
> >>>>  typedef int (*schedule_unsched_queue_fn_t)(uint32_t queue_index);
> >>>> @@ -64,6 +73,11 @@ extern const schedule_fn_t *sched_fn;
> >>>>  int sched_cb_pktin_poll(int pktio_index, int num_queue, int index[]);
> >>>>  void sched_cb_pktio_stop_finalize(int pktio_index);
> >>>>  int sched_cb_num_pktio(void);
> >>>> +#ifndef ODP_SCHEDULE_SCALABLE
> >>>> +/*
> >>>> + * These functions are either dead code or only used by default
> >>>>scheduler.
> >>>> + * Remove them and/or move them to default scheduler specific file.
> >>>> + */
> >>>>  int sched_cb_num_queues(void);
> >>>>  int sched_cb_queue_prio(uint32_t queue_index);
> >>>>  int sched_cb_queue_grp(uint32_t queue_index);
> >>>> @@ -73,6 +87,7 @@ odp_queue_t sched_cb_queue_handle(uint32_t
> >>>>queue_index);
> >>>>  void sched_cb_queue_destroy_finalize(uint32_t queue_index);
> >>>>  int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[],
> >>>>int num);
> >>>>  int sched_cb_queue_empty(uint32_t queue_index);
> >>>> +#endif
> >>>>
> >>>>  /* API functions */
> >>>>  typedef struct {
> >>>> diff --git a/platform/linux-generic/include/odp_schedule_scalable.h
> >>>>b/platform/linux-generic/include/odp_schedule_scalable.h
> >>>> new file mode 100644
> >>>> index 00000000..4afb0878
> >>>> --- /dev/null
> >>>> +++ b/platform/linux-generic/include/odp_schedule_scalable.h
> >>>> @@ -0,0 +1,137 @@
> >>>> +/* Copyright (c) 2017, ARM Limited
> >>>> + * All rights reserved.
> >>>> + *
> >>>> + * SPDX-License-Identifier:     BSD-3-Clause
> >>>> + */
> >>>> +
> >>>> +#ifndef ODP_SCHEDULE_SCALABLE_H
> >>>> +#define ODP_SCHEDULE_SCALABLE_H
> >>>> +
> >>>> +#include <odp/api/align.h>
> >>>> +#include <odp/api/schedule.h>
> >>>> +#include <odp/api/ticketlock.h>
> >>>> +
> >>>> +#include <odp_schedule_scalable_config.h>
> >>>> +#include <odp_schedule_scalable_ordered.h>
> >>>> +#include <odp_llqueue.h>
> >>>> +
> >>>> +/*
> >>>> + * ODP_SCHED_PRIO_HIGHEST/NORMAL/LOWEST/DEFAULT are compile time
> >>>> + * constants, but not ODP_SCHED_PRIO_NUM. The current API for this
> >>>> + * is odp_schedule_num_prio(). The other schedulers also define
> >>>> + * this internally as NUM_PRIO.
> >>>> + */
> >>>> +#define ODP_SCHED_PRIO_NUM  8
> >>>> +
> >>>> +typedef struct {
> >>>> +       union {
> >>>> +               struct {
> >>>> +                       struct llqueue llq;
> >>>> +                       uint32_t prio;
> >>>> +               };
> >>>> +               char line[ODP_CACHE_LINE_SIZE];
> >>>> +       };
> >>>> +} sched_queue_t ODP_ALIGNED_CACHE;
> >>>> +
> >>>> +#define TICKET_INVALIDct {
> >>>> +       int32_t numevts;
> >>>> +       uint8_t cur_ticket;
> >>>> +       uint8_t nxt_ticket;
> >>>> +} qschedstate_t ODP_ALIGNED(sizeof(uint64_t));
> >>>> +
> >>>> +typedef uint32_t ringidx_t;
> >>>> +
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +#define SPLIT_PC ODP_ALIGNED_CACHE
> >>>> +#else
> >>>> +#define SPLIT_PC
> >>>> +#endif
> >>>> +
> >>>> +#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1)
> >>>> +
> >>>> +typedef struct {
> >>>> +       struct llnode node;  /* must be first */
> >>>> +       sched_queue_t *schedq;
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +       odp_ticketlock_t qschlock;
> >>>> +#endif
> >>>> +       qschedstate_t qschst;
> >>>> +       uint16_t pop_deficit;
> >>>> +       uint16_t qschst_type;
> >>>> +       ringidx_t prod_read SPLIT_PC;
> >>>> +       ringidx_t prod_write;
> >>>> +       ringidx_t prod_mask;
> >>>> +       odp_buffer_hdr_t **prod_ring;
> >>>> +       ringidx_t cons_write SPLIT_PC;
> >>>> +       ringidx_t cons_read;
> >>>> +       reorder_window_t *rwin;
> >>>> +       void *user_ctx;
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +       odp_buffer_hdr_t **cons_ring;
> >>>> +       ringidx_t cons_mask;
> >>>> +       uint16_t cons_type;
> >>>> +#else
> >>>> +#define cons_mask prod_mask
> >>>> +#define cons_ring prod_ring
> >>>> +#define cons_type qschst_type
> >>>> +#endif
> >>>> +} sched_elem_t ODP_ALIGNED_CACHE;
> >>>> +
> >>>> +/* Number of scheduling groups */
> >>>> +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT)
> >>>> +
> >>>> +typedef bitset_t sched_group_mask_t;
> >>>> +
> >>>> +typedef struct {
> >>>> +       /* Threads currently associated with the sched group */
> >>>> +       bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE;
> >>>> +       bitset_t thr_wanted;
> >>>> +       /* Used to spread queues over schedq's */
> >>>> +       uint32_t xcount[ODP_SCHED_PRIO_NUM];
> >>>> +       /* Number of schedq's per prio */
> >>>> +       uint32_t xfactor;
> >>>> +       char name[ODP_SCHED_GROUP_NAME_LEN];
> >>>> +       /* ODP_SCHED_PRIO_NUM * xfactor. Must be last. */
> >>>> +       sched_queue_t schedq[1] ODP_ALIGNED_CACHE;
> >>>> +} sched_group_t;
> >>>> +
> >>>> +/* Number of reorder contexts per thread */
> >>>> +#define TS_RVEC_SIZE 16
> >>>> +
> >>>> +typedef struct {
> >>>> +       /* Atomic queue currently being processed or NULL */
> >>>> +       sched_elem_t *atomq;
> >>>> +       /* Current reorder context or NULL */
> >>>> +       reorder_context_t *rctx;
> >>>> +       uint8_t pause;
> >>>> +       uint8_t out_of_order;
> >>>> +       uint8_t tidx;
> >>>> +       uint8_t pad;
> >>>> +       uint32_t dequeued; /* Number of events dequeued from atomic
> >>>>queue */
> >>>> +       uint16_t pktin_next; /* Next pktin tag to poll */
> >>>> +       uint16_t pktin_poll_cnts;
> >>>> +       uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID
> >>>>*/
> >>>> +       uint16_t num_schedq;
> >>>> +       uint16_t sg_sem; /* Set when sg_wanted is modified by other
> >>>>thread */
> >>>> +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM)
> >>>> +       sched_queue_t *schedq_list[SCHEDQ_PER_THREAD];
> >>>> +       /* Current sched_group membership */
> >>>> +       sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM];
> >>>> +       /* Future sched_group membership. */
> >>>> +       sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM];
> >>>> +       bitset_t priv_rvec_free;
> >>>> +       /* Bitset of free entries in rvec[] */
> >>>> +       bitset_t rvec_free ODP_ALIGNED_CACHE;
> >>>> +       /* Reordering contexts to allocate from */
> >>>> +       reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE;
> >>>> +} sched_scalable_thread_state_t ODP_ALIGNED_CACHE;
> >>>> +
> >>>> +void sched_update_enq(sched_elem_t *q, uint32_t actual);
> >>>> +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual);
> >>>> +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp,
> >>>>uint32_t prio);
> >>>> +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio);
> >>>> +
> >>>> +#endif  /* ODP_SCHEDULE_SCALABLE_H */
> >>>> diff --git
> >>>>a/platform/linux-generic/include/odp_schedule_scalable_config.h
> >>>>b/platform/linux-generic/include/odp_schedule_scalable_config.h
> >>>> new file mode 100644
> >>>> --- /dev/null
> >>>> +++ b/platform/linux-generic/include/odp_schedule_scalable_config.h
> >>>> @@ -0,0 +1,42 @@
> >>>> +/* Copyright (c) 2017, ARM Limited
> >>>> + * All rights reserved.
> >>>> + *
> >>>> + * SPDX-License-Identifier:     BSD-3-Clause
> >>>> + */
> >>>> +
> >>>> +#ifndef ODP_SCHEDULE_SCALABLE_CONFIG_H_
> >>>> +#define ODP_SCHEDULE_SCALABLE_CONFIG_H_
> >>>> +
> >>>> +/*
> >>>> + * Default weight (in events) for WRR in scalable scheduler
> >>>> + *
> >>>> + * This controls the per-queue weight for WRR between queues of the
> >>>>same
> >>>> + * priority in the scalable scheduler
> >>>> + * A higher value improves throughput while a lower value increases
> >>>>fairness
> >>>> + * and thus likely decreases latency
> >>>> + *
> >>>> + * If WRR is undesired, set the value to ~0 which will use the
> >>>>largest possible
> >>>> + * weight
> >>>> + *
> >>>> + * Note: an API for specifying this on a per-queue basis would be
> >>>>useful but is
> >>>> + * not yet available
> >>>> + */
> >>>> +#define CONFIG_WRR_WEIGHT 64
> >>>> +
> >>>> +/*
> >>>> + * Split queue producer/consumer metadata into separate cache lines.
> >>>> + * This is beneficial on e.g. Cortex-A57 but not so much on A53.
> >>>> + */
> >>>> +#define CONFIG_SPLIT_PRODCONS
> >>>> +
> >>>> +/*
> >>>> + * Use locks to protect queue (ring buffer) and scheduler state
> >>>>updates
> >>>> + * On x86, this decreases overhead noticeably.
> >>>> + */
> >>>> +#ifndef __ARM_ARCH
> >>>> +#define CONFIG_QSCHST_LOCK
> >>>> +/* Keep all ring buffer/qschst data together when using locks */
> >>>> +#undef CONFIG_SPLIT_PRODCONS
> >>>> +#endif
> >>>> +
> >>>> +#endif  /* ODP_SCHEDULE_SCALABLE_CONFIG_H_ */
> >>>> diff --git
> >>>>a/platform/linux-generic/include/odp_schedule_scalable_ordered.h
> >>>>b/platform/linux-generic/include/odp_schedule_scalable_ordered.h
> >>>> new file mode 100644
> >>>> index 00000000..968c2490
> >>>> --- /dev/null
> >>>> +++ b/platform/linux-generic/include/odp_schedule_scalable_ordered.h
> >>>> @@ -0,0 +1,131 @@
> >>>> +/* Copyright (c) 2017, ARM Limited
> >>>> + * All rights reserved.
> >>>> + *
> >>>> + * SPDX-License-Identifier:     BSD-3-Clause
> >>>> + */
> >>>> +
> >>>> +#ifndef ODP_SCHEDULE_SCALABLE_ORDERED_H
> >>>> +#define ODP_SCHEDULE_SCALABLE_ORDERED_H
> >>>> +
> >>>> +#include <odp/api/shared_memory.h>
> >>>> +
> >>>> +#include <odp_internal.h>
> >>>> +#include <odp_align_internal.h>
> >>>> +#include <odp_bitset.h>
> >>>> +#include <_ishmpool_internal.h>
> >>>> +
> >>>> +/* High level functioning of reordering
> >>>> + * Datastructures -
> >>>> + * Reorder Window - Every ordered queue is associated with a reorder
> >>>>window.
> >>>> + *                  Reorder window stores reorder contexts from
> >>>>threads that
> >>>> + *                  have completed processing out-of-order.
> >>>> + * Reorder Context - Reorder context consists of events that a thread
> >>>> + *                   wants to enqueue while processing a batch of
> >>>>events
> >>>> + *                   from an ordered queue.
> >>>> + *
> >>>> + * Algorithm -
> >>>> + * 1) Thread identifies the ordered queue.
> >>>> + * 2) It 'reserves a slot in the reorder window and dequeues the
> >>>> + *    events' atomically. Atomicity is achieved by using a
> >>>>ticket-lock
> >>>> + *    like design where the reorder window slot is the ticket.
> >>>> + * 3a) Upon order-release/next schedule call, the thread
> >>>> + *     checks if its slot (ticket) equals the head of the reorder
> >>>>window.
> >>>> + *     If yes, enqueues the events to the destination queue till
> >>>> + *         i) the reorder window is empty or
> >>>> + *         ii) there is a gap in the reorder window
> >>>> + *     If no, the reorder context is stored in the reorder window at
> >>>> + *     the reserved slot.
> >>>> + * 3b) Upon the first enqueue, the thread checks if its slot
> >>>>(ticket)
> >>>> + *     equals the head of the reorder window.
> >>>> + *     If yes, enqueues the events immediately to the destination
> >>>>queue
> >>>> + *     If no, these (and subsequent) events are stored in the
> >>>>reorder context
> >>>> + *     (in the application given order)
> >>>> + */
> >>>> +
> >>>> +/* Head and change indicator variables are used to synchronise
> >>>>between
> >>>> + * concurrent insert operations in the reorder window. A thread
> >>>>performing
> >>>> + * an in-order insertion must be notified about the newly inserted
> >>>> + * reorder contexts so that it doesn't halt the retire process too
> >>>>early.
> >>>> + * A thread performing an out-of-order insertion must correspondingly
> >>>> + * notify the thread doing in-order insertion of the new waiting
> >>>>reorder
> >>>> + * context, which may need to be handled by that thread.
> >>>> + *
> >>>> + * Also, an out-of-order insertion may become an in-order insertion
> >>>>if the
> >>>> + * thread doing an in-order insertion completes before this thread
> >>>>completes.
> >>>> + * We need a point of synchronisation where this knowledge and
> >>>>potential state
> >>>> + * change can be transferred between threads.
> >>>> + */
> >>>> +typedef struct hc {
> >>>> +       /* First missing context */
> >>>> +       uint32_t head;
> >>>> +       /* Change indicator */
> >>>> +       uint32_t chgi;
> >>>> +} hc_t ODP_ALIGNED(sizeof(uint64_t));
> >>>> +
> >>>> +/* Number of reorder contexts in the reorder window.
> >>>> + * Should be at least one per CPU.
> >>>> + */
> >>>> +#define RWIN_SIZE 32
> >>>> +ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a
> >>>>power of 2");
> >>>> +
> >>>> +#define NUM_OLOCKS 2
> >>>> +
> >>>> +typedef struct reorder_context reorder_context_t;
> >>>> +
> >>>> +typedef struct reorder_window {
> >>>> +       /* head and change indicator */
> >>>> +       hc_t hc;
> >>>> +       uint32>>>> +       uint32_t turn;
> >>>> +       uint16_t lock_count;
> >>>> +       /* Reorder contexts in this window */
> >>>> +       reorder_context_t *ring[RWIN_SIZE];
> >>>> +} reorder_window_t;
> >>>> +
> >>>> +/* Number of events that can be stored in a reorder context.
> >>>> + * This size is chosen so that there is no space left unused at the
> >>>>end
> >>>> + * of the last cache line (for 64b architectures and 64b handles).
> >>>> + */
> >>>> +#define RC_EVT_SIZE 18
> >>>> +
> >>>> +struct reorder_context {
> >>>> +       /* Reorder window to which this context belongs */
> >>>> +       reorder_window_t *rwin;
> >>>> +       /* Pointer to TS->rvec_free */
> >>>> +       bitset_t *rvec_free;
> >>>> +       /* Our slot number in the reorder window */
> >>>> +       uint32_t sn;
> >>>> +       uint8_t olock_flags;
> >>>> +       /* Our index in thread_state rvec array */
> >>>> +       uint8_t idx;
> >>>> +       /* Used to link reorder contexts together */
> >>>> +       uint8_t next_idx;
> >>>> +       /* Current reorder context to save events in */
> >>>> +       uint8_t cur_idx;
> >>>> +       /* Number of events stored in this reorder context */
> >>>> +       uint8_t numevts;
> >>>> +       /* Events stored in this context */
> >>>> +       odp_buffer_hdr_t *events[RC_EVT_SIZE];
> >>>> +       queue_entry_t *destq[RC_EVT_SIZE];
> >>>> +} ODP_ALIGNED_CACHE;
> >>>> +
> >>>> +reorder_window_t *rwin_alloc(int rwin_id, _odp_ishm_pool_t *pool,
> >>>> +                            unsigned lock_count);
> >>>> +int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin);
> >>>> +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn);
> >>>> +void rwin_insert(reorder_window_t *rwin,
> >>>> +                reorder_context_t *rctx,
> >>>> +                uint32_t sn,
> >>>> +                void (*callback)(reorder_context_t *));
> >>>> +void rctx_init(reorder_context_t *rctx, uint16_t idx,
> >>>> +              reorder_window_t *rwin, uint32_t sn);
> >>>> +void rctx_free(const reorder_context_t *rctx);
> >>>> +void olock_unlock(const reorder_context_t *rctx, reorder_window_t
> >>>>*rwin,
> >>>> +                 uint32_t lock_index);
> >>>> +void olock_release(const reorder_context_t *rctx);
> >>>> +void rctx_retire(reorder_context_t *first);
> >>>> +void rctx_release(reorder_context_t *rctx);
> >>>> +
> >>>> +#endif  /* ODP_SCHEDULE_SCALABLE_ORDERED_H */
> >>>> diff --git a/platform/linux-generic/m4/odp_schedule.m4
> >>>>b/platform/linux-generic/m4/odp_schedule.m4
> >>>> index 91c19f21..d862b8b2 100644
> >>>> --- a/platform/linux-generic/m4/odp_schedule.m4
> >>>> +++ b/platform/linux-generic/m4/odp_schedule.m4
> >>>> @@ -1,13 +1,44 @@
> >>>> -AC_ARG_ENABLE([schedule-sp],
> >>>> -    [  --enable-schedule-sp    enable strict priority scheduler],
> >>>> -    [if test x$enableval = xyes; then
> >>>> -       schedule_sp_enabled=yes
> >>>> -       ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> >>>> -    fi])
> >>>> +# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and
> >>>>adds
> >>>> +# -DODP_SCHEDULE_SP to CFLAGS.
> >>>> +AC_ARG_ENABLE(
> >>>> +    [schedule_sp],
> >>>> +    [AC_HELP_STRING([--enable-schedule-sp],
> >>>> +                    [enable strict priority scheduler])],
> >>>> +    [if test "x$enableval" = xyes; then
> >>>> +         schedule_sp=true
> >>>> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP"
> >>>> +     else
> >>>> +         schedule_sp=false
> >>>> +     fi],
> >>>> +    [schedule_sp=false])
> >>>> +AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue])
> >>>>
> >>>> -AC_ARG_ENABLE([schedule-iquery],
> >>>> -    [  --enable-schedule-iquery    enable interest query (sparse
> >>>>bitmap) scheduler],
> >>>> -    [if test x$enableval = xyes; then
> >>>> -       schedule_iquery_enabled=yes
> >>>> -       ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> >>>> -    fi])
> >>>> +# Checks for --enable-schedule-iquery and defines
> >>>>ODP_SCHEDULE_IQUERY and adds
> >>>> +# -DODP_SCHEDULE_IQUERY to CFLAGS.
> >>>> +AC_ARG_ENABLE(
> >>>> +    [schedule_iquery],
> >>>> +    [AC_HELP_STRING([--enable-schedule-iquery],
> >>>> +                    [enable interests query (sparse bitmap)
> >>>>scheduler])],
> >>>> +    [if test "x$enableval" = xyes; then
> >>>> +         schedule_iquery=true
> >>>> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY"
> >>>> +     else
> >>>> +         schedule_iquery=false
> >>>> +     fi],
> >>>> +    [schedule_iquery=false])
> >>>> +AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery =
> >>>>xtrue])
> >>>> +
> >>>> +# Checks for --enable-schedule-scalable and defines
> >>>>ODP_SCHEDULE_SCALABLE and
> >>>> +# adds -DODP_SCHEDULE_SCALABLE to CFLAGS.
> >>>> +AC_ARG_ENABLE(
> >>>> +    [schedule_scalable],
> >>>> +    [AC_HLP_STRING([--enable-schedule-scalable],
> >>>> +                    [enable scalable scheduler])],
> >>>> +    [if test "x$enableval" = xyes; then
> >>>> +         schedule_scalable=true
> >>>> +         ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE"
> >>>> +     else
> >>>> +         schedule_scalable=false
> >>>> +     fi],
> >>>> +    [schedule_scalable=false])
> >>>> +AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable =
> >>>>xtrue])
> >>>> diff --git a/platform/linux-generic/odp_queue_scalable.c
> >>>>b/platform/linux-generic/odp_queue_scalable.c
> >>>> new file mode 100644
> >>>> index 00000000..240180eb
> >>>> --- /dev/null
> >>>> +++ b/platform/linux-generic/odp_queue_scalable.c
> >>>> @@ -0,0 +1,922 @@
> >>>> +/* Copyright (c) 2017, ARM Limited
> >>>> + * All rights reserved.
> >>>> + *
> >>>> + * SPDX-License-Identifier:     BSD-3-Clause
> >>>> + */
> >>>> +#include <odp/api/hints.h>
> >>>> +#include <odp/api/plat/ticketlock_inlines.h>
> >>>> +#include <odp/api/queue.h>
> >>>> +#include <odp/api/schedule.h>
> >>>> +#include <odp/api/shared_memory.h>
> >>>> +#include <odp/api/sync.h>
> >>>> +#include <odp/api/traffic_mngr.h>
> >>>> +
> >>>> +#include <odp_internal.h>
> >>>> +#include <odp_config_internal.h>
> >>>> +#include <odp_debug_internal.h>
> >>>> +
> >>>> +#include <odp_buffer_inlines.h>
> >>>> +#include <odp_packet_io_internal.h>
> >>>> +#include <odp_packet_io_queue.h>
> >>>> +#include <odp_pool_internal.h>
> >>>> +#include <odp_queue_internal.h>
> >>>> +#include <odp_schedule_if.h>
> >>>> +#include <_ishm_internal.h>
> >>>> +#include <_ishmpool_internal.h>
> >>>> +
> >>>> +#include <string.h>
> >>>> +#include <inttypes.h>
> >>>> +
> >>>> +#define NUM_INTERNAL_QUEUES 64
> >>>> +
> >>>> +#define MIN(a, b) \
> >>>> +       ({ \
> >>>> +               __typeof__(a) tmp_a = (a); \
> >>>> +               __typeof__(b) tmp_b = (b); \
> >>>> +               tmp_a < tmp_b ? tmp_a : tmp_b; \
> >>>> +       })
> >>>> +
> >>>> +#define LOCK(a)      _odp_ticketlock_lock(a)
> >>>> +#define UNLOCK(a)    _odp_ticketlock_unlock(a)
> >>>> +#define LOCK_INIT(a) odp_ticketlock_init(a)
> >>>> +
> >>>> +extern __thread sched_scalable_thread_state_t *sched_ts;
> >>>> +
> >>>> +typedef struct queue_table_t {
> >>>> +       queue_entry_t  queue[ODP_CONFIG_QUEUES];
> >>>> +} queue_table_t;
> >>>> +
> >>>> +static queue_table_t *queue_tbl;
> >>>> +_odp_ishm_pool_t *queue_shm_pool;
> >>>> +
> >>>> +static inline odp_queue_t queue_from_id(uint32_t queue_id)
> >>>> +{
> >>>> +       return _odp_cast_scalar(odp_queue_t, queue_id + 1);
> >>>> +}
> >>>> +
> >>>> +queue_entry_t *get_qentry(uint32_t queue_id)
> >>>> +{
> >>>> +       return &queue_tbl->queue[queue_id];
> >>>> +}
> >>>> +
> >>>> +static int _odp_queue_disable_enq(sched_elem_t *q)
> >>>> +{
> >>>> +       ringidx_t old_read, old_write, new_write;
> >>>> +       uint32_t size;
> >>>> +
> >>>> +       old_write = q->prod_write;
> >>>> +       size = q->prod_mask + 1;
> >>>> +       do {
> >>>> +               /* Need __atomic_load to avoid compiler reordering */
> >>>> +               old_read = __atomic_load_n(&q->prod_read,
> >>>>__ATOMIC_ACQUIRE);
> >>>> +               if (old_write != old_read) {
> >>>> +                       /* Queue is not empty, cannot claim all
> >>>>elements
> >>>> +                       * Cannot disable enqueue.
> >>>> +                       */
> >>>> +                       return -1;
> >>>> +               }
> >>>> +               /* Claim all elements in ring */
> >>>> +               new_write = old_write + size;
> >>>> +       } while (!__atomic_compare_exchange_n(&q->prod_write,
> >>>> +                               &old_write, /* Updated on failure */
> >>>> +                               new_write,
> >>>> +                               true,
> >>>> +                               __ATOMIC_RELAXED,
> >>>> +                               __ATOMIC_RELAXED));
> >>>> +       /* All remaining elements claimed, no one else can enqueue */
> >>>> +       return 0;
> >>>> +}
> >>>> +
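
To make sure I am reading _odp_queue_disable_enq() correctly: it blocks
further enqueues by claiming every slot of an empty ring with a single
compare-and-swap on the producer write index. Here is a minimal standalone
sketch of that idea. The struct and function names are hypothetical
stand-ins for the patch's sched_elem_t fields, and the GCC/clang __atomic
builtins are assumed, as in the patch itself.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the producer side of sched_elem_t. */
struct ring_sketch {
	uint32_t prod_read;	/* advanced by consumers */
	uint32_t prod_write;	/* advanced by producers */
	uint32_t prod_mask;	/* ring size - 1, size is a power of two */
};

/* Returns 0 if enqueue is now disabled, -1 if the ring was not empty. */
static int ring_disable_enq(struct ring_sketch *q)
{
	uint32_t old_read, old_write, new_write;
	uint32_t size = q->prod_mask + 1;

	old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
	do {
		old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
		if (old_write != old_read)
			return -1; /* events still in the ring */
		/* Claim all slots so producers see a full ring */
		new_write = old_write + size;
	} while (!__atomic_compare_exchange_n(&q->prod_write,
					      &old_write, /* updated on failure */
					      new_write,
					      true, /* weak CAS, hence the loop */
					      __ATOMIC_RELAXED,
					      __ATOMIC_RELAXED));
	return 0;
}
```

After a successful call the write index is one full ring size ahead of the
read index, so a producer computing its free space gets zero and returns
without enqueuing.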
> >>>> +static int queue_init(int queue_idx, queue_entry_t *queue, const
> >>>>char *name,
> >>>> +                     const odp_queue_param_t *param)
> >>>> +{
> >>>> +       ringidx_t ring_idx;
> >>>> +       sched_elem_t *sched_elem;
> >>>> +       uint32_t ring_size;
> >>>> +       odp_buffer_hdr_t **ring;
> >>>> +       uint32_t size;
> >>>> +
> >>>> +       sched_elem = &queue->s.sched_elem;
> >>>> +       ring_size = param->size > 0 ?
> >>>> +               ROUNDUP_POWER2_U32(param->size) : CONFIG_QUEUE_SIZE;
> >>>> +       strncpy(queue->s.name, name ? name : "", ODP_QUEUE_NAME_LEN -
> >>>>1);
> >>>> +       queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0;
> >>>> +       memcpy(&queue->s.param, param, sizeof(odp_queue_param_t));
> >>>> +
> >>>> +       size = ring_size * sizeof(odp_buffer_hdr_t *);
> >>>> +       ring = (odp_buffer_hdr_t
> >>>>**)shm_pool_alloc_align(queue_shm_pool, size);
> >>>> +       if (NULL == ring)
> >>>> +               return -1;
> >>>> +
> >>>> +       for (ring_idx = 0; ring_idx < ring_size; ring_idx++)
> >>>> +               ring[ring_idx] = NULL;
> >>>> +
> >>>> +       queue->s.type = queue->s.param.type;
> >>>> +       queue->s.enqueue = queue_enq;
> >>>> +       queue->s.dequeue = queue_deq;
> >>>> +       queue->s.enqueue_multi = queue_enq_multi;
> >>>> +       queue->s.dequeue_multi = queue_deq_multi;
> >>>> +       queue->s.pktin = PKTIN_INVALID;
> >>>> +
> >>>> +       sched_elem->node.next = NULL;
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +       LOCK_INIT(&sched_elem->qschlock);
> >>>> +#endif
> >>>> +       sched_elem->qschst.numevts = 0;
> >>>> +       sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT;
> >>>> +       sched_elem->qschst.cur_ticket = 0;
> >>>> +       sched_elem->qschst.nxt_ticket = 0;
> >>>> +       sched_elem->pop_deficit = 0;
> >>>> +       if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> >>>> +               sched_elem->qschst_type = queue->s.param.sched.sync;
> >>>> +       else
> >>>> +               sched_elem->qschst_type = ODP_NO_SCHED_QUEUE;
> >>>> +       /* 2nd cache line - enqueue */
> >>>> +       sched_elem->prod_read = 0;
> >>>> +       sched_elem->prod_write = 0;
> >>>> +       sched_elem->prod_ring = ring;
> >>>> +       sched_elem->prod_mask = ring_size - 1;
> >>>> +       /* 3rd cache line - dequeue */
> >>>> +       sched_elem->cons_read = 0;
> >>>> +       sched_elem->cons_write = 0;
> >>>> +       sched_elem->rwin = NULL;
> >>>> +       sched_elem->schedq = NULL;
> >>>> +       sched_elem->user_ctx = queue->s.param.context;
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +       sched_elem->cons_ring = ring;
> >>>> +       sched_elem->cons_mask = ring_size - 1;
> >>>> +       sched_elem->cons_type = sched_elem->qschst_type;
> >>>> +#endif
> >>>> +
> >>>> +       /* Queue initialized successfully, add it to the sched group
> >>>>*/
> >>>> +       if (queue->s.type == ODP_QUEUE_TYPE_SCHED) {
> >>>> +               if (queue->s.param.sched.sync ==
> >>>>ODP_SCHED_SYNC_ORDERED) {
> >>>> +                       sched_elem->rwin =
> >>>> +                               rwin_alloc(queue_idx, queue_shm_pool,
> >>>> +
> >>>>queue->s.param.sched.lock_count);
> >>>> +                       if (sched_elem->rwin == NULL) {
> >>>> +                               ODP_ERR("Reorder window not
> >>>>created\n");
> >>>> +                               goto rwin_create_failed;
> >>>> +                       }
> >>>> +               }
> >>>> +               sched_elem->schedq =
> >>>> +                       schedq_from_sched_group(param->sched.group,
> >>>> +                                               param->sched.prio);
> >>>> +       }
> >>>> +
> >>>> +       return 0;
> >>>> +
> >>>> +rwin_create_failed:
> >>>> +       _odp_ishm_pool_free(queue_shm_pool, ring);
> >>>> +
> >>>> +       return -1;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_init_global(void)
> >>>> +{
> >>>> +       uint32_t i;
> >>>> +       uint64_t pool_size;
> >>>> +       uint64_t min_alloc;
> >>>> +       uint64_t max_alloc;
> >>>> +
> >>>> +       ODP_DBG("Queue init ... ");
> >>>> +
> >>>> +       /* Attach to the pool if it exists */
> >>>> +       queue_shm_pool = _odp_ishm_pool_lookup("queue_shm_pool");
> >>>> +       if (queue_shm_pool == NULL) {
> >>>> +               /* Create shared memory pool to allocate shared
> >>>>memory for the
> >>>> +                * queues. Use the default queue size.
> >>>> +                */
> >>>> +               /* Add size of the array holding the queues */
> >>>> +               pool_size = sizeof(queue_table_t);
> >>>> +               /* Add storage required for queues */
> >>>> +               pool_size += (CONFIG_QUEUE_SIZE *
> >>>>sizeof(odp_buffer_hdr_t *)) *
> >>>> +                            ODP_CONFIG_QUEUES;
> >>>> +               /* Add the reorder window size */
> >>>> +               pool_size += sizeof(reorder_window_t) *
> >>>>ODP_CONFIG_QUEUES;
> >>>> +               /* Choose min_alloc and max_alloc such that buddy
> >>>>allocator is
> >>>> +                * selected.
> >>>> +                */
> >>>> +               min_alloc = 0;
> >>>> +               max_alloc = CONFIG_QUEUE_SIZE *
> >>>>sizeof(odp_buffer_hdr_t *);
> >>>> +               queue_shm_pool =
> >>>>_odp_ishm_pool_create("queue_shm_pool",
> >>>> +                                                      pool_size,
> >>>> +                                                      min_alloc,
> >>>>max_alloc,
> >>>> +
> >>>>_ODP_ISHM_SINGLE_VA);
> >>>> +               if (queue_shm_pool == NULL) {
> >>>> +                       ODP_ERR("Failed to allocate shared memory
> >>>>pool for"
> >>>> +                               " queues\n");
> >>>> +                       goto queue_shm_pool_create_failed;
> >>>> +               }
> >>>> +       }
> >>>> +
> >>>> +       queue_tbl = (queue_table_t *)
> >>>> +                   shm_pool_alloc_align(queue_shm_pool,
> >>>> +                                        sizeof(queue_table_t));
> >>>> +       if (queue_tbl == NULL) {
> >>>> +               ODP_ERR("Failed to reserve shared memory for queue
> >>>>table\n");
> >>>> +               goto queue_tbl_ishm_alloc_failed;
> >>>> +       }
> >>>> +
> >>>> +       memset(queue_tbl, 0, sizeof(queue_table_t));
> >>>> +
> >>>> +       for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> >>>> +               /* init locks */
> >>>> +               queue_entry_t *queue;
> >>>> +
> >>>> +               queue = get_qentry(i);
> >>>> +               LOCK_INIT(&queue->s.lock);
> >>>> +               queue->s.index  = i;
> >>>> +               queue->s.handle = queue_from_id(i);
> >>>> +       }
> >>>> +
> >>>> +       ODP_DBG("done\n");
> >>>> +       ODP_DBG("Queue init global\n");
> >>>> +       ODP_DBG("  struct queue_entry_s size %zu\n",
> >>>> +               sizeof(struct queue_entry_s));
> >>>> +       ODP_DBG("  queue_entry_t size        %zu\n",
> >>>> +               sizeof(queue_entry_t));
> >>>> +       ODP_DBG("\n");
> >>>> +
> >>>> +       return 0;
> >>>> +
> >>>> +queue_shm_pool_create_failed:
> >>>> +
> >>>> +queue_tbl_ishm_alloc_failed:
> >>>> +       _odp_ishm_pool_destroy(queue_shm_pool);
> >>>> +
> >>>> +       return -1;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_term_global(void)
> >>>> +{
> >>>> +       int ret = 0;
> >>>> +       int rc = 0;
> >>>> +       queue_entry_t *queue;
> >>>> +       int i;
> >>>> +
> >>>> +       for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> >>>> +               queue = &queue_tbl->queue[i];
> >>>> +               if (__atomic_load_n(&queue->s.status,
> >>>> +                                   __ATOMIC_RELAXED) !=
> >>>>QUEUE_STATUS_FREE) {
> >>>> +                       ODP_ERR("Not destroyed queue: %s\n",
> >>>>queue->s.name);
> >>>> +                       rc = -1;
> >>>> +               }
> >>>> +       }
> >>>> +
> >>>> +       _odp_ishm_pool_free(queue_shm_pool, queue_tbl);
> >>>> +
> >>>> +       ret = _odp_ishm_pool_destroy(queue_shm_pool);
> >>>> +       if (ret < 0) {
> >>>> +               ODP_ERR("Failed to destroy shared memory pool for
> >>>>queues\n");
> >>>> +               rc = -1;
> >>>> +       }
> >>>> +
> >>>> +       return rc;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_capability(odp_queue_capability_t *capa)
> >>>> +{
> >>>> +       memset(capa, 0, sizeof(odp_queue_capability_t));
> >>>> +
> >>>> +       /* Reserve some queues for internal use */
> >>>> +       capa->max_queues        = ODP_CONFIG_QUEUES -
> >>>>NUM_INTERNAL_QUEUES;
> >>>> +       capa->max_ordered_locks = SCHEDULE_ORDERED_LOCKS_PER_QUEUE;
> >>>> +       capa->max_sched_groups  = sched_fn->num_grps();
> >>>> +       capa->sched_prios       = odp_schedule_num_prio();
> >>>> +       capa->plain.max_num     = ODP_CONFIG_QUEUES -
> >>>>NUM_INTERNAL_QUEUES;
> >>>> +       capa->plain.max_size    = 0;
> >>>> +       capa->sched.max_num     = ODP_CONFIG_QUEUES -
> >>>>NUM_INTERNAL_QUEUES;
> >>>> +       capa->sched.max_size    = 0;
> >>>> +
> >>>> +       return 0;
> >>>> +}
> >>>> +
> >>>> +odp_queue_type_t odp_queue_type(odp_queue_t handle)
> >>>> +{
> >>>> +       return queue_to_qentry(handle)->s.type;
> >>>> +}
> >>>> +
> >>>> +odp_schedule_sync_t odp_queue_sched_type(odp_queue_t handle)
> >>>> +{
> >>>> +       return queue_to_qentry(handle)->s.param.sched.sync;
> >>>> +}
> >>>> +
> >>>> +odp_schedule_prio_t odp_queue_sched_prio(odp_queue_t handle)
> >>>> +{
> >>>> +       return queue_to_qentry(handle)->s.param.sched.prio;
> >>>> +}
> >>>> +
> >>>> +odp_schedule_group_t odp_queue_sched_group(odp_queue_t handle)
> >>>> +{
> >>>> +       return queue_to_qentry(handle)->s.param.sched.group;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_lock_count(odp_queue_t handle)
> >>>> +{
> >>>> +       queue_entry_t *queue = queue_to_qentry(handle);
> >>>> +
> >>>> +       return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ?
> >>>> +               (int)queue->s.param.sched.lock_count : -1;
> >>>> +}
> >>>> +
> >>>> +odp_queue_t odp_queue_create(const char *name, const
> >>>>odp_queue_param_t *param)
> >>>> +{
> >>>> +       int queue_idx;
> >>>> +       odp_queue_t handle = ODP_QUEUE_INVALID;
> >>>> +       queue_entry_t *queue;
> >>>> +       odp_queue_param_t default_param;
> >>>> +
> >>>> +       if (param == NULL) {
> >>>> +               odp_queue_param_init(&default_param);
> >>>> +               param = &default_param;
> >>>> +       }
> >>>> +
> >>>> +       for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES;
> >>>>queue_idx++) {
> >>>> +               queue = &queue_tbl->queue[queue_idx];
> >>>> +
> >>>> +               if (queue->s.status != QUEUE_STATUS_FREE)
> >>>> +                       continue;
> >>>> +
> >>>> +               LOCK(&queue->s.lock);
> >>>> +               if (queue->s.status == QUEUE_STATUS_FREE) {
> >>>> +                       if (queue_init(queue_idx, queue, name,
> >>>>param)) {
> >>>> +                               UNLOCK(&queue->s.lock);
> >>>> +                               return handle;
> >>>> +                       }
> >>>> +                       queue->s.status = QUEUE_STATUS_READY;
> >>>> +                       handle = queue->s.handle;
> >>>> +                       UNLOCK(&queue->s.lock);
> >>>> +                       break;
> >>>> +               }
> >>>> +               UNLOCK(&queue->s.lock);
> >>>> +       }
> >>>> +       return handle;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_destroy(odp_queue_t handle)
> >>>> +{
> >>>> +       queue_entry_t *queue;
> >>>> +       sched_elem_t *q;
> >>>> +
> >>>> +       if (handle == ODP_QUEUE_INVALID)
> >>>> +               return -1;
> >>>> +
> >>>> +       queue = queue_to_qentry(handle);
> >>>> +       LOCK(&queue->s.lock);
> >>>> +       if (queue->s.status != QUEUE_STATUS_READY) {
> >>>> +               UNLOCK(&queue->s.lock);
> >>>> +               return -1;
> >>>> +       }
> >>>> +       q = &queue->s.sched_elem;
> >>>> +
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +       LOCK(&q->qschlock);
> >>>> +#endif
> >>>> +       if (_odp_queue_disable_enq(q)) {
> >>>> +               /* Producer side not empty */
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +               UNLOCK(&q->qschlock);
> >>>> +#endif
> >>>> +               UNLOCK(&queue->s.lock);
> >>>> +               return -1;
> >>>> +       }
> >>>> +       /* Enqueue is now disabled */
> >>>> +       if (q->cons_read != q->cons_write) {
> >>>> +               /* Consumer side is not empty
> >>>> +                * Roll back previous change, enable enqueue again.
> >>>> +                */
> >>>> +               uint32_t size;
> >>>> +
> >>>> +               size = q->prod_mask + 1;
> >>>> +               __atomic_fetch_sub(&q->prod_write, size,
> >>>>__ATOMIC_RELAXED);
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +               UNLOCK(&q->qschlock);
> >>>> +#endif
> >>>> +               UNLOCK(&queue->s.lock);
> >>>> +               return -1;
> >>>> +       }
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +       UNLOCK(&q->qschlock);
> >>>> +#endif
> >>>> +       /* Producer and consumer sides empty, enqueue disabled
> >>>> +        * Now wait until schedq state is empty and no outstanding
> >>>>tickets
> >>>> +        */
> >>>> +       while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED)
> >>>>!= 0 ||
> >>>> +              __atomic_load_n(&q->qschst.cur_ticket,
> >>>>__ATOMIC_RELAXED) !=
> >>>> +              __atomic_load_n(&q->qschst.nxt_ticket,
> >>>>__ATOMIC_RELAXED)) {
> >>>> +               sevl();
> >>>> +               while (wfe() && monitor32((uint32_t
> >>>>*)&q->qschst.numevts,
> >>>> +                                         __ATOMIC_RELAXED) != 0)
> >>>> +                       doze();
> >>>> +       }
> >>>> +
> >>>> +       /* Adjust the spread factor for the queues in the schedule
> >>>>group */
> >>>> +       if (queue->s.type == ODP_QUEUE_TYPE_SCHED)
> >>>> +               sched_group_xcount_dec(queue->s.param.sched.group,
> >>>> +                                      queue->s.param.sched.prio);
> >>>> +
> >>>> +       _odp_ishm_pool_free(queue_shm_pool, q->prod_ring);
> >>>> +
> >>>> +       if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
> >>>> +               if (rwin_free(queue_shm_pool, q->rwin) < 0) {
> >>>> +                       ODP_ERR("Failed to free reorder window\n");
> >>>> +                       UNLOCK(&queue->s.lock);
> >>>> +                       return -1;
> >>>> +               }
> >>>> +       }
> >>>> +       queue->s.status = QUEUE_STATUS_FREE;
> >>>> +       UNLOCK(&queue->s.lock);
> >>>> +       return 0;
> >>>> +}
> >>>> +
> >>>> +int odp_queue_context_set(odp_queue_t handle, void *context,
> >>>> +                         uint32_t len ODP_UNUSED)
> >>>> +{
> >>>> +       odp_mb_full();
> >>>> +       queue_to_qentry(handle)->s.param.context = context;
> >>>> +       odp_mb_full();
> >>>> +       return 0;
> >>>> +}
> >>>> +
> >>>> +void *odp_queue_context(odp_queue_t handle)
> >>>> +{
> >>>> +       return queue_to_qentry(handle)->s.param.context;
> >>>> +}
> >>>> +
> >>>> +odp_queue_t odp_queue_lookup(const char *name)
> >>>> +{
> >>>> +       uint32_t i;
> >>>> +
> >>>> +       for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
> >>>> +               queue_entry_t *queue = &queue_tbl->queue[i];
> >>>> +
> >>>> +               if (queue->s.status == QUEUE_STATUS_FREE ||
> >>>> +                   queue->s.status == QUEUE_STATUS_DESTROYED)
> >>>> +                       continue;
> >>>> +
> >>>> +               LOCK(&queue->s.lock);
> >>>> +               if (strcmp(name, queue->s.name) == 0) {
> >>>> +                       /* found it */
> >>>> +                       UNLOCK(&queue->s.lock);
> >>>> +                       return queue->s.handle;
> >>>> +               }
> >>>> +               UNLOCK(&queue->s.lock);
> >>>> +       }
> >>>> +
> >>>> +       return ODP_QUEUE_INVALID;
> >>>> +}
> >>>> +
> >>>> +#ifndef CONFIG_QSCHST_LOCK
> >>>> +static inline int _odp_queue_enq(sched_elem_t *q,
> >>>> +                                odp_buffer_hdr_t *buf_hdr[],
> >>>> +                                int num)
> >>>> +{
> >>>> +       ringidx_t old_read;
> >>>> +       ringidx_t old_write;
> >>>> +       ringidx_t new_write;
> >>>> +       int actual;
> >>>> +       uint32_t mask;
> >>>> +       odp_buffer_hdr_t **ring;
> >>>> +
> >>>> +       mask = q->prod_mask;
> >>>> +       ring = q->prod_ring;
> >>>> +
> >>>> +       /* Load producer ring state (read & write index) */
> >>>> +       old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED);
> >>>> +       do {
> >>>> +               /* Consumer does store-release prod_read, we need
> >>>> +                * load-acquire.
> >>>> +                */
> >>>> +               old_read = __atomic_load_n(&q->prod_read,
> >>>>__ATOMIC_ACQUIRE);
> >>>> +
> >>>> +               actual = MIN(num, (int)((mask + 1) - (old_write -
> >>>>old_read)));
> >>>> +               if (odp_unlikely(actual <= 0))
> >>>> +                       return 0;
> >>>> +
> >>>> +               new_write = old_write + actual;
> >>>> +       } while (!__atomic_compare_exchange_n(&q->prod_write,
> >>>> +                                       &old_write, /* Updated on
> >>>>failure */
> >>>> +                                       new_write,
> >>>> +                                       true,
> >>>> +                                       __ATOMIC_RELAXED,
> >>>> +                                       __ATOMIC_RELAXED));
> >>>> +
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +       __builtin_prefetch(&q->cons_write, 0, 0);
> >>>> +#endif
> >>>> +       /* Store our event(s) in the ring */
> >>>> +       do {
> >>>> +               ring[old_write & mask] = *buf_hdr++;
> >>>> +       } while (++old_write != new_write);
> >>>> +       old_write -= actual;
> >>>> +
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +       __builtin_prefetch(&q->node, 1, 0);
> >>>> +#endif
> >>>> +       /* Wait for our turn to signal consumers */
> >>>> +       if (odp_unlikely(__atomic_load_n(&q->cons_write,
> >>>> +                                        __ATOMIC_RELAXED) !=
> >>>>old_write)) {
> >>>> +               sevl();
> >>>> +               while (wfe() && monitor32(&q->cons_write,
> >>>> +                                         __ATOMIC_RELAXED) !=
> >>>>old_write)
> >>>> +                       doze();
> >>>> +       }
> >>>> +
> >>>> +       /* Signal consumers that events are available (release events)
> >>>> +        * Enable other producers to continue
> >>>> +        */
> >>>> +       /* Wait for writes (to ring slots) to complete */
> >>>> +       atomic_store_release(&q->cons_write, new_write,
> >>>>/*readonly=*/false);
> >>>> +
> >>>> +       return actual;
> >>>> +}
> >>>> +
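
On the free-space computation in _odp_queue_enq(): as far as I can tell it
relies on unsigned index arithmetic, so it stays correct when the 32-bit
indices wrap. A small sketch of just that calculation follows. The helper
name is hypothetical, and uint32_t indices are an assumption on my part,
since ringidx_t is not defined in this hunk.

```c
#include <stdint.h>

/* Number of events a producer may enqueue: the smaller of the request
 * and the free space in the ring. mask is ring size - 1.
 */
static int ring_enq_room(uint32_t mask, uint32_t old_read,
			 uint32_t old_write, int num)
{
	/* old_write - old_read is the fill level, modulo 2^32 */
	int room = (int)((mask + 1) - (old_write - old_read));

	return num < room ? num : room;
}
```

With an 8-slot ring, old_read = 0xFFFFFFFE and old_write = 2, the fill
level is 4 even though the write index has wrapped past zero, so at most 4
more events fit.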
> >>>> +#else
> >>>> +
> >>>> +static inline int _odp_queue_enq_sp(sched_elem_t *q,
> >>>> +                                   odp_buffer_hdr_t *buf_hdr[],
> >>>> +                                   int num)
> >>>> +{
> >>>> +       ringidx_t old_read;
> >>>> +       ringidx_t old_write;
> >>>> +       ringidx_t new_write;
> >>>> +       int actual;
> >>>> +       uint32_t mask;
> >>>> +       odp_buffer_hdr_t **ring;
> >>>> +
> >>>> +       mask = q->prod_mask;
> >>>> +       ring = q->prod_ring;
> >>>> +
> >>>> +       /* Load producer ring state (read & write index) */
> >>>> +       old_write = q->prod_write;
> >>>> +       /* Consumer does store-release prod_read, we need
> >>>>load-acquire */
> >>>> +       old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE);
> >>>> +       actual = MIN(num, (int)((mask + 1) - (old_write - old_read)));
> >>>> +       if (odp_unlikely(actual <= 0))
> >>>> +               return 0;
> >>>> +
> >>>> +       new_write = old_write + actual;
> >>>> +       q->prod_write = new_write;
> >>>> +
> >>>> +       /* Store our event(s) in the ring */
> >>>> +       do {
> >>>> +               ring[old_write & mask] = *buf_hdr++;
> >>>> +       } while (++old_write != new_write);
> >>>> +       old_write -= actual;
> >>>> +
> >>>> +#ifdef CONFIG_SPLIT_PRODCONS
> >>>> +       __builtin_prefetch(&q->node, 1, 0);
> >>>> +#endif
> >>>> +
> >>>> +       /* Signal consumers that events are available (release events)
> >>>> +        * Enable other producers to continue
> >>>> +        */
> >>>> +#ifdef CONFIG_QSCHST_LOCK
> >>>> +       q->cons_write
