On 5/18/22 12:07, Xavier Simonart wrote:
> This patch changes the way northd lflow build parallelization is enabled
> and allows the number of threads to be changed at runtime.
> Before this patch, the following was needed to use parallelization:
> - enable parallelization through use_parallel_build in NBDB
> - use --dummy-numa to select the number of threads.
> The second part was needed because otherwise as many threads as cores in
> the system were used, while parallelization showed a performance
> improvement only up to around 4 (or maybe 8) threads.
>
> With this patch, the number of threads used for lflow parallel build can be
> specified either:
> - at startup, using --n-threads=<N> as ovn-northd command line option
> - using unixctl
> If the number of threads specified is > 1, then parallelization is enabled.
> If the number is 1, parallelization is disabled.
> If the number is < 1, parallelization is disabled at startup and a warning
> is logged.
> If the number is > 256, parallelization is enabled (with 256 threads) and
> a warning is logged.
>
> The following unixctl commands have been added:
> - set-n-threads <N>: set the number of threads used.
> - get-n-threads: return the number of threads used.
> If the number of threads is within <2-256> bounds, parallelization is enabled.
> If the number of threads is 1, parallelization is disabled.
> Otherwise an error is thrown.
>
> Note that, if set-n-threads fails for any reason (e.g., failure to set up
> some semaphore), parallelization is disabled, and get-n-threads will return 1.
>
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
> Signed-off-by: Xavier Simonart <[email protected]>
>
> ---
> v2: - handled Dumitru's comments
> - added missing mutex_destroy
> - fixed issue when use_logical_dp_groups is enabled after northd startup
> - rebased on top of main
> v3:
> - fix mutex_destroy issue
> v4:
> - handled Mark's comments
> - rebased on top of main
> ---
> NEWS | 7 +
> lib/ovn-parallel-hmap.c | 291 +++++++++++++++++++++-----------------
> lib/ovn-parallel-hmap.h | 30 ++--
> northd/northd.c | 176 ++++++++++++-----------
> northd/northd.h | 1 +
> northd/ovn-northd-ddlog.c | 6 -
> northd/ovn-northd.8.xml | 70 +++++----
> northd/ovn-northd.c | 68 ++++++++-
> tests/ovn-macros.at | 59 ++------
> tests/ovn-northd.at | 109 ++++++++++++++
> 10 files changed, 495 insertions(+), 322 deletions(-)
>
Hi Xavier,
Sorry, I should've mentioned this earlier but I forgot. Can you please
also add an option to ovn-ctl to allow specifying the number of threads?
Something along the lines of:
ovn-ctl start_northd .. --ovn-northd-n-threads=42
This would allow CMSs that use ovn-ctl [0] to enable parallelization.
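Untested, but I'd expect the ovn-ctl side to be a small change. A rough
sketch, if I'm remembering the ovn-ctl option parsing correctly (the
OVN_NORTHD_N_THREADS variable name is just my suggestion, nothing that
exists today):

  # In set_defaults(), so the generic --ovn-northd-* option parsing
  # picks the new option up:
  OVN_NORTHD_N_THREADS=1

  # In start_northd(), when assembling the ovn-northd arguments:
  if test "$OVN_NORTHD_N_THREADS" != 1; then
      set "$@" --n-threads=$OVN_NORTHD_N_THREADS
  fi
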
For ovn-org/ovn-kubernetes in particular, we could even make the
ovn-kubernetes CI run in the ovn-org/ovn repo exercise both modes,
with and without northd parallelization.
Overall the code looks good, but given that we probably also want the
ovn-ctl change, I left a few very small style comments below.
Thanks,
Dumitru
> diff --git a/NEWS b/NEWS
> index 244824e3f..6e489df32 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -9,6 +9,13 @@ Post v22.03.0
> implicit drop behavior on logical switches with ACLs applied.
> - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth available
> for a logical port.
> + - Changed the way to enable northd parallelization.
> + Removed support for:
> + - use_parallel_build in NBDB.
> + - --dummy-numa in northd cmdline.
> + Added support for:
> + - --n-threads=<N> in northd cmdline.
> + - set-n-threads/get-n-threads unixctls.
>
> OVN v22.03.0 - 11 Mar 2022
> --------------------------
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index 7edc4c0b6..c4d8cee16 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -38,14 +38,10 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>
> #ifndef OVS_HAS_PARALLEL_HMAP
>
> -#define WORKER_SEM_NAME "%x-%p-%x"
> +#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE
> #define MAIN_SEM_NAME "%x-%p-main"
>
> -/* These are accessed under mutex inside add_worker_pool().
> - * They do not need to be atomic.
> - */
> static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> -static bool can_parallelize = false;
>
> /* This is set only in the process of exit and the set is
> * accompanied by a fence. It does not need to be atomic or be
> static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
>
> static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>
> -static int pool_size;
> +static size_t pool_size = 1;
>
> static int sembase;
>
> static void worker_pool_hook(void *aux OVS_UNUSED);
> -static void setup_worker_pools(bool force);
> +static void setup_worker_pools(void);
> static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> void *fin_result, void *result_frags,
> - int index);
> + size_t index);
> static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> void *fin_result, void *result_frags,
> - int index);
> + size_t index);
>
> bool
> ovn_stop_parallel_processing(void)
> @@ -76,107 +72,184 @@ ovn_stop_parallel_processing(void)
> return workers_must_exit;
> }
>
> -bool
> -ovn_can_parallelize_hashes(bool force_parallel)
> +size_t
> +ovn_get_worker_pool_size(void)
> {
> - bool test = false;
> + return pool_size;
> +}
>
> - if (atomic_compare_exchange_strong(
> - &initial_pool_setup,
> - &test,
> - true)) {
> - ovs_mutex_lock(&init_mutex);
> - setup_worker_pools(force_parallel);
> - ovs_mutex_unlock(&init_mutex);
> +static void
> +stop_controls(struct worker_pool *pool)
> +{
> + if (pool->controls) {
> + workers_must_exit = true;
> +
> + /* unlock threads */
Nit: /* Unlock threads. */
> + for (size_t i = 0; i < pool->size ; i++) {
> + if (pool->controls[i].fire != SEM_FAILED) {
> + sem_post(pool->controls[i].fire);
> + }
> + }
> +
> + /* Wait completion */
Nit: /* Wait for completion. */
> + for (size_t i = 0; i < pool->size ; i++) {
> + if (pool->controls[i].worker) {
> + pthread_join(pool->controls[i].worker, NULL);
> + pool->controls[i].worker = 0;
> + }
> + }
> + workers_must_exit = false;
> }
> - return can_parallelize;
> }
>
> -struct worker_pool *
> -ovn_add_worker_pool(void *(*start)(void *))
> +static void
> +free_controls(struct worker_pool *pool)
> +{
> + char sem_name[256];
> + if (pool->controls) {
> + /* Close/unlink semaphores */
Nit: /* Close/unlink semaphores. */
> + for (size_t i = 0; i < pool->size; i++) {
> + ovs_mutex_destroy(&pool->controls[i].mutex);
> + if (pool->controls[i].fire != SEM_FAILED) {
> + sem_close(pool->controls[i].fire);
> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> + sem_unlink(sem_name);
> + } else {
> + /* This and following controls are not initialized */
Nit: indent needs one extra space.
> + break;
> + }
> + }
> + free(pool->controls);
> + pool->controls = NULL;
> + }
> +}
> +
> +static void
> +free_pool(struct worker_pool *pool)
> +{
> + char sem_name[256];
> + stop_controls(pool);
> + free_controls(pool);
> + if (pool->done != SEM_FAILED) {
> + sem_close(pool->done);
> + sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> + sem_unlink(sem_name);
> + }
> + free(pool);
> +}
> +
> +static int
> +init_controls(struct worker_pool *pool)
> {
> - struct worker_pool *new_pool = NULL;
> struct worker_control *new_control;
> + char sem_name[256];
> +
> + pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
> + for (size_t i = 0; i < pool->size ; i++) {
> + pool->controls[i].fire = SEM_FAILED;
> + }
> + for (size_t i = 0; i < pool->size; i++) {
> + new_control = &pool->controls[i];
> + new_control->id = i;
> + new_control->done = pool->done;
> + new_control->data = NULL;
> + new_control->pool = pool;
> + new_control->worker = 0;
> + ovs_mutex_init(&new_control->mutex);
> + new_control->finished = ATOMIC_VAR_INIT(false);
> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> + new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> + if (new_control->fire == SEM_FAILED) {
> + free_controls(pool);
> + return -1;
> + }
> + }
> + return 0;
> +}
> +
> +static void
> +init_threads(struct worker_pool *pool, void *(*start)(void *))
> +{
> + for (size_t i = 0; i < pool_size; i++) {
> + pool->controls[i].worker =
> + ovs_thread_create("worker pool helper", start, &pool->controls[i]);
> + }
> + ovs_list_push_back(&worker_pools, &pool->list_node);
> +}
> +
> +enum pool_update_status
> +ovn_update_worker_pool(size_t requested_pool_size,
> + struct worker_pool **pool, void *(*start)(void *))
> +{
> bool test = false;
> - int i;
> char sem_name[256];
>
> - /* Belt and braces - initialize the pool system just in case if
> - * if it is not yet initialized.
> - */
> + if (requested_pool_size == pool_size) {
> + return POOL_UNCHANGED;
> + }
> +
> if (atomic_compare_exchange_strong(
> &initial_pool_setup,
> &test,
> true)) {
> ovs_mutex_lock(&init_mutex);
> - setup_worker_pools(false);
> + setup_worker_pools();
> ovs_mutex_unlock(&init_mutex);
> }
> -
> ovs_mutex_lock(&init_mutex);
> - if (can_parallelize) {
> - new_pool = xmalloc(sizeof(struct worker_pool));
> - new_pool->size = pool_size;
> - new_pool->controls = NULL;
> - sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> - new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> - if (new_pool->done == SEM_FAILED) {
> - goto cleanup;
> - }
> -
> - new_pool->controls =
> - xmalloc(sizeof(struct worker_control) * new_pool->size);
> -
> - for (i = 0; i < new_pool->size; i++) {
> - new_control = &new_pool->controls[i];
> - new_control->id = i;
> - new_control->done = new_pool->done;
> - new_control->data = NULL;
> - ovs_mutex_init(&new_control->mutex);
> - new_control->finished = ATOMIC_VAR_INIT(false);
> - sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> - new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> - if (new_control->fire == SEM_FAILED) {
> + pool_size = requested_pool_size;
> + VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size);
> +
> + if (*pool == NULL) {
> + if (pool_size > 1) {
> + VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size);
> + *pool = xmalloc(sizeof(struct worker_pool));
> + (*pool)->size = pool_size;
> + (*pool)->controls = NULL;
> + sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
> + (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> + if ((*pool)->done == SEM_FAILED) {
> goto cleanup;
> }
> + if (init_controls(*pool) == -1) {
> + goto cleanup;
> + }
> + init_threads(*pool, start);
> }
> -
> - for (i = 0; i < pool_size; i++) {
> - new_pool->controls[i].worker =
> - ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> + } else {
> + if (pool_size > 1) {
> + VLOG_INFO("Changing size of existing pool to %"PRIuSIZE,
> + pool_size);
> + stop_controls(*pool);
> + free_controls(*pool);
> + ovs_list_remove(&(*pool)->list_node);
> + (*pool)->size = pool_size;
> + if (init_controls(*pool) == -1) {
> + goto cleanup;
> + }
> + init_threads(*pool, start);
> + } else {
> + VLOG_INFO("Deleting existing pool");
> + worker_pool_hook(NULL);
> + *pool = NULL;
> }
> - ovs_list_push_back(&worker_pools, &new_pool->list_node);
> }
> ovs_mutex_unlock(&init_mutex);
> - return new_pool;
> -cleanup:
> + return POOL_UPDATED;
>
> +cleanup:
> /* Something went wrong when opening semaphores. In this case
> * it is better to shut off parallel processing altogether
> */
> -
> - VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> - can_parallelize = false;
> - if (new_pool->controls) {
> - for (i = 0; i < new_pool->size; i++) {
> - if (new_pool->controls[i].fire != SEM_FAILED) {
> - sem_close(new_pool->controls[i].fire);
> - sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> - sem_unlink(sem_name);
> - break; /* semaphores past this one are uninitialized */
> - }
> - }
> - }
> - if (new_pool->done != SEM_FAILED) {
> - sem_close(new_pool->done);
> - sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> - sem_unlink(sem_name);
> - }
> + VLOG_ERR("Failed to initialize parallel processing: %s",
> + ovs_strerror(errno));
> + free_pool(*pool);
> + *pool = NULL;
> + pool_size = 1;
> ovs_mutex_unlock(&init_mutex);
> - return NULL;
> + return POOL_UPDATE_FAILED;
> }
>
> -
> /* Initializes 'hmap' as an empty hash table with mask N. */
> void
> ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> @@ -225,9 +298,9 @@ ovn_run_pool_callback(struct worker_pool *pool,
> void *fin_result, void *result_frags,
> void (*helper_func)(struct worker_pool *pool,
> void *fin_result,
> - void *result_frags, int index))
> + void *result_frags, size_t index))
> {
> - int index, completed;
> + size_t index, completed;
>
> /* Ensure that all worker threads see the same data as the
> * main thread.
> @@ -367,9 +440,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
>
> static void
> worker_pool_hook(void *aux OVS_UNUSED) {
> - int i;
> static struct worker_pool *pool;
> - char sem_name[256];
>
> workers_must_exit = true;
>
> @@ -380,55 +451,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
> */
> atomic_thread_fence(memory_order_acq_rel);
>
> - /* Wake up the workers after the must_exit flag has been set */
> -
> - LIST_FOR_EACH (pool, list_node, &worker_pools) {
> - for (i = 0; i < pool->size ; i++) {
> - sem_post(pool->controls[i].fire);
> - }
> - for (i = 0; i < pool->size ; i++) {
> - pthread_join(pool->controls[i].worker, NULL);
> - }
> - for (i = 0; i < pool->size ; i++) {
> - sem_close(pool->controls[i].fire);
> - sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> - sem_unlink(sem_name);
> - }
> - sem_close(pool->done);
> - sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> - sem_unlink(sem_name);
> + LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
> + ovs_list_remove(&pool->list_node);
> + free_pool(pool);
> }
> }
>
> static void
> -setup_worker_pools(bool force) {
> - int cores, nodes;
> -
> - ovs_numa_init();
> - nodes = ovs_numa_get_n_numas();
> - if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> - nodes = 1;
> - }
> - cores = ovs_numa_get_n_cores();
> -
> - /* If there is no NUMA config, use 4 cores.
> - * If there is NUMA config use half the cores on
> - * one node so that the OS does not start pushing
> - * threads to other nodes.
> - */
> - if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> - /* If there is no NUMA we can try the ovs-threads routine.
> - * It falls back to sysconf and/or affinity mask.
> - */
> - cores = count_cpu_cores();
> - pool_size = cores;
> - } else {
> - pool_size = cores / nodes;
> - }
> - if ((pool_size < 4) && force) {
> - pool_size = 4;
> - }
> - can_parallelize = (pool_size >= 3);
> +setup_worker_pools(void)
> +{
> fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> sembase = random_uint32();
> }
> @@ -436,7 +467,7 @@ setup_worker_pools(bool force) {
> static void
> merge_list_results(struct worker_pool *pool OVS_UNUSED,
> void *fin_result, void *result_frags,
> - int index)
> + size_t index)
> {
> struct ovs_list *result = (struct ovs_list *)fin_result;
> struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> @@ -450,7 +481,7 @@ merge_list_results(struct worker_pool *pool OVS_UNUSED,
> static void
> merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> void *fin_result, void *result_frags,
> - int index)
> + size_t index)
> {
> struct hmap *result = (struct hmap *)fin_result;
> struct hmap *res_frags = (struct hmap *)result_frags;
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 0f7d68770..72b31b489 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -81,21 +81,30 @@ struct worker_control {
> sem_t *done; /* Work completion semaphore - sem_post on completion. */
> struct ovs_mutex mutex; /* Guards the data. */
> void *data; /* Pointer to data to be processed. */
> - void *workload; /* back-pointer to the worker pool structure. */
> pthread_t worker;
> + struct worker_pool *pool;
> };
>
> struct worker_pool {
> - int size; /* Number of threads in the pool. */
> + size_t size; /* Number of threads in the pool. */
> struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> struct worker_control *controls; /* "Handles" in this pool. */
> sem_t *done; /* Work completion semaphore. */
> };
>
> -/* Add a worker pool for thread function start() which expects a pointer to
> - * a worker_control structure as an argument. */
> +/* Return pool size; bigger than 1 means parallelization has been enabled. */
> +size_t ovn_get_worker_pool_size(void);
>
> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +enum pool_update_status {
> + POOL_UNCHANGED, /* no change to pool */
> + POOL_UPDATED, /* pool has been updated */
> + POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */
> +};
Nit: I would add an empty line here.
> +/* Add/delete a worker pool for thread function start() which expects a
> + * pointer to a worker_control structure as an argument. Returns the pool
> + * update status. */
> +enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size,
> + struct worker_pool **,
> + void *(*start)(void *));
>
> /* Setting this to true will make all processing threads exit */
>
> @@ -140,7 +149,8 @@ void ovn_run_pool_list(struct worker_pool *pool,
> void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> void *result_frags,
> void (*helper_func)(struct worker_pool *pool,
> - void *fin_result, void *result_frags, int index));
> + void *fin_result, void *result_frags,
> + size_t index));
>
>
> /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> @@ -251,17 +261,17 @@ static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> hrl->row_locks = NULL;
> }
>
> -bool ovn_can_parallelize_hashes(bool force_parallel);
> -
> /* Use the OVN library functions for stuff which OVS has not defined
> * If OVS has defined these, they will still compile using the OVN
> * local names, but will be dropped by the linker in favour of the OVS
> * supplied functions.
> */
> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
> + ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>
> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>
> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
>
> #define stop_parallel_processing() ovn_stop_parallel_processing()
>
> diff --git a/northd/northd.c b/northd/northd.c
> index 67c39df88..48426c801 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -59,6 +59,7 @@
> VLOG_DEFINE_THIS_MODULE(northd);
>
> static bool controller_event_en;
> +static bool lflow_hash_lock_initialized = false;
>
> static bool check_lsp_is_up;
>
> @@ -4740,7 +4741,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_datapath *od,
> /* If this option is 'true' northd will combine logical flows that differ by
> * logical datapath only by creating a datapath group. */
> static bool use_logical_dp_groups = false;
> -static bool use_parallel_build = false;
> +
> +enum {
> + STATE_NULL, /* parallelization is off */
> + STATE_INIT_HASH_SIZES, /* parallelization is on; hash sizing needed */
> + STATE_USE_PARALLELIZATION /* parallelization is on */
> +};
> +static int parallelization_state = STATE_NULL;
>
> static void
> ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
> @@ -4759,7 +4766,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
> lflow->ctrl_meter = ctrl_meter;
> lflow->dpg = NULL;
> lflow->where = where;
> - if (use_parallel_build && use_logical_dp_groups) {
> + if ((parallelization_state != STATE_NULL)
> + && use_logical_dp_groups) {
> ovs_mutex_init(&lflow->odg_lock);
> }
> }
> @@ -4773,7 +4781,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow *lflow_ref,
> return false;
> }
>
> - if (use_parallel_build) {
> + if (parallelization_state == STATE_USE_PARALLELIZATION) {
> ovs_mutex_lock(&lflow_ref->odg_lock);
> hmapx_add(&lflow_ref->od_group, od);
> ovs_mutex_unlock(&lflow_ref->odg_lock);
> @@ -4803,9 +4811,23 @@ static struct ovs_mutex lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1];
> static void
> lflow_hash_lock_init(void)
> {
> - for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> - ovs_mutex_init(&lflow_hash_locks[i]);
> + if (!lflow_hash_lock_initialized) {
> + for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> + ovs_mutex_init(&lflow_hash_locks[i]);
> + }
> + lflow_hash_lock_initialized = true;
> + }
> +}
> +
> +static void
> +lflow_hash_lock_destroy(void)
> +{
> + if (lflow_hash_lock_initialized) {
> + for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> + ovs_mutex_destroy(&lflow_hash_locks[i]);
> + }
> }
> + lflow_hash_lock_initialized = false;
> }
>
> /* This thread-local var is used for parallel lflow building when dp-groups is
> @@ -4853,7 +4875,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
> nullable_xstrdup(ctrl_meter),
> ovn_lflow_hint(stage_hint), where);
> hmapx_add(&lflow->od_group, od);
> - if (!use_parallel_build) {
> + if (parallelization_state != STATE_USE_PARALLELIZATION) {
> hmap_insert(lflow_map, &lflow->hmap_node, hash);
> } else {
> hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> @@ -4896,7 +4918,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct ovn_datapath *od,
> struct ovn_lflow *lflow;
>
> ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
> - if (use_logical_dp_groups && use_parallel_build) {
> + if (use_logical_dp_groups
> + && (parallelization_state == STATE_USE_PARALLELIZATION)) {
> lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
> match, actions, io_port, stage_hint, where,
> ctrl_meter);
> @@ -4982,6 +5005,10 @@ static void
> ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
> {
> if (lflow) {
> + if ((parallelization_state != STATE_NULL)
> + && use_logical_dp_groups) {
> + ovs_mutex_destroy(&lflow->odg_lock);
> + }
> if (lflows) {
> hmap_remove(lflows, &lflow->hmap_node);
> }
> @@ -13925,15 +13952,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
> &lsi->actions);
> }
>
> -struct lflows_thread_pool {
> - struct worker_pool *pool;
> -};
> -
> static void *
> build_lflows_thread(void *arg)
> {
> struct worker_control *control = (struct worker_control *) arg;
> - struct lflows_thread_pool *workload;
> struct lswitch_flow_build_info *lsi;
>
> struct ovn_datapath *od;
> @@ -13944,17 +13966,16 @@ build_lflows_thread(void *arg)
>
> while (!stop_parallel_processing()) {
> wait_for_work(control);
> - workload = (struct lflows_thread_pool *) control->workload;
> lsi = (struct lswitch_flow_build_info *) control->data;
> if (stop_parallel_processing()) {
> return NULL;
> }
> thread_lflow_counter = 0;
> - if (lsi && workload) {
> + if (lsi) {
> /* Iterate over bucket ThreadID, ThreadID+size, ... */
> for (bnum = control->id;
> bnum <= lsi->datapaths->mask;
> - bnum += workload->pool->size)
> + bnum += control->pool->size)
> {
> HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, lsi->datapaths) {
> if (stop_parallel_processing()) {
> @@ -13965,7 +13986,7 @@ build_lflows_thread(void *arg)
> }
> for (bnum = control->id;
> bnum <= lsi->ports->mask;
> - bnum += workload->pool->size)
> + bnum += control->pool->size)
> {
> HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
> if (stop_parallel_processing()) {
> @@ -13976,7 +13997,7 @@ build_lflows_thread(void *arg)
> }
> for (bnum = control->id;
> bnum <= lsi->lbs->mask;
> - bnum += workload->pool->size)
> + bnum += control->pool->size)
> {
> HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
> if (stop_parallel_processing()) {
> @@ -13997,7 +14018,7 @@ build_lflows_thread(void *arg)
> }
> for (bnum = control->id;
> bnum <= lsi->igmp_groups->mask;
> - bnum += workload->pool->size)
> + bnum += control->pool->size)
> {
> HMAP_FOR_EACH_IN_PARALLEL (
> igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> @@ -14016,39 +14037,13 @@ build_lflows_thread(void *arg)
> return NULL;
> }
>
> -static bool pool_init_done = false;
> -static struct lflows_thread_pool *build_lflows_pool = NULL;
> -
> -static void
> -init_lflows_thread_pool(void)
> -{
> - int index;
> -
> - if (!pool_init_done) {
> - struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> - pool_init_done = true;
> - if (pool) {
> - build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> - build_lflows_pool->pool = pool;
> - for (index = 0; index < build_lflows_pool->pool->size; index++) {
> - build_lflows_pool->pool->controls[index].workload =
> - build_lflows_pool;
> - }
> - }
> - }
> -}
> -
> -/* TODO: replace hard cutoffs by configurable via commands. These are
> - * temporary defines to determine single-thread to multi-thread processing
> - * cutoff.
> - * Setting to 1 forces "all parallel" lflow build.
> - */
> +static struct worker_pool *build_lflows_pool = NULL;
>
> static void
> noop_callback(struct worker_pool *pool OVS_UNUSED,
> void *fin_result OVS_UNUSED,
> void *result_frags OVS_UNUSED,
> - int index OVS_UNUSED)
> + size_t index OVS_UNUSED)
> {
> /* Do nothing */
> }
> @@ -14088,28 +14083,21 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
>
> char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>
> - if (use_parallel_build) {
> - init_lflows_thread_pool();
> - if (!can_parallelize_hashes(false)) {
> - use_parallel_build = false;
> - }
> - }
> -
> - if (use_parallel_build) {
> + if (parallelization_state == STATE_USE_PARALLELIZATION) {
> struct hmap *lflow_segs;
> struct lswitch_flow_build_info *lsiv;
> int index;
>
> - lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> + lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
> if (use_logical_dp_groups) {
> lflow_segs = NULL;
> } else {
> - lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->pool->size);
> + lflow_segs = xcalloc(sizeof(*lflow_segs), build_lflows_pool->size);
> }
>
> /* Set up "work chunks" for each thread to work on. */
>
> - for (index = 0; index < build_lflows_pool->pool->size; index++) {
> + for (index = 0; index < build_lflows_pool->size; index++) {
> if (use_logical_dp_groups) {
> /* if dp_groups are in use we lock a shared lflows hash
> * on a per-bucket level instead of merging hash frags */
> @@ -14132,19 +14120,19 @@ build_lswitch_and_lrouter_flows(const struct hmap *datapaths,
> ds_init(&lsiv[index].match);
> ds_init(&lsiv[index].actions);
>
> - build_lflows_pool->pool->controls[index].data = &lsiv[index];
> + build_lflows_pool->controls[index].data = &lsiv[index];
> }
>
> /* Run thread pool. */
> if (use_logical_dp_groups) {
> - run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> + run_pool_callback(build_lflows_pool, NULL, NULL,
> noop_callback);
> - fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
> + fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
> } else {
> - run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> + run_pool_hash(build_lflows_pool, lflows, lflow_segs);
> }
>
> - for (index = 0; index < build_lflows_pool->pool->size; index++) {
> + for (index = 0; index < build_lflows_pool->size; index++) {
> ds_destroy(&lsiv[index].match);
> ds_destroy(&lsiv[index].actions);
> }
> @@ -14280,8 +14268,39 @@ ovn_sb_set_lflow_logical_dp_group(
> }
>
> static ssize_t max_seen_lflow_size = 128;
> -static bool needs_parallel_init = true;
> -static bool reset_parallel = false;
> +
> +void run_update_worker_pool(int n_threads)
> +{
> + /* If number of threads has been updated (or initially set),
> + * update the worker pool. */
> + if (update_worker_pool(n_threads, &build_lflows_pool,
> + build_lflows_thread) != POOL_UNCHANGED) {
> + /* worker pool was updated */
> + if (get_worker_pool_size() <= 1) {
> + /* destroy potentially created lflow_hash_lock */
> + lflow_hash_lock_destroy();
> + parallelization_state = STATE_NULL;
> + } else if (parallelization_state != STATE_USE_PARALLELIZATION) {
> + if (use_logical_dp_groups) {
> + lflow_hash_lock_init();
> + parallelization_state = STATE_INIT_HASH_SIZES;
> + } else {
> + parallelization_state = STATE_USE_PARALLELIZATION;
> + }
> + }
> + }
> +}
> +
> +static void worker_pool_init_for_ldp(void)
> +{
> + /* If parallelization is enabled, make sure locks are initialized
> + * when ldp are used.
> + */
> + if (parallelization_state != STATE_NULL) {
> + lflow_hash_lock_init();
> + parallelization_state = STATE_INIT_HASH_SIZES;
> + }
> +}
>
> static void
> build_mcast_groups(struct lflow_input *data,
> @@ -14302,30 +14321,18 @@ void build_lflows(struct lflow_input *input_data,
> build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
> &mcast_groups, &igmp_groups);
>
> - if (reset_parallel) {
> - /* Parallel build was disabled before, we need to
> - * re-enable it. */
> - use_parallel_build = true;
> - reset_parallel = false;
> - }
> -
> fast_hmap_size_for(&lflows, max_seen_lflow_size);
>
> - if (use_parallel_build && use_logical_dp_groups &&
> - needs_parallel_init) {
> - lflow_hash_lock_init();
> - needs_parallel_init = false;
> - /* Disable parallel build on first run with dp_groups
> - * to determine the correct sizing of hashes. */
> - use_parallel_build = false;
> - reset_parallel = true;
> - }
> build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
> input_data->port_groups, &lflows,
> &mcast_groups, &igmp_groups,
> input_data->meter_groups,
> input_data->lbs,
> input_data->bfd_connections);
>
> + if (parallelization_state == STATE_INIT_HASH_SIZES) {
> + parallelization_state = STATE_USE_PARALLELIZATION;
> + }
> +
> /* Parallel build may result in a suboptimal hash. Resize the
> * hash to a correct size before doing lookups */
>
> @@ -15567,12 +15574,13 @@ ovnnb_db_run(struct northd_input *input_data,
>
> smap_destroy(&options);
>
> - use_parallel_build =
> - (smap_get_bool(&nb->options, "use_parallel_build", false) &&
> - can_parallelize_hashes(false));
> -
> + bool old_use_ldp = use_logical_dp_groups;
> use_logical_dp_groups = smap_get_bool(&nb->options,
> "use_logical_dp_groups", true);
> + if (use_logical_dp_groups && !old_use_ldp) {
> + worker_pool_init_for_ldp();
> + }
> +
> use_ct_inv_match = smap_get_bool(&nb->options,
> "use_ct_inv_match", true);
>
> diff --git a/northd/northd.h b/northd/northd.h
> index 2d804a22e..fe8dad03a 100644
> --- a/northd/northd.h
> +++ b/northd/northd.h
> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
> struct hmap *bfd_connections, struct hmap *ports);
> void bfd_cleanup_connections(struct lflow_input *input_data,
> struct hmap *bfd_map);
> +void run_update_worker_pool(int n_threads);
>
> #endif /* NORTHD_H */
> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
> index 718d052cc..e9afda4c6 100644
> --- a/northd/ovn-northd-ddlog.c
> +++ b/northd/ovn-northd-ddlog.c
> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> SSL_OPTION_ENUMS,
> OPT_DRY_RUN,
> OPT_DDLOG_RECORD,
> - OPT_DUMMY_NUMA,
> };
> static const struct option long_options[] = {
> {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> OVN_DAEMON_LONG_OPTIONS,
> VLOG_LONG_OPTIONS,
> STREAM_SSL_LONG_OPTIONS,
> - {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
> {NULL, 0, NULL, 0},
> };
> char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> *pause = true;
> break;
>
> - case OPT_DUMMY_NUMA:
> - ovs_numa_set_dummy(optarg);
> - break;
> -
> case OPT_DDLOG_RECORD:
> record_file = optarg;
> break;
> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
> index 73ecfbc2d..077bd1f41 100644
> --- a/northd/ovn-northd.8.xml
> +++ b/northd/ovn-northd.8.xml
> @@ -68,53 +68,26 @@
> restarting a process or disturbing a running system.
> </p>
> </dd>
> - <dt><code>--dummy-numa</code></dt>
> + <dt><code>--n-threads N</code></dt>
> <dd>
> - <p>
> - Typically, OVS uses sysfs to determine the number of NUMA nodes and
> - CPU cores that are available on a machine. The parallelization code
> - in OVN uses this information to determine if there are enough
> - resources to use parallelization. The current algorithm enables
> - parallelization if the total number of CPU cores divided by the
> - number of NUMA nodes is greater than or equal to four.
> - </p>
> -
> <p>
> In certain situations, it may be desirable to enable parallelization
> - on a system that otherwise would not have it allowed. The
> - <code>--dummy-numa</code> option allows for you to fake the NUMA
> - nodes and cores that OVS thinks your system has. The syntax consists
> - of using numbers to represent the NUMA node IDs. The number of times
> - that a NUMA node ID appears represents how many CPU cores that NUMA
> - node contains. So for instance, if you did the following:
> - </p>
> -
> - <p>
> - <code>--dummy-numa=0,0,0,0</code>
> - </p>
> -
> - <p>
> - it would make OVS assume that you have a single NUMA node with ID 0,
> - and that NUMA node consists of four CPU cores. Similarly, you could
> - do:
> - </p>
> -
> - <p>
> - <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
> + on a system to decrease latency (at the potential cost of increasing
> + CPU usage).
> </p>
>
> <p>
> - to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
> - with six CPU cores.
> + This option will cause ovn-northd to use N threads when building
> + logical flows, if N is within [2-256].
> + If N is 1, parallelization is disabled (default behavior).
> + If N is less than 1, then N is set to 1, parallelization is disabled
> + and a warning is logged.
> + If N is more than 256, then N is set to 256, parallelization is
> + enabled (with 256 threads) and a warning is logged.
> </p>
>
> <p>
> - Currently, the only affect this option has is on whether
> - parallelization can be enabled in ovn-northd. There are no NUMA node
> - or CPU core-specific actions performed by OVN. Setting
> - <code>--dummy-numa</code> in ovn-northd does not affect how other OVS
> - processes on the system (such as ovs-vswitchd) count the number of
> - NUMA nodes and CPU cores; this setting is local to ovn-northd.
> + ovn-northd-ddlog does not support this option.
> </p>
> </dd>
> </dl>
> @@ -210,6 +183,27 @@
> except for the northbound database client.
> </p>
> </dd>
> +
> +
> + <dt><code>set-n-threads N</code></dt>
> + <dd>
> + <p>
> + Set the number of threads used for building logical flows.
> + When N is within [2-256], parallelization is enabled.
> + When N is 1, parallelization is disabled.
> + When N is less than 1 or more than 256, an error is returned.
> + If ovn-northd fails to start parallelization (e.g., fails to set up
> + semaphores), parallelization is disabled and an error is returned.
> + </p>
> + </dd>
> +
> + <dt><code>get-n-threads</code></dt>
> + <dd>
> + <p>
> + Return the number of threads used for building logical flows.
> + </p>
> + </dd>
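Not a blocker at all, but it might help readers of the man page to see
the two commands together; based on the tests below, something like:

  $ ovn-appctl -t ovn-northd parallel-build/set-n-threads 4
  $ ovn-appctl -t ovn-northd parallel-build/get-n-threads
  4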
> +
> </dl>
> </p>
>
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 0a0f85010..b7363b16d 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -41,6 +41,7 @@
> #include "unixctl.h"
> #include "util.h"
> #include "openvswitch/vlog.h"
> +#include "lib/ovn-parallel-hmap.h"
>
> VLOG_DEFINE_THIS_MODULE(ovn_northd);
>
> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
> static unixctl_cb_func ovn_northd_is_paused;
> static unixctl_cb_func ovn_northd_status;
> static unixctl_cb_func cluster_state_reset_cmd;
> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>
> struct northd_state {
> bool had_lock;
> bool paused;
> };
>
> +#define OVN_MAX_SUPPORTED_THREADS 256
> +
> static const char *ovnnb_db;
> static const char *ovnsb_db;
> static const char *unixctl_path;
> @@ -525,7 +530,7 @@ Options:\n\
> --ovnsb-db=DATABASE connect to ovn-sb database at DATABASE\n\
> (default: %s)\n\
> --dry-run start in paused state (do not commit db changes)\n\
> - --dummy-numa override default NUMA node and CPU core discovery\n\
> + --n-threads=N specify number of threads\n\
> --unixctl=SOCKET override default control socket name\n\
> -h, --help display this help message\n\
> -o, --options list available options\n\
> @@ -538,14 +543,14 @@ Options:\n\
>
> static void
> parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> - bool *paused)
> + bool *paused, int *n_threads)
> {
> enum {
> OVN_DAEMON_OPTION_ENUMS,
> VLOG_OPTION_ENUMS,
> SSL_OPTION_ENUMS,
> OPT_DRY_RUN,
> - OPT_DUMMY_NUMA,
> + OPT_N_THREADS,
> };
> static const struct option long_options[] = {
> {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> {"options", no_argument, NULL, 'o'},
> {"version", no_argument, NULL, 'V'},
> {"dry-run", no_argument, NULL, OPT_DRY_RUN},
> - {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
> + {"n-threads", required_argument, NULL, OPT_N_THREADS},
> OVN_DAEMON_LONG_OPTIONS,
> VLOG_LONG_OPTIONS,
> STREAM_SSL_LONG_OPTIONS,
> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> ovn_print_version(0, 0);
> exit(EXIT_SUCCESS);
>
> - case OPT_DUMMY_NUMA:
> - ovs_numa_set_dummy(optarg);
> + case OPT_N_THREADS:
> + *n_threads = strtoul(optarg, NULL, 10);
> + if (*n_threads < 1) {
> + *n_threads = 1;
> + VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> + "set to : [%s]", *n_threads, optarg);
> + }
> + if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
> + *n_threads = OVN_MAX_SUPPORTED_THREADS;
> + VLOG_WARN("Setting n_threads to %d as --n-threads option was "
> + "set to : [%s]", *n_threads, optarg);
> + }
> + if (*n_threads != 1) {
> + VLOG_INFO("Using %d threads", *n_threads);
> + }
> break;
>
> case OPT_DRY_RUN:
> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
> struct unixctl_server *unixctl;
> int retval;
> bool exiting;
> + int n_threads = 1;
> struct northd_state state = {
> .had_lock = false,
> .paused = false
> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
> ovs_cmdl_proctitle_init(argc, argv);
> ovn_set_program_name(argv[0]);
> service_start(&argc, &argv);
> - parse_options(argc, argv, &state.paused);
> + parse_options(argc, argv, &state.paused, &n_threads);
>
> daemonize_start(false);
>
> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
> unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
> cluster_state_reset_cmd,
> &reset_ovnnb_idl_min_index);
> + unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 1,
> + ovn_northd_set_thread_count_cmd,
> + NULL);
> + unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
> + ovn_northd_get_thread_count_cmd,
> + NULL);
>
> daemonize_complete();
>
> @@ -761,6 +786,8 @@ main(int argc, char *argv[])
> unsigned int ovnnb_cond_seqno = UINT_MAX;
> unsigned int ovnsb_cond_seqno = UINT_MAX;
>
> + run_update_worker_pool(n_threads);
> +
> /* Main loop. */
> exiting = false;
>
> @@ -1017,3 +1044,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> poll_immediate_wake();
> unixctl_command_reply(conn, NULL);
> }
> +
> +static void
> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> + const char *argv[], void *aux OVS_UNUSED)
> +{
> + int n_threads = atoi(argv[1]);
> +
> + if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
> + struct ds s = DS_EMPTY_INITIALIZER;
> + ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
> + unixctl_command_reply_error(conn, ds_cstr(&s));
> + ds_destroy(&s);
> + } else {
> + run_update_worker_pool(n_threads);
> + unixctl_command_reply(conn, NULL);
> + }
> +}
> +
> +static void
> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc OVS_UNUSED,
> + const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
> +{
> + struct ds s = DS_EMPTY_INITIALIZER;
> + ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size());
> + unixctl_command_reply(conn, ds_cstr(&s));
> + ds_destroy(&s);
> +}
> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
> index d78315c75..c6f0f6251 100644
> --- a/tests/ovn-macros.at
> +++ b/tests/ovn-macros.at
> @@ -170,8 +170,8 @@ ovn_start_northd() {
> ovn-northd-ddlog) northd_args="$northd_args --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
> esac
>
> - if test X$NORTHD_DUMMY_NUMA = Xyes; then
> - northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
> + if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> + northd_args="$northd_args --n-threads=4"
> fi
>
> local name=${d_prefix}northd${suffix}
> @@ -252,10 +252,6 @@ ovn_start () {
> else
> ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
> fi
> -
> - if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> - ovn-nbctl set NB_Global . options:use_parallel_build=true
> - fi
> }
>
> # Interconnection networks.
> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>
> m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>
> -# Use --dummy-numa if system has low cores and we want to force parallelization
> -m4_define([NORTHD_DUMMY_NUMA],
> - [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
> - then
> - echo "yes"
> - else
> - echo "no"
> - fi)
> -])
> -
> # Defines a versions of a test with all combinations of northd and
> # datapath groups.
> m4_define([OVN_FOR_EACH_NORTHD],
> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
> # Some tests aren't prepared for dp groups to be enabled.
> m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
> [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
> - [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
> -])])])
> -
> -# Test parallelization with dp groups enabled and disabled
> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> - [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> - [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -# Use --dummy-numa if system has low cores
> -m4_define([HOST_HAS_LOW_CORES], [
> - if test $(nproc) -lt 4; then
> - :
> - $1
> - else
> - :
> - $2
> - fi
> -])
> + [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
> + [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
> +
> +# Some tests aren't prepared for ddlog to be enabled.
> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
> + [m4_foreach([NORTHD_TYPE], [ovn-northd],
> + [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> + [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
>
> -m4_define([NORTHD_PARALLELIZATION], [
> - HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY],
> [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
> -])
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 17e37fb77..1a9d5ef76 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -7174,3 +7174,112 @@ ct_next(ct_state=new|trk);
>
> AT_CLEANUP
> ])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization unixctl])
> +ovn_start
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [4
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE parallel-build/get-n-threads], [0], [1
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 0], [2], [],
> + [invalid n_threads: 0
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads -1], [2], [],
> + [invalid n_threads: -1
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 300], [2], [],
> + [invalid n_threads: 300
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], [2], [],
> + ["parallel-build/set-n-threads" command requires at least 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 2], [2], [],
> + ["parallel-build/set-n-threads" command takes at most 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CLEANUP
> +])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization runtime])
> +ovn_start
> +
> +add_switch_ports() {
> + for port in $(seq $1 $2); do
> + logical_switch_port=lsp${port}
> + check ovn-nbctl lsp-add ls1 $logical_switch_port
> + check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
> + done
> +}
> +
> +# Build some rather heavy config and modify number of threads in the middle
> +check ovn-nbctl ls-add ls1
> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
> +
> +check ovn-nbctl lr-add lr1
> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router options:router-port=lrp0 addresses=dynamic
> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
> +add_switch_ports 1 50
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 51 100
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +add_switch_ports 101 150
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 151 200
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +add_switch_ports 201 250
> +check ovn-nbctl --wait=sb sync
> +
> +# Run 3 times: once with parallelization enabled, once with it disabled, and once while changing it
> +# Compare the flows produced by the three runs
> +# Ignore IP/MAC addresses
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
> +
> +# Restart with 1 thread
> +for port in $(seq 1 250); do
> + logical_switch_port=lsp${port}
> + check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
> +AT_CHECK([diff flows1 flows2])
> +
> +# Restart with 8 threads
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +for port in $(seq 1 250); do
> + logical_switch_port=lsp${port}
> + check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
> +AT_CHECK([diff flows1 flows3])
> +
> +AT_CLEANUP
> +])