Introduce the capability to recompute only the updated nodes when an
incremental processing run is aborted (e.g. if the controller is not able
to write to the SB DB and the change can't be processed incrementally).
This avoids wasting CPU time on nodes that did not change during the
last run.

Signed-off-by: Lorenzo Bianconi <[email protected]>
---
 controller/ovn-controller.c | 20 +++++-----
 lib/inc-proc-eng.c          | 73 ++++++++++++++++++++++++++++++++++---
 lib/inc-proc-eng.h          | 12 +++++-
 northd/inc-proc-northd.c    |  8 ++--
 4 files changed, 92 insertions(+), 21 deletions(-)

diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
index ea5e9df41..0c9b6a162 100644
--- a/controller/ovn-controller.c
+++ b/controller/ovn-controller.c
@@ -557,7 +557,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct ovsdb_idl 
*ovnsb_idl,
     }
     if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
         VLOG_INFO("Resetting southbound database cluster state");
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
         ovsdb_idl_reset_min_index(ovnsb_idl);
         *reset_ovnsb_idl_min_index = false;
     }
@@ -3066,7 +3066,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl, struct 
ovsdb_idl *ovnsb_idl,
      * full recompute.
      */
     if (version_mismatch) {
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
     }
     version_mismatch = false;
     return true;
@@ -3524,7 +3524,7 @@ main(int argc, char *argv[])
         if (new_ovs_cond_seqno != ovs_cond_seqno) {
             if (!new_ovs_cond_seqno) {
                 VLOG_INFO("OVS IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
             }
             ovs_cond_seqno = new_ovs_cond_seqno;
         }
@@ -3542,7 +3542,7 @@ main(int argc, char *argv[])
         if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
             if (!new_ovnsb_cond_seqno) {
                 VLOG_INFO("OVNSB IDL reconnected, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
                 vif_plug_reset_idl_prime_counter();
             }
             ovnsb_cond_seqno = new_ovnsb_cond_seqno;
@@ -3620,7 +3620,7 @@ main(int argc, char *argv[])
                                            &br_int_dp->capabilities : NULL,
                                            br_int ? br_int->name : NULL)) {
                 VLOG_INFO("OVS feature set changed, force recompute.");
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_FULL);
             }
 
             if (br_int) {
@@ -3815,7 +3815,7 @@ main(int argc, char *argv[])
                 if (engine_need_run()) {
                     VLOG_DBG("engine did not run, force recompute next time: "
                              "br_int %p, chassis %p", br_int, chassis);
-                    engine_set_force_recompute(true);
+                    engine_request_recompute(EN_RECOMPUTE_PARTIAL);
                     poll_immediate_wake();
                 } else {
                     VLOG_DBG("engine did not run, and it was not needed"
@@ -3825,10 +3825,10 @@ main(int argc, char *argv[])
             } else if (engine_aborted()) {
                 VLOG_DBG("engine was aborted, force recompute next time: "
                          "br_int %p, chassis %p", br_int, chassis);
-                engine_set_force_recompute(true);
+                engine_request_recompute(EN_RECOMPUTE_PARTIAL);
                 poll_immediate_wake();
             } else {
-                engine_set_force_recompute(false);
+                engine_request_recompute(EN_RECOMPUTE_NONE);
             }
 
             store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
@@ -3882,7 +3882,7 @@ main(int argc, char *argv[])
 
         if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
             VLOG_INFO("OVNSB commit failed, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_request_recompute(EN_RECOMPUTE_FULL);
         }
 
         int ovs_txn_status = ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
@@ -4202,7 +4202,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn 
OVS_UNUSED,
     VLOG_INFO("User triggered lflow cache flush.");
     struct lflow_output_persistent_data *fo_pd = arg_;
     lflow_cache_flush(fo_pd->lflow_cache);
-    engine_set_force_recompute(true);
+    engine_request_recompute(EN_RECOMPUTE_FULL);
     poll_immediate_wake();
     unixctl_command_reply(conn, NULL);
 }
diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
index 7b4391700..3b044598c 100644
--- a/lib/inc-proc-eng.c
+++ b/lib/inc-proc-eng.c
@@ -33,7 +33,7 @@
 
 VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
 
-static bool engine_force_recompute = false;
+static enum engine_recompute_request  recompute_request = EN_RECOMPUTE_NONE;
 static bool engine_run_aborted = false;
 static const struct engine_context *engine_context;
 
@@ -47,6 +47,12 @@ static const char *engine_node_state_name[EN_STATE_MAX] = {
     [EN_ABORTED]   = "Aborted",
 };
 
+static const char *engine_recompute_request_name[EN_RECOMPUTE_MAX] = {
+    [EN_RECOMPUTE_NONE]     = "None",
+    [EN_RECOMPUTE_PARTIAL]  = "Partial",
+    [EN_RECOMPUTE_FULL]     = "Full",
+};
+
 static long long engine_compute_log_timeout_msec = 500;
 
 static void
@@ -54,9 +60,28 @@ engine_recompute(struct engine_node *node, bool allowed,
                  const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
 
 void
-engine_set_force_recompute(bool val)
+engine_request_recompute(enum engine_recompute_request val)
 {
-    engine_force_recompute = val;
+    if (val == EN_RECOMPUTE_PARTIAL &&
+        recompute_request == EN_RECOMPUTE_FULL) {
+        /* pending EN_RECOMPUTE_FULL already requested. */
+        return;
+    }
+
+    /* EN_RECOMPUTE_FULL is allowed to overwrite EN_RECOMPUTE_PARTIAL. */
+    recompute_request = val;
+    VLOG_DBG("Requested recompute %s", engine_recompute_request_name[val]);
+
+    for (size_t i = 0; i < engine_n_nodes; i++) {
+        if (recompute_request == EN_RECOMPUTE_PARTIAL) {
+            if (engine_nodes[i]->state == EN_UPDATED ||
+                engine_nodes[i]->state == EN_ABORTED) {
+                engine_nodes[i]->pending_recompute = true;
+            }
+        } else {
+            engine_nodes[i]->pending_recompute = false;
+        }
+    }
 }
 
 const struct engine_context *
@@ -428,6 +453,34 @@ engine_compute(struct engine_node *node, bool 
recompute_allowed)
     return true;
 }
 
+static bool
+engine_run_partial(struct engine_node *node, bool recompute_allowed)
+{
+    /* Let's try to do a selective recompute on pending changes before
+     * performing a full recompute.
+     */
+    if (recompute_request != EN_RECOMPUTE_PARTIAL) {
+        return false;
+    }
+
+    if (node->pending_recompute) {
+        goto recompute;
+    }
+
+    for (size_t i = 0; i < node->n_inputs; i++) {
+        if (node->inputs[i].node->pending_recompute) {
+            goto recompute;
+        }
+    }
+    return false;
+
+recompute:
+    node->pending_recompute = false;
+    engine_recompute(node, recompute_allowed, "selective recompute on %s",
+                     node->name);
+    return true;
+}
+
 static void
 engine_run_node(struct engine_node *node, bool recompute_allowed)
 {
@@ -438,11 +491,15 @@ engine_run_node(struct engine_node *node, bool 
recompute_allowed)
         return;
     }
 
-    if (engine_force_recompute) {
+    if (recompute_request == EN_RECOMPUTE_FULL) {
         engine_recompute(node, recompute_allowed, "forced");
         return;
     }
 
+    if (engine_run_partial(node, recompute_allowed)) {
+        return;
+    }
+
     /* If any of the inputs updated data but there is no change_handler, then
      * recompute the current node too.
      */
@@ -495,9 +552,13 @@ engine_run(bool recompute_allowed)
         if (engine_nodes[i]->state == EN_ABORTED) {
             engine_nodes[i]->stats.abort++;
             engine_run_aborted = true;
-            return;
+            break;
         }
     }
+
+    if (recompute_request == EN_RECOMPUTE_PARTIAL) {
+        recompute_request = EN_RECOMPUTE_NONE;
+    }
 }
 
 bool
