On 5/18/22 14:59, Dumitru Ceara wrote:
> On 5/18/22 12:07, Xavier Simonart wrote:
>> This patch is intended to change the way to enable northd lflow build
>> parallelization, as well as enable runtime change of number of threads.
>> Before this patch, the following was needed to use parallelization:
>> - enable parallelization through use_parallel_build in NBDB
>> - use --dummy-numa to select number of threads.
>> This second part was needed as otherwise as many threads as cores in the 
>> system
>> were used, while parallelization showed some performance improvement only 
>> until
>> using around 4 (or maybe 8) threads.
>>
>> With this patch, the number of threads used for lflow parallel build can be
>> specified either:
>> - at startup, using --n-threads=<N> as ovn-northd command line option
>> - using unixctl
>> If the number of threads specified is > 1, then parallelization is enabled.
>> If the number is 1, parallelization is disabled.
>> If the number is < 1, parallelization is disabled at startup and a warning
>> is logged.
>> If the number is > 256, parallelization is enabled (with 256 threads) and
>> a warning is logged.
>>
>> The following unixctl have been added:
>> - set-n-threads <N>: set the number of threads used.
>> - get-n-threads: returns the number of threads used
>> If the number of threads is within <2-256> bounds, parallelization is 
>> enabled.
>> If the number of threads is 1, parallelization is disabled.
>> Otherwise an error is thrown.
>>
>> Note that, if set-n-threads failed for any reason (e.g. failure to setup some
>> semaphore), parallelization is disabled, and get-n-threads will return 1.
>>
>> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2078552
>> Signed-off-by: Xavier Simonart <[email protected]>
>>
>> ---
>> v2:  - handled Dumitru's comments
>>      - added missing mutex_destroy
>>      - fixed issue when use_logical_dp_group is enabled after northd startup
>>      - rebased on top of main
>> v3:
>>      - fix mutex_destroy issue
>> v4:
>>      - handled Mark's comments
>>      - rebased on top of main
>> ---
>>  NEWS                      |   7 +
>>  lib/ovn-parallel-hmap.c   | 291 +++++++++++++++++++++-----------------
>>  lib/ovn-parallel-hmap.h   |  30 ++--
>>  northd/northd.c           | 176 ++++++++++++-----------
>>  northd/northd.h           |   1 +
>>  northd/ovn-northd-ddlog.c |   6 -
>>  northd/ovn-northd.8.xml   |  70 +++++----
>>  northd/ovn-northd.c       |  68 ++++++++-
>>  tests/ovn-macros.at       |  59 ++------
>>  tests/ovn-northd.at       | 109 ++++++++++++++
>>  10 files changed, 495 insertions(+), 322 deletions(-)
>>
> 
> Hi Xavier,
> 
> Sorry, I should've mentioned this earlier but I forgot.  Can you please
> also add an option to ovn-ctl to allow specifying the number of threads?
> 
> Something along the lines of:
> 
> ovn-ctl start_northd .. --ovn-northd-n-threads=42
> 
> This would allow CMSs that use ovn-ctl [0] to enable parallelization.
> For ovn-org/ovn-kubernetes in particular, we could even make the
> ovn-kubernetes CI job that we run in the ovn-org/ovn repo run both
> with and without northd parallelization.
> 
> Overall the code looks good but given that we probably also want the
> ovn-ctl change I also left a few very small style comments below.
> 
> Thanks,
> Dumitru

[0]
https://github.com/ovn-org/ovn-kubernetes/blob/4d79168271dcb22e8f2a580f5e75e5e38232224b/dist/images/ovnkube.sh#L857

