These changes add a new concept called the tm_system_group.  This new
entity collects together a set of tm_systems that all share some
common resources - e.g. the service thread.

Signed-off-by: Barry Spinney <[email protected]>
---
 .../include/odp_traffic_mngr_internal.h            |  34 ++-
 platform/linux-generic/odp_traffic_mngr.c          | 233 +++++++++++++++++++--
 2 files changed, 249 insertions(+), 18 deletions(-)

diff --git a/platform/linux-generic/include/odp_traffic_mngr_internal.h b/platform/linux-generic/include/odp_traffic_mngr_internal.h
index e4bc5ce..1b1218b 100644
--- a/platform/linux-generic/include/odp_traffic_mngr_internal.h
+++ b/platform/linux-generic/include/odp_traffic_mngr_internal.h
@@ -347,14 +347,25 @@ typedef struct {
        tm_tos_marking_t  ip_tos_marking[ODP_NUM_PACKET_COLORS];
 } tm_marking_t;
 
-typedef struct {
+typedef struct tm_system_s       tm_system_t;
+typedef struct tm_system_group_s tm_system_group_t;
+typedef        uint64_t          odp_tm_group_t;
+
+struct tm_system_s {
+       /* The previous and next tm_system in the same tm_system_group. These
+        * links form a circle and so to round robin amongst the tm_system's
+        * one just needs to continue to follow next. In the case where the
+        * tm_system_group only has one tm_system, the prev and next point to
+        * this single tm_system. */
+       tm_system_t    *prev;
+       tm_system_t    *next;
+       odp_tm_group_t  odp_tm_group;
+
        odp_ticketlock_t tm_system_lock;
        odp_barrier_t    tm_system_barrier;
        odp_barrier_t    tm_system_destroy_barrier;
        odp_atomic_u64_t destroying;
        _odp_int_name_t  name_tbl_id;
-       pthread_t        thread;
-       pthread_attr_t   attr;
 
        void               *trace_buffer;
        uint32_t            next_queue_num;
@@ -389,7 +400,22 @@ typedef struct {
        uint64_t shaper_green_cnt;
        uint64_t shaper_yellow_cnt;
        uint64_t shaper_red_cnt;
-} tm_system_t;
+};
+
+/* A tm_system_group is a set of 1 to N tm_systems that share some processing
+ * resources - such as service threads, input queues, timers, etc.
+ * Currently a tm_system_group only shares a single service thread; each
+ * tm_system still has its own input work queue and timer wheel. */
+
+struct tm_system_group_s {
+       tm_system_group_t *prev; /* Previous group in circular tm_group_list */
+       tm_system_group_t *next; /* Next group; itself if the only group */
+
+       tm_system_t *first_tm_system; /* Entry into the ring of tm_systems */
+       uint32_t num_tm_systems;      /* Number of tm_systems in that ring */
+       pthread_t        thread;      /* Service thread for the whole group */
+       pthread_attr_t   attr;        /* Attributes used to create thread */
+};
 
 #ifdef __cplusplus
 }
diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c
index e668bf9..fc48000 100644
--- a/platform/linux-generic/odp_traffic_mngr.c
+++ b/platform/linux-generic/odp_traffic_mngr.c
@@ -78,11 +78,14 @@ static dynamic_tbl_t odp_tm_profile_tbls[ODP_TM_NUM_PROFILES];
 /* TM systems table. */
 static tm_system_t *odp_tm_systems[ODP_TM_MAX_NUM_SYSTEMS];
 
+static tm_system_group_t *tm_group_list;
+
 static odp_ticketlock_t tm_create_lock;
 static odp_ticketlock_t tm_profile_lock;
 static odp_barrier_t tm_first_enq;
 
 static int g_main_thread_cpu = -1;
