From: Anton Ivanov <[email protected]>

libs: add configuration support to parallel-hmap.[c,h]
northd: add support for configuring parallelization to northd

Signed-off-by: Anton Ivanov <[email protected]>
---
 lib/ovn-parallel-hmap.c | 185 ++++++++++++++++++++++++++++++++++++++--
 lib/ovn-parallel-hmap.h |  63 +++++++++++++-
 northd/northd.c         |  30 +++----
 northd/northd.h         |   2 -
 northd/ovn-northd.c     |   5 +-
 tests/ovn-macros.at     |  16 +++-
 6 files changed, 263 insertions(+), 38 deletions(-)

diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
index 1b3883441..6a6488a17 100644
--- a/lib/ovn-parallel-hmap.c
+++ b/lib/ovn-parallel-hmap.c
@@ -33,6 +33,7 @@
 #include "ovs-thread.h"
 #include "ovs-numa.h"
 #include "random.h"
+#include "unixctl.h"
 
 VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
 
@@ -46,6 +47,7 @@ VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
  */
 static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
 static bool can_parallelize = false;
+static bool should_parallelize = false;
 
 /* This is set only in the process of exit and the set is
  * accompanied by a fence. It does not need to be atomic or be
@@ -83,7 +85,7 @@ static void *standard_helper_thread(void *arg);
 
 struct worker_pool *ovn_add_standard_pool(int size)
 {
-    return add_worker_pool(standard_helper_thread, size);
+    return add_worker_pool(standard_helper_thread, size, "default", true);
 }
 
 bool
@@ -92,6 +94,19 @@ ovn_stop_parallel_processing(struct worker_pool *pool)
     return pool->workers_must_exit;
 }
 
+bool
+ovn_set_parallel_processing(bool enable)
+{
+    should_parallelize = enable;
+    return can_parallelize;
+}
+
+bool
+ovn_get_parallel_processing(void)
+{
+    return can_parallelize && should_parallelize;
+}
+
 bool
 ovn_can_parallelize_hashes(bool force_parallel)
 {
@@ -117,6 +132,7 @@ destroy_pool(struct worker_pool *pool) {
     sem_close(pool->done);
     sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
     sem_unlink(sem_name);
+    free(pool->name);
     free(pool);
 }
 
@@ -127,6 +143,10 @@ ovn_resize_pool(struct worker_pool *pool, int size)
 
     ovs_assert(pool != NULL);
 
+    if (!pool->is_mutable) {
+        return false;
+    }
+
     if (!size) {
         size = pool_size;
     }
@@ -166,7 +186,8 @@ cleanup:
 
 
 struct worker_pool *
-ovn_add_worker_pool(void *(*start)(void *), int size)
+ovn_add_worker_pool(void *(*start)(void *), int size, char *name,
+                    bool is_mutable)
 {
     struct worker_pool *new_pool = NULL;
     bool test = false;
@@ -194,6 +215,8 @@ ovn_add_worker_pool(void *(*start)(void *), int size)
         new_pool = xmalloc(sizeof(struct worker_pool));
         new_pool->size = size;
         new_pool->start = start;
+        new_pool->is_mutable = is_mutable;
+        new_pool->name = xstrdup(name);
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
         if (new_pool->done == SEM_FAILED) {
@@ -226,6 +249,7 @@ cleanup:
         sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
         sem_unlink(sem_name);
     }
+    free(new_pool->name);
     ovs_mutex_unlock(&init_mutex);
     return NULL;
 }
@@ -342,8 +366,7 @@ ovn_complete_pool_callback(struct worker_pool *pool,
         }
     } while (completed < pool->size);
 }
-
-/* Complete a thread pool which uses a callback function to process results
+/* Run a thread pool which uses a callback function to process results
  */
 void
 ovn_run_pool_callback(struct worker_pool *pool,
@@ -352,8 +375,8 @@ ovn_run_pool_callback(struct worker_pool *pool,
                                           void *fin_result,
                                           void *result_frags, int index))
 {
-    ovn_start_pool(pool);
-    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func);
+    start_pool(pool);
+    complete_pool_callback(pool, fin_result, result_frags, helper_func);
 }
 
 /* Run a thread pool - basic, does not do results processing.
@@ -401,6 +424,28 @@ ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
     inc->n = 0;
 }
 
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ */
+void
+ovn_complete_pool_hash(struct worker_pool *pool,
+                  struct hmap *result,
+                  struct hmap *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_hash_results);
+}
+
+/* Run a thread pool which gathers results in an array of lists.
+ * Merge results.
+ */
+void
+ovn_complete_pool_list(struct worker_pool *pool,
+                  struct ovs_list *result,
+                  struct ovs_list *result_frags)
+{
+    complete_pool_callback(pool, result, result_frags, merge_list_results);
+}
+
 /* Run a thread pool which gathers results in an array
  * of hashes. Merge results.
  */
