These changes add a new concept called the tm_system_group. This new entity collects together a set of tm_systems that all share some common resources - e.g. a single service thread.
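The sharing model is easiest to picture with a minimal, standalone sketch: one service thread walking a circular doubly linked list of systems. The types and names below are simplified stand-ins for illustration only, not code from this patch:

#include <stddef.h>
#include <stdio.h>

typedef struct sys_s sys_t;
struct sys_s {
	sys_t *prev;  /* circular links, as in struct tm_system_s below */
	sys_t *next;
	int id;
};

/* Insert a system into a group's circle; a singleton points at itself,
 * so a service loop can always just keep following next. */
static void group_add(sys_t **first, sys_t *sys)
{
	if (*first == NULL) {
		sys->prev = sys;
		sys->next = sys;
		*first = sys;
	} else {
		sys->prev = *first;
		sys->next = (*first)->next;
		(*first)->next->prev = sys;
		(*first)->next = sys;
	}
}

int main(void)
{
	sys_t a = { .id = 0 }, b = { .id = 1 }, c = { .id = 2 };
	sys_t *first = NULL, *cur;
	int i;

	group_add(&first, &a);
	group_add(&first, &b);
	group_add(&first, &c);

	/* One "service thread" visiting every system in turn forever
	 * (bounded here for the demo). */
	cur = first;
	for (i = 0; i < 6; i++) {
		printf("serving tm_system %d\n", cur->id);
		cur = cur->next;
	}
	return 0;
}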
Signed-off-by: Barry Spinney <[email protected]>
---
 .../include/odp_traffic_mngr_internal.h   |  34 ++-
 platform/linux-generic/odp_traffic_mngr.c | 233 +++++++++++++++++++--
 2 files changed, 249 insertions(+), 18 deletions(-)

diff --git a/platform/linux-generic/include/odp_traffic_mngr_internal.h b/platform/linux-generic/include/odp_traffic_mngr_internal.h
index e4bc5ce..1b1218b 100644
--- a/platform/linux-generic/include/odp_traffic_mngr_internal.h
+++ b/platform/linux-generic/include/odp_traffic_mngr_internal.h
@@ -347,14 +347,25 @@ typedef struct {
 	tm_tos_marking_t ip_tos_marking[ODP_NUM_PACKET_COLORS];
 } tm_marking_t;
 
-typedef struct {
+typedef struct tm_system_s tm_system_t;
+typedef struct tm_system_group_s tm_system_group_t;
+typedef uint64_t odp_tm_group_t;
+
+struct tm_system_s {
+	/* The previous and next tm_system in the same tm_system_group.
+	 * These links form a circle, so to round robin amongst the
+	 * tm_systems one just needs to keep following next. In the case
+	 * where the tm_system_group only has one tm_system, prev and next
+	 * point to this single tm_system. */
+	tm_system_t *prev;
+	tm_system_t *next;
+	odp_tm_group_t odp_tm_group;
+
 	odp_ticketlock_t tm_system_lock;
 	odp_barrier_t tm_system_barrier;
 	odp_barrier_t tm_system_destroy_barrier;
 	odp_atomic_u64_t destroying;
 	_odp_int_name_t name_tbl_id;
 
-	pthread_t thread;
-	pthread_attr_t attr;
 	void *trace_buffer;
 	uint32_t next_queue_num;
@@ -389,7 +400,22 @@ typedef struct {
 	uint64_t shaper_green_cnt;
 	uint64_t shaper_yellow_cnt;
 	uint64_t shaper_red_cnt;
-} tm_system_t;
+};
+
+/* A tm_system_group is a set of 1 to N tm_systems that share some processing
+ * resources - like a bunch of service threads, input queue, timers, etc.
+ * Currently a tm_system_group only supports a single service thread - and
+ * while the input work queue is shared - timers are not. */
+
+struct tm_system_group_s {
+	tm_system_group_t *prev;
+	tm_system_group_t *next;
+
+	tm_system_t *first_tm_system;
+	uint32_t num_tm_systems;
+	pthread_t thread;
+	pthread_attr_t attr;
+};
 
 #ifdef __cplusplus
 }
diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c
index e668bf9..fc48000 100644
--- a/platform/linux-generic/odp_traffic_mngr.c
+++ b/platform/linux-generic/odp_traffic_mngr.c
@@ -78,11 +78,14 @@ static dynamic_tbl_t odp_tm_profile_tbls[ODP_TM_NUM_PROFILES];
 /* TM systems table. */
 static tm_system_t *odp_tm_systems[ODP_TM_MAX_NUM_SYSTEMS];
 
+static tm_system_group_t *tm_group_list;
+
 static odp_ticketlock_t tm_create_lock;
 static odp_ticketlock_t tm_profile_lock;
 static odp_barrier_t tm_first_enq;
 
 static int g_main_thread_cpu = -1;
+static int g_tm_cpu_num;
 
 /* Forward function declarations. */
 static void tm_queue_cnts_decrement(tm_system_t *tm_system,
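One detail worth noting in the header changes above: odp_tm_group_t is declared as a plain uint64_t, and the implementation round-trips the tm_system_group_t pointer through that handle by casting. A tiny standalone sketch of the convention (illustrative only; the patch casts directly, which assumes the pointer fits the 64-bit handle):

#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint64_t odp_tm_group_t;

typedef struct {
	uint32_t num_tm_systems;
} tm_system_group_t;

int main(void)
{
	tm_system_group_t *grp = calloc(1, sizeof(*grp));
	/* Going through uintptr_t keeps the round trip portable. */
	odp_tm_group_t handle = (odp_tm_group_t)(uintptr_t)grp;

	assert((tm_system_group_t *)(uintptr_t)handle == grp);
	free(grp);
	return 0;
}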
@@ -2312,6 +2315,7 @@ static void *tm_system_thread(void *arg)
 {
 	_odp_timer_wheel_t _odp_int_timer_wheel;
 	input_work_queue_t *input_work_queue;
+	tm_system_group_t *tm_group;
 	tm_system_t *tm_system;
 	uint64_t current_ns;
 	uint32_t destroying, work_queue_cnt, timer_cnt;
@@ -2319,7 +2323,9 @@ static void *tm_system_thread(void *arg)
 	rc = odp_init_local(INSTANCE_ID, ODP_THREAD_WORKER);
 	ODP_ASSERT(rc == 0);
-	tm_system = arg;
+	tm_group = arg;
+
+	tm_system = tm_group->first_tm_system;
 	_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
 	input_work_queue = tm_system->input_work_queue;
@@ -2371,6 +2377,11 @@ static void *tm_system_thread(void *arg)
 		tm_system->is_idle = (timer_cnt == 0) && (work_queue_cnt == 0);
 		destroying = odp_atomic_load_u64(&tm_system->destroying);
+
+		/* Advance to the next tm_system in the tm_system_group. */
+		tm_system = tm_system->next;
+		_odp_int_timer_wheel = tm_system->_odp_int_timer_wheel;
+		input_work_queue = tm_system->input_work_queue;
 	}
 
 	odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
@@ -2558,7 +2569,7 @@ static int affinitize_main_thread(void)
 static uint32_t tm_thread_cpu_select(void)
 {
 	odp_cpumask_t odp_cpu_mask;
-	int cpu_count;
+	int cpu_count, cpu;
 
 	odp_cpumask_default_worker(&odp_cpu_mask, 0);
 	if ((g_main_thread_cpu != -1) &&
@@ -2576,30 +2587,224 @@ static uint32_t tm_thread_cpu_select(void)
 		odp_cpumask_all_available(&odp_cpu_mask);
 	}
 
-	return odp_cpumask_first(&odp_cpu_mask);
+	if (g_tm_cpu_num == 0) {
+		cpu = odp_cpumask_first(&odp_cpu_mask);
+	} else {
+		cpu = odp_cpumask_next(&odp_cpu_mask, g_tm_cpu_num);
+		if (cpu == -1) {
+			g_tm_cpu_num = 0;
+			cpu = odp_cpumask_first(&odp_cpu_mask);
+		}
+	}
+
+	g_tm_cpu_num++;
+	return cpu;
 }
 
-static int tm_thread_create(tm_system_t *tm_system)
+static int tm_thread_create(tm_system_group_t *tm_group)
 {
 	cpu_set_t cpu_set;
 	uint32_t cpu_num;
 	int rc;
 
-	pthread_attr_init(&tm_system->attr);
+	pthread_attr_init(&tm_group->attr);
 	cpu_num = tm_thread_cpu_select();
 	CPU_ZERO(&cpu_set);
 	CPU_SET(cpu_num, &cpu_set);
-	pthread_attr_setaffinity_np(&tm_system->attr, sizeof(cpu_set_t),
+	pthread_attr_setaffinity_np(&tm_group->attr, sizeof(cpu_set_t),
 				    &cpu_set);
 
-	rc = pthread_create(&tm_system->thread, &tm_system->attr,
-			    tm_system_thread, tm_system);
+	rc = pthread_create(&tm_group->thread, &tm_group->attr,
+			    tm_system_thread, tm_group);
 	if (rc != 0)
 		ODP_DBG("Failed to start thread on cpu num=%u\n", cpu_num);
 
 	return rc;
 }
 
+static odp_tm_group_t odp_tm_group_create(const char *name ODP_UNUSED)
+{
+	tm_system_group_t *tm_group, *first_tm_group, *second_tm_group;
+
+	tm_group = malloc(sizeof(tm_system_group_t));
+	memset(tm_group, 0, sizeof(tm_system_group_t));
+
+	/* Add this group to the tm_group_list linked list. */
+	if (tm_group_list == NULL) {
+		tm_group_list = tm_group;
+		tm_group->next = tm_group;
+		tm_group->prev = tm_group;
+	} else {
+		first_tm_group = tm_group_list;
+		second_tm_group = first_tm_group->next;
+		first_tm_group->next = tm_group;
+		second_tm_group->prev = tm_group;
+		tm_group->next = second_tm_group;
+		tm_group->prev = first_tm_group;
+	}
+
+	return (odp_tm_group_t)tm_group;
+}
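tm_thread_cpu_select above hands out worker cpus round robin and wraps when it runs off the end of the mask. The same pattern, restated over a plain bitmask instead of odp_cpumask (illustrative only; mask_next is a stand-in for odp_cpumask_next):

#include <stdio.h>

/* Return the lowest set bit >= from, or -1 if none remain. */
static int mask_next(unsigned mask, int from)
{
	int cpu;

	for (cpu = from; cpu < 32; cpu++)
		if (mask & (1u << cpu))
			return cpu;
	return -1;
}

int main(void)
{
	unsigned worker_mask = 0x0E; /* cpus 1, 2 and 3 */
	int next_num = 0, cpu, i;

	for (i = 0; i < 5; i++) {
		cpu = mask_next(worker_mask, next_num);
		if (cpu < 0) /* ran off the end: wrap back to the start */
			cpu = mask_next(worker_mask, 0);
		next_num = cpu + 1;
		printf("service thread %d -> cpu %d\n", i, cpu);
	}
	return 0;
}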
+
+static void odp_tm_group_destroy(odp_tm_group_t odp_tm_group)
+{
+	tm_system_group_t *tm_group, *prev_tm_group, *next_tm_group;
+	int rc;
+
+	tm_group = (tm_system_group_t *)odp_tm_group;
+	ODP_ASSERT(tm_group->num_tm_systems <= 1);
+
+	/* Wait for the thread to exit. */
+	rc = pthread_join(tm_group->thread, NULL);
+	ODP_ASSERT(rc == 0);
+	pthread_attr_destroy(&tm_group->attr);
+	if (g_tm_cpu_num > 0)
+		g_tm_cpu_num--;
+
+	/* Remove this group from the tm_group_list linked list. Special case
+	 * when this is the last tm_group in the linked list, i.e. when its
+	 * next link points back to itself. */
+	prev_tm_group = tm_group->prev;
+	next_tm_group = tm_group->next;
+	if (next_tm_group == tm_group) {
+		ODP_ASSERT(tm_group_list == tm_group);
+		tm_group_list = NULL;
+	} else {
+		prev_tm_group->next = next_tm_group;
+		next_tm_group->prev = prev_tm_group;
+		if (tm_group_list == tm_group)
+			tm_group_list = next_tm_group;
+	}
+
+	tm_group->prev = NULL;
+	tm_group->next = NULL;
+	free(tm_group);
+}
+
+static int odp_tm_group_add(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
+{
+	tm_system_group_t *tm_group;
+	tm_system_t *tm_system, *first_tm_system, *second_tm_system;
+
+	tm_group = (tm_system_group_t *)odp_tm_group;
+	tm_system = GET_TM_SYSTEM(odp_tm);
+	tm_group->num_tm_systems++;
+	tm_system->odp_tm_group = odp_tm_group;
+
+	/* Link this tm_system into the circular linked list of all tm_systems
+	 * belonging to the same tm_group. */
+	if (tm_group->num_tm_systems == 1) {
+		tm_group->first_tm_system = tm_system;
+		tm_system->next = tm_system;
+		tm_system->prev = tm_system;
+	} else {
+		first_tm_system = tm_group->first_tm_system;
+		second_tm_system = first_tm_system->next;
+		first_tm_system->next = tm_system;
+		second_tm_system->prev = tm_system;
+		tm_system->prev = first_tm_system;
+		tm_system->next = second_tm_system;
+		tm_group->first_tm_system = tm_system;
+	}
+
+	/* If this is the first tm_system associated with this group, then
+	 * create the service thread and the input work queue. */
+	if (tm_group->num_tm_systems >= 2)
+		return 0;
+
+	affinitize_main_thread();
+	return tm_thread_create(tm_group);
+}
+
+static int odp_tm_group_remove(odp_tm_group_t odp_tm_group, odp_tm_t odp_tm)
+{
+	tm_system_group_t *tm_group;
+	tm_system_t *tm_system, *prev_tm_system, *next_tm_system;
+
+	tm_group = (tm_system_group_t *)odp_tm_group;
+	tm_system = GET_TM_SYSTEM(odp_tm);
+	if (tm_system->odp_tm_group != odp_tm_group)
+		return -1;
+
+	if ((tm_group->num_tm_systems == 0) ||
+	    (tm_group->first_tm_system == NULL))
+		return -1;
+
+	/* Remove this tm_system from the tm_group linked list. */
+	if (tm_group->first_tm_system == tm_system)
+		tm_group->first_tm_system = tm_system->next;
+
+	prev_tm_system = tm_system->prev;
+	next_tm_system = tm_system->next;
+	prev_tm_system->next = next_tm_system;
+	next_tm_system->prev = prev_tm_system;
+	tm_system->next = NULL;
+	tm_system->prev = NULL;
+	tm_group->num_tm_systems--;
+
+	/* If this is the last tm_system associated with this group then
+	 * destroy the group (and thread etc). */
+	if (tm_group->num_tm_systems == 0)
+		odp_tm_group_destroy(odp_tm_group);
+
+	return 0;
+}
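The unlink fixups in odp_tm_group_remove above are the standard circular-list ones: move the first pointer off the departing node, splice prev and next together, and treat the self-linked singleton as the empty-after-removal case. A standalone sketch with the invariants asserted (simplified types, not the patch code):

#include <assert.h>
#include <stddef.h>

typedef struct node_s node_t;
struct node_s {
	node_t *prev;
	node_t *next;
};

/* Unlink node from a circular list; returns the new member count. */
static unsigned circle_remove(node_t **first, unsigned num, node_t *node)
{
	if (*first == node)
		*first = (num == 1) ? NULL : node->next;

	node->prev->next = node->next;
	node->next->prev = node->prev;
	node->next = NULL;
	node->prev = NULL;
	return num - 1;
}

int main(void)
{
	node_t a, b;
	node_t *first = &a;
	unsigned num;

	a.prev = &b; a.next = &b;
	b.prev = &a; b.next = &a;

	num = circle_remove(&first, 2, &a);
	assert(num == 1 && first == &b && b.next == &b && b.prev == &b);

	num = circle_remove(&first, 1, &b);
	assert(num == 0 && first == NULL);
	return 0;
}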
+
+static int tm_group_attach(odp_tm_t odp_tm)
+{
+	tm_system_group_t *tm_group, *min_tm_group;
+	odp_tm_group_t odp_tm_group;
+	odp_cpumask_t all_cpus, worker_cpus;
+	uint32_t total_cpus, avail_cpus;
+
+	/* If this platform has a small number of cpus then allocate one
+	 * tm_group and assign all tm_systems to this tm_group. Otherwise, in
+	 * the case of a manycore platform, try to allocate one tm_group per
+	 * tm_system, as long as there are still extra cpus left. If not
+	 * enough cpus are left, then assign this tm_system to the least
+	 * loaded existing tm_group. */
+	odp_cpumask_all_available(&all_cpus);
+	odp_cpumask_default_worker(&worker_cpus, 0);
+	total_cpus = odp_cpumask_count(&all_cpus);
+	avail_cpus = odp_cpumask_count(&worker_cpus);
+
+	if (total_cpus < 24) {
+		tm_group = tm_group_list;
+		odp_tm_group = (odp_tm_group_t)tm_group;
+		if (tm_group == NULL)
+			odp_tm_group = odp_tm_group_create("");
+
+		odp_tm_group_add(odp_tm_group, odp_tm);
+		return 0;
+	}
+
+	/* Manycore case. */
+	if ((tm_group_list == NULL) || (avail_cpus > 1)) {
+		odp_tm_group = odp_tm_group_create("");
+		odp_tm_group_add(odp_tm_group, odp_tm);
+		return 0;
+	}
+
+	/* Pick the tm_group with the smallest number of tm_systems. Note
+	 * that the group list is circular, so the walk stops when it gets
+	 * back to the starting group. */
+	min_tm_group = tm_group_list;
+	tm_group = tm_group_list->next;
+	while (tm_group != tm_group_list) {
+		if (tm_group->num_tm_systems <
+		    min_tm_group->num_tm_systems)
+			min_tm_group = tm_group;
+
+		tm_group = tm_group->next;
+	}
+
+	if (min_tm_group == NULL)
+		return -1;
+
+	odp_tm_group = (odp_tm_group_t)min_tm_group;
+	odp_tm_group_add(odp_tm_group, odp_tm);
+	return 0;
+}
+
 odp_tm_t odp_tm_create(const char *name,
 		       odp_tm_requirements_t *requirements,
 		       odp_tm_egress_t *egress)
@@ -2693,8 +2898,9 @@ odp_tm_t odp_tm_create(const char *name,
 	}
 
 	if (create_fail == 0) {
+		/* Pass any odp_groups or hints to tm_group_attach here. */
 		affinitize_main_thread();
-		rc = tm_thread_create(tm_system);
+		rc = tm_group_attach(odp_tm);
 		create_fail |= rc < 0;
 	}
 
@@ -2754,7 +2960,6 @@ int odp_tm_capability(odp_tm_t odp_tm, odp_tm_capabilities_t *capabilities)
 int odp_tm_destroy(odp_tm_t odp_tm)
 {
 	tm_system_t *tm_system;
-	int rc;
 
 	tm_system = GET_TM_SYSTEM(odp_tm);
 
@@ -2765,10 +2970,10 @@ int odp_tm_destroy(odp_tm_t odp_tm)
 	odp_atomic_inc_u64(&tm_system->destroying);
 	odp_barrier_wait(&tm_system->tm_system_destroy_barrier);
 
-	/* Next wait for the thread to exit. */
-	rc = pthread_join(tm_system->thread, NULL);
-	ODP_ASSERT(rc == 0);
-	pthread_attr_destroy(&tm_system->attr);
+	/* Remove ourselves from the group. If we are the last tm_system in
+	 * this group, odp_tm_group_remove will destroy any service threads
+	 * allocated by this group. */
+	odp_tm_group_remove(tm_system->odp_tm_group, odp_tm);
 
 	input_work_queue_destroy(tm_system->input_work_queue);
 	_odp_sorted_pool_destroy(tm_system->_odp_int_sorted_pool);
-- 
2.1.2
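For reviewers, the placement policy in tm_group_attach boils down to three cases. A compact standalone restatement (the 24-cpu threshold comes from the patch; the enum and function here are illustrative only):

#include <stdio.h>

typedef enum {
	SHARE_ONE_GROUP,    /* small system: every tm_system in one group */
	NEW_GROUP,          /* manycore, spare cpus: one group per system */
	LEAST_LOADED_GROUP  /* manycore, no spare cpus: pick emptiest     */
} placement_t;

static placement_t place(unsigned total_cpus, unsigned avail_cpus,
			 int have_groups)
{
	if (total_cpus < 24)
		return SHARE_ONE_GROUP;
	if (!have_groups || avail_cpus > 1)
		return NEW_GROUP;
	return LEAST_LOADED_GROUP;
}

int main(void)
{
	printf("%d\n", place(8, 6, 1));   /* 0: small platform      */
	printf("%d\n", place(64, 40, 1)); /* 1: plenty of cpus left */
	printf("%d\n", place(64, 1, 1));  /* 2: least loaded group  */
	return 0;
}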