+static int g_tm_cpu_num;
 
 /* Forward function declarations. */
 static void tm_queue_cnts_decrement(tm_system_t *tm_system,
@@ -2312,6 +2315,7 @@ static void *tm_system_thread(void *arg)
 {
        _odp_timer_wheel_t _odp_int_timer_wheel;
        input_work_queue_t *input_work_queue;
+       tm_system_group_t  *tm_group;
        tm_system_t *tm_system;
        uint64_t current_ns;
        uint32_t destroying, work_queue_cnt, timer_cnt;
@@ -2319,7 +2323,9 @@ static void *tm_system_thread(void *arg)
 
        rc = odp_init_local(INSTANCE_ID, ODP_THREAD_WORKER);
        ODP_ASSERT(rc == 0);
-       tm_system = arg;
+       tm_group = arg;
+
+       tm_system = tm_group->first_tm_system;
        _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
        input_work_queue = tm_system->input_work_queue;
 
@@ -2371,6 +2377,11 @@ static void *tm_system_thread(void *arg)
                tm_system->is_idle = (timer_cnt == 0) &&
                        (work_queue_cnt == 0);
                destroying = odp_atomic_load_u64(&tm_system->destroying);
+
+               /* Advance to the next tm_system in the tm_system_group. */
+               tm_system = tm_system->next;
+               _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
+               input_work_queue = tm_system->input_work_queue;
        }
 
        odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
@@ -2558,7 +2569,7 @@ static int affinitize_main_thread(void)
 static uint32_t tm_thread_cpu_select(void)
 {
        odp_cpumask_t odp_cpu_mask;
-       int           cpu_count;
+       int           cpu_count, cpu;
 
        odp_cpumask_default_worker(&odp_cpu_mask, 0);
        if ((g_main_thread_cpu != -1) &&
@@ -2576,30 +2587,224 @@ static uint32_t tm_thread_cpu_select(void)
                        odp_cpumask_all_available(&odp_cpu_mask);
        }
 
-       return odp_cpumask_first(&odp_cpu_mask);
+       if (g_tm_cpu_num == 0) {
+               cpu = odp_cpumask_first(&odp_cpu_mask);
+       } else {
+               cpu = odp_cpumask_next(&odp_cpu_mask, g_tm_cpu_num);
+               if (cpu == -1) {
+                       g_tm_cpu_num = 0;
+                       cpu = odp_cpumask_first(&odp_cpu_mask);
+               }
+       }
+
+       g_tm_cpu_num++;
+       return cpu;
 }
 
-static int tm_thread_create(tm_system_t *tm_system)
+static int tm_thread_create(tm_system_group_t *tm_group)
 {
        cpu_set_t      cpu_set;
        uint32_t       cpu_num;
        int            rc;
 
-       pthread_attr_init(&tm_system->attr);
+       pthread_attr_init(&tm_group->attr); /* destroyed on group destroy */
        cpu_num = tm_thread_cpu_select();
        CPU_ZERO(&cpu_set);
        CPU_SET(cpu_num, &cpu_set);
-       pthread_attr_setaffinity_np(&tm_system->attr, sizeof(cpu_set_t),
+       pthread_attr_setaffinity_np(&tm_group->attr, sizeof(cpu_set_t),
                                    &cpu_set);
 
-       rc = pthread_create(&tm_system->thread, &tm_system->attr,
-                           tm_system_thread, tm_system);
+       rc = pthread_create(&tm_group->thread, &tm_group->attr,
+                           tm_system_thread, tm_group); /* arg: the group */
        if (rc != 0)
                ODP_DBG("Failed to start thread on cpu num=%u\n", cpu_num);
 
        return rc;
 }
 
+/* Allocate a zeroed tm_system_group and insert it after the head of the
+ * circular tm_group_list.  Returns its handle, or 0 if allocation fails. */
+static odp_tm_group_t odp_tm_group_create(const char *name ODP_UNUSED)
+{
+       tm_system_group_t *tm_group, *head;
+
+       tm_group = calloc(1, sizeof(*tm_group));
+       if (tm_group == NULL)
+               return 0;
+
+       if (tm_group_list == NULL) {
+               tm_group_list  = tm_group;
+               tm_group->next = tm_group;
+               tm_group->prev = tm_group;
+       } else {
+               head             = tm_group_list;
+               tm_group->prev   = head;
+               tm_group->next   = head->next;
+               head->next->prev = tm_group;
+               head->next       = tm_group;
+       }
+       return (odp_tm_group_t)(uintptr_t)tm_group;
+}
+
+/* Join the group's service thread, unlink the group from the circular
+ * tm_group_list and free it.  Expects at most one tm_system in the group. */
+static void odp_tm_group_destroy(odp_tm_group_t odp_tm_group)
+{
+       tm_system_group_t *tm_group, *prev_tm_group, *next_tm_group;
+       int                rc;
+
+       tm_group = (tm_system_group_t *)(uintptr_t)odp_tm_group;
+
+       /* Wait for the thread to exit. */
+       ODP_ASSERT(tm_group->num_tm_systems <= 1);
+       rc = pthread_join(tm_group->thread, NULL);
+       ODP_ASSERT(rc == 0);
+       pthread_attr_destroy(&tm_group->attr);
+       if (g_tm_cpu_num > 0)
+               g_tm_cpu_num--;
+
+       /* Unlink from tm_group_list.  Only a group that is its own neighbour
+        * is the last one: prev == next also holds with two groups linked. */
+       prev_tm_group = tm_group->prev;
+       next_tm_group = tm_group->next;
+       if (next_tm_group == tm_group) {
+               ODP_ASSERT(tm_group_list == tm_group);
+               tm_group_list = NULL;
+       } else {
+               prev_tm_group->next = next_tm_group;
+               next_tm_group->prev = prev_tm_group;
+               if (tm_group_list == tm_group)
+                       tm_group_list = next_tm_group;
+       }
+
+       free(tm_group);
+}
+
+/* Link a tm_system into a tm_group's ring.  The first member also starts the
+ * group's service thread.  Returns 0 on success, non-zero on failure. */
+static int odp_tm_group_add(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
+{
+       tm_system_group_t *tm_group;
+       tm_system_t       *tm_system, *first_tm_system;
+
+       tm_group  = (tm_system_group_t *)(uintptr_t)odp_tm_group;
+       tm_system = GET_TM_SYSTEM(odp_tm);
+       if (tm_group == NULL)
+               return -1;
+
+       tm_system->odp_tm_group = odp_tm_group;
+       tm_group->num_tm_systems++;
+       if (tm_group->num_tm_systems >= 2) {
+               /* Set the new node's own links before making it reachable:
+                * the service thread may be following next right now. */
+               first_tm_system             = tm_group->first_tm_system;
+               tm_system->prev             = first_tm_system;
+               tm_system->next             = first_tm_system->next;
+               first_tm_system->next->prev = tm_system;
+               first_tm_system->next       = tm_system;
+               tm_group->first_tm_system   = tm_system;
+               return 0;
+       }
+
+       /* First member of the group: form a ring of one and start the
+        * service thread that will walk it. */
+       tm_system->next           = tm_system;
+       tm_system->prev           = tm_system;
+       tm_group->first_tm_system = tm_system;
+       affinitize_main_thread();
+       return tm_thread_create(tm_group);
+}
+
+/* Detach a tm_system from its tm_group, destroying the group once its last
+ * member has left.  Returns 0 on success, or -1 if the tm_system is not
+ * recorded as belonging to this group or the group has no members. */
+static int odp_tm_group_remove(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
+{
+       tm_system_group_t *group  = (tm_system_group_t *)odp_tm_group;
+       tm_system_t       *member = GET_TM_SYSTEM(odp_tm);
+       tm_system_t       *before, *after;
+
+       if (member->odp_tm_group != odp_tm_group)
+               return -1;
+
+       if ((group->first_tm_system == NULL) || (group->num_tm_systems == 0))
+               return -1;
+
+       /* If the group's entry point is leaving, move it to the successor. */
+       before = member->prev;
+       after  = member->next;
+       if (group->first_tm_system == member)
+               group->first_tm_system = after;
+
+       /* Close the ring around the departing member and clear its links. */
+       before->next = after;
+       after->prev  = before;
+       member->prev = NULL;
+       member->next = NULL;
+
+       /* An empty group is torn down, service thread included. */
+       if (--group->num_tm_systems == 0)
+               odp_tm_group_destroy(odp_tm_group);
+
+       return 0;
+}
+
+/* Attach a tm_system to a tm_group, creating a new tm_group when needed.
+ * Returns 0 on success, or -1 if no usable group could be obtained or the
+ * tm_system could not be added to it. */
+static int tm_group_attach(odp_tm_t odp_tm)
+{
+       tm_system_group_t *tm_group, *min_tm_group;
+       odp_tm_group_t     odp_tm_group;
+       odp_cpumask_t      all_cpus, worker_cpus;
+       uint32_t           total_cpus, avail_cpus;
+
+       /* If this platform has a small number of cpus then allocate one
+        * tm_group and assign all tm_systems to this tm_group.  Otherwise in
+        * the case of a manycore platform try to allocate one tm_group per
+        * tm_system, as long as there are still extra cpus left.  If there
+        * are not enough cpus left then add this tm_system to the tm_group
+        * that currently has the fewest tm_systems. */
+       odp_cpumask_all_available(&all_cpus);
+       odp_cpumask_default_worker(&worker_cpus, 0);
+       total_cpus = odp_cpumask_count(&all_cpus);
+       avail_cpus = odp_cpumask_count(&worker_cpus);
+
+       if (total_cpus < 24) {
+               /* Small platform: every tm_system shares the first group. */
+               odp_tm_group = (odp_tm_group_t)(uintptr_t)tm_group_list;
+               if (tm_group_list == NULL)
+                       odp_tm_group = odp_tm_group_create("");
+       } else if ((tm_group_list == NULL) || (avail_cpus > 1)) {
+               /* Manycore case: give this tm_system a group of its own.
+                * NOTE(review): avail_cpus does not shrink as groups are
+                * created, so confirm that this test limits the number of
+                * groups as intended. */
+               odp_tm_group = odp_tm_group_create("");
+       } else {
+               /* Pick the tm_group with the fewest tm_systems.  The list is
+                * circular, so go round it once and stop on reaching the head
+                * again; next is never NULL for a linked group. */
+               min_tm_group = tm_group_list;
+               tm_group     = tm_group_list->next;
+               while (tm_group != tm_group_list) {
+                       if (tm_group->num_tm_systems <
+                           min_tm_group->num_tm_systems)
+                               min_tm_group = tm_group;
+                       tm_group = tm_group->next;
+               }
+
+               odp_tm_group = (odp_tm_group_t)(uintptr_t)min_tm_group;
+       }
+
+       /* odp_tm_group_add dereferences the handle, so reject a null one. */
+       if (odp_tm_group == 0)
+               return -1;
+
+       /* Callers test rc < 0, but a thread creation error is positive. */
+       return (odp_tm_group_add(odp_tm_group, odp_tm) == 0) ? 0 : -1;
+}
+
 odp_tm_t odp_tm_create(const char            *name,
                       odp_tm_requirements_t *requirements,
                       odp_tm_egress_t       *egress)
@@ -2693,8 +2898,9 @@ odp_tm_t odp_tm_create(const char            *name,
        }
 
        if (create_fail == 0) {
+               /* Pass any odp_groups or hints to tm_group_attach here. */
                affinitize_main_thread();
-               rc = tm_thread_create(tm_system);
+               rc = tm_group_attach(odp_tm);
                create_fail |= rc < 0;
        }
 
@@ -2754,7 +2960,6 @@ int odp_tm_capability(odp_tm_t odp_tm, odp_tm_capabilities_t *capabilities)
 int odp_tm_destroy(odp_tm_t odp_tm)
 {
        tm_system_t *tm_system;
-       int rc;
 
        tm_system = GET_TM_SYSTEM(odp_tm);
 
@@ -2765,10 +2970,10 @@ int odp_tm_destroy(odp_tm_t odp_tm)
        odp_atomic_inc_u64(&tm_system->destroying);
        odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
 
-       /* Next wait for the thread to exit. */
-       rc = pthread_join(tm_system->thread, NULL);
-       ODP_ASSERT(rc == 0);
-       pthread_attr_destroy(&tm_system->attr);
+       /* Remove ourselves from the group.  If we are the last tm_system in
+        * this group, odp_tm_group_remove will destroy any service threads
+        * allocated by this group. */
+       odp_tm_group_remove(tm_system->odp_tm_group, odp_tm);
 
        input_work_queue_destroy(tm_system->input_work_queue);
        _odp_sorted_pool_destroy(tm_system->_odp_int_sorted_pool);
-- 
2.1.2

Reply via email to