@@ -514,7 +559,7 @@ static struct worker_control *alloc_controls(int size)
 
 static void
 worker_pool_hook(void *aux OVS_UNUSED) {
-    static struct worker_pool *pool;
+    struct worker_pool *pool;
     char sem_name[256];
 
     /* All workers must honour the must_exit flag and check for it regularly.
@@ -628,4 +673,130 @@ standard_helper_thread(void *arg)
 }
 
 
+static void
+ovn_thread_pool_resize_pool(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                            const char *argv[], void *unused OVS_UNUSED)
+{
+
+    struct worker_pool *pool;
+    int value;
+
+    if (!str_to_int(argv[2], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (value > 0) {
+        pool_size = value;
+    }
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        if (strcmp(pool->name, argv[1]) == 0) {
+            resize_pool(pool, value);
+            unixctl_command_reply_error(conn, NULL);
+        }
+    }
+    unixctl_command_reply_error(conn, "pool not found");
+}
+
+static void
+ovn_thread_pool_list_pools(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                           const char *argv[] OVS_UNUSED,
+                           void *unused OVS_UNUSED)
+{
+
+    char *reply = NULL;
+    char *new_reply;
+    char buf[256];
+    struct worker_pool *pool;
+
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        snprintf(buf, 255, "%s : %d\n", pool->name, pool->size);
+        if (reply) {
+            new_reply = xmalloc(strlen(reply) + strlen(buf) + 1);
+            ovs_strlcpy(new_reply, reply, strlen(reply));
+            strcat(new_reply, buf);
+            free(reply);
+        }
+        reply = new_reply;
+    }
+    unixctl_command_reply(conn, reply);
+}
+
+static void
+ovn_thread_pool_set_parallel_on(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[], void *unused OVS_UNUSED)
+{
+    int value;
+    bool result;
+    if (!str_to_int(argv[1], 10, &value)) {
+        unixctl_command_reply_error(conn, "invalid argument");
+        return;
+    }
+
+    if (!ovn_can_parallelize_hashes(true)) {
+        unixctl_command_reply_error(conn, "cannot enable parallel processing");
+        return;
+    }
+
+    if (value > 0) {
+        /* Change default pool size */
+        ovs_mutex_lock(&init_mutex);
+        pool_size = value;
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    result = ovn_set_parallel_processing(true);
+    unixctl_command_reply(conn, result ? "enabled" : "disabled");
+}
+
+static void
+ovn_thread_pool_set_parallel_off(struct unixctl_conn *conn,
+                                 int argc OVS_UNUSED,
+                                 const char *argv[] OVS_UNUSED,
+                                 void *unused OVS_UNUSED)
+{
+    ovn_set_parallel_processing(false);
+    unixctl_command_reply(conn, NULL);
+}
+
+static void
+ovn_thread_pool_parallel_status(struct unixctl_conn *conn, int argc OVS_UNUSED,
+                                const char *argv[] OVS_UNUSED,
+                                void *unused OVS_UNUSED)
+{
+    char status[256];
+
+    sprintf(status, "%s, default pool size %d",
+            get_parallel_processing() ? "active" : "inactive",
+            pool_size);
+
+    unixctl_command_reply(conn, status);
+}
+
+void
+ovn_parallel_thread_pools_init(void)
+{
+    bool test = false;
+
+    if (atomic_compare_exchange_strong(
+            &initial_pool_setup,
+            &test,
+            true)) {
+        ovs_mutex_lock(&init_mutex);
+        setup_worker_pools(false);
+        ovs_mutex_unlock(&init_mutex);
+    }
+
+    unixctl_command_register("thread-pool/set-parallel-on", "N", 1, 1,
+                             ovn_thread_pool_set_parallel_on, NULL);
+    unixctl_command_register("thread-pool/set-parallel-off", "", 0, 0,
+                             ovn_thread_pool_set_parallel_off, NULL);
+    unixctl_command_register("thread-pool/status", "", 0, 0,
+                             ovn_thread_pool_parallel_status, NULL);
+    unixctl_command_register("thread-pool/list", "", 0, 0,
+                             ovn_thread_pool_list_pools, NULL);
+    unixctl_command_register("thread-pool/reload-pool", "Pool Threads", 2, 2,
+                             ovn_thread_pool_resize_pool, NULL);
+}
+
 #endif
diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
index 4cdd5c4e5..9c0a69cb1 100644
--- a/lib/ovn-parallel-hmap.h
+++ b/lib/ovn-parallel-hmap.h
@@ -33,6 +33,7 @@ extern "C" {
 #include "openvswitch/hmap.h"
 #include "openvswitch/thread.h"
 #include "ovs-atomic.h"
+#include "unixctl.h"
 
 /* Process this include only if OVS does not supply parallel definitions
  */
@@ -93,6 +94,8 @@ struct worker_pool {
     sem_t *done; /* Work completion semaphorew. */
     void *(*start)(void *); /* Work function. */
     bool workers_must_exit; /* Pool to be destroyed flag. */
+    char *name; /* Name to be used in cli commands */
+    bool is_mutable; /* Can the pool be reloaded with different params */
 };
 
 
@@ -109,7 +112,9 @@ struct helper_data {
  * size uses system defaults.
  */
 
-struct worker_pool *ovn_add_worker_pool(void *(*start)(void *), int size);
+struct worker_pool *ovn_add_worker_pool(void *(*start)(void *),
+                                        int size, char *name,
+                                        bool is_mutable);
 
 struct worker_pool *ovn_add_standard_pool(int size);
 
@@ -188,6 +193,35 @@ void ovn_complete_pool_callback(struct worker_pool *pool, 
void *fin_result,
                            void *fin_result, void *result_frags, int index));
 
 
+/* Start a pool. Do not wait for any results. They will be collected
+ * using the _complete_ functions.
+ */
+void ovn_start_pool(struct worker_pool *pool);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from hash frags into a final hash result.
+ * The hash frags must be pre-sized to the same size.
+ */
+
+void ovn_complete_pool_hash(struct worker_pool *pool,
+                       struct hmap *result, struct hmap *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Merge results from list frags into a final list result.
+ */
+
+void ovn_complete_pool_list(struct worker_pool *pool,
+                       struct ovs_list *result, struct ovs_list *result_frags);
+
+/* Complete a pool run started using start_pool();
+ * Call a callback function to perform processing of results.
+ */
+
+void ovn_complete_pool_callback(struct worker_pool *pool, void *fin_result,
+                           void *result_frags,
+                           void (*helper_func)(struct worker_pool *pool,
+                           void *fin_result, void *result_frags, int index));
+
 /* Returns the first node in 'hmap' in the bucket in which the given 'hash'
  * would land, or a null pointer if that bucket is empty. */
 
@@ -298,10 +332,16 @@ static inline void init_hash_row_locks(struct 
hashrow_locks *hrl)
 
 bool ovn_can_parallelize_hashes(bool force_parallel);
 
+bool ovn_set_parallel_processing(bool enable);
+
+bool ovn_get_parallel_processing(void);
+
 void ovn_destroy_pool(struct worker_pool *pool);
 
 bool ovn_resize_pool(struct worker_pool *pool, int size);
 
+void ovn_parallel_thread_pools_init(void);
+
 /* Use the OVN library functions for stuff which OVS has not defined
  * If OVS has defined these, they will still compile using the OVN
  * local names, but will be dropped by the linker in favour of the OVS
@@ -312,9 +352,16 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
 
+#define set_parallel_processing(enable) ovn_set_parallel_processing(enable)
+
+#define get_parallel_processing() ovn_get_parallel_processing()
+
+#define enable_parallel_processing() ovn_enable_parallel_processing()
+
 #define stop_parallel_processing(pool) ovn_stop_parallel_processing(pool)
 
-#define add_worker_pool(start, size) ovn_add_worker_pool(start, size)
+#define add_worker_pool(start, size, name, is_mutable) \
+        ovn_add_worker_pool(start, size, name, is_mutable)
 
 #define add_standard_pool(start, size) ovn_add_standard_pool(start, size)
 
@@ -339,6 +386,17 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define start_pool(pool) ovn_start_pool(pool)
 
+#define complete_pool_hash(pool, result, result_frags) \
+    ovn_complete_pool_hash(pool, result, result_frags)
+
+#define complete_pool_list(pool, result, result_frags) \
+    ovn_complete_pool_list(pool, result, result_frags)
+
+#define complete_pool_callback(pool, fin_result, result_frags, helper_func) \
+    ovn_complete_pool_callback(pool, fin_result, result_frags, helper_func)
+
+#define start_pool(pool) ovn_start_pool(pool)
+
 #define complete_pool_hash(pool, result, result_frags) \
     ovn_complete_pool_hash(pool, result, result_frags)
 
@@ -352,6 +410,7 @@ bool ovn_resize_pool(struct worker_pool *pool, int size);
 
 #define resize_pool(pool, size) ovn_resize_pool(pool, size)
 
+#define parallel_thread_pools_init() ovn_parallel_thread_pools_init()
 
 #ifdef __clang__
 #pragma clang diagnostic pop
diff --git a/northd/northd.c b/northd/northd.c
index 7724d27e9..d6401fe62 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -4279,7 +4279,6 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct 
ovn_datapath *od,
 /* If this option is 'true' northd will combine logical flows that differ by
  * logical datapath only by creating a datapath group. */
 static bool use_logical_dp_groups = false;
-static bool use_parallel_build = true;
 
 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
@@ -4298,7 +4297,7 @@ ovn_lflow_init(struct ovn_lflow *lflow, struct 
ovn_datapath *od,
     lflow->ctrl_meter = ctrl_meter;
     lflow->dpg = NULL;
     lflow->where = where;
-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_init(&lflow->odg_lock);
     }
 }
@@ -4370,7 +4369,7 @@ do_ovn_lflow_add(struct hmap *lflow_map, struct 
ovn_datapath *od,
                    nullable_xstrdup(ctrl_meter),
                    ovn_lflow_hint(stage_hint), where);
     hmapx_add(&lflow->od_group, od);
-    if (!use_parallel_build) {
+    if (!get_parallel_processing()) {
         hmap_insert(lflow_map, &lflow->hmap_node, hash);
     } else {
         hmap_insert_fast(lflow_map, &lflow->hmap_node, hash);
@@ -4441,7 +4440,7 @@ ovn_lflow_add_at_with_hash(struct hmap *lflow_map, struct 
ovn_datapath *od,
     struct ovn_lflow *lflow;
 
     ovs_assert(ovn_stage_to_datapath_type(stage) == ovn_datapath_get_type(od));
-    if (use_logical_dp_groups && use_parallel_build) {
+    if (use_logical_dp_groups && get_parallel_processing()) {
         lflow = do_ovn_lflow_add_pd(lflow_map, od, hash, stage, priority,
                                     match, actions, io_port, stage_hint, where,
                                     ctrl_meter);
@@ -4479,7 +4478,7 @@ ovn_dp_group_add_with_reference(struct ovn_lflow 
*lflow_ref,
         return false;
     }
 
-    if (use_parallel_build && use_logical_dp_groups) {
+    if (get_parallel_processing() && use_logical_dp_groups) {
         ovs_mutex_lock(&lflow_ref->odg_lock);
         hmapx_add(&lflow_ref->od_group, od);
         ovs_mutex_unlock(&lflow_ref->odg_lock);
@@ -12962,7 +12961,8 @@ init_lflows_thread_pool(void)
 {
 
     if (!pool_init_done) {
-        build_lflows_pool = add_worker_pool(build_lflows_thread, 0);
+        build_lflows_pool = add_worker_pool(build_lflows_thread, 0,
+                                            "lflows", true);
         pool_init_done = true;
     }
 }
@@ -12978,14 +12978,11 @@ build_lswitch_and_lrouter_flows(struct hmap 
*datapaths, struct hmap *ports,
 
     char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         init_lflows_thread_pool();
-        if (!can_parallelize_hashes(false)) {
-            use_parallel_build = false;
-        }
     }
 
-    if (use_parallel_build) {
+    if (get_parallel_processing()) {
         struct hmap *lflow_segs;
         struct lswitch_flow_build_info *lsiv;
         int index;
@@ -13185,19 +13182,19 @@ build_lflows(struct northd_context *ctx, struct hmap 
*datapaths,
     if (reset_parallel) {
         /* Parallel build was disabled before, we need to
          * re-enable it. */
-        use_parallel_build = true;
+        set_parallel_processing(true);
         reset_parallel = false;
     }
 
     fast_hmap_size_for(&lflows, max_seen_lflow_size);
 
-    if (use_parallel_build && use_logical_dp_groups &&
+    if (get_parallel_processing() && use_logical_dp_groups &&
         needs_parallel_init) {
         ovs_rwlock_init(&flowtable_lock);
         needs_parallel_init = false;
         /* Disable parallel build on first run with dp_groups
          * to determine the correct sizing of hashes. */
-        use_parallel_build = false;
+        set_parallel_processing(false);
         reset_parallel = true;
     }
     build_lswitch_and_lrouter_flows(datapaths, ports,
@@ -14279,10 +14276,6 @@ ovnnb_db_run(struct northd_context *ctx,
     ovsdb_idl_set_probe_interval(ctx->ovnnb_idl, northd_probe_interval_nb);
     ovsdb_idl_set_probe_interval(ctx->ovnsb_idl, northd_probe_interval_sb);
 
-    use_parallel_build =
-        (smap_get_bool(&nb->options, "use_parallel_build", false) &&
-         can_parallelize_hashes(false));
-
     use_logical_dp_groups = smap_get_bool(&nb->options,
                                           "use_logical_dp_groups", true);
     use_ct_inv_match = smap_get_bool(&nb->options,
@@ -14652,7 +14645,6 @@ ovn_db_run(struct northd_context *ctx,
     ovs_list_init(&lr_list);
     hmap_init(&datapaths);
     hmap_init(&ports);
-    use_parallel_build = ctx->use_parallel_build;
 
     int64_t start_time = time_wall_msec();
     stopwatch_start(OVNNB_DB_RUN_STOPWATCH_NAME, time_msec());
diff --git a/northd/northd.h b/northd/northd.h
index ffa2bbb4e..5cbd183ef 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -27,8 +27,6 @@ struct northd_context {
     struct ovsdb_idl_index *sbrec_ha_chassis_grp_by_name;
     struct ovsdb_idl_index *sbrec_mcast_group_by_name_dp;
     struct ovsdb_idl_index *sbrec_ip_mcast_by_dp;
-
-    bool use_parallel_build;
 };
 
 void
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 42c0ad644..fb1268661 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -65,8 +65,6 @@ static const char *ssl_private_key_file;
 static const char *ssl_certificate_file;
 static const char *ssl_ca_cert_file;
 
-static bool use_parallel_build = true;
-
 static const char *rbac_chassis_auth[] =
     {"name"};
 static const char *rbac_chassis_update[] =
@@ -622,7 +620,7 @@ main(int argc, char *argv[])
 
     daemonize_complete();
 
-    use_parallel_build = can_parallelize_hashes(false);
+    ovn_parallel_thread_pools_init();
 
     /* We want to detect (almost) all changes to the ovn-nb db. */
     struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
@@ -941,7 +939,6 @@ main(int argc, char *argv[])
                 .sbrec_ha_chassis_grp_by_name = sbrec_ha_chassis_grp_by_name,
                 .sbrec_mcast_group_by_name_dp = sbrec_mcast_group_by_name_dp,
                 .sbrec_ip_mcast_by_dp = sbrec_ip_mcast_by_dp,
-                .use_parallel_build = use_parallel_build,
             };
 
             if (!state.had_lock && ovsdb_idl_has_lock(ovnsb_idl_loop.idl)) {
diff --git a/tests/ovn-macros.at b/tests/ovn-macros.at
index f06f2e68e..958ce18b0 100644
--- a/tests/ovn-macros.at
+++ b/tests/ovn-macros.at
@@ -179,6 +179,18 @@ ovn_start_northd() {
     test -d "$ovs_base/$name" || mkdir "$ovs_base/$name"
     as $name start_daemon $NORTHD_TYPE $northd_args -vjsonrpc \
                --ovnnb-db=$OVN_NB_DB --ovnsb-db=$OVN_SB_DB
+    if test -z "$USE_PARALLEL_THREADS" ; then
+        USE_PARALLEL_THREADS=0
+    fi
+
+    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
+        case ${NORTHD_TYPE:=ovn-northd} in
+            ovn-northd) ovs-appctl --timeout=10 --target 
northd$suffix/ovn-northd \
+                            thread-pool/set-parallel-on $USE_PARALLEL_THREADS
+            ;;
+        esac
+    fi
+
 }
 
 # ovn_start [--backup-northd=none|paused] [AZ]
@@ -252,10 +264,6 @@ ovn_start () {
     else
         ovn-nbctl set NB_Global . options:use_logical_dp_groups=false
     fi
-
-    if test X$NORTHD_USE_PARALLELIZATION = Xyes; then
-        ovn-nbctl set NB_Global . options:use_parallel_build=true
-    fi
 }
 
 # Interconnection networks.
-- 
2.20.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to