On Wed, Nov 20, 2019 at 9:21 AM Dumitru Ceara <dce...@redhat.com> wrote:
>
> On Tue, Nov 19, 2019 at 7:16 PM Han Zhou <hz...@ovn.org> wrote:
> >
> >
> >
> > On Tue, Nov 19, 2019 at 1:58 AM Dumitru Ceara <dce...@redhat.com> wrote:
> > >
> > > On Tue, Nov 19, 2019 at 1:41 AM Han Zhou <hz...@ovn.org> wrote:
> > > >
> > > > Thanks Dumitru. Please see my comments inline below.
> > > >
> > > > On Mon, Nov 18, 2019 at 6:07 AM Dumitru Ceara <dce...@redhat.com> wrote:
> > > > >
> > > > > > This commit transforms the 'changed' field in struct engine_node into a
> > > > > 'state' field. Possible node states are:
> > > > > - "Stale": data in the node is not up to date with the DB.
> > > > > - "Updated": data in the node is valid but was updated during
> > > > >   the last run of the engine.
> > > > > - "Valid": data in the node is valid and didn't change during
> > > > >   the last run of the engine.
> > > > > - "Aborted": during the last run, processing was aborted for
> > > > >   this node.
> > > > >
> > > > > This commit also further refactors the I-P engine:
> > > > > > - instead of recursively performing all the engine processing a
> > > > > >   preprocessing stage is added (engine_get_nodes()) before the main
> > > > > >   processing loop is executed in order to topologically sort nodes in
> > > > > >   the engine such that all inputs of a given node appear in the sorted
> > > > > >   array before the node itself. This slightly simplifies the code in
> > > > > >   engine_run().
> > > >
> > > > Could you explain the reason for changing it to non-recursive? It seems
> > > > to add more code rather than simplify it, and effort is needed to
> > > > ensure the correctness of the new code. There is probably some benefit
> > > > that makes the later patches easier, but it is not obvious to me.
> > > > Could you point it out if that's the case?
> > >
> > > My reasoning was that the engine graph is static (i.e., we build it
> > > once at startup and it never changes afterwards) so all recursion
> > > trees are always identical.
> > >
> > I agree that the graph is static, which makes non-recursive a good option,
> > but it doesn't necessarily mean the code is simpler than the recursive
> > version :)
>
> It might also just be my personal view that debugging recursive
> functions is more complicated than with iterative ones :)
>
> >
> > > Moreover, with adding engine node explicit states we don't really need
> > > to store a run_id in the nodes because we have the state of each node
> > > after an engine run. I think removing the run_id is a good idea
> > > because we minimize the external state the user of the engine should
> > > manage (in this case engine_run_id and its incrementing logic).
> > >
> > > However, if we keep the engine processing in a recursive fashion then
> > > for each of the recursive operations (engine_run, engine_need_run,
> > > engine_init, engine_cleanup) we need a way to avoid executing the
> > > operation twice for a given node. This can be done by adding more
> > > flags to the nodes (or more state values) but given that our DAG is
> > > fixed, precomputing the processing order made more sense to me. What
> > > do you think?
> > >
> > It is true that run_id can be completely avoided with the non-recursive
> > version. However, it can be kept as an internal variable in the recursive
> > version, to skip repeated visits to a node, if we use engine_init_run()
> > to increment it internally. I think the recursive version doesn't need any
> > more flags than the non-recursive one other than this internal run_id, and
> > we can remove the external engine_run_id incrementing logic for the
> > recursive version, too.
>
> Yes, with an internal run_id this could work.
>
> >
> > > >
> > > > > > - remove the need for using an engine_run_id by using the newly added
> > > > > >   states.
> > > > >
> > > > > Signed-off-by: Dumitru Ceara <dce...@redhat.com>
> > > > > ---
> > > > >  controller/ovn-controller.c |   88 ++++++++++-------
> > > > > >  lib/inc-proc-eng.c          |  219 ++++++++++++++++++++++++++++++++-----------
> > > > >  lib/inc-proc-eng.h          |   75 +++++++++++----
> > > > >  3 files changed, 271 insertions(+), 111 deletions(-)
> > > > >
> > > > > diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> > > > > index c56190f..033eff4 100644
> > > > > --- a/controller/ovn-controller.c
> > > > > +++ b/controller/ovn-controller.c
> > > > > @@ -758,10 +758,10 @@ en_ofctrl_is_connected_run(struct engine_node 
> > > > > *node)
> > > > >          (struct ed_type_ofctrl_is_connected *)node->data;
> > > > >      if (data->connected != ofctrl_is_connected()) {
> > > > >          data->connected = !data->connected;
> > > > > -        node->changed = true;
> > > > > +        engine_set_node_state(node, EN_UPDATED);
> > > > >          return;
> > > > >      }
> > > > > -    node->changed = false;
> > > > > +    engine_set_node_state(node, EN_VALID);
> > > > >  }
> > > > >
> > > > >  struct ed_type_addr_sets {
> > > > > @@ -811,7 +811,7 @@ en_addr_sets_run(struct engine_node *node)
> > > > >      addr_sets_init(as_table, &as->addr_sets);
> > > > >
> > > > >      as->change_tracked = false;
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >  }
> > > > >
> > > > >  static bool
> > > > > @@ -830,11 +830,14 @@ addr_sets_sb_address_set_handler(struct 
> > > > > engine_node *node)
> > > > >      addr_sets_update(as_table, &as->addr_sets, &as->new,
> > > > >                       &as->deleted, &as->updated);
> > > > >
> > > > > -    node->changed = !sset_is_empty(&as->new) || 
> > > > > !sset_is_empty(&as->deleted)
> > > > > -                    || !sset_is_empty(&as->updated);
> > > > > +    if (!sset_is_empty(&as->new) || !sset_is_empty(&as->deleted) ||
> > > > > +            !sset_is_empty(&as->updated)) {
> > > > > +        engine_set_node_state(node, EN_UPDATED);
> > > > > +    } else {
> > > > > +        engine_set_node_state(node, EN_VALID);
> > > > > +    }
> > > > >
> > > > >      as->change_tracked = true;
> > > > > -    node->changed = true;
> > > > >      return true;
> > > > >  }
> > > > >
> > > > > @@ -885,7 +888,7 @@ en_port_groups_run(struct engine_node *node)
> > > > >      port_groups_init(pg_table, &pg->port_groups);
> > > > >
> > > > >      pg->change_tracked = false;
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >  }
> > > > >
> > > > >  static bool
> > > > > @@ -904,11 +907,14 @@ port_groups_sb_port_group_handler(struct 
> > > > > engine_node *node)
> > > > >      port_groups_update(pg_table, &pg->port_groups, &pg->new,
> > > > >                       &pg->deleted, &pg->updated);
> > > > >
> > > > > -    node->changed = !sset_is_empty(&pg->new) || 
> > > > > !sset_is_empty(&pg->deleted)
> > > > > -                    || !sset_is_empty(&pg->updated);
> > > > > +    if (!sset_is_empty(&pg->new) || !sset_is_empty(&pg->deleted) ||
> > > > > +            !sset_is_empty(&pg->updated)) {
> > > > > +        engine_set_node_state(node, EN_UPDATED);
> > > > > +    } else {
> > > > > +        engine_set_node_state(node, EN_VALID);
> > > > > +    }
> > > > >
> > > > >      pg->change_tracked = true;
> > > > > -    node->changed = true;
> > > > >      return true;
> > > > >  }
> > > > >
> > > > > @@ -1091,7 +1097,7 @@ en_runtime_data_run(struct engine_node *node)
> > > > >      update_ct_zones(local_lports, local_datapaths, ct_zones,
> > > > >                      ct_zone_bitmap, pending_ct_zones);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >  }
> > > > >
> > > > >  static bool
> > > > > @@ -1157,10 +1163,10 @@ en_mff_ovn_geneve_run(struct engine_node 
> > > > > *node)
> > > > >      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> > > > >      if (data->mff_ovn_geneve != mff_ovn_geneve) {
> > > > >          data->mff_ovn_geneve = mff_ovn_geneve;
> > > > > -        node->changed = true;
> > > > > +        engine_set_node_state(node, EN_UPDATED);
> > > > >          return;
> > > > >      }
> > > > > -    node->changed = false;
> > > > > +    engine_set_node_state(node, EN_VALID);
> > > > >  }
> > > > >
> > > > >  struct ed_type_flow_output {
> > > > > @@ -1322,7 +1328,7 @@ en_flow_output_run(struct engine_node *node)
> > > > >                   active_tunnels,
> > > > >                   flow_table);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >  }
> > > > >
> > > > >  static bool
> > > > > @@ -1404,7 +1410,7 @@ flow_output_sb_logical_flow_handler(struct 
> > > > > engine_node *node)
> > > > >                flow_table, group_table, meter_table, lfrr,
> > > > >                conj_id_ofs);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >      return handled;
> > > > >  }
> > > > >
> > > > > @@ -1427,7 +1433,7 @@ flow_output_sb_mac_binding_handler(struct 
> > > > > engine_node *node)
> > > > >      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> > > > >              mac_binding_table, flow_table);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >      return true;
> > > > >  }
> > > > >
> > > > > @@ -1531,7 +1537,7 @@ flow_output_sb_port_binding_handler(struct 
> > > > > engine_node *node)
> > > > >              chassis, ct_zones, local_datapaths,
> > > > >              active_tunnels, flow_table);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >      return true;
> > > > >  }
> > > > >
> > > > > @@ -1580,7 +1586,7 @@ flow_output_sb_multicast_group_handler(struct 
> > > > > engine_node *node)
> > > > >              mff_ovn_geneve, chassis, ct_zones, local_datapaths,
> > > > >              flow_table);
> > > > >
> > > > > -    node->changed = true;
> > > > > +    engine_set_node_state(node, EN_UPDATED);
> > > > >      return true;
> > > > >
> > > > >  }
> > > > > @@ -1694,7 +1700,9 @@ _flow_output_resource_ref_handler(struct 
> > > > > engine_node *node,
> > > > >                      conj_id_ofs, &changed)) {
> > > > >              return false;
> > > > >          }
> > > > > -        node->changed = changed || node->changed;
> > > > > +        if (changed) {
> > > > > +            engine_set_node_state(node, EN_UPDATED);
> > > > > +        }
> > > > >      }
> > > > >      SSET_FOR_EACH (ref_name, updated) {
> > > > >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > > > > @@ -1707,7 +1715,9 @@ _flow_output_resource_ref_handler(struct 
> > > > > engine_node *node,
> > > > >                      conj_id_ofs, &changed)) {
> > > > >              return false;
> > > > >          }
> > > > > -        node->changed = changed || node->changed;
> > > > > +        if (changed) {
> > > > > +            engine_set_node_state(node, EN_UPDATED);
> > > > > +        }
> > > > >      }
> > > > >      SSET_FOR_EACH (ref_name, new) {
> > > > >          if (!lflow_handle_changed_ref(ref_type, ref_name,
> > > > > @@ -1720,7 +1730,9 @@ _flow_output_resource_ref_handler(struct 
> > > > > engine_node *node,
> > > > >                      conj_id_ofs, &changed)) {
> > > > >              return false;
> > > > >          }
> > > > > -        node->changed = changed || node->changed;
> > > > > +        if (changed) {
> > > > > +            engine_set_node_state(node, EN_UPDATED);
> > > > > +        }
> > > > >      }
> > > > >
> > > > >      return true;
> > > > > @@ -1922,7 +1934,11 @@ main(int argc, char *argv[])
> > > > >      engine_add_input(&en_runtime_data, &en_sb_port_binding,
> > > > >                       runtime_data_sb_port_binding_handler);
> > > > >
> > > > > -    engine_init(&en_flow_output);
> > > > > +    /* Get the sorted engine nodes to be used for every engine run. 
> > > > > */
> > > > > +    size_t en_count = 0;
> > > > > +    struct engine_node **en_nodes = engine_get_nodes(&en_flow_output,
> > > > > +                                                     &en_count);
> > > > > +    engine_init(en_nodes, en_count);
> > > >
> > > > I think engine_get_nodes() and engine_init() can be combined. We only
> > > > need to expose the engine_init() interface, which can call
> > > > engine_get_nodes() internally and store n_count and nodes internally in
> > > > the engine module.
> > >
> > > Ok but then wouldn't it make more sense then to store all this in an
> > > "struct inc_engine" structure which engine_init() would return? We'd
> > > also need to move the globals engine_force_recompute,
> > > engine_abort_recompute, engine_context inside the new inc_engine
> > > structure then.
> > >
> >
> > It is ok to have a "struct inc_engine" structure, but it is only necessary
> > if we want to support multiple instances of inc-engine. That doesn't seem
> > to be needed at this moment. I think it is ok to just keep the globals as
> > module private (static) variables.
>
> I see, ok.
>
> >
> > > >
> > > > In addition, there can be more than 1 root node in the DAG. Originally
> > > > we could just call engine_run(root_node) for each root node. Now with
> > > > the non-recursive version, I think we can handle this by:
> > > >   - in engine_get_nodes, we don't need to pass the root_node, but
> > > >     instead, the engine can just process all engine nodes.
> > > >   - remove the root_node parameter from all interfaces.
> > >
> > > In general, a DAG doesn't really have a root node, indeed. But even in
> > > the original code everything was designed with one "ultimate" output
> > > in mind, en_flow_output, and we would only call
> > > engine_run(&en_flow_output, ..). I don't think we can remove the
> > > root_node parameter from engine_get_nodes because we need to have a
> > > starting point for precomputing the processing order of the nodes. We
> > > were implicitly doing the same thing through recursion in the original
> > > code.
> > >
> >
> > In fact the recursive version didn't have such a constraint. The idea was,
> > if there is any node that needs to be computed but is not an offspring of
> > flow_output, we could just call engine_run(&en_xxx) in the same iteration
> > as engine_run(&en_flow_output). (Some places may need updating since it
> > was never tested this way, but in general this design should work.)
> > Now with the non-recursive version, I think we'd better not add any extra
> > constraint for this. With the current patch, we can't just call engine_run
> > with another *root node* any more because the interface changed. But I
> > think this can be solved easily in the topo_sort logic, by processing all
> > nodes in the graph, instead of starting from a given root node.
> > In fact, even for the recursive version, it would be better to let the
> > engine figure out by itself which nodes are the roots and execute from
> > there - it was just not that important so far, it would be needed if there
> > were more root nodes.
> > Removing the root node from all the interfaces would also ensure the engine
> > is always executed/evaluated as a whole and rule out any possible misuse of
> > the engine by passing a middle node that results in partial execution of
> > the engine and unpredictable consequences.
>
> Ok, I'll change the code to remove the root node from all interfaces. I
> need to think more about how to do it at engine_init() because there
> we would need to pass the nodes to the incremental engine somehow.

Hi Han,

I sent a v6 addressing everything we discussed in this thread:

https://patchwork.ozlabs.org/project/openvswitch/list/?series=144565

The only exception is engine_init(). I still pass the en_flow_output
node like we did in the recursive implementation because otherwise we
don't really have a way to associate the nodes that we create in
ovn-controller.c with the incremental engine.

This can be further refined if/when we need to have multiple "root" nodes.
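
To illustrate what such a refinement could look like, here is a minimal
sketch (not the v6 code) of an init function that accepts an array of root
nodes, topologically sorts everything reachable from them once, and keeps
the sorted array private to the module. It also shows the "walk all nodes"
variant of the has-run check discussed above. All sketch_*/sk_* names and
the fixed-size inputs array are made up for the example; the real engine
uses struct engine_node and x2nrealloc().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-ins for the real engine types. */
enum sketch_state { SK_STALE, SK_UPDATED, SK_VALID, SK_ABORTED };

#define SK_MAX_INPUTS 8

struct sketch_node {
    const char *name;
    enum sketch_state state;
    size_t n_inputs;
    struct sketch_node *inputs[SK_MAX_INPUTS];
};

/* Module-private sorted node array, filled once by sketch_engine_init(). */
static struct sketch_node **sk_nodes;
static size_t sk_n_nodes;
static size_t sk_n_alloc;

/* Appends 'node' after all of its inputs, skipping already sorted nodes. */
static void
sk_topo_sort(struct sketch_node *node)
{
    /* Linear scan is fine here: sorting happens only once at startup. */
    for (size_t i = 0; i < sk_n_nodes; i++) {
        if (sk_nodes[i] == node) {
            return;
        }
    }
    for (size_t i = 0; i < node->n_inputs; i++) {
        sk_topo_sort(node->inputs[i]);
    }
    if (sk_n_nodes == sk_n_alloc) {
        sk_n_alloc = sk_n_alloc ? 2 * sk_n_alloc : 4;
        sk_nodes = realloc(sk_nodes, sk_n_alloc * sizeof *sk_nodes);
        assert(sk_nodes);
    }
    sk_nodes[sk_n_nodes++] = node;
}

/* Sorts every node reachable from any of the 'n_roots' root nodes. */
void
sketch_engine_init(struct sketch_node **roots, size_t n_roots)
{
    for (size_t i = 0; i < n_roots; i++) {
        sk_topo_sort(roots[i]);
    }
}

/* The engine has run (possibly aborted) if any node left the STALE state. */
bool
sketch_engine_has_run(void)
{
    for (size_t i = 0; i < sk_n_nodes; i++) {
        if (sk_nodes[i]->state != SK_STALE) {
            return true;
        }
    }
    return false;
}

/* Resets all nodes to STALE before a new run; no root node needed. */
void
sketch_engine_init_run(void)
{
    for (size_t i = 0; i < sk_n_nodes; i++) {
        sk_nodes[i]->state = SK_STALE;
    }
}

void
sketch_engine_cleanup(void)
{
    free(sk_nodes);
    sk_nodes = NULL;
    sk_n_nodes = sk_n_alloc = 0;
}
```

With two roots that share an input, each node still ends up in the array
exactly once and after all of its inputs, so none of the run-time
interfaces would need a root_node argument.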

Thanks,
Dumitru

>
> >
> > > >
> > > > >
> > > > >      ofctrl_init(&ed_flow_output.group_table,
> > > > >                  &ed_flow_output.meter_table,
> > > > > @@ -1941,9 +1957,6 @@ main(int argc, char *argv[])
> > > > >      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, 
> > > > > inject_pkt,
> > > > >                               &pending_pkt);
> > > > >
> > > > > -    uint64_t engine_run_id = 0;
> > > > > -    bool engine_run_done = true;
> > > > > -
> > > > >      unsigned int ovs_cond_seqno = UINT_MAX;
> > > > >      unsigned int ovnsb_cond_seqno = UINT_MAX;
> > > > >
> > > > > @@ -1951,7 +1964,7 @@ main(int argc, char *argv[])
> > > > >      exiting = false;
> > > > >      restart = false;
> > > > >      while (!exiting) {
> > > > > -        engine_run_id++;
> > > > > +        engine_init_run(en_nodes, en_count, &en_flow_output);
> > > > >
> > > > >          update_sb_db(ovs_idl_loop.idl, ovnsb_idl_loop.idl);
> > > > >          update_ssl_config(ovsrec_ssl_table_get(ovs_idl_loop.idl));
> > > > > @@ -2044,15 +2057,13 @@ main(int argc, char *argv[])
> > > > >                               * this round of engine_run and continue 
> > > > > processing
> > > > >                               * acculated changes incrementally later 
> > > > > when
> > > > >                               * ofctrl_can_put() returns true. */
> > > > > -                            if (engine_run_done) {
> > > > > +                            if (!engine_aborted(&en_flow_output)) {
> > > > >                                  engine_set_abort_recompute(true);
> > > > > -                                engine_run_done = 
> > > > > engine_run(&en_flow_output,
> > > > > -                                                             
> > > > > engine_run_id);
> > > > > +                                engine_run(en_nodes, en_count);
> > > > >                              }
> > > > >                          } else {
> > > > >                              engine_set_abort_recompute(false);
> > > > > -                            engine_run_done = true;
> > > > > -                            engine_run(&en_flow_output, 
> > > > > engine_run_id);
> > > > > +                            engine_run(en_nodes, en_count);
> > > > >                          }
> > > > >                      }
> > > > >                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
> > > > > @@ -2071,7 +2082,7 @@ main(int argc, char *argv[])
> > > > >                                 
> > > > > sbrec_meter_table_get(ovnsb_idl_loop.idl),
> > > > >                                 get_nb_cfg(sbrec_sb_global_table_get(
> > > > >                                                ovnsb_idl_loop.idl)),
> > > > > -                               en_flow_output.changed);
> > > > > +                               engine_node_changed(&en_flow_output));
> > > > >                      pinctrl_run(ovnsb_idl_txn,
> > > > >                                  sbrec_datapath_binding_by_key,
> > > > >                                  sbrec_port_binding_by_datapath,
> > > > > @@ -2089,7 +2100,7 @@ main(int argc, char *argv[])
> > > > >                                  &ed_runtime_data.local_datapaths,
> > > > >                                  &ed_runtime_data.active_tunnels);
> > > > >
> > > > > -                    if (en_runtime_data.changed) {
> > > > > +                    if (engine_node_changed(&en_runtime_data)) {
> > > > >                          update_sb_monitors(ovnsb_idl_loop.idl, 
> > > > > chassis,
> > > > >                                             
> > > > > &ed_runtime_data.local_lports,
> > > > >                                             
> > > > > &ed_runtime_data.local_datapaths);
> > > > > @@ -2097,17 +2108,17 @@ main(int argc, char *argv[])
> > > > >                  }
> > > > >
> > > > >              }
> > > > > -            if (engine_need_run(&en_flow_output, engine_run_id)) {
> > > > > +            if (engine_need_run(en_nodes, en_count, 
> > > > > &en_flow_output)) {
> > > > >                  VLOG_DBG("engine did not run, force recompute next 
> > > > > time: "
> > > > >                              "br_int %p, chassis %p", br_int, 
> > > > > chassis);
> > > > >                  engine_set_force_recompute(true);
> > > > >                  poll_immediate_wake();
> > > > > -            } else if (!engine_run_done) {
> > > > > +            } else if (engine_aborted(&en_flow_output)) {
> > > > >                  VLOG_DBG("engine was aborted, force recompute next 
> > > > > time: "
> > > > >                           "br_int %p, chassis %p", br_int, chassis);
> > > > >                  engine_set_force_recompute(true);
> > > > >                  poll_immediate_wake();
> > > > > -            } else if (!engine_has_run(&en_flow_output, 
> > > > > engine_run_id)) {
> > > > > +            } else if (!engine_has_run(&en_flow_output)) {
> > > > >                  VLOG_DBG("engine did not run, and it was not needed"
> > > > >                           " either: br_int %p, chassis %p",
> > > > >                           br_int, chassis);
> > > > > @@ -2135,8 +2146,7 @@ main(int argc, char *argv[])
> > > > >                      }
> > > > >                  } else {
> > > > >                      VLOG_DBG("Pending_pkt conn but br_int %p or 
> > > > > chassis "
> > > > > -                             "%p not ready. run-id: %"PRIu64, br_int,
> > > > > -                             chassis, engine_run_id);
> > > > > +                             "%p not ready.", br_int, chassis);
> > > > >                      unixctl_command_reply_error(pending_pkt.conn,
> > > > >                          "ovn-controller not ready.");
> > > > >                  }
> > > > > @@ -2185,7 +2195,7 @@ main(int argc, char *argv[])
> > > > >      }
> > > > >
> > > > >      engine_set_context(NULL);
> > > > > -    engine_cleanup(&en_flow_output);
> > > > > +    engine_cleanup(en_nodes, en_count);
> > > > >
> > > > >      /* It's time to exit.  Clean up the databases if we are not 
> > > > > restarting */
> > > > >      if (!restart) {
> > > > > diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> > > > > index 8a085e2..ee6afbe 100644
> > > > > --- a/lib/inc-proc-eng.c
> > > > > +++ b/lib/inc-proc-eng.c
> > > > > @@ -34,6 +34,13 @@ static bool engine_force_recompute = false;
> > > > >  static bool engine_abort_recompute = false;
> > > > >  static const struct engine_context *engine_context;
> > > > >
> > > > > +static const char *engine_node_state_name[EN_STATE_MAX] = {
> > > > > +    [EN_STALE]   = "Stale",
> > > > > +    [EN_UPDATED] = "Updated",
> > > > > +    [EN_VALID]   = "Valid",
> > > > > +    [EN_ABORTED] = "Aborted",
> > > > > +};
> > > > > +
> > > > >  void
> > > > >  engine_set_force_recompute(bool val)
> > > > >  {
> > > > > @@ -58,26 +65,62 @@ engine_set_context(const struct engine_context 
> > > > > *ctx)
> > > > >      engine_context = ctx;
> > > > >  }
> > > > >
> > > > > -void
> > > > > -engine_init(struct engine_node *node)
> > > > > +/* Builds the topologically sorted 'sorted_nodes' array starting from
> > > > > + * 'node'.
> > > > > + */
> > > > > +static struct engine_node **
> > > > > > +engine_topo_sort(struct engine_node *node, struct engine_node **sorted_nodes,
> > > > > +                 size_t *n_count, size_t *n_size)
> > > > >  {
> > > > > +    /* It's not so efficient to walk the array of already sorted 
> > > > > nodes but
> > > > > +     * we know that sorting is done only once at startup so it's ok 
> > > > > for now.
> > > > > +     */
> > > > > +    for (size_t i = 0; i < *n_count; i++) {
> > > > > +        if (sorted_nodes[i] == node) {
> > > > > +            return sorted_nodes;
> > > > > +        }
> > > > > +    }
> > > > > +
> > > > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > > > -        engine_init(node->inputs[i].node);
> > > > > +        sorted_nodes = engine_topo_sort(node->inputs[i].node, 
> > > > > sorted_nodes,
> > > > > +                                        n_count, n_size);
> > > > >      }
> > > > > -    if (node->init) {
> > > > > -        node->init(node);
> > > > > +    if (*n_count == *n_size) {
> > > > > +        sorted_nodes = x2nrealloc(sorted_nodes, n_size, sizeof 
> > > > > *sorted_nodes);
> > > > >      }
> > > > > +    sorted_nodes[(*n_count)] = node;
> > > > > +    (*n_count)++;
> > > > > +    return sorted_nodes;
> > > > > +}
> > > > > +
> > > > > +struct engine_node **
> > > > > +engine_get_nodes(struct engine_node *root_node, size_t *n_count)
> > > > > +{
> > > > > +    size_t n_size = 0;
> > > > > +
> > > > > +    *n_count = 0;
> > > > > +    return engine_topo_sort(root_node, NULL, n_count, &n_size);
> > > > >  }
> > > > >
> > > > >  void
> > > > > -engine_cleanup(struct engine_node *node)
> > > > > +engine_init(struct engine_node **nodes, size_t n_count)
> > > > >  {
> > > > > -    for (size_t i = 0; i < node->n_inputs; i++) {
> > > > > -        engine_cleanup(node->inputs[i].node);
> > > > > +    for (size_t i = 0; i < n_count; i++) {
> > > > > +        if (nodes[i]->init) {
> > > > > +            nodes[i]->init(nodes[i]);
> > > > > +        }
> > > > >      }
> > > > > -    if (node->cleanup) {
> > > > > -        node->cleanup(node);
> > > > > +}
> > > > > +
> > > > > +void
> > > > > +engine_cleanup(struct engine_node **nodes, size_t n_count)
> > > > > +{
> > > > > +    for (size_t i = 0; i < n_count; i++) {
> > > > > +        if (nodes[i]->cleanup) {
> > > > > +            nodes[i]->cleanup(nodes[i]);
> > > > > +        }
> > > > >      }
> > > > > +    free(nodes);
> > > > >  }
> > > > >
> > > > >  struct engine_node *
> > > > > @@ -128,16 +171,66 @@ engine_ovsdb_node_add_index(struct engine_node 
> > > > > *node, const char *name,
> > > > >      ed->n_indexes ++;
> > > > >  }
> > > > >
> > > > > +void
> > > > > +engine_set_node_state_at(struct engine_node *node,
> > > > > +                         enum engine_node_state state,
> > > > > +                         const char *where)
> > > > > +{
> > > > > +    if (node->state == state) {
> > > > > +        return;
> > > > > +    }
> > > > > +
> > > > > +    VLOG_DBG("%s: node: %s, old_state %s, new_state %s",
> > > > > +             where, node->name,
> > > > > +             engine_node_state_name[node->state],
> > > > > +             engine_node_state_name[state]);
> > > > > +
> > > > > +    node->state = state;
> > > > > +}
> > > > > +
> > > > > +static bool
> > > > > +engine_node_valid(struct engine_node *node)
> > > > > +{
> > > > > +    return (node->state == EN_UPDATED || node->state == EN_VALID);
> > > > > +}
> > > > > +
> > > > >  bool
> > > > > -engine_has_run(struct engine_node *node, uint64_t run_id)
> > > > > +engine_node_changed(struct engine_node *node)
> > > > >  {
> > > > > -    return node->run_id == run_id;
> > > > > +    return node->state == EN_UPDATED;
> > > > > +}
> > > > > +
> > > > > +bool
> > > > > +engine_has_run(struct engine_node *node)
> > > > > +{
> > > >
> > > > engine_has_run() should go through all nodes. If any node is NOT STALE,
> > > > return true. The engine hasn't run only if all nodes are STALE.
> > > > (Originally it was easier to tell by utilizing engine_run_id.)
> > > > If some nodes are STALE and some are not, it means the engine has run
> > > > but aborted.
> > >
> > > Due to the way the code is written (and as you noticed in earlier
> > > versions of the patches) the STALE state is transient. If the engine
> > > ran, even if it aborted, then no node will be in state STALE. Please
> > > see below regarding how ABORTED is propagated.
> > >
> >
> > Ok, sorry that I misread the code earlier. However, after re-walking the
> > code with your explanation, it seems there is still a chance (in theory)
> > that some nodes are ABORTED while others are STALE. For example, if A is an
> > input of B, and A is UPDATED but is_valid(A) returns false because of a
> > custom implementation of is_valid(), then B will stay STALE.
> > Would it be more generic and correct to go through all nodes and tell if
> > the engine has run as a whole instead of based on the input node? (which
> > suggests not having the node arg)
> >
>
> Right, this is safer and there's no real performance trade off either.
> I'll do it like this and walk all the nodes.
>
> > > >
> > > > > +    return node->state != EN_STALE;
> > > > > +}
> > > > > +
> > > > > +bool
> > > > > +engine_aborted(struct engine_node *node)
> > > > > +{
> > > > > +    return node->state == EN_ABORTED;
> > > > > +}
> > > > > +
> > > > > +void
> > > > > +engine_init_run(struct engine_node **nodes, size_t n_count,
> > > > > +                struct engine_node *root_node)
> > > > > +{
> > > > > +    /* No need to reinitialize if last run didn't happen. */
> > > > > +    if (!engine_has_run(root_node)) {
> > > >
> > > > I think there is a problem here. If in the last round the root node
> > > > didn't run because processing was aborted at some intermediate node,
> > > > many other nodes could still have run and had their state changed to
> > > > VALID/UPDATED. If we don't do the init now, those nodes may contain
> > > > invalid data, since this is a new round of iteration, while their
> > > > state says they are valid.
> > >
> > > If one of the inputs of a given node went to ABORT state then the node
> > > itself will also move to ABORT. And this will get propagated to all
> > > successor nodes.
> > > The idea of the check was to avoid walking all the nodes if the
> > > root_node is in state STALE (engine_has_run() == false). STALE is
> > > transient so if one node is STALE then all nodes are STALE and this
> > > can only happen if the engine didn't run in the current iteration.
> > >
> > Sorry it was my misunderstanding of ABORT propagation logic.
> >
> > > >
> > > > We don't need to pass "root_node" for engine_init_run. We can just 
> > > > reset all nodes to STALE state.
> > >
> > > I agree, we could decide if an engine needs init_run by checking that
> > > the first node in 'nodes' is not in state STALE.
> > >
> > > >
> > > >
> > > > > +        return;
> > > > > +    }
> > > > > +
> > > > > +    VLOG_DBG("Initializing new run");
> > > > > +    for (size_t i = 0; i < n_count; i++) {
> > > > > +        engine_set_node_state(nodes[i], EN_STALE);
> > > > > +    }
> > > > >  }
> > > > >
> > > > >  /* Do a full recompute (or at least try). If we're not allowed then
> > > > >   * mark the node as "aborted".
> > > > >   */
> > > > > -static bool
> > > > > +static void
> > > > >  engine_recompute(struct engine_node *node, bool forced, bool allowed)
> > > > >  {
> > > > >      VLOG_DBG("node: %s, recompute (%s)", node->name,
> > > > > @@ -145,12 +238,12 @@ engine_recompute(struct engine_node *node, bool 
> > > > > forced, bool allowed)
> > > > >
> > > > >      if (!allowed) {
> > > > >          VLOG_DBG("node: %s, recompute aborted", node->name);
> > > > > -        return false;
> > > > > +        engine_set_node_state(node, EN_ABORTED);
> > > > > +        return;
> > > > >      }
> > > > >
> > > > > +    /* Run the node handler which might change state. */
> > > > >      node->run(node);
> > > > > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > > > -    return true;
> > > > >  }
> > > > >
> > > > >  /* Return true if the node could be computed without triggerring a 
> > > > > full
> > > > > @@ -161,7 +254,7 @@ engine_compute(struct engine_node *node, bool 
> > > > > recompute_allowed)
> > > > >  {
> > > > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > > >          /* If the input node data changed call its change handler. */
> > > > > -        if (node->inputs[i].node->changed) {
> > > > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > > > >              VLOG_DBG("node: %s, handle change for input %s",
> > > > >                       node->name, node->inputs[i].node->name);
> > > > >
> > > > > @@ -172,57 +265,61 @@ engine_compute(struct engine_node *node, bool 
> > > > > recompute_allowed)
> > > > >                  VLOG_DBG("node: %s, can't handle change for input 
> > > > > %s, "
> > > > >                           "fall back to recompute",
> > > > >                           node->name, node->inputs[i].node->name);
> > > > > -                if (!engine_recompute(node, false, 
> > > > > recompute_allowed)) {
> > > > > +                engine_recompute(node, false, recompute_allowed);
> > > > > +                if (engine_aborted(node)) {
> > > >
> > > > The aborted state was propagated through the recursive logic, but that is
> > > > not the case in this new implementation. Is this on purpose?
> > >
> > > With the new implementation the aborted state is also propagated, just
> > > that it's done in engine_run_node().
> > >
> > Ack.
> >
> > > >
> > > > >                      return false;
> > > > >                  }
> > > > >              }
> > > > >          }
> > > > >      }
> > > > > -
> > > > >      return true;
> > > > >  }
> > > > >
> > > > > -bool engine_run(struct engine_node *node, uint64_t run_id)
> > > > > +static void
> > > > > +engine_run_node(struct engine_node *node)
> > > > >  {
> > > > > -    if (node->run_id == run_id) {
> > > > > > -        /* The node was already updated in this run (could be input for
> > > > > -         * multiple other nodes). Stop processing.
> > > > > -         */
> > > > > -        return true;
> > > > > -    }
> > > > > -
> > > > > -    /* Initialize the node for this run. */
> > > > > -    node->run_id = run_id;
> > > > > -    node->changed = false;
> > > > > -
> > > > >      if (!node->n_inputs) {
> > > > > +        /* Run the node handler which might change state. */
> > > > >          node->run(node);
> > > > > -        VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > > > -        return true;
> > > > > +        return;
> > > > >      }
> > > > >
> > > > > +    bool input_stale = false;
> > > > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > > > -        if (!engine_run(node->inputs[i].node, run_id)) {
> > > > > -            return false;
> > > > > +        if (!engine_node_valid(node->inputs[i].node)) {
> > > > > > +            /* If the input node aborted computation, move to EN_ABORTED.
> > > > > +             * This will be propagated to following nodes.
> > > > > +             */
> > > > > +            if (engine_aborted(node->inputs[i].node)) {
> > > > > +                engine_set_node_state(node, EN_ABORTED);
> > > > > +            }
> > >
> > > Here we propagate the ABORTED state.
> > >
> > Ack.
> >
> > > > > +
> > > > > +            input_stale = true;
> > > > >          }
> > > > >      }
> > > > >
> > > > > -    bool need_compute = false;
> > > > > +    /* If at least one input is stale, don't change state. */
> > > > > +    if (input_stale) {
> > > > > +        return;
> > > > > +    }
> > >
> > > With the current ovn-controller code, input_stale will never be true
> > > unless one of the input nodes aborted computation. However, because of
> > > the is_valid interface we're adding in the last patch of the series,
> > > engine_node_valid(input) might return false in the future based on a
> > > user defined condition even if the state is computed.
> > >
> > > Maybe for better readability I can introduce a new state called
> > > EN_INVALID and enter it every time engine_node_valid(input) returns
> > > false. What do you think?
> > >
> > I think we have 2 options here, each based on a different assumption.
> >
> > Option1: Assume there is no use case in which a custom is_valid() would
> > return false after the node is computed. We can assert this assumption in
> > engine_node_valid() by checking the node state first, and only calling
> > node->is_valid() when the node state is not valid (i.e., ABORTED/STALE).
> >
> > Option2: Assume there is a real use case in which a custom is_valid() would
> > return false after the node is computed, probably because some of the inputs
> > don't meet the node's expectations. In this case I think that, instead of
> > relying on the call to is_valid(), it is more straightforward for
> > node->run() or node->change_handler_xxx() to detect this unexpected
> > situation and set the node state directly, like how the UPDATED state is
> > set. It is better not to set the state to INVALID only when
> > engine_node_valid() is called.
> >
> > I tend to do option1 first, because the more states, the more complexity,
> > and I don't see such a use case for option2 yet.
>
> Sounds good to me. I'll go with option1.
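
For reference, option1 could look roughly like the sketch below. It is only
an illustration under assumptions, not the code from the series: the struct
is trimmed down to the fields needed here, and the is_valid callback
signature and the always_valid() helper are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum engine_node_state {
    EN_STALE,
    EN_UPDATED,
    EN_VALID,
    EN_ABORTED,
    EN_STATE_MAX,
};

/* Trimmed-down node: only the fields this sketch needs. */
struct engine_node {
    const char *name;
    enum engine_node_state state;
    /* Hypothetical custom validity callback. It may be NULL. */
    bool (*is_valid)(struct engine_node *);
};

/* Hypothetical handler for a node whose data stays usable even when it was
 * not recomputed in this run. */
static bool
always_valid(struct engine_node *node)
{
    (void) node;
    return true;
}

static bool
engine_node_valid(struct engine_node *node)
{
    assert(node->state < EN_STATE_MAX);

    /* A computed node is valid by definition; is_valid() is not consulted. */
    if (node->state == EN_UPDATED || node->state == EN_VALID) {
        return true;
    }

    /* STALE/ABORTED: let the node decide, if it provides a handler. */
    if (node->is_valid) {
        return node->is_valid(node);
    }
    return false;
}
```

With this shape a custom is_valid() can only turn a STALE/ABORTED node into a
usable one; it can never invalidate a node that was computed, so no extra
EN_INVALID state is needed.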
>
> >
> > > > >
> > > > >      if (engine_force_recompute) {
> > > > > -        return engine_recompute(node, true, !engine_abort_recompute);
> > > > > +        engine_recompute(node, true, !engine_abort_recompute);
> > > > > +        return;
> > > > >      }
> > > > >
> > > > > >      /* If any of the inputs updated data but there is no change_handler, then
> > > > >       * recompute the current node too.
> > > > >       */
> > > > > +    bool need_compute = false;
> > > > >      for (size_t i = 0; i < node->n_inputs; i++) {
> > > > > -        if (node->inputs[i].node->changed) {
> > > > > +        if (node->inputs[i].node->state == EN_UPDATED) {
> > > > >              need_compute = true;
> > > > >
> > > > > >              /* Trigger a recompute if we don't have a change handler. */
> > > > > >              if (!node->inputs[i].change_handler) {
> > > > > > -                return engine_recompute(node, false, !engine_abort_recompute);
> > > > > > +                engine_recompute(node, false, !engine_abort_recompute);
> > > > > +                return;
> > > > >              }
> > > > >          }
> > > > >      }
> > > > > @@ -231,33 +328,47 @@ bool engine_run(struct engine_node *node, 
> > > > > uint64_t run_id)
> > > > >          /* If we couldn't compute the node we either aborted or 
> > > > > triggered
> > > > >           * a full recompute. In any case, stop processing.
> > > > >           */
> > > > > -        return engine_compute(node, !engine_abort_recompute);
> > > > > +        if (!engine_compute(node, !engine_abort_recompute)) {
> > > > > +            return;
> > > > > +        }
> > > > >      }
> > > > >
> > > > > -    VLOG_DBG("node: %s, changed: %d", node->name, node->changed);
> > > > > -    return true;
> > > > > > +    /* If we reached this point, either the node was updated or its state is
> > > > > +     * still valid.
> > > > > +     */
> > > > > +    if (!engine_node_changed(node)) {
> > > > > +        engine_set_node_state(node, EN_VALID);
> > > > > +    }
> > > > >  }
> > > > >
> > > > > -bool
> > > > > -engine_need_run(struct engine_node *node, uint64_t run_id)
> > > > > +void
> > > > > +engine_run(struct engine_node **nodes, size_t n_count)
> > > > >  {
> > > > > -    size_t i;
> > > > > +    for (size_t i = 0; i < n_count; i++) {
> > > >
> > > > If an input node didn't finish the run, e.g. it aborted, then we shouldn't
> > > > continue running the nodes that depend on it.
> > >
> > > In this case we have to, so that we propagate the ABORTED state (in the
> > > same way we did with all the recursive returns).
> > >
> > Ack.
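
To make the propagation concrete, here is a simplified model of the iterative
loop. It is a sketch under assumptions rather than the patch itself: the node
struct is reduced to what the example needs, and run_node(), run_all(),
run_set_updated(), run_set_aborted() and MAX_INPUTS are hypothetical names.
The nodes are assumed to be already topologically sorted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_INPUTS 4   /* Hypothetical limit, just for this sketch. */

enum engine_node_state { EN_STALE, EN_UPDATED, EN_VALID, EN_ABORTED };

struct engine_node {
    const char *name;
    enum engine_node_state state;
    size_t n_inputs;
    struct engine_node *inputs[MAX_INPUTS];
    void (*run)(struct engine_node *);
};

/* Hypothetical run handlers standing in for real node logic. */
static void
run_set_updated(struct engine_node *node)
{
    node->state = EN_UPDATED;
}

static void
run_set_aborted(struct engine_node *node)
{
    node->state = EN_ABORTED;
}

static void
run_node(struct engine_node *node)
{
    bool input_stale = false;

    assert(node->run != NULL);
    for (size_t i = 0; i < node->n_inputs; i++) {
        enum engine_node_state s = node->inputs[i]->state;

        /* An aborted input marks this node as aborted too. */
        if (s == EN_ABORTED) {
            node->state = EN_ABORTED;
        }
        if (s != EN_UPDATED && s != EN_VALID) {
            input_stale = true;
        }
    }

    /* Do not compute on top of inputs that are not valid. */
    if (input_stale) {
        return;
    }
    node->run(node);
}

/* Visit every node in topological order, even after an abort, so that the
 * ABORTED state reaches all dependent nodes. */
static void
run_all(struct engine_node **nodes, size_t n_count)
{
    for (size_t i = 0; i < n_count; i++) {
        run_node(nodes[i]);
    }
}
```

Stopping the loop at the first aborted node would leave its dependents in
EN_STALE instead of EN_ABORTED, which is why the loop keeps going.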
> >
> > > >
> > > > > +        engine_run_node(nodes[i]);
> > > > > +    }
> > > > > +}
> > > > >
> > > > > -    if (node->run_id == run_id) {
> > > > > +bool
> > > > > +engine_need_run(struct engine_node **nodes, size_t n_count,
> > > > > +                struct engine_node *root_node)
> > > > > +{
> > > > > +    if (engine_has_run(root_node)) {
> > > > >          return false;
> > > > >      }
> > > > >
> > > > > -    if (!node->n_inputs) {
> > > > > -        node->run(node);
> > > > > > -        VLOG_DBG("input node: %s, changed: %d", node->name, node->changed);
> > > > > -        return node->changed;
> > > > > -    }
> > > > > +    for (size_t i = 0; i < n_count; i++) {
> > > > > +        /* Check only leaf nodes. */
> > > > > +        if (nodes[i]->n_inputs) {
> > > > > +            continue;
> > > > > +        }
> > > > >
> > > > > -    for (i = 0; i < node->n_inputs; i++) {
> > > > > -        if (engine_need_run(node->inputs[i].node, run_id)) {
> > > > > +        nodes[i]->run(nodes[i]);
> > > > > +        VLOG_DBG("input node: %s, state: %s", nodes[i]->name,
> > > > > +                 engine_node_state_name[nodes[i]->state]);
> > > > > +        if (nodes[i]->state == EN_UPDATED) {
> > > > >              return true;
> > > > >          }
> > > > >      }
> > > > > -
> > > > >      return false;
> > > > >  }
> > > > > diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> > > > > index abd41b2..69eb9b6 100644
> > > > > --- a/lib/inc-proc-eng.h
> > > > > +++ b/lib/inc-proc-eng.h
> > > > > @@ -82,10 +82,21 @@ struct engine_node_input {
> > > > >      bool (*change_handler)(struct engine_node *node);
> > > > >  };
> > > > >
> > > > > -struct engine_node {
> > > > > > -    /* A unique id to distinguish each iteration of the engine_run(). */
> > > > > -    uint64_t run_id;
> > > > > +enum engine_node_state {
> > > > > > +    EN_STALE,     /* Data in the node is not up to date with the DB. */
> > > > > > +    EN_UPDATED,   /* Data in the node is valid but was updated during the
> > > > > > +                   * last run.
> > > > > > +                   */
> > > > > > +    EN_VALID,     /* Data in the node is valid and didn't change during the
> > > > > +                   * last run.
> > > > > +                   */
> > > > > +    EN_ABORTED,   /* During the last run, processing was aborted for
> > > > > +                   * this node.
> > > > > +                   */
> > > > > +    EN_STATE_MAX,
> > > > > +};
> > > > >
> > > > > +struct engine_node {
> > > > >      /* A unique name for each node. */
> > > > >      char *name;
> > > > >
> > > > > @@ -102,8 +113,8 @@ struct engine_node {
> > > > >       * node. */
> > > > >      void *data;
> > > > >
> > > > > -    /* Whether the data changed in the last engine run. */
> > > > > -    bool changed;
> > > > > +    /* State of the node after the last engine run. */
> > > > > +    enum engine_node_state state;
> > > > >
> > > > >      /* Method to initialize data. It may be NULL. */
> > > > >      void (*init)(struct engine_node *);
> > > > > @@ -116,23 +127,36 @@ struct engine_node {
> > > > >      void (*run)(struct engine_node *);
> > > > >  };
> > > > >
> > > > > > -/* Initialize the data for the engine nodes recursively. It calls each node's
> > > > > +/* Return the array of topologically sorted nodes when starting from
> > > > > + * 'root_node'. Stores the number of nodes in 'n_count'.
> > > > > + * It should be called before the main loop.
> > > > > + */
> > > > > +struct engine_node **engine_get_nodes(struct engine_node *root_node,
> > > > > +                                      size_t *n_count);
> > > > > +
> > > > > +/* Initialize the data for the engine nodes. It calls each node's
> > > > > >   * init() method if not NULL. It should be called before the main loop. */
> > > > > -void engine_init(struct engine_node *);
> > > > > +void engine_init(struct engine_node **nodes, size_t n_count);
> > > > > +
> > > > > > +/* Initialize the engine nodes for a new run. It should be called in the
> > > > > + * main processing loop before every potential engine_run().
> > > > > + */
> > > > > +void engine_init_run(struct engine_node **nodes, size_t n_count,
> > > > > +                     struct engine_node *root_node);
> > > > >
> > > > > >  /* Execute the processing recursively, which should be called in the main
> > > >
> > > > This comment should be updated since you changed it to be non-recursive.
> > >
> > > Ack.
> > >
> > > >
> > > > > > - * loop. Returns true if the execution is compelte, false if it is aborted,
> > > > > - * which could happen when engine_abort_recompute is set. */
> > > > > -bool engine_run(struct engine_node *, uint64_t run_id);
> > > > > + * loop. Updates the engine node's states accordingly.
> > > > > + */
> > > > > +void engine_run(struct engine_node **nodes, size_t n_count);
> > > > >
> > > > > > -/* Clean up the data for the engine nodes recursively. It calls each node's
> > > > > > +/* Clean up the data for the engine nodes. It calls each node's
> > > > > >   * cleanup() method if not NULL. It should be called before the program
> > > > >   * terminates. */
> > > > > -void engine_cleanup(struct engine_node *);
> > > > > +void engine_cleanup(struct engine_node **nodes, size_t n_count);
> > > > >
> > > > >  /* Check if engine needs to run but didn't. */
> > > > > -bool
> > > > > -engine_need_run(struct engine_node *, uint64_t run_id);
> > > > > +bool engine_need_run(struct engine_node **nodes, size_t n_count,
> > > > > +                     struct engine_node *root_node);
> > > > >
> > > > >  /* Get the input node with <name> for <node> */
> > > > >  struct engine_node * engine_get_input(const char *input_name,
> > > > > @@ -159,8 +183,22 @@ const struct engine_context * 
> > > > > engine_get_context(void);
> > > > >
> > > > >  void engine_set_context(const struct engine_context *);
> > > > >
> > > > > > -/* Return true if the engine has run for 'node' in the 'run_id' iteration. */
> > > > > -bool engine_has_run(struct engine_node *node, uint64_t run_id);
> > > > > +void engine_set_node_state_at(struct engine_node *node,
> > > > > +                              enum engine_node_state state,
> > > > > +                              const char *where);
> > > > > +
> > > > > > +/* Return true if during the last iteration the node's data was updated. */
> > > > > > +bool engine_node_changed(struct engine_node *node);
> > > > > > +
> > > > > > +/* Return true if the engine has run for 'node' in the last iteration. */
> > > > > > +bool engine_has_run(struct engine_node *node);
> > > > > > +
> > > > > > +/* Returns true if during the last engine run we had to abort processing. */
> > > > > +bool engine_aborted(struct engine_node *node);
> > > > > +
> > > > > +/* Set the state of the node and log changes. */
> > > > > +#define engine_set_node_state(node, state) \
> > > > > +    engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> > > > >
> > > > >  struct ed_ovsdb_index {
> > > > >      const char *name;
> > > > > @@ -187,6 +225,7 @@ void engine_ovsdb_node_add_index(struct 
> > > > > engine_node *, const char *name,
> > > > >      struct engine_node en_##NAME = { \
> > > > >          .name = NAME_STR, \
> > > > >          .data = &ed_##NAME, \
> > > > > +        .state = EN_STALE, \
> > > > >          .init = en_##NAME##_init, \
> > > > >          .run = en_##NAME##_run, \
> > > > >          .cleanup = en_##NAME##_cleanup, \
> > > > > @@ -201,10 +240,10 @@ en_##DB_NAME##_##TBL_NAME##_run(struct 
> > > > > engine_node *node) \
> > > > >      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
> > > > >          EN_OVSDB_GET(node); \
> > > > >      if (DB_NAME##rec_##TBL_NAME##_table_track_get_first(table)) { \
> > > > > -        node->changed = true; \
> > > > > +        engine_set_node_state(node, EN_UPDATED); \
> > > > >          return; \
> > > > >      } \
> > > > > -    node->changed = false; \
> > > > > +    engine_set_node_state(node, EN_VALID); \
> > > > >  } \
> > > > > >  static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> > > > >              = NULL; \
> > > > >
> > >
> > > Thanks again for the thorough review Han. Before I start working on a
> > > new version of the series maybe we need to agree on some of these
> > > points:
> > >
> > > 1. Do we switch back to recursion or are you ok with the iterative approach?
> > In general I'd prefer the one with simpler logic, and if they are similar
> > then I prefer the one with fewer changes. In the current case I've already
> > become familiar with the new approach, and I am not sure which one would
> > require more effort for v4 at this point, so please pick whichever one you
> > are comfortable with.
>
> My personal preference is the iterative version so I'll stick to that.
>
> >
> > > 2. Do we add a structure, e.g., inc_engine, to store an instance of
> > > the incremental engine (sorted nodes and count, "root node",
> > > force_recompute, abort_recompute)?
> > We should make the sorted nodes and count module static variables, but a
> > structure is not needed.
>
> Ack.
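
As a rough illustration of keeping the sorted nodes and the count as module
static variables, the sketch below does a post-order walk from the root and
stores the result in file-scope statics. It is an assumption-laden example,
not the series' engine_get_nodes(): the trimmed node struct, MAX_INPUTS,
already_sorted(), sort_from() and engine_sort_nodes() are hypothetical, and
the graph is assumed to be acyclic.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

#define MAX_INPUTS 4   /* Hypothetical limit, just for this sketch. */

struct engine_node {
    const char *name;
    size_t n_inputs;
    struct engine_node *inputs[MAX_INPUTS];
};

/* Module-level state, instead of a separate engine structure. */
static struct engine_node **engine_nodes;
static size_t engine_n_nodes;

static bool
already_sorted(const struct engine_node *node)
{
    for (size_t i = 0; i < engine_n_nodes; i++) {
        if (engine_nodes[i] == node) {
            return true;
        }
    }
    return false;
}

/* Post-order walk: a node is appended only after all of its inputs, so every
 * input appears in the array before the node itself. */
static void
sort_from(struct engine_node *node)
{
    if (already_sorted(node)) {
        return;
    }
    for (size_t i = 0; i < node->n_inputs; i++) {
        sort_from(node->inputs[i]);
    }

    engine_nodes = realloc(engine_nodes,
                           (engine_n_nodes + 1) * sizeof *engine_nodes);
    assert(engine_nodes != NULL);
    engine_nodes[engine_n_nodes++] = node;
}

/* Rebuild the sorted array starting from 'root'. Callers then only need the
 * root node; the array and count stay private to the module. */
static void
engine_sort_nodes(struct engine_node *root)
{
    free(engine_nodes);
    engine_nodes = NULL;
    engine_n_nodes = 0;
    sort_from(root);
}
```

The walk itself is still recursive, but since the graph is static it runs
once at startup, and the per-iteration code only loops over the array.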
>
> >
> > > 3. Do we add a new state EN_INVALID to be reached whenever
> > > engine_node_valid(input) returns false and the input node didn't
> > > abort? It would never be the case in this patch series but it could
> > > happen in the future if more custom is_valid() handlers are added.
> >
> > I tend to choose option1 from the inline comment above.
> >
> > >
> > > Thanks,
> > > Dumitru
> > >
>
> Thanks for your follow up. I'll start working on a new version of the series.
>
> Regards,
> Dumitru

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
