> Hi Lorenzo,

Hi Mark,

First of all, thanks for the review :)

> 
> I'm not opposed to allowing for multiple engines to exist, and this sort of
> change is one that I'd accept gladly since it allows for easier dependency
> injection for unit tests.

I personally think this patch is not strictly related to patches 2/3 and 3/3.
I added it because it was requested by Han.
I think patch 1/3 is nice to have in order to manage IP (incremental
processing) more like a library.
Anyway, I am fine with dropping it if we agree on that.

Regards,
Lorenzo

> 
> That said, I don't think the follow-up patches justify this change. My
> comments in patch 2 will explain why I think this is not a good idea for
> handling meters.
> 
> On 1/14/22 13:01, Lorenzo Bianconi wrote:
> > > Remove global state variables and move the inc-proc code into an isolated
> > > structure. This is a preliminary patch to add the capability to run
> > > multiple inc-proc engines.
> > 
> > Signed-off-by: Lorenzo Bianconi <[email protected]>
> > ---
> >   controller/ovn-controller.c |  65 ++++++-----
> >   lib/inc-proc-eng.c          | 226 +++++++++++++++++++++++-------------
> >   lib/inc-proc-eng.h          |  42 +++++--
> >   northd/en-lflow.c           |   2 +-
> >   northd/en-northd.c          |   2 +-
> >   northd/inc-proc-northd.c    |  30 ++---
> >   6 files changed, 231 insertions(+), 136 deletions(-)
> > 
> > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > index 5069aedfc..86cb6f769 100644
> > --- a/controller/ovn-controller.c
> > +++ b/controller/ovn-controller.c
> > @@ -114,6 +114,8 @@ static unixctl_cb_func debug_delay_nb_cfg_report;
> >   #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts"
> >   #define OVS_STARTUP_TS_NAME "ovn-startup-ts"
> > +static struct engine *flow_engine;
> > +
> >   static char *parse_options(int argc, char *argv[]);
> >   OVS_NO_RETURN static void usage(void);
> > @@ -557,7 +559,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct 
> > ovsdb_idl *ovnsb_idl,
> >       }
> >       if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) {
> >           VLOG_INFO("Resetting southbound database cluster state");
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >           ovsdb_idl_reset_min_index(ovnsb_idl);
> >           *reset_ovnsb_idl_min_index = false;
> >       }
> > @@ -1011,7 +1013,8 @@ en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED)
> >   static void
> >   en_ofctrl_is_connected_run(struct engine_node *node, void *data)
> >   {
> > -    struct controller_engine_ctx *ctrl_ctx = 
> > engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> >       struct ed_type_ofctrl_is_connected *of_data = data;
> >       if (of_data->connected != ofctrl_is_connected()) {
> >           of_data->connected = !of_data->connected;
> > @@ -1226,10 +1229,11 @@ init_binding_ctx(struct engine_node *node,
> >                   engine_get_input("SB_port_binding", node),
> >                   "datapath");
> > -    struct controller_engine_ctx *ctrl_ctx = 
> > engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> > -    b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn;
> > -    b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn;
> > +    b_ctx_in->ovnsb_idl_txn = 
> > engine_get_context(flow_engine)->ovnsb_idl_txn;
> > +    b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn;
> >       b_ctx_in->sbrec_datapath_binding_by_key = 
> > sbrec_datapath_binding_by_key;
> >       b_ctx_in->sbrec_port_binding_by_datapath = 
> > sbrec_port_binding_by_datapath;
> >       b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name;
> > @@ -2387,7 +2391,8 @@ en_lflow_output_run(struct engine_node *node, void 
> > *data)
> >           lflow_conj_ids_clear(&fo->conj_ids);
> >       }
> > -    struct controller_engine_ctx *ctrl_ctx = 
> > engine_get_context()->client_ctx;
> > +    struct controller_engine_ctx *ctrl_ctx =
> > +        engine_get_context(flow_engine)->client_ctx;
> >       fo->pd.lflow_cache = ctrl_ctx->lflow_cache;
> > @@ -3040,7 +3045,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl, 
> > struct ovsdb_idl *ovnsb_idl,
> >        * full recompute.
> >        */
> >       if (version_mismatch) {
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >       }
> >       version_mismatch = false;
> >       return true;
> > @@ -3206,6 +3211,8 @@ main(int argc, char *argv[])
> >       stopwatch_create(BFD_RUN_STOPWATCH_NAME, SW_MS);
> >       stopwatch_create(VIF_PLUG_RUN_STOPWATCH_NAME, SW_MS);
> > +    engine_init_global();
> > +
> >       /* Define inc-proc-engine nodes. */
> >       ENGINE_NODE_WITH_CLEAR_TRACK_DATA_IS_VALID(ct_zones, "ct_zones");
> >       ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data");
> > @@ -3344,7 +3351,7 @@ main(int argc, char *argv[])
> >           .sb_idl = ovnsb_idl_loop.idl,
> >           .ovs_idl = ovs_idl_loop.idl,
> >       };
> > -    engine_init(&en_flow_output, &engine_arg);
> > +    flow_engine = engine_new(&en_flow_output, &engine_arg, "flow_engine");
> >       engine_ovsdb_node_add_index(&en_sb_chassis, "name", 
> > sbrec_chassis_by_name);
> >       engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> > @@ -3396,7 +3403,7 @@ main(int argc, char *argv[])
> >       unixctl_command_register("recompute", "[deprecated]", 0, 0,
> >                                engine_recompute_cmd,
> > -                             NULL);
> > +                             flow_engine);
> >       unixctl_command_register("lflow-cache/flush", "", 0, 0,
> >                                lflow_cache_flush_cmd,
> >                                &lflow_output_data->pd);
> > @@ -3480,7 +3487,7 @@ main(int argc, char *argv[])
> >               goto loop_done;
> >           }
> > -        engine_init_run();
> > +        engine_init_run(flow_engine);
> >           struct ovsdb_idl_txn *ovs_idl_txn = 
> > ovsdb_idl_loop_run(&ovs_idl_loop);
> >           unsigned int new_ovs_cond_seqno
> > @@ -3488,7 +3495,7 @@ main(int argc, char *argv[])
> >           if (new_ovs_cond_seqno != ovs_cond_seqno) {
> >               if (!new_ovs_cond_seqno) {
> >                   VLOG_INFO("OVS IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >               }
> >               ovs_cond_seqno = new_ovs_cond_seqno;
> >           }
> > @@ -3506,7 +3513,7 @@ main(int argc, char *argv[])
> >           if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) {
> >               if (!new_ovnsb_cond_seqno) {
> >                   VLOG_INFO("OVNSB IDL reconnected, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >                   vif_plug_reset_idl_prime_counter();
> >               }
> >               ovnsb_cond_seqno = new_ovnsb_cond_seqno;
> > @@ -3518,7 +3525,7 @@ main(int argc, char *argv[])
> >               .client_ctx = &ctrl_engine_ctx
> >           };
> > -        engine_set_context(&eng_ctx);
> > +        engine_set_context(flow_engine, &eng_ctx);
> >           bool northd_version_match =
> >               check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl,
> > @@ -3584,7 +3591,7 @@ main(int argc, char *argv[])
> >                                              &br_int_dp->capabilities : 
> > NULL,
> >                                              br_int ? br_int->name : NULL)) 
> > {
> >                   VLOG_INFO("OVS feature set changed, force recompute.");
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >               }
> >               if (br_int) {
> > @@ -3619,9 +3626,9 @@ main(int argc, char *argv[])
> >                                * this round of engine_run and continue 
> > processing
> >                                * acculated changes incrementally later when
> >                                * ofctrl_can_put() returns true. */
> > -                            engine_run(false);
> > +                            engine_run(flow_engine, false);
> >                           } else {
> > -                            engine_run(true);
> > +                            engine_run(flow_engine, true);
> >                           }
> >                       } else {
> >                           /* Even if there's no SB DB transaction available,
> > @@ -3630,7 +3637,7 @@ main(int argc, char *argv[])
> >                            * If a recompute is required, the engine will 
> > abort,
> >                            * triggerring a full run in the next iteration.
> >                            */
> > -                        engine_run(false);
> > +                        engine_run(flow_engine, false);
> >                       }
> >                       stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> >                                      time_msec());
> > @@ -3775,24 +3782,24 @@ main(int argc, char *argv[])
> >               }
> > -            if (!engine_has_run()) {
> > -                if (engine_need_run()) {
> > +            if (!engine_has_run(flow_engine)) {
> > +                if (engine_need_run(flow_engine)) {
> >                       VLOG_DBG("engine did not run, force recompute next 
> > time: "
> >                                "br_int %p, chassis %p", br_int, chassis);
> > -                    engine_set_force_recompute(true);
> > +                    engine_set_force_recompute(flow_engine, true);
> >                       poll_immediate_wake();
> >                   } else {
> >                       VLOG_DBG("engine did not run, and it was not needed"
> >                                " either: br_int %p, chassis %p",
> >                                br_int, chassis);
> >                   }
> > -            } else if (engine_aborted()) {
> > +            } else if (engine_aborted(flow_engine)) {
> >                   VLOG_DBG("engine was aborted, force recompute next time: "
> >                            "br_int %p, chassis %p", br_int, chassis);
> > -                engine_set_force_recompute(true);
> > +                engine_set_force_recompute(flow_engine, true);
> >                   poll_immediate_wake();
> >               } else {
> > -                engine_set_force_recompute(false);
> > +                engine_set_force_recompute(flow_engine, false);
> >               }
> >               store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private,
> > @@ -3846,7 +3853,7 @@ main(int argc, char *argv[])
> >           if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) {
> >               VLOG_INFO("OVNSB commit failed, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_set_force_recompute(flow_engine, true);
> >           }
> >           int ovs_txn_status = 
> > ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop);
> > @@ -3896,8 +3903,8 @@ loop_done:
> >           }
> >       }
> > -    engine_set_context(NULL);
> > -    engine_cleanup();
> > +    engine_set_context(flow_engine, NULL);
> > +    engine_cleanup(flow_engine);
> >       /* It's time to exit.  Clean up the databases if we are not 
> > restarting */
> >       if (!restart) {
> > @@ -4152,9 +4159,9 @@ inject_pkt(struct unixctl_conn *conn, int argc 
> > OVS_UNUSED,
> >   static void
> >   engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc 
> > OVS_UNUSED,
> > -                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> > +                     const char *argv[] OVS_UNUSED, void *arg)
> >   {
> > -    engine_trigger_recompute();
> > +    engine_trigger_recompute(arg);
> >       unixctl_command_reply(conn, NULL);
> >   }
> > @@ -4166,7 +4173,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn 
> > OVS_UNUSED,
> >       VLOG_INFO("User triggered lflow cache flush.");
> >       struct lflow_output_persistent_data *fo_pd = arg_;
> >       lflow_cache_flush(fo_pd->lflow_cache);
> > -    engine_set_force_recompute(true);
> > +    engine_set_force_recompute(flow_engine, true);
> >       poll_immediate_wake();
> >       unixctl_command_reply(conn, NULL);
> >   }
> > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > index 2958a55e3..a4798efb7 100644
> > --- a/lib/inc-proc-eng.c
> > +++ b/lib/inc-proc-eng.c
> > @@ -33,12 +33,7 @@
> >   VLOG_DEFINE_THIS_MODULE(inc_proc_eng);
> > -static bool engine_force_recompute = false;
> > -static bool engine_run_aborted = false;
> > -static const struct engine_context *engine_context;
> > -
> > -static struct engine_node **engine_nodes;
> > -static size_t engine_n_nodes;
> > +static struct ovs_list engines = OVS_LIST_INITIALIZER(&engines);
> >   static const char *engine_node_state_name[EN_STATE_MAX] = {
> >       [EN_STALE]     = "Stale",
> > @@ -52,21 +47,21 @@ engine_recompute(struct engine_node *node, bool allowed,
> >                    const char *reason_fmt, ...) OVS_PRINTF_FORMAT(3, 4);
> >   void
> > -engine_set_force_recompute(bool val)
> > +engine_set_force_recompute(struct engine *e, bool val)
> >   {
> > -    engine_force_recompute = val;
> > +    e->engine_force_recompute = val;
> >   }
> >   const struct engine_context *
> > -engine_get_context(void)
> > +engine_get_context(struct engine *e)
> >   {
> > -    return engine_context;
> > +    return e->engine_context;
> >   }
> >   void
> > -engine_set_context(const struct engine_context *ctx)
> > +engine_set_context(struct engine *e, const struct engine_context *ctx)
> >   {
> > -    engine_context = ctx;
> > +    e->engine_context = ctx;
> >   }
> >   /* Builds the topologically sorted 'sorted_nodes' array starting from
> > @@ -113,30 +108,53 @@ static void
> >   engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
> >                      const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> >   {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        struct engine_node *node = engine_nodes[i];
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > +
> > +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +            struct engine_node *node = e->engine_nodes[i];
> > -        memset(&node->stats, 0, sizeof node->stats);
> > +            if (target && strcmp(target, e->name)) {
> > +                continue;
> > +            }
> > +            memset(&node->stats, 0, sizeof node->stats);
> > +            ds_clear(&reply);
> > +        }
> >       }
> > -    unixctl_command_reply(conn, NULL);
> > +
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> >   }
> >   static void
> > -engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED,
> > +engine_dump_stats(struct unixctl_conn *conn, int argc,
> >                     const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED)
> >   {
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> >       struct ds dump = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        struct engine_node *node = engine_nodes[i];
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +            struct engine_node *node = e->engine_nodes[i];
> > -        ds_put_format(&dump,
> > -                      "Node: %s\n"
> > -                      "- recompute: %12"PRIu64"\n"
> > -                      "- compute:   %12"PRIu64"\n"
> > -                      "- abort:     %12"PRIu64"\n",
> > -                      node->name, node->stats.recompute,
> > -                      node->stats.compute, node->stats.abort);
> > +            if (target && strcmp(target, e->name)) {
> > +                continue;
> > +            }
> > +            ds_put_format(&dump,
> > +                          "Node: %s\n"
> > +                          "- recompute: %12"PRIu64"\n"
> > +                          "- compute:   %12"PRIu64"\n"
> > +                          "- abort:     %12"PRIu64"\n",
> > +                          node->name, node->stats.recompute,
> > +                          node->stats.compute, node->stats.abort);
> > +        }
> > +    }
> > +    if (ds_last(&dump) == EOF) {
> > +        ds_put_format(&dump, "no %s engine found", target ? target : "");
> >       }
> >       unixctl_command_reply(conn, ds_cstr(&dump));
> > @@ -148,48 +166,92 @@ engine_trigger_recompute_cmd(struct unixctl_conn 
> > *conn, int argc OVS_UNUSED,
> >                                const char *argv[] OVS_UNUSED,
> >                                void *arg OVS_UNUSED)
> >   {
> > -    engine_trigger_recompute();
> > -    unixctl_command_reply(conn, NULL);
> > +    const char *target = argc == 2 ? argv[1] : NULL;
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > +
> > +    ds_put_format(&reply, "no %s engine found", target ? target : "");
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +        if (target && strcmp(target, e->name)) {
> > +            continue;
> > +        }
> > +        engine_trigger_recompute(e);
> > +        ds_clear(&reply);
> > +    }
> > +
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> >   }
> > -void
> > -engine_init(struct engine_node *node, struct engine_arg *arg)
> > +static void
> > +engine_list_engines(struct unixctl_conn *conn, int argc OVS_UNUSED,
> > +                    const char *argv[] OVS_UNUSED,
> > +                    void *arg OVS_UNUSED)
> >   {
> > -    engine_nodes = engine_get_nodes(node, &engine_n_nodes);
> > +    struct ds reply = DS_EMPTY_INITIALIZER;
> > +    struct engine *e;
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->init) {
> > -            engine_nodes[i]->data =
> > -                engine_nodes[i]->init(engine_nodes[i], arg);
> > -        } else {
> > -            engine_nodes[i]->data = NULL;
> > -        }
> > +    LIST_FOR_EACH (e, node, &engines) {
> > +            ds_put_format(&reply, "%s\n", e->name);
> >       }
> > +    unixctl_command_reply(conn, ds_cstr(&reply));
> > +    ds_destroy(&reply);
> > +}
> > -    unixctl_command_register("inc-engine/show-stats", "", 0, 0,
> > +void
> > +engine_init_global(void)
> > +{
> > +    unixctl_command_register("inc-engine/show-stats", "[engine]", 0, 1,
> >                                engine_dump_stats, NULL);
> > -    unixctl_command_register("inc-engine/clear-stats", "", 0, 0,
> > +    unixctl_command_register("inc-engine/clear-stats", "[engine]", 0, 1,
> >                                engine_clear_stats, NULL);
> > -    unixctl_command_register("inc-engine/recompute", "", 0, 0,
> > +    unixctl_command_register("inc-engine/recompute", "[engine]", 0, 1,
> >                                engine_trigger_recompute_cmd, NULL);
> > +    unixctl_command_register("inc-engine/list-engines", "", 0, 0,
> > +                             engine_list_engines, NULL);
> > +}
> > +
> > +struct engine *
> > +engine_new(struct engine_node *node, struct engine_arg *arg,
> > +           const char *name)
> > +{
> > +    struct engine *e = xzalloc(sizeof *e);
> > +
> > +    e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes);
> > +    e->name = name;
> > +
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->init) {
> > +            e->engine_nodes[i]->data =
> > +                e->engine_nodes[i]->init(e->engine_nodes[i], arg);
> > +        } else {
> > +            e->engine_nodes[i]->data = NULL;
> > +        }
> > +        e->engine_nodes[i]->e = e;
> > +    }
> > +
> > +    ovs_list_push_back(&engines, &e->node);
> > +
> > +    return e;
> >   }
> >   void
> > -engine_cleanup(void)
> > +engine_cleanup(struct engine *e)
> >   {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->clear_tracked_data) {
> > -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->clear_tracked_data) {
> > +            e->engine_nodes[i]->clear_tracked_data(
> > +                    e->engine_nodes[i]->data);
> >           }
> > -        if (engine_nodes[i]->cleanup) {
> > -            engine_nodes[i]->cleanup(engine_nodes[i]->data);
> > +        if (e->engine_nodes[i]->cleanup) {
> > +            e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data);
> >           }
> > -        free(engine_nodes[i]->data);
> > +        free(e->engine_nodes[i]->data);
> >       }
> > -    free(engine_nodes);
> > -    engine_nodes = NULL;
> > -    engine_n_nodes = 0;
> > +    ovs_list_remove(&e->node);
> > +    free(e->engine_nodes);
> > +    free(e);
> >   }
> >   struct engine_node *
> > @@ -284,10 +346,10 @@ engine_node_changed(struct engine_node *node)
> >   }
> >   bool
> > -engine_has_run(void)
> > +engine_has_run(struct engine *e)
> >   {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        if (engine_nodes[i]->state != EN_STALE) {
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        if (e->engine_nodes[i]->state != EN_STALE) {
> >               return true;
> >           }
> >       }
> > @@ -295,9 +357,9 @@ engine_has_run(void)
> >   }
> >   bool
> > -engine_aborted(void)
> > +engine_aborted(struct engine *e)
> >   {
> > -    return engine_run_aborted;
> > +    return e->engine_run_aborted;
> >   }
> >   void *
> > @@ -316,14 +378,15 @@ engine_get_internal_data(struct engine_node *node)
> >   }
> >   void
> > -engine_init_run(void)
> > +engine_init_run(struct engine *e)
> >   {
> >       VLOG_DBG("Initializing new run");
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        engine_set_node_state(engine_nodes[i], EN_STALE);
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        engine_set_node_state(e->engine_nodes[i], EN_STALE);
> > -        if (engine_nodes[i]->clear_tracked_data) {
> > -            engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data);
> > +        if (e->engine_nodes[i]->clear_tracked_data) {
> > +            e->engine_nodes[i]->clear_tracked_data(
> > +                    e->engine_nodes[i]->data);
> >           }
> >       }
> >   }
> > @@ -397,7 +460,8 @@ engine_compute(struct engine_node *node, bool 
> > recompute_allowed)
> >   }
> >   static void
> > -engine_run_node(struct engine_node *node, bool recompute_allowed)
> > +engine_run_node(struct engine *e, struct engine_node *node,
> > +                bool recompute_allowed)
> >   {
> >       if (!node->n_inputs) {
> >           /* Run the node handler which might change state. */
> > @@ -406,7 +470,7 @@ engine_run_node(struct engine_node *node, bool 
> > recompute_allowed)
> >           return;
> >       }
> > -    if (engine_force_recompute) {
> > +    if (e->engine_force_recompute) {
> >           engine_recompute(node, recompute_allowed, "forced");
> >           return;
> >       }
> > @@ -447,41 +511,41 @@ engine_run_node(struct engine_node *node, bool 
> > recompute_allowed)
> >   }
> >   void
> > -engine_run(bool recompute_allowed)
> > +engine_run(struct engine *e, bool recompute_allowed)
> >   {
> >       /* If the last run was aborted skip the incremental run because a
> >        * recompute is needed first.
> >        */
> > -    if (!recompute_allowed && engine_run_aborted) {
> > +    if (!recompute_allowed && e->engine_run_aborted) {
> >           return;
> >       }
> > -    engine_run_aborted = false;
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > -        engine_run_node(engine_nodes[i], recompute_allowed);
> > +    e->engine_run_aborted = false;
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> > +        engine_run_node(e, e->engine_nodes[i], recompute_allowed);
> > -        if (engine_nodes[i]->state == EN_ABORTED) {
> > -            engine_nodes[i]->stats.abort++;
> > -            engine_run_aborted = true;
> > +        if (e->engine_nodes[i]->state == EN_ABORTED) {
> > +            e->engine_nodes[i]->stats.abort++;
> > +            e->engine_run_aborted = true;
> >               return;
> >           }
> >       }
> >   }
> >   bool
> > -engine_need_run(void)
> > +engine_need_run(struct engine *e)
> >   {
> > -    for (size_t i = 0; i < engine_n_nodes; i++) {
> > +    for (size_t i = 0; i < e->engine_n_nodes; i++) {
> >           /* Check only leaf nodes for updates. */
> > -        if (engine_nodes[i]->n_inputs) {
> > +        if (e->engine_nodes[i]->n_inputs) {
> >               continue;
> >           }
> > -        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
> > -        engine_nodes[i]->stats.recompute++;
> > -        VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
> > -                 engine_node_state_name[engine_nodes[i]->state]);
> > -        if (engine_nodes[i]->state == EN_UPDATED) {
> > +        e->engine_nodes[i]->run(e->engine_nodes[i], 
> > e->engine_nodes[i]->data);
> > +        e->engine_nodes[i]->stats.recompute++;
> > +        VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name,
> > +                 engine_node_state_name[e->engine_nodes[i]->state]);
> > +        if (e->engine_nodes[i]->state == EN_UPDATED) {
> >               return true;
> >           }
> >       }
> > @@ -489,9 +553,9 @@ engine_need_run(void)
> >   }
> >   void
> > -engine_trigger_recompute(void)
> > +engine_trigger_recompute(struct engine *e)
> >   {
> >       VLOG_INFO("User triggered force recompute.");
> > -    engine_set_force_recompute(true);
> > +    engine_set_force_recompute(e, true);
> >       poll_immediate_wake();
> >   }
> > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > index 9bfab1f7c..881242138 100644
> > --- a/lib/inc-proc-eng.h
> > +++ b/lib/inc-proc-eng.h
> > @@ -67,6 +67,7 @@
> >   #include <stdint.h>
> >   #include "compiler.h"
> > +#include "openvswitch/list.h"
> >   struct engine_context {
> >       struct ovsdb_idl_txn *ovs_idl_txn;
> > @@ -122,6 +123,8 @@ struct engine_stats {
> >   };
> >   struct engine_node {
> > +    struct engine *e;
> > +
> >       /* A unique name for each node. */
> >       char *name;
> > @@ -173,30 +176,47 @@ struct engine_node {
> >       struct engine_stats stats;
> >   };
> > +struct engine {
> > +    struct ovs_list node;
> > +
> > +    const char *name;
> > +
> > +    struct engine_node **engine_nodes;
> > +    size_t engine_n_nodes;
> > +
> > +    bool engine_force_recompute;
> > +    bool engine_run_aborted;
> > +
> > +    const struct engine_context *engine_context;
> > +};
> > +
> > +void engine_init_global(void);
> > +
> >   /* Initialize the data for the engine nodes. It calls each node's
> >    * init() method if not NULL passing the user supplied 'arg'.
> >    * It should be called before the main loop. */
> > -void engine_init(struct engine_node *node, struct engine_arg *arg);
> > +struct engine *engine_new(struct engine_node *node, struct engine_arg *arg,
> > +                          const char *name);
> >   /* Initialize the engine nodes for a new run. It should be called in the
> >    * main processing loop before every potential engine_run().
> >    */
> > -void engine_init_run(void);
> > +void engine_init_run(struct engine *e);
> >   /* Execute the processing, which should be called in the main loop.
> >    * Updates the engine node's states accordingly. If 'recompute_allowed' is
> >    * false and a recompute is required by the current engine run then the 
> > engine
> >    * aborts.
> >    */
> > -void engine_run(bool recompute_allowed);
> > +void engine_run(struct engine *e, bool recompute_allowed);
> >   /* Clean up the data for the engine nodes. It calls each node's
> >    * cleanup() method if not NULL. It should be called before the program
> >    * terminates. */
> > -void engine_cleanup(void);
> > +void engine_cleanup(struct engine *e);
> >   /* Check if engine needs to run but didn't. */
> > -bool engine_need_run(void);
> > +bool engine_need_run(struct engine *e);
> >   /* Get the input node with <name> for <node> */
> >   struct engine_node * engine_get_input(const char *input_name,
> > @@ -216,7 +236,7 @@ void engine_add_input(struct engine_node *node, struct 
> > engine_node *input,
> >    * in circumstances when we are not sure there is change or not, or
> >    * when there is change but the engine couldn't be executed in that
> >    * iteration, and the change can't be tracked across iterations */
> > -void engine_set_force_recompute(bool val);
> > +void engine_set_force_recompute(struct engine *e, bool val);
> >   /* Return the current engine_context. The values in the context can be 
> > NULL
> >    * if the engine is run with allow_recompute == false in the current
> > @@ -224,9 +244,9 @@ void engine_set_force_recompute(bool val);
> >    * Therefore, it is the responsibility of the caller to check the context
> >    * values when called from change handlers.
> >    */
> > -const struct engine_context *engine_get_context(void);
> > +const struct engine_context *engine_get_context(struct engine *e);
> > -void engine_set_context(const struct engine_context *);
> > +void engine_set_context(struct engine *e, const struct engine_context *);
> >   void engine_set_node_state_at(struct engine_node *node,
> >                                 enum engine_node_state state,
> > @@ -236,10 +256,10 @@ void engine_set_node_state_at(struct engine_node 
> > *node,
> >   bool engine_node_changed(struct engine_node *node);
> >   /* Return true if the engine has run in the last iteration. */
> > -bool engine_has_run(void);
> > +bool engine_has_run(struct engine *e);
> >   /* Returns true if during the last engine run we had to abort processing. 
> > */
> > -bool engine_aborted(void);
> > +bool engine_aborted(struct engine *e);
> >   /* Return a pointer to node data accessible for users outside the 
> > processing
> >    * engine. If the node data is not valid (e.g., last engine_run() failed 
> > or
> > @@ -265,7 +285,7 @@ void *engine_get_internal_data(struct engine_node 
> > *node);
> >       engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> >   /* Trigger a full recompute. */
> > -void engine_trigger_recompute(void);
> > +void engine_trigger_recompute(struct engine *e);
> >   struct ed_ovsdb_index {
> >       const char *name;
> > diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> > index ffbdaf4e8..5451e0551 100644
> > --- a/northd/en-lflow.c
> > +++ b/northd/en-lflow.c
> > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_lflow);
> >   void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED)
> >   {
> > -    const struct engine_context *eng_ctx = engine_get_context();
> > +    const struct engine_context *eng_ctx = engine_get_context(node->e);
> >       struct lflow_input lflow_input;
> > diff --git a/northd/en-northd.c b/northd/en-northd.c
> > index 79da7e1c4..064f9d93a 100644
> > --- a/northd/en-northd.c
> > +++ b/northd/en-northd.c
> > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_northd);
> >   void en_northd_run(struct engine_node *node, void *data)
> >   {
> > -    const struct engine_context *eng_ctx = engine_get_context();
> > +    const struct engine_context *eng_ctx = engine_get_context(node->e);
> >       struct northd_input input_data;
> > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> > index af55221e3..049fe226a 100644
> > --- a/northd/inc-proc-northd.c
> > +++ b/northd/inc-proc-northd.c
> > @@ -33,6 +33,8 @@
> >   VLOG_DEFINE_THIS_MODULE(inc_proc_northd);
> > +static struct engine *flow_engine;
> > +
> >   #define NB_NODES \
> >       NB_NODE(nb_global, "nb_global") \
> >       NB_NODE(copp, "copp") \
> > @@ -150,6 +152,8 @@ static ENGINE_NODE(lflow, "lflow");
> >   void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >                             struct ovsdb_idl_loop *sb)
> >   {
> > +    engine_init_global();
> > +
> >       /* Define relationships between nodes where first argument is 
> > dependent
> >        * on the second argument */
> >       engine_add_input(&en_northd, &en_nb_nb_global, NULL);
> > @@ -229,7 +233,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >       struct ovsdb_idl_index *sbrec_chassis_by_hostname =
> >           chassis_hostname_index_create(sb->idl);
> > -    engine_init(&en_lflow, &engine_arg);
> > +    flow_engine = engine_new(&en_lflow, &engine_arg, "flow_engine");
> >       engine_ovsdb_node_add_index(&en_sb_chassis,
> >                                   "sbrec_chassis_by_name",
> > @@ -251,14 +255,14 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
> >   void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn,
> >                            struct ovsdb_idl_txn *ovnsb_txn,
> >                            bool recompute) {
> > -    engine_init_run();
> > +    engine_init_run(flow_engine);
> >       /* Force a full recompute if instructed to, for example, after a NB/SB
> >        * reconnect event.  However, make sure we don't overwrite an existing
> >        * force-recompute request if 'recompute' is false.
> >        */
> >       if (recompute) {
> > -        engine_set_force_recompute(recompute);
> > +        engine_set_force_recompute(flow_engine, recompute);
> >       }
> >       struct engine_context eng_ctx = {
> > @@ -266,31 +270,31 @@ void inc_proc_northd_run(struct ovsdb_idl_txn 
> > *ovnnb_txn,
> >           .ovnsb_idl_txn = ovnsb_txn,
> >       };
> > -    engine_set_context(&eng_ctx);
> > +    engine_set_context(flow_engine, &eng_ctx);
> >       if (ovnnb_txn && ovnsb_txn) {
> > -        engine_run(true);
> > +        engine_run(flow_engine, true);
> >       }
> > -    if (!engine_has_run()) {
> > -        if (engine_need_run()) {
> > +    if (!engine_has_run(flow_engine)) {
> > +        if (engine_need_run(flow_engine)) {
> >               VLOG_DBG("engine did not run, force recompute next time.");
> > -            engine_set_force_recompute(true);
> > +            engine_set_force_recompute(flow_engine, true);
> >               poll_immediate_wake();
> >           } else {
> >               VLOG_DBG("engine did not run, and it was not needed");
> >           }
> > -    } else if (engine_aborted()) {
> > +    } else if (engine_aborted(flow_engine)) {
> >           VLOG_DBG("engine was aborted, force recompute next time.");
> > -        engine_set_force_recompute(true);
> > +        engine_set_force_recompute(flow_engine, true);
> >           poll_immediate_wake();
> >       } else {
> > -        engine_set_force_recompute(false);
> > +        engine_set_force_recompute(flow_engine, false);
> >       }
> >   }
> >   void inc_proc_northd_cleanup(void)
> >   {
> > -    engine_cleanup();
> > -    engine_set_context(NULL);
> > +    engine_set_context(flow_engine, NULL);
> > +    engine_cleanup(flow_engine);
> >   }
> > 
> 
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to