On Thu, Aug 10, 2023 at 2:45 PM Dumitru Ceara <dce...@redhat.com> wrote:

> It's similar to the processing we do for address sets.  There's a bit
> more mechanics involved due to the fact that we need to split NB port
> groups per datapath.
>
> We currently only partially implement incremental processing of
> port_group changes in the lflow node.  That is, we deal with the case
> when the sets of "switches per port group" doesn't change.  In that
> specific case ACL lflows don't need to be reprocessed.
>
> In a synthetic benchmark that created (in this order):
> - 500 switches
> - 2000 port groups
> - 4 ACLs per port group
> - 10000 ports distributed equally between the switches and port groups
>
> we measured the following ovn-northd CPU usage:
>
>   +-------------------------+------------+--------------------+
>   | Incremental processing? | --wait=sb? | northd avg cpu (%) |
>   +-------------------------+------------+--------------------+
>   |           N             |     Y      |        84.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     Y      |        41.5        |
>   +-------------------------+------------+--------------------+
>   |           N             |     N      |        93.2        |
>   +-------------------------+------------+--------------------+
>   |           Y             |     N      |        53.6        |
>   +-------------------------+------------+--------------------+
>
> where '--wait=sb' set to 'Y'  means the benchmark was waiting for the
> port and port group operations to be propagated to the Southbound DB
> before continuing to the next operation.
>
> Reported-at: https://bugzilla.redhat.com/show_bug.cgi?id=2228162
> Signed-off-by: Dumitru Ceara <dce...@redhat.com>
> ---
>

Hi Dumitru,

I have a couple of comments down below.

 northd/en-lflow.c        |   17 ++
