On Wed, Dec 4, 2019 at 8:28 AM Dumitru Ceara <[email protected]> wrote:
>
> The incremental processing engine might stop a run before the
> en_runtime_data node is processed. In such cases the ed_runtime_data
> fields might contain pointers to already deleted SB records. For
> example, if a port binding corresponding to a patch port is removed from
> the SB database and the incremental processing engine aborts before the
> en_runtime_data node is processed then the corresponding local_datapath
> hashtable entry in ed_runtime_data is stale and will store a pointer to
> the already freed sbrec_port_binding record.
>
> This will cause invalid memory accesses in various places (e.g.,
> pinctrl_run() -> prepare_ipv6_ras()).
>
> To fix the issue we introduce the engine_get_data() API which must be
> called in order to safely access internal node data. If the node is in
> state EN_STALE or EN_ABORTED, engine_get_data() returns NULL as the
> references might be stale.
>
> This commit also adds an "is_valid()" method to engine nodes to allow
> users to override the default behavior of determining if data is valid in a
> node (e.g., for the ct-zones node the data is always safe to access).
>
> Also, all interactions with node data outside inc-proc-eng.c are now
> performed through APIs and never by directly accessing the node->data
> field. This makes it easier to ensure that we don't access invalid
> (stale) data.
>
> CC: Han Zhou <[email protected]>
> Fixes: ca278d98a4f5 ("ovn-controller: Initial use of incremental engine - quiet mode.")
> Signed-off-by: Dumitru Ceara <[email protected]>
>
> ---
> v8:
> - First two patches were applied to master, so resending the last patch
>   in the series as standalone patch.
> - Address Han's comments:
>     - Remove internal_data from engine_node.
>     - Use the newly added engine_get_data() to make sure we access valid
>       data outside the incremental processing engine.
>     - Remove data storage outside the nodes and have the init()
>       callbacks allocate and initialize required memory.
> - Also, for better data encapsulation:
>     - Remove all references of engine_node->data from ovn-controller.c
>       and use inc-proc-eng APIs to access the data.
>     - Change all init/cleanup/run/change_handlers to use data supplied
>       as argument.
>     - Use the newly added engine_get_input_data API to access input node
>       data.
>     - At init time, use engine_get_internal_data() to initialize the
>       callback arguments for the unix cmd handlers and to initialize
>       ofctrl.
> ---
>  controller/ovn-controller.c | 457 +++++++++++++++++++++++---------------------
>  lib/inc-proc-eng.c          |  59 +++++-
>  lib/inc-proc-eng.h          | 115 ++++++++---
>  3 files changed, 370 insertions(+), 261 deletions(-)
>
> diff --git a/controller/ovn-controller.c b/controller/ovn-controller.c
> index 64c44c9..5874776 100644
> --- a/controller/ovn-controller.c
> +++ b/controller/ovn-controller.c
> @@ -739,26 +739,25 @@ struct ed_type_ofctrl_is_connected {
>      bool connected;
>  };
>
> -static void
> -en_ofctrl_is_connected_init(struct engine_node *node)
> +static void *
> +en_ofctrl_is_connected_init(struct engine_node *node OVS_UNUSED,
> +                            struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_ofctrl_is_connected *data =
> -        (struct ed_type_ofctrl_is_connected *)node->data;
> -    data->connected = false;
> +    struct ed_type_ofctrl_is_connected *data = xzalloc(sizeof *data);
> +    return data;
>  }
>
>  static void
> -en_ofctrl_is_connected_cleanup(struct engine_node *node OVS_UNUSED)
> +en_ofctrl_is_connected_cleanup(void *data OVS_UNUSED)
>  {
>  }
>
>  static void
> -en_ofctrl_is_connected_run(struct engine_node *node)
> +en_ofctrl_is_connected_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_ofctrl_is_connected *data =
> -        (struct ed_type_ofctrl_is_connected *)node->data;
> -    if (data->connected != ofctrl_is_connected()) {
> -        data->connected = !data->connected;
> +    struct ed_type_ofctrl_is_connected *of_data = data;
> +    if (of_data->connected != ofctrl_is_connected()) {
> +        of_data->connected = !of_data->connected;
>          engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> @@ -773,21 +772,24 @@ struct ed_type_addr_sets {
>      struct sset updated;
>  };
>
> -static void
> -en_addr_sets_init(struct engine_node *node)
> +static void *
> +en_addr_sets_init(struct engine_node *node OVS_UNUSED,
> +                  struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets
*)node->data;
> +    struct ed_type_addr_sets *as = xzalloc(sizeof *as);
> +
>      shash_init(&as->addr_sets);
>      as->change_tracked = false;
>      sset_init(&as->new);
>      sset_init(&as->deleted);
>      sset_init(&as->updated);
> +    return as;
>  }
>
>  static void
> -en_addr_sets_cleanup(struct engine_node *node)
> +en_addr_sets_cleanup(void *data)
>  {
> -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets
*)node->data;
> +    struct ed_type_addr_sets *as = data;
>      expr_const_sets_destroy(&as->addr_sets);
>      shash_destroy(&as->addr_sets);
>      sset_destroy(&as->new);
> @@ -796,9 +798,9 @@ en_addr_sets_cleanup(struct engine_node *node)
>  }
>
>  static void
> -en_addr_sets_run(struct engine_node *node)
> +en_addr_sets_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets
*)node->data;
> +    struct ed_type_addr_sets *as = data;
>
>      sset_clear(&as->new);
>      sset_clear(&as->deleted);
> @@ -816,9 +818,9 @@ en_addr_sets_run(struct engine_node *node)
>  }
>
>  static bool
> -addr_sets_sb_address_set_handler(struct engine_node *node)
> +addr_sets_sb_address_set_handler(struct engine_node *node, void *data)
>  {
> -    struct ed_type_addr_sets *as = (struct ed_type_addr_sets
*)node->data;
> +    struct ed_type_addr_sets *as = data;
>
>      sset_clear(&as->new);
>      sset_clear(&as->deleted);
> @@ -850,21 +852,24 @@ struct ed_type_port_groups{
>      struct sset updated;
>  };
>
> -static void
> -en_port_groups_init(struct engine_node *node)
> +static void *
> +en_port_groups_init(struct engine_node *node OVS_UNUSED,
> +                    struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_port_groups *pg = (struct ed_type_port_groups
*)node->data;
> +    struct ed_type_port_groups *pg = xzalloc(sizeof *pg);
> +
>      shash_init(&pg->port_groups);
>      pg->change_tracked = false;
>      sset_init(&pg->new);
>      sset_init(&pg->deleted);
>      sset_init(&pg->updated);
> +    return pg;
>  }
>
>  static void
> -en_port_groups_cleanup(struct engine_node *node)
> +en_port_groups_cleanup(void *data)
>  {
> -    struct ed_type_port_groups *pg = (struct ed_type_port_groups
*)node->data;
> +    struct ed_type_port_groups *pg = data;
>      expr_const_sets_destroy(&pg->port_groups);
>      shash_destroy(&pg->port_groups);
>      sset_destroy(&pg->new);
> @@ -873,9 +878,9 @@ en_port_groups_cleanup(struct engine_node *node)
>  }
>
>  static void
> -en_port_groups_run(struct engine_node *node)
> +en_port_groups_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_port_groups *pg = (struct ed_type_port_groups
*)node->data;
> +    struct ed_type_port_groups *pg = data;
>
>      sset_clear(&pg->new);
>      sset_clear(&pg->deleted);
> @@ -893,9 +898,9 @@ en_port_groups_run(struct engine_node *node)
>  }
>
>  static bool
> -port_groups_sb_port_group_handler(struct engine_node *node)
> +port_groups_sb_port_group_handler(struct engine_node *node, void *data)
>  {
> -    struct ed_type_port_groups *pg = (struct ed_type_port_groups
*)node->data;
> +    struct ed_type_port_groups *pg = data;
>
>      sset_clear(&pg->new);
>      sset_clear(&pg->deleted);
> @@ -936,47 +941,46 @@ struct ed_type_runtime_data {
>      struct sset active_tunnels;
>  };
>
> -static void
> -en_runtime_data_init(struct engine_node *node)
> +static void *
> +en_runtime_data_init(struct engine_node *node OVS_UNUSED,
> +                     struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)node->data;
> +    struct ed_type_runtime_data *data = xzalloc(sizeof *data);
>
>      hmap_init(&data->local_datapaths);
>      sset_init(&data->local_lports);
>      sset_init(&data->local_lport_ids);
>      sset_init(&data->active_tunnels);
> +    return data;
>  }
>
>  static void
> -en_runtime_data_cleanup(struct engine_node *node)
> +en_runtime_data_cleanup(void *data)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)node->data;
> +    struct ed_type_runtime_data *rt_data = data;
>
> -    sset_destroy(&data->local_lports);
> -    sset_destroy(&data->local_lport_ids);
> -    sset_destroy(&data->active_tunnels);
> +    sset_destroy(&rt_data->local_lports);
> +    sset_destroy(&rt_data->local_lport_ids);
> +    sset_destroy(&rt_data->active_tunnels);
>      struct local_datapath *cur_node, *next_node;
>      HMAP_FOR_EACH_SAFE (cur_node, next_node, hmap_node,
> -                        &data->local_datapaths) {
> +                        &rt_data->local_datapaths) {
>          free(cur_node->peer_ports);
>          free(cur_node->ports);
> -        hmap_remove(&data->local_datapaths, &cur_node->hmap_node);
> +        hmap_remove(&rt_data->local_datapaths, &cur_node->hmap_node);
>          free(cur_node);
>      }
> -    hmap_destroy(&data->local_datapaths);
> +    hmap_destroy(&rt_data->local_datapaths);
>  }
>
>  static void
> -en_runtime_data_run(struct engine_node *node)
> +en_runtime_data_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)node->data;
> -    struct hmap *local_datapaths = &data->local_datapaths;
> -    struct sset *local_lports = &data->local_lports;
> -    struct sset *local_lport_ids = &data->local_lport_ids;
> -    struct sset *active_tunnels = &data->active_tunnels;
> +    struct ed_type_runtime_data *rt_data = data;
> +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> +    struct sset *local_lports = &rt_data->local_lports;
> +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> +    struct sset *active_tunnels = &rt_data->active_tunnels;
>
>      static bool first_run = true;
>      if (first_run) {
> @@ -1020,8 +1024,7 @@ en_runtime_data_run(struct engine_node *node)
>      ovs_assert(chassis);
>
>      struct ed_type_ofctrl_is_connected *ed_ofctrl_is_connected =
> -        (struct ed_type_ofctrl_is_connected *)engine_get_input(
> -            "ofctrl_is_connected", node)->data;
> +        engine_get_input_data("ofctrl_is_connected", node);
>      if (ed_ofctrl_is_connected->connected) {
>          /* Calculate the active tunnels only if have an an active
>           * OpenFlow connection to br-int.
> @@ -1075,12 +1078,11 @@ en_runtime_data_run(struct engine_node *node)
>  }
>
>  static bool
> -runtime_data_sb_port_binding_handler(struct engine_node *node)
> +runtime_data_sb_port_binding_handler(struct engine_node *node, void *data)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)node->data;
> -    struct sset *local_lports = &data->local_lports;
> -    struct sset *active_tunnels = &data->active_tunnels;
> +    struct ed_type_runtime_data *rt_data = data;
> +    struct sset *local_lports = &rt_data->local_lports;
> +    struct sset *active_tunnels = &rt_data->active_tunnels;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
>          (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
> @@ -1119,10 +1121,10 @@ struct ed_type_ct_zones {
>      struct simap current;
>  };
>
> -static void
> -en_ct_zones_init(struct engine_node *node)
> +static void *
> +en_ct_zones_init(struct engine_node *node, struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_ct_zones *data = node->data;
> +    struct ed_type_ct_zones *data = xzalloc(sizeof *data);
>      struct ovsrec_open_vswitch_table *ovs_table =
>          (struct ovsrec_open_vswitch_table *)EN_OVSDB_GET(
>              engine_get_input("OVS_open_vswitch", node));
> @@ -1136,56 +1138,63 @@ en_ct_zones_init(struct engine_node *node)
>      memset(data->bitmap, 0, sizeof data->bitmap);
>      bitmap_set1(data->bitmap, 0); /* Zone 0 is reserved. */
>      restore_ct_zones(bridge_table, ovs_table, &data->current, data->bitmap);
> +    return data;
>  }
>
>  static void
> -en_ct_zones_cleanup(struct engine_node *node)
> +en_ct_zones_cleanup(void *data)
>  {
> -    struct ed_type_ct_zones *data = node->data;
> +    struct ed_type_ct_zones *ct_zones_data = data;
>
> -    simap_destroy(&data->current);
> -    shash_destroy(&data->pending);
> +    simap_destroy(&ct_zones_data->current);
> +    shash_destroy(&ct_zones_data->pending);
>  }
>
>  static void
> -en_ct_zones_run(struct engine_node *node)
> +en_ct_zones_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_ct_zones *data = node->data;
> +    struct ed_type_ct_zones *ct_zones_data = data;
>      struct ed_type_runtime_data *rt_data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -            "runtime_data", node)->data;
> +        engine_get_input_data("runtime_data", node);
>
>      update_ct_zones(&rt_data->local_lports, &rt_data->local_datapaths,
> -                    &data->current, data->bitmap, &data->pending);
> +                    &ct_zones_data->current, ct_zones_data->bitmap,
> +                    &ct_zones_data->pending);
>
>      engine_set_node_state(node, EN_UPDATED);
>  }
>
> +/* The data in the ct_zones node is always valid (i.e., no stale pointers). */
> +static bool
> +en_ct_zones_is_valid(struct engine_node *node OVS_UNUSED)
> +{
> +    return true;
> +}
> +
>  struct ed_type_mff_ovn_geneve {
>      enum mf_field_id mff_ovn_geneve;
>  };
>
> -static void
> -en_mff_ovn_geneve_init(struct engine_node *node)
> +static void *
> +en_mff_ovn_geneve_init(struct engine_node *node OVS_UNUSED,
> +                       struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_mff_ovn_geneve *data =
> -        (struct ed_type_mff_ovn_geneve *)node->data;
> -    data->mff_ovn_geneve = 0;
> +    struct ed_type_mff_ovn_geneve *data = xzalloc(sizeof *data);
> +    return data;
>  }
>
>  static void
> -en_mff_ovn_geneve_cleanup(struct engine_node *node OVS_UNUSED)
> +en_mff_ovn_geneve_cleanup(void *data OVS_UNUSED)
>  {
>  }
>
>  static void
> -en_mff_ovn_geneve_run(struct engine_node *node)
> +en_mff_ovn_geneve_run(struct engine_node *node, void *data)
>  {
> -    struct ed_type_mff_ovn_geneve *data =
> -        (struct ed_type_mff_ovn_geneve *)node->data;
> +    struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve = data;
>      enum mf_field_id mff_ovn_geneve = ofctrl_get_mf_field_id();
> -    if (data->mff_ovn_geneve != mff_ovn_geneve) {
> -        data->mff_ovn_geneve = mff_ovn_geneve;
> +    if (ed_mff_ovn_geneve->mff_ovn_geneve != mff_ovn_geneve) {
> +        ed_mff_ovn_geneve->mff_ovn_geneve = mff_ovn_geneve;
>          engine_set_node_state(node, EN_UPDATED);
>          return;
>      }
> @@ -1205,48 +1214,46 @@ struct ed_type_flow_output {
>      struct lflow_resource_ref lflow_resource_ref;
>  };
>
> -static void
> -en_flow_output_init(struct engine_node *node)
> +static void *
> +en_flow_output_init(struct engine_node *node OVS_UNUSED,
> +                    struct engine_arg *arg OVS_UNUSED)
>  {
> -    struct ed_type_flow_output *data =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *data = xzalloc(sizeof *data);
> +
>      ovn_desired_flow_table_init(&data->flow_table);
>      ovn_extend_table_init(&data->group_table);
>      ovn_extend_table_init(&data->meter_table);
>      data->conj_id_ofs = 1;
>      lflow_resource_init(&data->lflow_resource_ref);
> +    return data;
>  }
>
>  static void
> -en_flow_output_cleanup(struct engine_node *node)
> +en_flow_output_cleanup(void *data)
>  {
> -    struct ed_type_flow_output *data =
> -        (struct ed_type_flow_output *)node->data;
> -    ovn_desired_flow_table_destroy(&data->flow_table);
> -    ovn_extend_table_destroy(&data->group_table);
> -    ovn_extend_table_destroy(&data->meter_table);
> -    lflow_resource_destroy(&data->lflow_resource_ref);
> +    struct ed_type_flow_output *flow_output_data = data;
> +    ovn_desired_flow_table_destroy(&flow_output_data->flow_table);
> +    ovn_extend_table_destroy(&flow_output_data->group_table);
> +    ovn_extend_table_destroy(&flow_output_data->meter_table);
> +    lflow_resource_destroy(&flow_output_data->lflow_resource_ref);
>  }
>
>  static void
> -en_flow_output_run(struct engine_node *node)
> +en_flow_output_run(struct engine_node *node, void *data)
>  {
>      struct ed_type_runtime_data *rt_data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -            "runtime_data", node)->data;
> +        engine_get_input_data("runtime_data", node);
>      struct hmap *local_datapaths = &rt_data->local_datapaths;
>      struct sset *local_lports = &rt_data->local_lports;
>      struct sset *local_lport_ids = &rt_data->local_lport_ids;
>      struct sset *active_tunnels = &rt_data->active_tunnels;
>
>      struct ed_type_ct_zones *ct_zones_data =
> -        (struct ed_type_ct_zones *)engine_get_input(
> -            "ct_zones", node)->data;
> +        engine_get_input_data("ct_zones", node);
>      struct simap *ct_zones = &ct_zones_data->current;
>
>      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> -            "mff_ovn_geneve", node)->data;
> +        engine_get_input_data("mff_ovn_geneve", node);
>      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
> @@ -1263,12 +1270,11 @@ en_flow_output_run(struct engine_node *node)
>                  engine_get_input("SB_chassis", node),
>                  "name");
>      struct ed_type_addr_sets *as_data =
> -        (struct ed_type_addr_sets *)engine_get_input("addr_sets",
node)->data;
> +        engine_get_input_data("addr_sets", node);
>      struct shash *addr_sets = &as_data->addr_sets;
>
>      struct ed_type_port_groups *pg_data =
> -        (struct ed_type_port_groups *)engine_get_input(
> -            "port_groups", node)->data;
> +        engine_get_input_data("port_groups", node);
>      struct shash *port_groups = &pg_data->port_groups;
>
>      const struct sbrec_chassis *chassis = NULL;
> @@ -1278,8 +1284,7 @@ en_flow_output_run(struct engine_node *node)
>
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>      struct ovn_extend_table *group_table = &fo->group_table;
>      struct ovn_extend_table *meter_table = &fo->meter_table;
> @@ -1359,21 +1364,19 @@ en_flow_output_run(struct engine_node *node)
>  }
>
>  static bool
> -flow_output_sb_logical_flow_handler(struct engine_node *node)
> -{
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -                "runtime_data", node)->data;
> -    struct hmap *local_datapaths = &data->local_datapaths;
> -    struct sset *local_lport_ids = &data->local_lport_ids;
> -    struct sset *active_tunnels = &data->active_tunnels;
> +flow_output_sb_logical_flow_handler(struct engine_node *node, void *data)
> +{
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> +    struct sset *active_tunnels = &rt_data->active_tunnels;
>      struct ed_type_addr_sets *as_data =
> -        (struct ed_type_addr_sets *)engine_get_input("addr_sets",
node)->data;
> +        engine_get_input_data("addr_sets", node);
>      struct shash *addr_sets = &as_data->addr_sets;
>
>      struct ed_type_port_groups *pg_data =
> -        (struct ed_type_port_groups *)engine_get_input(
> -            "port_groups", node)->data;
> +        engine_get_input_data("port_groups", node);
>      struct shash *port_groups = &pg_data->port_groups;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
> @@ -1397,8 +1400,7 @@ flow_output_sb_logical_flow_handler(struct
engine_node *node)
>
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>      struct ovn_extend_table *group_table = &fo->group_table;
>      struct ovn_extend_table *meter_table = &fo->meter_table;
> @@ -1442,7 +1444,7 @@ flow_output_sb_logical_flow_handler(struct
engine_node *node)
>  }
>
>  static bool
> -flow_output_sb_mac_binding_handler(struct engine_node *node)
> +flow_output_sb_mac_binding_handler(struct engine_node *node, void *data)
>  {
>      struct ovsdb_idl_index *sbrec_port_binding_by_name =
>          engine_ovsdb_node_get_index(
> @@ -1453,8 +1455,7 @@ flow_output_sb_mac_binding_handler(struct
engine_node *node)
>          (struct sbrec_mac_binding_table *)EN_OVSDB_GET(
>              engine_get_input("SB_mac_binding", node));
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>
>      lflow_handle_changed_neighbors(sbrec_port_binding_by_name,
> @@ -1465,22 +1466,19 @@ flow_output_sb_mac_binding_handler(struct
engine_node *node)
>  }
>
>  static bool
> -flow_output_sb_port_binding_handler(struct engine_node *node)
> +flow_output_sb_port_binding_handler(struct engine_node *node, void *data)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -                "runtime_data", node)->data;
> -    struct hmap *local_datapaths = &data->local_datapaths;
> -    struct sset *active_tunnels = &data->active_tunnels;
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> +    struct sset *active_tunnels = &rt_data->active_tunnels;
>
>      struct ed_type_ct_zones *ct_zones_data =
> -        (struct ed_type_ct_zones *)engine_get_input(
> -            "ct_zones", node)->data;
> +        engine_get_input_data("ct_zones", node);
>      struct simap *ct_zones = &ct_zones_data->current;
>
>      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> -            "mff_ovn_geneve", node)->data;
> +        engine_get_input_data("mff_ovn_geneve", node);
>      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
> @@ -1502,8 +1500,7 @@ flow_output_sb_port_binding_handler(struct
engine_node *node)
>      }
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>
>      struct ovsdb_idl_index *sbrec_port_binding_by_name =
> @@ -1573,21 +1570,18 @@ flow_output_sb_port_binding_handler(struct
engine_node *node)
>  }
>
>  static bool
> -flow_output_sb_multicast_group_handler(struct engine_node *node)
> +flow_output_sb_multicast_group_handler(struct engine_node *node, void *data)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -                "runtime_data", node)->data;
> -    struct hmap *local_datapaths = &data->local_datapaths;
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +    struct hmap *local_datapaths = &rt_data->local_datapaths;
>
>      struct ed_type_ct_zones *ct_zones_data =
> -        (struct ed_type_ct_zones *)engine_get_input(
> -            "ct_zones", node)->data;
> +        engine_get_input_data("ct_zones", node);
>      struct simap *ct_zones = &ct_zones_data->current;
>
>      struct ed_type_mff_ovn_geneve *ed_mff_ovn_geneve =
> -        (struct ed_type_mff_ovn_geneve *)engine_get_input(
> -            "mff_ovn_geneve", node)->data;
> +        engine_get_input_data("mff_ovn_geneve", node);
>      enum mf_field_id mff_ovn_geneve = ed_mff_ovn_geneve->mff_ovn_geneve;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
> @@ -1609,8 +1603,7 @@ flow_output_sb_multicast_group_handler(struct
engine_node *node)
>      }
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>
>      struct sbrec_multicast_group_table *multicast_group_table =
> @@ -1627,23 +1620,21 @@ flow_output_sb_multicast_group_handler(struct
engine_node *node)
>  }
>
>  static bool
> -_flow_output_resource_ref_handler(struct engine_node *node,
> -                                 enum ref_type ref_type)
> +_flow_output_resource_ref_handler(struct engine_node *node, void *data,
> +                                  enum ref_type ref_type)
>  {
> -    struct ed_type_runtime_data *data =
> -        (struct ed_type_runtime_data *)engine_get_input(
> -                "runtime_data", node)->data;
> -    struct hmap *local_datapaths = &data->local_datapaths;
> -    struct sset *local_lport_ids = &data->local_lport_ids;
> -    struct sset *active_tunnels = &data->active_tunnels;
> +    struct ed_type_runtime_data *rt_data =
> +        engine_get_input_data("runtime_data", node);
> +    struct hmap *local_datapaths = &rt_data->local_datapaths;
> +    struct sset *local_lport_ids = &rt_data->local_lport_ids;
> +    struct sset *active_tunnels = &rt_data->active_tunnels;
>
>      struct ed_type_addr_sets *as_data =
> -        (struct ed_type_addr_sets *)engine_get_input("addr_sets",
node)->data;
> +        engine_get_input_data("addr_sets", node);
>      struct shash *addr_sets = &as_data->addr_sets;
>
>      struct ed_type_port_groups *pg_data =
> -        (struct ed_type_port_groups *)engine_get_input(
> -            "port_groups", node)->data;
> +        engine_get_input_data("port_groups", node);
>      struct shash *port_groups = &pg_data->port_groups;
>
>      struct ovsrec_open_vswitch_table *ovs_table =
> @@ -1666,8 +1657,7 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>
>      ovs_assert(br_int && chassis);
>
> -    struct ed_type_flow_output *fo =
> -        (struct ed_type_flow_output *)node->data;
> +    struct ed_type_flow_output *fo = data;
>      struct ovn_desired_flow_table *flow_table = &fo->flow_table;
>      struct ovn_extend_table *group_table = &fo->group_table;
>      struct ovn_extend_table *meter_table = &fo->meter_table;
> @@ -1774,15 +1764,15 @@ _flow_output_resource_ref_handler(struct
engine_node *node,
>  }
>
>  static bool
> -flow_output_addr_sets_handler(struct engine_node *node)
> +flow_output_addr_sets_handler(struct engine_node *node, void *data)
>  {
> -    return _flow_output_resource_ref_handler(node, REF_TYPE_ADDRSET);
> +    return _flow_output_resource_ref_handler(node, data, REF_TYPE_ADDRSET);
>  }
>
>  static bool
> -flow_output_port_groups_handler(struct engine_node *node)
> +flow_output_port_groups_handler(struct engine_node *node, void *data)
>  {
> -    return _flow_output_resource_ref_handler(node, REF_TYPE_PORTGROUP);
> +    return _flow_output_resource_ref_handler(node, data, REF_TYPE_PORTGROUP);
>  }
>
>  struct ovn_controller_exit_args {
> @@ -1892,15 +1882,7 @@ main(int argc, char *argv[])
>      stopwatch_create(CONTROLLER_LOOP_STOPWATCH_NAME, SW_MS);
>
>      /* Define inc-proc-engine nodes. */
> -    struct ed_type_ct_zones ed_ct_zones;
> -    struct ed_type_runtime_data ed_runtime_data;
> -    struct ed_type_mff_ovn_geneve ed_mff_ovn_geneve;
> -    struct ed_type_ofctrl_is_connected ed_ofctrl_is_connected;
> -    struct ed_type_flow_output ed_flow_output;
> -    struct ed_type_addr_sets ed_addr_sets;
> -    struct ed_type_port_groups ed_port_groups;
> -
> -    ENGINE_NODE(ct_zones, "ct_zones");
> +    ENGINE_NODE_CUSTOM_DATA(ct_zones, "ct_zones");
>      ENGINE_NODE(runtime_data, "runtime_data");
>      ENGINE_NODE(mff_ovn_geneve, "mff_ovn_geneve");
>      ENGINE_NODE(ofctrl_is_connected, "ofctrl_is_connected");
> @@ -1916,18 +1898,6 @@ main(int argc, char *argv[])
>      OVS_NODES
>  #undef OVS_NODE
>
> -    engine_ovsdb_node_add_index(&en_sb_chassis, "name",
sbrec_chassis_by_name);
> -    engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> -                                sbrec_multicast_group_by_name_datapath);
> -    engine_ovsdb_node_add_index(&en_sb_port_binding, "name",
> -                                sbrec_port_binding_by_name);
> -    engine_ovsdb_node_add_index(&en_sb_port_binding, "key",
> -                                sbrec_port_binding_by_key);
> -    engine_ovsdb_node_add_index(&en_sb_port_binding, "datapath",
> -                                sbrec_port_binding_by_datapath);
> -    engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
> -                                sbrec_datapath_binding_by_key);
> -
>      /* Add dependencies between inc-proc-engine nodes. */
>
>      engine_add_input(&en_addr_sets, &en_sb_address_set,
> @@ -1976,20 +1946,45 @@ main(int argc, char *argv[])
>      engine_add_input(&en_runtime_data, &en_sb_port_binding,
>                       runtime_data_sb_port_binding_handler);
>
> -    engine_init(&en_flow_output);
> +    struct engine_arg engine_arg = {
> +        .sb_idl = ovnsb_idl_loop.idl,
> +        .ovs_idl = ovs_idl_loop.idl,
> +    };
> +    engine_init(&en_flow_output, &engine_arg);
>
> -    ofctrl_init(&ed_flow_output.group_table,
> -                &ed_flow_output.meter_table,
> +    engine_ovsdb_node_add_index(&en_sb_chassis, "name",
sbrec_chassis_by_name);
> +    engine_ovsdb_node_add_index(&en_sb_multicast_group, "name_datapath",
> +                                sbrec_multicast_group_by_name_datapath);
> +    engine_ovsdb_node_add_index(&en_sb_port_binding, "name",
> +                                sbrec_port_binding_by_name);
> +    engine_ovsdb_node_add_index(&en_sb_port_binding, "key",
> +                                sbrec_port_binding_by_key);
> +    engine_ovsdb_node_add_index(&en_sb_port_binding, "datapath",
> +                                sbrec_port_binding_by_datapath);
> +    engine_ovsdb_node_add_index(&en_sb_datapath_binding, "key",
> +                                sbrec_datapath_binding_by_key);
> +
> +    struct ed_type_flow_output *flow_output_data =
> +        engine_get_internal_data(&en_flow_output);
> +    struct ed_type_ct_zones *ct_zones_data =
> +        engine_get_internal_data(&en_ct_zones);
> +    struct ed_type_runtime_data *runtime_data = NULL;
> +
> +    ofctrl_init(&flow_output_data->group_table,
> +                &flow_output_data->meter_table,
>                  get_ofctrl_probe_interval(ovs_idl_loop.idl));
>
>      unixctl_command_register("group-table-list", "", 0, 0,
> -                             group_table_list,
&ed_flow_output.group_table);
> +                             group_table_list,
> +                             &flow_output_data->group_table);
>
>      unixctl_command_register("meter-table-list", "", 0, 0,
> -                             meter_table_list,
&ed_flow_output.meter_table);
> +                             meter_table_list,
> +                             &flow_output_data->meter_table);
>
>      unixctl_command_register("ct-zone-list", "", 0, 0,
> -                             ct_zone_list, &ed_ct_zones.current);
> +                             ct_zone_list,
> +                             &ct_zones_data->current);
>
>      struct pending_pkt pending_pkt = { .conn = NULL };
>      unixctl_command_register("inject-pkt", "MICROFLOW", 1, 1, inject_pkt,
> @@ -2065,7 +2060,10 @@ main(int argc, char *argv[])
>              }
>
>              if (br_int) {
> -                ofctrl_run(br_int, &ed_ct_zones.pending);
> +                ct_zones_data = engine_get_data(&en_ct_zones);
> +                if (ct_zones_data) {
> +                    ofctrl_run(br_int, &ct_zones_data->pending);
> +                }
>
>                  if (chassis) {
>                      patch_run(ovs_idl_txn,
> @@ -2105,41 +2103,50 @@ main(int argc, char *argv[])
>                      }
>                      stopwatch_stop(CONTROLLER_LOOP_STOPWATCH_NAME,
>                                     time_msec());
> +                    ct_zones_data = engine_get_data(&en_ct_zones);
>                      if (ovs_idl_txn) {
> -                        commit_ct_zones(br_int, &ed_ct_zones.pending);
> +                        if (ct_zones_data) {
> +                            commit_ct_zones(br_int, &ct_zones_data->pending);
> +                        }
>                          bfd_run(ovsrec_interface_table_get(ovs_idl_loop.idl),
>                                  br_int, chassis,
>                                  sbrec_ha_chassis_group_table_get(
>                                      ovnsb_idl_loop.idl),
>                                  sbrec_sb_global_table_get(ovnsb_idl_loop.idl));
>                      }
> -                    ofctrl_put(&ed_flow_output.flow_table,
> -                               &ed_ct_zones.pending,
> -                               sbrec_meter_table_get(ovnsb_idl_loop.idl),
> -                               get_nb_cfg(sbrec_sb_global_table_get(
> -                                              ovnsb_idl_loop.idl)),
> -                               engine_node_changed(&en_flow_output));
> -                    pinctrl_run(ovnsb_idl_txn,
> -                                sbrec_datapath_binding_by_key,
> -                                sbrec_port_binding_by_datapath,
> -                                sbrec_port_binding_by_key,
> -                                sbrec_port_binding_by_name,
> -                                sbrec_mac_binding_by_lport_ip,
> -                                sbrec_igmp_group,
> -                                sbrec_ip_multicast,
> -                                sbrec_dns_table_get(ovnsb_idl_loop.idl),
> -                                sbrec_controller_event_table_get(
> -                                    ovnsb_idl_loop.idl),
> -                                sbrec_service_monitor_table_get(
> -                                    ovnsb_idl_loop.idl),
> -                                br_int, chassis,
> -                                &ed_runtime_data.local_datapaths,
> -                                &ed_runtime_data.active_tunnels);
>
> -                    if (engine_node_changed(&en_runtime_data)) {
> -                        update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> -                                           &ed_runtime_data.local_lports,
> -                                           &ed_runtime_data.local_datapaths);
> +                    flow_output_data = engine_get_data(&en_flow_output);
> +                    if (flow_output_data && ct_zones_data) {
> +                        ofctrl_put(&flow_output_data->flow_table,
> +                                   &ct_zones_data->pending,
> +                                   sbrec_meter_table_get(ovnsb_idl_loop.idl),
> +                                   get_nb_cfg(sbrec_sb_global_table_get(
> +                                                   ovnsb_idl_loop.idl)),
> +                                   engine_node_changed(&en_flow_output));
> +                    }
> +                    runtime_data = engine_get_data(&en_runtime_data);
> +                    if (runtime_data) {
> +                        pinctrl_run(ovnsb_idl_txn,
> +                                    sbrec_datapath_binding_by_key,
> +                                    sbrec_port_binding_by_datapath,
> +                                    sbrec_port_binding_by_key,
> +                                    sbrec_port_binding_by_name,
> +                                    sbrec_mac_binding_by_lport_ip,
> +                                    sbrec_igmp_group,
> +                                    sbrec_ip_multicast,
> +                                    sbrec_dns_table_get(ovnsb_idl_loop.idl),
> +                                    sbrec_controller_event_table_get(
> +                                        ovnsb_idl_loop.idl),
> +                                    sbrec_service_monitor_table_get(
> +                                        ovnsb_idl_loop.idl),
> +                                    br_int, chassis,
> +                                    &runtime_data->local_datapaths,
> +                                    &runtime_data->active_tunnels);
> +                        if (engine_node_changed(&en_runtime_data)) {
> +                            update_sb_monitors(ovnsb_idl_loop.idl, chassis,
> +                                               &runtime_data->local_lports,
> +                                               &runtime_data->local_datapaths);
> +                        }
>                      }
>                  }
>
> @@ -2174,9 +2181,13 @@ main(int argc, char *argv[])
>
>
>              if (pending_pkt.conn) {
> -                if (br_int && chassis) {
> +                struct ed_type_addr_sets *as_data =
> +                    engine_get_data(&en_addr_sets);
> +                struct ed_type_port_groups *pg_data =
> +                    engine_get_data(&en_port_groups);
> +                if (br_int && chassis && as_data && pg_data) {
>                      char *error = ofctrl_inject_pkt(br_int, pending_pkt.flow_s,
> -                        &ed_addr_sets.addr_sets, &ed_port_groups.port_groups);
> +                        &as_data->addr_sets, &pg_data->port_groups);
>                      if (error) {
>                          unixctl_command_reply_error(pending_pkt.conn, error);
>                          free(error);
> @@ -2214,12 +2225,16 @@ main(int argc, char *argv[])
>          }
>
>          if (ovsdb_idl_loop_commit_and_wait(&ovs_idl_loop) == 1) {
> -            struct shash_node *iter, *iter_next;
> -            SHASH_FOR_EACH_SAFE (iter, iter_next, &ed_ct_zones.pending) {
> -                struct ct_zone_pending_entry *ctzpe = iter->data;
> -                if (ctzpe->state == CT_ZONE_DB_SENT) {
> -                    shash_delete(&ed_ct_zones.pending, iter);
> -                    free(ctzpe);
> +            ct_zones_data = engine_get_data(&en_ct_zones);
> +            if (ct_zones_data) {
> +                struct shash_node *iter, *iter_next;
> +                SHASH_FOR_EACH_SAFE (iter, iter_next,
> +                                     &ct_zones_data->pending) {
> +                    struct ct_zone_pending_entry *ctzpe = iter->data;
> +                    if (ctzpe->state == CT_ZONE_DB_SENT) {
> +                        shash_delete(&ct_zones_data->pending, iter);
> +                        free(ctzpe);
> +                    }
>                  }
>              }
>          }
> diff --git a/lib/inc-proc-eng.c b/lib/inc-proc-eng.c
> index 59b5cac..9b1479a 100644
> --- a/lib/inc-proc-eng.c
> +++ b/lib/inc-proc-eng.c
> @@ -103,13 +103,16 @@ engine_get_nodes(struct engine_node *node, size_t *n_count)
>  }
>
>  void
> -engine_init(struct engine_node *node)
> +engine_init(struct engine_node *node, struct engine_arg *arg)
>  {
>      engine_nodes = engine_get_nodes(node, &engine_n_nodes);
>
>      for (size_t i = 0; i < engine_n_nodes; i++) {
>          if (engine_nodes[i]->init) {
> -            engine_nodes[i]->init(engine_nodes[i]);
> +            engine_nodes[i]->data =
> +                engine_nodes[i]->init(engine_nodes[i], arg);
> +        } else {
> +            engine_nodes[i]->data = NULL;
>          }
>      }
>  }
> @@ -119,8 +122,9 @@ engine_cleanup(void)
>  {
>      for (size_t i = 0; i < engine_n_nodes; i++) {
>          if (engine_nodes[i]->cleanup) {
> -            engine_nodes[i]->cleanup(engine_nodes[i]);
> +            engine_nodes[i]->cleanup(engine_nodes[i]->data);
>          }
> +        free(engine_nodes[i]->data);
>      }
>      free(engine_nodes);
>      engine_nodes = NULL;
> @@ -140,9 +144,16 @@ engine_get_input(const char *input_name, struct engine_node *node)
>      return NULL;
>  }
>
> +void *
> +engine_get_input_data(const char *input_name, struct engine_node *node)
> +{
> +    struct engine_node *input_node = engine_get_input(input_name, node);
> +    return engine_get_data(input_node);
> +}
> +
>  void
>  engine_add_input(struct engine_node *node, struct engine_node *input,
> -                 bool (*change_handler)(struct engine_node *))
> +                 bool (*change_handler)(struct engine_node *, void *))
>  {
>      ovs_assert(node->n_inputs < ENGINE_MAX_INPUT);
>      node->inputs[node->n_inputs].node = input;
> @@ -153,7 +164,7 @@ engine_add_input(struct engine_node *node, struct engine_node *input,
>  struct ovsdb_idl_index *
>  engine_ovsdb_node_get_index(struct engine_node *node, const char *name)
>  {
> -    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table *)node->data;
> +    struct ed_type_ovsdb_table *ed = node->data;
>      for (size_t i = 0; i < ed->n_indexes; i++) {
>          if (!strcmp(ed->indexes[i].name, name)) {
>              return ed->indexes[i].index;
> @@ -167,7 +178,7 @@ void
>  engine_ovsdb_node_add_index(struct engine_node *node, const char *name,
>                              struct ovsdb_idl_index *index)
>  {
> -    struct ed_type_ovsdb_table *ed = (struct ed_type_ovsdb_table *)node->data;
> +    struct ed_type_ovsdb_table *ed = node->data;
>      ovs_assert(ed->n_indexes < ENGINE_MAX_OVSDB_INDEX);
>
>      ed->indexes[ed->n_indexes].name = name;
> @@ -192,6 +203,19 @@ engine_set_node_state_at(struct engine_node *node,
>      node->state = state;
>  }
>
> +static bool
> +engine_node_valid(struct engine_node *node)
> +{
> +    if (node->state == EN_UPDATED || node->state == EN_VALID) {
> +        return true;
> +    }
> +
> +    if (node->is_valid) {
> +        return node->is_valid(node);
> +    }
> +    return false;
> +}
> +
>  bool
>  engine_node_changed(struct engine_node *node)
>  {
> @@ -215,6 +239,21 @@ engine_aborted(void)
>      return engine_run_aborted;
>  }
>
> +void *
> +engine_get_data(struct engine_node *node)
> +{
> +    if (engine_node_valid(node)) {
> +        return node->data;
> +    }
> +    return NULL;
> +}
> +
> +void *
> +engine_get_internal_data(struct engine_node *node)
> +{
> +    return node->data;
> +}
> +
>  void
>  engine_init_run(void)
>  {
> @@ -240,7 +279,7 @@ engine_recompute(struct engine_node *node, bool forced, bool allowed)
>      }
>
>      /* Run the node handler which might change state. */
> -    node->run(node);
> +    node->run(node, node->data);
>  }
>
>  /* Return true if the node could be computed, false otherwise. */
> @@ -256,7 +295,7 @@ engine_compute(struct engine_node *node, bool recompute_allowed)
>              /* If the input change can't be handled incrementally, run
>               * the node handler.
>               */
> -            if (!node->inputs[i].change_handler(node)) {
> +            if (!node->inputs[i].change_handler(node, node->data)) {
>                  VLOG_DBG("node: %s, can't handle change for input %s, "
>                           "fall back to recompute",
>                           node->name, node->inputs[i].node->name);
> @@ -273,7 +312,7 @@ engine_run_node(struct engine_node *node, bool recompute_allowed)
>  {
>      if (!node->n_inputs) {
>          /* Run the node handler which might change state. */
> -        node->run(node);
> +        node->run(node, node->data);
>          return;
>      }
>
> @@ -345,7 +384,7 @@ engine_need_run(void)
>              continue;
>          }
>
> -        engine_nodes[i]->run(engine_nodes[i]);
> +        engine_nodes[i]->run(engine_nodes[i], engine_nodes[i]->data);
>          VLOG_DBG("input node: %s, state: %s", engine_nodes[i]->name,
>                   engine_node_state_name[engine_nodes[i]->state]);
>          if (engine_nodes[i]->state == EN_UPDATED) {
> diff --git a/lib/inc-proc-eng.h b/lib/inc-proc-eng.h
> index bb8df07..5b92971 100644
> --- a/lib/inc-proc-eng.h
> +++ b/lib/inc-proc-eng.h
> @@ -68,6 +68,12 @@ struct engine_context {
>      struct ovsdb_idl_txn *ovnsb_idl_txn;
>  };
>
> +/* Arguments to be passed to the engine at engine_init(). */
> +struct engine_arg {
> +    struct ovsdb_idl *sb_idl;
> +    struct ovsdb_idl *ovs_idl;
> +};
> +
>  struct engine_node;
>
>  struct engine_node_input {
> @@ -79,7 +85,7 @@ struct engine_node_input {
>       *  - true: if change can be handled
>       *  - false: if change cannot be handled (indicating full recompute needed)
>       */
> -    bool (*change_handler)(struct engine_node *node);
> +    bool (*change_handler)(struct engine_node *node, void *data);
>  };
>
>  enum engine_node_state {
> @@ -106,30 +112,42 @@ struct engine_node {
>      /* Inputs of this node. */
>      struct engine_node_input inputs[ENGINE_MAX_INPUT];
>
> -    /* Data of this node. It is vague and interpreted by the related functions.
> -     * The content of the data should be changed only by the change_handlers
> -     * and run() function of the current node. Users should ensure that the
> -     * data is read-only in change-handlers of the nodes that depends on this
> -     * node. */
> +    /* A pointer to node internal data. The data is safely accessible to
> +     * users through the engine_get_data() API. For special cases, when the
> +     * data is known to be valid (e.g., at init time), users can also call
> +     * engine_get_internal_data().
> +     */
>      void *data;
>
>      /* State of the node after the last engine run. */
>      enum engine_node_state state;
>
> -    /* Method to initialize data. It may be NULL. */
> -    void (*init)(struct engine_node *);
> +    /* Method to allocate and initialize node data. It may be NULL.
> +     * The user supplied argument 'arg' is passed from the call to
> +     * engine_init().
> +     */
> +    void *(*init)(struct engine_node *node, struct engine_arg *arg);
>
>      /* Method to clean up data. It may be NULL. */
> -    void (*cleanup)(struct engine_node *);
> +    void (*cleanup)(void *data);
>
>      /* Fully processes all inputs of this node and regenerates the data
> -     * of this node */
> -    void (*run)(struct engine_node *);
> +     * of this node. The pointer to the node's data is passed as argument.
> +     */
> +    void (*run)(struct engine_node *node, void *data);
> +
> +    /* Method to validate if the 'internal_data' is valid. This allows users
> +     * to customize when 'data' can be used (e.g., even if the node
> +     * hasn't been refreshed in the last iteration, if 'data'
> +     * doesn't store pointers to DB records it's still safe to use).
> +     */
> +    bool (*is_valid)(struct engine_node *);
>  };
>
>  /* Initialize the data for the engine nodes. It calls each node's
> - * init() method if not NULL. It should be called before the main loop. */
> -void engine_init(struct engine_node *node);
> + * init() method if not NULL passing the user supplied 'arg'.
> + * It should be called before the main loop. */
> +void engine_init(struct engine_node *node, struct engine_arg *arg);
>
>  /* Initialize the engine nodes for a new run. It should be called in the
>   * main processing loop before every potential engine_run().
> @@ -155,12 +173,15 @@ bool engine_need_run(void);
>  struct engine_node * engine_get_input(const char *input_name,
>                                        struct engine_node *);
>
> +/* Get the data from the input node with <name> for <node> */
> +void *engine_get_input_data(const char *input_name, struct engine_node *);
> +
>  /* Add an input (dependency) for <node>, with corresponding change_handler,
>   * which can be NULL. If the change_handler is NULL, the engine will not
>   * be able to process the change incrementally, and will fall back to call
>   * the run method to recompute. */
>  void engine_add_input(struct engine_node *node, struct engine_node *input,
> -                      bool (*change_handler)(struct engine_node *));
> +                      bool (*change_handler)(struct engine_node *, void *));
>
>  /* Force the engine to recompute everything if set to true. It is used
>   * in circumstances when we are not sure there is change or not, or
> @@ -185,6 +206,25 @@ bool engine_has_run(void);
>  /* Returns true if during the last engine run we had to abort processing. */
>  bool engine_aborted(void);
>
> +/* Return a pointer to node data accessible for users outside the processing
> + * engine. If the node data is not valid (e.g., last engine_run() failed or
> + * didn't happen), the node's is_valid() method is used to determine if the
> + * data can be safely accessed. If it's not the case, the function returns
> + * NULL.
> + * The content of the data should be changed only by the change_handlers
> + * and run() function of the current node. Users should ensure that the
> + * data is read-only in change-handlers of the nodes that depends on this
> + * node.
> + */
> +void *engine_get_data(struct engine_node *node);
> +
> +/* Return a pointer to node data *without* performing any sanity checks on
> + * the state of the node. This may be used only in specific cases when data
> + * is guaranteed to be valid, e.g., immediately after initialization and
> + * before the first engine_run().
> + */
> +void *engine_get_internal_data(struct engine_node *node);
> +
>  /* Set the state of the node and log changes. */
>  #define engine_set_node_state(node, state) \
>      engine_set_node_state_at(node, state, OVS_SOURCE_LOCATOR)
> @@ -201,30 +241,42 @@ struct ed_type_ovsdb_table {
>  };
>
>  #define EN_OVSDB_GET(NODE) \
> -    (((struct ed_type_ovsdb_table *)NODE->data)->table)
> +    (((struct ed_type_ovsdb_table *)(NODE)->data)->table)
>
>  struct ovsdb_idl_index * engine_ovsdb_node_get_index(struct engine_node *,
>                                                       const char *name);
>
> +/* Adds an OVSDB IDL index to the node. This should be called only after
> + * engine_init() as the index is stored in the node data.
> + */
>  void engine_ovsdb_node_add_index(struct engine_node *, const char *name,
>                                   struct ovsdb_idl_index *);
>
>  /* Macro to define an engine node. */
> -#define ENGINE_NODE(NAME, NAME_STR) \
> +#define ENGINE_NODE_DEF(NAME, NAME_STR) \
>      struct engine_node en_##NAME = { \
>          .name = NAME_STR, \
> -        .data = &ed_##NAME, \
> +        .data = NULL, \
>          .state = EN_STALE, \
>          .init = en_##NAME##_init, \
>          .run = en_##NAME##_run, \
>          .cleanup = en_##NAME##_cleanup, \
> +        .is_valid = en_##NAME##_is_valid, \
>      };
>
> +#define ENGINE_NODE_CUSTOM_DATA(NAME, NAME_STR) \
> +    ENGINE_NODE_DEF(NAME, NAME_STR)
> +
> +#define ENGINE_NODE(NAME, NAME_STR) \
> +    static bool (*en_##NAME##_is_valid)(struct engine_node *node) = NULL; \
> +    ENGINE_NODE_DEF(NAME, NAME_STR)
> +
>  /* Macro to define member functions of an engine node which represents
>   * a table of OVSDB */
>  #define ENGINE_FUNC_OVSDB(DB_NAME, TBL_NAME) \
>  static void \
> -en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
> +en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node, \
> +                                void *data OVS_UNUSED) \
>  { \
>      const struct DB_NAME##rec_##TBL_NAME##_table *table = \
>          EN_OVSDB_GET(node); \
> @@ -234,10 +286,18 @@ en_##DB_NAME##_##TBL_NAME##_run(struct engine_node *node) \
>      } \
>      engine_set_node_state(node, EN_VALID); \
>  } \
> -static void (*en_##DB_NAME##_##TBL_NAME##_init)(struct engine_node *node) \
> -            = NULL; \
> -static void (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
> -            = NULL;
> +static void *en_##DB_NAME##_##TBL_NAME##_init( \
> +    struct engine_node *node OVS_UNUSED, \
> +    struct engine_arg *arg) \
> +{ \
> +    struct ovsdb_idl *idl = arg->DB_NAME##_idl; \
> +    struct ed_type_ovsdb_table *data = xzalloc(sizeof *data); \
> +    data->table = DB_NAME##rec_##TBL_NAME##_table_get(idl); \
> +    return data; \
> +} \
> +static void en_##DB_NAME##_##TBL_NAME##_cleanup(void *data OVS_UNUSED) \
> +{ \
> +}
>
>  /* Macro to define member functions of an engine node which represents
>   * a table of OVN SB DB */
> @@ -250,21 +310,16 @@ static void (*en_##DB_NAME##_##TBL_NAME##_cleanup)(struct engine_node *node) \
>      ENGINE_FUNC_OVSDB(ovs, TBL_NAME)
>
>  /* Macro to define an engine node which represents a table of OVSDB */
> -#define ENGINE_NODE_OVSDB(DB_NAME, DB_NAME_STR, TBL_NAME, TBL_NAME_STR, IDL) \
> -    struct ed_type_ovsdb_table ed_##DB_NAME##_##TBL_NAME; \
> -    memset(&ed_##DB_NAME##_##TBL_NAME, 0, sizeof ed_##DB_NAME##_##TBL_NAME); \
> -    ovs_assert(IDL); \
> -    ed_##DB_NAME##_##TBL_NAME.table = \
> -        DB_NAME##rec_##TBL_NAME##_table_get(IDL); \
> +#define ENGINE_NODE_OVSDB(DB_NAME, DB_NAME_STR, TBL_NAME, TBL_NAME_STR) \
>      ENGINE_NODE(DB_NAME##_##TBL_NAME, DB_NAME_STR"_"TBL_NAME_STR)
>
>  /* Macro to define an engine node which represents a table of OVN SB DB */
>  #define ENGINE_NODE_SB(TBL_NAME, TBL_NAME_STR) \
> -    ENGINE_NODE_OVSDB(sb, "SB", TBL_NAME, TBL_NAME_STR, ovnsb_idl_loop.idl);
> +    ENGINE_NODE_OVSDB(sb, "SB", TBL_NAME, TBL_NAME_STR);
>
>  /* Macro to define an engine node which represents a table of open_vswitch
>   * DB */
>  #define ENGINE_NODE_OVS(TBL_NAME, TBL_NAME_STR) \
> -    ENGINE_NODE_OVSDB(ovs, "OVS", TBL_NAME, TBL_NAME_STR, ovs_idl_loop.idl);
> +    ENGINE_NODE_OVSDB(ovs, "OVS", TBL_NAME, TBL_NAME_STR);
>
>  #endif /* lib/inc-proc-eng.h */
> --
> 1.8.3.1
>

Thanks again. I applied this to master.
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to