@@ -524,6 +585,6 @@ void
 engine_trigger_recompute(void)
 {
     VLOG_INFO("User triggered force recompute.");
-    engine_set_force_recompute(true);
+    engine_request_recompute(EN_RECOMPUTE_FULL);
     poll_immediate_wake();
 }
diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
index 9bfab1f7c..7b734e666 100644
--- a/lib/inc-proc-eng.h
+++ b/lib/inc-proc-eng.h
@@ -115,6 +115,13 @@ enum engine_node_state {
     EN_STATE_MAX,
 };
 
+enum engine_recompute_request {
+    EN_RECOMPUTE_NONE,      /* No recompute is necessary. */
+    EN_RECOMPUTE_PARTIAL,   /* Partial recompute on updated node requested. */
+    EN_RECOMPUTE_FULL,      /* Full recompute requested. */
+    EN_RECOMPUTE_MAX,
+};
+
 struct engine_stats {
     uint64_t recompute;
     uint64_t compute;
@@ -141,6 +148,9 @@ struct engine_node {
     /* State of the node after the last engine run. */
     enum engine_node_state state;
 
+    /* This node needs to be processed by the upcoming partial recompute. */
+    bool pending_recompute;
+
     /* Method to allocate and initialize node data. It may be NULL.
      * The user supplied argument 'arg' is passed from the call to
      * engine_init().
@@ -216,7 +226,7 @@ void engine_add_input(struct engine_node *node, struct 
engine_node *input,
  * in circumstances when we are not sure there is change or not, or
  * when there is change but the engine couldn't be executed in that
  * iteration, and the change can't be tracked across iterations */
-void engine_set_force_recompute(bool val);
+void engine_request_recompute(enum engine_recompute_request val);
 
 /* Return the current engine_context. The values in the context can be NULL
  * if the engine is run with allow_recompute == false in the current
diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
index af55221e3..63c7faa7d 100644
--- a/northd/inc-proc-northd.c
+++ b/northd/inc-proc-northd.c
@@ -258,7 +258,7 @@ void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
      * force-recompute request if 'recompute' is false.
      */
     if (recompute) {
-        engine_set_force_recompute(recompute);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
     }
 
     struct engine_context eng_ctx = {
@@ -275,17 +275,17 @@ void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
     if (!engine_has_run()) {
         if (engine_need_run()) {
             VLOG_DBG("engine did not run, force recompute next time.");
-            engine_set_force_recompute(true);
+            engine_request_recompute(EN_RECOMPUTE_FULL);
             poll_immediate_wake();
         } else {
             VLOG_DBG("engine did not run, and it was not needed");
         }
     } else if (engine_aborted()) {
         VLOG_DBG("engine was aborted, force recompute next time.");
-        engine_set_force_recompute(true);
+        engine_request_recompute(EN_RECOMPUTE_FULL);
         poll_immediate_wake();
     } else {
-        engine_set_force_recompute(false);
+        engine_request_recompute(EN_RECOMPUTE_NONE);
     }
 }
 
-- 
2.35.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to