> 
>> diff --git a/NEWS b/NEWS
>> index 244824e3f..6e489df32 100644
>> --- a/NEWS
>> +++ b/NEWS
>> @@ -9,6 +9,13 @@ Post v22.03.0
>>      implicit drop behavior on logical switches with ACLs applied.
>>    - Support (LSP.options:qos_min_rate) to guarantee minimal bandwidth 
>> available
>>      for a logical port.
>> +  - Changed the way to enable northd parallelization.
>> +    Removed support for:
>> +    - use_parallel_build in NBDB.
>> +    - --dummy-numa in northd cmdline.
>> +    Added support for:
>> +    -  --n-threads=<N> in northd cmdline.
>> +    - set-n-threads/get-n-threads unixctls.
>>  
>>  OVN v22.03.0 - 11 Mar 2022
>>  --------------------------
>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
>> index 7edc4c0b6..c4d8cee16 100644
>> --- a/lib/ovn-parallel-hmap.c
>> +++ b/lib/ovn-parallel-hmap.c
>> @@ -38,14 +38,10 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
>>  
>>  #ifndef OVS_HAS_PARALLEL_HMAP
>>  
>> -#define WORKER_SEM_NAME "%x-%p-%x"
>> +#define WORKER_SEM_NAME "%x-%p-%"PRIxSIZE
>>  #define MAIN_SEM_NAME "%x-%p-main"
>>  
>> -/* These are accessed under mutex inside add_worker_pool().
>> - * They do not need to be atomic.
>> - */
>>  static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
>> -static bool can_parallelize = false;
>>  
>>  /* This is set only in the process of exit and the set is
>>   * accompanied by a fence. It does not need to be atomic or be
>> @@ -57,18 +53,18 @@ static struct ovs_list worker_pools = 
>> OVS_LIST_INITIALIZER(&worker_pools);
>>  
>>  static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
>>  
>> -static int pool_size;
>> +static size_t pool_size = 1;
>>  
>>  static int sembase;
>>  
>>  static void worker_pool_hook(void *aux OVS_UNUSED);
>> -static void setup_worker_pools(bool force);
>> +static void setup_worker_pools(void);
>>  static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>                                 void *fin_result, void *result_frags,
>> -                               int index);
>> +                               size_t index);
>>  static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>                                 void *fin_result, void *result_frags,
>> -                               int index);
>> +                               size_t index);
>>  
>>  bool
>>  ovn_stop_parallel_processing(void)
>> @@ -76,107 +72,184 @@ ovn_stop_parallel_processing(void)
>>      return workers_must_exit;
>>  }
>>  
>> -bool
>> -ovn_can_parallelize_hashes(bool force_parallel)
>> +size_t
>> +ovn_get_worker_pool_size(void)
>>  {
>> -    bool test = false;
>> +    return pool_size;
>> +}
>>  
>> -    if (atomic_compare_exchange_strong(
>> -            &initial_pool_setup,
>> -            &test,
>> -            true)) {
>> -        ovs_mutex_lock(&init_mutex);
>> -        setup_worker_pools(force_parallel);
>> -        ovs_mutex_unlock(&init_mutex);
>> +static void
>> +stop_controls(struct worker_pool *pool)
>> +{
>> +    if (pool->controls) {
>> +        workers_must_exit = true;
>> +
>> +        /* unlock threads */
> 
> Nit: /* Unlock threads. */
> 
>> +        for (size_t i = 0; i < pool->size ; i++) {
>> +            if (pool->controls[i].fire != SEM_FAILED) {
>> +                sem_post(pool->controls[i].fire);
>> +            }
>> +        }
>> +
>> +        /* Wait completion */
> 
> Nit: /* Wait for completion. */
> 
>> +        for (size_t i = 0; i < pool->size ; i++) {
>> +            if (pool->controls[i].worker) {
>> +                pthread_join(pool->controls[i].worker, NULL);
>> +                pool->controls[i].worker = 0;
>> +            }
>> +        }
>> +        workers_must_exit = false;
>>      }
>> -    return can_parallelize;
>>  }
>>  
>> -struct worker_pool *
>> -ovn_add_worker_pool(void *(*start)(void *))
>> +static void
>> +free_controls(struct worker_pool *pool)
>> +{
>> +    char sem_name[256];
>> +    if (pool->controls) {
>> +        /* Close/unlink semaphores */
> 
> Nit: /* Close/unlink semaphores. */
> 
>> +        for (size_t i = 0; i < pool->size; i++) {
>> +            ovs_mutex_destroy(&pool->controls[i].mutex);
>> +            if (pool->controls[i].fire != SEM_FAILED) {
>> +                sem_close(pool->controls[i].fire);
>> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +                sem_unlink(sem_name);
>> +            } else {
>> +               /* This and following controls are not initialized */
> 
> Nit: indent needs one extra space.
> 
>> +                break;
>> +            }
>> +        }
>> +        free(pool->controls);
>> +        pool->controls = NULL;
>> +    }
>> +}
>> +
>> +static void
>> +free_pool(struct worker_pool *pool)
>> +{
>> +    char sem_name[256];
>> +    stop_controls(pool);
>> +    free_controls(pool);
>> +    if (pool->done != SEM_FAILED) {
>> +        sem_close(pool->done);
>> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> +        sem_unlink(sem_name);
>> +    }
>> +    free(pool);
>> +}
>> +
>> +static int
>> +init_controls(struct worker_pool *pool)
>>  {
>> -    struct worker_pool *new_pool = NULL;
>>      struct worker_control *new_control;
>> +    char sem_name[256];
>> +
>> +    pool->controls = xmalloc(sizeof(struct worker_control) * pool->size);
>> +    for (size_t i = 0; i < pool->size ; i++) {
>> +        pool->controls[i].fire = SEM_FAILED;
>> +    }
>> +    for (size_t i = 0; i < pool->size; i++) {
>> +        new_control = &pool->controls[i];
>> +        new_control->id = i;
>> +        new_control->done = pool->done;
>> +        new_control->data = NULL;
>> +        new_control->pool = pool;
>> +        new_control->worker = 0;
>> +        ovs_mutex_init(&new_control->mutex);
>> +        new_control->finished = ATOMIC_VAR_INIT(false);
>> +        sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> +        new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +        if (new_control->fire == SEM_FAILED) {
>> +            free_controls(pool);
>> +            return -1;
>> +        }
>> +    }
>> +    return 0;
>> +}
>> +
>> +static void
>> +init_threads(struct worker_pool *pool, void *(*start)(void *))
>> +{
>> +    for (size_t i = 0; i < pool_size; i++) {
>> +        pool->controls[i].worker =
>> +            ovs_thread_create("worker pool helper", start, 
>> &pool->controls[i]);
>> +    }
>> +    ovs_list_push_back(&worker_pools, &pool->list_node);
>> +}
>> +
>> +enum pool_update_status
>> +ovn_update_worker_pool(size_t requested_pool_size,
>> +                       struct worker_pool **pool, void *(*start)(void *))
>> +{
>>      bool test = false;
>> -    int i;
>>      char sem_name[256];
>>  
>> -    /* Belt and braces - initialize the pool system just in case if
>> -     * if it is not yet initialized.
>> -     */
>> +    if (requested_pool_size == pool_size) {
>> +        return POOL_UNCHANGED;
>> +    }
>> +
>>      if (atomic_compare_exchange_strong(
>>              &initial_pool_setup,
>>              &test,
>>              true)) {
>>          ovs_mutex_lock(&init_mutex);
>> -        setup_worker_pools(false);
>> +        setup_worker_pools();
>>          ovs_mutex_unlock(&init_mutex);
>>      }
>> -
>>      ovs_mutex_lock(&init_mutex);
>> -    if (can_parallelize) {
>> -        new_pool = xmalloc(sizeof(struct worker_pool));
>> -        new_pool->size = pool_size;
>> -        new_pool->controls = NULL;
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> -        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> -        if (new_pool->done == SEM_FAILED) {
>> -            goto cleanup;
>> -        }
>> -
>> -        new_pool->controls =
>> -            xmalloc(sizeof(struct worker_control) * new_pool->size);
>> -
>> -        for (i = 0; i < new_pool->size; i++) {
>> -            new_control = &new_pool->controls[i];
>> -            new_control->id = i;
>> -            new_control->done = new_pool->done;
>> -            new_control->data = NULL;
>> -            ovs_mutex_init(&new_control->mutex);
>> -            new_control->finished = ATOMIC_VAR_INIT(false);
>> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> -            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> -            if (new_control->fire == SEM_FAILED) {
>> +    pool_size = requested_pool_size;
>> +    VLOG_INFO("Setting thread count to %"PRIuSIZE, pool_size);
>> +
>> +    if (*pool == NULL) {
>> +        if (pool_size > 1) {
>> +            VLOG_INFO("Creating new pool with size %"PRIuSIZE, pool_size);
>> +            *pool = xmalloc(sizeof(struct worker_pool));
>> +            (*pool)->size = pool_size;
>> +            (*pool)->controls = NULL;
>> +            sprintf(sem_name, MAIN_SEM_NAME, sembase, *pool);
>> +            (*pool)->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
>> +            if ((*pool)->done == SEM_FAILED) {
>>                  goto cleanup;
>>              }
>> +            if (init_controls(*pool) == -1) {
>> +                goto cleanup;
>> +            }
>> +            init_threads(*pool, start);
>>          }
>> -
>> -        for (i = 0; i < pool_size; i++) {
>> -            new_pool->controls[i].worker =
>> -                ovs_thread_create("worker pool helper", start, 
>> &new_pool->controls[i]);
>> +    } else {
>> +        if (pool_size > 1) {
>> +            VLOG_INFO("Changing size of existing pool to %"PRIuSIZE,
>> +                      pool_size);
>> +            stop_controls(*pool);
>> +            free_controls(*pool);
>> +            ovs_list_remove(&(*pool)->list_node);
>> +            (*pool)->size = pool_size;
>> +            if (init_controls(*pool) == -1) {
>> +                goto cleanup;
>> +            }
>> +            init_threads(*pool, start);
>> +        } else {
>> +            VLOG_INFO("Deleting existing pool");
>> +            worker_pool_hook(NULL);
>> +            *pool = NULL;
>>          }
>> -        ovs_list_push_back(&worker_pools, &new_pool->list_node);
>>      }
>>      ovs_mutex_unlock(&init_mutex);
>> -    return new_pool;
>> -cleanup:
>> +    return POOL_UPDATED;
>>  
>> +cleanup:
>>      /* Something went wrong when opening semaphores. In this case
>>       * it is better to shut off parallel procesing altogether
>>       */
>> -
>> -    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
>> -    can_parallelize = false;
>> -    if (new_pool->controls) {
>> -        for (i = 0; i < new_pool->size; i++) {
>> -            if (new_pool->controls[i].fire != SEM_FAILED) {
>> -                sem_close(new_pool->controls[i].fire);
>> -                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
>> -                sem_unlink(sem_name);
>> -                break; /* semaphores past this one are uninitialized */
>> -            }
>> -        }
>> -    }
>> -    if (new_pool->done != SEM_FAILED) {
>> -        sem_close(new_pool->done);
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
>> -        sem_unlink(sem_name);
>> -    }
>> +    VLOG_ERR("Failed to initialize parallel processing: %s",
>> +             ovs_strerror(errno));
>> +    free_pool(*pool);
>> +    *pool = NULL;
>> +    pool_size = 1;
>>      ovs_mutex_unlock(&init_mutex);
>> -    return NULL;
>> +    return POOL_UPDATE_FAILED;
>>  }
>>  
>> -
>>  /* Initializes 'hmap' as an empty hash table with mask N. */
>>  void
>>  ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
>> @@ -225,9 +298,9 @@ ovn_run_pool_callback(struct worker_pool *pool,
>>                        void *fin_result, void *result_frags,
>>                        void (*helper_func)(struct worker_pool *pool,
>>                                            void *fin_result,
>> -                                          void *result_frags, int index))
>> +                                          void *result_frags, size_t index))
>>  {
>> -    int index, completed;
>> +    size_t index, completed;
>>  
>>      /* Ensure that all worker threads see the same data as the
>>       * main thread.
>> @@ -367,9 +440,7 @@ ovn_update_hashrow_locks(struct hmap *lflows, struct 
>> hashrow_locks *hrl)
>>  
>>  static void
>>  worker_pool_hook(void *aux OVS_UNUSED) {
>> -    int i;
>>      static struct worker_pool *pool;
>> -    char sem_name[256];
>>  
>>      workers_must_exit = true;
>>  
>> @@ -380,55 +451,15 @@ worker_pool_hook(void *aux OVS_UNUSED) {
>>       */
>>      atomic_thread_fence(memory_order_acq_rel);
>>  
>> -    /* Wake up the workers after the must_exit flag has been set */
>> -
>> -    LIST_FOR_EACH (pool, list_node, &worker_pools) {
>> -        for (i = 0; i < pool->size ; i++) {
>> -            sem_post(pool->controls[i].fire);
>> -        }
>> -        for (i = 0; i < pool->size ; i++) {
>> -            pthread_join(pool->controls[i].worker, NULL);
>> -        }
>> -        for (i = 0; i < pool->size ; i++) {
>> -            sem_close(pool->controls[i].fire);
>> -            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
>> -            sem_unlink(sem_name);
>> -        }
>> -        sem_close(pool->done);
>> -        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
>> -        sem_unlink(sem_name);
>> +    LIST_FOR_EACH_SAFE (pool, list_node, &worker_pools) {
>> +        ovs_list_remove(&pool->list_node);
>> +        free_pool(pool);
>>      }
>>  }
>>  
>>  static void
>> -setup_worker_pools(bool force) {
>> -    int cores, nodes;
>> -
>> -    ovs_numa_init();
>> -    nodes = ovs_numa_get_n_numas();
>> -    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
>> -        nodes = 1;
>> -    }
>> -    cores = ovs_numa_get_n_cores();
>> -
>> -    /* If there is no NUMA config, use 4 cores.
>> -     * If there is NUMA config use half the cores on
>> -     * one node so that the OS does not start pushing
>> -     * threads to other nodes.
>> -     */
>> -    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
>> -        /* If there is no NUMA we can try the ovs-threads routine.
>> -         * It falls back to sysconf and/or affinity mask.
>> -         */
>> -        cores = count_cpu_cores();
>> -        pool_size = cores;
>> -    } else {
>> -        pool_size = cores / nodes;
>> -    }
>> -    if ((pool_size < 4) && force) {
>> -        pool_size = 4;
>> -    }
>> -    can_parallelize = (pool_size >= 3);
>> +setup_worker_pools(void)
>> +{
>>      fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
>>      sembase = random_uint32();
>>  }
>> @@ -436,7 +467,7 @@ setup_worker_pools(bool force) {
>>  static void
>>  merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>                     void *fin_result, void *result_frags,
>> -                   int index)
>> +                   size_t index)
>>  {
>>      struct ovs_list *result = (struct ovs_list *)fin_result;
>>      struct ovs_list *res_frags = (struct ovs_list *)result_frags;
>> @@ -450,7 +481,7 @@ merge_list_results(struct worker_pool *pool OVS_UNUSED,
>>  static void
>>  merge_hash_results(struct worker_pool *pool OVS_UNUSED,
>>                     void *fin_result, void *result_frags,
>> -                   int index)
>> +                   size_t index)
>>  {
>>      struct hmap *result = (struct hmap *)fin_result;
>>      struct hmap *res_frags = (struct hmap *)result_frags;
>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
>> index 0f7d68770..72b31b489 100644
>> --- a/lib/ovn-parallel-hmap.h
>> +++ b/lib/ovn-parallel-hmap.h
>> @@ -81,21 +81,30 @@ struct worker_control {
>>      sem_t *done; /* Work completion semaphore - sem_post on completion. */
>>      struct ovs_mutex mutex; /* Guards the data. */
>>      void *data; /* Pointer to data to be processed. */
>> -    void *workload; /* back-pointer to the worker pool structure. */
>>      pthread_t worker;
>> +    struct worker_pool *pool;
>>  };
>>  
>>  struct worker_pool {
>> -    int size;   /* Number of threads in the pool. */
>> +    size_t size;   /* Number of threads in the pool. */
>>      struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
>>      struct worker_control *controls; /* "Handles" in this pool. */
>>      sem_t *done; /* Work completion semaphorew. */
>>  };
>>  
>> -/* Add a worker pool for thread function start() which expects a pointer to
>> - * a worker_control structure as an argument. */
>> +/* Return pool size; bigger than 1 means parallelization has been enabled. 
>> */
>> +size_t ovn_get_worker_pool_size(void);
>>  
>> -struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
>> +enum pool_update_status {
>> +     POOL_UNCHANGED,     /* no change to pool */
>> +     POOL_UPDATED,       /* pool has been updated */
>> +     POOL_UPDATE_FAILED, /* pool update failed; parallelization disabled */
>> +};
> 
> Nit: I would add an empty line here.
> 
>> +/* Add/delete a worker pool for thread function start() which expects a 
>> pointer
>> + * to a worker_control structure as an argument. Return true if updated */
>> +enum pool_update_status ovn_update_worker_pool(size_t requested_pool_size,
>> +                                               struct worker_pool **,
>> +                                               void *(*start)(void *));
>>  
>>  /* Setting this to true will make all processing threads exit */
>>  
>> @@ -140,7 +149,8 @@ void ovn_run_pool_list(struct worker_pool *pool,
>>  void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
>>                             void *result_frags,
>>                             void (*helper_func)(struct worker_pool *pool,
>> -                           void *fin_result, void *result_frags, int 
>> index));
>> +                           void *fin_result, void *result_frags,
>> +                           size_t index));
>>  
>>  
>>  /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
>> @@ -251,17 +261,17 @@ static inline void init_hash_row_locks(struct 
>> hashrow_locks *hrl)
>>      hrl->row_locks = NULL;
>>  }
>>  
>> -bool ovn_can_parallelize_hashes(bool force_parallel);
>> -
>>  /* Use the OVN library functions for stuff which OVS has not defined
>>   * If OVS has defined these, they will still compile using the OVN
>>   * local names, but will be dropped by the linker in favour of the OVS
>>   * supplied functions.
>>   */
>> +#define update_worker_pool(requested_pool_size, existing_pool, func) \
>> +    ovn_update_worker_pool(requested_pool_size, existing_pool, func)
>>  
>> -#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, 
>> hrl)
>> +#define get_worker_pool_size() ovn_get_worker_pool_size()
>>  
>> -#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
>> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, 
>> hrl)
>>  
>>  #define stop_parallel_processing() ovn_stop_parallel_processing()
>>  
>> diff --git a/northd/northd.c b/northd/northd.c
>> index 67c39df88..48426c801 100644
>> --- a/northd/northd.c
>> +++ b/northd/northd.c
>> @@ -59,6 +59,7 @@
>>  VLOG_DEFINE_THIS_MODULE(northd);
>>  
>>  static bool controller_event_en;
>> +static bool lflow_hash_lock_initialized = false;
>>  
>>  static bool check_lsp_is_up;
>>  
>> @@ -4740,7 +4741,13 @@ ovn_lflow_equal(const struct ovn_lflow *a, const 
>> struct ovn_datapath *od,
>>  /* If this option is 'true' northd will combine logical flows that differ by
>>   * logical datapath only by creating a datapath group. */
>>  static bool use_logical_dp_groups = false;
>> -static bool use_parallel_build = false;
>> +
>> +enum {
>> +    STATE_NULL,               /* parallelization is off */
>> +    STATE_INIT_HASH_SIZES,    /* parallelization is on; hashes sizing 
>> needed */
>> +    STATE_USE_PARALLELIZATION /* parallelization is on */
>> +};
>> +static int parallelization_state = STATE_NULL;
>>  
>>  static void
>>  ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
>> @@ -4759,7 +4766,8 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct 
>> ovn_datapath *od,
>>      lflow->ctrl_meter = ctrl_meter;
>>      lflow->dpg = NULL;
>>      lflow->where = where;
>> -    if (use_parallel_build && use_logical_dp_groups) {
>> +    if ((parallelization_state != STATE_NULL)
>> +        && use_logical_dp_groups) {
>>          ovs_mutex_init(&lflow->odg_lock);
>>      }
>>  }
>> @@ -4773,7 +4781,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow 
>> *lflow_ref,
>>          return false;
>>      }
>>  
>> -    if (use_parallel_build) {
>> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>>          ovs_mutex_lock(&lflow_ref->odg_lock);
>>          hmapx_add(&lflow_ref->od_group, od);
>>          ovs_mutex_unlock(&lflow_ref->odg_lock);
>> @@ -4803,9 +4811,23 @@ static struct ovs_mutex 
>> lflow_hash_locks[LFLOW_HASH_LOCK_MASK + 1];
>>  static void
>>  lflow_hash_lock_init(void)
>>  {
>> -    for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> -        ovs_mutex_init(&lflow_hash_locks[i]);
>> +    if (!lflow_hash_lock_initialized) {
>> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> +            ovs_mutex_init(&lflow_hash_locks[i]);
>> +        }
>> +        lflow_hash_lock_initialized = true;
>> +    }
>> +}
>> +
>> +static void
>> +lflow_hash_lock_destroy(void)
>> +{
>> +    if (lflow_hash_lock_initialized) {
>> +        for (size_t i = 0; i < LFLOW_HASH_LOCK_MASK + 1; i++) {
>> +            ovs_mutex_destroy(&lflow_hash_locks[i]);
>> +        }
>>      }
>> +    lflow_hash_lock_initialized = false;
>>  }
>>  
>>  /* This thread-local var is used for parallel lflow building when dp-groups 
>> is
>> @@ -4853,7 +4875,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct 
>> ovn_datapath *od,
>>                     nullable_xstrdup(ctrl_meter),
>>                     ovn_lflow_hint(stage_hint), where);
>>      hmapx_add(&lflow->od_group, od);
>> -    if (!use_parallel_build) {
>> +    if (parallelization_state != STATE_USE_PARALLELIZATION) {
>>          hmap_insert(lflow_map, &lflow->hmap_node, hash);
>>      } else {
>>          hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
>> @@ -4896,7 +4918,8 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, 
>> struct ovn_datapath *od,
>>      struct ovn_lflow *lflow;
>>  
>>      ovs_assert(ovn_stage_to_datapath_type(stage) == 
>> ovn_datapath_get_type(od));
>> -    if (use_logical_dp_groups && use_parallel_build) {
>> +    if (use_logical_dp_groups
>> +        && (parallelization_state == STATE_USE_PARALLELIZATION)) {
>>          lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
>>                                      match, actions, io_port, stage_hint, 
>> where,
>>                                      ctrl_meter);
>> @@ -4982,6 +5005,10 @@ static void
>>  ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
>>  {
>>      if (lflow) {
>> +        if ((parallelization_state != STATE_NULL)
>> +            && use_logical_dp_groups) {
>> +            ovs_mutex_destroy(&lflow->odg_lock);
>> +        }
>>          if (lflows) {
>>              hmap_remove(lflows, &lflow->hmap_node);
>>          }
>> @@ -13925,15 +13952,10 @@ build_lswitch_and_lrouter_iterate_by_op(struct 
>> ovn_port *op,
>>                                        &lsi->actions);
>>  }
>>  
>> -struct lflows_thread_pool {
>> -    struct worker_pool *pool;
>> -};
>> -
>>  static void *
>>  build_lflows_thread(void *arg)
>>  {
>>      struct worker_control *control = (struct worker_control *) arg;
>> -    struct lflows_thread_pool *workload;
>>      struct lswitch_flow_build_info *lsi;
>>  
>>      struct ovn_datapath *od;
>> @@ -13944,17 +13966,16 @@ build_lflows_thread(void *arg)
>>  
>>      while (!stop_parallel_processing()) {
>>          wait_for_work(control);
>> -        workload = (struct lflows_thread_pool *) control->workload;
>>          lsi = (struct lswitch_flow_build_info *) control->data;
>>          if (stop_parallel_processing()) {
>>              return NULL;
>>          }
>>          thread_lflow_counter = 0;
>> -        if (lsi && workload) {
>> +        if (lsi) {
>>              /* Iterate over bucket ThreadID, ThreadID+size, ... */
>>              for (bnum = control->id;
>>                      bnum <= lsi->datapaths->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (od, key_node, bnum, 
>> lsi->datapaths) {
>>                      if (stop_parallel_processing()) {
>> @@ -13965,7 +13986,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->ports->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (op, key_node, bnum, lsi->ports) {
>>                      if (stop_parallel_processing()) {
>> @@ -13976,7 +13997,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->lbs->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (lb, hmap_node, bnum, lsi->lbs) {
>>                      if (stop_parallel_processing()) {
>> @@ -13997,7 +14018,7 @@ build_lflows_thread(void *arg)
>>              }
>>              for (bnum = control->id;
>>                      bnum <= lsi->igmp_groups->mask;
>> -                    bnum += workload->pool->size)
>> +                    bnum += control->pool->size)
>>              {
>>                  HMAP_FOR_EACH_IN_PARALLEL (
>>                          igmp_group, hmap_node, bnum, lsi->igmp_groups) {
>> @@ -14016,39 +14037,13 @@ build_lflows_thread(void *arg)
>>      return NULL;
>>  }
>>  
>> -static bool pool_init_done = false;
>> -static struct lflows_thread_pool *build_lflows_pool = NULL;
>> -
>> -static void
>> -init_lflows_thread_pool(void)
>> -{
>> -    int index;
>> -
>> -    if (!pool_init_done) {
>> -        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
>> -        pool_init_done = true;
>> -        if (pool) {
>> -            build_lflows_pool = xmalloc(sizeof(*build_lflows_pool));
>> -            build_lflows_pool->pool = pool;
>> -            for (index = 0; index < build_lflows_pool->pool->size; index++) 
>> {
>> -                build_lflows_pool->pool->controls[index].workload =
>> -                    build_lflows_pool;
>> -            }
>> -        }
>> -    }
>> -}
>> -
>> -/* TODO: replace hard cutoffs by configurable via commands. These are
>> - * temporary defines to determine single-thread to multi-thread processing
>> - * cutoff.
>> - * Setting to 1 forces "all parallel" lflow build.
>> - */
>> +static struct worker_pool *build_lflows_pool = NULL;
>>  
>>  static void
>>  noop_callback(struct worker_pool *pool OVS_UNUSED,
>>                void *fin_result OVS_UNUSED,
>>                void *result_frags OVS_UNUSED,
>> -              int index OVS_UNUSED)
>> +              size_t index OVS_UNUSED)
>>  {
>>      /* Do nothing */
>>  }
>> @@ -14088,28 +14083,21 @@ build_lswitch_and_lrouter_flows(const struct hmap 
>> *datapaths,
>>  
>>      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
>>  
>> -    if (use_parallel_build) {
>> -        init_lflows_thread_pool();
>> -        if (!can_parallelize_hashes(false)) {
>> -            use_parallel_build = false;
>> -        }
>> -    }
>> -
>> -    if (use_parallel_build) {
>> +    if (parallelization_state == STATE_USE_PARALLELIZATION) {
>>          struct hmap *lflow_segs;
>>          struct lswitch_flow_build_info *lsiv;
>>          int index;
>>  
>> -        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->pool->size);
>> +        lsiv = xcalloc(sizeof(*lsiv), build_lflows_pool->size);
>>          if (use_logical_dp_groups) {
>>              lflow_segs = NULL;
>>          } else {
>> -            lflow_segs = xcalloc(sizeof(*lflow_segs), 
>> build_lflows_pool->pool->size);
>> +            lflow_segs = xcalloc(sizeof(*lflow_segs), 
>> build_lflows_pool->size);
>>          }
>>  
>>          /* Set up "work chunks" for each thread to work on. */
>>  
>> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +        for (index = 0; index < build_lflows_pool->size; index++) {
>>              if (use_logical_dp_groups) {
>>                  /* if dp_groups are in use we lock a shared lflows hash
>>                   * on a per-bucket level instead of merging hash frags */
>> @@ -14132,19 +14120,19 @@ build_lswitch_and_lrouter_flows(const struct hmap 
>> *datapaths,
>>              ds_init(&lsiv[index].match);
>>              ds_init(&lsiv[index].actions);
>>  
>> -            build_lflows_pool->pool->controls[index].data = &lsiv[index];
>> +            build_lflows_pool->controls[index].data = &lsiv[index];
>>          }
>>  
>>          /* Run thread pool. */
>>          if (use_logical_dp_groups) {
>> -            run_pool_callback(build_lflows_pool->pool, NULL, NULL,
>> +            run_pool_callback(build_lflows_pool, NULL, NULL,
>>                                noop_callback);
>> -            fix_flow_map_size(lflows, lsiv, build_lflows_pool->pool->size);
>> +            fix_flow_map_size(lflows, lsiv, build_lflows_pool->size);
>>          } else {
>> -            run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
>> +            run_pool_hash(build_lflows_pool, lflows, lflow_segs);
>>          }
>>  
>> -        for (index = 0; index < build_lflows_pool->pool->size; index++) {
>> +        for (index = 0; index < build_lflows_pool->size; index++) {
>>              ds_destroy(&lsiv[index].match);
>>              ds_destroy(&lsiv[index].actions);
>>          }
>> @@ -14280,8 +14268,39 @@ ovn_sb_set_lflow_logical_dp_group(
>>  }
>>  
>>  static ssize_t max_seen_lflow_size = 128;
>> -static bool needs_parallel_init = true;
>> -static bool reset_parallel = false;
>> +
>> +void run_update_worker_pool(int n_threads)
>> +{
>> +    /* If number of threads has been updated (or initially set),
>> +     * update the worker pool. */
>> +    if (update_worker_pool(n_threads, &build_lflows_pool,
>> +                           build_lflows_thread) != POOL_UNCHANGED) {
>> +        /* worker pool was updated */
>> +        if (get_worker_pool_size() <= 1) {
>> +            /* destroy potentially created lflow_hash_lock */
>> +            lflow_hash_lock_destroy();
>> +            parallelization_state = STATE_NULL;
>> +        } else if (parallelization_state != STATE_USE_PARALLELIZATION) {
>> +            if (use_logical_dp_groups) {
>> +                lflow_hash_lock_init();
>> +                parallelization_state = STATE_INIT_HASH_SIZES;
>> +            } else {
>> +                parallelization_state = STATE_USE_PARALLELIZATION;
>> +            }
>> +        }
>> +    }
>> +}
>> +
>> +static void worker_pool_init_for_ldp(void)
>> +{
>> +    /* If parallelization is enabled, make sure locks are initialized
>> +     * when ldp are used.
>> +     */
>> +    if (parallelization_state != STATE_NULL) {
>> +        lflow_hash_lock_init();
>> +        parallelization_state = STATE_INIT_HASH_SIZES;
>> +    }
>> +}
>>  
>>  static void
>>  build_mcast_groups(struct lflow_input *data,
>> @@ -14302,30 +14321,18 @@ void build_lflows(struct lflow_input *input_data,
>>      build_mcast_groups(input_data, input_data->datapaths, input_data->ports,
>>                         &mcast_groups, &igmp_groups);
>>  
>> -    if (reset_parallel) {
>> -        /* Parallel build was disabled before, we need to
>> -         * re-enable it. */
>> -        use_parallel_build = true;
>> -        reset_parallel = false;
>> -    }
>> -
>>      fast_hmap_size_for(&lflows, max_seen_lflow_size);
>>  
>> -    if (use_parallel_build && use_logical_dp_groups &&
>> -        needs_parallel_init) {
>> -        lflow_hash_lock_init();
>> -        needs_parallel_init = false;
>> -        /* Disable parallel build on first run with dp_groups
>> -         * to determine the correct sizing of hashes. */
>> -        use_parallel_build = false;
>> -        reset_parallel = true;
>> -    }
>>      build_lswitch_and_lrouter_flows(input_data->datapaths, 
>> input_data->ports,
>>                                      input_data->port_groups, &lflows,
>>                                      &mcast_groups, &igmp_groups,
>>                                      input_data->meter_groups, 
>> input_data->lbs,
>>                                      input_data->bfd_connections);
>>  
>> +    if (parallelization_state == STATE_INIT_HASH_SIZES) {
>> +        parallelization_state = STATE_USE_PARALLELIZATION;
>> +    }
>> +
>>      /* Parallel build may result in a suboptimal hash. Resize the
>>       * hash to a correct size before doing lookups */
>>  
>> @@ -15567,12 +15574,13 @@ ovnnb_db_run(struct northd_input *input_data,
>>  
>>      smap_destroy(&options);
>>  
>> -    use_parallel_build =
>> -        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
>> -         can_parallelize_hashes(false));
>> -
>> +    bool old_use_ldp = use_logical_dp_groups;
>>      use_logical_dp_groups = smap_get_bool(&nb->options,
>>                                            "use_logical_dp_groups", true);
>> +    if (use_logical_dp_groups && !old_use_ldp) {
>> +        worker_pool_init_for_ldp();
>> +    }
>> +
>>      use_ct_inv_match = smap_get_bool(&nb->options,
>>                                       "use_ct_inv_match", true);
>>  
>> diff --git a/northd/northd.h b/northd/northd.h
>> index 2d804a22e..fe8dad03a 100644
>> --- a/northd/northd.h
>> +++ b/northd/northd.h
>> @@ -107,5 +107,6 @@ void build_bfd_table(struct lflow_input *input_data,
>>                       struct hmap *bfd_connections, struct hmap *ports);
>>  void bfd_cleanup_connections(struct lflow_input *input_data,
>>                               struct hmap *bfd_map);
>> +void run_update_worker_pool(int n_threads);
>>  
>>  #endif /* NORTHD_H */
>> diff --git a/northd/ovn-northd-ddlog.c b/northd/ovn-northd-ddlog.c
>> index 718d052cc..e9afda4c6 100644
>> --- a/northd/ovn-northd-ddlog.c
>> +++ b/northd/ovn-northd-ddlog.c
>> @@ -1084,7 +1084,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
>> OVS_UNUSED,
>>          SSL_OPTION_ENUMS,
>>          OPT_DRY_RUN,
>>          OPT_DDLOG_RECORD,
>> -        OPT_DUMMY_NUMA,
>>      };
>>      static const struct option long_options[] = {
>>          {"ovnsb-db", required_argument, NULL, 'd'},
>> @@ -1098,7 +1097,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
>> OVS_UNUSED,
>>          OVN_DAEMON_LONG_OPTIONS,
>>          VLOG_LONG_OPTIONS,
>>          STREAM_SSL_LONG_OPTIONS,
>> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>>          {NULL, 0, NULL, 0},
>>      };
>>      char *short_options = 
>> ovs_cmdl_long_options_to_short_options(long_options);
>> @@ -1155,10 +1153,6 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
>> OVS_UNUSED,
>>              *pause = true;
>>              break;
>>  
>> -        case OPT_DUMMY_NUMA:
>> -            ovs_numa_set_dummy(optarg);
>> -            break;
>> -
>>          case OPT_DDLOG_RECORD:
>>              record_file = optarg;
>>              break;
>> diff --git a/northd/ovn-northd.8.xml b/northd/ovn-northd.8.xml
>> index 73ecfbc2d..077bd1f41 100644
>> --- a/northd/ovn-northd.8.xml
>> +++ b/northd/ovn-northd.8.xml
>> @@ -68,53 +68,26 @@
>>            restarting a process or disturbing a running system.
>>          </p>
>>        </dd>
>> -      <dt><code>--dummy-numa</code></dt>
>> +      <dt><code>--n-threads N</code></dt>
>>        <dd>
>> -        <p>
>> -          Typically, OVS uses sysfs to determine the number of NUMA nodes 
>> and
>> -          CPU cores that are available on a machine. The parallelization 
>> code
>> -          in OVN uses this information to determine if there are enough
>> -          resources to use parallelization. The current algorithm enables
>> -          parallelization if the total number of CPU cores divided by the
>> -          number of NUMA nodes is greater than or equal to four.
>> -        </p>
>> -
>>          <p>
>>            In certain situations, it may be desirable to enable 
>> parallelization
>> -          on a system that otherwise would not have it allowed. The
>> -          <code>--dummy-numa</code> option allows for you to fake the NUMA
>> -          nodes and cores that OVS thinks your system has. The syntax 
>> consists
>> -          of using numbers to represent the NUMA node IDs. The number of 
>> times
>> -          that a NUMA node ID appears represents how many CPU cores that 
>> NUMA
>> -          node contains. So for instance, if you did the following:
>> -        </p>
>> -
>> -        <p>
>> -          <code>--dummy-numa=0,0,0,0</code>
>> -        </p>
>> -
>> -        <p>
>> -          it would make OVS assume that you have a single NUMA node with ID 
>> 0,
>> -          and that NUMA node consists of four CPU cores. Similarly, you 
>> could
>> -          do:
>> -        </p>
>> -
>> -        <p>
>> -          <code>--dummy-numa=0,0,0,0,0,0,1,1,1,1,1,1</code>
>> +          on a system to decrease latency (at the potential cost of 
>> increasing
>> +          CPU usage).
>>          </p>
>>  
>>          <p>
>> -          to make OVS assume you have two NUMA nodes with IDs 0 and 1, each
>> -          with six CPU cores.
>> +          This option will cause ovn-northd to use N threads when building
>> +          logical flows, when N is within [2-256].
>> +          If N is 1, parallelization is disabled (default behavior).
>> +          If N is less than 1, then N is set to 1, parallelization is 
>> disabled
>> +          and a warning is logged.
>> +          If N is more than 256, then N is set to 256, parallelization is
>> +          enabled (with 256 threads) and a warning is logged.
>>          </p>
>>  
>>          <p>
>> -          Currently, the only affect this option has is on whether
>> -          parallelization can be enabled in ovn-northd. There are no NUMA 
>> node
>> -          or CPU core-specific actions performed by OVN. Setting
>> -          <code>--dummy-numa</code> in ovn-northd does not affect how other 
>> OVS
>> -          processes on the system (such as ovs-vswitchd) count the number of
>> -          NUMA nodes and CPU cores; this setting is local to ovn-northd.
>> +          ovn-northd-ddlog does not support this option.
>>          </p>
>>        </dd>
>>      </dl>
>> @@ -210,6 +183,27 @@
>>          except for the northbound database client.
>>        </p>
>>        </dd>
>> +
>> +
>> +      <dt><code>set-n-threads N</code></dt>
>> +      <dd>
>> +      <p>
>> +        Set the number of threads used for building logical flows.
>> +        When N is within [2-256], parallelization is enabled.
>> +        When N is 1 parallelization is disabled.
>> +        When N is less than 1 or more than 256, an error is returned.
>> +        If ovn-northd fails to start parallelization (e.g. fails to set up
>> +        semaphores), parallelization is disabled and an error is returned.
>> +      </p>
>> +      </dd>
>> +
>> +      <dt><code>get-n-threads</code></dt>
>> +      <dd>
>> +      <p>
>> +        Return the number of threads used for building logical flows.
>> +      </p>
>> +      </dd>
>> +
>>        </dl>
>>      </p>
>>  
>> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
>> index 0a0f85010..b7363b16d 100644
>> --- a/northd/ovn-northd.c
>> +++ b/northd/ovn-northd.c
>> @@ -41,6 +41,7 @@
>>  #include "unixctl.h"
>>  #include "util.h"
>>  #include "openvswitch/vlog.h"
>> +#include "lib/ovn-parallel-hmap.h"
>>  
>>  VLOG_DEFINE_THIS_MODULE(ovn_northd);
>>  
>> @@ -50,12 +51,16 @@ static unixctl_cb_func ovn_northd_resume;
>>  static unixctl_cb_func ovn_northd_is_paused;
>>  static unixctl_cb_func ovn_northd_status;
>>  static unixctl_cb_func cluster_state_reset_cmd;
>> +static unixctl_cb_func ovn_northd_set_thread_count_cmd;
>> +static unixctl_cb_func ovn_northd_get_thread_count_cmd;
>>  
>>  struct northd_state {
>>      bool had_lock;
>>      bool paused;
>>  };
>>  
>> +#define OVN_MAX_SUPPORTED_THREADS 256
>> +
>>  static const char *ovnnb_db;
>>  static const char *ovnsb_db;
>>  static const char *unixctl_path;
>> @@ -525,7 +530,7 @@ Options:\n\
>>    --ovnsb-db=DATABASE       connect to ovn-sb database at DATABASE\n\
>>                              (default: %s)\n\
>>    --dry-run                 start in paused state (do not commit db 
>> changes)\n\
>> -  --dummy-numa              override default NUMA node and CPU core 
>> discovery\n\
>> +  --n-threads=N             specify number of threads\n\
>>    --unixctl=SOCKET          override default control socket name\n\
>>    -h, --help                display this help message\n\
>>    -o, --options             list available options\n\
>> @@ -538,14 +543,14 @@ Options:\n\
>>  
>>  static void
>>  parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED,
>> -              bool *paused)
>> +              bool *paused, int *n_threads)
>>  {
>>      enum {
>>          OVN_DAEMON_OPTION_ENUMS,
>>          VLOG_OPTION_ENUMS,
>>          SSL_OPTION_ENUMS,
>>          OPT_DRY_RUN,
>> -        OPT_DUMMY_NUMA,
>> +        OPT_N_THREADS,
>>      };
>>      static const struct option long_options[] = {
>>          {"ovnsb-db", required_argument, NULL, 'd'},
>> @@ -555,7 +560,7 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
>> OVS_UNUSED,
>>          {"options", no_argument, NULL, 'o'},
>>          {"version", no_argument, NULL, 'V'},
>>          {"dry-run", no_argument, NULL, OPT_DRY_RUN},
>> -        {"dummy-numa", required_argument, NULL, OPT_DUMMY_NUMA},
>> +        {"n-threads", required_argument, NULL, OPT_N_THREADS},
>>          OVN_DAEMON_LONG_OPTIONS,
>>          VLOG_LONG_OPTIONS,
>>          STREAM_SSL_LONG_OPTIONS,
>> @@ -611,8 +616,21 @@ parse_options(int argc OVS_UNUSED, char *argv[] 
>> OVS_UNUSED,
>>              ovn_print_version(0, 0);
>>              exit(EXIT_SUCCESS);
>>  
>> -        case OPT_DUMMY_NUMA:
>> -            ovs_numa_set_dummy(optarg);
>> +        case OPT_N_THREADS:
>> +            *n_threads = strtoul(optarg, NULL, 10);
>> +            if (*n_threads < 1) {
>> +                *n_threads = 1;
>> +                VLOG_WARN("Setting n_threads to %d as --n-threads option 
>> was "
>> +                    "set to : [%s]", *n_threads, optarg);
>> +            }
>> +            if (*n_threads > OVN_MAX_SUPPORTED_THREADS) {
>> +                *n_threads = OVN_MAX_SUPPORTED_THREADS;
>> +                VLOG_WARN("Setting n_threads to %d as --n-threads option 
>> was "
>> +                    "set to : [%s]", *n_threads, optarg);
>> +            }
>> +            if (*n_threads != 1) {
>> +                VLOG_INFO("Using %d threads", *n_threads);
>> +            }
>>              break;
>>  
>>          case OPT_DRY_RUN:
>> @@ -668,6 +686,7 @@ main(int argc, char *argv[])
>>      struct unixctl_server *unixctl;
>>      int retval;
>>      bool exiting;
>> +    int n_threads = 1;
>>      struct northd_state state = {
>>          .had_lock = false,
>>          .paused = false
>> @@ -677,7 +696,7 @@ main(int argc, char *argv[])
>>      ovs_cmdl_proctitle_init(argc, argv);
>>      ovn_set_program_name(argv[0]);
>>      service_start(&argc, &argv);
>> -    parse_options(argc, argv, &state.paused);
>> +    parse_options(argc, argv, &state.paused, &n_threads);
>>  
>>      daemonize_start(false);
>>  
>> @@ -704,6 +723,12 @@ main(int argc, char *argv[])
>>      unixctl_command_register("nb-cluster-state-reset", "", 0, 0,
>>                               cluster_state_reset_cmd,
>>                               &reset_ovnnb_idl_min_index);
>> +    unixctl_command_register("parallel-build/set-n-threads", "N_THREADS", 
>> 1, 1,
>> +                             ovn_northd_set_thread_count_cmd,
>> +                             NULL);
>> +    unixctl_command_register("parallel-build/get-n-threads", "", 0, 0,
>> +                             ovn_northd_get_thread_count_cmd,
>> +                             NULL);
>>  
>>      daemonize_complete();
>>  
>> @@ -761,6 +786,8 @@ main(int argc, char *argv[])
>>      unsigned int ovnnb_cond_seqno = UINT_MAX;
>>      unsigned int ovnsb_cond_seqno = UINT_MAX;
>>  
>> +    run_update_worker_pool(n_threads);
>> +
>>      /* Main loop. */
>>      exiting = false;
>>  
>> @@ -1017,3 +1044,30 @@ cluster_state_reset_cmd(struct unixctl_conn *conn, 
>> int argc OVS_UNUSED,
>>      poll_immediate_wake();
>>      unixctl_command_reply(conn, NULL);
>>  }
>> +
>> +static void
>> +ovn_northd_set_thread_count_cmd(struct unixctl_conn *conn, int argc 
>> OVS_UNUSED,
>> +               const char *argv[], void *aux OVS_UNUSED)
>> +{
>> +    int n_threads = atoi(argv[1]);
>> +
>> +    if ((n_threads < 1) || (n_threads > OVN_MAX_SUPPORTED_THREADS)) {
>> +        struct ds s = DS_EMPTY_INITIALIZER;
>> +        ds_put_format(&s, "invalid n_threads: %d\n", n_threads);
>> +        unixctl_command_reply_error(conn, ds_cstr(&s));
>> +        ds_destroy(&s);
>> +    } else {
>> +        run_update_worker_pool(n_threads);
>> +        unixctl_command_reply(conn, NULL);
>> +    }
>> +}
>> +
>> +static void
>> +ovn_northd_get_thread_count_cmd(struct unixctl_conn *conn, int argc 
>> OVS_UNUSED,
>> +               const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
>> +{
>> +    struct ds s = DS_EMPTY_INITIALIZER;
>> +    ds_put_format(&s, "%"PRIuSIZE"\n", get_worker_pool_size());
>> +    unixctl_command_reply(conn, ds_cstr(&s));
>> +    ds_destroy(&s);
>> +}
>> diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
>> index d78315c75..c6f0f6251 100644
>> --- a/tests/ovn-macros.at
>> +++ b/tests/ovn-macros.at
>> @@ -170,8 +170,8 @@ ovn_start_northd() {
>>          ovn-northd-ddlog) northd_args="$northd_args 
>> --ddlog-record=${AZ:+$AZ/}northd$suffix/replay.dat -v" ;;
>>      esac
>>  
>> -    if test X$NORTHD_DUMMY_NUMA = Xyes; then
>> -        northd_args="$northd_args --dummy-numa=\"0,0,0,0\""
>> +    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
>> +        northd_args="$northd_args --n-threads=4"
>>      fi
>>  
>>      local name=${d_prefix}northd${suffix}
>> @@ -252,10 +252,6 @@ ovn_start () {
>>      else
>>          ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
>>      fi
>> -
>> -    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
>> -        ovn-nbctl set NB_Global . options:use_parallel_build=true
>> -    fi
>>  }
>>  
>>  # Interconnection networks.
>> @@ -749,16 +745,6 @@ OVS_END_SHELL_HELPERS
>>  
>>  m4_define([OVN_POPULATE_ARP], [AT_CHECK(ovn_populate_arp__, [0], [ignore])])
>>  
>> -# Use --dummy-numa if system has low cores and we want to force 
>> parallelization
>> -m4_define([NORTHD_DUMMY_NUMA],
>> -  [$(if test $(nproc) -lt 4 && test NORTHD_USE_PARALLELIZATION = yes
>> -     then
>> -       echo "yes"
>> -     else
>> -       echo "no"
>> -     fi)
>> -])
>> -
>>  # Defines a versions of a test with all combinations of northd and
>>  # datapath groups.
>>  m4_define([OVN_FOR_EACH_NORTHD],
>> @@ -770,35 +756,14 @@ m4_define([OVN_FOR_EACH_NORTHD],
>>  # Some tests aren't prepared for dp groups to be enabled.
>>  m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DP_GROUPS],
>>    [m4_foreach([NORTHD_TYPE], [ovn-northd, ovn-northd-ddlog],
>> -     [m4_foreach([NORTHD_USE_DP_GROUPS], [no], [$1
>> -])])])
>> -
>> -# Test parallelization with dp groups enabled and disabled
>> -m4_define([OVN_NORTHD_PARALLELIZATION_DUMMY], [
>> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
>> -m4_pushdef(NORTHD_DUMMY_NUMA, [yes])
>> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> -    [[NORTHD_USE_PARALLELIZATION], [yes]
>> -])]])
>> -
>> -m4_define([OVN_NORTHD_PARALLELIZATION_NO_DUMMY], [
>> -m4_pushdef([NORTHD_TYPE], [ovn_northd])
>> -m4_pushdef(NORTHD_DUMMY_NUMA, [no])
>> -[m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> -    [[NORTHD_USE_PARALLELIZATION], [yes]
>> -])]])
>> -
>> -# Use --dummy-numa if system has low cores
>> -m4_define([HOST_HAS_LOW_CORES], [
>> -    if test $(nproc) -lt 4; then
>> -        :
>> -        $1
>> -    else
>> -        :
>> -        $2
>> -    fi
>> -])
>> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [no],
>> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
>> +])])])])
>> +
>> +# Some tests aren't prepared for ddlog to be enabled.
>> +m4_define([OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG],
>> +  [m4_foreach([NORTHD_TYPE], [ovn-northd],
>> +     [m4_foreach([NORTHD_USE_DP_GROUPS], [yes, no],
>> +       [m4_foreach([NORTHD_USE_PARALLELIZATION], [yes, no], [$1
>> +])])])])
>>  
>> -m4_define([NORTHD_PARALLELIZATION], [
>> -    HOST_HAS_LOW_CORES([OVN_NORTHD_PARALLELIZATION_DUMMY], 
>> [OVN_NORTHD_PARALLELIZATION_NO_DUMMY])
>> -])
>> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
>> index 17e37fb77..1a9d5ef76 100644
>> --- a/tests/ovn-northd.at
>> +++ b/tests/ovn-northd.at
>> @@ -7174,3 +7174,112 @@ ct_next(ct_state=new|trk);
>>  
>>  AT_CLEANUP
>>  ])
>> +
>> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
>> +AT_SETUP([northd-parallelization unixctl])
>> +ovn_start
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
>> parallel-build/get-n-threads], [0], [1
>> +])
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
>> parallel-build/get-n-threads], [0], [4
>> +])
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +OVS_WAIT_FOR_OUTPUT([as northd ovn-appctl -t NORTHD_TYPE 
>> parallel-build/get-n-threads], [0], [1
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
>> 0], [2], [],
>> +  [invalid n_threads: 0
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
>> -1], [2], [],
>> +  [invalid n_threads: -1
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
>> 300], [2], [],
>> +  [invalid n_threads: 300
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE 
>> parallel-build/set-n-threads], [2], [],
>> +  ["parallel-build/set-n-threads" command requires at least 1 arguments
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 
>> 1 2], [2], [],
>> +  ["parallel-build/set-n-threads" command takes at most 1 arguments
>> +ovn-appctl: ovn-northd: server returned an error
>> +])
>> +
>> +AT_CLEANUP
>> +])
>> +
>> +OVN_FOR_EACH_NORTHD_WITHOUT_DDLOG([
>> +AT_SETUP([northd-parallelization runtime])
>> +ovn_start
>> +
>> +add_switch_ports() {
>> +    for port in $(seq $1 $2); do
>> +        logical_switch_port=lsp${port}
>> +        check ovn-nbctl lsp-add ls1 $logical_switch_port
>> +        check ovn-nbctl lsp-set-addresses $logical_switch_port dynamic
>> +    done
>> +}
>> +
>> +# Build some rather heavy config and modify number of threads in the middle
>> +check ovn-nbctl ls-add ls1
>> +check ovn-nbctl set Logical_Switch ls1 other_config:subnet=10.1.0.0/16
>> +check ovn-nbctl set Logical_Switch ls1 other_config:exclude_ips=10.1.255.254
>> +
>> +check ovn-nbctl lr-add lr1
>> +check ovn-nbctl lsp-add ls1 lsp0 -- set Logical_Switch_Port lsp0 
>> type=router options:router-port=lrp0 addresses=dynamic
>> +check ovn-nbctl lrp-add lr1 lrp0 "f0:00:00:01:00:01" 10.1.255.254/16
>> +check ovn-nbctl lr-nat-add lr1 snat 10.2.0.1 10.1.0.0/16
>> +add_switch_ports 1 50
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +add_switch_ports 51 100
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
>> +add_switch_ports 101 150
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 4
>> +add_switch_ports 151 200
>> +
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 1
>> +add_switch_ports 201 250
>> +check ovn-nbctl --wait=sb sync
>> +
>> +# Run 3 times: one with parallelization enabled, one with disabled, and one 
>> while changing
>> +# Compare the flows produced by the three runs
>> +# Ignore IP/MAC addresses
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' 
>> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows1
>> +
>> +# Restart with 1 thread
>> +for port in $(seq 1 250); do
>> +    logical_switch_port=lsp${port}
>> +    check ovn-nbctl lsp-del $logical_switch_port
>> +done
>> +add_switch_ports 1 250
>> +check ovn-nbctl --wait=sb sync
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' 
>> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows2
>> +AT_CHECK([diff flows1 flows2])
>> +
>> +# Restart with 8 threads
>> +check as northd ovn-appctl -t NORTHD_TYPE parallel-build/set-n-threads 8
>> +for port in $(seq 1 250); do
>> +    logical_switch_port=lsp${port}
>> +    check ovn-nbctl lsp-del $logical_switch_port
>> +done
>> +add_switch_ports 1 250
>> +check ovn-nbctl --wait=sb sync
>> +ovn-sbctl dump-flows | sed 's/arp.tpa == 10.1.0..../arp.tpa == 10.1.0.??/' 
>> | sed 's/eth.dst == ..:..:..:..:..:../??:??:??:??:??:??/' |sort > flows3
>> +AT_CHECK([diff flows1 flows3])
>> +
>> +AT_CLEANUP
>> +])
> 

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to