>  northd/en-lflow.h        |    1
>  northd/en-port-group.c   |  451
> ++++++++++++++++++++++++++++++++++++++++------
>  northd/en-port-group.h   |   36 +++-
>  northd/inc-proc-northd.c |   13 +
>  northd/ovn-northd.c      |    4
>  tests/ovn-northd.at      |  246 +++++++++++++++++++++++++
>  7 files changed, 708 insertions(+), 60 deletions(-)
>
> diff --git a/northd/en-lflow.c b/northd/en-lflow.c
> index 7f6a7872b2..1321f79036 100644
> --- a/northd/en-lflow.c
> +++ b/northd/en-lflow.c
> @@ -119,6 +119,23 @@ lflow_northd_handler(struct engine_node *node,
>      return true;
>  }
>
> +bool
> +lflow_port_group_handler(struct engine_node *node, void *data OVS_UNUSED)
> +{
> +    struct port_group_data *pg_data =
> +        engine_get_input_data("port_group", node);
> +
> +    /* If the set of switches per port group didn't change then there's no
> +     * need to reprocess lflows.  Otherwise, there might be a need to add
> +     * port-group ACLs to new switches. */
> +    if (!pg_data->ls_port_groups_sets_unchanged) {
> +        return false;
> +    }
> +
> +    engine_set_node_state(node, EN_UPDATED);
> +    return true;
> +}
> +
>  void *en_lflow_init(struct engine_node *node OVS_UNUSED,
>                       struct engine_arg *arg OVS_UNUSED)
>  {
> diff --git a/northd/en-lflow.h b/northd/en-lflow.h
> index 5e3fbc25e3..5417b2faff 100644
> --- a/northd/en-lflow.h
> +++ b/northd/en-lflow.h
> @@ -13,5 +13,6 @@ void en_lflow_run(struct engine_node *node, void *data);
>  void *en_lflow_init(struct engine_node *node, struct engine_arg *arg);
>  void en_lflow_cleanup(void *data);
>  bool lflow_northd_handler(struct engine_node *, void *data);
> +bool lflow_port_group_handler(struct engine_node *, void *data);
>
>  #endif /* EN_LFLOW_H */
> diff --git a/northd/en-port-group.c b/northd/en-port-group.c
> index 2c36410246..6902695a01 100644
> --- a/northd/en-port-group.c
> +++ b/northd/en-port-group.c
> @@ -33,15 +33,46 @@ static struct ls_port_group *ls_port_group_create(
>  static void ls_port_group_destroy(struct ls_port_group_table *,
>                                    struct ls_port_group *);
>
> +static bool ls_port_group_process(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls_table *,
> +    const struct hmap *ls_ports,
> +    const struct nbrec_port_group *,
> +    struct hmapx *updated_ls_port_groups
> +);
> +
> +static void ls_port_group_record_clear(
> +    struct ls_port_group_table *,
> +    struct port_group_to_ls *,
> +    struct hmapx *updated_ls_port_groups);
> +static void ls_port_group_record_prune(struct ls_port_group *);
> +
>  static struct ls_port_group_record *ls_port_group_record_add(
>      struct ls_port_group *,
>      const struct nbrec_port_group *,
>      const char *port_name);
>
> +static struct ls_port_group_record *ls_port_group_record_find(
> +    struct ls_port_group *, const struct nbrec_port_group *nb_pg);
> +
>  static void ls_port_group_record_destroy(
>      struct ls_port_group *,
>      struct ls_port_group_record *);
>
> +static struct port_group_to_ls *port_group_to_ls_create(
> +    struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +static void port_group_to_ls_destroy(struct port_group_to_ls_table *,
> +                                     struct port_group_to_ls *);
> +
> +static void update_sb_port_group(struct sorted_array *nb_ports,
> +                                 const struct sbrec_port_group *sb_pg);
> +static void sync_port_group(struct ovsdb_idl_txn *, const char
> *sb_pg_name,
> +                            struct sorted_array *ports,
> +                            struct shash *sb_port_groups);
> +static const struct sbrec_port_group *sb_port_group_lookup_by_name(
> +    struct ovsdb_idl_index *sbrec_port_group_by_name, const char *name);
> +
>  void
>  ls_port_group_table_init(struct ls_port_group_table *table)
>  {
> @@ -82,39 +113,16 @@ ls_port_group_table_find(const struct
> ls_port_group_table *table,
>  }
>
>  void
> -ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
> -                          const struct nbrec_port_group_table *pg_table,
> -                          const struct hmap *ls_ports)
> +ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,
> +    const struct nbrec_port_group_table *pg_table,
> +    const struct hmap *ls_ports)
>  {
>      const struct nbrec_port_group *nb_pg;
>      NBREC_PORT_GROUP_TABLE_FOR_EACH (nb_pg, pg_table) {
> -        for (size_t i = 0; i < nb_pg->n_ports; i++) {
> -            const char *port_name = nb_pg->ports[i]->name;
> -            const struct ovn_datapath *od =
> -                northd_get_datapath_for_port(ls_ports, port_name);
> -
> -            if (!od) {
> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> -                            port_name, nb_pg->name);
> -                continue;
> -            }
> -
> -            if (!od->nbs) {
> -                static struct vlog_rate_limit rl =
> VLOG_RATE_LIMIT_INIT(1, 1);
> -                VLOG_WARN_RL(&rl, "lport %s in port group %s has no
> lswitch.",
> -                             nb_pg->ports[i]->name,
> -                             nb_pg->name);
> -                continue;
> -            }
> -
> -            struct ls_port_group *ls_pg =
> -                ls_port_group_table_find(ls_port_groups, od->nbs);
> -            if (!ls_pg) {
> -                ls_pg = ls_port_group_create(ls_port_groups, od->nbs,
> od->sb);
> -            }
> -            ls_port_group_record_add(ls_pg, nb_pg, port_name);
> -        }
> +        ls_port_group_process(ls_port_groups, port_group_to_switches,
> +                              ls_ports, nb_pg, NULL);
>      }
>  }
>
> @@ -145,18 +153,11 @@ ls_port_group_table_sync(
>              get_sb_port_group_name(ls_pg_rec->nb_pg->name,
>                                     ls_pg->sb_datapath_key,
>                                     &sb_name);
> -            sb_port_group = shash_find_and_delete(&sb_port_groups,
> -                                                  ds_cstr(&sb_name));
> -            if (!sb_port_group) {
> -                sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> -                sbrec_port_group_set_name(sb_port_group,
> ds_cstr(&sb_name));
> -            }
> -
> -            const char **nb_port_names = sset_array(&ls_pg_rec->ports);
> -            sbrec_port_group_set_ports(sb_port_group,
> -                                       nb_port_names,
> -                                       sset_count(&ls_pg_rec->ports));
> -            free(nb_port_names);
> +            struct sorted_array ports =
> +                sorted_array_from_sset(&ls_pg_rec->ports);
> +            sync_port_group(ovnsb_txn, ds_cstr(&sb_name),
> +                            &ports, &sb_port_groups);
> +            sorted_array_destroy(&ports);
>          }
>      }
>      ds_destroy(&sb_name);
> @@ -201,31 +202,165 @@ ls_port_group_destroy(struct ls_port_group_table
> *ls_port_groups,
>      }
>  }
>
> +/* Process a NB.Port_Group record and stores any updated ls_port_groups
> + * in updated_ls_port_groups.  Returns true if a new ls_port_group had
> + * to be created or destroyed.
> + */
> +static bool
> +ls_port_group_process(struct ls_port_group_table *ls_port_groups,
> +                      struct port_group_to_ls_table
> *port_group_to_switches,
> +                      const struct hmap *ls_ports,
> +                      const struct nbrec_port_group *nb_pg,
> +                      struct hmapx *updated_ls_port_groups)
> +{
> +    struct hmapx cleared_ls_port_groups =
> +        HMAPX_INITIALIZER(&cleared_ls_port_groups);
> +    bool ls_port_group_created = false;
> +
> +    struct port_group_to_ls *pg_ls =
> +        port_group_to_ls_table_find(port_group_to_switches, nb_pg);
> +    if (!pg_ls) {
> +        pg_ls = port_group_to_ls_create(port_group_to_switches, nb_pg);
> +    } else {
> +        /* Clear all old records corresponding to this port group; we'll
> +         * reprocess it below. */
> +        ls_port_group_record_clear(ls_port_groups, pg_ls,
> +                                   &cleared_ls_port_groups);
> +    }
> +
> +    for (size_t i = 0; i < nb_pg->n_ports; i++) {
> +        const char *port_name = nb_pg->ports[i]->name;
> +        const struct ovn_datapath *od =
> +            northd_get_datapath_for_port(ls_ports, port_name);
> +
> +        if (!od) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> +            VLOG_ERR_RL(&rl, "lport %s in port group %s not found.",
> +                        port_name, nb_pg->name);
> +            continue;
> +        }
> +
> +        if (!od->nbs) {
> +            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> +            VLOG_WARN_RL(&rl, "lport %s in port group %s has no lswitch.",
> +                         nb_pg->ports[i]->name,
> +                         nb_pg->name);
> +            continue;
> +        }
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, od->nbs);
> +        if (!ls_pg) {
> +            ls_pg = ls_port_group_create(ls_port_groups, od->nbs, od->sb);
> +            ls_port_group_created = true;
> +        }
> +        ls_port_group_record_add(ls_pg, nb_pg, port_name);
> +        hmapx_add(&pg_ls->switches,
> +                  CONST_CAST(struct nbrec_logical_switch *, od->nbs));
> +        if (updated_ls_port_groups) {
> +            hmapx_add(updated_ls_port_groups, ls_pg);
> +        }
> +    }
> +
> +    bool ls_port_group_destroyed = false;
> +    struct hmapx_node *node;
> +    HMAPX_FOR_EACH (node, &cleared_ls_port_groups) {
> +        struct ls_port_group *ls_pg = node->data;
> +
> +        ls_port_group_record_prune(ls_pg);
> +
> +        if (hmap_is_empty(&ls_pg->nb_pgs)) {
> +            ls_port_group_destroy(ls_port_groups, ls_pg);
> +            ls_port_group_destroyed = true;
> +        }
> +    }
> +    hmapx_destroy(&cleared_ls_port_groups);
> +
> +    return ls_port_group_created || ls_port_group_destroyed;
> +}
> +
> +/* Destroys all the struct ls_port_group_record that might be associated
> to
> + * northbound database logical switches.  Stores ls_port_groups that
> became
> + * were updated in the 'updated_ls_port_groups' map.
> + */
> +static void
> +ls_port_group_record_clear(struct ls_port_group_table *ls_port_groups,
> +                           struct port_group_to_ls *pg_ls,
> +                           struct hmapx *cleared_ls_port_groups)
> +{
> +    struct hmapx_node *node;
> +
> +    HMAPX_FOR_EACH (node, &pg_ls->switches) {
> +        const struct nbrec_logical_switch *nbs = node->data;
> +
> +        struct ls_port_group *ls_pg =
> +            ls_port_group_table_find(ls_port_groups, nbs);
> +        if (!ls_pg) {
> +            continue;
> +        }
> +
> +        /* Clear ports in the port group record. */
> +        struct ls_port_group_record *ls_pg_rec =
> +            ls_port_group_record_find(ls_pg, pg_ls->nb_pg);
> +        if (!ls_pg_rec) {
> +            continue;
> +        }
> +
> +        sset_clear(&ls_pg_rec->ports);
> +        hmapx_add(cleared_ls_port_groups, ls_pg);
> +    }
> +}
> +
> +static void
> +ls_port_group_record_prune(struct ls_port_group *ls_pg)
> +{
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_SAFE (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +        if (sset_is_empty(&ls_pg_rec->ports)) {
> +            ls_port_group_record_destroy(ls_pg, ls_pg_rec);
> +        }
> +    }
> +}
> +
>  static struct ls_port_group_record *
>  ls_port_group_record_add(struct ls_port_group *ls_pg,
>                           const struct nbrec_port_group *nb_pg,
>                           const char *port_name)
>  {
> -    struct ls_port_group_record *ls_pg_rec = NULL;
> +    struct ls_port_group_record *ls_pg_rec =
> +        ls_port_group_record_find(ls_pg, nb_pg);
>      size_t hash = uuid_hash(&nb_pg->header_.uuid);
>
> -    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> -        if (ls_pg_rec->nb_pg == nb_pg) {
> -            goto done;
> -        }
> +    if (!ls_pg_rec) {
> +        ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
>

nit: No need for zeroed alloc as all the fields are immediately overwritten.

+        *ls_pg_rec = (struct ls_port_group_record) {
> +            .nb_pg = nb_pg,
> +            .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> +        };
> +        hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
>      }
>
> -    ls_pg_rec = xzalloc(sizeof *ls_pg_rec);
> -    *ls_pg_rec = (struct ls_port_group_record) {
> -        .nb_pg = nb_pg,
> -        .ports = SSET_INITIALIZER(&ls_pg_rec->ports),
> -    };
> -    hmap_insert(&ls_pg->nb_pgs, &ls_pg_rec->key_node, hash);
> -done:
>      sset_add(&ls_pg_rec->ports, port_name);
>      return ls_pg_rec;
>  }
>
> +static struct ls_port_group_record *
> +ls_port_group_record_find(struct ls_port_group *ls_pg,
> +                          const struct nbrec_port_group *nb_pg)
> +{
> +    size_t hash = uuid_hash(&nb_pg->header_.uuid);
> +    struct ls_port_group_record *ls_pg_rec;
> +
> +    HMAP_FOR_EACH_WITH_HASH (ls_pg_rec, key_node, hash, &ls_pg->nb_pgs) {
> +        if (ls_pg_rec->nb_pg == nb_pg) {
> +            return ls_pg_rec;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +
>  static void
>  ls_port_group_record_destroy(struct ls_port_group *ls_pg,
>                               struct ls_port_group_record *ls_pg_rec)
> @@ -237,6 +372,71 @@ ls_port_group_record_destroy(struct ls_port_group
> *ls_pg,
>      }
>  }
>
> +void
> +port_group_to_ls_table_init(struct port_group_to_ls_table *table)
> +{
> +    *table = (struct port_group_to_ls_table) {
> +        .entries = HMAP_INITIALIZER(&table->entries),
> +    };
> +}
> +
> +void
> +port_group_to_ls_table_clear(struct port_group_to_ls_table *table)
> +{
> +    struct port_group_to_ls *pg_ls;
> +    HMAP_FOR_EACH_SAFE (pg_ls, key_node, &table->entries) {
> +        port_group_to_ls_destroy(table, pg_ls);
> +    }
> +}
> +
> +void
> +port_group_to_ls_table_destroy(struct port_group_to_ls_table *table)
> +{
> +    port_group_to_ls_table_clear(table);
> +    hmap_destroy(&table->entries);
> +}
> +
> +struct port_group_to_ls *
> +port_group_to_ls_table_find(const struct port_group_to_ls_table *table,
> +                            const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls;
> +
> +    HMAP_FOR_EACH_WITH_HASH (pg_ls, key_node,
> uuid_hash(&nb_pg->header_.uuid),
> +                             &table->entries) {
>

We should move the uuid_hash call outside the loop.


> +        if (nb_pg == pg_ls->nb_pg) {
> +            return pg_ls;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +static struct port_group_to_ls *
> +port_group_to_ls_create(struct port_group_to_ls_table *table,
> +                        const struct nbrec_port_group *nb_pg)
> +{
> +    struct port_group_to_ls *pg_ls = xmalloc(sizeof *pg_ls);
> +
> +    *pg_ls = (struct port_group_to_ls) {
> +        .nb_pg = nb_pg,
> +        .switches = HMAPX_INITIALIZER(&pg_ls->switches),
> +    };
> +    hmap_insert(&table->entries, &pg_ls->key_node,
> +                uuid_hash(&nb_pg->header_.uuid));
> +    return pg_ls;
> +}
> +
> +static void
> +port_group_to_ls_destroy(struct port_group_to_ls_table *table,
> +                         struct port_group_to_ls *pg_ls)
> +{
> +    if (pg_ls) {
> +        hmapx_destroy(&pg_ls->switches);
> +        hmap_remove(&table->entries, &pg_ls->key_node);
> +        free(pg_ls);
> +    }
> +}
> +
>  /* Incremental processing implementation. */
>  static struct port_group_input
>  port_group_get_input_data(struct engine_node *node)
> @@ -259,6 +459,7 @@ en_port_group_init(struct engine_node *node OVS_UNUSED,
>      struct port_group_data *pg_data = xmalloc(sizeof *pg_data);
>
>      ls_port_group_table_init(&pg_data->ls_port_groups);
> +    port_group_to_ls_table_init(&pg_data->port_groups_to_ls);
>      return pg_data;
>  }
>
> @@ -268,6 +469,15 @@ en_port_group_cleanup(void *data_)
>      struct port_group_data *data = data_;
>
>      ls_port_group_table_destroy(&data->ls_port_groups);
> +    port_group_to_ls_table_destroy(&data->port_groups_to_ls);
> +}
> +
> +void
> +en_port_group_clear_tracked_data(void *data_)
> +{
> +    struct port_group_data *data = data_;
> +
> +    data->ls_port_groups_sets_unchanged = false;
>  }
>
>  void
> @@ -280,7 +490,10 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>      stopwatch_start(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>
>      ls_port_group_table_clear(&data->ls_port_groups);
> +    port_group_to_ls_table_clear(&data->port_groups_to_ls);
> +
>      ls_port_group_table_build(&data->ls_port_groups,
> +                              &data->port_groups_to_ls,
>                                input_data.nbrec_port_group_table,
>                                input_data.ls_ports);
>
> @@ -291,3 +504,133 @@ en_port_group_run(struct engine_node *node, void
> *data_)
>      stopwatch_stop(PORT_GROUP_RUN_STOPWATCH_NAME, time_msec());
>      engine_set_node_state(node, EN_UPDATED);
>  }
> +
> +bool
> +port_group_nb_port_group_handler(struct engine_node *node, void *data_)
> +{
> +    struct port_group_input input_data = port_group_get_input_data(node);
> +    struct port_group_data *data = data_;
> +    bool success = true;
> +
> +    const struct nbrec_port_group_table *nb_pg_table =
> +        EN_OVSDB_GET(engine_get_input("NB_port_group", node));
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Return false if a port group is created or deleted.
> +     * Handle I-P for only updated port groups. */
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        if (nbrec_port_group_is_new(nb_pg) ||
> +                nbrec_port_group_is_deleted(nb_pg)) {
> +            return false;
> +        }
> +    }
> +
> +    struct hmapx updated_ls_port_groups =
> +        HMAPX_INITIALIZER(&updated_ls_port_groups);
> +
> +    NBREC_PORT_GROUP_TABLE_FOR_EACH_TRACKED (nb_pg, nb_pg_table) {
> +        /* Newly created port groups can't be incrementally processed;
> +         * the rest yes. */
> +        if (ls_port_group_process(&data->ls_port_groups,
> +                                  &data->port_groups_to_ls,
> +                                  input_data.ls_ports,
> +                                  nb_pg, &updated_ls_port_groups)) {
> +            success = false;
> +            break;
> +        }
> +    }
> +
> +    /* If changes have been successfully processed incrementally then
> update
> +     * the SB too. */
> +    if (success) {
> +        struct ovsdb_idl_index *sbrec_port_group_by_name =
> +            engine_ovsdb_node_get_index(
> +                    engine_get_input("SB_port_group", node),
> +                    "sbrec_port_group_by_name");
> +        struct ds sb_pg_name = DS_EMPTY_INITIALIZER;
> +
> +        struct hmapx_node *updated_node;
> +        HMAPX_FOR_EACH (updated_node, &updated_ls_port_groups) {
> +            const struct ls_port_group *ls_pg = updated_node->data;
> +            struct ls_port_group_record *ls_pg_rec;
> +
> +            HMAP_FOR_EACH (ls_pg_rec, key_node, &ls_pg->nb_pgs) {
> +                get_sb_port_group_name(ls_pg_rec->nb_pg->name,
> +                                        ls_pg->sb_datapath_key,
> +                                        &sb_pg_name);
> +
> +                const struct sbrec_port_group *sb_pg =
> +                    sb_port_group_lookup_by_name(sbrec_port_group_by_name,
> +                                                 ds_cstr(&sb_pg_name));
> +                if (!sb_pg) {
> +                    success = false;
> +                    break;
> +                }
> +                struct sorted_array nb_ports =
> +                    sorted_array_from_sset(&ls_pg_rec->ports);
> +                update_sb_port_group(&nb_ports, sb_pg);
> +                sorted_array_destroy(&nb_ports);
> +            }
> +        }
> +        ds_destroy(&sb_pg_name);
> +    }
> +
> +    data->ls_port_groups_sets_unchanged = success;
> +    engine_set_node_state(node, EN_UPDATED);
> +    hmapx_destroy(&updated_ls_port_groups);
> +    return success;
> +}
> +
> +static void
> +sb_port_group_apply_diff(const void *arg, const char *item, bool add)
> +{
> +    const struct sbrec_port_group *pg = arg;
> +    if (add) {
> +        sbrec_port_group_update_ports_addvalue(pg, item);
> +    } else {
> +        sbrec_port_group_update_ports_delvalue(pg, item);
> +    }
> +}
> +
> +static void
> +update_sb_port_group(struct sorted_array *nb_ports,
> +                     const struct sbrec_port_group *sb_pg)
> +{
> +    struct sorted_array sb_ports = sorted_array_from_dbrec(sb_pg, ports);
> +    sorted_array_apply_diff(nb_ports, &sb_ports,
> +                            sb_port_group_apply_diff, sb_pg);
> +    sorted_array_destroy(&sb_ports);
> +}
> +
> +static void
> +sync_port_group(struct ovsdb_idl_txn *ovnsb_txn, const char *sb_pg_name,
> +                struct sorted_array *ports,
> +                struct shash *sb_port_groups)
> +{
> +    const struct sbrec_port_group *sb_port_group =
> +        shash_find_and_delete(sb_port_groups, sb_pg_name);
> +    if (!sb_port_group) {
> +        sb_port_group = sbrec_port_group_insert(ovnsb_txn);
> +        sbrec_port_group_set_name(sb_port_group, sb_pg_name);
> +        sbrec_port_group_set_ports(sb_port_group, ports->arr, ports->n);
> +    } else {
> +        update_sb_port_group(ports, sb_port_group);
> +    }
> +}
> +
> +/* Finds and returns the port group set with the given 'name', or NULL
> + * if no such port group exists. */
> +static const struct sbrec_port_group *
> +sb_port_group_lookup_by_name(struct ovsdb_idl_index
> *sbrec_port_group_by_name,
> +                             const char *name)
> +{
> +    struct sbrec_port_group *target = sbrec_port_group_index_init_row(
> +        sbrec_port_group_by_name);
> +    sbrec_port_group_index_set_name(target, name);
> +
> +    struct sbrec_port_group *retval = sbrec_port_group_index_find(
> +        sbrec_port_group_by_name, target);
> +
> +    sbrec_port_group_index_destroy_row(target);
> +    return retval;
> +}
> diff --git a/northd/en-port-group.h b/northd/en-port-group.h
> index 5cbf6c6c4a..c3975f64ee 100644
> --- a/northd/en-port-group.h
> +++ b/northd/en-port-group.h
> @@ -18,6 +18,7 @@
>
>  #include <stdint.h>
>
> +#include "lib/hmapx.h"
>  #include "lib/inc-proc-eng.h"
>  #include "lib/ovn-nb-idl.h"
>  #include "lib/ovn-sb-idl.h"
> @@ -54,9 +55,33 @@ struct ls_port_group *ls_port_group_table_find(
>      const struct ls_port_group_table *,
>      const struct nbrec_logical_switch *);
>
> -void ls_port_group_table_build(struct ls_port_group_table *ls_port_groups,
> -                               const struct nbrec_port_group_table *,
> -                               const struct hmap *ls_ports);
> +/* Per port group map of datapaths with ports in the group. */
> +struct port_group_to_ls_table {
> +    struct hmap entries; /* Stores struct port_group_to_ls. */
> +};
> +
> +struct port_group_to_ls {
> +    struct hmap_node key_node; /* Index on 'pg->header_.uuid'. */
> +
> +    const struct nbrec_port_group *nb_pg;
> +
> +    /* Map of 'struct nbrec_logical_switch *' with ports in the group. */
> +    struct hmapx switches;
> +};
> +
> +void port_group_to_ls_table_init(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_clear(struct port_group_to_ls_table *);
> +void port_group_to_ls_table_destroy(struct port_group_to_ls_table *);
> +
> +struct port_group_to_ls *port_group_to_ls_table_find(
> +    const struct port_group_to_ls_table *,
> +    const struct nbrec_port_group *);
> +
> +void ls_port_group_table_build(
> +    struct ls_port_group_table *ls_port_groups,
> +    struct port_group_to_ls_table *port_group_to_switches,
> +    const struct nbrec_port_group_table *,
> +    const struct hmap *ls_ports);
>  void ls_port_group_table_sync(const struct ls_port_group_table
> *ls_port_groups,
>                                const struct sbrec_port_group_table *,
>                                struct ovsdb_idl_txn *ovnsb_txn);
> @@ -75,10 +100,15 @@ struct port_group_input {
>
>  struct port_group_data {
>      struct ls_port_group_table ls_port_groups;
> +    struct port_group_to_ls_table port_groups_to_ls;
> +    bool ls_port_groups_sets_unchanged;
>  };
>
>  void *en_port_group_init(struct engine_node *, struct engine_arg *);
>  void en_port_group_cleanup(void *data);
> +void en_port_group_clear_tracked_data(void *data);
>  void en_port_group_run(struct engine_node *, void *data);
>
> +bool port_group_nb_port_group_handler(struct engine_node *, void *data);
> +
>  #endif /* EN_PORT_GROUP_H */
> diff --git a/northd/inc-proc-northd.c b/northd/inc-proc-northd.c
> index 6d5f9e8d16..bd598ba5e2 100644
> --- a/northd/inc-proc-northd.c
> +++ b/northd/inc-proc-northd.c
> @@ -137,7 +137,7 @@ static ENGINE_NODE(mac_binding_aging_waker,
> "mac_binding_aging_waker");
>  static ENGINE_NODE(northd_output, "northd_output");
>  static ENGINE_NODE(sync_to_sb, "sync_to_sb");
>  static ENGINE_NODE(sync_to_sb_addr_set, "sync_to_sb_addr_set");
> -static ENGINE_NODE(port_group, "port_group");
> +static ENGINE_NODE_WITH_CLEAR_TRACK_DATA(port_group, "port_group");
>  static ENGINE_NODE(fdb_aging, "fdb_aging");
>  static ENGINE_NODE(fdb_aging_waker, "fdb_aging_waker");
>
> @@ -193,7 +193,7 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_lflow, &en_sb_multicast_group, NULL);
>      engine_add_input(&en_lflow, &en_sb_igmp_group, NULL);
>      engine_add_input(&en_lflow, &en_northd, lflow_northd_handler);
> -    engine_add_input(&en_lflow, &en_port_group, NULL);
> +    engine_add_input(&en_lflow, &en_port_group, lflow_port_group_handler);
>
>      engine_add_input(&en_sync_to_sb_addr_set, &en_nb_address_set,
>                       sync_to_sb_addr_set_nb_address_set_handler);
> @@ -202,7 +202,8 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>      engine_add_input(&en_sync_to_sb_addr_set, &en_northd, NULL);
>      engine_add_input(&en_sync_to_sb_addr_set, &en_sb_address_set, NULL);
>
> -    engine_add_input(&en_port_group, &en_nb_port_group, NULL);
> +    engine_add_input(&en_port_group, &en_nb_port_group,
> +                     port_group_nb_port_group_handler);
>      engine_add_input(&en_port_group, &en_sb_port_group, NULL);
>      /* No need for an explicit handler for northd changes.  Port changes
>       * that affect port_groups trigger updates to the NB.Port_Group
> @@ -287,6 +288,12 @@ void inc_proc_northd_init(struct ovsdb_idl_loop *nb,
>                                  "sbrec_address_set_by_name",
>                                  sbrec_address_set_by_name);
>
> +    struct ovsdb_idl_index *sbrec_port_group_by_name
> +        = ovsdb_idl_index_create1(sb->idl, &sbrec_port_group_col_name);
> +    engine_ovsdb_node_add_index(&en_sb_port_group,
> +                                "sbrec_port_group_by_name",
> +                                sbrec_port_group_by_name);
> +
>      struct ovsdb_idl_index *sbrec_fdb_by_dp_and_port
>          = ovsdb_idl_index_create2(sb->idl, &sbrec_fdb_col_dp_key,
>                                    &sbrec_fdb_col_port_key);
> diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c
> index 4fa1b039ea..44385d604c 100644
> --- a/northd/ovn-northd.c
> +++ b/northd/ovn-northd.c
> @@ -836,6 +836,10 @@ main(int argc, char *argv[])
>          ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
>                               &sbrec_multicast_group_columns[i]);
>      }
> +    for (size_t i = 0; i < SBREC_PORT_GROUP_N_COLUMNS; i++) {
> +        ovsdb_idl_omit_alert(ovnsb_idl_loop.idl,
> +                             &sbrec_port_group_columns[i]);
> +    }
>
>      unixctl_command_register("sb-connection-status", "", 0, 0,
>                               ovn_conn_show, ovnsb_idl_loop.idl);
> diff --git a/tests/ovn-northd.at b/tests/ovn-northd.at
> index 1a12513d7a..a04ba2b23f 100644
> --- a/tests/ovn-northd.at
> +++ b/tests/ovn-northd.at
> @@ -8936,6 +8936,252 @@ AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE
> inc-engine/show-stats sync_to_sb_a
>  AT_CLEANUP
>  ])
>
> +OVN_FOR_EACH_NORTHD_NO_HV([
> +AT_SETUP([Port group incremental processing])
> +ovn_start
> +
> +check ovn-nbctl ls-add sw1 \
> +  -- lsp-add sw1 sw1.1     \
> +  -- lsp-add sw1 sw1.2     \
> +  -- lsp-add sw1 sw1.3     \
> +  -- ls-add sw2            \
> +  -- lsp-add sw2 sw2.1     \
> +  -- lsp-add sw2 sw2.2     \
> +  -- lsp-add sw2 sw2.3
> +
> +check ovn-nbctl --wait=sb sync
> +sw1_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw1)
> +sw2_key=$(fetch_column Datapath_Binding tunnel_key external_ids:name=sw2)
> +
> +check_acl_lflows() {
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$1
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw1 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$2
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==41:41:41:41:41:41 -c], [ignore], [dnl
> +$3
> +])
> +AT_CHECK_UNQUOTED([ovn-sbctl lflow-list sw2 | grep ls_in_acl_eval | grep
> eth.src==42:42:42:42:42:42 -c], [ignore], [dnl
> +$4
> +])
> +}
> +
> +AS_BOX([Create new PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb -- pg-add pg1 -- pg-add pg2
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes every time a NB port group is
> added/deleted.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +
> +AS_BOX([Add ACLs on PG1 and PG2])
> +check ovn-nbctl --wait=sb             \
> +  -- acl-add pg1 from-lport 1 eth.src==41:41:41:41:41:41 allow \
> +  -- acl-add pg2 from-lport 1 eth.src==42:42:42:42:42:42 allow
> +
> +AS_BOX([Add one port from the two switches to PG1])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
> switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect ACL1 on sw1 and sw2
> +check_acl_lflows 1 0 1 0
> +
> +AS_BOX([Add one port from the two switches to PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl The port_group node recomputes also every time a port from a new
> switch
> +dnl is added to the group.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl The port_group node is an input for the lflow node.  Port_group
> +dnl recompute/compute triggers lflow recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Add one more port from the two switches to PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb                     \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 sw1.3 sw2.3 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2 sw1.3 sw2.3
> +check_column "sw1.1 sw1.3" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1 sw2.3" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2 sw1.3" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2 sw2.3" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the last port from PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 sw2.1 \
> +  -- pg-set-ports pg2 sw1.2 sw2.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw2.1" sb:Port_Group ports name="${sw2_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +check_column "sw2.2" sb:Port_Group ports name="${sw2_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did not change the set of switches a pg is applied to, there
> should be
> +dnl no recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and sw2
> +check_acl_lflows 1 1 1 1
> +
> +AS_BOX([Remove the second port from PG1 and PG2])
> +check as northd ovn-appctl -t NORTHD_TYPE inc-engine/clear-stats
> +check ovn-nbctl --wait=sb         \
> +  -- pg-set-ports pg1 sw1.1 \
> +  -- pg-set-ports pg2 sw1.2
> +check_column "sw1.1" sb:Port_Group ports name="${sw1_key}_pg1"
> +check_column "sw1.2" sb:Port_Group ports name="${sw1_key}_pg2"
> +
> +dnl The northd node should not recompute, it should handle nb_global
> update
> +dnl though, therefore "compute: 1".
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> northd], [0], [dnl
> +Node: northd
> +- recompute:            0
> +- compute:              1
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should be
> +dnl a recompute.
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> port_group], [0], [dnl
> +Node: port_group
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl We did changed the set of switches a pg is applied to, there should be
> +dnl a recompute (for ACLs).
> +AT_CHECK([as northd ovn-appctl -t NORTHD_TYPE inc-engine/show-stats
> lflow], [0], [dnl
> +Node: lflow
> +- recompute:            1
> +- compute:              0
> +- abort:                0
> +])
> +dnl Expect both ACLs on sw1 and not on sw2.
> +check_acl_lflows 1 1 0 0
> +
> +AT_CLEANUP
> +])
> +
>  OVN_FOR_EACH_NORTHD([
>  AT_SETUP([Check default drop])
>  AT_KEYWORDS([drop])
>
>
With that addressed:

Acked-by: Ales Musil <amu...@redhat.com>
-- 

Ales Musil

Senior Software Engineer - OVN Core

Red Hat EMEA <https://www.redhat.com>

amu...@redhat.com
<https://red.ht/sig>
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to