Hi Paolo,

On Mon, Nov 29, 2021, at 19:06, Paolo Valerio wrote:
> The purpose of this commit is to split the current way of storing the
> conn nodes. Before this patch the nodes were stored into a single cmap
> using ct->lock to avoid concurrent write access.
> With this commit a single connection can be stored into one or two (at
> most) CONNTRACK_BUCKETS available based on the outcome of the function
> hash_scale() on the key.

I think this approach works overall.
Removing the expiration lists altogether might be worth it, but it would be
nice to have data to back it up. It simplifies expiration updates and removes
another shared data structure, which is nice. However, to accept the change we
must ensure that the ct sweep stays within acceptable bounds.

How do the two solutions compare regarding:

  * Expiration latency (delta between expected and actual expiration time).
  * Memory consumption: depending on the structure, it's one additional node
    per conn. For a list, that represents 2x64 bits. Not that much, but that's
    a benefit of removing the exp list.
  * CPU utilization / average cycles per conn: how heavy is it on the CPU to
    iterate a CMAP vs. a linked list? CMAPs are heavy in coherency traffic due
    to constant atomic reads, vs. a list that is mostly pointer dereferences.

Maybe a micro-benchmark in tests/test-conntrack.c for the aging process could
help measure those metrics isolated from the rest of OVS.

That seems like quite a bit of additional work; maybe you would be able to
provide a relevant comparison without writing this additional benchmark.
On the other hand, maybe it only consists of inserting N conns, artificially
advancing the internal OVS clock past the timeout, and measuring the behavior.
I'm not sure.
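
Just to illustrate what I have in mind, a very rough skeleton. The test_*
helpers below are hypothetical and would need to be written; this is not
working code, only the shape of it:

    /* Hypothetical aging micro-benchmark for tests/test-conntrack.c. */
    static void
    benchmark_conn_aging(unsigned int n_conns, long long timeout_ms)
    {
        struct conntrack *ct = conntrack_init();
        long long start;

        test_insert_conns(ct, n_conns);     /* hypothetical: commit n_conns. */
        test_advance_clock(timeout_ms);     /* hypothetical: fake 'now'. */

        start = time_msec();
        while (test_count_conns(ct) > 0) {  /* hypothetical: read ct->n_conn. */
            xnanosleep(1000 * 1000);        /* 1 ms, let the sweeper run. */
        }

        printf("aging %u conns took %lld ms\n", n_conns, time_msec() - start);
        conntrack_destroy(ct);
    }

That would give the expiration latency directly, and wrapping it with perf or
cycle counters would cover the CPU side.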

I measured the time to insert 1M connections with your patchset, my ct-scale
series and the current baseline:

n-thread\version   baseline  ct-scale  ct-bucket
               1    831 ms    737 ms    715 ms
               2   1177 ms    882 ms    867 ms
               3   1524 ms   1000 ms   1012 ms
               4   2595 ms   1128 ms   1139 ms

So overall, at least on insertion in conntrack_execute, it seems your solution
is pretty much on par with the series I proposed, and both are improvements
over the current baseline.

Scaling is not yet linear though, and the mutex locking still appears in
profiling with multiple threads.

> Every bucket has its local lock that needs to be acquired every time a
> node has to be removed/inserted from/to the cmap.
> This means that, in case the hash of the CT_DIR_FWD key differs from
> the one of the CT_DIR_REV, we can end up having the reference of the
> two key nodes in different buckets, and consequently acquiring two locks
> (one per bucket).
> This approach may be handy in different ways, depending on the way the
> stale connection removal gets designed. The attempt of this patch is
> to remove the expiration lists, removing the stale entries mostly in
> two ways:
>
> - during the key lookup
> - when the sweeper task wakes up
>
> the first case is not very strict, as we remove only expired entries
> with the same hash. To increase its effectiveness, we should probably
> increase the number of buckets and replace the cmaps with other data
> structures like rcu lists.

Did you try different numbers of buckets?
I see 2^10 being used now; it seems a bit large to me. Does it provide tangible
benefits? Did you measure contention vs. 512 or 256 buckets?

Replacing a CMAP by N rculists seems to be re-implementing a sharded CMAP with
one lock per internal bucket, in a sense. However, the bucket array would be
static and would not resize the way a CMAP does.

It would result in a poorly implemented hash map with integrated locks and a
static number of buckets. It should either be possible to find a solution with
an existing structure, or a new structure should instead be cleanly designed.

> The sweeper task instead takes charge of the remaining stale entries
> removal. The heuristics used in the sweeper task are mostly an
> example, but could be modified to match any possible uncovered use
> case.

Which metric did you use to choose the new heuristics for the cleanup?

Otherwise I have a few remarks or suggestions below.

>
> Signed-off-by: Paolo Valerio <[email protected]>
> ---
> The cover letter includes further details.
> ---
>  lib/conntrack-private.h |   34 +++
>  lib/conntrack-tp.c      |   42 ----
>  lib/conntrack.c         |  461 +++++++++++++++++++++++++++++++----------------
>  tests/system-traffic.at |    5 -
>  4 files changed, 331 insertions(+), 211 deletions(-)
>
> diff --git a/lib/conntrack-private.h b/lib/conntrack-private.h
> index ea5ba3d9e..a89ff96fa 100644
> --- a/lib/conntrack-private.h
> +++ b/lib/conntrack-private.h
> @@ -95,6 +95,7 @@ struct alg_exp_node {
> 
>  struct conn_key_node {
>      struct conn_key key;
> +    uint32_t key_hash;

Caching the key hash is necessary for quick lookup and deletion in the cmap.

Unfortunately, as you need to use hash_scale() to find the corresponding
bucket, you are unable to use the OVS_ACQUIRES / OVS_RELEASES marks on
buckets_{lock,unlock}(), which throws off clang's thread-safety analysis.

This safety analysis is very useful. The basic workaround would be to mark
those functions with OVS_NO_THREAD_SAFETY_ANALYSIS, but that is not a good
solution as it will mask issues.

Instead, a possible way is to store the key_bucket there as well, i.e. the
bucket index corresponding to this key_hash. Then:

buckets_lock(ct, p1, p2)
   OVS_ACQUIRES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)

buckets_unlock(ct, p1, p2)
   OVS_RELEASES(ct->ct_buckets[p1].lock, ct->ct_buckets[p2].lock)

where p1 == conn->key_node[0].key_bucket;
(by the way, those functions might be better renamed ct_bucket_lock() I think).

Not sure this solution would work, but I think it's definitely worth trying to
find one. If you select CC=clang with ./configure, -Wthread-safety should be
used automatically.
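
To make this a bit more concrete, here is a minimal sketch of the annotated
lock function I have in mind, with the bucket indexes passed directly (names
are illustrative only, and I have not verified that clang accepts this form,
in particular the p1 == p2 case):

    static void
    ct_bucket_lock(struct conntrack *ct, unsigned p1, unsigned p2)
        OVS_ACQUIRES(ct->buckets[p1].lock, ct->buckets[p2].lock)
    {
        if (p1 < p2) {
            ovs_mutex_lock(&ct->buckets[p1].lock);
            ovs_mutex_lock(&ct->buckets[p2].lock);
        } else if (p1 > p2) {
            ovs_mutex_lock(&ct->buckets[p2].lock);
            ovs_mutex_lock(&ct->buckets[p1].lock);
        } else {
            ovs_mutex_lock(&ct->buckets[p1].lock);
        }
    }

with a matching OVS_RELEASES version for the unlock side.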

>      struct cmap_node cm_node;
>  };
> 
> @@ -102,7 +103,6 @@ struct conn {
>      /* Immutable data. */
>      struct conn_key_node key_node[CT_DIR_MAX];
>      struct conn_key parent_key; /* Only used for orig_tuple support. */
> -    struct ovs_list exp_node;
> 
>      uint16_t nat_action;
>      char *alg;
> @@ -121,7 +121,9 @@ struct conn {
>      /* Mutable data. */
>      bool seq_skew_dir; /* TCP sequence skew direction due to NATTing of FTP
>                          * control messages; true if reply direction. */
> -    bool cleaned; /* True if cleaned from expiry lists. */
> +    atomic_flag cleaned; /* True if the entry was stale and one of the
> +                          * cleaner (i.e. packet path or sweeper) took
> +                          * charge of it. */
> 
>      /* Immutable data. */
>      bool alg_related; /* True if alg data connection. */
> @@ -192,10 +194,25 @@ enum ct_timeout {
>      N_CT_TM
>  };
> 
> -struct conntrack {
> -    struct ovs_mutex ct_lock; /* Protects 2 following fields. */
> +#define CONNTRACK_BUCKETS_SHIFT 10
> +#define CONNTRACK_BUCKETS (1 << CONNTRACK_BUCKETS_SHIFT)
> +
> +struct ct_bucket {
> +    /* Protects 'conns'. In case of natted conns, there's a high
> +     * chance that the forward and the reverse key stand in different
> +     * buckets. buckets_lock() should be the preferred way to acquire
> +     * these locks (unless otherwise needed), as it deals with the
> +     * acquisition order. */
> +    struct ovs_mutex lock;
> +    /* Contains the connections in the bucket, indexed by
> +     * 'struct conn_key'. */
>      struct cmap conns OVS_GUARDED;
> -    struct ovs_list exp_lists[N_CT_TM] OVS_GUARDED;
> +};
> +
> +struct conntrack {
> +    struct ct_bucket buckets[CONNTRACK_BUCKETS];
> +    unsigned int next_bucket;
> +    struct ovs_mutex ct_lock;
>      struct cmap zone_limits OVS_GUARDED;
>      struct cmap timeout_policies OVS_GUARDED;
>      uint32_t hash_basis; /* Salt for hashing a connection key. */
> @@ -220,9 +237,10 @@ struct conntrack {
>  };
> 
>  /* Lock acquisition order:
> - *    1. 'ct_lock'
> - *    2. 'conn->lock'
> - *    3. 'resources_lock'
> + *    1. 'buckets[p1]->lock'
> + *    2  'buckets[p2]->lock' (with p1 < p2)
> + *    3. 'conn->lock'
> + *    4. 'resources_lock'
>   */
> 
>  extern struct ct_l4_proto ct_proto_tcp;
> diff --git a/lib/conntrack-tp.c b/lib/conntrack-tp.c
> index 9ecb06978..117810528 100644
> --- a/lib/conntrack-tp.c
> +++ b/lib/conntrack-tp.c
> @@ -236,27 +236,6 @@ tm_to_ct_dpif_tp(enum ct_timeout tm)
>      return CT_DPIF_TP_ATTR_MAX;
>  }
> 
> -static void
> -conn_update_expiration__(struct conntrack *ct, struct conn *conn,
> -                         enum ct_timeout tm, long long now,
> -                         uint32_t tp_value)
> -    OVS_REQUIRES(conn->lock)
> -{
> -    ovs_mutex_unlock(&conn->lock);
> -
> -    ovs_mutex_lock(&ct->ct_lock);
> -    ovs_mutex_lock(&conn->lock);
> -    if (!conn->cleaned) {
> -        conn->expiration = now + tp_value * 1000;
> -        ovs_list_remove(&conn->exp_node);
> -        ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
> -    }
> -    ovs_mutex_unlock(&conn->lock);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -
> -    ovs_mutex_lock(&conn->lock);
> -}
> -
>  /* The conn entry lock must be held on entry and exit. */
>  void
>  conn_update_expiration(struct conntrack *ct, struct conn *conn,
> @@ -266,42 +245,25 @@ conn_update_expiration(struct conntrack *ct, struct conn *conn,
>      struct timeout_policy *tp;
>      uint32_t val;
> 
> -    ovs_mutex_unlock(&conn->lock);
> -
> -    ovs_mutex_lock(&ct->ct_lock);
> -    ovs_mutex_lock(&conn->lock);
>      tp = timeout_policy_lookup(ct, conn->tp_id);
>      if (tp) {
>          val = tp->policy.attrs[tm_to_ct_dpif_tp(tm)];
>      } else {
>          val = ct_dpif_netdev_tp_def[tm_to_ct_dpif_tp(tm)];
>      }
> -    ovs_mutex_unlock(&conn->lock);
> -    ovs_mutex_unlock(&ct->ct_lock);
> 
> -    ovs_mutex_lock(&conn->lock);
>      VLOG_DBG_RL(&rl, "Update timeout %s zone=%u with policy id=%d "
>                  "val=%u sec.",
>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>                  conn->tp_id, val);
> 
> -    conn_update_expiration__(ct, conn, tm, now, val);
> -}
> -
> -static void
> -conn_init_expiration__(struct conntrack *ct, struct conn *conn,
> -                       enum ct_timeout tm, long long now,
> -                       uint32_t tp_value)
> -{
> -    conn->expiration = now + tp_value * 1000;
> -    ovs_list_push_back(&ct->exp_lists[tm], &conn->exp_node);
> +    conn->expiration = now + val * 1000;
>  }
> 
>  /* ct_lock must be held. */
>  void
>  conn_init_expiration(struct conntrack *ct, struct conn *conn,
>                       enum ct_timeout tm, long long now)
> -    OVS_REQUIRES(ct->ct_lock)
>  {
>      struct timeout_policy *tp;
>      uint32_t val;
> @@ -317,5 +279,5 @@ conn_init_expiration(struct conntrack *ct, struct conn *conn,
>                  ct_timeout_str[tm], conn->key_node[CT_DIR_FWD].key.zone,
>                  conn->tp_id, val);
> 
> -    conn_init_expiration__(ct, conn, tm, now, val);
> +    conn->expiration = now + val * 1000;
>  }
> diff --git a/lib/conntrack.c b/lib/conntrack.c
> index a284c57c0..1c019af29 100644
> --- a/lib/conntrack.c
> +++ b/lib/conntrack.c

At the beginning of the file, there is a remaining reference to a coverage
counter that you otherwise removed:

@@ -44,7 +44,6 @@
 VLOG_DEFINE_THIS_MODULE(conntrack);

 COVERAGE_DEFINE(conntrack_full);
-COVERAGE_DEFINE(conntrack_long_cleanup);
 COVERAGE_DEFINE(conntrack_l3csum_err);
 COVERAGE_DEFINE(conntrack_l4csum_err);
 COVERAGE_DEFINE(conntrack_lookup_natted_miss);

> @@ -85,9 +85,12 @@ struct zone_limit {
>      struct conntrack_zone_limit czl;
>  };
> 
> +static unsigned hash_scale(uint32_t hash);
> +static void conn_clean(struct conntrack *ct, struct conn *conn);
>  static bool conn_key_extract(struct conntrack *, struct dp_packet *,
>                               ovs_be16 dl_type, struct conn_lookup_ctx *,
>                               uint16_t zone);
> +static uint32_t cached_key_hash(struct conn_key_node *n);

This function made me think something was being computed, while it only reads
the key_node field. I think it's worth just accessing the field directly.

>  static uint32_t conn_key_hash(const struct conn_key *, uint32_t basis);
>  static void conn_key_reverse(struct conn_key *);
>  static bool valid_new(struct dp_packet *pkt, struct conn_key *);
> @@ -109,8 +112,9 @@ static void set_label(struct dp_packet *, struct conn *,
>  static void *clean_thread_main(void *f_);
> 
>  static bool
> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
> -                     const struct nat_action_info_t *nat_info);
> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
> +                          const struct nat_action_info_t *nat_info,
> +                          uint32_t *rev_hash);
> 
>  static uint8_t
>  reverse_icmp_type(uint8_t type);
> @@ -249,16 +253,17 @@ conntrack_init(void)
>      ovs_rwlock_unlock(&ct->resources_lock);
> 
>      ovs_mutex_init_adaptive(&ct->ct_lock);
> -    ovs_mutex_lock(&ct->ct_lock);
> -    cmap_init(&ct->conns);
> -    for (unsigned i = 0; i < ARRAY_SIZE(ct->exp_lists); i++) {
> -        ovs_list_init(&ct->exp_lists[i]);
> +
> +    ct->next_bucket = 0;
> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        struct ct_bucket *bucket = &ct->buckets[i];
> +        cmap_init(&bucket->conns);
> +        ovs_mutex_init_recursive(&bucket->lock);

Does it need to be recursive? Where does the recursion happen?
I think this mutex could be made adaptive; it might be worth profiling
contention and checking whether it helps or not.

>      }
> +
>      cmap_init(&ct->zone_limits);
>      ct->zone_limit_seq = 0;
>      timeout_policy_init(ct);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -
>      atomic_count_init(&ct->n_conn, 0);
>      atomic_init(&ct->n_conn_limit, DEFAULT_N_CONN_LIMIT);
>      atomic_init(&ct->tcp_seq_chk, true);
> @@ -410,9 +415,9 @@ zone_limit_delete(struct conntrack *ct, uint16_t zone)
>  }
> 
>  static void
> -conn_clean(struct conntrack *ct, struct conn *conn)
> -    OVS_REQUIRES(ct->ct_lock)
> +conn_clean__(struct conntrack *ct, struct conn *conn)
>  {
> +    struct ct_bucket *bucket;
>      struct zone_limit *zl;
>      uint32_t hash;
> 
> @@ -420,8 +425,9 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>          expectation_clean(ct, &conn->key_node[CT_DIR_FWD].key);
>      }
> 
> -    hash = conn_key_hash(&conn->key_node[CT_DIR_FWD].key, ct->hash_basis);
> -    cmap_remove(&ct->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
> +    hash = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
> +    bucket = &ct->buckets[hash_scale(hash)];
> +    cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_FWD].cm_node, hash);
> 
>      zl = zone_limit_lookup(ct, conn->admit_zone);
>      if (zl && zl->czl.zone_limit_seq == conn->zone_limit_seq) {
> @@ -429,12 +435,10 @@ conn_clean(struct conntrack *ct, struct conn *conn)
>      }
> 
>      if (conn->nat_action) {
> -        hash = conn_key_hash(&conn->key_node[CT_DIR_REV].key,
> -                             ct->hash_basis);
> -        cmap_remove(&ct->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
> +        hash = cached_key_hash(&conn->key_node[CT_DIR_REV]);
> +        bucket = &ct->buckets[hash_scale(hash)];
> +        cmap_remove(&bucket->conns, &conn->key_node[CT_DIR_REV].cm_node, hash);
>      }
> -    ovs_list_remove(&conn->exp_node);
> -    conn->cleaned = true;
>      ovsrcu_postpone(delete_conn, conn);
>      atomic_count_dec(&ct->n_conn);
>  }
> @@ -446,22 +450,35 @@ void
>  conntrack_destroy(struct conntrack *ct)
>  {
>      struct conn_key_node *keyn;
> +    struct ct_bucket *bucket;
>      struct conn *conn;
> +    int i;
> 
>      latch_set(&ct->clean_thread_exit);
>      pthread_join(ct->clean_thread, NULL);
>      latch_destroy(&ct->clean_thread_exit);
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
> -        if (keyn->key.dir != CT_DIR_FWD) {
> -            continue;
> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        bucket = &ct->buckets[i];
> +        CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
> +            if (keyn->key.dir != CT_DIR_FWD) {
> +                continue;
> +            }
> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +            conn_clean(ct, conn);
>          }
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        conn_clean(ct, conn);
>      }
> -    cmap_destroy(&ct->conns);
> 
> +    /* XXX: we need this loop because connections may be in multiple
> +     * buckets.  The former loop should probably use conn_clean__()
> +     * or an unlocked version of conn_clean(). */
> +    for (i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        bucket = &ct->buckets[i];
> +        ovs_mutex_destroy(&bucket->lock);
> +        cmap_destroy(&ct->buckets[i].conns);
> +    }
> +
> +    ovs_mutex_lock(&ct->ct_lock);
>      struct zone_limit *zl;
>      CMAP_FOR_EACH (zl, node, &ct->zone_limits) {
>          uint32_t hash = zone_key_hash(zl->czl.zone, ct->hash_basis);
> @@ -498,45 +515,108 @@ conntrack_destroy(struct conntrack *ct)
>  }
> 
> 
> +static unsigned hash_scale(uint32_t hash)
> +{
> +    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;
> +}
> +

I am not sure I understand this.

You have a hash of width 32 bits.
You shift right by 22 bits (10 bits remaining).
Then you use modulo with a power of two, effectively doing bitwise & on the
10 LSBs, which seems redundant at this point.

Is there a reason to prefer the MSBs to the LSBs? Is there a specific property 
of the hash function that requires it?
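
Unless I am missing something, the first two forms below are equivalent (the
modulo is a no-op after the shift), while the last one is the more usual
pattern, although it picks the LSBs instead of the MSBs:

    /* Current code: */
    return (hash >> (32 - CONNTRACK_BUCKETS_SHIFT)) % CONNTRACK_BUCKETS;

    /* Equivalent: only CONNTRACK_BUCKETS_SHIFT bits remain after the shift,
     * so the modulo does nothing. */
    return hash >> (32 - CONNTRACK_BUCKETS_SHIFT);

    /* More common form, selecting the LSBs: */
    return hash & (CONNTRACK_BUCKETS - 1);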

>  static bool
> -conn_key_lookup(struct conntrack *ct, const struct conn_key *key,
> -                uint32_t hash, long long now, struct conn **conn_out,
> +conn_key_lookup(struct conntrack *ct, unsigned bucket,
> +                const struct conn_key *key, uint32_t hash,
> +                long long now, struct conn **conn_out,
>                  bool *reply)
>  {
> +    struct ct_bucket *ctb = &ct->buckets[bucket];
>      struct conn_key_node *keyn;
> -    struct conn *conn = NULL;
>      bool found = false;
> +    struct conn *conn;
> 
> -    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ct->conns) {
> +    CMAP_FOR_EACH_WITH_HASH (keyn, cm_node, hash, &ctb->conns) {
>          conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +        if (conn_expired(conn, now)) {
> +            conn_clean(ct, conn);
> +            continue;
> +        }
> +
>          for (int i = CT_DIR_FWD; i < CT_DIR_MAX; i++) {
> -            if (!conn_key_cmp(&conn->key_node[i].key, key) &&
> -                !conn_expired(conn, now)) {
> +            if (!conn_key_cmp(&conn->key_node[i].key, key)) {
>                  found = true;
>                  if (reply) {
>                      *reply = i;
>                  }
> -                goto out_found;
> +
> +                goto conn_out_found;
>              }
>          }
>      }
> 
> -out_found:
> -    if (found && conn_out) {
> -        *conn_out = conn;
> -    } else if (conn_out) {
> -        *conn_out = NULL;
> +conn_out_found:
> +    if (conn_out) {
> +        *conn_out = found ? conn : NULL;
>      }
> 
>      return found;
>  }
> 
> +static void
> +buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)

ct_buckets_unlock?

> +{
> +    unsigned p1 = hash_scale(h1),
> +        p2 = hash_scale(h2);
> +
> +    if (p1 > p2) {

Keeping the comparison the same as in buckets_lock makes it clear at a glance
that the unlock order is the reverse of the lock order, which IMO helps in
seeing that it is done correctly.

So I think using (p1 < p2) there and then swapping p1 and p2 below would be
easier to read.
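
Something along these lines (untested):

    static void
    buckets_unlock(struct conntrack *ct, uint32_t h1, uint32_t h2)
    {
        unsigned p1 = hash_scale(h1), p2 = hash_scale(h2);

        if (p1 < p2) {
            /* p1 was locked first, so unlock it last. */
            ovs_mutex_unlock(&ct->buckets[p2].lock);
            ovs_mutex_unlock(&ct->buckets[p1].lock);
        } else if (p1 > p2) {
            ovs_mutex_unlock(&ct->buckets[p1].lock);
            ovs_mutex_unlock(&ct->buckets[p2].lock);
        } else {
            ovs_mutex_unlock(&ct->buckets[p1].lock);
        }
    }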

> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
> +    } else if (p1 < p2) {
> +        ovs_mutex_unlock(&ct->buckets[p2].lock);
> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +    } else {
> +        ovs_mutex_unlock(&ct->buckets[p1].lock);
> +    }
> +}
> +
> +/* Acquires both locks in an ordered way. */
> +static void
> +buckets_lock(struct conntrack *ct, uint32_t h1, uint32_t h2)
> +{
> +    unsigned p1 = hash_scale(h1),
> +        p2 = hash_scale(h2);
> +
> +    if (p1 < p2) {
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +        ovs_mutex_lock(&ct->buckets[p2].lock);
> +    } else if (p1 > p2) {
> +        ovs_mutex_lock(&ct->buckets[p2].lock);
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +    } else {
> +        ovs_mutex_lock(&ct->buckets[p1].lock);
> +    }
> +}
> +
> +static void
> +conn_clean(struct conntrack *ct, struct conn *conn)
> +{
> +    uint32_t h1, h2;
> +
> +    if (atomic_flag_test_and_set(&conn->cleaned)) {
> +        return;
> +    }
> +
> +    h1 = cached_key_hash(&conn->key_node[CT_DIR_FWD]);
> +    h2 = cached_key_hash(&conn->key_node[CT_DIR_REV]);
> +    buckets_lock(ct, h1, h2);
> +    conn_clean__(ct, conn);
> +    buckets_unlock(ct, h1, h2);
> +}
> +
>  static bool
>  conn_lookup(struct conntrack *ct, const struct conn_key *key,
>              long long now, struct conn **conn_out, bool *reply)
>  {
>      uint32_t hash = conn_key_hash(key, ct->hash_basis);
> -    return conn_key_lookup(ct, key, hash, now, conn_out, reply);
> +    unsigned bucket = hash_scale(hash);
> +
> +    return conn_key_lookup(ct, bucket, key, hash, now, conn_out, reply);
>  }
> 
>  static void
> @@ -944,7 +1024,6 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>                 const struct nat_action_info_t *nat_action_info,
>                 const char *helper, const struct alg_exp_node *alg_exp,
>                 enum ct_alg_ctl_type ct_alg_ctl, uint32_t tp_id)
> -    OVS_REQUIRES(ct->ct_lock)
>  {
>      struct conn *nc = NULL;
>      uint32_t rev_hash = ctx->hash;
> @@ -954,6 +1033,8 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>          return nc;
>      }
> 
> +    /* XXX: We are unlocked here, so we don't know
> +     * if the tuple already exists in the table. */

Can you expand more on the issue here?
Shouldn't we be locked for the lookup?

>      pkt->md.ct_state = CS_NEW;
> 
>      if (alg_exp) {
> @@ -961,10 +1042,11 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>      }
> 
>      if (commit) {
> -        struct conn_key_node *fwd_key_node, *rev_key_node;
> -
>          struct zone_limit *zl = zone_limit_lookup_or_default(ct,
>                                                               ctx->key.zone);
> +        struct conn_key_node *fwd_key_node, *rev_key_node;
> +        bool handle_tuple = false;
> +
>          if (zl && atomic_count_get(&zl->czl.count) >= zl->czl.limit) {
>              return nc;
>          }
> @@ -1007,22 +1089,40 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>                      nc->nat_action = NAT_ACTION_DST;
>                  }
>              } else {
> -                bool nat_res = nat_get_unique_tuple(ct, nc, nat_action_info);
> -
> -                if (!nat_res) {
> -                    goto nat_res_exhaustion;
> -                }
> +                handle_tuple = true;
>              }
> -
> -            nat_packet(pkt, nc, ctx->icmp_related);
> -            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
> -            rev_key_node->key.dir = CT_DIR_REV;
> -            cmap_insert(&ct->conns, &rev_key_node->cm_node, rev_hash);
>          }
> 
>          ovs_mutex_init_adaptive(&nc->lock);
> +        atomic_flag_clear(&nc->cleaned);
>          fwd_key_node->key.dir = CT_DIR_FWD;
> -        cmap_insert(&ct->conns, &fwd_key_node->cm_node, ctx->hash);
> +        rev_key_node->key.dir = CT_DIR_REV;
> +
> +        if (handle_tuple) {
> +            bool nat_res = nat_get_unique_tuple_lock(ct, nc, nat_action_info,
> +                                                     &rev_hash);
> +
> +            if (!nat_res) {
> +                goto out_error;
> +            }
> +        } else {
> +            rev_hash = conn_key_hash(&rev_key_node->key, ct->hash_basis);
> +            buckets_lock(ct, ctx->hash, rev_hash);
> +        }
> +
> +        if (conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
> +            goto out_error_unlock;
> +        }
> +
> +        fwd_key_node->key_hash = ctx->hash;
> +        cmap_insert(&ct->buckets[hash_scale(ctx->hash)].conns,
> +                    &fwd_key_node->cm_node, ctx->hash);
> +        if (nat_action_info) {
> +            rev_key_node->key_hash = rev_hash;
> +            cmap_insert(&ct->buckets[hash_scale(rev_hash)].conns,
> +                        &rev_key_node->cm_node, rev_hash);
> +            nat_packet(pkt, nc, ctx->icmp_related);
> +        }
> 
>          atomic_count_inc(&ct->n_conn);
>          ctx->conn = nc; /* For completeness. */
> @@ -1033,21 +1133,23 @@ conn_not_found(struct conntrack *ct, struct dp_packet *pkt,
>          } else {
>              nc->admit_zone = INVALID_ZONE;
>          }
> +        buckets_unlock(ct, ctx->hash, rev_hash);
>      }
> 
>      return nc;
> 
> +out_error_unlock:
> +    buckets_unlock(ct, ctx->hash, rev_hash);
>      /* This would be a user error or a DOS attack.  A user error is prevented
>       * by allocating enough combinations of NAT addresses when combined with
>       * ephemeral ports.  A DOS attack should be protected against with
>       * firewall rules or a separate firewall.  Also using zone partitioning
>       * can limit DoS impact. */
> -nat_res_exhaustion:
> -    ovs_list_remove(&nc->exp_node);
> +out_error:
> +    ovs_mutex_destroy(&nc->lock);
>      delete_conn_cmn(nc);
>      static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
> -    VLOG_WARN_RL(&rl, "Unable to NAT due to tuple space exhaustion - "
> -                 "if DoS attack, use firewalling and/or zone partitioning.");
> +    VLOG_WARN_RL(&rl, "Unable to insert a new connection.");
>      return NULL;
>  }
> 
> @@ -1082,12 +1184,7 @@ conn_update_state(struct conntrack *ct, struct dp_packet *pkt,
>              pkt->md.ct_state = CS_INVALID;
>              break;
>          case CT_UPDATE_NEW:
> -            ovs_mutex_lock(&ct->ct_lock);
> -            if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
> -                            now, NULL, NULL)) {
> -                conn_clean(ct, conn);
> -            }
> -            ovs_mutex_unlock(&ct->ct_lock);
> +            conn_clean(ct, conn);
>              create_new_conn = true;
>              break;
>          case CT_UPDATE_VALID_NEW:
> @@ -1253,6 +1350,8 @@ static void
>  initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>                      long long now, bool natted)
>  {
> +    unsigned bucket = hash_scale(ctx->hash);
> +
>      if (natted) {
>          /* If the packet has been already natted (e.g. a previous
>           * action took place), retrieve it performing a lookup of its
> @@ -1260,7 +1359,8 @@ initial_conn_lookup(struct conntrack *ct, struct conn_lookup_ctx *ctx,
>          conn_key_reverse(&ctx->key);
>      }
> 
> -    conn_key_lookup(ct, &ctx->key, ctx->hash, now, &ctx->conn, &ctx->reply);
> +    conn_key_lookup(ct, bucket, &ctx->key, ctx->hash,
> +                    now, &ctx->conn, &ctx->reply);
> 
>      if (natted) {
>          if (OVS_LIKELY(ctx->conn)) {
> @@ -1287,24 +1387,20 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>              ovs_be16 tp_src, ovs_be16 tp_dst, const char *helper,
>              uint32_t tp_id)
>  {
> +    bool create_new_conn = false;
> +
>      /* Reset ct_state whenever entering a new zone. */
>      if (pkt->md.ct_state && pkt->md.ct_zone != zone) {
>          pkt->md.ct_state = 0;
>      }
> 
> -    bool create_new_conn = false;
>      initial_conn_lookup(ct, ctx, now, !!(pkt->md.ct_state &
>                                           (CS_SRC_NAT | CS_DST_NAT)));
>      struct conn *conn = ctx->conn;
> 
>      /* Delete found entry if in wrong direction. 'force' implies commit. */
>      if (OVS_UNLIKELY(force && ctx->reply && conn)) {
> -        ovs_mutex_lock(&ct->ct_lock);
> -        if (conn_lookup(ct, &conn->key_node[CT_DIR_FWD].key,
> -                        now, NULL, NULL)) {
> -            conn_clean(ct, conn);
> -        }
> -        ovs_mutex_unlock(&ct->ct_lock);
> +        conn_clean(ct, conn);
>          conn = NULL;
>      }
> 
> @@ -1338,7 +1434,6 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>      struct alg_exp_node alg_exp_entry;
> 
>      if (OVS_UNLIKELY(create_new_conn)) {
> -
>          ovs_rwlock_rdlock(&ct->resources_lock);
>          alg_exp = expectation_lookup(&ct->alg_expectations, &ctx->key,
>                                       ct->hash_basis,
> @@ -1349,12 +1444,9 @@ process_one(struct conntrack *ct, struct dp_packet *pkt,
>          }
>          ovs_rwlock_unlock(&ct->resources_lock);
> 
> -        ovs_mutex_lock(&ct->ct_lock);
> -        if (!conn_lookup(ct, &ctx->key, now, NULL, NULL)) {
> -            conn = conn_not_found(ct, pkt, ctx, commit, now, nat_action_info,
> -                                  helper, alg_exp, ct_alg_ctl, tp_id);
> -        }
> -        ovs_mutex_unlock(&ct->ct_lock);
> +        conn = conn_not_found(ct, pkt, ctx, commit, now,
> +                              nat_action_info, helper, alg_exp,
> +                              ct_alg_ctl, tp_id);
>      }
> 
>      write_ct_md(pkt, zone, conn, &ctx->key, alg_exp);
> @@ -1467,83 +1559,92 @@ set_label(struct dp_packet *pkt, struct conn *conn,
>  }
> 
> 
> -/* Delete the expired connections from 'ctb', up to 'limit'. Returns the
> - * earliest expiration time among the remaining connections in 'ctb'.  Returns
> - * LLONG_MAX if 'ctb' is empty.  The return value might be smaller than 'now',
> - * if 'limit' is reached */
> +/* Delete the expired connections from 'bucket', up to 'limit'.
> + * Returns the earliest expiration time among the remaining
> + * connections in 'bucket'.  Returns LLONG_MAX if 'bucket' is empty.
> + * The return value might be smaller than 'now', if 'limit' is
> + * reached. */
>  static long long
> -ct_sweep(struct conntrack *ct, long long now, size_t limit)
> +sweep_bucket(struct conntrack *ct, struct ct_bucket *bucket,
> +             long long now)
>  {
> -    struct conn *conn, *next;
> -    long long min_expiration = LLONG_MAX;
> -    size_t count = 0;
> +    struct conn_key_node *keyn;
> +    unsigned int conn_count = 0;
> +    struct conn *conn;
> +    long long expiration;
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> +    CMAP_FOR_EACH (keyn, cm_node, &bucket->conns) {
> +        if (keyn->key.dir != CT_DIR_FWD) {
> +            continue;
> +        }
> 
> -    for (unsigned i = 0; i < N_CT_TM; i++) {
> -        LIST_FOR_EACH_SAFE (conn, next, exp_node, &ct->exp_lists[i]) {
> -            ovs_mutex_lock(&conn->lock);
> -            if (now < conn->expiration || count >= limit) {
> -                min_expiration = MIN(min_expiration, conn->expiration);
> -                ovs_mutex_unlock(&conn->lock);
> -                if (count >= limit) {
> -                    /* Do not check other lists. */
> -                    COVERAGE_INC(conntrack_long_cleanup);
> -                    goto out;
> -                }
> -                break;
> -            } else {
> -                ovs_mutex_unlock(&conn->lock);
> -                conn_clean(ct, conn);
> -            }
> -            count++;
> +        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +        ovs_mutex_lock(&conn->lock);
> +        expiration = conn->expiration;
> +        ovs_mutex_unlock(&conn->lock);

Atomic expiration might be worth considering as well.
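
Something like this is what I have in mind (rough idea only, not tested, and
whether relaxed ordering is enough here needs more thought):

    /* In struct conn: */
    atomic_llong expiration;

    /* Wherever the expiration is set or updated: */
    atomic_store_relaxed(&conn->expiration, now + val * 1000LL);

    /* In sweep_bucket(), without taking conn->lock: */
    long long expiration;

    atomic_read_relaxed(&conn->expiration, &expiration);
    if (now >= expiration) {
        conn_clean(ct, conn);
    }
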

> +
> +        if (now >= expiration) {
> +            conn_clean(ct, conn);
>          }
> +
> +        conn_count++;
>      }
> 
> -out:
> -    VLOG_DBG("conntrack cleanup %"PRIuSIZE" entries in %lld msec", count,
> -             time_msec() - now);
> -    ovs_mutex_unlock(&ct->ct_lock);
> -    return min_expiration;
> +    return conn_count;
>  }
> 
> -/* Cleans up old connection entries from 'ct'.  Returns the time when the
> - * next expiration might happen.  The return value might be smaller than
> - * 'now', meaning that an internal limit has been reached, and some expired
> - * connections have not been deleted. */
> +/* Cleans up old connection entries from 'ct'.  Returns the the next
> + * wake up time.  The return value might be smaller than 'now', meaning
> + * that an internal limit has been reached, that is, the table
> + * hasn't been entirely scanned. */
>  static long long
>  conntrack_clean(struct conntrack *ct, long long now)
>  {
> -    unsigned int n_conn_limit;
> +    long long next_wakeup = now + 90 * 1000;
> +    unsigned int n_conn_limit, i, count = 0;
> +    size_t clean_end;
> +
>      atomic_read_relaxed(&ct->n_conn_limit, &n_conn_limit);
> -    size_t clean_max = n_conn_limit > 10 ? n_conn_limit / 10 : 1;
> -    long long min_exp = ct_sweep(ct, now, clean_max);
> -    long long next_wakeup = MIN(min_exp, now + CT_DPIF_NETDEV_TP_MIN);
> +    clean_end = n_conn_limit / 64;

I don't know if the change in heuristics is better or not here.
Which metrics should be considered relevant? CPU load? Latency?

> +
> +    for (i = ct->next_bucket; i < CONNTRACK_BUCKETS; i++) {
> +        struct ct_bucket *bucket = &ct->buckets[i];
> +
> +        count += sweep_bucket(ct, bucket, now);
> +
> +        if (count > clean_end) {
> +            next_wakeup = 0;
> +            break;
> +        }
> +    }
> +
> +    ct->next_bucket = (i < CONNTRACK_BUCKETS) ? i : 0;
> 
>      return next_wakeup;
>  }
> 
>  /* Cleanup:
>   *
> - * We must call conntrack_clean() periodically.  conntrack_clean() return
> - * value gives an hint on when the next cleanup must be done (either because
> - * there is an actual connection that expires, or because a new connection
> - * might be created with the minimum timeout).
> + * We must call conntrack_clean() periodically.  conntrack_clean()
> + * return value gives an hint on when the next cleanup must be done
> + * (either because there is still work to do, or because a new
> + * connection might be created).
>   *
>   * The logic below has two goals:
>   *
> - * - We want to reduce the number of wakeups and batch connection cleanup
> - *   when the load is not very high.  CT_CLEAN_INTERVAL ensures that if we
> - *   are coping with the current cleanup tasks, then we wait at least
> - *   5 seconds to do further cleanup.
> + * - When the load is high, we want to avoid to hog the CPU scanning
> + *   all the buckets and their respective CMAPs "at once". For this
> + *   reason, every batch cleanup aims to scan at most n_conn_limit /
> + *   64 entries (more if the buckets contains many entrie) before
> + *   yielding the CPU. In this case, the next wake up will happen in
> + *   CT_CLEAN_MIN_INTERVAL_MS and the scan will resume starting from
> + *   the first bucket not scanned.
>   *
> - * - We don't want to keep the map locked too long, as we might prevent
> - *   traffic from flowing.  CT_CLEAN_MIN_INTERVAL ensures that if cleanup is
> - *   behind, there is at least some 200ms blocks of time when the map will be
> - *   left alone, so the datapath can operate unhindered.
> - */
> -#define CT_CLEAN_INTERVAL 5000 /* 5 seconds */
> -#define CT_CLEAN_MIN_INTERVAL 200  /* 0.2 seconds */
> + * - We also don't want to scan the buckets so frequently, as going
> + *   through all the connections, during high loads, may be costly in
> + *   terms of CPU time. In this case the next wake up is set to 90
> + *   seconds. */
> +#define CT_CLEAN_MIN_INTERVAL_MS 100  /* 0.1 seconds */
> 
>  static void *
>  clean_thread_main(void *f_)
> @@ -1556,9 +1657,9 @@ clean_thread_main(void *f_)
>          next_wake = conntrack_clean(ct, now);
> 
>          if (next_wake < now) {
> -            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL);
> +            poll_timer_wait_until(now + CT_CLEAN_MIN_INTERVAL_MS);
>          } else {
> -            poll_timer_wait_until(MAX(next_wake, now + CT_CLEAN_INTERVAL));
> +            poll_timer_wait_until(next_wake);
>          }
>          latch_wait(&ct->clean_thread_exit);
>          poll_block();
> @@ -2088,6 +2189,12 @@ ct_endpoint_hash_add(uint32_t hash, const struct ct_endpoint *ep)
>      return hash_add_bytes32(hash, (const uint32_t *) ep, sizeof *ep);
>  }
> 
> +static uint32_t
> +cached_key_hash(struct conn_key_node *n)
> +{
> +    return n->key_hash;
> +}
> +
>  /* Symmetric */
>  static uint32_t
>  conn_key_hash(const struct conn_key *key, uint32_t basis)
> @@ -2357,8 +2464,9 @@ next_addr_in_range_guarded(union ct_addr *curr, union ct_addr *min,
>   *
>   * If none can be found, return exhaustion to the caller. */
>  static bool
> -nat_get_unique_tuple(struct conntrack *ct, struct conn *conn,
> -                     const struct nat_action_info_t *nat_info)
> +nat_get_unique_tuple_lock(struct conntrack *ct, struct conn *conn,
> +                          const struct nat_action_info_t *nat_info,
> +                          uint32_t *rev_hash)
>  {
>      union ct_addr min_addr = {0}, max_addr = {0}, curr_addr = {0},
>                    guard_addr = {0};
> @@ -2392,10 +2500,15 @@ another_round:
>                        nat_info->nat_action);
> 
>      if (!pat_proto) {
> +        uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
> +        *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
> +
> +        buckets_lock(ct, key_hash, *rev_hash);
>          if (!conn_lookup(ct, rev_key,
>                           time_msec(), NULL, NULL)) {
>              return true;
>          }
> +        buckets_unlock(ct, key_hash, *rev_hash);
> 
>          goto next_addr;
>      }
> @@ -2404,10 +2517,15 @@ another_round:
>          rev_key->src.port = htons(curr_dport);
>          FOR_EACH_PORT_IN_RANGE(curr_sport, min_sport, max_sport) {
>              rev_key->dst.port = htons(curr_sport);
> +            uint32_t key_hash = conn_key_hash(fwd_key, ct->hash_basis);
> +            *rev_hash = conn_key_hash(rev_key, ct->hash_basis);
> +
> +            buckets_lock(ct, key_hash, *rev_hash);
>              if (!conn_lookup(ct, rev_key,
>                               time_msec(), NULL, NULL)) {
>                  return true;
>              }
> +            buckets_unlock(ct, key_hash, *rev_hash);
>          }
>      }
> 
> @@ -2615,20 +2733,39 @@ conntrack_dump_next(struct conntrack_dump *dump, struct ct_dpif_entry *entry)
>  {
>      struct conntrack *ct = dump->ct;
>      long long now = time_msec();
> +    struct ct_bucket *bucket;
> 
> -    for (;;) {
> -        struct cmap_node *cm_node = cmap_next_position(&ct->conns,
> -                                                       &dump->cm_pos);
> -        if (!cm_node) {
> -            break;
> +    while (dump->bucket < CONNTRACK_BUCKETS) {
> +        struct cmap_node *cm_node;
> +        bucket = &ct->buckets[dump->bucket];
> +
> +        for (;;) {
> +            cm_node = cmap_next_position(&bucket->conns,
> +                                         &dump->cm_pos);
> +            if (!cm_node) {
> +                break;
> +            }
> +            struct conn_key_node *keyn;
> +            struct conn *conn;
> +            INIT_CONTAINER(keyn, cm_node, cm_node);
> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +
> +            if (conn_expired(conn, now)) {
> +                /* XXX: ideally this should call conn_clean(). */
> +                continue;
> +            }
> +
> +            if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
> +                (keyn->key.dir == CT_DIR_FWD)) {
> +                conn_to_ct_dpif_entry(conn, entry, now);
> +                break;
> +            }
>          }
> -        struct conn_key_node *keyn;
> -        struct conn *conn;
> -        INIT_CONTAINER(keyn, cm_node, cm_node);
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        if ((!dump->filter_zone || keyn->key.zone == dump->zone) &&
> -            (keyn->key.dir == CT_DIR_FWD)) {
> -            conn_to_ct_dpif_entry(conn, entry, now);
> +
> +        if (!cm_node) {
> +            memset(&dump->cm_pos, 0, sizeof dump->cm_pos);
> +            dump->bucket++;
> +        } else {
>              return 0;
>          }
>      }
> @@ -2648,17 +2785,18 @@ conntrack_flush(struct conntrack *ct, const uint16_t *zone)
>      struct conn_key_node *keyn;
>      struct conn *conn;
> 
> -    ovs_mutex_lock(&ct->ct_lock);
> -    CMAP_FOR_EACH (keyn, cm_node, &ct->conns) {
> -        if (keyn->key.dir != CT_DIR_FWD) {
> -            continue;
> -        }
> -        conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> -        if (!zone || *zone == keyn->key.zone) {
> -            conn_clean(ct, conn);
> +    for (unsigned i = 0; i < CONNTRACK_BUCKETS; i++) {
> +        CMAP_FOR_EACH (keyn, cm_node, &ct->buckets[i].conns) {
> +            if (keyn->key.dir != CT_DIR_FWD) {
> +                continue;
> +            }
> +
> +            conn = CONTAINER_OF(keyn, struct conn, key_node[keyn->key.dir]);
> +            if (!zone || *zone == keyn->key.zone) {
> +                conn_clean(ct, conn);
> +            }
>          }
>      }
> -    ovs_mutex_unlock(&ct->ct_lock);
> 
>      return 0;
>  }
> @@ -2667,15 +2805,19 @@ int
>  conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>                        uint16_t zone)
>  {
> -    int error = 0;
>      struct conn_key key;
> -    struct conn *conn;
> +    struct conn *conn = NULL;
> +    unsigned bucket;
> +    uint32_t hash;
> +    int error = 0;
> 
>      memset(&key, 0, sizeof(key));
>      tuple_to_conn_key(tuple, zone, &key);
> -    ovs_mutex_lock(&ct->ct_lock);
> -    conn_lookup(ct, &key, time_msec(), &conn, NULL);
> 
> +    hash = conn_key_hash(&key, ct->hash_basis);
> +    bucket = hash_scale(hash);
> +
> +    conn_key_lookup(ct, bucket, &key, hash, time_msec(), &conn, NULL);
>      if (conn) {
>          conn_clean(ct, conn);
>      } else {
> @@ -2683,7 +2825,6 @@ conntrack_flush_tuple(struct conntrack *ct, const struct ct_dpif_tuple *tuple,
>          error = ENOENT;
>      }
> 
> -    ovs_mutex_unlock(&ct->ct_lock);
>      return error;
>  }
> 

Best regards,
-- 
Gaetan Rivet
