> On Wed, Mar 23, 2022 at 4:22 PM Lorenzo Bianconi <[email protected]> wrote:
> >
> > Introduce the capability to run a recompute just on updated nodes in case
> > of an incremental processing abort (e.g. if the controller is not able to
> > write on the sb db and the change can't be processed incrementally).
> > This will avoid wasting CPU time on nodes that are not changed during
> > last run.
> >
> > Signed-off-by: Lorenzo Bianconi <[email protected]>
> 
> Thanks Lorenzo for the RFC. Here are my comments.

Hi Han,

thx for the review :)

> 
> Firstly, I am confused by "recompute just on updated nodes" - if a node is
> updated in the last run, it means it has already computed successfully its
> output, then why need to recompute on that node again? I think what we
> really want to do is that when an engine_run() is aborted, in the next run
> we want to recompute the nodes that are STALE or ABORTED, and the nodes
> that depend on (directly or indirectly) these nodes, instead of recomputing
> all the nodes.

Reviewing it again, I agree: we should not rely on the EN_UPDATED state to
decide whether to run engine_recompute().
We should probably just recompute the aborted nodes and their directly
connected inputs on the next run, and we should avoid breaking out of the loop
at the first aborted node in engine_run().
Do you think this approach would end up recomputing all the nodes (or at least
the high-cost ones, e.g. lflow) since the graph is highly connected?

> 
> However, even if we implemented the above, it doesn't seem to help for the
> performance, because the high cost nodes, such as lflow_output, depend on
> almost all the nodes (indirectly), so any recompute of the nodes would
> still trigger a lflow_run(). To solve this, we need to track changes (or
> set state UNCHANGED if there is no change) when a node is recomputed, so
> that the nodes depend on it can still do incremental processing.
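
Just to check that I understand the suggestion, I read it roughly as the
standalone sketch below: a recomputed node compares its fresh output with the
previous one and reports "unchanged" when they match. This is not OVN code:
the struct, the state names, the callback type and recompute_and_compare() are
hypothetical, and a real node would need finer-grained tracking than a full
comparison of its output.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Simplified stand-ins; NOT the real OVN engine definitions. */
enum node_state { ST_STALE, ST_UPDATED, ST_UNCHANGED };

#define DATA_LEN 8

struct node {
    enum node_state state;
    int data[DATA_LEN];         /* Output of the node. */
};

/* Hypothetical callback type: rebuilds the node output from scratch. */
typedef void (*compute_fn)(int out[DATA_LEN]);

/* Recomputes 'node' from scratch, but reports ST_UNCHANGED when the fresh
 * output is identical to the previous one, so that dependent nodes are not
 * forced to recompute as well. */
static void
recompute_and_compare(struct node *node, compute_fn compute)
{
    int fresh[DATA_LEN] = { 0 };

    assert(compute != NULL);
    compute(fresh);

    if (!memcmp(fresh, node->data, sizeof fresh)) {
        node->state = ST_UNCHANGED;
    } else {
        memcpy(node->data, fresh, sizeof fresh);
        node->state = ST_UPDATED;
    }
}

/* Two toy compute callbacks, used only to exercise the helper. */
static void
compute_constant(int out[DATA_LEN])
{
    for (size_t i = 0; i < DATA_LEN; i++) {
        out[i] = 7;
    }
}

static void
compute_ramp(int out[DATA_LEN])
{
    for (size_t i = 0; i < DATA_LEN; i++) {
        out[i] = (int) i;
    }
}
```

Running compute_constant twice in a row on the same node would leave it in
ST_UNCHANGED after the second call, so a dependent node would have nothing to
process.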

Do you think it is possible to keep track of DB changes across multiple runs in
the IDL layer, so that the data can be re-processed incrementally in the next
run?

Regards,
Lorenzo

> 
> So, I am afraid I didn't understand the approach of this implementation -
> why are UPDATED nodes recomputed while STALE nodes are not, and why are the
> dependent nodes not considered either?
> 
> Could you describe more clearly the problem you are solving with this
> patch, and give a real example that without this change it is triggering
> lflow_run() unnecessarily and with this change it is avoided?
> 
> Thanks,
> Han
> 
> > ---
> >  controller/ovn-controller.c | 20 +++++-----
> >  lib/inc-proc-eng.c          | 73 ++++++++++++++++++++++++++++++++++---
> >  lib/inc-proc-eng.h          | 12 +++++-
> >  northd/inc-proc-northd.c    |  8 ++--
> >  4 files changed, 92 insertions(+), 21 deletions(-)
> >
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index ea5e9df41..0c9b6a162 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -557,7 +557,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
> >      }
> >      if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
> >          VLOG_INFO("Resetting southbound database cluster state");
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >          ovsdb_idl_reset_min_index(ovnsb_idl);
> >          *reset_ovnsb_idl_min_index = false;
> >      }
> > @@ -3066,7 +3066,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl, struct ovsdb_idl *ovnsb_idl,
> >       * full recompute.
> >       */
> >      if (version_mismatch) {
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >      }
> >      version_mismatch = false;
> >      return true;
> > @@ -3524,7 +3524,7 @@ main(int argc, char *argv[])
> >          if (new_ovs_cond_seqno != ovs_cond_seqno) {
> >              if (!new_ovs_cond_seqno) {
> >                  VLOG_INFO("OVS IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >              }
> >              ovs_cond_seqno = new_ovs_cond_seqno;
> >          }
> > @@ -3542,7 +3542,7 @@ main(int argc, char *argv[])
> >          if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
> >              if (!new_ovnsb_cond_seqno) {
> >                  VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >                  vif_plug_reset_idl_prime_counter();
> >              }
> >              ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> > @@ -3620,7 +3620,7 @@ main(int argc, char *argv[])
> >                                             &br_int_dp->capabilities : NULL,
> >                                             br_int ? br_int->name : NULL)) {
> >                  VLOG_INFO("OVS feature set changed, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_FULL);
> >              }
> >
> >              if (br_int) {
> > @@ -3815,7 +3815,7 @@ main(int argc, char *argv[])
> >                  if (engine_need_run()) {
> >                      VLOG_DBG("engine did not run, force recompute next time: "
> >                               "br_int %p, chassis %p", br_int, chassis);
> > -                    engine_set_force_recompute(true);
> > +                    engine_request_recompute(EN_RECOMPUTE_PARTIAL);
> >                      poll_immediate_wake();
> >                  } else {
> >                      VLOG_DBG("engine did not run, and it was not needed"
> > @@ -3825,10 +3825,10 @@ main(int argc, char *argv[])
> >              } else if (engine_aborted()) {
> >                  VLOG_DBG("engine was aborted, force recompute next time: "
> >                           "br_int %p, chassis %p", br_int, chassis);
> > -                engine_set_force_recompute(true);
> > +                engine_request_recompute(EN_RECOMPUTE_PARTIAL);
> >                  poll_immediate_wake();
> >              } else {
> > -                engine_set_force_recompute(false);
> > +                engine_request_recompute(EN_RECOMPUTE_NONE);
> >              }
> >
> >              store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> > @@ -3882,7 +3882,7 @@ main(int argc, char *argv[])
> >
> >          if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
> >              VLOG_INFO("OVNSB commit failed, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_request_recompute(EN_RECOMPUTE_FULL);
> >          }
> >
> >          int ovs_txn_status = ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> > @@ -4202,7 +4202,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn OVS_UNUSED,
> >      VLOG_INFO("User triggered lflow cache flush.");
> >      struct lflow_output_persistent_data *fo_pd = arg_;
> >      lflow_cache_flush(fo_pd->lflow_cache);
> > -    engine_set_force_recompute(true);
> > +    engine_request_recompute(EN_RECOMPUTE_FULL);
> >      poll_immediate_wake();
> >      unixctl_command_reply(conn, NULL);
> >  }
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 7b4391700..3b044598c 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -33,7 +33,7 @@
> >
> >  VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
> >
> > -static bool engine_force_recompute = false;
> > +static enum engine_recompute_request  recompute_request = EN_RECOMPUTE_NONE;
> >  static bool engine_run_aborted = false;
> >  static const struct engine_context *engine_context;
> >
> > @@ -47,6 +47,12 @@ static const char *engine_node_state_name[EN_STATE_MAX] = {
> >      [EN_ABORTED]   = "Aborted",
> >  };
> >
> > +static const char *engine_recompute_request_name[EN_RECOMPUTE_MAX] = {
> > +    [EN_RECOMPUTE_NONE]     = "None",
> > +    [EN_RECOMPUTE_PARTIAL]  = "Partial",
> > +    [EN_RECOMPUTE_FULL]     = "Full",
> > +};
> > +
> >  static long long engine_compute_log_timeout_msec = 500;
> >
> >  static void
> > @@ -54,9 +60,28 @@ engine_recompute(struct engine_node *node, bool allowed,
> >                   const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
> >
> >  void
> > -engine_set_force_recompute(bool val)
> > +engine_request_recompute(enum engine_recompute_request val)
> >  {
> > -    engine_force_recompute = val;
> > +    if (val == EN_RECOMPUTE_PARTIAL &&
> > +        recompute_request == EN_RECOMPUTE_FULL) {
> > +        /* pending EN_RECOMPUTE_FULL already requested. */
> > +        return;
> > +    }
> > +
> > +    /* EN_RECOMPUTE_FULL is allowed to overwrite EN_RECOMPUTE_PARTIAL. */
> > +    recompute_request = val;
> > +    VLOG_DBG("Requested recompute %s", engine_recompute_request_name[val]);
> > +
> > +    for (size_t i = 0; i < engine_n_nodes; i++) {
> > +        if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> > +            if (engine_nodes[i]->state == EN_UPDATED ||
> > +                engine_nodes[i]->state == EN_ABORTED) {
> > +                engine_nodes[i]->pending_recompute = true;
> > +            }
> > +        } else {
> > +            engine_nodes[i]->pending_recompute = false;
> > +        }
> > +    }
> >  }
> >
> >  const struct engine_context *
> > @@ -428,6 +453,34 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
> >      return true;
> >  }
> >
> > +static bool
> > +engine_run_partial(struct engine_node *node, bool recompute_allowed)
> > +{
> > +    /* Let's try to do a selective recompute on pending changes before
> > +     * performing a full recompute.
> > +     */
> > +    if (recompute_request != EN_RECOMPUTE_PARTIAL) {
> > +        return false;
> > +    }
> > +
> > +    if (node->pending_recompute) {
> > +        goto recompute;
> > +    }
> > +
> > +    for (size_t i = 0; i < node->n_inputs; i++) {
> > +        if (node->inputs[i].node->pending_recompute) {
> > +            goto recompute;
> > +        }
> > +    }
> > +    return false;
> > +
> > +recompute:
> > +    node->pending_recompute = false;
> > +    engine_recompute(node, recompute_allowed, "selective recompute on %s",
> > +                     node->name);
> > +    return true;
> > +}
> > +
> >  static void
> >  engine_run_node(struct engine_node *node, bool recompute_allowed)
> >  {
> > @@ -438,11 +491,15 @@ engine_run_node(struct engine_node *node, bool recompute_allowed)
> >          return;
> >      }
> >
> > -    if (engine_force_recompute) {
> > +    if (recompute_request == EN_RECOMPUTE_FULL) {
> >          engine_recompute(node, recompute_allowed, "forced");
> >          return;
> >      }
> >
> > +    if (engine_run_partial(node, recompute_allowed)) {
> > +        return;
> > +    }
> > +
> >      /* If any of the inputs updated data but there is no change_handler, then
> >       * recompute the current node too.
> >       */
> > @@ -495,9 +552,13 @@ engine_run(bool recompute_allowed)
> >          if (engine_nodes[i]->state == EN_ABORTED) {
> >              engine_nodes[i]->stats.abort++;
> >              engine_run_aborted = true;
> > -            return;
> > +            break;
> >          }
> >      }
> > +
> > +    if (recompute_request == EN_RECOMPUTE_PARTIAL) {
> > +        recompute_request = EN_RECOMPUTE_NONE;
> > +    }
> >  }
> >
> >  bool
> > @@ -524,6 +585,6 @@ void
> >  engine_trigger_recompute(void)
> >  {
> >      VLOG_INFO("User triggered force recompute.");
> > -    engine_set_force_recompute(true);
> > +    engine_request_recompute(EN_RECOMPUTE_FULL);
> >      poll_immediate_wake();
> >  }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index 9bfab1f7c..7b734e666 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -115,6 +115,13 @@ enum engine_node_state {
> >      EN_STATE_MAX,
> >  };
> >
> > +enum engine_recompute_request {
> > +    EN_RECOMPUTE_NONE,      /* No recompute is necessary. */
> > +    EN_RECOMPUTE_PARTIAL,   /* Partial recompute on updated node requested. */
> > +    EN_RECOMPUTE_FULL,      /* Full recompute requested. */
> > +    EN_RECOMPUTE_MAX,
> > +};
> > +
> >  struct engine_stats {
> >      uint64_t recompute;
> >      uint64_t compute;
> > @@ -141,6 +148,9 @@ struct engine_node {
> >      /* State of the node after the last engine run. */
> >      enum engine_node_state state;
> >
> > +    /* This node needs to be processed by the upcoming partial recompute. */
> > +    bool pending_recompute;
> > +
> >      /* Method to allocate and initialize node data. It may be NULL.
> >       * The user supplied argument 'arg' is passed from the call to
> >       * engine_init().
> > @@ -216,7 +226,7 @@ void engine_add_input(struct engine_node *node, struct engine_node *input,
> >   * in circumstances when we are not sure there is change or not, or
> >   * when there is change but the engine couldn't be executed in that
> >   * iteration, and the change can't be tracked across iterations */
> > -void engine_set_force_recompute(bool val);
> > +void engine_request_recompute(enum engine_recompute_request val);
> >
> >  /* Return the current engine_context. The values in the context can be NULL
> >   * if the engine is run with allow_recompute == false in the current
> > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> > index af55221e3..63c7faa7d 100644
> > --- a/northd/inc-proc-northd.c
> > +++ b/northd/inc-proc-northd.c
> > @@ -258,7 +258,7 @@ void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
> >       * force-recompute request if 'recompute' is false.
> >       */
> >      if (recompute) {
> > -        engine_set_force_recompute(recompute);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >      }
> >
> >      struct engine_context eng_ctx = {
> > @@ -275,17 +275,17 @@ void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
> >      if (!engine_has_run()) {
> >          if (engine_need_run()) {
> >              VLOG_DBG("engine did not run, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_request_recompute(EN_RECOMPUTE_FULL);
> >              poll_immediate_wake();
> >          } else {
> >              VLOG_DBG("engine did not run, and it was not needed");
> >          }
> >      } else if (engine_aborted()) {
> >          VLOG_DBG("engine was aborted, force recompute next time.");
> > -        engine_set_force_recompute(true);
> > +        engine_request_recompute(EN_RECOMPUTE_FULL);
> >          poll_immediate_wake();
> >      } else {
> > -        engine_set_force_recompute(false);
> > +        engine_request_recompute(EN_RECOMPUTE_NONE);
> >      }
> >  }
> >
> > --
> > 2.35.1
> >