This applies and works fine for me; however, I'd like to get Oriol's input
on whether this solves his reported issues. In the meantime, some small
comments.

On Fri, Aug 5, 2016 at 1:53 PM, Barry Spinney <[email protected]> wrote:

> These changes add a new concept called the tm_system_group.  This new
> entity collects together a set of tm_systems that all share some
> common resources - e.g. the service thread.
>
> Signed-off-by: Barry Spinney <[email protected]>
> ---
>  .../include/odp_traffic_mngr_internal.h            |  34 ++-
>  platform/linux-generic/odp_traffic_mngr.c          | 233 +++++++++++++++++++--
>  2 files changed, 249 insertions(+), 18 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_traffic_mngr_internal.h b/platform/linux-generic/include/odp_traffic_mngr_internal.h
> index e4bc5ce..1b1218b 100644
> --- a/platform/linux-generic/include/odp_traffic_mngr_internal.h
> +++ b/platform/linux-generic/include/odp_traffic_mngr_internal.h
> @@ -347,14 +347,25 @@ typedef struct {
>         tm_tos_marking_t  ip_tos_marking[ODP_NUM_PACKET_COLORS];
>  } tm_marking_t;
>
> -typedef struct {
> +typedef struct tm_system_s       tm_system_t;
> +typedef struct tm_system_group_s tm_system_group_t;
> +typedef        uint64_t          odp_tm_group_t;
>

I'm assuming the intent is to surface this as a new ODP type in Tiger Moth,
but for now I'd make it explicitly internal by renaming it to
_odp_tm_group_t to avoid confusion. We're reserving the odp_ prefix for
symbols and APIs that are part of the published ODP API.
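
For example (just a sketch of the rename; the struct typedefs are fine
as-is):

    typedef struct tm_system_s       tm_system_t;
    typedef struct tm_system_group_s tm_system_group_t;
    typedef        uint64_t          _odp_tm_group_t;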


> +
> +struct tm_system_s {
> +       /* The previous and next tm_system in the same tm_system_group. These
> +        * links form a circle and so to round robin amongst the tm_system's
> +        * one just needs to continue to follow next. In the case where the
> +        * tm_system_group only has one tm_system, the prev and next point to
> +        * this single tm_system. */
> +       tm_system_t    *prev;
> +       tm_system_t    *next;
> +       odp_tm_group_t  odp_tm_group;
> +
>         odp_ticketlock_t tm_system_lock;
>         odp_barrier_t    tm_system_barrier;
>         odp_barrier_t    tm_system_destroy_barrier;
>         odp_atomic_u64_t destroying;
>         _odp_int_name_t  name_tbl_id;
> -       pthread_t        thread;
> -       pthread_attr_t   attr;
>
>         void               *trace_buffer;
>         uint32_t            next_queue_num;
> @@ -389,7 +400,22 @@ typedef struct {
>         uint64_t shaper_green_cnt;
>         uint64_t shaper_yellow_cnt;
>         uint64_t shaper_red_cnt;
> -} tm_system_t;
> +};
> +
> +/* A tm_system_group is a set of 1 to N tm_systems that share some processing
> + * resources - like a bunch of service threads, input queue, timers, etc.
> + * Currently a tm_system_group only supports a single service thread - and
> + * while the input work queue is shared - timers are not. */
> +
> +struct tm_system_group_s {
> +       tm_system_group_t *prev;
> +       tm_system_group_t *next;
> +
> +       tm_system_t *first_tm_system;
> +       uint32_t num_tm_systems;
> +       pthread_t        thread;
> +       pthread_attr_t   attr;
> +};
>
>  #ifdef __cplusplus
>  }
> diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c
> index e668bf9..fc48000 100644
> --- a/platform/linux-generic/odp_traffic_mngr.c
> +++ b/platform/linux-generic/odp_traffic_mngr.c
> @@ -78,11 +78,14 @@ static dynamic_tbl_t odp_tm_profile_tbls[ODP_TM_NUM_PROFILES];
>  /* TM systems table. */
>  static tm_system_t *odp_tm_systems[ODP_TM_MAX_NUM_SYSTEMS];
>
> +static tm_system_group_t *tm_group_list;
> +
>  static odp_ticketlock_t tm_create_lock;
>  static odp_ticketlock_t tm_profile_lock;
>  static odp_barrier_t tm_first_enq;
>
>  static int g_main_thread_cpu = -1;
> +static int g_tm_cpu_num;
>
>  /* Forward function declarations. */
>  static void tm_queue_cnts_decrement(tm_system_t *tm_system,
> @@ -2312,6 +2315,7 @@ static void *tm_system_thread(void *arg)
>  {
>         _odp_timer_wheel_t _odp_int_timer_wheel;
>         input_work_queue_t *input_work_queue;
> +       tm_system_group_t  *tm_group;
>         tm_system_t *tm_system;
>         uint64_t current_ns;
>         uint32_t destroying, work_queue_cnt, timer_cnt;
> @@ -2319,7 +2323,9 @@ static void *tm_system_thread(void *arg)
>
>         rc = odp_init_local(INSTANCE_ID, ODP_THREAD_WORKER);
>         ODP_ASSERT(rc == 0);
> -       tm_system = arg;
> +       tm_group = arg;
> +
> +       tm_system = tm_group->first_tm_system;
>         _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
>         input_work_queue = tm_system->input_work_queue;
>
> @@ -2371,6 +2377,11 @@ static void *tm_system_thread(void *arg)
>                 tm_system->is_idle = (timer_cnt == 0) &&
>                         (work_queue_cnt == 0);
>                 destroying = odp_atomic_load_u64(&tm_system->destroying);
> +
> +               /* Advance to the next tm_system in the tm_system_group. */
> +               tm_system = tm_system->next;
> +               _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
> +               input_work_queue = tm_system->input_work_queue;
>         }
>
>         odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
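
One question on the round robin here: destroying is sampled from the
current tm_system, but tm_system has already advanced to ->next by the
time the loop exits, so the odp_barrier_wait() above appears to use a
different tm_system's destroy barrier than the one actually being
destroyed (they only coincide when the group has a single member). Unless
I'm misreading the loop condition, which is outside this hunk, checking
before advancing seems safer, something like (sketch):

    destroying = odp_atomic_load_u64(&tm_system->destroying);
    if (destroying)
            break;

    /* Advance to the next tm_system in the tm_system_group. */
    tm_system = tm_system->next;
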
> @@ -2558,7 +2569,7 @@ static int affinitize_main_thread(void)
>  static uint32_t tm_thread_cpu_select(void)
>  {
>         odp_cpumask_t odp_cpu_mask;
> -       int           cpu_count;
> +       int           cpu_count, cpu;
>
>         odp_cpumask_default_worker(&odp_cpu_mask, 0);
>         if ((g_main_thread_cpu != -1) &&
> @@ -2576,30 +2587,224 @@ static uint32_t tm_thread_cpu_select(void)
>                         odp_cpumask_all_available(&odp_cpu_mask);
>         }
>
> -       return odp_cpumask_first(&odp_cpu_mask);
> +       if (g_tm_cpu_num == 0) {
> +               cpu = odp_cpumask_first(&odp_cpu_mask);
> +       } else {
> +               cpu = odp_cpumask_next(&odp_cpu_mask, g_tm_cpu_num);
> +               if (cpu == -1) {
> +                       g_tm_cpu_num = 0;
> +                       cpu = odp_cpumask_first(&odp_cpu_mask);
> +               }
> +       }
> +
> +       g_tm_cpu_num++;
> +       return cpu;
>  }
>
> -static int tm_thread_create(tm_system_t *tm_system)
> +static int tm_thread_create(tm_system_group_t *tm_group)
>  {
>         cpu_set_t      cpu_set;
>         uint32_t       cpu_num;
>         int            rc;
>
> -       pthread_attr_init(&tm_system->attr);
> +       pthread_attr_init(&tm_group->attr);
>         cpu_num = tm_thread_cpu_select();
>         CPU_ZERO(&cpu_set);
>         CPU_SET(cpu_num, &cpu_set);
> -       pthread_attr_setaffinity_np(&tm_system->attr, sizeof(cpu_set_t),
> +       pthread_attr_setaffinity_np(&tm_group->attr, sizeof(cpu_set_t),
>                                     &cpu_set);
>
> -       rc = pthread_create(&tm_system->thread, &tm_system->attr,
> -                           tm_system_thread, tm_system);
> +       rc = pthread_create(&tm_group->thread, &tm_group->attr,
> +                           tm_system_thread, tm_group);
>         if (rc != 0)
>                 ODP_DBG("Failed to start thread on cpu num=%u\n", cpu_num);
>
>         return rc;
>  }
>
> +static odp_tm_group_t odp_tm_group_create(const char *name ODP_UNUSED)
>

Same comment here about using _odp_ instead of odp_. Making this static
keeps it private, but it should still be:

static _odp_tm_group_t _odp_tm_group_create(...)

The same applies to all of the new tm_group APIs below.

We can then provide a delta patch that promotes these to new ODP APIs in
an API-NEXT follow-on to this patch for use in Tiger Moth development.
That delta patch would also include the appropriate updates to
include/odp/api/spec/traffic_mngr.h, etc.
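
Concretely, the renamed prototypes would read something like this (a
sketch; bodies and argument lists unchanged from the patch):

    static _odp_tm_group_t _odp_tm_group_create(const char *name ODP_UNUSED);
    static void _odp_tm_group_destroy(_odp_tm_group_t odp_tm_group);
    static int _odp_tm_group_add(_odp_tm_group_t odp_tm_group, odp_tm_t odp_tm);
    static int _odp_tm_group_remove(_odp_tm_group_t odp_tm_group, odp_tm_t odp_tm);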


> +{
> +       tm_system_group_t *tm_group, *first_tm_group, *second_tm_group;
> +
> +       tm_group = malloc(sizeof(tm_system_group_t));
> +       memset(tm_group, 0, sizeof(tm_system_group_t));
> +
> +       /* Add this group to the tm_group_list linked list. */
> +       if (tm_group_list == NULL) {
> +               tm_group_list  = tm_group;
> +               tm_group->next = tm_group;
> +               tm_group->prev = tm_group;
> +       } else {
> +               first_tm_group        = tm_group_list;
> +               second_tm_group       = first_tm_group->next;
> +               first_tm_group->next  = tm_group;
> +               second_tm_group->prev = tm_group;
> +               tm_group->next        = second_tm_group;
> +               tm_group->prev        = first_tm_group;
> +       }
> +
> +       return (odp_tm_group_t)tm_group;
> +}
> +
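
Also a nit here: the malloc() return isn't checked, so an allocation
failure would crash in the memset(). A guard along these lines would be
safer (sketch; 0 can serve as the invalid handle since odp_tm_group_t is
a uint64_t):

    tm_group = malloc(sizeof(tm_system_group_t));
    if (tm_group == NULL)
            return 0;

    memset(tm_group, 0, sizeof(tm_system_group_t));
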
> +static void odp_tm_group_destroy(odp_tm_group_t odp_tm_group)
> +{
> +       tm_system_group_t *tm_group, *prev_tm_group, *next_tm_group;
> +       int                rc;
> +
> +       tm_group = (tm_system_group_t *)odp_tm_group;
> +
> +       /* Wait for the thread to exit. */
> +       ODP_ASSERT(tm_group->num_tm_systems <= 1);
> +       rc = pthread_join(tm_group->thread, NULL);
> +       ODP_ASSERT(rc == 0);
> +       pthread_attr_destroy(&tm_group->attr);
> +       if (g_tm_cpu_num > 0)
> +               g_tm_cpu_num--;
> +
> +       /* Remove this group from the tm_group_list linked list. Special case
> +        * when this is the last tm_group in the linked list. */
> +       prev_tm_group = tm_group->prev;
> +       next_tm_group = tm_group->next;
> +       if (prev_tm_group == next_tm_group) {
> +               ODP_ASSERT(tm_group_list == tm_group);
> +               tm_group_list = NULL;
> +       } else {
> +               prev_tm_group->next = next_tm_group;
> +               next_tm_group->prev = prev_tm_group;
> +               if (tm_group_list == tm_group)
> +                       tm_group_list = next_tm_group;
> +       }
> +
> +       tm_group->prev = NULL;
> +       tm_group->next = NULL;
> +       free(tm_group);
> +}
> +
> +static int odp_tm_group_add(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
> +{
> +       tm_system_group_t *tm_group;
> +       tm_system_t       *tm_system, *first_tm_system, *second_tm_system;
> +
> +       tm_group  = (tm_system_group_t *)odp_tm_group;
> +       tm_system = GET_TM_SYSTEM(odp_tm);
> +       tm_group->num_tm_systems++;
> +       tm_system->odp_tm_group = odp_tm_group;
> +
> +       /* Link this tm_system into the circular linked list of all tm_systems
> +        * belonging to the same tm_group. */
> +       if (tm_group->num_tm_systems == 1) {
> +               tm_group->first_tm_system = tm_system;
> +               tm_system->next           = tm_system;
> +               tm_system->prev           = tm_system;
> +       } else {
> +               first_tm_system        = tm_group->first_tm_system;
> +               second_tm_system       = first_tm_system->next;
> +               first_tm_system->next  = tm_system;
> +               second_tm_system->prev = tm_system;
> +               tm_system->prev        = first_tm_system;
> +               tm_system->next        = second_tm_system;
> +               tm_group->first_tm_system = tm_system;
> +       }
> +
> +       /* If this is the first tm_system associated with this group, then
> +        * create the service thread and the input work queue. */
> +       if (tm_group->num_tm_systems >= 2)
> +               return 0;
> +
> +       affinitize_main_thread();
> +       return tm_thread_create(tm_group);
> +}
> +
> +static int odp_tm_group_remove(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
> +{
> +       tm_system_group_t *tm_group;
> +       tm_system_t       *tm_system, *prev_tm_system, *next_tm_system;
> +
> +       tm_group  = (tm_system_group_t *)odp_tm_group;
> +       tm_system = GET_TM_SYSTEM(odp_tm);
> +       if (tm_system->odp_tm_group != odp_tm_group)
> +               return -1;
> +
> +       if ((tm_group->num_tm_systems  == 0) ||
> +           (tm_group->first_tm_system == NULL))
> +               return -1;
> +
> +       /* Remove this tm_system from the tm_group linked list. */
> +       if (tm_group->first_tm_system == tm_system)
> +               tm_group->first_tm_system = tm_system->next;
> +
> +       prev_tm_system       = tm_system->prev;
> +       next_tm_system       = tm_system->next;
> +       prev_tm_system->next = next_tm_system;
> +       next_tm_system->prev = prev_tm_system;
> +       tm_system->next      = NULL;
> +       tm_system->prev      = NULL;
> +       tm_group->num_tm_systems--;
> +
> +       /* If this is the last tm_system associated with this group then
> +        * destroy the group (and thread etc). */
> +       if (tm_group->num_tm_systems == 0)
> +               odp_tm_group_destroy(odp_tm_group);
> +
> +       return 0;
> +}
> +
> +static int tm_group_attach(odp_tm_t odp_tm)
> +{
> +       tm_system_group_t *tm_group, *min_tm_group;
> +       odp_tm_group_t     odp_tm_group;
> +       odp_cpumask_t      all_cpus, worker_cpus;
> +       uint32_t           total_cpus, avail_cpus;
> +
> +       /* If this platform has a small number of cpu's then allocate one
> +        * tm_group and assign all tm_system's to this tm_group. Otherwise in
> +        * the case of a manycore platform try to allocate one tm_group per
> +        * tm_system, as long as there are still extra cpu's left.  If not
> +        * enough cpu's left then allocate this tm_system to the next tm_group
> +        * in a round robin fashion. */
> +       odp_cpumask_all_available(&all_cpus);
> +       odp_cpumask_default_worker(&worker_cpus, 0);
> +       total_cpus = odp_cpumask_count(&all_cpus);
> +       avail_cpus = odp_cpumask_count(&worker_cpus);
> +
> +       if (total_cpus < 24) {
> +               tm_group     = tm_group_list;
> +               odp_tm_group = (odp_tm_group_t)tm_group;
> +               if (tm_group == NULL)
> +                       odp_tm_group = odp_tm_group_create("");
> +
> +               odp_tm_group_add(odp_tm_group, odp_tm);
> +               return 0;
> +       }
> +
> +       /* Manycore case. */
> +       if ((tm_group_list == NULL) || (avail_cpus > 1)) {
> +               odp_tm_group = odp_tm_group_create("");
> +               odp_tm_group_add(odp_tm_group, odp_tm);
> +               return 0;
> +       }
> +
> +       /* Pick a tm_group according to the smallest number of tm_systems. */
> +       tm_group     = tm_group_list;
> +       min_tm_group = NULL;
> +       while (tm_group != NULL) {
> +               if (min_tm_group == NULL)
> +                       min_tm_group = tm_group;
> +               else if (tm_group->num_tm_systems <
> +                        min_tm_group->num_tm_systems)
> +                       min_tm_group = tm_group;
> +
> +               tm_group = tm_group->next;
> +       }
> +
> +       if (min_tm_group == NULL)
> +               return -1;
> +
> +       odp_tm_group = (odp_tm_group_t)tm_group;
> +       odp_tm_group_add(odp_tm_group, odp_tm);
> +       return 0;
> +}
> +
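
Two functional issues in tm_group_attach(): tm_group_list is maintained
as a circular list (next is never NULL), so the while loop above never
terminates once it is entered; and the final cast uses tm_group (the loop
cursor) rather than min_tm_group, discarding the selection. I'd expect
something along these lines instead (sketch; tm_group_list is known to be
non-NULL at that point):

    tm_group     = tm_group_list;
    min_tm_group = tm_group;
    do {
            if (tm_group->num_tm_systems < min_tm_group->num_tm_systems)
                    min_tm_group = tm_group;

            tm_group = tm_group->next;
    } while (tm_group != tm_group_list);

    odp_tm_group = (odp_tm_group_t)min_tm_group;
    odp_tm_group_add(odp_tm_group, odp_tm);
    return 0;
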
>  odp_tm_t odp_tm_create(const char            *name,
>                        odp_tm_requirements_t *requirements,
>                        odp_tm_egress_t       *egress)
> @@ -2693,8 +2898,9 @@ odp_tm_t odp_tm_create(const char            *name,
>         }
>
>         if (create_fail == 0) {
> +               /* Pass any odp_groups or hints to tm_group_attach here. */
>                 affinitize_main_thread();
> -               rc = tm_thread_create(tm_system);
> +               rc = tm_group_attach(odp_tm);
>                 create_fail |= rc < 0;
>         }
>
> @@ -2754,7 +2960,6 @@ int odp_tm_capability(odp_tm_t odp_tm, odp_tm_capabilities_t *capabilities)
>  int odp_tm_destroy(odp_tm_t odp_tm)
>  {
>         tm_system_t *tm_system;
> -       int rc;
>
>         tm_system = GET_TM_SYSTEM(odp_tm);
>
> @@ -2765,10 +2970,10 @@ int odp_tm_destroy(odp_tm_t odp_tm)
>         odp_atomic_inc_u64(&tm_system->destroying);
>         odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
>
> -       /* Next wait for the thread to exit. */
> -       rc = pthread_join(tm_system->thread, NULL);
> -       ODP_ASSERT(rc == 0);
> -       pthread_attr_destroy(&tm_system->attr);
> +       /* Remove ourselves from the group.  If we are the last tm_system in
> +        * this group, odp_tm_group_remove will destroy any service threads
> +        * allocated by this group. */
> +       odp_tm_group_remove(tm_system->odp_tm_group, odp_tm);
>
>         input_work_queue_destroy(tm_system->input_work_queue);
>         _odp_sorted_pool_destroy(tm_system->_odp_int_sorted_pool);
> --
> 2.1.2
>
>
