From: Xuelin Shi <forrest....@linaro.org> While the tm_group(thread) is waiting for enquing at barrier, it may receive destroy call. In this case, need a mechanism to exit. This patch adds a variant of odp_barrier_wait to enable it.
Signed-off-by: Xuelin Shi <forrest....@linaro.org> --- platform/linux-generic/odp_traffic_mngr.c | 93 ++++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/platform/linux-generic/odp_traffic_mngr.c b/platform/linux-generic/odp_traffic_mngr.c index 309f237..9b61c50 100644 --- a/platform/linux-generic/odp_traffic_mngr.c +++ b/platform/linux-generic/odp_traffic_mngr.c @@ -91,6 +91,8 @@ static int g_main_thread_cpu = -1; static int g_tm_cpu_num; /* Forward function declarations. */ +static int _odp_tm_group_remove(_odp_tm_group_t odp_tm_group, odp_tm_t odp_tm); + static void tm_queue_cnts_decrement(tm_system_t *tm_system, tm_wred_node_t *tm_wred_node, uint32_t priority, @@ -2326,6 +2328,58 @@ static int thread_affinity_get(odp_cpumask_t *odp_cpu_mask) return 0; } +static tm_system_t *tm_system_find_run(tm_system_group_t *grp, + tm_system_t *tm_system) +{ + while (tm_system && odp_atomic_load_u64(&tm_system->destroying)) { + tm_system_t *odp_tm; + + odp_tm = tm_system->next; + odp_barrier_wait(&tm_system->tm_system_destroy_barrier); + _odp_tm_group_remove(MAKE_ODP_TM_SYSTEM_GROUP(grp), + MAKE_ODP_TM_HANDLE(tm_system)); + tm_system = odp_tm; + if (!grp->num_tm_systems) + tm_system = NULL; + } + + return tm_system; +} + +static bool odp_barrier_wait_stop(tm_system_group_t *grp, + bool (*stop)(tm_system_group_t *grp)) +{ + uint32_t count; + int wasless; + odp_barrier_t *barrier = &grp->tm_group_barrier; + + odp_mb_full(); + + count = odp_atomic_fetch_inc_u32(&barrier->bar); + wasless = count < barrier->count; + + if (count == 2 * barrier->count - 1) { + /* Wrap around *atomically* */ + odp_atomic_sub_u32(&barrier->bar, 2 * barrier->count); + } else { + while ((odp_atomic_load_u32(&barrier->bar) < barrier->count) + == wasless) + if (!stop(grp)) + odp_cpu_pause(); + else + return true; + } + + odp_mb_full(); + + return false; +} + +static bool try_stop_tm_group(tm_system_group_t *grp) +{ + return tm_system_find_run(grp, grp->first_tm_system) == NULL; +} + static void *tm_system_thread(void *arg) { _odp_timer_wheel_t _odp_int_timer_wheel; @@ -2333,7 +2387,7 @@ static void *tm_system_thread(void *arg) tm_system_group_t *tm_group; tm_system_t *tm_system; uint64_t current_ns; - uint32_t destroying, work_queue_cnt, timer_cnt; + uint32_t work_queue_cnt, timer_cnt; int rc; rc = odp_init_local((odp_instance_t)odp_global_data.main_pid, @@ -2341,20 +2395,28 @@ static void *tm_system_thread(void *arg) ODP_ASSERT(rc == 0); tm_group = arg; - tm_system = tm_group->first_tm_system; - _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel; - input_work_queue = tm_system->input_work_queue; + /* Wait here until we have seen the first enqueue operation + or destroy. */ + if (odp_barrier_wait_stop(tm_group, try_stop_tm_group)) { + odp_term_local(); + return NULL; + } - /* Wait here until we have seen the first enqueue operation. */ - odp_barrier_wait(&tm_group->tm_group_barrier); main_loop_running = true; + tm_system = tm_system_find_run(tm_group, tm_group->first_tm_system); + if (!tm_system) { + odp_term_local(); + return NULL; + } - destroying = odp_atomic_load_u64(&tm_system->destroying); - + _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel; current_ns = odp_time_to_ns(odp_time_local()); _odp_timer_wheel_start(_odp_int_timer_wheel, current_ns); - while (destroying == 0) { + while (tm_system) { + _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel; + input_work_queue = tm_system->input_work_queue; + /* See if another thread wants to make a configuration * change. */ check_for_request(); @@ -2392,16 +2454,12 @@ static void *tm_system_thread(void *arg) tm_system->current_time = current_ns; tm_system->is_idle = (timer_cnt == 0) && (work_queue_cnt == 0); - destroying = odp_atomic_load_u64(&tm_system->destroying); /* Advance to the next tm_system in the tm_system_group. */ - tm_system = tm_system->next; - _odp_int_timer_wheel = tm_system->_odp_int_timer_wheel; - input_work_queue = tm_system->input_work_queue; + tm_system = tm_system_find_run(tm_group, tm_system->next); } - - odp_barrier_wait(&tm_system->tm_system_destroy_barrier); odp_term_local(); + return NULL; } @@ -2992,11 +3050,6 @@ int odp_tm_destroy(odp_tm_t odp_tm) odp_atomic_inc_u64(&tm_system->destroying); odp_barrier_wait(&tm_system->tm_system_destroy_barrier); - /* Remove ourselves from the group. If we are the last tm_system in - * this group, odp_tm_group_remove will destroy any service threads - * allocated by this group. */ - _odp_tm_group_remove(tm_system->odp_tm_group, odp_tm); - input_work_queue_destroy(tm_system->input_work_queue); _odp_sorted_pool_destroy(tm_system->_odp_int_sorted_pool); _odp_queue_pool_destroy(tm_system->_odp_int_queue_pool); -- 1.8.3.1