> Hi Lorenzo, Hi Mark,
First of all, thanks for the review :) > > I'm not opposed to allowing for multiple engines to exist, and this sort of > change is one that I'd accept gladly since it allows for easier dependency > injection for unit tests. I personally think this patch is not strictly related to patch 2/3 and 3/3. I added it because it was requested by Han. I think patch 1/3 is a nice-to-have in order to manage the incremental-processing (IP) engine more like a library. Anyway, I am fine with dropping it if we agree on that. Regards, Lorenzo > > That said, I don't think the follow-up patches justify this change. My > comments in patch 2 will explain why I think this is not a good idea for > handling meters. > > On 1/14/22 13:01, Lorenzo Bianconi wrote: > > Remove global state variable and move move inc-proc code in an isolated > > strucuture. This is a preliminary patch to add the capability to run > > multiple inc-proc engines. > > > > Signed-off-by: Lorenzo Bianconi <[email protected]> > > --- > > controller/ovn-controller.c | 65 ++++++----- > > lib/inc-proc-eng.c | 226 +++++++++++++++++++++++------------- > > lib/inc-proc-eng.h | 42 +++++-- > > northd/en-lflow.c | 2 +- > > northd/en-northd.c | 2 +- > > northd/inc-proc-northd.c | 30 ++--- > > 6 files changed, 231 insertions(+), 136 deletions(-) > > > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > > index 5069aedfc..86cb6f769 100644 > > --- a/controller/ovn-controller.c > > +++ b/controller/ovn-controller.c > > @@ -114,6 +114,8 @@ static unixctl_cb_func debug_delay_nb_cfg_report; > > #define OVS_NB_CFG_TS_NAME "ovn-nb-cfg-ts" > > #define OVS_STARTUP_TS_NAME "ovn-startup-ts" > > +static struct engine *flow_engine; > > + > > static char *parse_options(int argc, char *argv[]); > > OVS_NO_RETURN static void usage(void); > > @@ -557,7 +559,7 @@ update_sb_db(struct ovsdb_idl *ovs_idl, struct > > ovsdb_idl *ovnsb_idl, > > } > > if (reset_ovnsb_idl_min_index && *reset_ovnsb_idl_min_index) { > > VLOG_INFO("Resetting southbound database cluster state"); > 
> - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > ovsdb_idl_reset_min_index(ovnsb_idl); > > *reset_ovnsb_idl_min_index = false; > > } > > @@ -1011,7 +1013,8 @@ en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED) > > static void > > en_ofctrl_is_connected_run(struct engine_node *node, void *data) > > { > > - struct controller_engine_ctx *ctrl_ctx = > > engine_get_context()->client_ctx; > > + struct controller_engine_ctx *ctrl_ctx = > > + engine_get_context(flow_engine)->client_ctx; > > struct ed_type_ofctrl_is_connected *of_data = data; > > if (of_data->connected != ofctrl_is_connected()) { > > of_data->connected = !of_data->connected; > > @@ -1226,10 +1229,11 @@ init_binding_ctx(struct engine_node *node, > > engine_get_input("SB_port_binding", node), > > "datapath"); > > - struct controller_engine_ctx *ctrl_ctx = > > engine_get_context()->client_ctx; > > + struct controller_engine_ctx *ctrl_ctx = > > + engine_get_context(flow_engine)->client_ctx; > > - b_ctx_in->ovnsb_idl_txn = engine_get_context()->ovnsb_idl_txn; > > - b_ctx_in->ovs_idl_txn = engine_get_context()->ovs_idl_txn; > > + b_ctx_in->ovnsb_idl_txn = > > engine_get_context(flow_engine)->ovnsb_idl_txn; > > + b_ctx_in->ovs_idl_txn = engine_get_context(flow_engine)->ovs_idl_txn; > > b_ctx_in->sbrec_datapath_binding_by_key = > > sbrec_datapath_binding_by_key; > > b_ctx_in->sbrec_port_binding_by_datapath = > > sbrec_port_binding_by_datapath; > > b_ctx_in->sbrec_port_binding_by_name = sbrec_port_binding_by_name; > > @@ -2387,7 +2391,8 @@ en_lflow_output_run(struct engine_node *node, void > > *data) > > lflow_conj_ids_clear(&fo->conj_ids); > > } > > - struct controller_engine_ctx *ctrl_ctx = > > engine_get_context()->client_ctx; > > + struct controller_engine_ctx *ctrl_ctx = > > + engine_get_context(flow_engine)->client_ctx; > > fo->pd.lflow_cache = ctrl_ctx->lflow_cache; > > @@ -3040,7 +3045,7 @@ check_northd_version(struct ovsdb_idl *ovs_idl, > > struct ovsdb_idl 
*ovnsb_idl, > > * full recompute. > > */ > > if (version_mismatch) { > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > } > > version_mismatch = false; > > return true; > > @@ -3206,6 +3211,8 @@ main(int argc, char *argv[]) > > stopwatch_create(BFD_RUN_STOPWATCH_NAME, SW_MS); > > stopwatch_create(VIF_PLUG_RUN_STOPWATCH_NAME, SW_MS); > > + engine_init_global(); > > + > > /* Define inc-proc-engine nodes. */ > > ENGINE_NODE_WITH_CLEAR_TRACK_DATA_IS_VALID(ct_zones, "ct_zones"); > > ENGINE_NODE_WITH_CLEAR_TRACK_DATA(runtime_data, "runtime_data"); > > @@ -3344,7 +3351,7 @@ main(int argc, char *argv[]) > > .sb_idl = ovnsb_idl_loop.idl, > > .ovs_idl = ovs_idl_loop.idl, > > }; > > - engine_init(&en_flow_output, &engine_arg); > > + flow_engine = engine_new(&en_flow_output, &engine_arg, "flow_engine"); > > engine_ovsdb_node_add_index(&en_sb_chassis, "name", > > sbrec_chassis_by_name); > > engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath", > > @@ -3396,7 +3403,7 @@ main(int argc, char *argv[]) > > unixctl_command_register("recompute", "[deprecated]", 0, 0, > > engine_recompute_cmd, > > - NULL); > > + flow_engine); > > unixctl_command_register("lflow-cache/flush", "", 0, 0, > > lflow_cache_flush_cmd, > > &lflow_output_data->pd); > > @@ -3480,7 +3487,7 @@ main(int argc, char *argv[]) > > goto loop_done; > > } > > - engine_init_run(); > > + engine_init_run(flow_engine); > > struct ovsdb_idl_txn *ovs_idl_txn = > > ovsdb_idl_loop_run(&ovs_idl_loop); > > unsigned int new_ovs_cond_seqno > > @@ -3488,7 +3495,7 @@ main(int argc, char *argv[]) > > if (new_ovs_cond_seqno != ovs_cond_seqno) { > > if (!new_ovs_cond_seqno) { > > VLOG_INFO("OVS IDL reconnected, force recompute."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > } > > ovs_cond_seqno = new_ovs_cond_seqno; > > } > > @@ -3506,7 +3513,7 @@ main(int argc, char *argv[]) > > if (new_ovnsb_cond_seqno != ovnsb_cond_seqno) { 
> > if (!new_ovnsb_cond_seqno) { > > VLOG_INFO("OVNSB IDL reconnected, force recompute."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > vif_plug_reset_idl_prime_counter(); > > } > > ovnsb_cond_seqno = new_ovnsb_cond_seqno; > > @@ -3518,7 +3525,7 @@ main(int argc, char *argv[]) > > .client_ctx = &ctrl_engine_ctx > > }; > > - engine_set_context(&eng_ctx); > > + engine_set_context(flow_engine, &eng_ctx); > > bool northd_version_match = > > check_northd_version(ovs_idl_loop.idl, ovnsb_idl_loop.idl, > > @@ -3584,7 +3591,7 @@ main(int argc, char *argv[]) > > &br_int_dp->capabilities : > > NULL, > > br_int ? br_int->name : NULL)) > > { > > VLOG_INFO("OVS feature set changed, force recompute."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > } > > if (br_int) { > > @@ -3619,9 +3626,9 @@ main(int argc, char *argv[]) > > * this round of engine_run and continue > > processing > > * acculated changes incrementally later when > > * ofctrl_can_put() returns true. */ > > - engine_run(false); > > + engine_run(flow_engine, false); > > } else { > > - engine_run(true); > > + engine_run(flow_engine, true); > > } > > } else { > > /* Even if there's no SB DB transaction available, > > @@ -3630,7 +3637,7 @@ main(int argc, char *argv[]) > > * If a recompute is required, the engine will > > abort, > > * triggerring a full run in the next iteration. 
> > */ > > - engine_run(false); > > + engine_run(flow_engine, false); > > } > > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > > time_msec()); > > @@ -3775,24 +3782,24 @@ main(int argc, char *argv[]) > > } > > - if (!engine_has_run()) { > > - if (engine_need_run()) { > > + if (!engine_has_run(flow_engine)) { > > + if (engine_need_run(flow_engine)) { > > VLOG_DBG("engine did not run, force recompute next > > time: " > > "br_int %p, chassis %p", br_int, chassis); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > poll_immediate_wake(); > > } else { > > VLOG_DBG("engine did not run, and it was not needed" > > " either: br_int %p, chassis %p", > > br_int, chassis); > > } > > - } else if (engine_aborted()) { > > + } else if (engine_aborted(flow_engine)) { > > VLOG_DBG("engine was aborted, force recompute next time: " > > "br_int %p, chassis %p", br_int, chassis); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > poll_immediate_wake(); > > } else { > > - engine_set_force_recompute(false); > > + engine_set_force_recompute(flow_engine, false); > > } > > store_nb_cfg(ovnsb_idl_txn, ovs_idl_txn, chassis_private, > > @@ -3846,7 +3853,7 @@ main(int argc, char *argv[]) > > if (!ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop)) { > > VLOG_INFO("OVNSB commit failed, force recompute next time."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > } > > int ovs_txn_status = > > ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop); > > @@ -3896,8 +3903,8 @@ loop_done: > > } > > } > > - engine_set_context(NULL); > > - engine_cleanup(); > > + engine_set_context(flow_engine, NULL); > > + engine_cleanup(flow_engine); > > /* It's time to exit. 
Clean up the databases if we are not > > restarting */ > > if (!restart) { > > @@ -4152,9 +4159,9 @@ inject_pkt(struct unixctl_conn *conn, int argc > > OVS_UNUSED, > > static void > > engine_recompute_cmd(struct unixctl_conn *conn OVS_UNUSED, int argc > > OVS_UNUSED, > > - const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED) > > + const char *argv[] OVS_UNUSED, void *arg) > > { > > - engine_trigger_recompute(); > > + engine_trigger_recompute(arg); > > unixctl_command_reply(conn, NULL); > > } > > @@ -4166,7 +4173,7 @@ lflow_cache_flush_cmd(struct unixctl_conn *conn > > OVS_UNUSED, > > VLOG_INFO("User triggered lflow cache flush."); > > struct lflow_output_persistent_data *fo_pd = arg_; > > lflow_cache_flush(fo_pd->lflow_cache); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > poll_immediate_wake(); > > unixctl_command_reply(conn, NULL); > > } > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > > index 2958a55e3..a4798efb7 100644 > > --- a/lib/inc-proc-eng.c > > +++ b/lib/inc-proc-eng.c > > @@ -33,12 +33,7 @@ > > VLOG_DEFINE_THIS_MODULE(inc_proc_eng); > > -static bool engine_force_recompute = false; > > -static bool engine_run_aborted = false; > > -static const struct engine_context *engine_context; > > - > > -static struct engine_node **engine_nodes; > > -static size_t engine_n_nodes; > > +static struct ovs_list engines = OVS_LIST_INITIALIZER(&engines); > > static const char *engine_node_state_name[EN_STATE_MAX] = { > > [EN_STALE] = "Stale", > > @@ -52,21 +47,21 @@ engine_recompute(struct engine_node *node, bool allowed, > > const char *reason_fmt, ...) 
OVS_PRINTF_FORMAT(3, 4); > > void > > -engine_set_force_recompute(bool val) > > +engine_set_force_recompute(struct engine *e, bool val) > > { > > - engine_force_recompute = val; > > + e->engine_force_recompute = val; > > } > > const struct engine_context * > > -engine_get_context(void) > > +engine_get_context(struct engine *e) > > { > > - return engine_context; > > + return e->engine_context; > > } > > void > > -engine_set_context(const struct engine_context *ctx) > > +engine_set_context(struct engine *e, const struct engine_context *ctx) > > { > > - engine_context = ctx; > > + e->engine_context = ctx; > > } > > /* Builds the topologically sorted 'sorted_nodes' array starting from > > @@ -113,30 +108,53 @@ static void > > engine_clear_stats(struct unixctl_conn *conn, int argc OVS_UNUSED, > > const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED) > > { > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - struct engine_node *node = engine_nodes[i]; > > + const char *target = argc == 2 ? argv[1] : NULL; > > + struct ds reply = DS_EMPTY_INITIALIZER; > > + struct engine *e; > > + > > + ds_put_format(&reply, "no %s engine found", target ? target : ""); > > + LIST_FOR_EACH (e, node, &engines) { > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + struct engine_node *node = e->engine_nodes[i]; > > - memset(&node->stats, 0, sizeof node->stats); > > + if (target && strcmp(target, e->name)) { > > + continue; > > + } > > + memset(&node->stats, 0, sizeof node->stats); > > + ds_clear(&reply); > > + } > > } > > - unixctl_command_reply(conn, NULL); > > + > > + unixctl_command_reply(conn, ds_cstr(&reply)); > > + ds_destroy(&reply); > > } > > static void > > -engine_dump_stats(struct unixctl_conn *conn, int argc OVS_UNUSED, > > +engine_dump_stats(struct unixctl_conn *conn, int argc, > > const char *argv[] OVS_UNUSED, void *arg OVS_UNUSED) > > { > > + const char *target = argc == 2 ? 
argv[1] : NULL; > > struct ds dump = DS_EMPTY_INITIALIZER; > > + struct engine *e; > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - struct engine_node *node = engine_nodes[i]; > > + LIST_FOR_EACH (e, node, &engines) { > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + struct engine_node *node = e->engine_nodes[i]; > > - ds_put_format(&dump, > > - "Node: %s\n" > > - "- recompute: %12"PRIu64"\n" > > - "- compute: %12"PRIu64"\n" > > - "- abort: %12"PRIu64"\n", > > - node->name, node->stats.recompute, > > - node->stats.compute, node->stats.abort); > > + if (target && strcmp(target, e->name)) { > > + continue; > > + } > > + ds_put_format(&dump, > > + "Node: %s\n" > > + "- recompute: %12"PRIu64"\n" > > + "- compute: %12"PRIu64"\n" > > + "- abort: %12"PRIu64"\n", > > + node->name, node->stats.recompute, > > + node->stats.compute, node->stats.abort); > > + } > > + } > > + if (ds_last(&dump) == EOF) { > > + ds_put_format(&dump, "no %s engine found", target ? target : ""); > > } > > unixctl_command_reply(conn, ds_cstr(&dump)); > > @@ -148,48 +166,92 @@ engine_trigger_recompute_cmd(struct unixctl_conn > > *conn, int argc OVS_UNUSED, > > const char *argv[] OVS_UNUSED, > > void *arg OVS_UNUSED) > > { > > - engine_trigger_recompute(); > > - unixctl_command_reply(conn, NULL); > > + const char *target = argc == 2 ? argv[1] : NULL; > > + struct ds reply = DS_EMPTY_INITIALIZER; > > + struct engine *e; > > + > > + ds_put_format(&reply, "no %s engine found", target ? 
target : ""); > > + LIST_FOR_EACH (e, node, &engines) { > > + if (target && strcmp(target, e->name)) { > > + continue; > > + } > > + engine_trigger_recompute(e); > > + ds_clear(&reply); > > + } > > + > > + unixctl_command_reply(conn, ds_cstr(&reply)); > > + ds_destroy(&reply); > > } > > -void > > -engine_init(struct engine_node *node, struct engine_arg *arg) > > +static void > > +engine_list_engines(struct unixctl_conn *conn, int argc OVS_UNUSED, > > + const char *argv[] OVS_UNUSED, > > + void *arg OVS_UNUSED) > > { > > - engine_nodes = engine_get_nodes(node, &engine_n_nodes); > > + struct ds reply = DS_EMPTY_INITIALIZER; > > + struct engine *e; > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - if (engine_nodes[i]->init) { > > - engine_nodes[i]->data = > > - engine_nodes[i]->init(engine_nodes[i], arg); > > - } else { > > - engine_nodes[i]->data = NULL; > > - } > > + LIST_FOR_EACH (e, node, &engines) { > > + ds_put_format(&reply, "%s\n", e->name); > > } > > + unixctl_command_reply(conn, ds_cstr(&reply)); > > + ds_destroy(&reply); > > +} > > - unixctl_command_register("inc-engine/show-stats", "", 0, 0, > > +void > > +engine_init_global(void) > > +{ > > + unixctl_command_register("inc-engine/show-stats", "[engine]", 0, 1, > > engine_dump_stats, NULL); > > - unixctl_command_register("inc-engine/clear-stats", "", 0, 0, > > + unixctl_command_register("inc-engine/clear-stats", "[engine]", 0, 1, > > engine_clear_stats, NULL); > > - unixctl_command_register("inc-engine/recompute", "", 0, 0, > > + unixctl_command_register("inc-engine/recompute", "[engine]", 0, 1, > > engine_trigger_recompute_cmd, NULL); > > + unixctl_command_register("inc-engine/list-engines", "", 0, 0, > > + engine_list_engines, NULL); > > +} > > + > > +struct engine * > > +engine_new(struct engine_node *node, struct engine_arg *arg, > > + const char *name) > > +{ > > + struct engine *e = xzalloc(sizeof *e); > > + > > + e->engine_nodes = engine_get_nodes(node, &e->engine_n_nodes); > > + e->name = 
name; > > + > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + if (e->engine_nodes[i]->init) { > > + e->engine_nodes[i]->data = > > + e->engine_nodes[i]->init(e->engine_nodes[i], arg); > > + } else { > > + e->engine_nodes[i]->data = NULL; > > + } > > + e->engine_nodes[i]->e = e; > > + } > > + > > + ovs_list_push_back(&engines, &e->node); > > + > > + return e; > > } > > void > > -engine_cleanup(void) > > +engine_cleanup(struct engine *e) > > { > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - if (engine_nodes[i]->clear_tracked_data) { > > - engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data); > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + if (e->engine_nodes[i]->clear_tracked_data) { > > + e->engine_nodes[i]->clear_tracked_data( > > + e->engine_nodes[i]->data); > > } > > - if (engine_nodes[i]->cleanup) { > > - engine_nodes[i]->cleanup(engine_nodes[i]->data); > > + if (e->engine_nodes[i]->cleanup) { > > + e->engine_nodes[i]->cleanup(e->engine_nodes[i]->data); > > } > > - free(engine_nodes[i]->data); > > + free(e->engine_nodes[i]->data); > > } > > - free(engine_nodes); > > - engine_nodes = NULL; > > - engine_n_nodes = 0; > > + ovs_list_remove(&e->node); > > + free(e->engine_nodes); > > + free(e); > > } > > struct engine_node * > > @@ -284,10 +346,10 @@ engine_node_changed(struct engine_node *node) > > } > > bool > > -engine_has_run(void) > > +engine_has_run(struct engine *e) > > { > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - if (engine_nodes[i]->state != EN_STALE) { > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + if (e->engine_nodes[i]->state != EN_STALE) { > > return true; > > } > > } > > @@ -295,9 +357,9 @@ engine_has_run(void) > > } > > bool > > -engine_aborted(void) > > +engine_aborted(struct engine *e) > > { > > - return engine_run_aborted; > > + return e->engine_run_aborted; > > } > > void * > > @@ -316,14 +378,15 @@ engine_get_internal_data(struct engine_node *node) > > } > > void > > 
-engine_init_run(void) > > +engine_init_run(struct engine *e) > > { > > VLOG_DBG("Initializing new run"); > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - engine_set_node_state(engine_nodes[i], EN_STALE); > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + engine_set_node_state(e->engine_nodes[i], EN_STALE); > > - if (engine_nodes[i]->clear_tracked_data) { > > - engine_nodes[i]->clear_tracked_data(engine_nodes[i]->data); > > + if (e->engine_nodes[i]->clear_tracked_data) { > > + e->engine_nodes[i]->clear_tracked_data( > > + e->engine_nodes[i]->data); > > } > > } > > } > > @@ -397,7 +460,8 @@ engine_compute(struct engine_node *node, bool > > recompute_allowed) > > } > > static void > > -engine_run_node(struct engine_node *node, bool recompute_allowed) > > +engine_run_node(struct engine *e, struct engine_node *node, > > + bool recompute_allowed) > > { > > if (!node->n_inputs) { > > /* Run the node handler which might change state. */ > > @@ -406,7 +470,7 @@ engine_run_node(struct engine_node *node, bool > > recompute_allowed) > > return; > > } > > - if (engine_force_recompute) { > > + if (e->engine_force_recompute) { > > engine_recompute(node, recompute_allowed, "forced"); > > return; > > } > > @@ -447,41 +511,41 @@ engine_run_node(struct engine_node *node, bool > > recompute_allowed) > > } > > void > > -engine_run(bool recompute_allowed) > > +engine_run(struct engine *e, bool recompute_allowed) > > { > > /* If the last run was aborted skip the incremental run because a > > * recompute is needed first. 
> > */ > > - if (!recompute_allowed && engine_run_aborted) { > > + if (!recompute_allowed && e->engine_run_aborted) { > > return; > > } > > - engine_run_aborted = false; > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > - engine_run_node(engine_nodes[i], recompute_allowed); > > + e->engine_run_aborted = false; > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > + engine_run_node(e, e->engine_nodes[i], recompute_allowed); > > - if (engine_nodes[i]->state == EN_ABORTED) { > > - engine_nodes[i]->stats.abort++; > > - engine_run_aborted = true; > > + if (e->engine_nodes[i]->state == EN_ABORTED) { > > + e->engine_nodes[i]->stats.abort++; > > + e->engine_run_aborted = true; > > return; > > } > > } > > } > > bool > > -engine_need_run(void) > > +engine_need_run(struct engine *e) > > { > > - for (size_t i = 0; i < engine_n_nodes; i++) { > > + for (size_t i = 0; i < e->engine_n_nodes; i++) { > > /* Check only leaf nodes for updates. */ > > - if (engine_nodes[i]->n_inputs) { > > + if (e->engine_nodes[i]->n_inputs) { > > continue; > > } > > - engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data); > > - engine_nodes[i]->stats.recompute++; > > - VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name, > > - engine_node_state_name[engine_nodes[i]->state]); > > - if (engine_nodes[i]->state == EN_UPDATED) { > > + e->engine_nodes[i]->run(e->engine_nodes[i], > > e->engine_nodes[i]->data); > > + e->engine_nodes[i]->stats.recompute++; > > + VLOG_DBG("input node: %s, state: %s", e->engine_nodes[i]->name, > > + engine_node_state_name[e->engine_nodes[i]->state]); > > + if (e->engine_nodes[i]->state == EN_UPDATED) { > > return true; > > } > > } > > @@ -489,9 +553,9 @@ engine_need_run(void) > > } > > void > > -engine_trigger_recompute(void) > > +engine_trigger_recompute(struct engine *e) > > { > > VLOG_INFO("User triggered force recompute."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(e, true); > > poll_immediate_wake(); > > } > > diff 
--git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > > index 9bfab1f7c..881242138 100644 > > --- a/lib/inc-proc-eng.h > > +++ b/lib/inc-proc-eng.h > > @@ -67,6 +67,7 @@ > > #include <stdint.h> > > #include "compiler.h" > > +#include "openvswitch/list.h" > > struct engine_context { > > struct ovsdb_idl_txn *ovs_idl_txn; > > @@ -122,6 +123,8 @@ struct engine_stats { > > }; > > struct engine_node { > > + struct engine *e; > > + > > /* A unique name for each node. */ > > char *name; > > @@ -173,30 +176,47 @@ struct engine_node { > > struct engine_stats stats; > > }; > > +struct engine { > > + struct ovs_list node; > > + > > + const char *name; > > + > > + struct engine_node **engine_nodes; > > + size_t engine_n_nodes; > > + > > + bool engine_force_recompute; > > + bool engine_run_aborted; > > + > > + const struct engine_context *engine_context; > > +}; > > + > > +void engine_init_global(void); > > + > > /* Initialize the data for the engine nodes. It calls each node's > > * init() method if not NULL passing the user supplied 'arg'. > > * It should be called before the main loop. */ > > -void engine_init(struct engine_node *node, struct engine_arg *arg); > > +struct engine *engine_new(struct engine_node *node, struct engine_arg *arg, > > + const char *name); > > /* Initialize the engine nodes for a new run. It should be called in the > > * main processing loop before every potential engine_run(). > > */ > > -void engine_init_run(void); > > +void engine_init_run(struct engine *e); > > /* Execute the processing, which should be called in the main loop. > > * Updates the engine node's states accordingly. If 'recompute_allowed' is > > * false and a recompute is required by the current engine run then the > > engine > > * aborts. > > */ > > -void engine_run(bool recompute_allowed); > > +void engine_run(struct engine *e, bool recompute_allowed); > > /* Clean up the data for the engine nodes. It calls each node's > > * cleanup() method if not NULL. 
It should be called before the program > > * terminates. */ > > -void engine_cleanup(void); > > +void engine_cleanup(struct engine *e); > > /* Check if engine needs to run but didn't. */ > > -bool engine_need_run(void); > > +bool engine_need_run(struct engine *e); > > /* Get the input node with <name> for <node> */ > > struct engine_node * engine_get_input(const char *input_name, > > @@ -216,7 +236,7 @@ void engine_add_input(struct engine_node *node, struct > > engine_node *input, > > * in circumstances when we are not sure there is change or not, or > > * when there is change but the engine couldn't be executed in that > > * iteration, and the change can't be tracked across iterations */ > > -void engine_set_force_recompute(bool val); > > +void engine_set_force_recompute(struct engine *e, bool val); > > /* Return the current engine_context. The values in the context can be > > NULL > > * if the engine is run with allow_recompute == false in the current > > @@ -224,9 +244,9 @@ void engine_set_force_recompute(bool val); > > * Therefore, it is the responsibility of the caller to check the context > > * values when called from change handlers. > > */ > > -const struct engine_context *engine_get_context(void); > > +const struct engine_context *engine_get_context(struct engine *e); > > -void engine_set_context(const struct engine_context *); > > +void engine_set_context(struct engine *e, const struct engine_context *); > > void engine_set_node_state_at(struct engine_node *node, > > enum engine_node_state state, > > @@ -236,10 +256,10 @@ void engine_set_node_state_at(struct engine_node > > *node, > > bool engine_node_changed(struct engine_node *node); > > /* Return true if the engine has run in the last iteration. */ > > -bool engine_has_run(void); > > +bool engine_has_run(struct engine *e); > > /* Returns true if during the last engine run we had to abort processing. 
> > */ > > -bool engine_aborted(void); > > +bool engine_aborted(struct engine *e); > > /* Return a pointer to node data accessible for users outside the > > processing > > * engine. If the node data is not valid (e.g., last engine_run() failed > > or > > @@ -265,7 +285,7 @@ void *engine_get_internal_data(struct engine_node > > *node); > > engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) > > /* Trigger a full recompute. */ > > -void engine_trigger_recompute(void); > > +void engine_trigger_recompute(struct engine *e); > > struct ed_ovsdb_index { > > const char *name; > > diff --git a/northd/en-lflow.c b/northd/en-lflow.c > > index ffbdaf4e8..5451e0551 100644 > > --- a/northd/en-lflow.c > > +++ b/northd/en-lflow.c > > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_lflow); > > void en_lflow_run(struct engine_node *node, void *data OVS_UNUSED) > > { > > - const struct engine_context *eng_ctx = engine_get_context(); > > + const struct engine_context *eng_ctx = engine_get_context(node->e); > > struct lflow_input lflow_input; > > diff --git a/northd/en-northd.c b/northd/en-northd.c > > index 79da7e1c4..064f9d93a 100644 > > --- a/northd/en-northd.c > > +++ b/northd/en-northd.c > > @@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(en_northd); > > void en_northd_run(struct engine_node *node, void *data) > > { > > - const struct engine_context *eng_ctx = engine_get_context(); > > + const struct engine_context *eng_ctx = engine_get_context(node->e); > > struct northd_input input_data; > > diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c > > index af55221e3..049fe226a 100644 > > --- a/northd/inc-proc-northd.c > > +++ b/northd/inc-proc-northd.c > > @@ -33,6 +33,8 @@ > > VLOG_DEFINE_THIS_MODULE(inc_proc_northd); > > +static struct engine *flow_engine; > > + > > #define NB_NODES \ > > NB_NODE(nb_global, "nb_global") \ > > NB_NODE(copp, "copp") \ > > @@ -150,6 +152,8 @@ static ENGINE_NODE(lflow, "lflow"); > > void inc_proc_northd_init(struct ovsdb_idl_loop *nb, > > 
struct ovsdb_idl_loop *sb) > > { > > + engine_init_global(); > > + > > /* Define relationships between nodes where first argument is > > dependent > > * on the second argument */ > > engine_add_input(&en_northd, &en_nb_nb_global, NULL); > > @@ -229,7 +233,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb, > > struct ovsdb_idl_index *sbrec_chassis_by_hostname = > > chassis_hostname_index_create(sb->idl); > > - engine_init(&en_lflow, &engine_arg); > > + flow_engine = engine_new(&en_lflow, &engine_arg, "flow_engine"); > > engine_ovsdb_node_add_index(&en_sb_chassis, > > "sbrec_chassis_by_name", > > @@ -251,14 +255,14 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb, > > void inc_proc_northd_run(struct ovsdb_idl_txn *ovnnb_txn, > > struct ovsdb_idl_txn *ovnsb_txn, > > bool recompute) { > > - engine_init_run(); > > + engine_init_run(flow_engine); > > /* Force a full recompute if instructed to, for example, after a NB/SB > > * reconnect event. However, make sure we don't overwrite an existing > > * force-recompute request if 'recompute' is false. 
> > */ > > if (recompute) { > > - engine_set_force_recompute(recompute); > > + engine_set_force_recompute(flow_engine, recompute); > > } > > struct engine_context eng_ctx = { > > @@ -266,31 +270,31 @@ void inc_proc_northd_run(struct ovsdb_idl_txn > > *ovnnb_txn, > > .ovnsb_idl_txn = ovnsb_txn, > > }; > > - engine_set_context(&eng_ctx); > > + engine_set_context(flow_engine, &eng_ctx); > > if (ovnnb_txn && ovnsb_txn) { > > - engine_run(true); > > + engine_run(flow_engine, true); > > } > > - if (!engine_has_run()) { > > - if (engine_need_run()) { > > + if (!engine_has_run(flow_engine)) { > > + if (engine_need_run(flow_engine)) { > > VLOG_DBG("engine did not run, force recompute next time."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > poll_immediate_wake(); > > } else { > > VLOG_DBG("engine did not run, and it was not needed"); > > } > > - } else if (engine_aborted()) { > > + } else if (engine_aborted(flow_engine)) { > > VLOG_DBG("engine was aborted, force recompute next time."); > > - engine_set_force_recompute(true); > > + engine_set_force_recompute(flow_engine, true); > > poll_immediate_wake(); > > } else { > > - engine_set_force_recompute(false); > > + engine_set_force_recompute(flow_engine, false); > > } > > } > > void inc_proc_northd_cleanup(void) > > { > > - engine_cleanup(); > > - engine_set_context(NULL); > > + engine_set_context(flow_engine, NULL); > > + engine_cleanup(flow_engine); > > } > > > _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
