On 4/29/22 18:31, Xavier Simonart wrote:
> This patch is intended to change the way to enable northd lflow build
> parallelization, as well as enable runtime change of number of threads.
> Before this patch, the following was needed to use parallelization:
> - enable parallelization through use_parallel_build in NBDB
> - use --dummy-numa to select number of threads.
> This second part was needed as otherwise as many threads as cores in the
> system were used, while parallelization showed some performance improvement
> only up to around 4 (or maybe 8) threads.
> 
> With this patch, the number of threads used for lflow parallel build can be
> specified either:
> - at startup, using --n-threads=<N> as ovn-northd command line option
> - using unixctl
> If the number of threads specified is > 1, then parallelization is enabled.
> If the number is 1, parallelization is disabled.
> If the number is < 1 or > 256, parallelization is disabled at startup and
> a warning is logged.
> 
> The following unixctl commands have been added:
> - set-n-threads <N>: set the number of threads used.
> - get-n-threads: returns the number of threads used
> If the number of threads is within <2-256> bounds, parallelization is enabled.
> If the number of threads is 1, parallelization is disabled.
> Otherwise an error is thrown.
> 
> Note that, if set-n-threads fails for any reason (e.g. failure to set up some
> semaphore), parallelization is disabled, and get-n-threads will return 1.
> 
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
> Signed-off-by: Xavier Simonart <[email protected]>
> ---

Hi Xavier,

Thanks for working on this, I think it's already a nice improvement on
the previous interface for parallel lflow build!

We probably also need a NEWS entry.

I have a few more minor comments below.

Thanks,
Dumitru

