The goal of this change is to allow the syncing node to take logical
flow tables from multiple sources and sync all of them with the
southbound database.

This essentially moves the call to lflow_table_sync_to_sb() out of
en-lflow and into a separate incremental engine node, en-lflow-sync. This
way, en-lflow generates a logical flow table, and en-lflow-sync syncs it
with the southbound database.

This commit slightly changes the logic of lflow_table_sync_to_sb() so
that it no longer deletes southbound logical flows directly. Instead,
unmatched flows are deleted at the end of the syncing process. This lays
the groundwork for syncing multiple logical flow tables without
prematurely deleting southbound logical flows.

Signed-off-by: Mark Michelson <mmich...@redhat.com>
---
 northd/en-lflow.c         | 125 ++++++++++++++++++++++++++++++++++++--
 northd/en-lflow.h         |  18 ++++++
 northd/en-northd-output.c |  14 ++---
 northd/en-northd-output.h |   5 +-
 northd/inc-proc-northd.c  |  20 +++++-
 northd/lflow-mgr.c        |  25 ++++++--
 northd/lflow-mgr.h        |  31 +++++++++-
 northd/northd.c           |  12 +---
 northd/northd.h           |   3 +-
 9 files changed, 217 insertions(+), 36 deletions(-)

diff --git a/northd/en-lflow.c b/northd/en-lflow.c
index 898869287..4992f7ea5 100644
--- a/northd/en-lflow.c
+++ b/northd/en-lflow.c
@@ -107,8 +107,6 @@ lflow_get_input_data(struct engine_node *node,
 enum engine_node_state
 en_lflow_run(struct engine_node *node, void *data)
 {
-    const struct engine_context *eng_ctx = engine_get_context();
-
     struct lflow_input lflow_input;
     lflow_get_input_data(node, &lflow_input);
 
@@ -116,11 +114,11 @@ en_lflow_run(struct engine_node *node, void *data)
 
     struct lflow_data *lflow_data = data;
     lflow_table_clear(lflow_data->lflow_table);
+    lflow_data->handled_incrementally = false;
     lflow_reset_northd_refs(&lflow_input);
     lflow_ref_clear(lflow_input.igmp_lflow_ref);
 
-    build_lflows(eng_ctx->ovnsb_idl_txn, &lflow_input,
-                 lflow_data->lflow_table);
+    build_lflows(&lflow_input, lflow_data->lflow_table);
     stopwatch_stop(BUILD_LFLOWS_STOPWATCH_NAME, time_msec());
 
     return EN_UPDATED;
@@ -343,3 +341,122 @@ void en_lflow_cleanup(void *data_)
     struct lflow_data *data = data_;
     lflow_table_destroy(data->lflow_table);
 }
+
+void en_lflow_clear_tracked_data(void *data_)
+{
+    struct lflow_data *data = data_;
+    data->handled_incrementally = true;
+}
+
+static void
+lflow_sync_data_init(struct lflow_sync_data *lflow_sync,
+                     const struct sbrec_logical_flow_table *sb_lflow_table)
+{
+    hmap_init(&lflow_sync->sb_lflows.valid);
+    hmap_init(&lflow_sync->sb_lflows.to_delete);
+
+    if (!sb_lflow_table) {
+        return;
+    }
+
+    const struct sbrec_logical_flow *lflow;
+    SBREC_LOGICAL_FLOW_TABLE_FOR_EACH (lflow, sb_lflow_table) {
+        struct sb_lflow *sb_lflow = xzalloc(sizeof *sb_lflow);
+        sb_lflow->flow = lflow;
+        sb_lflow->delete_me = true;
+        hmap_insert(&lflow_sync->sb_lflows.valid, &sb_lflow->hmap_node,
+                    uuid_hash(&lflow->header_.uuid));
+    }
+}
+
+static void
+lflow_sync_data_destroy(struct lflow_sync_data *lflow_sync)
+{
+    struct sb_lflow *sb_lflow;
+    HMAP_FOR_EACH_SAFE (sb_lflow, hmap_node, &lflow_sync->sb_lflows.valid) {
+        hmap_remove(&lflow_sync->sb_lflows.valid, &sb_lflow->hmap_node);
+        free(sb_lflow);
+    }
+    HMAP_FOR_EACH_SAFE (sb_lflow, hmap_node,
+                        &lflow_sync->sb_lflows.to_delete) {
+        hmap_remove(&lflow_sync->sb_lflows.to_delete, &sb_lflow->hmap_node);
+        free(sb_lflow);
+    }
+    hmap_destroy(&lflow_sync->sb_lflows.valid);
+    hmap_destroy(&lflow_sync->sb_lflows.to_delete);
+}
+
+enum engine_node_state
+en_lflow_sync_run(struct engine_node *node, void *data)
+{
+    const struct sbrec_logical_flow_table *sb_lflow_table =
+        EN_OVSDB_GET(engine_get_input("SB_logical_flow", node));
+    /* XXX The lflow table is currently not treated as const because it
+     * contains mutable logical datapath groups. A future commit will
+     * separate the dp groups from the lflow_table so that this can be
+     * treated as const.
+     */
+    struct lflow_data *lflow_data =
+        engine_get_input_data("lflow", node);
+    const struct northd_data *northd =
+        engine_get_input_data("northd", node);
+    struct ed_type_global_config *global_config =
+        engine_get_input_data("global_config", node);
+    const struct sbrec_logical_dp_group_table *sb_dp_group_table =
+        EN_OVSDB_GET(engine_get_input("SB_logical_dp_group", node));
+    const struct engine_context *eng_ctx = engine_get_context();
+
+    struct lflow_sync_data *lflow_sync = data;
+    lflow_sync_data_destroy(lflow_sync);
+    lflow_sync_data_init(lflow_sync, sb_lflow_table);
+
+    stopwatch_start(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
+
+    lflow_table_sync_to_sb(lflow_data->lflow_table, eng_ctx->ovnsb_idl_txn,
+                           &northd->ls_datapaths,
+                           &northd->lr_datapaths,
+                           global_config->ovn_internal_version_changed,
+                           &lflow_sync->sb_lflows, sb_dp_group_table);
+    lflow_table_sync_finish(&lflow_sync->sb_lflows);
+
+    struct sb_lflow *sb_lflow;
+    HMAP_FOR_EACH (sb_lflow, hmap_node, &lflow_sync->sb_lflows.to_delete) {
+        sbrec_logical_flow_delete(sb_lflow->flow);
+    }
+
+    stopwatch_stop(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
+
+    return EN_UPDATED;
+}
+
+void *
+en_lflow_sync_init(struct engine_node *node OVS_UNUSED,
+                        struct engine_arg *arg OVS_UNUSED)
+{
+    struct lflow_sync_data *lflow_sync = xzalloc(sizeof *lflow_sync);
+    lflow_sync_data_init(lflow_sync, NULL);
+    return lflow_sync;
+}
+
+void
+en_lflow_sync_cleanup(void *data)
+{
+    struct lflow_sync_data *lflow_sync = data;
+    lflow_sync_data_destroy(lflow_sync);
+}
+
+enum engine_input_handler_result
+lflow_sync_lflow_handler(struct engine_node *node, void *data OVS_UNUSED)
+{
+    const struct lflow_data *lflow_data = engine_get_input_data("lflow", node);
+
+    /* The en-lflow node's handlers sync flows based on lflow_refs. If they
+     * were all able to handle the changes incrementally, then there's no need
+     * for en-lflow-sync to perform a sync of the entire lflow table.
+     */
+    if (lflow_data->handled_incrementally) {
+        return EN_HANDLED_UPDATED;
+    } else {
+        return EN_UNHANDLED;
+    }
+}
diff --git a/northd/en-lflow.h b/northd/en-lflow.h
index d3c96c027..c6e6672c6 100644
--- a/northd/en-lflow.h
+++ b/northd/en-lflow.h
@@ -8,16 +8,23 @@
 #include <stdio.h>
 
 #include "lib/inc-proc-eng.h"
+#include "lflow-mgr.h"
 
 struct lflow_table;
+struct sb_lflows;
 
 struct lflow_data {
     struct lflow_table *lflow_table;
+    /* This is set true if all of en-lflow's input handlers
+     * were able to handle northd's changes incrementally.
+     */
+    bool handled_incrementally;
 };
 
 enum engine_node_state en_lflow_run(struct engine_node *node, void *data);
 void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
 void en_lflow_cleanup(void *data);
+void en_lflow_clear_tracked_data(void *tracked_data);
 enum engine_input_handler_result lflow_northd_handler(struct engine_node *,
                                                       void *data);
 enum engine_input_handler_result lflow_port_group_handler(struct engine_node *,
@@ -31,4 +38,15 @@ lflow_multicast_igmp_handler(struct engine_node *node, void *data);
 enum engine_input_handler_result
 lflow_group_ecmp_route_change_handler(struct engine_node *node, void *data);
 
+struct sb_lflows;
+struct lflow_sync_data {
+    struct sb_lflows sb_lflows;
+};
+
+enum engine_node_state en_lflow_sync_run(struct engine_node *, void *data);
+void *en_lflow_sync_init(struct engine_node *, struct engine_arg *);
+void en_lflow_sync_cleanup(void *data);
+enum engine_input_handler_result lflow_sync_lflow_handler(struct engine_node *,
+                                                          void *data);
+
 #endif /* EN_LFLOW_H */
diff --git a/northd/en-northd-output.c b/northd/en-northd-output.c
index b492a771c..3710e0acc 100644
--- a/northd/en-northd-output.c
+++ b/northd/en-northd-output.c
@@ -50,13 +50,6 @@ northd_output_sync_to_sb_handler(struct engine_node *node OVS_UNUSED,
     return EN_HANDLED_UPDATED;
 }
 
-enum engine_input_handler_result
-northd_output_lflow_handler(struct engine_node *node OVS_UNUSED,
-                            void *data OVS_UNUSED)
-{
-    return EN_HANDLED_UPDATED;
-}
-
 enum engine_input_handler_result
 northd_output_mac_binding_aging_handler(struct engine_node *node OVS_UNUSED,
                                         void *data OVS_UNUSED)
@@ -91,3 +84,10 @@ northd_output_advertised_route_sync_handler(
 {
     return EN_HANDLED_UPDATED;
 }
+
+enum engine_input_handler_result
+northd_output_lflow_sync_handler(struct engine_node *node OVS_UNUSED,
+                                 void *data OVS_UNUSED)
+{
+    return EN_HANDLED_UPDATED;
+}
diff --git a/northd/en-northd-output.h b/northd/en-northd-output.h
index b7053e60c..5f8638535 100644
--- a/northd/en-northd-output.h
+++ b/northd/en-northd-output.h
@@ -13,8 +13,6 @@ enum engine_input_handler_result
 northd_output_sync_to_sb_handler(struct engine_node *node,
                                  void *data OVS_UNUSED);
 enum engine_input_handler_result
-northd_output_lflow_handler(struct engine_node *node, void *data OVS_UNUSED);
-enum engine_input_handler_result
 northd_output_mac_binding_aging_handler(struct engine_node *node,
                                         void *data OVS_UNUSED);
 enum engine_input_handler_result
@@ -28,5 +26,8 @@ northd_output_acl_id_handler(struct engine_node *node, void *data OVS_UNUSED);
 enum engine_input_handler_result
 northd_output_advertised_route_sync_handler(struct engine_node *node,
                                             void *data OVS_UNUSED);
+enum engine_input_handler_result
+northd_output_lflow_sync_handler(struct engine_node *node,
+                                 void *data OVS_UNUSED);
 
 #endif
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index 752f1e9dc..18b0ba9a5 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -159,7 +159,7 @@ enum sb_engine_node {
 static ENGINE_NODE(northd, CLEAR_TRACKED_DATA);
 static ENGINE_NODE(sync_from_sb);
 static ENGINE_NODE(sampling_app);
-static ENGINE_NODE(lflow);
+static ENGINE_NODE(lflow, CLEAR_TRACKED_DATA);
 static ENGINE_NODE(mac_binding_aging);
 static ENGINE_NODE(mac_binding_aging_waker);
 static ENGINE_NODE(northd_output);
@@ -201,6 +201,7 @@ static ENGINE_NODE(port_binding_paired_logical_switch_port);
 static ENGINE_NODE(port_binding_paired_chassisredirect_port);
 static ENGINE_NODE(port_binding_paired_mirror);
 static ENGINE_NODE(port_binding_pair);
+static ENGINE_NODE(lflow_sync);
 
 void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                           struct ovsdb_idl_loop *sb)
@@ -445,6 +446,18 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                      lflow_multicast_igmp_handler);
     engine_add_input(&en_lflow, &en_sb_acl_id, NULL);
 
+    engine_add_input(&en_lflow_sync, &en_sb_logical_flow, NULL);
+    engine_add_input(&en_lflow_sync, &en_global_config,
+                     node_global_config_handler);
+    engine_add_input(&en_lflow_sync, &en_lflow, lflow_sync_lflow_handler);
+    /* en_lflow reacts to en_northd's changes and recalculates flows.
+     * en_lflow_sync needs en_northd's data, but only needs to handle the
+     * flow changes that en_northd caused in en_lflow. Changes in en_northd
+     * can be ignored, so we use engine_noop_handler here.
+     */
+    engine_add_input(&en_lflow_sync, &en_northd, engine_noop_handler);
+    engine_add_input(&en_lflow_sync, &en_sb_logical_dp_group, NULL);
+
     engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
     engine_add_input(&en_sync_to_sb_addr_set, &en_lr_stateful, NULL);
     engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
@@ -496,8 +509,6 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
     engine_add_input(&en_northd_output, &en_sync_from_sb, NULL);
     engine_add_input(&en_northd_output, &en_sync_to_sb,
                      northd_output_sync_to_sb_handler);
-    engine_add_input(&en_northd_output, &en_lflow,
-                     northd_output_lflow_handler);
     engine_add_input(&en_northd_output, &en_mac_binding_aging,
                      northd_output_mac_binding_aging_handler);
     engine_add_input(&en_northd_output, &en_fdb_aging,
@@ -508,6 +519,9 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
                      northd_output_acl_id_handler);
     engine_add_input(&en_northd_output, &en_advertised_route_sync,
                      northd_output_advertised_route_sync_handler);
+    engine_add_input(&en_northd_output, &en_lflow_sync,
+                     northd_output_lflow_sync_handler);
+
 
     struct engine_arg engine_arg = {
         .nb_idl = nb->idl,
diff --git a/northd/lflow-mgr.c b/northd/lflow-mgr.c
index fcedbfe43..5973179b2 100644
--- a/northd/lflow-mgr.c
+++ b/northd/lflow-mgr.c
@@ -26,6 +26,7 @@
 #include "lflow-mgr.h"
 #include "lib/ovn-parallel-hmap.h"
 #include "lib/ovn-util.h"
+#include "en-datapath-logical-switch.h"
 
 VLOG_DEFINE_THIS_MODULE(lflow_mgr);
 
@@ -254,7 +255,7 @@ lflow_table_sync_to_sb(struct lflow_table *lflow_table,
                        const struct ovn_datapaths *ls_datapaths,
                        const struct ovn_datapaths *lr_datapaths,
                        bool ovn_internal_version_changed,
-                       const struct sbrec_logical_flow_table *sb_flow_table,
+                       struct sb_lflows *sb_lflows,
                        const struct sbrec_logical_dp_group_table *dpgrp_table)
 {
     struct hmap lflows_temp = HMAP_INITIALIZER(&lflows_temp);
@@ -265,8 +266,9 @@ lflow_table_sync_to_sb(struct lflow_table *lflow_table,
                        lflow_table->max_seen_lflow_size);
 
     /* Push changes to the Logical_Flow table to database. */
-    const struct sbrec_logical_flow *sbflow;
-    SBREC_LOGICAL_FLOW_TABLE_FOR_EACH_SAFE (sbflow, sb_flow_table) {
+    struct sb_lflow *sb_lflow;
+    HMAP_FOR_EACH (sb_lflow, hmap_node, &sb_lflows->valid) {
+        const struct sbrec_logical_flow *sbflow = sb_lflow->flow;
         struct sbrec_logical_dp_group *dp_group = sbflow->logical_dp_group;
         struct ovn_datapath *logical_datapath_od = NULL;
         size_t i;
@@ -294,7 +296,6 @@ lflow_table_sync_to_sb(struct lflow_table *lflow_table,
 
         if (!logical_datapath_od) {
             /* This lflow has no valid logical datapaths. */
-            sbrec_logical_flow_delete(sbflow);
             continue;
         }
 
@@ -326,8 +327,7 @@ lflow_table_sync_to_sb(struct lflow_table *lflow_table,
             hmap_remove(lflows, &lflow->hmap_node);
             hmap_insert(&lflows_temp, &lflow->hmap_node,
                         hmap_node_hash(&lflow->hmap_node));
-        } else {
-            sbrec_logical_flow_delete(sbflow);
+            sb_lflow->delete_me = false;
         }
     }
 
@@ -348,10 +348,23 @@ lflow_table_sync_to_sb(struct lflow_table *lflow_table,
         hmap_insert(&lflows_temp, &lflow->hmap_node,
                     hmap_node_hash(&lflow->hmap_node));
     }
+
     hmap_swap(lflows, &lflows_temp);
     hmap_destroy(&lflows_temp);
 }
 
+void lflow_table_sync_finish(struct sb_lflows *sb_lflows)
+{
+    struct sb_lflow *sb_lflow;
+    HMAP_FOR_EACH_SAFE (sb_lflow, hmap_node, &sb_lflows->valid) {
+        if (sb_lflow->delete_me) {
+            hmap_remove(&sb_lflows->valid, &sb_lflow->hmap_node);
+            hmap_insert(&sb_lflows->to_delete, &sb_lflow->hmap_node,
+                        hmap_node_hash(&sb_lflow->hmap_node));
+        }
+    }
+}
+
 /* Logical flow sync using 'struct lflow_ref'
  * ==========================================
  * The 'struct lflow_ref' represents a collection of (or references to)
diff --git a/northd/lflow-mgr.h b/northd/lflow-mgr.h
index 16ffc105c..2e454ce63 100644
--- a/northd/lflow-mgr.h
+++ b/northd/lflow-mgr.h
@@ -33,13 +33,23 @@ void lflow_table_clear(struct lflow_table *);
 void lflow_table_destroy(struct lflow_table *);
 void lflow_table_expand(struct lflow_table *);
 void lflow_table_set_size(struct lflow_table *, size_t);
+
+/* To sync lflows from a struct lflow_table to the southbound database,
+ * lflow_table_sync_to_sb() may be called multiple times, passing a different
+ * lflow_table each time. Once all lflow_tables have been synced, call
+ * lflow_table_sync_finish() to move every southbound logical flow that did
+ * not match any of the lflow_tables into sb_lflows->to_delete. The caller
+ * is then responsible for deleting those flows and freeing the sb_lflows.
+ */
+struct sb_lflows;
 void lflow_table_sync_to_sb(struct lflow_table *,
                             struct ovsdb_idl_txn *ovnsb_txn,
                             const struct ovn_datapaths *ls_datapaths,
                             const struct ovn_datapaths *lr_datapaths,
                             bool ovn_internal_version_changed,
-                            const struct sbrec_logical_flow_table *,
+                            struct sb_lflows *sb_lflows,
                             const struct sbrec_logical_dp_group_table *);
+void lflow_table_sync_finish(struct sb_lflows *sb_lflows);
 void lflow_table_destroy(struct lflow_table *);
 
 void lflow_hash_lock_init(void);
@@ -164,6 +174,25 @@ struct ovn_dp_group {
     size_t refcnt;
 };
 
+/* Wraps a struct sbrec_logical_flow so it can be placed in an hmap, hashed
+ * by the flow's UUID. 'delete_me' stays true until an lflow_table claims it.
+ */
+struct sb_lflow {
+    const struct sbrec_logical_flow *flow;
+    struct hmap_node hmap_node;
+    bool delete_me;
+};
+
+struct sb_lflows {
+    /* struct sb_lflow that have not yet been ruled invalid. */
+    struct hmap valid;
+    /* struct sb_lflow that have been determined to be invalid
+     * and should be deleted.
+     */
+    struct hmap to_delete;
+};
+
+
 static inline void
 ovn_dp_groups_init(struct hmap *dp_groups)
 {
diff --git a/northd/northd.c b/northd/northd.c
index 5efadd3f7..0d450738d 100644
--- a/northd/northd.c
+++ b/northd/northd.c
@@ -17795,8 +17795,7 @@ void run_update_worker_pool(int n_threads)
 
 /* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
  * constructing their contents based on the OVN_NB database. */
-void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
-                  struct lflow_input *input_data,
+void build_lflows(struct lflow_input *input_data,
                   struct lflow_table *lflows)
 {
     build_lswitch_and_lrouter_flows(input_data->ls_datapaths,
@@ -17829,15 +17828,6 @@ void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
     /* Parallel build may result in a suboptimal hash. Resize the
      * lflow map to a correct size before doing lookups */
     lflow_table_expand(lflows);
-    
-    stopwatch_start(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
-    lflow_table_sync_to_sb(lflows, ovnsb_txn, input_data->ls_datapaths,
-                           input_data->lr_datapaths,
-                           input_data->ovn_internal_version_changed,
-                           input_data->sbrec_logical_flow_table,
-                           input_data->sbrec_logical_dp_group_table);
-
-    stopwatch_stop(LFLOWS_TO_SB_STOPWATCH_NAME, time_msec());
 }
 
 void
diff --git a/northd/northd.h b/northd/northd.h
index 21c068f35..0239f40fe 100644
--- a/northd/northd.h
+++ b/northd/northd.h
@@ -867,8 +867,7 @@ struct lr_stateful_tracked_data;
 struct ls_stateful_tracked_data;
 struct group_ecmp_datapath;
 
-void build_lflows(struct ovsdb_idl_txn *ovnsb_txn,
-                  struct lflow_input *input_data,
+void build_lflows(struct lflow_input *input_data,
                   struct lflow_table *);
 void lflow_reset_northd_refs(struct lflow_input *);
 void build_route_data_flows_for_lrouter(
-- 
2.47.0

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to