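This patch implements a change handler for the SB Port_Binding table in
the flow_output engine node, so that physical flows are computed
incrementally.  To make this possible, the flows generated by
consider_port_binding() and consider_mc_group() are now owned by the UUID
of the corresponding Port_Binding or Multicast_Group row (instead of
hc_uuid), so they can be removed and re-added per row.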

Together with the previous incremental processing engine changes, this
patch supports incremental processing of logical flow changes and of
port-binding changes for lports on other HVs.  These are the most common
scenarios in a cloud, where workloads come up and go down.

In ovn-scale-test, creating and binding 500 lports, the CPU time of
ovn-controller decreased by 90% compared with master without the
incremental processing engine changes.
---
 ovn/controller/ovn-controller.c |  36 +++++++++++-
 ovn/controller/physical.c       | 123 +++++++++++++++++++++++++++++-----------
 ovn/controller/physical.h       |   6 ++
 3 files changed, 131 insertions(+), 34 deletions(-)

diff --git a/ovn/controller/ovn-controller.c b/ovn/controller/ovn-controller.c
index ef85e1a..1fc41e2 100644
--- a/ovn/controller/ovn-controller.c
+++ b/ovn/controller/ovn-controller.c
@@ -780,6 +780,40 @@ flow_output_sb_logical_flow_handler(struct engine_node *node)
     return true;
 }
 
+/* flow_output change handler for the SB Port_Binding table: recomputes
+ * only the physical flows of the tracked port bindings.  TODO: logical
+ * flows that depend on port bindings are not reprocessed here yet. */
+static bool
+flow_output_sb_port_binding_handler(struct engine_node *node)
+{
+    struct controller_ctx *ctx = (struct controller_ctx *)node->context;
+    struct ed_type_runtime_data *data =
+        (struct ed_type_runtime_data *)engine_get_input(
+                "runtime_data", node)->data;
+    struct hmap *local_datapaths = data->local_datapaths;
+    struct sset *active_tunnels = data->active_tunnels;
+    struct chassis_index *chassis_index = data->chassis_index;
+    const struct ovsrec_bridge *br_int = get_br_int(ctx);
+
+    const char *chassis_id = get_chassis_id(ctx->ovs_idl);
+    const struct sbrec_chassis *chassis = NULL;
+    if (chassis_id) {
+        chassis = get_chassis(ctx->ovnsb_idl, chassis_id);
+    }
+
+    /* Both the integration bridge and this chassis' SB record must exist. */
+    ovs_assert(br_int && chassis);
+
+    enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
+    physical_handle_port_binding_changes(ctx, mff_ovn_geneve, chassis,
+                                         ctx->ct_zones, local_datapaths,
+                                         chassis_index, active_tunnels);
+
+    /* Flows may have been removed or added above; report a change. */
+    node->changed = true;
+    return true;
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -913,7 +947,7 @@ main(int argc, char *argv[])
     engine_add_input(&en_flow_output, &en_sb_address_set, NULL);
     engine_add_input(&en_flow_output, &en_sb_multicast_group, NULL);
     engine_add_input(&en_flow_output, &en_sb_datapath_binding, NULL);
-    engine_add_input(&en_flow_output, &en_sb_port_binding, NULL);
+    engine_add_input(&en_flow_output, &en_sb_port_binding, 
flow_output_sb_port_binding_handler);
     engine_add_input(&en_flow_output, &en_sb_mac_binding, NULL);
     engine_add_input(&en_flow_output, &en_sb_logical_flow, 
flow_output_sb_logical_flow_handler);
     engine_add_input(&en_flow_output, &en_sb_dhcp_options, NULL);
diff --git a/ovn/controller/physical.c b/ovn/controller/physical.c
index 41ea9fe..cda40de 100644
--- a/ovn/controller/physical.c
+++ b/ovn/controller/physical.c
@@ -358,7 +358,7 @@ consider_port_binding(struct controller_ctx *ctx,
         ofpact_finish_CLONE(ofpacts_p, &clone);
 
         ofctrl_add_flow(OFTABLE_LOG_TO_PHY, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, ofpacts_p, &binding->header_.uuid);
         return;
     }
 
@@ -426,7 +426,7 @@ consider_port_binding(struct controller_ctx *ctx,
         }
 
         ofctrl_add_flow(OFTABLE_LOCAL_OUTPUT, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, ofpacts_p, &binding->header_.uuid);
 
         goto out;
     }
@@ -558,7 +558,7 @@ consider_port_binding(struct controller_ctx *ctx,
         /* Resubmit to first logical ingress pipeline table. */
         put_resubmit(OFTABLE_LOG_INGRESS_PIPELINE, ofpacts_p);
         ofctrl_add_flow(OFTABLE_PHY_TO_LOG,
-                        tag ? 150 : 100, 0, &match, ofpacts_p, hc_uuid);
+                        tag ? 150 : 100, 0, &match, ofpacts_p, 
&binding->header_.uuid);
 
         if (!tag && (!strcmp(binding->type, "localnet")
                      || !strcmp(binding->type, "l2gateway"))) {
@@ -568,7 +568,7 @@ consider_port_binding(struct controller_ctx *ctx,
              * action. */
             ofpbuf_pull(ofpacts_p, ofpacts_orig_size);
             match_set_dl_tci_masked(&match, 0, htons(VLAN_CFI));
-            ofctrl_add_flow(0, 100, 0, &match, ofpacts_p, hc_uuid);
+            ofctrl_add_flow(0, 100, 0, &match, ofpacts_p, 
&binding->header_.uuid);
         }
 
         /* Table 65, Priority 100.
@@ -596,7 +596,7 @@ consider_port_binding(struct controller_ctx *ctx,
             ofpact_put_STRIP_VLAN(ofpacts_p);
         }
         ofctrl_add_flow(OFTABLE_LOG_TO_PHY, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, ofpacts_p, &binding->header_.uuid);
     } else if (!tun && !is_ha_remote) {
         /* Remote port connected by localnet port */
         /* Table 33, priority 100.
@@ -619,7 +619,7 @@ consider_port_binding(struct controller_ctx *ctx,
         /* Resubmit to table 33. */
         put_resubmit(OFTABLE_LOCAL_OUTPUT, ofpacts_p);
         ofctrl_add_flow(OFTABLE_LOCAL_OUTPUT, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, ofpacts_p, &binding->header_.uuid);
     } else {
         /* Remote port connected by tunnel */
 
@@ -710,7 +710,7 @@ consider_port_binding(struct controller_ctx *ctx,
             ofpact_finish_BUNDLE(ofpacts_p, &bundle);
         }
         ofctrl_add_flow(OFTABLE_REMOTE_OUTPUT, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, ofpacts_p, &binding->header_.uuid);
     }
 out:
     if (gateway_chassis) {
@@ -723,9 +723,7 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
                   const struct simap *ct_zones,
                   struct hmap *local_datapaths,
                   const struct sbrec_chassis *chassis,
-                  const struct sbrec_multicast_group *mc,
-                  struct ofpbuf *ofpacts_p,
-                  struct ofpbuf *remote_ofpacts_p)
+                  const struct sbrec_multicast_group *mc)
 {
     uint32_t dp_key = mc->datapath->tunnel_key;
     if (!get_local_datapath(local_datapaths, dp_key)) {
@@ -751,8 +749,10 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
      *      would happen on every hypervisor in the multicast group,
      *      effectively duplicating the packet.)
      */
-    ofpbuf_clear(ofpacts_p);
-    ofpbuf_clear(remote_ofpacts_p);
+    struct ofpbuf ofpacts;
+    ofpbuf_init(&ofpacts, 0);
+    struct ofpbuf remote_ofpacts;
+    ofpbuf_init(&remote_ofpacts, 0);
     for (size_t i = 0; i < mc->n_ports; i++) {
         struct sbrec_port_binding *port = mc->ports[i];
 
@@ -766,20 +766,20 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
 
         int zone_id = simap_get(ct_zones, port->logical_port);
         if (zone_id) {
-            put_load(zone_id, MFF_LOG_CT_ZONE, 0, 32, ofpacts_p);
+            put_load(zone_id, MFF_LOG_CT_ZONE, 0, 32, &ofpacts);
         }
 
         if (!strcmp(port->type, "patch")) {
             put_load(port->tunnel_key, MFF_LOG_OUTPORT, 0, 32,
-                     remote_ofpacts_p);
-            put_resubmit(OFTABLE_CHECK_LOOPBACK, remote_ofpacts_p);
+                     &remote_ofpacts);
+            put_resubmit(OFTABLE_CHECK_LOOPBACK, &remote_ofpacts);
         } else if (simap_contains(&localvif_to_ofport,
                            (port->parent_port && *port->parent_port)
                            ? port->parent_port : port->logical_port)
                    || (!strcmp(port->type, "l3gateway")
                        && port->chassis == chassis)) {
-            put_load(port->tunnel_key, MFF_LOG_OUTPORT, 0, 32, ofpacts_p);
-            put_resubmit(OFTABLE_CHECK_LOOPBACK, ofpacts_p);
+            put_load(port->tunnel_key, MFF_LOG_OUTPORT, 0, 32, &ofpacts);
+            put_resubmit(OFTABLE_CHECK_LOOPBACK, &ofpacts);
         } else if (port->chassis && !get_localnet_port(local_datapaths,
                                          mc->datapath->tunnel_key)) {
             /* Add remote chassis only when localnet port not exist,
@@ -794,14 +794,14 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
      *
      * Handle output to the local logical ports in the multicast group, if
      * any. */
-    bool local_ports = ofpacts_p->size > 0;
+    bool local_ports = ofpacts.size > 0;
     if (local_ports) {
         /* Following delivery to local logical ports, restore the multicast
          * group as the logical output port. */
-        put_load(mc->tunnel_key, MFF_LOG_OUTPORT, 0, 32, ofpacts_p);
+        put_load(mc->tunnel_key, MFF_LOG_OUTPORT, 0, 32, &ofpacts);
 
         ofctrl_add_flow(OFTABLE_LOCAL_OUTPUT, 100, 0,
-                        &match, ofpacts_p, hc_uuid);
+                        &match, &ofpacts, &mc->header_.uuid);
     }
 
     /* Table 32, priority 100.
@@ -809,12 +809,12 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
      *
      * Handle output to the remote chassis in the multicast group, if
      * any. */
-    if (!sset_is_empty(&remote_chassis) || remote_ofpacts_p->size > 0) {
-        if (remote_ofpacts_p->size > 0) {
+    if (!sset_is_empty(&remote_chassis) || remote_ofpacts.size > 0) {
+        if (remote_ofpacts.size > 0) {
             /* Following delivery to logical patch ports, restore the
              * multicast group as the logical output port. */
             put_load(mc->tunnel_key, MFF_LOG_OUTPORT, 0, 32,
-                     remote_ofpacts_p);
+                     &remote_ofpacts);
         }
 
         const char *chassis_name;
@@ -828,20 +828,22 @@ consider_mc_group(enum mf_field_id mff_ovn_geneve,
 
             if (!prev || tun->type != prev->type) {
                 put_encapsulation(mff_ovn_geneve, tun, mc->datapath,
-                                  mc->tunnel_key, remote_ofpacts_p);
+                                  mc->tunnel_key, &remote_ofpacts);
                 prev = tun;
             }
-            ofpact_put_OUTPUT(remote_ofpacts_p)->port = tun->ofport;
+            ofpact_put_OUTPUT(&remote_ofpacts)->port = tun->ofport;
         }
 
-        if (remote_ofpacts_p->size) {
+        if (remote_ofpacts.size) {
             if (local_ports) {
-                put_resubmit(OFTABLE_LOCAL_OUTPUT, remote_ofpacts_p);
+                put_resubmit(OFTABLE_LOCAL_OUTPUT, &remote_ofpacts);
             }
             ofctrl_add_flow(OFTABLE_REMOTE_OUTPUT, 100, 0,
-                            &match, remote_ofpacts_p, hc_uuid);
+                            &match, &remote_ofpacts, &mc->header_.uuid);
         }
     }
+    ofpbuf_uninit(&ofpacts);
+    ofpbuf_uninit(&remote_ofpacts);
     sset_destroy(&remote_chassis);
 }
 
@@ -856,6 +858,64 @@ update_ofports(struct simap *old, struct simap *new)
     return changed;
 }
 
+/* Removes and re-adds the flows of MC group 'mc_name' in pb's datapath. */
+static void
+reconsider_mc_group_for_pb(struct controller_ctx *ctx,
+                           const struct sbrec_port_binding *pb,
+                           const char *mc_name,
+                           enum mf_field_id mff_ovn_geneve,
+                           const struct sbrec_chassis *chassis,
+                           const struct simap *ct_zones,
+                           struct hmap *local_datapaths)
+{
+    const struct sbrec_multicast_group *mc
+        = mcgroup_lookup_by_dp_name(ctx->ovnsb_idl, pb->datapath, mc_name);
+    if (!mc) {
+        /* Not every datapath has every group, so this is not a warning. */
+        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 1);
+        VLOG_DBG_RL(&rl, "MC group %s is not found in datapath: "UUID_FMT,
+                    mc_name, UUID_ARGS(&pb->datapath->header_.uuid));
+        return;
+    }
+    ofctrl_remove_flows(&mc->header_.uuid);
+    consider_mc_group(mff_ovn_geneve, ct_zones, local_datapaths, chassis, mc);
+}
+
+/* Removes or recomputes the physical flows of each tracked port binding
+ * and, for updated ones, of their datapath's _MC_flood/_MC_unknown groups. */
+void
+physical_handle_port_binding_changes(struct controller_ctx *ctx,
+                                     enum mf_field_id mff_ovn_geneve,
+                                     const struct sbrec_chassis *chassis,
+                                     const struct simap *ct_zones,
+                                     struct hmap *local_datapaths,
+                                     struct chassis_index *chassis_index,
+                                     struct sset *active_tunnels)
+{
+    const struct sbrec_port_binding *binding;
+    struct ofpbuf ofpacts;
+    ofpbuf_init(&ofpacts, 0);
+    SBREC_PORT_BINDING_FOR_EACH_TRACKED (binding, ctx->ovnsb_idl) {
+        if (sbrec_port_binding_is_deleted(binding)) {
+            ofctrl_remove_flows(&binding->header_.uuid);
+            continue;
+        }
+        if (!sbrec_port_binding_is_new(binding)) {
+            ofctrl_remove_flows(&binding->header_.uuid);
+            reconsider_mc_group_for_pb(ctx, binding, "_MC_flood",
+                                       mff_ovn_geneve, chassis,
+                                       ct_zones, local_datapaths);
+            reconsider_mc_group_for_pb(ctx, binding, "_MC_unknown",
+                                       mff_ovn_geneve, chassis,
+                                       ct_zones, local_datapaths);
+        }
+        consider_port_binding(ctx, mff_ovn_geneve, ct_zones, chassis_index,
+                              active_tunnels, local_datapaths, binding,
+                              chassis, &ofpacts);
+    }
+    ofpbuf_uninit(&ofpacts);
+}
+
 void
 physical_run(struct controller_ctx *ctx, enum mf_field_id mff_ovn_geneve,
              const struct ovsrec_bridge *br_int,
@@ -978,6 +1038,7 @@ physical_run(struct controller_ctx *ctx, enum mf_field_id mff_ovn_geneve,
     /* Capture changed or removed openflow ports. */
     physical_map_changed |= update_ofports(&localvif_to_ofport,
                                            &new_localvif_to_ofport);
+    // TODO: maybe this is not needed any more?
     if (physical_map_changed) {
         /* Reprocess logical flow table immediately. */
         poll_immediate_wake();
@@ -998,8 +1059,6 @@ physical_run(struct controller_ctx *ctx, enum mf_field_id mff_ovn_geneve,
 
     /* Handle output to multicast groups, in tables 32 and 33. */
     const struct sbrec_multicast_group *mc;
-    struct ofpbuf remote_ofpacts;
-    ofpbuf_init(&remote_ofpacts, 0);
     SBREC_MULTICAST_GROUP_FOR_EACH (mc, ctx->ovnsb_idl) {
         /* Table 32, priority 150.
          * =======================
@@ -1017,11 +1076,9 @@ physical_run(struct controller_ctx *ctx, enum mf_field_id mff_ovn_geneve,
                         &ofpacts, hc_uuid);
 
         consider_mc_group(mff_ovn_geneve, ct_zones, local_datapaths, chassis,
-                          mc, &ofpacts, &remote_ofpacts);
+                          mc);
     }
 
-    ofpbuf_uninit(&remote_ofpacts);
-
     /* Table 0, priority 100.
      * ======================
      *
diff --git a/ovn/controller/physical.h b/ovn/controller/physical.h
index 4aeec04..e0b31a4 100644
--- a/ovn/controller/physical.h
+++ b/ovn/controller/physical.h
@@ -51,5 +51,11 @@ void physical_run(struct controller_ctx *, enum mf_field_id mff_ovn_geneve,
                   const struct sset *local_lports,
                   struct chassis_index *chassis_index,
                   struct sset *active_tunnels);
+/* Updates physical flows for tracked SB Port_Binding changes. */
+void physical_handle_port_binding_changes(
+    struct controller_ctx *ctx, enum mf_field_id mff_ovn_geneve,
+    const struct sbrec_chassis *chassis, const struct simap *ct_zones,
+    struct hmap *local_datapaths, struct chassis_index *chassis_index,
+    struct sset *active_tunnels);
 
 #endif /* ovn/physical.h */
-- 
2.1.0

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to