On 21/12/2020 12:06, Numan Siddique wrote:
On Wed, Nov 25, 2020 at 4:32 PM <[email protected]> wrote:
From: Anton Ivanov <[email protected]>

1. Add support for parallel lflow build.
2. Move combined lflow generation to be built in parallel.

Signed-off-by: Anton Ivanov <[email protected]>
Hi Anton,

Sorry for the delay in reviews. Now that we have branched and
released v20.12.0, I think these patches can be considered.
Can you please repost the patches, rebased and with parallel
building disabled by default, plus a configuration option to enable
it? Something like:
   - ovn-nbctl set NB_Global . enable_parallel_lflow_build=true (or a
     better name if you have one in mind)

Recently Ilya added the datapath groups feature, which is disabled by
default [1]. I think we can take a similar approach to begin with
and flip the default later.
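
For reference, a minimal sketch of how ovn-northd might read such a
knob, assuming the option name suggested above and the existing
smap_get_bool() helper from lib/smap.h (illustrative only, not part
of this patch):

    #include "smap.h"
    #include "lib/ovn-nb-idl.h"

    /* Sketch only: reads the suggested (not yet merged) option from
     * NB_Global:options; defaults to false, i.e. parallel build off. */
    static bool
    use_parallel_lflow_build(const struct nbrec_nb_global *nb)
    {
        return nb && smap_get_bool(&nb->options,
                                   "enable_parallel_lflow_build", false);
    }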

Cool.

I am on PTO as I have some days to burn; I will have a look at it the moment I am back at my desk.

A.


[1] - https://github.com/ovn-org/ovn/commit/44c323a077af3709a111a6156850fd77f9302f5e

Thanks
Numan

---
  northd/ovn-northd.c | 201 +++++++++++++++++++++++++++++++++++++-------
  1 file changed, 172 insertions(+), 29 deletions(-)

diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
index 47a177c99..076a6d27f 100644
--- a/northd/ovn-northd.c
+++ b/northd/ovn-northd.c
@@ -49,6 +49,7 @@
  #include "unixctl.h"
  #include "util.h"
  #include "uuid.h"
+#include "fasthmap.h"
  #include "openvswitch/vlog.h"

  VLOG_DEFINE_THIS_MODULE(ovn_northd);
@@ -4152,7 +4153,7 @@ ovn_lflow_add_at(struct hmap *lflow_map, struct ovn_datapath *od,
      ovn_lflow_init(lflow, od, stage, priority,
                     xstrdup(match), xstrdup(actions),
                     ovn_lflow_hint(stage_hint), where);
-    hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
+    hmap_insert_fast(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
  }

  /* Adds a row with the specified contents to the Logical_Flow table. */
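
A note on this hunk: hmap_insert() may expand, and thus reallocate,
the bucket array as the map grows, while hmap_insert_fast() is the
existing openvswitch/hmap.h variant that skips the expansion check.
That only stays correct because the map is now pre-sized (see the
last hunk). A self-contained sketch of the pattern, with illustrative
names:

    #include "openvswitch/hmap.h"
    #include "hash.h"
    #include "util.h"

    struct demo_elem {
        struct hmap_node node;
        int value;
    };

    /* Assumes 'map' was already sized for the expected element count,
     * e.g. via the fast_hmap_size_for() helper this series introduces,
     * so skipping the expand-on-insert check cannot leave the map
     * pathologically overloaded. */
    static void
    demo_insert_presized(struct hmap *map, struct demo_elem *e)
    {
        hmap_insert_fast(map, &e->node, hash_int(e->value, 0));
    }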
@@ -11100,6 +11101,8 @@ struct lswitch_flow_build_info {

  /* Helper function to combine all lflow generation which is iterated by
   * datapath.
+ * Invoked either by the parallel build over a "chunk" of work, or by the
+ * single-threaded build over a chunk initialized to contain all the work.
   */

  static void
@@ -11134,6 +11137,8 @@ build_lswitch_and_lrouter_iterate_by_od(struct ovn_datapath *od,
  }

  /* Helper function to combine all lflow generation which is iterated by port.
+ * Invoked either by the parallel build over a "chunk" of work, or by the
+ * single-threaded build over a chunk initialized to contain all the work.
   */

  static void
@@ -11161,6 +11166,88 @@ build_lswitch_and_lrouter_iterate_by_op(struct ovn_port *op,
                                              &lsi->match, &lsi->actions);
  }

+struct lflows_thread_pool {
+    struct worker_pool *pool;
+};
+
+static void *
+build_lflows_thread(void *arg)
+{
+    struct worker_control *control = (struct worker_control *) arg;
+    struct lflows_thread_pool *workload;
+    struct lswitch_flow_build_info *lsi;
+
+    struct ovn_datapath *od;
+    struct ovn_port *op;
+    int bnum;
+
+    while (!cease_fire()) {
+        sem_wait(&control->fire);
+        workload = (struct lflows_thread_pool *) control->workload;
+        lsi = (struct lswitch_flow_build_info *) control->data;
+        if (lsi && workload) {
+            /* Iterate over buckets ThreadID, ThreadID+size, ThreadID+2*size, ... */
+            for (bnum = control->id;
+                    bnum <= lsi->datapaths->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        od, key_node, bnum, lsi->datapaths) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_od(od, lsi);
+                }
+            }
+            for (bnum = control->id;
+                    bnum <= lsi->ports->mask;
+                    bnum += workload->pool->size)
+            {
+                HMAP_FOR_EACH_IN_PARALLEL (
+                        op, key_node, bnum, lsi->ports) {
+                    if (cease_fire()) {
+                        return NULL;
+                    }
+                    build_lswitch_and_lrouter_iterate_by_op(op, lsi);
+                }
+            }
+            atomic_store_relaxed(&control->finished, true);
+            atomic_thread_fence(memory_order_release);
+        }
+        sem_post(control->done);
+    }
+    return NULL;
+}
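
The iteration scheme above stripes the hash buckets across threads:
thread "id" visits buckets id, id + pool_size, id + 2 * pool_size,
and so on, so the threads partition the hmap without any locking. A
minimal standalone illustration of the striding, in plain C with no
OVN types:

    #include <stdio.h>

    /* Thread 'thread_id' of 'pool_size' visits every pool_size-th
     * bucket; together the threads cover all buckets exactly once. */
    static void
    visit_buckets(int thread_id, int pool_size, int n_buckets)
    {
        for (int bnum = thread_id; bnum < n_buckets; bnum += pool_size) {
            printf("thread %d -> bucket %d\n", thread_id, bnum);
        }
    }

    int
    main(void)
    {
        for (int id = 0; id < 4; id++) {
            visit_buckets(id, 4, 10);   /* 4 threads, 10 buckets */
        }
        return 0;
    }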
+
+static bool pool_init_done = false;
+static struct lflows_thread_pool *build_lflows_pool = NULL;
+
+static void
+init_lflows_thread_pool(void)
+{
+    int index;
+
+    if (!pool_init_done) {
+        struct worker_pool *pool = add_worker_pool(build_lflows_thread);
+        pool_init_done = true;
+        if (pool) {
+            build_lflows_pool =
+                xmalloc(sizeof(struct lflows_thread_pool));
+            build_lflows_pool->pool = pool;
+            for (index = 0; index < build_lflows_pool->pool->size; index++) {
+                build_lflows_pool->pool->controls[index].workload =
+                    build_lflows_pool;
+            }
+        }
+    }
+}
+
+/* TODO: replace these hard-coded cutoffs with options configurable via
+ * commands. They are temporary defines that determine the single-thread
+ * to multi-thread processing cutoff.
+ * Setting them to 1 forces an "all parallel" lflow build.
+ */
+
+#define OD_CUTOFF 1
+#define OP_CUTOFF 1
+
  static void
  build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                  struct hmap *port_groups, struct hmap *lflows,
@@ -11168,38 +11255,87 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
                                  struct hmap *igmp_groups,
                                  struct shash *meter_groups, struct hmap *lbs)
  {
-    struct ovn_datapath *od;
-    struct ovn_port *op;
-
      char *svc_check_match = xasprintf("eth.dst == %s", svc_monitor_mac);

-    struct lswitch_flow_build_info lsi = {
-        .datapaths = datapaths,
-        .ports = ports,
-        .port_groups = port_groups,
-        .lflows = lflows,
-        .mcgroups = mcgroups,
-        .igmp_groups = igmp_groups,
-        .meter_groups = meter_groups,
-        .lbs = lbs,
-        .svc_check_match = svc_check_match,
-        .match = DS_EMPTY_INITIALIZER,
-        .actions = DS_EMPTY_INITIALIZER,
-    };
+    init_lflows_thread_pool();

-    /* Combined build - all lflow generation from lswitch and lrouter
-     * will move here and will be reogranized by iterator type.
-     */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
-    }
-    HMAP_FOR_EACH (op, key_node, ports) {
-        build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+    if (build_lflows_pool &&
+        (hmap_count(datapaths) > OD_CUTOFF || hmap_count(ports) > OP_CUTOFF)) {
+
+        struct hmap *lflow_segs;
+        struct lswitch_flow_build_info *lsiv;
+        int index;
+
+        lsiv = xmalloc(
+            sizeof(struct lswitch_flow_build_info) *
+                build_lflows_pool->pool->size);
+        lflow_segs = xmalloc(
+            sizeof(struct hmap) * build_lflows_pool->pool->size);
+
+        /* Set up "work chunks" for each thread to work on. */
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            fast_hmap_init(&lflow_segs[index], lflows->mask);
+
+            lsiv[index].datapaths = datapaths;
+            lsiv[index].ports = ports;
+            lsiv[index].port_groups = port_groups;
+            lsiv[index].lflows = &lflow_segs[index];
+            lsiv[index].mcgroups = mcgroups;
+            lsiv[index].igmp_groups = igmp_groups;
+            lsiv[index].meter_groups = meter_groups;
+            lsiv[index].lbs = lbs;
+            lsiv[index].svc_check_match = svc_check_match;
+            ds_init(&lsiv[index].match);
+            ds_init(&lsiv[index].actions);
+
+            build_lflows_pool->pool->controls[index].data = &lsiv[index];
+        }
+
+        /* Run thread pool. */
+
+        run_pool_hash(build_lflows_pool->pool, lflows, lflow_segs);
+
+        for (index = 0; index < build_lflows_pool->pool->size; index++) {
+            ds_destroy(&lsiv[index].match);
+            ds_destroy(&lsiv[index].actions);
+        }
+
+        free(lflow_segs);
+        free(lsiv);
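
run_pool_hash() is provided by the companion fasthmap patch and is
not shown in this diff. Presumably it wakes the worker threads and
then splices each per-thread segment into the combined lflows map. A
hedged sketch of such a merge step, using only existing
openvswitch/hmap.h primitives (this is a guess at the companion
code, not a quote of it):

    #include "openvswitch/hmap.h"

    /* Moves every node of each per-thread segment into 'result'.
     * hmap_insert_fast() skips the expansion check, which is safe
     * provided 'result' was pre-sized for the combined count. */
    static void
    merge_segments(struct hmap *result, struct hmap *segs, size_t n_segs)
    {
        for (size_t i = 0; i < n_segs; i++) {
            struct hmap_node *node;
            while ((node = hmap_first(&segs[i]))) {
                size_t hash = node->hash;
                hmap_remove(&segs[i], node);
                hmap_insert_fast(result, node, hash);
            }
            hmap_destroy(&segs[i]);
        }
    }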
+    } else {
+        struct ovn_datapath *od;
+        struct ovn_port *op;
+
+        struct lswitch_flow_build_info lsi = {
+            .datapaths = datapaths,
+            .ports = ports,
+            .port_groups = port_groups,
+            .lflows = lflows,
+            .mcgroups = mcgroups,
+            .igmp_groups = igmp_groups,
+            .meter_groups = meter_groups,
+            .lbs = lbs,
+            .svc_check_match = svc_check_match,
+            .match = DS_EMPTY_INITIALIZER,
+            .actions = DS_EMPTY_INITIALIZER,
+        };
+
+        /* Converged build - all lflow generation from lswitch and lrouter
+         * will move here and will be reorganized by iterator type.
+         */
+        HMAP_FOR_EACH (od, key_node, datapaths) {
+            build_lswitch_and_lrouter_iterate_by_od(od, &lsi);
+        }
+        HMAP_FOR_EACH (op, key_node, ports) {
+            build_lswitch_and_lrouter_iterate_by_op(op, &lsi);
+        }
+        ds_destroy(&lsi.match);
+        ds_destroy(&lsi.actions);
      }
-    free(svc_check_match);

-    ds_destroy(&lsi.match);
-    ds_destroy(&lsi.actions);
+    free(svc_check_match);

      /* Legacy lswitch build - to be migrated. */
      build_lswitch_flows(datapaths, ports, lflows, mcgroups,
@@ -11209,6 +11345,7 @@ build_lswitch_and_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
      build_lrouter_flows(datapaths, ports, lflows, meter_groups, lbs);
  }

+static ssize_t max_seen_lflow_size = 128;

  /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
   * constructing their contents based on the OVN_NB database. */
@@ -11219,12 +11356,18 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
               struct shash *meter_groups,
               struct hmap *lbs)
  {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap lflows;
+
+    fast_hmap_size_for(&lflows, max_seen_lflow_size);

      build_lswitch_and_lrouter_flows(datapaths, ports,
                                      port_groups, &lflows, mcgroups,
                                      igmp_groups, meter_groups, lbs);

+    if (hmap_count(&lflows) > max_seen_lflow_size) {
+        max_seen_lflow_size = hmap_count(&lflows);
+    }
+
      /* Push changes to the Logical_Flow table to database. */
      const struct sbrec_logical_flow *sbflow, *next_sbflow;
      SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
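
fast_hmap_size_for() also comes from the companion patch. The
max_seen_lflow_size bookkeeping above suggests the intent: pre-size
the lflow map for the largest count seen so far, so that
hmap_insert_fast() never needs a rehash while the map is being
filled. A sketch of what such a helper might look like over the
public struct hmap fields (an assumption about the companion patch,
not its actual code):

    #include "openvswitch/hmap.h"
    #include "util.h"

    /* Initializes 'hmap' with enough buckets for 'size' elements so
     * that hmap_insert_fast() never has to expand it mid-build. */
    static void
    fast_hmap_size_for_sketch(struct hmap *hmap, int size)
    {
        size_t mask = 1;
        while (mask < (size_t) size) {
            mask = (mask << 1) | 1;     /* 1, 3, 7, ... = 2^n - 1 */
        }
        hmap->buckets = xmalloc(sizeof *hmap->buckets * (mask + 1));
        hmap->one = NULL;
        hmap->mask = mask;
        hmap->n = 0;
        for (size_t i = 0; i <= mask; i++) {
            hmap->buckets[i] = NULL;
        }
    }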
--
2.20.1


--
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/