>  lib/ovn-parallel-hmap.c   | 273 +++++++++++++++++++++-----------------
>  lib/ovn-parallel-hmap.h   |  19 +--
>  northd/northd.c           | 148 ++++++++++-----------
>  northd/northd.h           |   1 +
>  northd/ovn-northd-ddlog.c |   6 -
>  northd/ovn-northd.8.xml   |  68 +++++-----
>  northd/ovn-northd.c       |  68 +++++++++-
>  tests/ovn-macros.at       |  59 ++------
>  tests/ovn-northd.at       | 109 +++++++++++++++
>  9 files changed, 444 insertions(+), 307 deletions(-)
> 
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index 7edc4c0b6..0f5140d25 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -41,11 +41,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>  #define WORKER_SEM_NAME "%x-%p-%x"
>  #define MAIN_SEM_NAME "%x-%p-main"
>  
> -/* These are accessed under mutex inside add_worker_pool().
> - * They do not need to be atomic.
> - */
>  static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> -static bool can_parallelize = false;
>  
>  /* This is set only in the process of exit and the set is
>   * accompanied by a fence. It does not need to be atomic or be
> @@ -57,12 +53,12 @@ static struct ovs_list worker_pools = 
> OVS_LIST_INITIALIZER(&worker_pools);
>  
>  static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>  
> -static int pool_size;
> +static int pool_size = 1;

Everywhere in the ovn-parallel-hmap.[hc] files we define the pool size
as a signed integer.  But it can never be negative; the pool size is
always >= 1.

Now that you're refactoring and improving all this code, would it be
possible to change the pool sizes to size_t wherever appropriate (e.g.,
in 'struct worker_pool' too)?

>  
>  static int sembase;
>  
>  static void worker_pool_hook(void *aux OVS_UNUSED);
> -static void setup_worker_pools(bool force);
> +static void setup_worker_pools(void);
>  static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>                                 void *fin_result, void *result_frags,
>                                 int index);
> @@ -76,107 +72,182 @@ ovn_stop_parallel_processing(void)
>      return workers_must_exit;
>  }
>  
> -bool
> -ovn_can_parallelize_hashes(bool force_parallel)
> +int
> +ovn_get_worker_pool_size(void)
>  {
> -    bool test = false;
> +    return pool_size;
> +}
>  
> -    if (atomic_compare_exchange_strong(
> -            &initial_pool_setup,
> -            &test,
> -            true)) {
> -        ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(force_parallel);
> -        ovs_mutex_unlock(&init_mutex);
> +static void
> +stop_controls(struct worker_pool *pool)
> +{
> +    if (pool->controls) {
> +        workers_must_exit = true;
> +
> +        /* unlock threads */
> +        for (int i = 0; i < pool->size ; i++) {

If we change pool->size to be size_t, 'i' can be size_t too (and in a
few more places below).

> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_post(pool->controls[i].fire);
> +            }
> +        }
> +
> +        /* Wait completion */
> +        for (int i = 0; i < pool->size ; i++) {
> +            if (pool->controls[i].worker) {
> +                pthread_join(pool->controls[i].worker, NULL);
> +                pool->controls[i].worker = 0;
> +            }
> +        }
> +        workers_must_exit = false;
> +    }
> +}
> +
> +static void
> +free_controls(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    if (pool->controls) {
> +        /* Close/unlink semaphores */
> +        for (int i = 0; i < pool->size; i++) {
> +            ovs_mutex_destroy(&pool->controls[i].mutex);
> +            if (pool->controls[i].fire != SEM_FAILED) {
> +                sem_close(pool->controls[i].fire);
> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +                sem_unlink(sem_name);
> +            } else {
> +               /* This and following controls are not initialized */
> +                break;
> +            }
> +        }
> +        free(pool->controls);
> +        pool->controls = NULL;
>      }
> -    return can_parallelize;
>  }
>  
> -struct worker_pool *
> -ovn_add_worker_pool(void *(*start)(void *))
> +static void
> +free_pool(struct worker_pool *pool)
> +{
> +    char sem_name[256];
> +    stop_controls(pool);
> +    free_controls(pool);
> +    if (pool->done != SEM_FAILED) {
> +        sem_close(pool->done);
> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> +        sem_unlink(sem_name);
> +    }
> +    free(pool);
> +}
> +
> +static int
> +init_controls(struct worker_pool *pool)
>  {
> -    struct worker_pool *new_pool = NULL;
>      struct worker_control *new_control;
> +    char sem_name[256];
> +
> +    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
> +    for (int i = 0; i < pool->size ; i++) {
> +        pool->controls[i].fire = SEM_FAILED;
> +    }
> +    for (int i = 0; i < pool->size; i++) {
> +        new_control = &pool->controls[i];
> +        new_control->id = i;
> +        new_control->done = pool->done;
> +        new_control->data = NULL;
> +        new_control->pool = pool;
> +        new_control->worker = 0;
> +        ovs_mutex_init(&new_control->mutex);
> +        new_control->finished = ATOMIC_VAR_INIT(false);
> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> +        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +        if (new_control->fire == SEM_FAILED) {
> +            free_controls(pool);
> +            return -1;
> +        }
> +    }
> +    return 0;
> +}
> +
> +static void
> +init_threads(struct worker_pool *pool, void *(*start)(void *))
> +{
> +    for (int i = 0; i < pool_size; i++) {
> +        pool->controls[i].worker =
> +            ovs_thread_create("worker pool helper", start, 
> &pool->controls[i]);
> +    }
> +    ovs_list_push_back(&worker_pools, &pool->list_node);
> +}
> +
> +bool
> +ovn_update_worker_pool(int requested_pool_size,
> +                       struct worker_pool **pool, void *(*start)(void *))
> +{
>      bool test = false;
> -    int i;
>      char sem_name[256];
>  
> -    /* Belt and braces - initialize the pool system just in case if
> -     * if it is not yet initialized.
> -     */
> +    if (requested_pool_size == pool_size) {
> +        return false;
> +    }
> +
>      if (atomic_compare_exchange_strong(
>              &initial_pool_setup,
>              &test,
>              true)) {
>          ovs_mutex_lock(&init_mutex);
> -        setup_worker_pools(false);
> +        setup_worker_pools();
>          ovs_mutex_unlock(&init_mutex);
>      }
> -
>      ovs_mutex_lock(&init_mutex);
> -    if (can_parallelize) {
> -        new_pool = xmalloc(sizeof(struct worker_pool));
> -        new_pool->size = pool_size;
> -        new_pool->controls = NULL;
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -        if (new_pool->done == SEM_FAILED) {
> -            goto cleanup;
> -        }
> -
> -        new_pool->controls =
> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
> -
> -        for (i = 0; i < new_pool->size; i++) {
> -            new_control = &new_pool->controls[i];
> -            new_control->id = i;
> -            new_control->done = new_pool->done;
> -            new_control->data = NULL;
> -            ovs_mutex_init(&new_control->mutex);
> -            new_control->finished = ATOMIC_VAR_INIT(false);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> -            if (new_control->fire == SEM_FAILED) {
> +    pool_size = requested_pool_size;
> +    VLOG_DBG("setting thread count to %d", pool_size);

This can be VLOG_INFO, it's useful information and we should probably
always display it.

> +
> +    if (*pool == NULL) {
> +        if (pool_size > 1) {
> +            VLOG_DBG("Creating new pool with size %d", pool_size);

Same here.

> +            *pool = xmalloc(sizeof(struct worker_pool));
> +            (*pool)->size = pool_size;
> +            (*pool)->controls = NULL;
> +            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
> +            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> +            if ((*pool)->done == SEM_FAILED) {
>                  goto cleanup;
>              }
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
>          }
> -
> -        for (i = 0; i < pool_size; i++) {
> -            new_pool->controls[i].worker =
> -                ovs_thread_create("worker pool helper", start, 
> &new_pool->controls[i]);
> +    } else {
> +        if (pool_size > 1) {
> +            VLOG_DBG("Changing size of existing pool to %d", pool_size);

This too.

> +            stop_controls(*pool);
> +            free_controls(*pool);
> +            ovs_list_remove(&(*pool)->list_node);
> +            (*pool)->size = pool_size;
> +            if (init_controls(*pool) == -1) {
> +                goto cleanup;
> +            }
> +            init_threads(*pool, start);
> +        } else {
> +            VLOG_DBG("Deleting existing pool");

Here too.

> +            worker_pool_hook(NULL);
> +            *pool = NULL;
>          }
> -        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>      }
>      ovs_mutex_unlock(&init_mutex);
> -    return new_pool;
> -cleanup:
> +    return true;
>  
> +cleanup:
>      /* Something went wrong when opening semaphores. In this case
>       * it is better to shut off parallel procesing altogether
>       */
> -
>      VLOG_INFO("Failed to initialize parallel processing, error %d", errno);

This should be VLOG_ERR in my opinion.

> -    can_parallelize = false;
> -    if (new_pool->controls) {
> -        for (i = 0; i < new_pool->size; i++) {
> -            if (new_pool->controls[i].fire != SEM_FAILED) {
> -                sem_close(new_pool->controls[i].fire);
> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> -                sem_unlink(sem_name);
> -                break; /* semaphores past this one are uninitialized */
> -            }
> -        }
> -    }
> -    if (new_pool->done != SEM_FAILED) {
> -        sem_close(new_pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> -        sem_unlink(sem_name);
> -    }
> +    free_pool(*pool);
> +    *pool = NULL;
> +    pool_size = 1;
>      ovs_mutex_unlock(&init_mutex);
> -    return NULL;
> +    return true;
>  }
>  
> -
>  /* Initializes 'hmap' as an empty hash table with mask N. */
>  void
>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> @@ -367,9 +438,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct 
> hashrow_locks *hrl)
>  
>  static void
>  worker_pool_hook(void *aux OVS_UNUSED) {
> -    int i;
>      static struct worker_pool *pool;
> -    char sem_name[256];
>  
>      workers_must_exit = true;
>  
> @@ -380,55 +449,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>       */
>      atomic_thread_fence(memory_order_acq_rel);
>  
> -    /* Wake up the workers after the must_exit flag has been set */
> -
> -    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_post(pool->controls[i].fire);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            pthread_join(pool->controls[i].worker, NULL);
> -        }
> -        for (i = 0; i < pool->size ; i++) {
> -            sem_close(pool->controls[i].fire);
> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> -            sem_unlink(sem_name);
> -        }
> -        sem_close(pool->done);
> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> -        sem_unlink(sem_name);
> +    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
> +        ovs_list_remove(&pool->list_node);
> +        free_pool(pool);
>      }
>  }
>  
>  static void
> -setup_worker_pools(bool force) {
> -    int cores, nodes;
> -
> -    ovs_numa_init();
> -    nodes = ovs_numa_get_n_numas();
> -    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> -        nodes = 1;
> -    }
> -    cores = ovs_numa_get_n_cores();
> -
> -    /* If there is no NUMA config, use 4 cores.
> -     * If there is NUMA config use half the cores on
> -     * one node so that the OS does not start pushing
> -     * threads to other nodes.
> -     */
> -    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> -        /* If there is no NUMA we can try the ovs-threads routine.
> -         * It falls back to sysconf and/or affinity mask.
> -         */
> -        cores = count_cpu_cores();
> -        pool_size = cores;
> -    } else {
> -        pool_size = cores / nodes;
> -    }
> -    if ((pool_size < 4) && force) {
> -        pool_size = 4;
> -    }
> -    can_parallelize = (pool_size >= 3);
> +setup_worker_pools(void)
> +{
>      fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>      sembase = random_uint32();
>  }
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 0f7d68770..dc00ad635 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -81,8 +81,8 @@ struct worker_control {
>      sem_t *done; /* Work completion semaphore - sem_post on completion. */
>      struct ovs_mutex mutex; /* Guards the data. */
>      void *data; /* Pointer to data to be processed. */
> -    void *workload; /* back-pointer to the worker pool structure. */
>      pthread_t worker;
> +    struct worker_pool *pool;
>  };
>  
>  struct worker_pool {
> @@ -92,10 +92,13 @@ struct worker_pool {
>      sem_t *done; /* Work completion semaphorew. */
>  };
>  
> -/* Add a worker pool for thread function start() which expects a pointer to
> - * a worker_control structure as an argument. */
> +/* Return pool size; bigger than 1 means parallelization has been enabled */

Nit: missing '.' at the end of the comment.

> +int ovn_get_worker_pool_size(void);
>  
> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> +/* Add/delete a worker pool for thread function start() which expects a 
> pointer
> + * to a worker_control structure as an argument. Return true if updated */
> +bool ovn_update_worker_pool(int requested_pool_size, struct worker_pool **,
> +                            void *(*start)(void *));
>  
>  /* Setting this to true will make all processing threads exit */
>  
> @@ -251,17 +254,17 @@ static inline void init_hash_row_locks(struct 
> hashrow_locks *hrl)
>      hrl->row_locks = NULL;
>  }
>  
> -bool ovn_can_parallelize_hashes(bool force_parallel);
> -
>  /* Use the OVN library functions for stuff which OVS has not defined
>   * If OVS has defined these, they will still compile using the OVN
>   * local names, but will be dropped by the linker in favour of the OVS
>   * supplied functions.
>   */
> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
> +    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>  
> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, 
> hrl)
> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>  
> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, 
> hrl)
>  
>  #define stop_parallel_processing() ovn_stop_parallel_processing()
>  
> diff --git a/northd/northd.c b/northd/northd.c
> index a56666297..a113349cd 100644
> --- a/northd/northd.c
> +++ b/northd/northd.c
> @@ -59,6 +59,7 @@
>  VLOG_DEFINE_THIS_MODULE(northd);
>  
>  static bool controller_event_en;
> +static bool lflow_hash_lock_initialized = false;
>  
>  static bool check_lsp_is_up;
>  
> @@ -4673,7 +4674,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const 
> struct ovn_datapath *od,
>  /* If this option is 'true' northd will combine logical flows that differ by
>   * logical datapath only by creating a datapath group. */
>  static bool use_logical_dp_groups = false;
> -static bool use_parallel_build = false;
> +
> +enum {
> +    STATE_NULL,
> +    STATE_INIT_HASH_SIZES,
> +    STATE_USE_PARALLELIZATION
> +};
> +static int parallelization_state = STATE_NULL;
>  
>  static void
>  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
> @@ -4692,7 +4699,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct 
> ovn_datapath *od,
>      lflow->ctrl_meter = ctrl_meter;
>      lflow->dpg = NULL;
>      lflow->where = where;
> -    if (use_parallel_build && use_logical_dp_groups) {
> +    if ((parallelization_state == STATE_USE_PARALLELIZATION)
> +        && use_logical_dp_groups) {
>          ovs_mutex_init(&lflow->odg_lock);
>      }
>  }
> @@ -4706,7 +4714,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow 
> *lflow_ref,
>          return false;
>      }
>  
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          ovs_mutex_lock(&lflow_ref->odg_lock);
>          hmapx_add(&lflow_ref->od_group, od);
>          ovs_mutex_unlock(&lflow_ref->odg_lock);
> @@ -4739,6 +4747,18 @@ lflow_hash_lock_init(void)
>      for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>          ovs_mutex_init(&lflow_hash_locks[i]);
>      }
> +    lflow_hash_lock_initialized = true;
> +}
> +
> +static void
> +lflow_hash_lock_destroy(void)
> +{
> +    if (lflow_hash_lock_initialized) {
> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
> +            ovs_mutex_destroy(&lflow_hash_locks[i]);
> +        }
> +    }
> +    lflow_hash_lock_initialized = false;
>  }
>  
>  /* This thread-local var is used for parallel lflow building when dp-groups 
> is
> @@ -4786,7 +4806,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct 
> ovn_datapath *od,
>                     nullable_xstrdup(ctrl_meter),
>                     ovn_lflow_hint(stage_hint), where);
>      hmapx_add(&lflow->od_group, od);
> -    if (!use_parallel_build) {
> +    if (parallelization_state != STATE_USE_PARALLELIZATION) {
>          hmap_insert(lflow_map, &lflow->hmap_node, hash);
>      } else {
>          hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
> @@ -4829,7 +4849,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, 
> struct ovn_datapath *od,
>      struct ovn_lflow *lflow;
>  
>      ovs_assert(ovn_stage_to_datapath_type(stage) == 
> ovn_datapath_get_type(od));
> -    if (use_logical_dp_groups && use_parallel_build) {
> +    if (use_logical_dp_groups
> +        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
>          lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
>                                      match, actions, io_port, stage_hint, 
> where,
>                                      ctrl_meter);
> @@ -13708,15 +13729,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct 
> ovn_port *op,
>                                        &lsi->actions);
>  }
>  
> -struct lflows_thread_pool {
> -    struct worker_pool *pool;
> -};
> -
>  static void *
>  build_lflows_thread(void *arg)
>  {
>      struct worker_control *control = (struct worker_control *) arg;
> -    struct lflows_thread_pool *workload;
>      struct lswitch_flow_build_info *lsi;
>  
>      struct ovn_datapath *od;
> @@ -13727,17 +13743,16 @@ build_lflows_thread(void *arg)
>  
>      while (!stop_parallel_processing()) {
>          wait_for_work(control);
> -        workload = (struct lflows_thread_pool *) control->workload;
>          lsi = (struct lswitch_flow_build_info *) control->data;
>          if (stop_parallel_processing()) {
>              return NULL;
>          }
>          thread_lflow_counter = 0;
> -        if (lsi && workload) {
> +        if (lsi) {
>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>              for (bnum = control->id;
>                      bnum <= lsi->datapaths->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, 
> lsi->datapaths) {
>                      if (stop_parallel_processing()) {
> @@ -13748,7 +13763,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->ports->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>                      if (stop_parallel_processing()) {
> @@ -13759,7 +13774,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->lbs->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>                      if (stop_parallel_processing()) {
> @@ -13780,7 +13795,7 @@ build_lflows_thread(void *arg)
>              }
>              for (bnum = control->id;
>                      bnum <= lsi->igmp_groups->mask;
> -                    bnum += workload->pool->size)
> +                    bnum += control->pool->size)
>              {
>                  HMAP_FOR_EACH_IN_PARALLEL (
>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
> @@ -13799,33 +13814,7 @@ build_lflows_thread(void *arg)
>      return NULL;
>  }
>  
> -static bool pool_init_done = false;
> -static struct lflows_thread_pool *build_lflows_pool = NULL;
> -
> -static void
> -init_lflows_thread_pool(void)
> -{
> -    int index;
> -
> -    if (!pool_init_done) {
> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
> -        pool_init_done = true;
> -        if (pool) {
> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
> -            build_lflows_pool->pool = pool;
> -            for (index = 0; index < build_lflows_pool->pool->size; index++) {
> -                build_lflows_pool->pool->controls[index].workload =
> -                    build_lflows_pool;
> -            }
> -        }
> -    }
> -}
> -
> -/* TODO: replace hard cutoffs by configurable via commands. These are
> - * temporary defines to determine single-thread to multi-thread processing
> - * cutoff.
> - * Setting to 1 forces "all parallel" lflow build.
> - */
> +static struct worker_pool *build_lflows_pool = NULL;
>  
>  static void
>  noop_callback(struct worker_pool *pool OVS_UNUSED,
> @@ -13871,28 +13860,21 @@ build_lswitch_and_lrouter_flows(const struct hmap 
> *datapaths,
>  
>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>  
> -    if (use_parallel_build) {
> -        init_lflows_thread_pool();
> -        if (!can_parallelize_hashes(false)) {
> -            use_parallel_build = false;
> -        }
> -    }
> -
> -    if (use_parallel_build) {
> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>          struct hmap *lflow_segs;
>          struct lswitch_flow_build_info *lsiv;
>          int index;
>  
> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>          if (use_logical_dp_groups) {
>              lflow_segs = NULL;
>          } else {
> -            lflow_segs = xcalloc(sizeof(*lflow_segs), 
> build_lflows_pool->pool->size);
> +            lflow_segs = xcalloc(sizeof(*lflow_segs), 
> build_lflows_pool->size);
>          }
>  
>          /* Set up "work chunks" for each thread to work on. */
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              if (use_logical_dp_groups) {
>                  /* if dp_groups are in use we lock a shared lflows hash
>                   * on a per-bucket level instead of merging hash frags */
> @@ -13915,19 +13897,19 @@ build_lswitch_and_lrouter_flows(const struct hmap 
> *datapaths,
>              ds_init(&lsiv[index].match);
>              ds_init(&lsiv[index].actions);
>  
> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
> +            build_lflows_pool->controls[index].data = &lsiv[index];
>          }
>  
>          /* Run thread pool. */
>          if (use_logical_dp_groups) {
> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
> +            run_pool_callback(build_lflows_pool, NULL, NULL,
>                                noop_callback);
> -            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
> +            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
>          } else {
> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>          }
>  
> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
> +        for (index = 0; index < build_lflows_pool->size; index++) {
>              ds_destroy(&lsiv[index].match);
>              ds_destroy(&lsiv[index].actions);
>          }
> @@ -14063,8 +14045,21 @@ ovn_sb_set_lflow_logical_dp_group(
>  }
>  
>  static ssize_t max_seen_lflow_size = 128;
> -static bool needs_parallel_init = true;
> -static bool reset_parallel = false;
> +
> +void run_update_worker_pool(int n_threads)
> +{
> +    /* If number of threads has been updated (or initially set),
> +     * update the worker pool */

Nit: missing '.' at the end of the comment.

> +    if (update_worker_pool(n_threads,
> +        &build_lflows_pool, build_lflows_thread)) {

Indentation looks a bit weird to me here.

> +        /* worker pool was updated */
> +        if (get_worker_pool_size() <= 1) {
> +            /* destroy potentially created lflow_hash_lock */
> +            lflow_hash_lock_destroy();
> +            parallelization_state = STATE_NULL;
> +        }
> +    }
> +}
>  
>  static void
>  build_mcast_groups(struct lflow_input *data,
> @@ -14085,24 +14080,23 @@ void build_lflows(struct lflow_input *input_data,
>      build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
>                         &mcast_groups, &igmp_groups);
>  
> -    if (reset_parallel) {
> -        /* Parallel build was disabled before, we need to
> -         * re-enable it. */
> -        use_parallel_build = true;
> -        reset_parallel = false;
> +    if (get_worker_pool_size() > 1) {

Can we avoid relying on get_worker_pool_size() here and instead set the
state directly in run_update_worker_pool?

Something like:

In run_update_worker_pool(), based on the value returned by
get_worker_pool_size() after update_worker_pool(), set the state to:

a. STATE_INIT_HASH_SIZES if pool_size > 1 and also call
lflow_hash_lock_init().
b. STATE_NULL if pool_size <= 1 and also call lflow_hash_lock_destroy().

> +        /* Disable parallel build on first run with dp_groups
> +         * to determine the correct sizing of hashes.
> +         */
> +        if (parallelization_state != STATE_USE_PARALLELIZATION) {
> +            if ((parallelization_state == STATE_NULL)
> +                && (use_logical_dp_groups)) {
> +                lflow_hash_lock_init();
> +                parallelization_state = STATE_INIT_HASH_SIZES;
> +            } else {
> +                parallelization_state = STATE_USE_PARALLELIZATION;
> +            }
> +        }
>      }
>  
>      fast_hmap_size_for(&lflows, max_seen_lflow_size);
>  
> -    if (use_parallel_build && use_logical_dp_groups &&
> -        needs_parallel_init) {
> -        lflow_hash_lock_init();
> -        needs_parallel_init = false;
> -        /* Disable parallel build on first run with dp_groups
> -         * to determine the correct sizing of hashes. */
> -        use_parallel_build = false;
> -        reset_parallel = true;
> -    }
>      build_lswitch_and_lrouter_flows(input_data->datapaths, input_data->ports,
>                                      input_data->port_groups, &lflows,
>                                      &mcast_groups, &igmp_groups,

Then at this point it should be safe to "advance" state from
STATE_INIT_HASH_SIZES to STATE_USE_PARALLELIZATION.  If I'm reading the
code correctly, this would ensure next runs will use parallel lflow build.

> @@ -15350,10 +15344,6 @@ ovnnb_db_run(struct northd_input *input_data,
>  
>      smap_destroy(&options);
>  
> -    use_parallel_build =
> -        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
> -         can_parallelize_hashes(false));
> -
>      use_logical_dp_groups = smap_get_bool(&nb->options,
>                                            "use_logical_dp_groups", true);
>      use_ct_inv_match = smap_get_bool(&nb->options,
> diff --git a/northd/northd.h b/northd/northd.h
> index 2d804a22e..fe8dad03a 100644
> --- a/northd/northd.h
> +++ b/northd/northd.h
> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
>                       struct hmap *bfd_connections, struct hmap *ports);
>  void bfd_cleanup_connections(struct lflow_input *input_data,
>                               struct hmap *bfd_map);
> +void run_update_worker_pool(int n_threads);
>  
>  #endif /* NORTHD_H */
> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
> index 718d052cc..e9afda4c6 100644
> --- a/northd/ovn-northd-ddlog.c
> +++ b/northd/ovn-northd-ddlog.c
> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
> OVS_UNUSED,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
>          OPT_DDLOG_RECORD,
> -        OPT_DUMMY_NUMA,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
> OVS_UNUSED,
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>          {NULL, 0, NULL, 0},
>      };
>      char *short_options = 
> ovs_cmdl_long_options_to_short_options(long_options);
> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
> OVS_UNUSED,
>              *pause = true;
>              break;
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> -            break;
> -
>          case OPT_DDLOG_RECORD:
>              record_file = optarg;
>              break;
> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
> index 0fe350d0e..b6a5e9121 100644
> --- a/northd/ovn-northd.8.xml
> +++ b/northd/ovn-northd.8.xml
> @@ -68,53 +68,24 @@
>            restarting a process or disturbing a running system.
>          </p>
>        </dd>
> -      <dt><code>--dummy-numa</code></dt>
> +      <dt><code>n-threads N</code></dt>
>        <dd>
> -        <p>
> -          Typically, OVS uses sysfs to determine the number of NUMA nodes and
> -          CPU cores that are available on a machine. The parallelization code
> -          in OVN uses this information to determine if there are enough
> -          resources to use parallelization. The current algorithm enables
> -          parallelization if the total number of CPU cores divided by the
> -          number of NUMA nodes is greater than or equal to four.
> -        </p>
> -
>          <p>
>            In certain situations, it may be desirable to enable 
> parallelization
> -          on a system that otherwise would not have it allowed. The
> -          <code>--dummy-numa</code> option allows for you to fake the NUMA
> -          nodes and cores that OVS thinks your system has. The syntax 
> consists
> -          of using numbers to represent the NUMA node IDs. The number of 
> times
> -          that a NUMA node ID appears represents how many CPU cores that NUMA
> -          node contains. So for instance, if you did the following:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0</code>
> -        </p>
> -
> -        <p>
> -          it would make OVS assume that you have a single NUMA node with ID 
> 0,
> -          and that NUMA node consists of four CPU cores. Similarly, you could
> -          do:
> -        </p>
> -
> -        <p>
> -          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
> +          on a system to decrease latency (at the potential cost of 
> increasing
> +          CPU usage).
>          </p>
>  
>          <p>
> -          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
> -          with six CPU cores.
> +          This option will cause ovn-northd to use N threads when building
> +          logical flows, when N is within [2-256].
> +          If N is 1, parallelization is disabled (default behavior).
> +          If N is less than 1 or more than 256, then N is set to 1,
> +          parallelization is disabled and a warning is logged.
>          </p>
>  
>          <p>
> -          Currently, the only affect this option has is on whether
> -          parallelization can be enabled in ovn-northd. There are no NUMA 
> node
> -          or CPU core-specific actions performed by OVN. Setting
> -          <code>--dummy-numa</code> in ovn-northd does not affect how other 
> OVS
> -          processes on the system (such as ovs-vswitchd) count the number of
> -          NUMA nodes and CPU cores; this setting is local to ovn-northd.
> +          ovn-northd-ddlog does not support this option.
>          </p>
>        </dd>
>      </dl>
> @@ -210,6 +181,27 @@
>          except for the northbound database client.
>        </p>
>        </dd>
> +
> +
> +      <dt><code>set-n-threads N</code></dt>
> +      <dd>
> +      <p>
> +        Set the number of threads used for building logical flows.
> +        When N is within [2-256], parallelization is enabled.
> +        When N is 1 parallelization is disabled.
> +        When N is less than 1 or more than 256, an error is returned.
> +        If ovn-northd fails to start parallelization (e.g. fails to set up
> +        semaphores), parallelization is disabled and an error is returned.
> +      </p>
> +      </dd>
> +
> +      <dt><code>get-n-threads</code></dt>
> +      <dd>
> +      <p>
> +        Return the number of threads used for building logical flows.
> +      </p>
> +      </dd>
> +
>        </dl>
>      </p>
>  
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 45b120697..8d60376eb 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -41,6 +41,7 @@
>  #include "unixctl.h"
>  #include "util.h"
>  #include "openvswitch/vlog.h"
> +#include "lib/ovn-parallel-hmap.h"
>  
>  VLOG_DEFINE_THIS_MODULE(ovn_northd);
>  
> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
>  static unixctl_cb_func ovn_northd_is_paused;
>  static unixctl_cb_func ovn_northd_status;
>  static unixctl_cb_func cluster_state_reset_cmd;
> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>  
>  struct northd_state {
>      bool had_lock;
>      bool paused;
>  };
>  
> +#define OVN_MAX_SUPPORTED_THREADS 256
> +
>  static const char *ovnnb_db;
>  static const char *ovnsb_db;
>  static const char *unixctl_path;
> @@ -525,7 +530,7 @@ Options:\n\
>    --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
>                              (default: %s)\n\
>    --dry-run                 start in paused state (do not commit db 
> changes)\n\
> -  --dummy-numa              override default NUMA node and CPU core 
> discovery\n\
> +  --n-threads=N             specify number of threads\n\
>    --unixctl=SOCKET          override default control socket name\n\
>    -h, --help                display this help message\n\
>    -o, --options             list available options\n\
> @@ -538,14 +543,14 @@ Options:\n\
>  
>  static void
>  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
> -              bool *paused)
> +              bool *paused, int *n_threads)
>  {
>      enum {
>          OVN_DAEMON_OPTION_ENUMS,
>          VLOG_OPTION_ENUMS,
>          SSL_OPTION_ENUMS,
>          OPT_DRY_RUN,
> -        OPT_DUMMY_NUMA,
> +        OPT_N_THREADS,
>      };
>      static const struct option long_options[] = {
>          {"ovnsb-db", required_argument, NULL, 'd'},
> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
> OVS_UNUSED,
>          {"options", no_argument, NULL, 'o'},
>          {"version", no_argument, NULL, 'V'},
>          {"dry-run", no_argument, NULL, OPT_DRY_RUN},
> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
> +        {"n-threads", required_argument, NULL, OPT_N_THREADS},
>          OVN_DAEMON_LONG_OPTIONS,
>          VLOG_LONG_OPTIONS,
>          STREAM_SSL_LONG_OPTIONS,
> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
> OVS_UNUSED,
>              ovn_print_version(0, 0);
>              exit(EXIT_SUCCESS);
>  
> -        case OPT_DUMMY_NUMA:
> -            ovs_numa_set_dummy(optarg);
> +        case OPT_N_THREADS:
> +            *n_threads = atoi(optarg);

I'd use strtoul() instead: atoi() has no way to report a parse error, so
non-numeric input is silently treated as 0.

> +            if (*n_threads < 1) {
> +                *n_threads = 1;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was 
> "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
> +                *n_threads = OVN_MAX_SUPPORTED_THREADS;
> +                VLOG_WARN("Setting n_threads to %d as --n-threads option was 
> "
> +                    "set to : [%s]", *n_threads, optarg);
> +            }
> +            if (*n_threads != 1) {
> +                VLOG_INFO("Using %d threads", *n_threads);
> +            }
>              break;
>  
>          case OPT_DRY_RUN:
> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
>      struct unixctl_server *unixctl;
>      int retval;
>      bool exiting;
> +    int n_threads = 1;
>      struct northd_state state = {
>          .had_lock = false,
>          .paused = false
> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
>      ovs_cmdl_proctitle_init(argc, argv);
>      ovn_set_program_name(argv[0]);
>      service_start(&argc, &argv);
> -    parse_options(argc, argv, &state.paused);
> +    parse_options(argc, argv, &state.paused, &n_threads);
>  
>      daemonize_start(false);
>  
> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
>      unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
>                               cluster_state_reset_cmd,
>                               &reset_ovnnb_idl_min_index);
> +    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 1, 
> 1,
> +                             ovn_northd_set_thread_count_cmd,
> +                             NULL);
> +    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
> +                             ovn_northd_get_thread_count_cmd,
> +                             NULL);
>  
>      daemonize_complete();
>  
> @@ -753,6 +778,8 @@ main(int argc, char *argv[])
>      unsigned int ovnnb_cond_seqno = UINT_MAX;
>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>  
> +    run_update_worker_pool(n_threads);
> +
>      /* Main loop. */
>      exiting = false;
>  
> @@ -1009,3 +1036,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, int 
> argc OVS_UNUSED,
>      poll_immediate_wake();
>      unixctl_command_reply(conn, NULL);
>  }
> +
> +static void
> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc 
> OVS_UNUSED,
> +               const char *argv[], void *aux OVS_UNUSED)
> +{
> +    int n_threads = atoi(argv[1]);
> +
> +    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
> +        struct ds s = DS_EMPTY_INITIALIZER;
> +        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
> +        unixctl_command_reply_error(conn, ds_cstr(&s));
> +        ds_destroy(&s);
> +    } else {
> +        run_update_worker_pool(n_threads);
> +        unixctl_command_reply(conn, NULL);
> +    }
> +}
> +
> +static void
> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc 
> OVS_UNUSED,
> +               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
> +{
> +    struct ds s = DS_EMPTY_INITIALIZER;
> +    ds_put_format(&s, "%d\n", get_worker_pool_size());
> +    unixctl_command_reply(conn, ds_cstr(&s));
> +    ds_destroy(&s);
> +}
> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
> index d78315c75..c6f0f6251 100644
> --- a/tests/ovn-macros.at
> +++ b/tests/ovn-macros.at
> @@ -170,8 +170,8 @@ ovn_start_northd() {
>          ovn-northd-ddlog) northd_args="$northd_args 
> --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
>      esac
>  
> -    if test X$NORTHD_DUMMY_NUMA = Xyes; then
> -        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
> +    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> +        northd_args="$northd_args --n-threads=4"
>      fi
>  
>      local name=${d_prefix}northd${suffix}
> @@ -252,10 +252,6 @@ ovn_start () {
>      else
>          ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
>      fi
> -
> -    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
> -        ovn-nbctl set NB_Global . options:use_parallel_build=true
> -    fi
>  }
>  
>  # Interconnection networks.
> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>  
>  m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>  
> -# Use --dummy-numa if system has low cores and we want to force 
> parallelization
> -m4_define([NORTHD_DUMMY_NUMA],
> -  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
> -     then
> -       echo "yes"
> -     else
> -       echo "no"
> -     fi)
> -])
> -
>  # Defines a versions of a test with all combinations of northd and
>  # datapath groups.
>  m4_define([OVN_FOR_EACH_NORTHD],
> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
>  # Some tests aren't prepared for dp groups to be enabled.
>  m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
>    [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
> -     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
> -])])])
> -
> -# Test parallelization with dp groups enabled and disabled
> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> -    [[NORTHD_USE_PARALLELIZATION], [yes]
> -])]])
> -
> -# Use --dummy-numa if system has low cores
> -m4_define([HOST_HAS_LOW_CORES], [
> -    if test $(nproc) -lt 4; then
> -        :
> -        $1
> -    else
> -        :
> -        $2
> -    fi
> -])
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
> +
> +# Some tests aren't prepared for ddlog to be enabled.
> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
> +  [m4_foreach([NORTHD_TYPE], [ovn-northd],
> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
> +])])])])
>  
> -m4_define([NORTHD_PARALLELIZATION], [
> -    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], 
> [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
> -])
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 69ad85533..798bb5552 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -7158,3 +7158,112 @@ ct_next(ct_state=new|trk);
>  
>  AT_CLEANUP
>  ])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization unixctl])
> +ovn_start
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
> parallel-build/get-n-threads], [0], [1
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
> parallel-build/get-n-threads], [0], [4
> +])
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
> parallel-build/get-n-threads], [0], [1
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
> 0], [2], [],
> +  [invalid n_threads: 0
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
> -1], [2], [],
> +  [invalid n_threads: -1
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
> 300], [2], [],
> +  [invalid n_threads: 300
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads], 
> [2], [],
> +  ["parallel-build/set-n-threads" command requires at least 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1 
> 2], [2], [],
> +  ["parallel-build/set-n-threads" command takes at most 1 arguments
> +ovn-appctl: ovn-northd: server returned an error
> +])
> +
> +AT_CLEANUP
> +])
> +
> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
> +AT_SETUP([northd-parallelization runtime])
> +ovn_start
> +
> +add_switch_ports() {
> +    for port in $(seq $1 $2); do
> +        logical_switch_port=lsp${port}
> +        check ovn-nbctl lsp-add ls1 $logical_switch_port
> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
> +    done
> +}
> +
> +# Build some rather heavy config and modify number of threads in the middle
> +check ovn-nbctl ls-add ls1
> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
> +
> +check ovn-nbctl lr-add lr1
> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 type=router 
> options:router-port=lrp0 addresses=dynamic
> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
> +add_switch_ports 1 50
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 51 100
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +add_switch_ports 101 150
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
> +add_switch_ports 151 200
> +
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
> +add_switch_ports 201 250
> +check ovn-nbctl --wait=sb sync
> +
> +# Run 3 times: one with parallelization enabled, one with disabled, and one 
> while changing
> +# Compare the flows produced by the three runs
> +# Ignore IP/MAC addresses
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | 
> sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
> +
> +# Restart with 1 thread
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | 
> sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
> +AT_CHECK([diff flows1 flows2])
> +
> +# Restart with 8 threads
> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
> +for port in $(seq 1 250); do
> +    logical_switch_port=lsp${port}
> +    check ovn-nbctl lsp-del $logical_switch_port
> +done
> +add_switch_ports 1 250
> +check ovn-nbctl --wait=sb sync
> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' | 
> sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
> +AT_CHECK([diff flows1 flows3])
> +
> +AT_CLEANUP
> +])

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to