On Wed, Nov 20, 2019 at 9:21 AM Dumitru Ceara <dce...@redhat.com> wrote:
>
> On Tue, Nov 19, 2019 at 7:16 PM Han Zhou <hz...@ovn.org> wrote:
> >
> >
> >
> > On Tue, Nov 19, 2019 at 1:58 AM Dumitru Ceara <dce...@redhat.com> wrote:
> > >
> > > On Tue, Nov 19, 2019 at 1:41 AM Han Zhou <hz...@ovn.org> wrote:
> > > >
> > > > Thanks Dumitru. Please see my comments inline below.
> > > >
> > > > On Mon, Nov 18, 2019 at 6:07 AM Dumitru Ceara <dce...@redhat.com> wrote:
> > > > >
> > > > > This commit transforms the 'changed' field in struct engine_node into a
> > > > > 'state' field. Possible node states are:
> > > > > - "Stale": data in the node is not up to date with the DB.
> > > > > - "Updated": data in the node is valid but was updated during
> > > > >   the last run of the engine.
> > > > > - "Valid": data in the node is valid and didn't change during
> > > > >   the last run of the engine.
> > > > > - "Aborted": during the last run, processing was aborted for
> > > > >   this node.
> > > > >
> > > > > This commit also further refactors the I-P engine:
> > > > > - instead of recursively performing all the engine processing a
> > > > >   preprocessing stage is added (engine_get_nodes()) before the main
> > > > >   processing loop is executed in order to topologically sort nodes in
> > > > >   the engine such that all inputs of a given node appear in the sorted
> > > > >   array before the node itself. This simplifies a bit the code in
> > > > >   engine_run().
> > > >
> > > > Could you tell the reason for changing it to non-recursive? It seems to
> > > > add more code rather than simplify it, and effort is needed to
> > > > ensure the correctness of the new code. Probably there are some
> > > > benefits that make the later patches easier, but it is not obvious to
> > > > me. Could you help point out if that's the case?
> > >
> > > My reasoning was that the engine graph is static (i.e., we build it
> > > once at startup and it never changes afterwards) so all recursion
> > > trees are always identical.
> > >
> > I agree that the graph is static, which makes non-recursive a good option,
> > but it doesn't necessarily mean it simplifies the code compared to the
> > recursive version :)
>
> It might also just be my personal view that debugging recursive
> functions is more complicated than debugging iterative ones :)
> >
> > > Moreover, by adding explicit engine node states we don't really need
> > > to store a run_id in the nodes because we have the state of each node
> > > after an engine run. I think removing the run_id is a good idea
> > > because we minimize the external state the user of the engine should
> > > manage (in this case engine_run_id and its incrementing logic).
> > >
> > > However, if we keep the engine processing in a recursive fashion then
> > > for each of the recursive operations (engine_run, engine_need_run,
> > > engine_init, engine_cleanup) we need a way to avoid executing the
> > > operation twice for a given node. This can be done by adding more
> > > flags to the nodes (or more state values) but given that our DAG is
> > > fixed, precomputing the processing order made more sense to me. What
> > > do you think?
> > >
> > It is true that run_id can be completely avoided with the non-recursive
> > version. However, it can be kept as an internal variable for the recursive
> > version, to skip repeated access of a node, if we use engine_init_run()
> > to increment it internally. I think we don't really need any more flags
> > than the non-recursive version other than this internal run_id, and we can
> > remove the external engine_run_id incrementing logic for the recursive
> > version, too.
>
> Yes, with an internal run_id this could work.
>
> >
> > > >
> > > > > - remove the need for using an engine_run_id by using the newly added
> > > > >   states.
> > > > > > > > > > Signed-off-by: Dumitru Ceara <dce...@redhat.com> > > > > > --- > > > > > controller/ovn-controller.c | 88 ++++++++++------- > > > > > lib/inc-proc-eng.c | 219 > > > > > ++++++++++++++++++++++++++++++++----------- > > > > > lib/inc-proc-eng.h | 75 +++++++++++---- > > > > > 3 files changed, 271 insertions(+), 111 deletions(-) > > > > > > > > > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c > > > > > index c56190f..033eff4 100644 > > > > > --- a/controller/ovn-controller.c > > > > > +++ b/controller/ovn-controller.c > > > > > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node > > > > > *node) > > > > > (struct ed_type_ofctrl_is_connected *)node->data; > > > > > if (data->connected != ofctrl_is_connected()) { > > > > > data->connected = !data->connected; > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return; > > > > > } > > > > > - node->changed = false; > > > > > + engine_set_node_state(node, EN_VALID); > > > > > } > > > > > > > > > > struct ed_type_addr_sets { > > > > > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node) > > > > > addr_sets_init(as_table, &as->addr_sets); > > > > > > > > > > as->change_tracked = false; > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > } > > > > > > > > > > static bool > > > > > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct > > > > > engine_node *node) > > > > > addr_sets_update(as_table, &as->addr_sets, &as->new, > > > > > &as->deleted, &as->updated); > > > > > > > > > > - node->changed = !sset_is_empty(&as->new) || > > > > > !sset_is_empty(&as->deleted) > > > > > - || !sset_is_empty(&as->updated); > > > > > + if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) || > > > > > + !sset_is_empty(&as->updated)) { > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > + } else { > > > > > + engine_set_node_state(node, EN_VALID); > > > > > + 
} > > > > > > > > > > as->change_tracked = true; > > > > > - node->changed = true; > > > > > return true; > > > > > } > > > > > > > > > > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node) > > > > > port_groups_init(pg_table, &pg->port_groups); > > > > > > > > > > pg->change_tracked = false; > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > } > > > > > > > > > > static bool > > > > > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct > > > > > engine_node *node) > > > > > port_groups_update(pg_table, &pg->port_groups, &pg->new, > > > > > &pg->deleted, &pg->updated); > > > > > > > > > > - node->changed = !sset_is_empty(&pg->new) || > > > > > !sset_is_empty(&pg->deleted) > > > > > - || !sset_is_empty(&pg->updated); > > > > > + if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) || > > > > > + !sset_is_empty(&pg->updated)) { > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > + } else { > > > > > + engine_set_node_state(node, EN_VALID); > > > > > + } > > > > > > > > > > pg->change_tracked = true; > > > > > - node->changed = true; > > > > > return true; > > > > > } > > > > > > > > > > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node) > > > > > update_ct_zones(local_lports, local_datapaths, ct_zones, > > > > > ct_zone_bitmap, pending_ct_zones); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > } > > > > > > > > > > static bool > > > > > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node > > > > > *node) > > > > > enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id(); > > > > > if (data->mff_ovn_geneve != mff_ovn_geneve) { > > > > > data->mff_ovn_geneve = mff_ovn_geneve; > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return; > > > > > } > > > > > - node->changed = false; > > > > > + engine_set_node_state(node, EN_VALID); > > > > > } > > > 
> > > > > > > struct ed_type_flow_output { > > > > > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node) > > > > > active_tunnels, > > > > > flow_table); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > } > > > > > > > > > > static bool > > > > > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct > > > > > engine_node *node) > > > > > flow_table, group_table, meter_table, lfrr, > > > > > conj_id_ofs); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return handled; > > > > > } > > > > > > > > > > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct > > > > > engine_node *node) > > > > > lflow_handle_changed_neighbors(sbrec_port_binding_by_name, > > > > > mac_binding_table, flow_table); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return true; > > > > > } > > > > > > > > > > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct > > > > > engine_node *node) > > > > > chassis, ct_zones, local_datapaths, > > > > > active_tunnels, flow_table); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return true; > > > > > } > > > > > > > > > > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct > > > > > engine_node *node) > > > > > mff_ovn_geneve, chassis, ct_zones, local_datapaths, > > > > > flow_table); > > > > > > > > > > - node->changed = true; > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > return true; > > > > > > > > > > } > > > > > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct > > > > > engine_node *node, > > > > > conj_id_ofs, &changed)) { > > > > > return false; > > > > > } > > > > > - node->changed = changed || node->changed; > > > > > + if (changed) { > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > + } > > > > > } > > > > 
> SSET_FOR_EACH (ref_name, updated) { > > > > > if (!lflow_handle_changed_ref(ref_type, ref_name, > > > > > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct > > > > > engine_node *node, > > > > > conj_id_ofs, &changed)) { > > > > > return false; > > > > > } > > > > > - node->changed = changed || node->changed; > > > > > + if (changed) { > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > + } > > > > > } > > > > > SSET_FOR_EACH (ref_name, new) { > > > > > if (!lflow_handle_changed_ref(ref_type, ref_name, > > > > > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct > > > > > engine_node *node, > > > > > conj_id_ofs, &changed)) { > > > > > return false; > > > > > } > > > > > - node->changed = changed || node->changed; > > > > > + if (changed) { > > > > > + engine_set_node_state(node, EN_UPDATED); > > > > > + } > > > > > } > > > > > > > > > > return true; > > > > > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[]) > > > > > engine_add_input(&en_runtime_data, &en_sb_port_binding, > > > > > runtime_data_sb_port_binding_handler); > > > > > > > > > > - engine_init(&en_flow_output); > > > > > + /* Get the sorted engine nodes to be used for every engine run. > > > > > */ > > > > > + size_t en_count = 0; > > > > > + struct engine_node **en_nodes = engine_get_nodes(&en_flow_output, > > > > > + &en_count); > > > > > + engine_init(en_nodes, en_count); > > > > > > > > I think engine_get_nodes() and engine_init() can be combined. We only > > > > need to expose engine_init() interface, which can call > > > > engine_get_nodes() internally and store n_count and nodes internally in > > > > engine module. > > > > > > Ok but then wouldn't it make more sense then to store all this in an > > > "struct inc_engine" structure which engine_init() would return? We'd > > > also need to move the globals engine_force_recompute, > > > engine_abort_recompute, engine_context inside the new inc_engine > > > structure then. 
> > > > > > > It is ok to have "struct inc_engine" structure, but it is only necessary if > > we want to support multiple instances of inc-engine. It seems not needed at > > this moment. I think it is ok to just keep the globals as module private > > (static) variables. > > I see, ok. > > > > > > > > > > > In addition, there can be more than 1 root node in the DAG. Originally > > > > we can just call engine_run(root_node) for each root node. Now with the > > > > non-recursive, I think we can handle this by: > > > > - in engine_get_nodes, we don't need to pass the root_node, but > > > > instead, the engine can just process all engine nodes. > > > > - remove the root_node parameter from all interfaces. > > > > > > In general, a DAG doesn't really have a root node, indeed. But even in > > > the original code everything was designed with one "ultimate" output > > > in mind, en_flow_output, and we would only call > > > engine_run(&en_flow_output, ..). I don't think we can remove the > > > root_node parameter from engine_get_nodes because we need to have a > > > starting point for precomputing the processing order of the nodes. We > > > were implicitly doing the same thing through recursion in the original > > > code. > > > > > > > In fact the recursive version didn't have such constraint. The idea was, if > > there is any node that needs compute but is not a offspring of the > > flow_output, we could just call engine_run(&en_xxx) in the same iteration > > as engine_run(&en_flow_output). (Maybe there were places need some update > > since it is never tested this way but in general this design should work). > > Now with the non-recursive version, I think we'd better not add any extra > > constraint for this. With the current patch, we can't just call engine_run > > with another *root node* any more because the interface changed. 
But I
> > think this can be solved easily in the topo_sort logic, by processing all
> > nodes in the graph, instead of starting from a given root node.
> > In fact, even for the recursive version, it would be even better to let the
> > engine figure out by itself which nodes are the roots and execute from
> > there - it was just not that important; it would only be needed if there
> > were more root nodes.
> > Removing the root node from all the interfaces would also ensure the engine
> > is always executed/evaluated as a whole and rule out any possible misuse of
> > the engine by passing a middle node that results in partial execution of
> > the engine and unpredictable consequences.
>
> Ok, I'll change the code to remove all root nodes from interfaces. I
> need to think more about how to do it at engine_init() because there
> we would need to pass the nodes to the incremental engine somehow.
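
For illustration, here is a minimal sketch of the "sort all nodes, no root"
idea discussed above. It is not code from this series: struct node, its
'visited' flag, topo_visit() and topo_sort_all() are hypothetical,
simplified stand-ins for struct engine_node and the topo sort in
lib/inc-proc-eng.c. It also assumes the caller can hand the engine an array
containing every node (including every input of every node), which is the
open question for engine_init() mentioned above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_INPUTS 4

/* Hypothetical, simplified stand-in for struct engine_node. */
struct node {
    const char *name;
    size_t n_inputs;
    struct node *inputs[MAX_INPUTS];
    bool visited;           /* Set once the node has been reached. */
};

/* Depth-first post-order walk: appends 'n' to 'sorted' only after all of
 * its inputs have been appended.  'cap' is the capacity of 'sorted'. */
static void
topo_visit(struct node *n, struct node **sorted, size_t *n_sorted, size_t cap)
{
    if (n->visited) {
        return;
    }
    n->visited = true;
    for (size_t i = 0; i < n->n_inputs; i++) {
        topo_visit(n->inputs[i], sorted, n_sorted, cap);
    }
    /* Fails if some input is missing from the caller's list of nodes. */
    assert(*n_sorted < cap);
    sorted[(*n_sorted)++] = n;
}

/* Topologically sorts every node in 'all' without needing a root node.
 * Starting a walk from each node covers graphs with several roots.
 * 'sorted' must have room for 'n_all' entries.  Returns the number of
 * nodes written, which equals 'n_all' when 'all' is complete. */
size_t
topo_sort_all(struct node **all, size_t n_all, struct node **sorted)
{
    size_t n_sorted = 0;

    for (size_t i = 0; i < n_all; i++) {
        all[i]->visited = false;
    }
    for (size_t i = 0; i < n_all; i++) {
        topo_visit(all[i], sorted, &n_sorted, n_all);
    }
    return n_sorted;
}
```

With two nodes 'b' and 'c' that both take 'a' as input, passing the nodes in
any order yields an array where 'a' comes before both 'b' and 'c', even
though neither 'b' nor 'c' is reachable from the other.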
Hi Han, I sent a v6 addressing everything we discussed in this thread: https://patchwork.ozlabs.org/project/openvswitch/list/?series=144565 The only exception is engine_init(). I still pass the en_flow_output node like we did in the recursive implementation because otherwise we don't really have a way to associate the nodes that we create in ovn-controller.c with the incremental engine. This can be further refined if/when we need to have multiple "root" nodes. Thanks, Dumitru > > > > > > > > > > > > > > > > > ofctrl_init(&ed_flow_output.group_table, > > > > > &ed_flow_output.meter_table, > > > > > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[]) > > > > > unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, > > > > > inject_pkt, > > > > > &pending_pkt); > > > > > > > > > > - uint64_t engine_run_id = 0; > > > > > - bool engine_run_done = true; > > > > > - > > > > > unsigned int ovs_cond_seqno = UINT_MAX; > > > > > unsigned int ovnsb_cond_seqno = UINT_MAX; > > > > > > > > > > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[]) > > > > > exiting = false; > > > > > restart = false; > > > > > while (!exiting) { > > > > > - engine_run_id++; > > > > > + engine_init_run(en_nodes, en_count, &en_flow_output); > > > > > > > > > > update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl); > > > > > update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl)); > > > > > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[]) > > > > > * this round of engine_run and continue > > > > > processing > > > > > * acculated changes incrementally later > > > > > when > > > > > * ofctrl_can_put() returns true. 
*/ > > > > > - if (engine_run_done) { > > > > > + if (!engine_aborted(&en_flow_output)) { > > > > > engine_set_abort_recompute(true); > > > > > - engine_run_done = > > > > > engine_run(&en_flow_output, > > > > > - > > > > > engine_run_id); > > > > > + engine_run(en_nodes, en_count); > > > > > } > > > > > } else { > > > > > engine_set_abort_recompute(false); > > > > > - engine_run_done = true; > > > > > - engine_run(&en_flow_output, > > > > > engine_run_id); > > > > > + engine_run(en_nodes, en_count); > > > > > } > > > > > } > > > > > stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME, > > > > > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[]) > > > > > > > > > > sbrec_meter_table_get(ovnsb_idl_loop.idl), > > > > > get_nb_cfg(sbrec_sb_global_table_get( > > > > > ovnsb_idl_loop.idl)), > > > > > - en_flow_output.changed); > > > > > + engine_node_changed(&en_flow_output)); > > > > > pinctrl_run(ovnsb_idl_txn, > > > > > sbrec_datapath_binding_by_key, > > > > > sbrec_port_binding_by_datapath, > > > > > @@ -2089,7 +2100,7 @@ main(int argc, char *argv[]) > > > > > &ed_runtime_data.local_datapaths, > > > > > &ed_runtime_data.active_tunnels); > > > > > > > > > > - if (en_runtime_data.changed) { > > > > > + if (engine_node_changed(&en_runtime_data)) { > > > > > update_sb_monitors(ovnsb_idl_loop.idl, > > > > > chassis, > > > > > > > > > > &ed_runtime_data.local_lports, > > > > > > > > > > &ed_runtime_data.local_datapaths); > > > > > @@ -2097,17 +2108,17 @@ main(int argc, char *argv[]) > > > > > } > > > > > > > > > > } > > > > > - if (engine_need_run(&en_flow_output, engine_run_id)) { > > > > > + if (engine_need_run(en_nodes, en_count, > > > > > &en_flow_output)) { > > > > > VLOG_DBG("engine did not run, force recompute next > > > > > time: " > > > > > "br_int %p, chassis %p", br_int, > > > > > chassis); > > > > > engine_set_force_recompute(true); > > > > > poll_immediate_wake(); > > > > > - } else if (!engine_run_done) { > > > > > + } else if (engine_aborted(&en_flow_output)) { > 
> > > > VLOG_DBG("engine was aborted, force recompute next > > > > > time: " > > > > > "br_int %p, chassis %p", br_int, chassis); > > > > > engine_set_force_recompute(true); > > > > > poll_immediate_wake(); > > > > > - } else if (!engine_has_run(&en_flow_output, > > > > > engine_run_id)) { > > > > > + } else if (!engine_has_run(&en_flow_output)) { > > > > > VLOG_DBG("engine did not run, and it was not needed" > > > > > " either: br_int %p, chassis %p", > > > > > br_int, chassis); > > > > > @@ -2135,8 +2146,7 @@ main(int argc, char *argv[]) > > > > > } > > > > > } else { > > > > > VLOG_DBG("Pending_pkt conn but br_int %p or > > > > > chassis " > > > > > - "%p not ready. run-id: %"PRIu64, br_int, > > > > > - chassis, engine_run_id); > > > > > + "%p not ready.", br_int, chassis); > > > > > unixctl_command_reply_error(pending_pkt.conn, > > > > > "ovn-controller not ready."); > > > > > } > > > > > @@ -2185,7 +2195,7 @@ main(int argc, char *argv[]) > > > > > } > > > > > > > > > > engine_set_context(NULL); > > > > > - engine_cleanup(&en_flow_output); > > > > > + engine_cleanup(en_nodes, en_count); > > > > > > > > > > /* It's time to exit. 
Clean up the databases if we are not > > > > > restarting */ > > > > > if (!restart) { > > > > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c > > > > > index 8a085e2..ee6afbe 100644 > > > > > --- a/lib/inc-proc-eng.c > > > > > +++ b/lib/inc-proc-eng.c > > > > > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false; > > > > > static bool engine_abort_recompute = false; > > > > > static const struct engine_context *engine_context; > > > > > > > > > > +static const char *engine_node_state_name[EN_STATE_MAX] = { > > > > > + [EN_STALE] = "Stale", > > > > > + [EN_UPDATED] = "Updated", > > > > > + [EN_VALID] = "Valid", > > > > > + [EN_ABORTED] = "Aborted", > > > > > +}; > > > > > + > > > > > void > > > > > engine_set_force_recompute(bool val) > > > > > { > > > > > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context > > > > > *ctx) > > > > > engine_context = ctx; > > > > > } > > > > > > > > > > -void > > > > > -engine_init(struct engine_node *node) > > > > > +/* Builds the topologically sorted 'sorted_nodes' array starting from > > > > > + * 'node'. > > > > > + */ > > > > > +static struct engine_node ** > > > > > +engine_topo_sort(struct engine_node *node, struct engine_node > > > > > **sorted_nodes, > > > > > + size_t *n_count, size_t *n_size) > > > > > { > > > > > + /* It's not so efficient to walk the array of already sorted > > > > > nodes but > > > > > + * we know that sorting is done only once at startup so it's ok > > > > > for now. 
> > > > > + */ > > > > > + for (size_t i = 0; i < *n_count; i++) { > > > > > + if (sorted_nodes[i] == node) { > > > > > + return sorted_nodes; > > > > > + } > > > > > + } > > > > > + > > > > > for (size_t i = 0; i < node->n_inputs; i++) { > > > > > - engine_init(node->inputs[i].node); > > > > > + sorted_nodes = engine_topo_sort(node->inputs[i].node, > > > > > sorted_nodes, > > > > > + n_count, n_size); > > > > > } > > > > > - if (node->init) { > > > > > - node->init(node); > > > > > + if (*n_count == *n_size) { > > > > > + sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof > > > > > *sorted_nodes); > > > > > } > > > > > + sorted_nodes[(*n_count)] = node; > > > > > + (*n_count)++; > > > > > + return sorted_nodes; > > > > > +} > > > > > + > > > > > +struct engine_node ** > > > > > +engine_get_nodes(struct engine_node *root_node, size_t *n_count) > > > > > +{ > > > > > + size_t n_size = 0; > > > > > + > > > > > + *n_count = 0; > > > > > + return engine_topo_sort(root_node, NULL, n_count, &n_size); > > > > > } > > > > > > > > > > void > > > > > -engine_cleanup(struct engine_node *node) > > > > > +engine_init(struct engine_node **nodes, size_t n_count) > > > > > { > > > > > - for (size_t i = 0; i < node->n_inputs; i++) { > > > > > - engine_cleanup(node->inputs[i].node); > > > > > + for (size_t i = 0; i < n_count; i++) { > > > > > + if (nodes[i]->init) { > > > > > + nodes[i]->init(nodes[i]); > > > > > + } > > > > > } > > > > > - if (node->cleanup) { > > > > > - node->cleanup(node); > > > > > +} > > > > > + > > > > > +void > > > > > +engine_cleanup(struct engine_node **nodes, size_t n_count) > > > > > +{ > > > > > + for (size_t i = 0; i < n_count; i++) { > > > > > + if (nodes[i]->cleanup) { > > > > > + nodes[i]->cleanup(nodes[i]); > > > > > + } > > > > > } > > > > > + free(nodes); > > > > > } > > > > > > > > > > struct engine_node * > > > > > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node > > > > > *node, const char *name, > > > > > ed->n_indexes 
++; > > > > > } > > > > > > > > > > +void > > > > > +engine_set_node_state_at(struct engine_node *node, > > > > > + enum engine_node_state state, > > > > > + const char *where) > > > > > +{ > > > > > + if (node->state == state) { > > > > > + return; > > > > > + } > > > > > + > > > > > + VLOG_DBG("%s: node: %s, old_state %s, new_state %s", > > > > > + where, node->name, > > > > > + engine_node_state_name[node->state], > > > > > + engine_node_state_name[state]); > > > > > + > > > > > + node->state = state; > > > > > +} > > > > > + > > > > > +static bool > > > > > +engine_node_valid(struct engine_node *node) > > > > > +{ > > > > > + return (node->state == EN_UPDATED || node->state == EN_VALID); > > > > > +} > > > > > + > > > > > bool > > > > > -engine_has_run(struct engine_node *node, uint64_t run_id) > > > > > +engine_node_changed(struct engine_node *node) > > > > > { > > > > > - return node->run_id == run_id; > > > > > + return node->state == EN_UPDATED; > > > > > +} > > > > > + > > > > > +bool > > > > > +engine_has_run(struct engine_node *node) > > > > > +{ > > > > > > > > engine_has_run() should go through all nodes. If any node is NOT STALE, > > > > return true. Engine hasn't run only if all nodes are STALE. (originally > > > > it was easier to tell by utilizing engine_run_id) > > > > If some nodes are STALE, some are not, it means engine has run but > > > > aborted. > > > > > > Due to the way the code is written (and as you noticed in earlier > > > versions of the patches) the STALE state is transient. If the engine > > > ran, even if it aborted, then no node will be in state STALE. Please > > > see below regarding how ABORTED is propagated. > > > > > > > Ok, sorry that I misread the code earlier. However, after re-walking the code > > with your explanation, it seems there are still chances (in theory) that some > > nodes are ABORTED but some nodes are STALE. 
For example, if A is input of > > B, and if A is UPDATED but is_valid(A) returns false because of custom > > implementation of is_valid(), then B will stay STALE. > > Would it be more generic and correct to go through all nodes and tell if > > the engine has run as a whole instead of based on the input node? (which > > suggests not having the node arg) > > > > Right, this is safer and there's no real performance trade off either. > I'll do it like this and walk all the nodes. > > > > > > > > > > + return node->state != EN_STALE; > > > > > +} > > > > > + > > > > > +bool > > > > > +engine_aborted(struct engine_node *node) > > > > > +{ > > > > > + return node->state == EN_ABORTED; > > > > > +} > > > > > + > > > > > +void > > > > > +engine_init_run(struct engine_node **nodes, size_t n_count, > > > > > + struct engine_node *root_node) > > > > > +{ > > > > > + /* No need to reinitialize if last run didn't happen. */ > > > > > + if (!engine_has_run(root_node)) { > > > > > > > > I think here is a problem. If in the last round, the root node didn't > > > > run because it is aborted at some intermediate node, but many other > > > > nodes could have run and state already changed to VALID/UPDATED. Now if > > > > we don't do the init, those nodes may contain invalid data since it is > > > > a new round of iteration, but the state telling they are valid. > > > > > > If one of the inputs of a given node went to ABORT state then the node > > > itself will also move to ABORT. And this will get propagated to all > > > successor nodes. > > > The idea of the check was to avoid walking all the nodes if the > > > root_node is in state STALE (engine_has_run() == false). STALE is > > > transient so if one node is STALE then all nodes are STALE and this > > > can only happen if the engine didn't run in the current iteration. > > > > > Sorry it was my misunderstanding of ABORT propagation logic. > > > > > > > > > > We don't need to pass "root_node" for engine_init_run. 
We can just > > > > reset all nodes to STALE state. > > > > > > I agree, we could decide if an engine needs init_run by checking that > > > the first node in 'nodes' is not in state STALE. > > > > > > > > > > > > > > > > + return; > > > > > + } > > > > > + > > > > > + VLOG_DBG("Initializing new run"); > > > > > + for (size_t i = 0; i < n_count; i++) { > > > > > + engine_set_node_state(nodes[i], EN_STALE); > > > > > + } > > > > > } > > > > > > > > > > /* Do a full recompute (or at least try). If we're not allowed then > > > > > * mark the node as "aborted". > > > > > */ > > > > > -static bool > > > > > +static void > > > > > engine_recompute(struct engine_node *node, bool forced, bool allowed) > > > > > { > > > > > VLOG_DBG("node: %s, recompute (%s)", node->name, > > > > > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool > > > > > forced, bool allowed) > > > > > > > > > > if (!allowed) { > > > > > VLOG_DBG("node: %s, recompute aborted", node->name); > > > > > - return false; > > > > > + engine_set_node_state(node, EN_ABORTED); > > > > > + return; > > > > > } > > > > > > > > > > + /* Run the node handler which might change state. */ > > > > > node->run(node); > > > > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > > > > - return true; > > > > > } > > > > > > > > > > /* Return true if the node could be computed without triggerring a > > > > > full > > > > > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool > > > > > recompute_allowed) > > > > > { > > > > > for (size_t i = 0; i < node->n_inputs; i++) { > > > > > /* If the input node data changed call its change handler. 
*/ > > > > > - if (node->inputs[i].node->changed) { > > > > > + if (node->inputs[i].node->state == EN_UPDATED) { > > > > > VLOG_DBG("node: %s, handle change for input %s", > > > > > node->name, node->inputs[i].node->name); > > > > > > > > > > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool > > > > > recompute_allowed) > > > > > VLOG_DBG("node: %s, can't handle change for input > > > > > %s, " > > > > > "fall back to recompute", > > > > > node->name, node->inputs[i].node->name); > > > > > - if (!engine_recompute(node, false, > > > > > recompute_allowed)) { > > > > > + engine_recompute(node, false, recompute_allowed); > > > > > + if (engine_aborted(node)) { > > > > > > > > The aborted state was propagated through the recursive logic, but it is > > > > not the case in this new implementation. Is this on purpose? > > > > > > With the new implementation the aborted state is also propagated, just > > > that it's done in engine_run_node(). > > > > > Ack. > > > > > > > > > > > return false; > > > > > } > > > > > } > > > > > } > > > > > } > > > > > - > > > > > return true; > > > > > } > > > > > > > > > > -bool engine_run(struct engine_node *node, uint64_t run_id) > > > > > +static void > > > > > +engine_run_node(struct engine_node *node) > > > > > { > > > > > - if (node->run_id == run_id) { > > > > > - /* The node was already updated in this run (could be input > > > > > for > > > > > - * multiple other nodes). Stop processing. > > > > > - */ > > > > > - return true; > > > > > - } > > > > > - > > > > > - /* Initialize the node for this run. */ > > > > > - node->run_id = run_id; > > > > > - node->changed = false; > > > > > - > > > > > if (!node->n_inputs) { > > > > > + /* Run the node handler which might change state. 
*/ > > > > > node->run(node); > > > > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > > > > - return true; > > > > > + return; > > > > > } > > > > > > > > > > + bool input_stale = false; > > > > > for (size_t i = 0; i < node->n_inputs; i++) { > > > > > - if (!engine_run(node->inputs[i].node, run_id)) { > > > > > - return false; > > > > > + if (!engine_node_valid(node->inputs[i].node)) { > > > > > + /* If the input node aborted computation, move to > > > > > EN_ABORTED. > > > > > + * This will be propagated to following nodes. > > > > > + */ > > > > > + if (engine_aborted(node->inputs[i].node)) { > > > > > + engine_set_node_state(node, EN_ABORTED); > > > > > + } > > > > > > Here we propagate the ABORTED state. > > > > > Ack. > > > > > > > + > > > > > + input_stale = true; > > > > > } > > > > > } > > > > > > > > > > - bool need_compute = false; > > > > > + /* If at least one input is stale, don't change state. */ > > > > > + if (input_stale) { > > > > > + return; > > > > > + } > > > > > > With the current ovn-controller code, input_stale will never be true > > > unless one of the input nodes aborted computation. However, because of > > > the is_valid interface we're adding in the last patch of the series, > > > engine_node_valid(input) might return false in the future based on a > > > user defined condition even if the state is computed. > > > > > > Maybe for better readability I can introduce a new state called > > > EN_INVALID and enter it every time engine_node_valid(input) returns > > > false. What do you think? > > > > > I think we have 2 options here, each based on different assumption. > > > > Option1: Assume there is no use case that a custom is_valid() would return > > false after the node is computed. We can assert this assumption in > > engine_node_valid() by checking the node state first, and only call the > > node->is_valid() when the node state is not valid (i.e. ABORTED/STALE). 
> > > > Option2: Assume there is real use case that a custom is_valid() would > > return false after the node is computed, probably because some of the input > > doesn't meet the node's expectation. In this case I think instead of > > relying on the call of is_valid(), it is more straightforward that the > > node->run() or node->change_handler_xxx() tells this unexpected situation > > and set the node state directly, like how the UPDATED state is set. It is > > better not to set the state to INVALID only when engine_node_valid() is > > called. > > > > I tend to do option1 first, because the more state, the more complexity, > > and I don't see such use case of option2 yet. > > Sounds good to me. I'll go with option1. > > > > > > > > > > > > > if (engine_force_recompute) { > > > > > - return engine_recompute(node, true, !engine_abort_recompute); > > > > > + engine_recompute(node, true, !engine_abort_recompute); > > > > > + return; > > > > > } > > > > > > > > > > /* If any of the inputs updated data but there is no > > > > > change_handler, then > > > > > * recompute the current node too. > > > > > */ > > > > > + bool need_compute = false; > > > > > for (size_t i = 0; i < node->n_inputs; i++) { > > > > > - if (node->inputs[i].node->changed) { > > > > > + if (node->inputs[i].node->state == EN_UPDATED) { > > > > > need_compute = true; > > > > > > > > > > /* Trigger a recompute if we don't have a change > > > > > handler. */ > > > > > if (!node->inputs[i].change_handler) { > > > > > - return engine_recompute(node, false, > > > > > !engine_abort_recompute); > > > > > + engine_recompute(node, false, > > > > > !engine_abort_recompute); > > > > > + return; > > > > > } > > > > > } > > > > > } > > > > > @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, > > > > > uint64_t run_id) > > > > > /* If we couldn't compute the node we either aborted or > > > > > triggered > > > > > * a full recompute. In any case, stop processing. 
> > > > > */ > > > > > - return engine_compute(node, !engine_abort_recompute); > > > > > + if (!engine_compute(node, !engine_abort_recompute)) { > > > > > + return; > > > > > + } > > > > > } > > > > > > > > > > - VLOG_DBG("node: %s, changed: %d", node->name, node->changed); > > > > > - return true; > > > > > + /* If we reached this point, either the node was updated or its > > > > > state is > > > > > + * still valid. > > > > > + */ > > > > > + if (!engine_node_changed(node)) { > > > > > + engine_set_node_state(node, EN_VALID); > > > > > + } > > > > > } > > > > > > > > > > -bool > > > > > -engine_need_run(struct engine_node *node, uint64_t run_id) > > > > > +void > > > > > +engine_run(struct engine_node **nodes, size_t n_count) > > > > > { > > > > > - size_t i; > > > > > + for (size_t i = 0; i < n_count; i++) { > > > > > > > > If an input node didn't finish the run, e.g. aborted, then we shouldn't > > > > continue running for the node depends on it. > > > > > > In this case we have to so we propagate the ABORTED state (in the same > > > way we were doing all the recursive returns). > > > > > Ack. > > > > > > > > > > > + engine_run_node(nodes[i]); > > > > > + } > > > > > +} > > > > > > > > > > - if (node->run_id == run_id) { > > > > > +bool > > > > > +engine_need_run(struct engine_node **nodes, size_t n_count, > > > > > + struct engine_node *root_node) > > > > > +{ > > > > > + if (engine_has_run(root_node)) { > > > > > return false; > > > > > } > > > > > > > > > > - if (!node->n_inputs) { > > > > > - node->run(node); > > > > > - VLOG_DBG("input node: %s, changed: %d", node->name, > > > > > node->changed); > > > > > - return node->changed; > > > > > - } > > > > > + for (size_t i = 0; i < n_count; i++) { > > > > > + /* Check only leaf nodes. 
*/ > > > > > + if (nodes[i]->n_inputs) { > > > > > + continue; > > > > > + } > > > > > > > > > > - for (i = 0; i < node->n_inputs; i++) { > > > > > - if (engine_need_run(node->inputs[i].node, run_id)) { > > > > > + nodes[i]->run(nodes[i]); > > > > > + VLOG_DBG("input node: %s, state: %s", nodes[i]->name, > > > > > + engine_node_state_name[nodes[i]->state]); > > > > > + if (nodes[i]->state == EN_UPDATED) { > > > > > return true; > > > > > } > > > > > } > > > > > - > > > > > return false; > > > > > } > > > > > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h > > > > > index abd41b2..69eb9b6 100644 > > > > > --- a/lib/inc-proc-eng.h > > > > > +++ b/lib/inc-proc-eng.h > > > > > @@ -82,10 +82,21 @@ struct engine_node_input { > > > > > bool (*change_handler)(struct engine_node *node); > > > > > }; > > > > > > > > > > -struct engine_node { > > > > > - /* A unique id to distinguish each iteration of the > > > > > engine_run(). */ > > > > > - uint64_t run_id; > > > > > +enum engine_node_state { > > > > > + EN_STALE, /* Data in the node is not up to date with the DB. > > > > > */ > > > > > + EN_UPDATED, /* Data in the node is valid but was updated > > > > > during the > > > > > + * last run. > > > > > + */ > > > > > + EN_VALID, /* Data in the node is valid and didn't change > > > > > during the > > > > > + * last run. > > > > > + */ > > > > > + EN_ABORTED, /* During the last run, processing was aborted for > > > > > + * this node. > > > > > + */ > > > > > + EN_STATE_MAX, > > > > > +}; > > > > > > > > > > +struct engine_node { > > > > > /* A unique name for each node. */ > > > > > char *name; > > > > > > > > > > @@ -102,8 +113,8 @@ struct engine_node { > > > > > * node. */ > > > > > void *data; > > > > > > > > > > - /* Whether the data changed in the last engine run. */ > > > > > - bool changed; > > > > > + /* State of the node after the last engine run. */ > > > > > + enum engine_node_state state; > > > > > > > > > > /* Method to initialize data. It may be NULL. 
*/ > > > > > void (*init)(struct engine_node *); > > > > > @@ -116,23 +127,36 @@ struct engine_node { > > > > > void (*run)(struct engine_node *); > > > > > }; > > > > > > > > > > -/* Initialize the data for the engine nodes recursively. It calls > > > > > each node's > > > > > +/* Return the array of topologically sorted nodes when starting from > > > > > + * 'root_node'. Stores the number of nodes in 'n_count'. > > > > > + * It should be called before the main loop. > > > > > + */ > > > > > +struct engine_node **engine_get_nodes(struct engine_node *root_node, > > > > > + size_t *n_count); > > > > > + > > > > > +/* Initialize the data for the engine nodes. It calls each node's > > > > > * init() method if not NULL. It should be called before the main > > > > > loop. */ > > > > > -void engine_init(struct engine_node *); > > > > > +void engine_init(struct engine_node **nodes, size_t n_count); > > > > > + > > > > > +/* Initialize the engine nodes for a new run. It should be called in > > > > > the > > > > > + * main processing loop before every potential engine_run(). > > > > > + */ > > > > > +void engine_init_run(struct engine_node **nodes, size_t n_count, > > > > > + struct engine_node *root_node); > > > > > > > > > > /* Execute the processing recursively, which should be called in the > > > > > main > > > > > > > > This comment should be updated since you changed it to be non-recursive. > > > > > > Ack. > > > > > > > > > > > > - * loop. Returns true if the execution is compelte, false if it is > > > > > aborted, > > > > > - * which could happen when engine_abort_recompute is set. */ > > > > > -bool engine_run(struct engine_node *, uint64_t run_id); > > > > > + * loop. Updates the engine node's states accordingly. > > > > > + */ > > > > > +void engine_run(struct engine_node **nodes, size_t n_count); > > > > > > > > > > -/* Clean up the data for the engine nodes recursively. It calls each > > > > > node's > > > > > +/* Clean up the data for the engine nodes. 
It calls each node's > > > > > * cleanup() method if not NULL. It should be called before the > > > > > program > > > > > * terminates. */ > > > > > -void engine_cleanup(struct engine_node *); > > > > > +void engine_cleanup(struct engine_node **nodes, size_t n_count); > > > > > > > > > > /* Check if engine needs to run but didn't. */ > > > > > -bool > > > > > -engine_need_run(struct engine_node *, uint64_t run_id); > > > > > +bool engine_need_run(struct engine_node **nodes, size_t n_count, > > > > > + struct engine_node *root_node); > > > > > > > > > > /* Get the input node with <name> for <node> */ > > > > > struct engine_node * engine_get_input(const char *input_name, > > > > > @@ -159,8 +183,22 @@ const struct engine_context * > > > > > engine_get_context(void); > > > > > > > > > > void engine_set_context(const struct engine_context *); > > > > > > > > > > -/* Return true if the engine has run for 'node' in the 'run_id' > > > > > iteration. */ > > > > > -bool engine_has_run(struct engine_node *node, uint64_t run_id); > > > > > +void engine_set_node_state_at(struct engine_node *node, > > > > > + enum engine_node_state state, > > > > > + const char *where); > > > > > + > > > > > +/* Return true if during the last iteration the node's data was > > > > > updated. */ > > > > > +bool engine_node_changed(struct engine_node *node); > > > > > + > > > > > +/* Return true if the engine has run for 'node' in the last > > > > > iteration. */ > > > > > +bool engine_has_run(struct engine_node *node); > > > > > + > > > > > +/* Returns true if during the last engine run we had to abort > > > > > processing. */ > > > > > +bool engine_aborted(struct engine_node *node); > > > > > + > > > > > +/* Set the state of the node and log changes. 
*/ > > > > > +#define engine_set_node_state(node, state) \ > > > > > + engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR) > > > > > > > > > > struct ed_ovsdb_index { > > > > > const char *name; > > > > > @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct > > > > > engine_node *, const char *name, > > > > > struct engine_node en_##NAME = { \ > > > > > .name = NAME_STR, \ > > > > > .data = &ed_##NAME, \ > > > > > + .state = EN_STALE, \ > > > > > .init = en_##NAME##_init, \ > > > > > .run = en_##NAME##_run, \ > > > > > .cleanup = en_##NAME##_cleanup, \ > > > > > @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct > > > > > engine_node *node) \ > > > > > const struct DB_NAME##rec_##TBL_NAME##_table *table = \ > > > > > EN_OVSDB_GET(node); \ > > > > > if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \ > > > > > - node->changed = true; \ > > > > > + engine_set_node_state(node, EN_UPDATED); \ > > > > > return; \ > > > > > } \ > > > > > - node->changed = false; \ > > > > > + engine_set_node_state(node, EN_VALID); \ > > > > > } \ > > > > > static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node > > > > > *node) \ > > > > > = NULL; \ > > > > > > > > > > > Thanks again for the thorough review Han. Before I start working on a > > > new version of the series maybe we need to agree on some of these > > > points: > > > > > > 1. Do we switch back to recursion or are you ok with the iterative > > > approach? > > In general I'd prefer to the one with simpler logic, and if they are > > similar then I prefer the one with less changes. In current case I've > > already got familiar the new approach, and I am not sure which one would > > require more effort for v4 at this point, so please pick either one that > > you are comfortable with. > > My personal preference is the iterative version so I'll stick to that. > > > > > > 2. 
Do we add a structure, e.g., inc_engine, to store an instance of > > > the incremental engine (sorted nodes and count, "root node", > > > force_recompute, abort_recompute)? > > We should move the sorted nodes and count as module static variables, but > > structure is not needed. > > Ack. > > > > > > 3. Do we add a new state EN_INVALID to be reached whenever > > > engine_node_valid(input) returns false and the input node didn't > > > abort? It would never be the case in this patch series but it could > > > happen in the future if more custom is_valid() handlers are added. > > > > I tend to choose option1 of the inlined comment above. > > > > > > > > Thanks, > > > Dumitru > > > > > Thanks for your follow up. I'll start working on a new version of the series. > > Regards, > Dumitru
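For reference, option1 as agreed in the thread (check the node state first, and consult the custom is_valid() only when the state is STALE or ABORTED) could be sketched roughly as below. This is an illustration only, not code from the series: the state names follow the enum in the quoted patch, but the is_valid() signature, the reduced struct engine_node layout, and the en_example_is_valid() callback are assumptions made for the example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* State names as in the quoted patch. */
enum engine_node_state {
    EN_STALE,
    EN_UPDATED,
    EN_VALID,
    EN_ABORTED,
};

/* Reduced, hypothetical version of struct engine_node: only the fields
 * needed for this sketch are kept. */
struct engine_node {
    const char *name;
    void *data;
    enum engine_node_state state;
    /* Assumed signature for the custom validity check; may be NULL. */
    bool (*is_valid)(struct engine_node *);
};

/* Hypothetical custom callback: treats the node as usable whenever it
 * still holds some data. */
static bool
en_example_is_valid(struct engine_node *node)
{
    return node->data != NULL;
}

/* Option1: a node that was computed in the last run (UPDATED or VALID) is
 * always valid. The custom is_valid() is consulted only when the state is
 * STALE or ABORTED. */
static bool
engine_node_valid(struct engine_node *node)
{
    assert(node != NULL);

    if (node->state == EN_UPDATED || node->state == EN_VALID) {
        return true;
    }

    if (node->is_valid) {
        return node->is_valid(node);
    }
    return false;
}
```

With this shape a custom is_valid() can only affect the result for a STALE or ABORTED node; it cannot invalidate a node that was computed in the current run, which is why no extra EN_INVALID state is needed under option1.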