On 5/18/22 14:59, Dumitru Ceara wrote: > On 5/18/22 12:07, Xavier Simonart wrote: >> This patch is intended to change the way to enable northd lflow build >> parallelization, as well as enable runtime change of the number of threads. >> Before this patch, the following was needed to use parallelization: >> - enable parallelization through use_parallel_build in NBDB >> - use --dummy-numa to select the number of threads. >> This second part was needed because otherwise as many threads as cores in the >> system >> were used, while parallelization showed performance improvements only >> up to >> around 4 (or maybe 8) threads. >> >> With this patch, the number of threads used for lflow parallel build can be >> specified either: >> - at startup, using --n-threads=<N> as an ovn-northd command line option >> - at runtime, using unixctl >> If the number of threads specified is > 1, then parallelization is enabled. >> If the number is 1, parallelization is disabled. >> If the number is < 1, parallelization is disabled at startup and a warning >> is logged. >> If the number is > 256, parallelization is enabled (with 256 threads) and >> a warning is logged. >> >> The following unixctl commands have been added: >> - set-n-threads <N>: set the number of threads used. >> - get-n-threads: return the number of threads used. >> If the number of threads is within <2-256> bounds, parallelization is >> enabled. >> If the number of threads is 1, parallelization is disabled. >> Otherwise an error is returned. >> >> Note that, if set-n-threads fails for any reason (e.g. failure to set up some >> semaphore), parallelization is disabled, and get-n-threads will return 1. >> >> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552 >> Signed-off-by: Xavier Simonart <[email protected]> >> >> --- >> v2: - handled Dumitru's comments >> - added missing mutex_destroy >> - fixed issue when use_logical_dp_group is enabled after northd startup >> - rebased on top of main >> v3: >> - fix mutex_destroy issue >> v4: >> - handled Mark's comments >> - rebased on top of main >> --- >> NEWS | 7 + >> lib/ovn-parallel-hmap.c | 291 +++++++++++++++++++++----------------- >> lib/ovn-parallel-hmap.h | 30 ++-- >> northd/northd.c | 176 ++++++++++++----------- >> northd/northd.h | 1 + >> northd/ovn-northd-ddlog.c | 6 - >> northd/ovn-northd.8.xml | 70 +++++---- >> northd/ovn-northd.c | 68 ++++++++- >> tests/ovn-macros.at | 59 ++------ >> tests/ovn-northd.at | 109 ++++++++++++++ >> 10 files changed, 495 insertions(+), 322 deletions(-) >> > > Hi Xavier, > > Sorry, I should've mentioned this earlier but I forgot. Can you please > also add an option to ovn-ctl to allow specifying the number of threads? > > Something along the lines of: > > ovn-ctl start_northd .. --ovn-northd-n-threads=42 > > This would allow CMSs that use ovn-ctl [0] to enable parallelization. > For ovn-org/ovn-kubernetes in particular, we could even make it such > that the ovn-kubernetes CI run we do in the ovn-org/ovn repo runs both > with and without northd parallelization. > > Overall the code looks good but given that we probably also want the > ovn-ctl change I also left a few very small style comments below. > > Thanks, > Dumitru
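In summary, the resulting interface looks roughly like this (a sketch only; the ovn-ctl option is just the suggestion above, not something this patch adds):

  # At startup:
  ovn-northd --n-threads=4 ...

  # At runtime, via unixctl:
  ovn-appctl -t ovn-northd parallel-build/set-n-threads 4
  ovn-appctl -t ovn-northd parallel-build/get-n-threads

  # Suggested ovn-ctl wrapper option (not part of this patch):
  ovn-ctl start_northd ... --ovn-northd-n-threads=4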
[0] https://github.com/ovn-org/ovn-kubernetes/blob/4d79168271dcb22e8f2a580f5e75e5e38232224b/dist/images/ovnkube.sh#L857 > >> diff --git a/NEWS b/NEWS >> index 244824e3f..6e489df32 100644 >> --- a/NEWS >> +++ b/NEWS >> @@ -9,6 +9,13 @@ Post v22.03.0 >> implicit drop behavior on logical switches with ACLs applied. >> - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth >> available >> for a logical port. >> + - Changed the way to enable northd parallelization. >> + Removed support for: >> + - use_parallel_build in NBDB. >> + - --dummy-numa in northd cmdline. >> + Added support for: >> + - --n-threads=<N> in northd cmdline. >> + - set-n-threads/get-n-threads unixctls. >> >> OVN v22.03.0 - 11 Mar 2022 >> -------------------------- >> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c >> index 7edc4c0b6..c4d8cee16 100644 >> --- a/lib/ovn-parallel-hmap.c >> +++ b/lib/ovn-parallel-hmap.c >> @@ -38,14 +38,10 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap); >> >> #ifndef OVS_HAS_PARALLEL_HMAP >> >> -#define WORKER_SEM_NAME "%x-%p-%x" >> +#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE >> #define MAIN_SEM_NAME "%x-%p-main" >> >> -/* These are accessed under mutex inside add_worker_pool(). >> - * They do not need to be atomic. >> - */ >> static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false); >> -static bool can_parallelize = false; >> >> /* This is set only in the process of exit and the set is >> * accompanied by a fence. It does not need to be atomic or be >> @@ -57,18 +53,18 @@ static struct ovs_list worker_pools = >> OVS_LIST_INITIALIZER(&worker_pools); >> >> static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; >> >> -static int pool_size; >> +static size_t pool_size = 1; >> >> static int sembase; >> >> static void worker_pool_hook(void *aux OVS_UNUSED); >> -static void setup_worker_pools(bool force); >> +static void setup_worker_pools(void); >> static void merge_list_results(struct worker_pool *pool OVS_UNUSED, >> void *fin_result, void *result_frags, >> - int index); >> + size_t index); >> static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, >> void *fin_result, void *result_frags, >> - int index); >> + size_t index); >> >> bool >> ovn_stop_parallel_processing(void) >> @@ -76,107 +72,184 @@ ovn_stop_parallel_processing(void) >> return workers_must_exit; >> } >> >> -bool >> -ovn_can_parallelize_hashes(bool force_parallel) >> +size_t >> +ovn_get_worker_pool_size(void) >> { >> - bool test = false; >> + return pool_size; >> +} >> >> - if (atomic_compare_exchange_strong( >> - &initial_pool_setup, >> - &test, >> - true)) { >> - ovs_mutex_lock(&init_mutex); >> - setup_worker_pools(force_parallel); >> - ovs_mutex_unlock(&init_mutex); >> +static void >> +stop_controls(struct worker_pool *pool) >> +{ >> + if (pool->controls) { >> + workers_must_exit = true; >> + >> + /* unlock threads */ > > Nit: /* Unlock threads. */ > >> + for (size_t i = 0; i < pool->size ; i++) { >> + if (pool->controls[i].fire != SEM_FAILED) { >> + sem_post(pool->controls[i].fire); >> + } >> + } >> + >> + /* Wait completion */ > > Nit: /* Wait for completion. 
*/ > >> + for (size_t i = 0; i < pool->size ; i++) { >> + if (pool->controls[i].worker) { >> + pthread_join(pool->controls[i].worker, NULL); >> + pool->controls[i].worker = 0; >> + } >> + } >> + workers_must_exit = false; >> } >> - return can_parallelize; >> } >> >> -struct worker_pool * >> -ovn_add_worker_pool(void *(*start)(void *)) >> +static void >> +free_controls(struct worker_pool *pool) >> +{ >> + char sem_name[256]; >> + if (pool->controls) { >> + /* Close/unlink semaphores */ > > Nit: /* Close/unlink semaphores. */ > >> + for (size_t i = 0; i < pool->size; i++) { >> + ovs_mutex_destroy(&pool->controls[i].mutex); >> + if (pool->controls[i].fire != SEM_FAILED) { >> + sem_close(pool->controls[i].fire); >> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i); >> + sem_unlink(sem_name); >> + } else { >> + /* This and following controls are not initialized */ > > Nit: indent needs one extra space. > >> + break; >> + } >> + } >> + free(pool->controls); >> + pool->controls = NULL; >> + } >> +} >> + >> +static void >> +free_pool(struct worker_pool *pool) >> +{ >> + char sem_name[256]; >> + stop_controls(pool); >> + free_controls(pool); >> + if (pool->done != SEM_FAILED) { >> + sem_close(pool->done); >> + sprintf(sem_name, MAIN_SEM_NAME, sembase, pool); >> + sem_unlink(sem_name); >> + } >> + free(pool); >> +} >> + >> +static int >> +init_controls(struct worker_pool *pool) >> { >> - struct worker_pool *new_pool = NULL; >> struct worker_control *new_control; >> + char sem_name[256]; >> + >> + pool->controls = xmalloc(sizeof(struct worker_control) * pool->size); >> + for (size_t i = 0; i < pool->size ; i++) { >> + pool->controls[i].fire = SEM_FAILED; >> + } >> + for (size_t i = 0; i < pool->size; i++) { >> + new_control = &pool->controls[i]; >> + new_control->id = i; >> + new_control->done = pool->done; >> + new_control->data = NULL; >> + new_control->pool = pool; >> + new_control->worker = 0; >> + ovs_mutex_init(&new_control->mutex); >> + new_control->finished = ATOMIC_VAR_INIT(false); >> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i); >> + new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0); >> + if (new_control->fire == SEM_FAILED) { >> + free_controls(pool); >> + return -1; >> + } >> + } >> + return 0; >> +} >> + >> +static void >> +init_threads(struct worker_pool *pool, void *(*start)(void *)) >> +{ >> + for (size_t i = 0; i < pool_size; i++) { >> + pool->controls[i].worker = >> + ovs_thread_create("worker pool helper", start, >> &pool->controls[i]); >> + } >> + ovs_list_push_back(&worker_pools, &pool->list_node); >> +} >> + >> +enum pool_update_status >> +ovn_update_worker_pool(size_t requested_pool_size, >> + struct worker_pool **pool, void *(*start)(void *)) >> +{ >> bool test = false; >> - int i; >> char sem_name[256]; >> >> - /* Belt and braces - initialize the pool system just in case if >> - * if it is not yet initialized. 
>> - */ >> + if (requested_pool_size == pool_size) { >> + return POOL_UNCHANGED; >> + } >> + >> if (atomic_compare_exchange_strong( >> &initial_pool_setup, >> &test, >> true)) { >> ovs_mutex_lock(&init_mutex); >> - setup_worker_pools(false); >> + setup_worker_pools(); >> ovs_mutex_unlock(&init_mutex); >> } >> - >> ovs_mutex_lock(&init_mutex); >> - if (can_parallelize) { >> - new_pool = xmalloc(sizeof(struct worker_pool)); >> - new_pool->size = pool_size; >> - new_pool->controls = NULL; >> - sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool); >> - new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0); >> - if (new_pool->done == SEM_FAILED) { >> - goto cleanup; >> - } >> - >> - new_pool->controls = >> - xmalloc(sizeof(struct worker_control) * new_pool->size); >> - >> - for (i = 0; i < new_pool->size; i++) { >> - new_control = &new_pool->controls[i]; >> - new_control->id = i; >> - new_control->done = new_pool->done; >> - new_control->data = NULL; >> - ovs_mutex_init(&new_control->mutex); >> - new_control->finished = ATOMIC_VAR_INIT(false); >> - sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i); >> - new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0); >> - if (new_control->fire == SEM_FAILED) { >> + pool_size = requested_pool_size; >> + VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size); >> + >> + if (*pool == NULL) { >> + if (pool_size > 1) { >> + VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size); >> + *pool = xmalloc(sizeof(struct worker_pool)); >> + (*pool)->size = pool_size; >> + (*pool)->controls = NULL; >> + sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool); >> + (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0); >> + if ((*pool)->done == SEM_FAILED) { >> goto cleanup; >> } >> + if (init_controls(*pool) == -1) { >> + goto cleanup; >> + } >> + init_threads(*pool, start); >> } >> - >> - for (i = 0; i < pool_size; i++) { >> - new_pool->controls[i].worker = >> - ovs_thread_create("worker pool helper", start, >> &new_pool->controls[i]); >> + } else { >> + if (pool_size > 1) { >> + VLOG_INFO("Changing size of existing pool to %"PRIuSIZE, >> + pool_size); >> + stop_controls(*pool); >> + free_controls(*pool); >> + ovs_list_remove(&(*pool)->list_node); >> + (*pool)->size = pool_size; >> + if (init_controls(*pool) == -1) { >> + goto cleanup; >> + } >> + init_threads(*pool, start); >> + } else { >> + VLOG_INFO("Deleting existing pool"); >> + worker_pool_hook(NULL); >> + *pool = NULL; >> } >> - ovs_list_push_back(&worker_pools, &new_pool->list_node); >> } >> ovs_mutex_unlock(&init_mutex); >> - return new_pool; >> -cleanup: >> + return POOL_UPDATED; >> >> +cleanup: >> /* Something went wrong when opening semaphores. 
In this case >> * it is better to shut off parallel procesing altogether >> */ >> - >> - VLOG_INFO("Failed to initialize parallel processing, error %d", errno); >> - can_parallelize = false; >> - if (new_pool->controls) { >> - for (i = 0; i < new_pool->size; i++) { >> - if (new_pool->controls[i].fire != SEM_FAILED) { >> - sem_close(new_pool->controls[i].fire); >> - sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i); >> - sem_unlink(sem_name); >> - break; /* semaphores past this one are uninitialized */ >> - } >> - } >> - } >> - if (new_pool->done != SEM_FAILED) { >> - sem_close(new_pool->done); >> - sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool); >> - sem_unlink(sem_name); >> - } >> + VLOG_ERR("Failed to initialize parallel processing: %s", >> + ovs_strerror(errno)); >> + free_pool(*pool); >> + *pool = NULL; >> + pool_size = 1; >> ovs_mutex_unlock(&init_mutex); >> - return NULL; >> + return POOL_UPDATE_FAILED; >> } >> >> - >> /* Initializes 'hmap' as an empty hash table with mask N. */ >> void >> ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask) >> @@ -225,9 +298,9 @@ ovn_run_pool_callback(struct worker_pool *pool, >> void *fin_result, void *result_frags, >> void (*helper_func)(struct worker_pool *pool, >> void *fin_result, >> - void *result_frags, int index)) >> + void *result_frags, size_t index)) >> { >> - int index, completed; >> + size_t index, completed; >> >> /* Ensure that all worker threads see the same data as the >> * main thread. >> @@ -367,9 +440,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct >> hashrow_locks *hrl) >> >> static void >> worker_pool_hook(void *aux OVS_UNUSED) { >> - int i; >> static struct worker_pool *pool; >> - char sem_name[256]; >> >> workers_must_exit = true; >> >> @@ -380,55 +451,15 @@ worker_pool_hook(void *aux OVS_UNUSED) { >> */ >> atomic_thread_fence(memory_order_acq_rel); >> >> - /* Wake up the workers after the must_exit flag has been set */ >> - >> - LIST_FOR_EACH (pool, list_node, &worker_pools) { >> - for (i = 0; i < pool->size ; i++) { >> - sem_post(pool->controls[i].fire); >> - } >> - for (i = 0; i < pool->size ; i++) { >> - pthread_join(pool->controls[i].worker, NULL); >> - } >> - for (i = 0; i < pool->size ; i++) { >> - sem_close(pool->controls[i].fire); >> - sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i); >> - sem_unlink(sem_name); >> - } >> - sem_close(pool->done); >> - sprintf(sem_name, MAIN_SEM_NAME, sembase, pool); >> - sem_unlink(sem_name); >> + LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) { >> + ovs_list_remove(&pool->list_node); >> + free_pool(pool); >> } >> } >> >> static void >> -setup_worker_pools(bool force) { >> - int cores, nodes; >> - >> - ovs_numa_init(); >> - nodes = ovs_numa_get_n_numas(); >> - if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { >> - nodes = 1; >> - } >> - cores = ovs_numa_get_n_cores(); >> - >> - /* If there is no NUMA config, use 4 cores. >> - * If there is NUMA config use half the cores on >> - * one node so that the OS does not start pushing >> - * threads to other nodes. >> - */ >> - if (cores == OVS_CORE_UNSPEC || cores <= 0) { >> - /* If there is no NUMA we can try the ovs-threads routine. >> - * It falls back to sysconf and/or affinity mask. 
>> - */ >> - cores = count_cpu_cores(); >> - pool_size = cores; >> - } else { >> - pool_size = cores / nodes; >> - } >> - if ((pool_size < 4) && force) { >> - pool_size = 4; >> - } >> - can_parallelize = (pool_size >= 3); >> +setup_worker_pools(void) >> +{ >> fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); >> sembase = random_uint32(); >> } >> @@ -436,7 +467,7 @@ setup_worker_pools(bool force) { >> static void >> merge_list_results(struct worker_pool *pool OVS_UNUSED, >> void *fin_result, void *result_frags, >> - int index) >> + size_t index) >> { >> struct ovs_list *result = (struct ovs_list *)fin_result; >> struct ovs_list *res_frags = (struct ovs_list *)result_frags; >> @@ -450,7 +481,7 @@ merge_list_results(struct worker_pool *pool OVS_UNUSED, >> static void >> merge_hash_results(struct worker_pool *pool OVS_UNUSED, >> void *fin_result, void *result_frags, >> - int index) >> + size_t index) >> { >> struct hmap *result = (struct hmap *)fin_result; >> struct hmap *res_frags = (struct hmap *)result_frags; >> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h >> index 0f7d68770..72b31b489 100644 >> --- a/lib/ovn-parallel-hmap.h >> +++ b/lib/ovn-parallel-hmap.h >> @@ -81,21 +81,30 @@ struct worker_control { >> sem_t *done; /* Work completion semaphore - sem_post on completion. */ >> struct ovs_mutex mutex; /* Guards the data. */ >> void *data; /* Pointer to data to be processed. */ >> - void *workload; /* back-pointer to the worker pool structure. */ >> pthread_t worker; >> + struct worker_pool *pool; >> }; >> >> struct worker_pool { >> - int size; /* Number of threads in the pool. */ >> + size_t size; /* Number of threads in the pool. */ >> struct ovs_list list_node; /* List of pools - used in cleanup/exit. */ >> struct worker_control *controls; /* "Handles" in this pool. */ >> sem_t *done; /* Work completion semaphorew. */ >> }; >> >> -/* Add a worker pool for thread function start() which expects a pointer to >> - * a worker_control structure as an argument. */ >> +/* Return pool size; bigger than 1 means parallelization has been enabled. >> */ >> +size_t ovn_get_worker_pool_size(void); >> >> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)); >> +enum pool_update_status { >> + POOL_UNCHANGED, /* no change to pool */ >> + POOL_UPDATED, /* pool has been updated */ >> + POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */ >> +}; > > Nit: I would add an empty line here. > >> +/* Add/delete a worker pool for thread function start() which expects a >> pointer >> + * to a worker_control structure as an argument. 
Return true if updated */ >> +enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size, >> + struct worker_pool **, >> + void *(*start)(void *)); >> >> /* Setting this to true will make all processing threads exit */ >> >> @@ -140,7 +149,8 @@ void ovn_run_pool_list(struct worker_pool *pool, >> void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result, >> void *result_frags, >> void (*helper_func)(struct worker_pool *pool, >> - void *fin_result, void *result_frags, int >> index)); >> + void *fin_result, void *result_frags, >> + size_t index)); >> >> >> /* Returns the first node in 'hmap' in the bucket in which the given 'hash' >> @@ -251,17 +261,17 @@ static inline void init_hash_row_locks(struct >> hashrow_locks *hrl) >> hrl->row_locks = NULL; >> } >> >> -bool ovn_can_parallelize_hashes(bool force_parallel); >> - >> /* Use the OVN library functions for stuff which OVS has not defined >> * If OVS has defined these, they will still compile using the OVN >> * local names, but will be dropped by the linker in favour of the OVS >> * supplied functions. >> */ >> +#define update_worker_pool(requested_pool_size, existing_pool, func) \ >> + ovn_update_worker_pool(requested_pool_size, existing_pool, func) >> >> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, >> hrl) >> +#define get_worker_pool_size() ovn_get_worker_pool_size() >> >> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force) >> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, >> hrl) >> >> #define stop_parallel_processing() ovn_stop_parallel_processing() >> >> diff --git a/northd/northd.c b/northd/northd.c >> index 67c39df88..48426c801 100644 >> --- a/northd/northd.c >> +++ b/northd/northd.c >> @@ -59,6 +59,7 @@ >> VLOG_DEFINE_THIS_MODULE(northd); >> >> static bool controller_event_en; >> +static bool lflow_hash_lock_initialized = false; >> >> static bool check_lsp_is_up; >> >> @@ -4740,7 +4741,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const >> struct ovn_datapath *od, >> /* If this option is 'true' northd will combine logical flows that differ by >> * logical datapath only by creating a datapath group. 
*/ >> static bool use_logical_dp_groups = false; >> -static bool use_parallel_build = false; >> + >> +enum { >> + STATE_NULL, /* parallelization is off */ >> + STATE_INIT_HASH_SIZES, /* parallelization is on; hashes sizing >> needed */ >> + STATE_USE_PARALLELIZATION /* parallelization is on */ >> +}; >> +static int parallelization_state = STATE_NULL; >> >> static void >> ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od, >> @@ -4759,7 +4766,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct >> ovn_datapath *od, >> lflow->ctrl_meter = ctrl_meter; >> lflow->dpg = NULL; >> lflow->where = where; >> - if (use_parallel_build && use_logical_dp_groups) { >> + if ((parallelization_state != STATE_NULL) >> + && use_logical_dp_groups) { >> ovs_mutex_init(&lflow->odg_lock); >> } >> } >> @@ -4773,7 +4781,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow >> *lflow_ref, >> return false; >> } >> >> - if (use_parallel_build) { >> + if (parallelization_state == STATE_USE_PARALLELIZATION) { >> ovs_mutex_lock(&lflow_ref->odg_lock); >> hmapx_add(&lflow_ref->od_group, od); >> ovs_mutex_unlock(&lflow_ref->odg_lock); >> @@ -4803,9 +4811,23 @@ static struct ovs_mutex >> lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1]; >> static void >> lflow_hash_lock_init(void) >> { >> - for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) { >> - ovs_mutex_init(&lflow_hash_locks[i]); >> + if (!lflow_hash_lock_initialized) { >> + for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) { >> + ovs_mutex_init(&lflow_hash_locks[i]); >> + } >> + lflow_hash_lock_initialized = true; >> + } >> +} >> + >> +static void >> +lflow_hash_lock_destroy(void) >> +{ >> + if (lflow_hash_lock_initialized) { >> + for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) { >> + ovs_mutex_destroy(&lflow_hash_locks[i]); >> + } >> } >> + lflow_hash_lock_initialized = false; >> } >> >> /* This thread-local var is used for parallel lflow building when dp-groups >> is >> @@ -4853,7 +4875,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct >> ovn_datapath *od, >> nullable_xstrdup(ctrl_meter), >> ovn_lflow_hint(stage_hint), where); >> hmapx_add(&lflow->od_group, od); >> - if (!use_parallel_build) { >> + if (parallelization_state != STATE_USE_PARALLELIZATION) { >> hmap_insert(lflow_map, &lflow->hmap_node, hash); >> } else { >> hmap_insert_fast(lflow_map, &lflow->hmap_node, hash); >> @@ -4896,7 +4918,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, >> struct ovn_datapath *od, >> struct ovn_lflow *lflow; >> >> ovs_assert(ovn_stage_to_datapath_type(stage) == >> ovn_datapath_get_type(od)); >> - if (use_logical_dp_groups && use_parallel_build) { >> + if (use_logical_dp_groups >> + && (parallelization_state == STATE_USE_PARALLELIZATION)) { >> lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority, >> match, actions, io_port, stage_hint, >> where, >> ctrl_meter); >> @@ -4982,6 +5005,10 @@ static void >> ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow) >> { >> if (lflow) { >> + if ((parallelization_state != STATE_NULL) >> + && use_logical_dp_groups) { >> + ovs_mutex_destroy(&lflow->odg_lock); >> + } >> if (lflows) { >> hmap_remove(lflows, &lflow->hmap_node); >> } >> @@ -13925,15 +13952,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct >> ovn_port *op, >> &lsi->actions); >> } >> >> -struct lflows_thread_pool { >> - struct worker_pool *pool; >> -}; >> - >> static void * >> build_lflows_thread(void *arg) >> { >> struct worker_control *control = (struct worker_control *) arg; >> - struct lflows_thread_pool *workload; >> struct 
lswitch_flow_build_info *lsi; >> >> struct ovn_datapath *od; >> @@ -13944,17 +13966,16 @@ build_lflows_thread(void *arg) >> >> while (!stop_parallel_processing()) { >> wait_for_work(control); >> - workload = (struct lflows_thread_pool *) control->workload; >> lsi = (struct lswitch_flow_build_info *) control->data; >> if (stop_parallel_processing()) { >> return NULL; >> } >> thread_lflow_counter = 0; >> - if (lsi && workload) { >> + if (lsi) { >> /* Iterate over bucket ThreadID, ThreadID+size, ... */ >> for (bnum = control->id; >> bnum <= lsi->datapaths->mask; >> - bnum += workload->pool->size) >> + bnum += control->pool->size) >> { >> HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, >> lsi->datapaths) { >> if (stop_parallel_processing()) { >> @@ -13965,7 +13986,7 @@ build_lflows_thread(void *arg) >> } >> for (bnum = control->id; >> bnum <= lsi->ports->mask; >> - bnum += workload->pool->size) >> + bnum += control->pool->size) >> { >> HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) { >> if (stop_parallel_processing()) { >> @@ -13976,7 +13997,7 @@ build_lflows_thread(void *arg) >> } >> for (bnum = control->id; >> bnum <= lsi->lbs->mask; >> - bnum += workload->pool->size) >> + bnum += control->pool->size) >> { >> HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) { >> if (stop_parallel_processing()) { >> @@ -13997,7 +14018,7 @@ build_lflows_thread(void *arg) >> } >> for (bnum = control->id; >> bnum <= lsi->igmp_groups->mask; >> - bnum += workload->pool->size) >> + bnum += control->pool->size) >> { >> HMAP_FOR_EACH_IN_PARALLEL ( >> igmp_group, hmap_node, bnum, lsi->igmp_groups) { >> @@ -14016,39 +14037,13 @@ build_lflows_thread(void *arg) >> return NULL; >> } >> >> -static bool pool_init_done = false; >> -static struct lflows_thread_pool *build_lflows_pool = NULL; >> - >> -static void >> -init_lflows_thread_pool(void) >> -{ >> - int index; >> - >> - if (!pool_init_done) { >> - struct worker_pool *pool = add_worker_pool(build_lflows_thread); >> - pool_init_done = true; >> - if (pool) { >> - build_lflows_pool = xmalloc(sizeof(*build_lflows_pool)); >> - build_lflows_pool->pool = pool; >> - for (index = 0; index < build_lflows_pool->pool->size; index++) >> { >> - build_lflows_pool->pool->controls[index].workload = >> - build_lflows_pool; >> - } >> - } >> - } >> -} >> - >> -/* TODO: replace hard cutoffs by configurable via commands. These are >> - * temporary defines to determine single-thread to multi-thread processing >> - * cutoff. >> - * Setting to 1 forces "all parallel" lflow build. 
>> - */ >> +static struct worker_pool *build_lflows_pool = NULL; >> >> static void >> noop_callback(struct worker_pool *pool OVS_UNUSED, >> void *fin_result OVS_UNUSED, >> void *result_frags OVS_UNUSED, >> - int index OVS_UNUSED) >> + size_t index OVS_UNUSED) >> { >> /* Do nothing */ >> } >> @@ -14088,28 +14083,21 @@ build_lswitch_and_lrouter_flows(const struct hmap >> *datapaths, >> >> char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac); >> >> - if (use_parallel_build) { >> - init_lflows_thread_pool(); >> - if (!can_parallelize_hashes(false)) { >> - use_parallel_build = false; >> - } >> - } >> - >> - if (use_parallel_build) { >> + if (parallelization_state == STATE_USE_PARALLELIZATION) { >> struct hmap *lflow_segs; >> struct lswitch_flow_build_info *lsiv; >> int index; >> >> - lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size); >> + lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size); >> if (use_logical_dp_groups) { >> lflow_segs = NULL; >> } else { >> - lflow_segs = xcalloc(sizeof(*lflow_segs), >> build_lflows_pool->pool->size); >> + lflow_segs = xcalloc(sizeof(*lflow_segs), >> build_lflows_pool->size); >> } >> >> /* Set up "work chunks" for each thread to work on. */ >> >> - for (index = 0; index < build_lflows_pool->pool->size; index++) { >> + for (index = 0; index < build_lflows_pool->size; index++) { >> if (use_logical_dp_groups) { >> /* if dp_groups are in use we lock a shared lflows hash >> * on a per-bucket level instead of merging hash frags */ >> @@ -14132,19 +14120,19 @@ build_lswitch_and_lrouter_flows(const struct hmap >> *datapaths, >> ds_init(&lsiv[index].match); >> ds_init(&lsiv[index].actions); >> >> - build_lflows_pool->pool->controls[index].data = &lsiv[index]; >> + build_lflows_pool->controls[index].data = &lsiv[index]; >> } >> >> /* Run thread pool. */ >> if (use_logical_dp_groups) { >> - run_pool_callback(build_lflows_pool->pool, NULL, NULL, >> + run_pool_callback(build_lflows_pool, NULL, NULL, >> noop_callback); >> - fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size); >> + fix_flow_map_size(lflows, lsiv, build_lflows_pool->size); >> } else { >> - run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs); >> + run_pool_hash(build_lflows_pool, lflows, lflow_segs); >> } >> >> - for (index = 0; index < build_lflows_pool->pool->size; index++) { >> + for (index = 0; index < build_lflows_pool->size; index++) { >> ds_destroy(&lsiv[index].match); >> ds_destroy(&lsiv[index].actions); >> } >> @@ -14280,8 +14268,39 @@ ovn_sb_set_lflow_logical_dp_group( >> } >> >> static ssize_t max_seen_lflow_size = 128; >> -static bool needs_parallel_init = true; >> -static bool reset_parallel = false; >> + >> +void run_update_worker_pool(int n_threads) >> +{ >> + /* If number of threads has been updated (or initially set), >> + * update the worker pool. 
*/ >> + if (update_worker_pool(n_threads, &build_lflows_pool, >> + build_lflows_thread) != POOL_UNCHANGED) { >> + /* worker pool was updated */ >> + if (get_worker_pool_size() <= 1) { >> + /* destroy potentially created lflow_hash_lock */ >> + lflow_hash_lock_destroy(); >> + parallelization_state = STATE_NULL; >> + } else if (parallelization_state != STATE_USE_PARALLELIZATION) { >> + if (use_logical_dp_groups) { >> + lflow_hash_lock_init(); >> + parallelization_state = STATE_INIT_HASH_SIZES; >> + } else { >> + parallelization_state = STATE_USE_PARALLELIZATION; >> + } >> + } >> + } >> +} >> + >> +static void worker_pool_init_for_ldp(void) >> +{ >> + /* If parallelization is enabled, make sure locks are initialized >> + * when ldp are used. >> + */ >> + if (parallelization_state != STATE_NULL) { >> + lflow_hash_lock_init(); >> + parallelization_state = STATE_INIT_HASH_SIZES; >> + } >> +} >> >> static void >> build_mcast_groups(struct lflow_input *data, >> @@ -14302,30 +14321,18 @@ void build_lflows(struct lflow_input *input_data, >> build_mcast_groups(input_data, input_data->datapaths, input_data->ports, >> &mcast_groups, &igmp_groups); >> >> - if (reset_parallel) { >> - /* Parallel build was disabled before, we need to >> - * re-enable it. */ >> - use_parallel_build = true; >> - reset_parallel = false; >> - } >> - >> fast_hmap_size_for(&lflows, max_seen_lflow_size); >> >> - if (use_parallel_build && use_logical_dp_groups && >> - needs_parallel_init) { >> - lflow_hash_lock_init(); >> - needs_parallel_init = false; >> - /* Disable parallel build on first run with dp_groups >> - * to determine the correct sizing of hashes. */ >> - use_parallel_build = false; >> - reset_parallel = true; >> - } >> build_lswitch_and_lrouter_flows(input_data->datapaths, >> input_data->ports, >> input_data->port_groups, &lflows, >> &mcast_groups, &igmp_groups, >> input_data->meter_groups, >> input_data->lbs, >> input_data->bfd_connections); >> >> + if (parallelization_state == STATE_INIT_HASH_SIZES) { >> + parallelization_state = STATE_USE_PARALLELIZATION; >> + } >> + >> /* Parallel build may result in a suboptimal hash. 
Resize the >> * hash to a correct size before doing lookups */ >> >> @@ -15567,12 +15574,13 @@ ovnnb_db_run(struct northd_input *input_data, >> >> smap_destroy(&options); >> >> - use_parallel_build = >> - (smap_get_bool(&nb->options, "use_parallel_build", false) && >> - can_parallelize_hashes(false)); >> - >> + bool old_use_ldp = use_logical_dp_groups; >> use_logical_dp_groups = smap_get_bool(&nb->options, >> "use_logical_dp_groups", true); >> + if (use_logical_dp_groups && !old_use_ldp) { >> + worker_pool_init_for_ldp(); >> + } >> + >> use_ct_inv_match = smap_get_bool(&nb->options, >> "use_ct_inv_match", true); >> >> diff --git a/northd/northd.h b/northd/northd.h >> index 2d804a22e..fe8dad03a 100644 >> --- a/northd/northd.h >> +++ b/northd/northd.h >> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data, >> struct hmap *bfd_connections, struct hmap *ports); >> void bfd_cleanup_connections(struct lflow_input *input_data, >> struct hmap *bfd_map); >> +void run_update_worker_pool(int n_threads); >> >> #endif /* NORTHD_H */ >> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c >> index 718d052cc..e9afda4c6 100644 >> --- a/northd/ovn-northd-ddlog.c >> +++ b/northd/ovn-northd-ddlog.c >> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] >> OVS_UNUSED, >> SSL_OPTION_ENUMS, >> OPT_DRY_RUN, >> OPT_DDLOG_RECORD, >> - OPT_DUMMY_NUMA, >> }; >> static const struct option long_options[] = { >> {"ovnsb-db", required_argument, NULL, 'd'}, >> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] >> OVS_UNUSED, >> OVN_DAEMON_LONG_OPTIONS, >> VLOG_LONG_OPTIONS, >> STREAM_SSL_LONG_OPTIONS, >> - {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA}, >> {NULL, 0, NULL, 0}, >> }; >> char *short_options = >> ovs_cmdl_long_options_to_short_options(long_options); >> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] >> OVS_UNUSED, >> *pause = true; >> break; >> >> - case OPT_DUMMY_NUMA: >> - ovs_numa_set_dummy(optarg); >> - break; >> - >> case OPT_DDLOG_RECORD: >> record_file = optarg; >> break; >> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml >> index 73ecfbc2d..077bd1f41 100644 >> --- a/northd/ovn-northd.8.xml >> +++ b/northd/ovn-northd.8.xml >> @@ -68,53 +68,26 @@ >> restarting a process or disturbing a running system. >> </p> >> </dd> >> - <dt><code>--dummy-numa</code></dt> >> + <dt><code>n-threads N</code></dt> >> <dd> >> - <p> >> - Typically, OVS uses sysfs to determine the number of NUMA nodes >> and >> - CPU cores that are available on a machine. The parallelization >> code >> - in OVN uses this information to determine if there are enough >> - resources to use parallelization. The current algorithm enables >> - parallelization if the total number of CPU cores divided by the >> - number of NUMA nodes is greater than or equal to four. >> - </p> >> - >> <p> >> In certain situations, it may be desirable to enable >> parallelization >> - on a system that otherwise would not have it allowed. The >> - <code>--dummy-numa</code> option allows for you to fake the NUMA >> - nodes and cores that OVS thinks your system has. The syntax >> consists >> - of using numbers to represent the NUMA node IDs. The number of >> times >> - that a NUMA node ID appears represents how many CPU cores that >> NUMA >> - node contains. 
So for instance, if you did the following: >> - </p> >> - >> - <p> >> - <code>--dummy-numa=0,0,0,0</code> >> - </p> >> - >> - <p> >> - it would make OVS assume that you have a single NUMA node with ID >> 0, >> - and that NUMA node consists of four CPU cores. Similarly, you >> could >> - do: >> - </p> >> - >> - <p> >> - <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code> >> + on a system to decrease latency (at the potential cost of >> increasing >> + CPU usage). >> </p> >> >> <p> >> - to make OVS assume you have two NUMA nodes with IDs 0 and 1, each >> - with six CPU cores. >> + This option will cause ovn-northd to use N threads when building >> + logical flows, when N is within [2-256]. >> + If N is 1, parallelization is disabled (default behavior). >> + If N is less than 1, then N is set to 1, parallelization is >> disabled >> + and a warning is logged. >> + If N is more than 256, then N is set to 256, parallelization is >> + enabled (with 256 threads) and a warning is logged. >> </p> >> >> <p> >> - Currently, the only affect this option has is on whether >> - parallelization can be enabled in ovn-northd. There are no NUMA >> node >> - or CPU core-specific actions performed by OVN. Setting >> - <code>--dummy-numa</code> in ovn-northd does not affect how other >> OVS >> - processes on the system (such as ovs-vswitchd) count the number of >> - NUMA nodes and CPU cores; this setting is local to ovn-northd. >> + ovn-northd-ddlog does not support this option. >> </p> >> </dd> >> </dl> >> @@ -210,6 +183,27 @@ >> except for the northbound database client. >> </p> >> </dd> >> + >> + >> + <dt><code>set-n-threads N</code></dt> >> + <dd> >> + <p> >> + Set the number of threads used for building logical flows. >> + When N is within [2-256], parallelization is enabled. >> + When N is 1, parallelization is disabled. >> + When N is less than 1 or more than 256, an error is returned. >> + If ovn-northd fails to start parallelization (e.g. fails to set up >> + semaphores), parallelization is disabled and an error is returned. >> + </p> >> + </dd> >> + >> + <dt><code>get-n-threads</code></dt> >> + <dd> >> + <p> >> + Return the number of threads used for building logical flows. 
>> + </p> >> + </dd> >> + >> </dl> >> </p> >> >> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c >> index 0a0f85010..b7363b16d 100644 >> --- a/northd/ovn-northd.c >> +++ b/northd/ovn-northd.c >> @@ -41,6 +41,7 @@ >> #include "unixctl.h" >> #include "util.h" >> #include "openvswitch/vlog.h" >> +#include "lib/ovn-parallel-hmap.h" >> >> VLOG_DEFINE_THIS_MODULE(ovn_northd); >> >> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume; >> static unixctl_cb_func ovn_northd_is_paused; >> static unixctl_cb_func ovn_northd_status; >> static unixctl_cb_func cluster_state_reset_cmd; >> +static unixctl_cb_func ovn_northd_set_thread_count_cmd; >> +static unixctl_cb_func ovn_northd_get_thread_count_cmd; >> >> struct northd_state { >> bool had_lock; >> bool paused; >> }; >> >> +#define OVN_MAX_SUPPORTED_THREADS 256 >> + >> static const char *ovnnb_db; >> static const char *ovnsb_db; >> static const char *unixctl_path; >> @@ -525,7 +530,7 @@ Options:\n\ >> --ovnsb-db=DATABASE connect to ovn-sb database at DATABASE\n\ >> (default: %s)\n\ >> --dry-run start in paused state (do not commit db >> changes)\n\ >> - --dummy-numa override default NUMA node and CPU core >> discovery\n\ >> + --n-threads=N specify number of threads\n\ >> --unixctl=SOCKET override default control socket name\n\ >> -h, --help display this help message\n\ >> -o, --options list available options\n\ >> @@ -538,14 +543,14 @@ Options:\n\ >> >> static void >> parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED, >> - bool *paused) >> + bool *paused, int *n_threads) >> { >> enum { >> OVN_DAEMON_OPTION_ENUMS, >> VLOG_OPTION_ENUMS, >> SSL_OPTION_ENUMS, >> OPT_DRY_RUN, >> - OPT_DUMMY_NUMA, >> + OPT_N_THREADS, >> }; >> static const struct option long_options[] = { >> {"ovnsb-db", required_argument, NULL, 'd'}, >> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] >> OVS_UNUSED, >> {"options", no_argument, NULL, 'o'}, >> {"version", no_argument, NULL, 'V'}, >> {"dry-run", no_argument, NULL, OPT_DRY_RUN}, >> - {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA}, >> + {"n-threads", required_argument, NULL, OPT_N_THREADS}, >> OVN_DAEMON_LONG_OPTIONS, >> VLOG_LONG_OPTIONS, >> STREAM_SSL_LONG_OPTIONS, >> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] >> OVS_UNUSED, >> ovn_print_version(0, 0); >> exit(EXIT_SUCCESS); >> >> - case OPT_DUMMY_NUMA: >> - ovs_numa_set_dummy(optarg); >> + case OPT_N_THREADS: >> + *n_threads = strtoul(optarg, NULL, 10); >> + if (*n_threads < 1) { >> + *n_threads = 1; >> + VLOG_WARN("Setting n_threads to %d as --n-threads option >> was " >> + "set to : [%s]", *n_threads, optarg); >> + } >> + if (*n_threads > OVN_MAX_SUPPORTED_THREADS) { >> + *n_threads = OVN_MAX_SUPPORTED_THREADS; >> + VLOG_WARN("Setting n_threads to %d as --n-threads option >> was " >> + "set to : [%s]", *n_threads, optarg); >> + } >> + if (*n_threads != 1) { >> + VLOG_INFO("Using %d threads", *n_threads); >> + } >> break; >> >> case OPT_DRY_RUN: >> @@ -668,6 +686,7 @@ main(int argc, char *argv[]) >> struct unixctl_server *unixctl; >> int retval; >> bool exiting; >> + int n_threads = 1; >> struct northd_state state = { >> .had_lock = false, >> .paused = false >> @@ -677,7 +696,7 @@ main(int argc, char *argv[]) >> ovs_cmdl_proctitle_init(argc, argv); >> ovn_set_program_name(argv[0]); >> service_start(&argc, &argv); >> - parse_options(argc, argv, &state.paused); >> + parse_options(argc, argv, &state.paused, &n_threads); >> >> daemonize_start(false); >> >> @@ -704,6 +723,12 @@ main(int argc, char 
*argv[]) >> unixctl_command_register("nb-cluster-state-reset", "", 0, 0, >> cluster_state_reset_cmd, >> &reset_ovnnb_idl_min_index); >> + unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", >> 1, 1, >> + ovn_northd_set_thread_count_cmd, >> + NULL); >> + unixctl_command_register("parallel-build/get-n-threads", "", 0, 0, >> + ovn_northd_get_thread_count_cmd, >> + NULL); >> >> daemonize_complete(); >> >> @@ -761,6 +786,8 @@ main(int argc, char *argv[]) >> unsigned int ovnnb_cond_seqno = UINT_MAX; >> unsigned int ovnsb_cond_seqno = UINT_MAX; >> >> + run_update_worker_pool(n_threads); >> + >> /* Main loop. */ >> exiting = false; >> >> @@ -1017,3 +1044,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, >> int argc OVS_UNUSED, >> poll_immediate_wake(); >> unixctl_command_reply(conn, NULL); >> } >> + >> +static void >> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc >> OVS_UNUSED, >> + const char *argv[], void *aux OVS_UNUSED) >> +{ >> + int n_threads = atoi(argv[1]); >> + >> + if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) { >> + struct ds s = DS_EMPTY_INITIALIZER; >> + ds_put_format(&s, "invalid n_threads: %d\n", n_threads); >> + unixctl_command_reply_error(conn, ds_cstr(&s)); >> + ds_destroy(&s); >> + } else { >> + run_update_worker_pool(n_threads); >> + unixctl_command_reply(conn, NULL); >> + } >> +} >> + >> +static void >> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc >> OVS_UNUSED, >> + const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED) >> +{ >> + struct ds s = DS_EMPTY_INITIALIZER; >> + ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size()); >> + unixctl_command_reply(conn, ds_cstr(&s)); >> + ds_destroy(&s); >> +} >> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at >> index d78315c75..c6f0f6251 100644 >> --- a/tests/ovn-macros.at >> +++ b/tests/ovn-macros.at >> @@ -170,8 +170,8 @@ ovn_start_northd() { >> ovn-northd-ddlog) northd_args="$northd_args >> --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;; >> esac >> >> - if test X$NORTHD_DUMMY_NUMA = Xyes; then >> - northd_args="$northd_args --dummy-numa=\"0,0,0,0\"" >> + if test X$NORTHD_USE_PARALLELIZATION = Xyes; then >> + northd_args="$northd_args --n-threads=4" >> fi >> >> local name=${d_prefix}northd${suffix} >> @@ -252,10 +252,6 @@ ovn_start () { >> else >> ovn-nbctl set NB_Global . options:use_logical_dp_groups=false >> fi >> - >> - if test X$NORTHD_USE_PARALLELIZATION = Xyes; then >> - ovn-nbctl set NB_Global . options:use_parallel_build=true >> - fi >> } >> >> # Interconnection networks. >> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS >> >> m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])]) >> >> -# Use --dummy-numa if system has low cores and we want to force >> parallelization >> -m4_define([NORTHD_DUMMY_NUMA], >> - [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes >> - then >> - echo "yes" >> - else >> - echo "no" >> - fi) >> -]) >> - >> # Defines a versions of a test with all combinations of northd and >> # datapath groups. >> m4_define([OVN_FOR_EACH_NORTHD], >> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD], >> # Some tests aren't prepared for dp groups to be enabled. 
>> m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS], >> [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog], >> - [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1 >> -])])]) >> - >> -# Test parallelization with dp groups enabled and disabled >> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [ >> -m4_pushdef([NORTHD_TYPE], [ovn_northd]) >> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes]) >> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no], >> - [[NORTHD_USE_PARALLELIZATION], [yes] >> -])]]) >> - >> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [ >> -m4_pushdef([NORTHD_TYPE], [ovn_northd]) >> -m4_pushdef(NORTHD_DUMMY_NUMA, [no]) >> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no], >> - [[NORTHD_USE_PARALLELIZATION], [yes] >> -])]]) >> - >> -# Use --dummy-numa if system has low cores >> -m4_define([HOST_HAS_LOW_CORES], [ >> - if test $(nproc) -lt 4; then >> - : >> - $1 >> - else >> - : >> - $2 >> - fi >> -]) >> + [m4_foreach([NORTHD_USE_DP_GROUPS], [no], >> + [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1 >> +])])])]) >> + >> +# Some tests aren't prepared for ddlog to be enabled. >> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG], >> + [m4_foreach([NORTHD_TYPE], [ovn-northd], >> + [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no], >> + [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1 >> +])])])]) >> >> -m4_define([NORTHD_PARALLELIZATION], [ >> - HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], >> [OVN_NORTHD_PARALLELIZATION_NO_DUMMY]) >> -]) >> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at >> index 17e37fb77..1a9d5ef76 100644 >> --- a/tests/ovn-northd.at >> +++ b/tests/ovn-northd.at >> @@ -7174,3 +7174,112 @@ ct_next(ct_state=new|trk); >> >> AT_CLEANUP >> ]) >> + >> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([ >> +AT_SETUP([northd-parallelization unixctl]) >> +ovn_start >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 >> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE >> parallel-build/get-n-threads], [0], [1 >> +]) >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4 >> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE >> parallel-build/get-n-threads], [0], [4 >> +]) >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 >> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE >> parallel-build/get-n-threads], [0], [1 >> +]) >> + >> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads >> 0], [2], [], >> + [invalid n_threads: 0 >> +ovn-appctl: ovn-northd: server returned an error >> +]) >> + >> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads >> -1], [2], [], >> + [invalid n_threads: -1 >> +ovn-appctl: ovn-northd: server returned an error >> +]) >> + >> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads >> 300], [2], [], >> + [invalid n_threads: 300 >> +ovn-appctl: ovn-northd: server returned an error >> +]) >> + >> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE >> parallel-build/set-n-threads], [2], [], >> + ["parallel-build/set-n-threads" command requires at least 1 arguments >> +ovn-appctl: ovn-northd: server returned an error >> +]) >> + >> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads >> 1 2], [2], [], >> + ["parallel-build/set-n-threads" command takes at most 1 arguments >> +ovn-appctl: ovn-northd: server returned an error >> +]) >> + >> +AT_CLEANUP >> +]) >> + >> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([ >> +AT_SETUP([northd-parallelization runtime]) >> +ovn_start 
>> + >> +add_switch_ports() { >> +    for port in $(seq $1 $2); do >> +        logical_switch_port=lsp${port} >> +        check ovn-nbctl lsp-add ls1 $logical_switch_port >> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic >> +    done >> +} >> + >> +# Build some rather heavy config and modify the number of threads in the middle >> +check ovn-nbctl ls-add ls1 >> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16 >> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254 >> + >> +check ovn-nbctl lr-add lr1 >> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 >> type=router options:router-port=lrp0 addresses=dynamic >> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16 >> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16 >> +add_switch_ports 1 50 >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4 >> +add_switch_ports 51 100 >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8 >> +add_switch_ports 101 150 >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4 >> +add_switch_ports 151 200 >> + >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 >> +add_switch_ports 201 250 >> +check ovn-nbctl --wait=sb sync >> + >> +# Run 3 times: once with parallelization enabled, once with it disabled, and once >> while changing it >> +# Compare the flows produced by the three runs >> +# Ignore IP/MAC addresses >> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' >> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1 >> + >> +# Restart with 1 thread >> +for port in $(seq 1 250); do >> +    logical_switch_port=lsp${port} >> +    check ovn-nbctl lsp-del $logical_switch_port >> +done >> +add_switch_ports 1 250 >> +check ovn-nbctl --wait=sb sync >> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' >> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2 >> +AT_CHECK([diff flows1 flows2]) >> + >> +# Restart with 8 threads >> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8 >> +for port in $(seq 1 250); do >> +    logical_switch_port=lsp${port} >> +    check ovn-nbctl lsp-del $logical_switch_port >> +done >> +add_switch_ports 1 250 >> +check ovn-nbctl --wait=sb sync >> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' >> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3 >> +AT_CHECK([diff flows1 flows3]) >> + >> +AT_CLEANUP >> +])
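For readers skimming the diff, the handshake that all of the semaphore bookkeeping implements is small: each worker blocks on its own "fire" semaphore, and the main thread waits for as many posts on the shared "done" semaphore as there are workers. A minimal standalone sketch of that pattern follows (illustrative names only, a single dispatch round instead of the real wait_for_work() loop, and unnamed semaphores where the pool uses sem_open() so it can clean up by name; compile with -pthread):

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

#define N_WORKERS 4

static sem_t fire[N_WORKERS]; /* One per worker, like control->fire. */
static sem_t done;            /* Shared, like pool->done. */

static void *
worker_main(void *arg)
{
    size_t id = (size_t) arg;

    sem_wait(&fire[id]);  /* Block until the main thread posts work. */
    printf("worker %zu: processing bucket slice\n", id);
    sem_post(&done);      /* Tell the main thread this slice is finished. */
    return NULL;
}

int
main(void)
{
    pthread_t workers[N_WORKERS];

    sem_init(&done, 0, 0);
    for (size_t i = 0; i < N_WORKERS; i++) {
        sem_init(&fire[i], 0, 0);
        pthread_create(&workers[i], NULL, worker_main, (void *) i);
    }

    /* One "run the pool" round: fire every worker, then wait for
     * N_WORKERS posts on the shared done semaphore. */
    for (size_t i = 0; i < N_WORKERS; i++) {
        sem_post(&fire[i]);
    }
    for (size_t i = 0; i < N_WORKERS; i++) {
        sem_wait(&done);
    }

    for (size_t i = 0; i < N_WORKERS; i++) {
        pthread_join(workers[i], NULL);
    }
    return 0;
}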
