On Mon, Dec 12, 2016 at 6:35 AM, Maxim Uvarov <[email protected]> wrote:
> On 12/08/16 14:04, Petri Savolainen wrote:
>> Improve scalability by replacing the lock-protected linked list
>> with a ring. Schedule group support was also updated, since
>> the ring does not support peeking at the head item.
>>
>> Signed-off-by: Petri Savolainen <[email protected]>
>> ---
>>  platform/linux-generic/odp_schedule_sp.c | 271 
>> +++++++++++++++++++++++--------
>>  1 file changed, 199 insertions(+), 72 deletions(-)
>>
>> diff --git a/platform/linux-generic/odp_schedule_sp.c 
>> b/platform/linux-generic/odp_schedule_sp.c
>> index 76d1357..5150d28 100644
>> --- a/platform/linux-generic/odp_schedule_sp.c
>> +++ b/platform/linux-generic/odp_schedule_sp.c
>> @@ -13,9 +13,12 @@
>>  #include <odp_debug_internal.h>
>>  #include <odp_align_internal.h>
>>  #include <odp_config_internal.h>
>> +#include <odp_ring_internal.h>
>>
>> +#define NUM_THREAD        ODP_THREAD_COUNT_MAX
>>  #define NUM_QUEUE         ODP_CONFIG_QUEUES
>>  #define NUM_PKTIO         ODP_CONFIG_PKTIO_ENTRIES
>> +#define NUM_ORDERED_LOCKS 1
>>  #define NUM_PRIO          3
>>  #define NUM_STATIC_GROUP  3
>>  #define NUM_GROUP         (NUM_STATIC_GROUP + 9)
>> @@ -28,9 +31,17 @@
>>  #define GROUP_ALL         ODP_SCHED_GROUP_ALL
>>  #define GROUP_WORKER      ODP_SCHED_GROUP_WORKER
>>  #define GROUP_CONTROL     ODP_SCHED_GROUP_CONTROL
>> -#define MAX_ORDERED_LOCKS_PER_QUEUE 1
>> +#define GROUP_PKTIN       GROUP_ALL
>>
>> -ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
>> +/* Maximum number of commands: one priority/group for all queues and pktios 
>> */
>> +#define RING_SIZE         (ODP_ROUNDUP_POWER_2(NUM_QUEUE + NUM_PKTIO))
>> +#define RING_MASK         (RING_SIZE - 1)
>> +
>> +/* Ring size must be power of two */
>> +ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(RING_SIZE),
>> +               "Ring_size_is_not_power_of_two");
>> +
>> +ODP_STATIC_ASSERT(NUM_ORDERED_LOCKS <= CONFIG_QUEUE_MAX_ORD_LOCKS,
>>                 "Too_many_ordered_locks");
>>
>>  struct sched_cmd_t;
>> @@ -38,6 +49,7 @@ struct sched_cmd_t;
>>  struct sched_cmd_s {
>>       struct sched_cmd_t *next;
>>       uint32_t           index;
>> +     uint32_t           ring_idx;
>>       int                type;
>>       int                prio;
>>       int                group;
>> @@ -52,38 +64,49 @@ typedef struct sched_cmd_t {
>>                              sizeof(struct sched_cmd_s)];
>>  } sched_cmd_t ODP_ALIGNED_CACHE;
>>
>> -struct prio_queue_s {
>> -     odp_ticketlock_t  lock;
>> -     sched_cmd_t       *head;
>> -     sched_cmd_t       *tail;
>> -};
>> +typedef struct {
>> +     /* Ring header */
>> +     ring_t ring;
>> +
>> +     /* Ring data: queue indexes */
>> +     uint32_t ring_idx[RING_SIZE];
>>
>> -typedef      struct prio_queue_t {
>> -     struct prio_queue_s s;
>> -     uint8_t             pad[ROUNDUP_CACHE(sizeof(struct prio_queue_s)) -
>> -                             sizeof(struct prio_queue_s)];
>>  } prio_queue_t ODP_ALIGNED_CACHE;
>>
>> -struct sched_group_s {
>> -     odp_ticketlock_t  lock;
>> +typedef struct thr_group_t {
>> +     /* A generation counter for fast comparison if groups have changed */
>> +     odp_atomic_u32_t gen_cnt;
>>
>> -     struct {
>> -             char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
>> -             odp_thrmask_t mask;
>> -             int           allocated;
>> -     } group[NUM_GROUP];
>> -};
>> +     /* Number of groups the thread belongs to */
>> +     int num_group;
>> +
>> +     /* The groups the thread belongs to */
>> +     int group[NUM_GROUP];
>> +
>> +} thr_group_t;
>>
>>  typedef struct sched_group_t {
>> -     struct sched_group_s s;
>> -     uint8_t              pad[ROUNDUP_CACHE(sizeof(struct sched_group_s)) -
>> -                              sizeof(struct sched_group_s)];
>> +     struct {
>> +             odp_ticketlock_t  lock;
>> +
>> +             /* All groups */
>> +             struct {
>> +                     char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
>> +                     odp_thrmask_t mask;
>> +                     int           allocated;
>> +             } group[NUM_GROUP];
>> +
>> +             /* Per thread group information */
>> +             thr_group_t thr[NUM_THREAD];
>> +
>> +     } s;
>> +
>>  } sched_group_t ODP_ALIGNED_CACHE;
>>
>>  typedef struct {
>>       sched_cmd_t   queue_cmd[NUM_QUEUE];
>>       sched_cmd_t   pktio_cmd[NUM_PKTIO];
>> -     prio_queue_t  prio_queue[NUM_PRIO];
>> +     prio_queue_t  prio_queue[NUM_GROUP][NUM_PRIO];
>>       sched_group_t sched_group;
>>  } sched_global_t;
>>
>> @@ -91,14 +114,37 @@ typedef struct {
>>       sched_cmd_t *cmd;
>>       int          pause;
>>       int          thr_id;
>> +     uint32_t     gen_cnt;
>> +     int          num_group;
>> +     int          group[NUM_GROUP];
>>  } sched_local_t;
>>
>>  static sched_global_t sched_global;
>>  static __thread sched_local_t sched_local;
>>
>> +static inline uint32_t index_to_ring_idx(int pktio, uint32_t index)
>> +{
>> +     if (pktio)
>> +             return (0x80000000 | index);
>> +
>> +     return index;
>> +}
>> +
>> +static inline uint32_t index_from_ring_idx(uint32_t *index, uint32_t 
>> ring_idx)
>> +{
>> +     uint32_t pktio = ring_idx & 0x80000000;
>> +
>> +     if (pktio)
>> +             *index = ring_idx & (~0x80000000);
>> +     else
>> +             *index = ring_idx;
>> +
>> +     return pktio;
>> +}
>> +
>>  static int init_global(void)
>>  {
>> -     int i;
>> +     int i, j;
>>       sched_group_t *sched_group = &sched_global.sched_group;
>>
>>       ODP_DBG("Using SP scheduler\n");
>> @@ -106,21 +152,28 @@ static int init_global(void)
>>       memset(&sched_global, 0, sizeof(sched_global_t));
>>
>>       for (i = 0; i < NUM_QUEUE; i++) {
>> -             sched_global.queue_cmd[i].s.type  = CMD_QUEUE;
>> -             sched_global.queue_cmd[i].s.index = i;
>> +             sched_global.queue_cmd[i].s.type     = CMD_QUEUE;
>> +             sched_global.queue_cmd[i].s.index    = i;
>> +             sched_global.queue_cmd[i].s.ring_idx = index_to_ring_idx(0, i);
>>       }
>>
>>       for (i = 0; i < NUM_PKTIO; i++) {
>> -             sched_global.pktio_cmd[i].s.type  = CMD_PKTIO;
>> -             sched_global.pktio_cmd[i].s.index = i;
>> -             sched_global.pktio_cmd[i].s.prio  = PKTIN_PRIO;
>> +             sched_global.pktio_cmd[i].s.type     = CMD_PKTIO;
>> +             sched_global.pktio_cmd[i].s.index    = i;
>> +             sched_global.pktio_cmd[i].s.ring_idx = index_to_ring_idx(1, i);
>> +             sched_global.pktio_cmd[i].s.prio     = PKTIN_PRIO;
>> +             sched_global.pktio_cmd[i].s.group    = GROUP_PKTIN;
>>       }
>>
>> -     for (i = 0; i < NUM_PRIO; i++)
>> -             odp_ticketlock_init(&sched_global.prio_queue[i].s.lock);
>> +     for (i = 0; i < NUM_GROUP; i++)
>> +             for (j = 0; j < NUM_PRIO; j++)
>> +                     ring_init(&sched_global.prio_queue[i][j].ring);
>>
>>       odp_ticketlock_init(&sched_group->s.lock);
>>
>> +     for (i = 0; i < NUM_THREAD; i++)
>> +             odp_atomic_init_u32(&sched_group->s.thr[i].gen_cnt, 0);
>> +
>>       strncpy(sched_group->s.group[GROUP_ALL].name, "__group_all",
>>               ODP_SCHED_GROUP_NAME_LEN);
>>       odp_thrmask_zero(&sched_group->s.group[GROUP_ALL].mask);
>> @@ -168,7 +221,48 @@ static int term_local(void)
>>
>>  static unsigned max_ordered_locks(void)
>>  {
>> -     return MAX_ORDERED_LOCKS_PER_QUEUE;
>> +     return NUM_ORDERED_LOCKS;
>> +}
>> +
>> +static void add_group(sched_group_t *sched_group, int thr, int group)
>> +{
>> +     int num;
>> +     uint32_t gen_cnt;
>> +     thr_group_t *thr_group = &sched_group->s.thr[thr];
>> +
>> +     num = thr_group->num_group;
>> +     thr_group->group[num] = group;
>> +     thr_group->num_group  = num + 1;
>> +     gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
>> +     odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
>
>
> odp_atomic_inc_u32()

A better option is:

gen_cnt = odp_atomic_fetch_inc_u32(&thr_group->gen_cnt);

>
>
>> +}
>> +
>> +static void remove_group(sched_group_t *sched_group, int thr, int group)
>> +{
>> +     int i, num;
>> +     int found = 0;
>> +     thr_group_t *thr_group = &sched_group->s.thr[thr];
>> +
>> +     num = thr_group->num_group;
>> +
>> +     for (i = 0; i < num; i++) {
>> +             if (thr_group->group[i] == group) {
>> +                     found = 1;
>> +
>> +                     for (; i < num - 1; i++)
>> +                             thr_group->group[i] = thr_group->group[i + 1];
>> +
>> +                     break;
>> +             }
>> +     }
>> +
>> +     if (found) {
>> +             uint32_t gen_cnt;
>> +
>> +             thr_group->num_group = num - 1;
>> +             gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
>> +             odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
>
> odp_atomic_inc_u32()
>
> and you can remove the `if (found)` block by placing the inc before the
> break above. That should fit in 80 characters.

Same here:

gen_cnt = odp_atomic_fetch_inc_u32(&thr_group->gen_cnt);

>
>
>> +     }
>>  }
>>
>>  static int thr_add(odp_schedule_group_t group, int thr)
>> @@ -178,6 +272,9 @@ static int thr_add(odp_schedule_group_t group, int thr)
>>       if (group < 0 || group >= NUM_GROUP)
>>               return -1;
>>
>> +     if (thr < 0 || thr >= NUM_THREAD)
>> +             return -1;
>> +
>>       odp_ticketlock_lock(&sched_group->s.lock);
>>
>>       if (!sched_group->s.group[group].allocated) {
>> @@ -186,6 +283,7 @@ static int thr_add(odp_schedule_group_t group, int thr)
>>       }
>>
>>       odp_thrmask_set(&sched_group->s.group[group].mask, thr);
>> +     add_group(sched_group, thr, group);
>>
>>       odp_ticketlock_unlock(&sched_group->s.lock);
>>
>> @@ -208,6 +306,8 @@ static int thr_rem(odp_schedule_group_t group, int thr)
>>
>>       odp_thrmask_clr(&sched_group->s.group[group].mask, thr);
>>
>> +     remove_group(sched_group, thr, group);
>> +
>>       odp_ticketlock_unlock(&sched_group->s.lock);
>>
>>       return 0;
>> @@ -250,51 +350,34 @@ static void destroy_queue(uint32_t qi)
>>  static inline void add_tail(sched_cmd_t *cmd)
>>  {
>>       prio_queue_t *prio_queue;
>> +     int group    = cmd->s.group;
>> +     int prio     = cmd->s.prio;
>> +     uint32_t idx = cmd->s.ring_idx;
>>
>> -     prio_queue   = &sched_global.prio_queue[cmd->s.prio];
>> -     cmd->s.next  = NULL;
>> -
>> -     odp_ticketlock_lock(&prio_queue->s.lock);
>> -
>> -     if (prio_queue->s.head == NULL)
>> -             prio_queue->s.head = cmd;
>> -     else
>> -             prio_queue->s.tail->s.next = cmd;
>> -
>> -     prio_queue->s.tail = cmd;
>> +     prio_queue = &sched_global.prio_queue[group][prio];
>>
>> -     odp_ticketlock_unlock(&prio_queue->s.lock);
>> +     ring_enq(&prio_queue->ring, RING_MASK, idx);
>>  }
>>
>> -static inline sched_cmd_t *rem_head(int prio)
>> +static inline sched_cmd_t *rem_head(int group, int prio)
>>  {
>>       prio_queue_t *prio_queue;
>> -     sched_cmd_t *cmd;
>> -
>> -     prio_queue = &sched_global.prio_queue[prio];
>> +     uint32_t ring_idx, index;
>> +     int pktio;
>>
>> -     odp_ticketlock_lock(&prio_queue->s.lock);
>> +     prio_queue = &sched_global.prio_queue[group][prio];
>>
>> -     if (prio_queue->s.head == NULL) {
>> -             cmd = NULL;
>> -     } else {
>> -             sched_group_t *sched_group = &sched_global.sched_group;
>> +     ring_idx = ring_deq(&prio_queue->ring, RING_MASK);
>>
>> -             cmd = prio_queue->s.head;
>> +     if (ring_idx == RING_EMPTY)
>> +             return NULL;
>>
>> -             /* Remove head cmd only if thread belongs to the
>> -              * scheduler group. Otherwise continue to the next priority
>> -              * queue. */
>> -             if (odp_thrmask_isset(&sched_group->s.group[cmd->s.group].mask,
>> -                                   sched_local.thr_id))
>> -                     prio_queue->s.head = cmd->s.next;
>> -             else
>> -                     cmd = NULL;
>> -     }
>> +     pktio = index_from_ring_idx(&index, ring_idx);
>>
>> -     odp_ticketlock_unlock(&prio_queue->s.lock);
>> +     if (pktio)
>> +             return &sched_global.pktio_cmd[index];
>>
>> -     return cmd;
>> +     return &sched_global.queue_cmd[index];
>>  }
>>
>>  static int sched_queue(uint32_t qi)
>> @@ -341,15 +424,43 @@ static void pktio_start(int pktio_index, int num, int 
>> pktin_idx[])
>>       add_tail(cmd);
>>  }
>>
>> -static inline sched_cmd_t *sched_cmd(int num_prio)
>> +static inline sched_cmd_t *sched_cmd(void)
>>  {
>> -     int prio;
>> +     int prio, i;
>> +     int thr = sched_local.thr_id;
>> +     sched_group_t *sched_group = &sched_global.sched_group;
>> +     thr_group_t *thr_group = &sched_group->s.thr[thr];
>> +     uint32_t gen_cnt;
>> +
>> +     /* There's no matching store_rel since the value is updated while
>> +      * holding a lock */
>> +     gen_cnt = odp_atomic_load_acq_u32(&thr_group->gen_cnt);
>> +
>> +     /* Check if groups have changed and need to be read again */
>> +     if (odp_unlikely(gen_cnt != sched_local.gen_cnt)) {
>> +             int num_grp;
>> +
>> +             odp_ticketlock_lock(&sched_group->s.lock);
>> +
>> +             num_grp = thr_group->num_group;
>> +             gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
>>
>> -     for (prio = 0; prio < num_prio; prio++) {
>> -             sched_cmd_t *cmd = rem_head(prio);
>> +             for (i = 0; i < num_grp; i++)
>> +                     sched_local.group[i] = thr_group->group[i];
>>
>> -             if (cmd)
>> -                     return cmd;
>> +             odp_ticketlock_unlock(&sched_group->s.lock);
>> +
>> +             sched_local.num_group = num_grp;
>> +             sched_local.gen_cnt   = gen_cnt;
>> +     }
>> +
>> +     for (i = 0; i < sched_local.num_group; i++) {
>> +             for (prio = 0; prio < NUM_PRIO; prio++) {
>> +                     sched_cmd_t *cmd = rem_head(sched_local.group[i], 
>> prio);
>> +
>> +                     if (cmd)
>> +                             return cmd;
>> +             }
>>       }
>>
>>       return NULL;
>> @@ -382,7 +493,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t 
>> wait,
>>               uint32_t qi;
>>               int num;
>>
>> -             cmd = sched_cmd(NUM_PRIO);
>> +             cmd = sched_cmd();
>>
>>               if (cmd && cmd->s.type == CMD_PKTIO) {
>>                       if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin,
>> @@ -565,11 +676,14 @@ static odp_schedule_group_t 
>> schedule_group_lookup(const char *name)
>>  static int schedule_group_join(odp_schedule_group_t group,
>>                              const odp_thrmask_t *thrmask)
>>  {
>> +     int thr;
>>       sched_group_t *sched_group = &sched_global.sched_group;
>>
>>       if (group < 0 || group >= NUM_GROUP)
>>               return -1;
>>
>> +     thr = odp_thrmask_first(thrmask);
>> +
>>       odp_ticketlock_lock(&sched_group->s.lock);
>>
>>       if (!sched_group->s.group[group].allocated) {
>> @@ -581,6 +695,11 @@ static int schedule_group_join(odp_schedule_group_t 
>> group,
>>                      &sched_group->s.group[group].mask,
>>                      thrmask);
>>
>> +     while (thr >= 0) {
>> +             add_group(sched_group, thr, group);
>> +             thr = odp_thrmask_next(thrmask, thr);
>> +     }
>> +
>>       odp_ticketlock_unlock(&sched_group->s.lock);
>>
>>       return 0;
>> @@ -589,6 +708,7 @@ static int schedule_group_join(odp_schedule_group_t 
>> group,
>>  static int schedule_group_leave(odp_schedule_group_t group,
>>                               const odp_thrmask_t *thrmask)
>>  {
>> +     int thr;
>>       sched_group_t *sched_group = &sched_global.sched_group;
>>       odp_thrmask_t *all = &sched_group->s.group[GROUP_ALL].mask;
>>       odp_thrmask_t not;
>> @@ -596,6 +716,8 @@ static int schedule_group_leave(odp_schedule_group_t 
>> group,
>>       if (group < 0 || group >= NUM_GROUP)
>>               return -1;
>>
>> +     thr = odp_thrmask_first(thrmask);
>> +
>>       odp_ticketlock_lock(&sched_group->s.lock);
>>
>>       if (!sched_group->s.group[group].allocated) {
>> @@ -608,6 +730,11 @@ static int schedule_group_leave(odp_schedule_group_t 
>> group,
>>                       &sched_group->s.group[group].mask,
>>                       &not);
>>
>> +     while (thr >= 0) {
>> +             remove_group(sched_group, thr, group);
>> +             thr = odp_thrmask_next(thrmask, thr);
>> +     }
>> +
>>       odp_ticketlock_unlock(&sched_group->s.lock);
>>
>>       return 0;
>>
>

Reply via email to