Hi Sriharsha,

Thanks for your patch.

I am pleased to see work in this area. I do, however, have some questions
about the implementation. Please see my comments inline.

On Sat, Sep 15, 2018 at 09:40:10PM +0530, Sriharsha Basavapatna via dev wrote:
> This is the third patch in the patch-set to support dynamic rebalancing
> of offloaded flows.
> 
> The dynamic rebalancing functionality is implemented in this patch. The
> ukeys that are not scheduled for deletion are obtained and passed as input
> to the rebalancing routine. The rebalancing is done in the context of
> revalidation leader thread, after all other revalidator threads are
> done with gathering rebalancing data for flows.
> 
> For each netdev that is in OOR state, a list of flows - both offloaded
> and non-offloaded (pending) - is obtained using the ukeys. For each netdev
> that is in OOR state, the flows are grouped and sorted into offloaded and
> pending flows.  The offloaded flows are sorted in descending order of
> pps-rate, while pending flows are sorted in ascending order of pps-rate.
> 
> The rebalancing is done in two phases. In the first phase, we try to
> offload all pending flows and if that succeeds, the OOR state on the device
> is cleared. If some (or none) of the pending flows could not be offloaded,
> then we start replacing an offloaded flow that has a lower pps-rate than
> a pending flow, until there are no more pending flows with a higher rate
> than an offloaded flow. The flows that are replaced from the device are
> added into kernel datapath.
> 
> A new OVS configuration parameter, "offload-rebalance", is added to ovsdb.
> The default value of this is "false". To enable this feature, set the
> value of this parameter to "true", which provides packets-per-second
> rate based policy to dynamically offload and un-offload flows.
> 
> Note: This option can be enabled only when 'hw-offload' policy is enabled.
> It also requires 'tc-policy' to be set to 'skip_sw'; otherwise, flow
> offload errors (specifically the ENOSPC error this feature depends on) reported
> by an offloaded device are suppressed by the TC-Flower kernel module.
> 
> Signed-off-by: Sriharsha Basavapatna <[email protected]>
> Co-authored-by: Venkat Duvvuru <[email protected]>
> Signed-off-by: Venkat Duvvuru <[email protected]>
> Reviewed-by: Sathya Perla <[email protected]>
> ---
>  NEWS                          |   3 +
>  lib/dpif-netdev.c             |   3 +-
>  lib/dpif-netlink.c            |  20 +-
>  lib/dpif-provider.h           |   8 +-
>  lib/dpif.c                    |  30 ++-
>  lib/dpif.h                    |  12 +-
>  lib/netdev-provider.h         |   7 +-
>  lib/netdev.c                  |  71 +++++-
>  lib/netdev.h                  |   2 +
>  ofproto/ofproto-dpif-upcall.c | 468 +++++++++++++++++++++++++++++++++-
>  vswitchd/vswitch.xml          |  21 ++
>  11 files changed, 614 insertions(+), 31 deletions(-)
> 
> diff --git a/NEWS b/NEWS
> index 33b4d8a23..846b46fb5 100644
> --- a/NEWS
> +++ b/NEWS
> @@ -8,6 +8,9 @@ Post-v2.10.0
>       as the default timeout for control utilities.
>     - ovn:
>       * ovn-ctl: allow passing user:group ids to the OVN daemons.
> +   - ovs-vswitchd:
> +     * New configuration option "offload-rebalance", that enables dynamic
> +       rebalancing of offloaded flows.
>  
>  
>  v2.10.0 - xx xxx xxxx
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index 7c0300cc5..1c01d2278 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -3689,7 +3689,8 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>  }
>  
>  static void
> -dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
> +dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
> +                    enum dpif_offload_type offload_type OVS_UNUSED)
>  {
>      size_t i;
>  
> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
> index 6ffe8af25..881dbc69c 100644
> --- a/lib/dpif-netlink.c
> +++ b/lib/dpif-netlink.c
> @@ -2179,7 +2179,7 @@ parse_flow_put(struct dpif_netlink *dpif, struct dpif_flow_put *put)
>          VLOG_DBG("added flow");
>      } else if (err != EEXIST) {
>          struct netdev *oor_netdev = NULL;
> -        if (err == ENOSPC) {
> +        if (err == ENOSPC && netdev_is_offload_rebalance_policy_enabled()) {

I think that the check on netdev_is_offload_rebalance_policy_enabled()
should be added to the patch that adds the lines immediately below.
Perhaps via a dummy implementation of
netdev_is_offload_rebalance_policy_enabled that always returns false.
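That dummy could be as small as this (a sketch; the function name matches this
patch, its placement in the earlier patch is my assumption):

```c
#include <stdbool.h>

/* Dummy for the earlier patch: the rebalance policy is never enabled
 * until this patch wires up the "offload-rebalance" config option. */
bool
netdev_is_offload_rebalance_policy_enabled(void)
{
    return false;
}
```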

>              oor_netdev = flow_get_tunnel_netdev(&match.flow.tunnel);
>              if (!oor_netdev) {
>                  oor_netdev = dev;
> @@ -2274,7 +2274,8 @@ dpif_netlink_operate_chunks(struct dpif_netlink *dpif, struct dpif_op **ops,
>  }
>  
>  static void
> -dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
> +dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops,
> +                     enum dpif_offload_type offload_type)
>  {
>      struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>      struct dpif_op *new_ops[OPERATE_MAX_OPS];
> @@ -2282,7 +2283,10 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
>      int i = 0;
>      int err = 0;
>  
> -    if (netdev_is_flow_api_enabled()) {
> +    ovs_assert(offload_type != DPIF_OFFLOAD_ALWAYS ||
> +               netdev_is_flow_api_enabled());

The assert seems rather heavy-handed here.
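Failing the batch gracefully might be gentler than aborting; a sketch with a
simplified stand-in for struct dpif_op (fail_ops is a name I made up; the real
code would set op->error on each dpif_op):

```c
#include <errno.h>
#include <stddef.h>

/* Simplified stand-in for struct dpif_op; only the 'error' member matters. */
struct op {
    int error;
};

/* Fail every operation in the batch with 'err' instead of asserting,
 * e.g. EINVAL when DPIF_OFFLOAD_ALWAYS is requested but the flow API
 * is disabled. */
static void
fail_ops(struct op **ops, size_t n_ops, int err)
{
    for (size_t i = 0; i < n_ops; i++) {
        ops[i]->error = err;
    }
}
```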

> +
> +    if (offload_type != DPIF_OFFLOAD_NEVER && netdev_is_flow_api_enabled()) {
>          while (n_ops > 0) {
>              count = 0;
>  
> @@ -2291,6 +2295,14 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
>  
>                  err = try_send_to_netdev(dpif, op);
>                  if (err && err != EEXIST) {
> +                    if (offload_type == DPIF_OFFLOAD_ALWAYS) {
> +                        op->error = err;
> +                        while (--n_ops) {
> +                            op = ops[i++];
> +                            op->error = err;
> +                        }
> +                        return;
> +                    }

I'm not clear on the purpose of the above logic; could you add a comment
explaining it?

>                      new_ops[count++] = op;
>                  } else {
>                      op->error = err;
> @@ -2301,7 +2313,7 @@ dpif_netlink_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
>  
>              dpif_netlink_operate_chunks(dpif, new_ops, count);
>          }
> -    } else {
> +    } else if (offload_type != DPIF_OFFLOAD_ALWAYS) {
>          dpif_netlink_operate_chunks(dpif, ops, n_ops);
>      }
>  }
> diff --git a/lib/dpif-provider.h b/lib/dpif-provider.h
> index 7a71f5c0a..a30de740f 100644
> --- a/lib/dpif-provider.h
> +++ b/lib/dpif-provider.h
> @@ -296,12 +296,14 @@ struct dpif_class {
>  
>      int (*flow_dump_next)(struct dpif_flow_dump_thread *thread,
>                            struct dpif_flow *flows, int max_flows);
> -
>      /* Executes each of the 'n_ops' operations in 'ops' on 'dpif', in the order
>       * in which they are specified, placing each operation's results in the
>       * "output" members documented in comments and the 'error' member of each
> -     * dpif_op. */
> -    void (*operate)(struct dpif *dpif, struct dpif_op **ops, size_t n_ops);
> +     * dpif_op. The offload_type argument tells the provider if 'ops' should
> +     * be submitted to a netdev (only offload) or to the kernel datapath
> +     * (never offload) or to both (offload if possible; software fallback). */
> +    void (*operate)(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
> +                    enum dpif_offload_type offload_type);
>  
>      /* Enables or disables receiving packets with dpif_recv() for 'dpif'.
>       * Turning packet receive off and then back on is allowed to change Netlink
> diff --git a/lib/dpif.c b/lib/dpif.c
> index d799f972c..65880b86a 100644
> --- a/lib/dpif.c
> +++ b/lib/dpif.c
> @@ -49,6 +49,7 @@
>  #include "valgrind.h"
>  #include "openvswitch/ofp-errors.h"
>  #include "openvswitch/vlog.h"
> +#include "lib/netdev-provider.h"
>  
>  VLOG_DEFINE_THIS_MODULE(dpif);
>  
> @@ -1015,7 +1016,7 @@ dpif_flow_get(struct dpif *dpif,
>      op.flow_get.flow->key_len = key_len;
>  
>      opp = &op;
> -    dpif_operate(dpif, &opp, 1);
> +    dpif_operate(dpif, &opp, 1, DPIF_OFFLOAD_AUTO);
>  
>      return op.error;
>  }
> @@ -1045,7 +1046,7 @@ dpif_flow_put(struct dpif *dpif, enum dpif_flow_put_flags flags,
>      op.flow_put.stats = stats;
>  
>      opp = &op;
> -    dpif_operate(dpif, &opp, 1);
> +    dpif_operate(dpif, &opp, 1, DPIF_OFFLOAD_AUTO);
>  
>      return op.error;
>  }
> @@ -1068,7 +1069,7 @@ dpif_flow_del(struct dpif *dpif,
>      op.flow_del.terse = false;
>  
>      opp = &op;
> -    dpif_operate(dpif, &opp, 1);
> +    dpif_operate(dpif, &opp, 1, DPIF_OFFLOAD_AUTO);
>  
>      return op.error;
>  }
> @@ -1325,7 +1326,7 @@ dpif_execute(struct dpif *dpif, struct dpif_execute *execute)
>          op.execute = *execute;
>  
>          opp = &op;
> -        dpif_operate(dpif, &opp, 1);
> +        dpif_operate(dpif, &opp, 1, DPIF_OFFLOAD_AUTO);
>  
>          return op.error;
>      } else {
> @@ -1336,10 +1337,21 @@ dpif_execute(struct dpif *dpif, struct dpif_execute *execute)
> /* Executes each of the 'n_ops' operations in 'ops' on 'dpif', in the order in
>   * which they are specified.  Places each operation's results in the "output"
>  * members documented in comments, and 0 in the 'error' member on success or a
> - * positive errno on failure. */
> + * positive errno on failure.
> + */
>  void
> -dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
> -{
> +dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops,
> +             enum dpif_offload_type offload_type)
> +{
> +    if (offload_type == DPIF_OFFLOAD_ALWAYS && !netdev_is_flow_api_enabled()) {
> +        size_t i;
> +        for (i = 0; i < n_ops; i++) {
> +            struct dpif_op *op = ops[i];
> +            op->error = EINVAL;
> +        }
> +        return;
> +    }
> +
>      while (n_ops > 0) {
>          size_t chunk;
>  
> @@ -1360,7 +1372,7 @@ dpif_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
>               * handle itself, without help. */
>              size_t i;
>  
> -            dpif->dpif_class->operate(dpif, ops, chunk);
> +            dpif->dpif_class->operate(dpif, ops, chunk, offload_type);
>  
>              for (i = 0; i < chunk; i++) {
>                  struct dpif_op *op = ops[i];
> @@ -1657,7 +1669,7 @@ dpif_queue_to_priority(const struct dpif *dpif, uint32_t queue_id,
>      log_operation(dpif, "queue_to_priority", error);
>      return error;
>  }
> -
> +
>  void
>  dpif_init(struct dpif *dpif, const struct dpif_class *dpif_class,
>            const char *name,
> diff --git a/lib/dpif.h b/lib/dpif.h
> index bbdc3eb6c..0675ab19f 100644
> --- a/lib/dpif.h
> +++ b/lib/dpif.h
> @@ -614,6 +614,13 @@ enum dpif_op_type {
>      DPIF_OP_FLOW_GET,
>  };
>  
> +/* offload_type argument types to (*operate) interface */
> +enum dpif_offload_type {
> +    DPIF_OFFLOAD_AUTO,         /* Offload if possible, fallback to software. */
> +    DPIF_OFFLOAD_NEVER,        /* Never offload to hardware. */
> +    DPIF_OFFLOAD_ALWAYS,       /* Always offload to hardware. */
> +};
> +
>  /* Add or modify a flow.
>   *
>  * The flow is specified by the Netlink attributes with types OVS_KEY_ATTR_* in
> @@ -768,8 +775,9 @@ struct dpif_op {
>      };
>  };
>  
> -void dpif_operate(struct dpif *, struct dpif_op **ops, size_t n_ops);
> -
> +void dpif_operate(struct dpif *, struct dpif_op **ops, size_t n_ops,
> +                  enum dpif_offload_type);
> +
>  /* Upcalls. */
>  
>  enum dpif_upcall_type {
> diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
> index e320dad61..fb0c27e6e 100644
> --- a/lib/netdev-provider.h
> +++ b/lib/netdev-provider.h
> @@ -38,10 +38,14 @@ struct netdev_tnl_build_header_params;
>  /* Offload-capable (HW) netdev information */
>  struct netdev_hw_info {
>      bool oor;                /* Out of Offload Resources ? */
> +    int offload_count;  /* Offloaded flow count */
> +    int pending_count;  /* Pending (non-offloaded) flow count */
>  };
>  
>  enum hw_info_type {
> -    HW_INFO_TYPE_OOR = 1     /* OOR state */
> +    HW_INFO_TYPE_OOR = 1,            /* OOR state */
> +    HW_INFO_TYPE_PEND_COUNT = 2,     /* Pending(non-offloaded) flow count */
> +    HW_INFO_TYPE_OFFL_COUNT = 3              /* Offloaded flow count */
>  };
>  
>  /* A network device (e.g. an Ethernet device).
> @@ -89,7 +93,6 @@ struct netdev {
>      int n_rxq;
>      struct shash_node *node;            /* Pointer to element in global map. */
>      struct ovs_list saved_flags_list; /* Contains "struct netdev_saved_flags". */
> -
>      struct netdev_hw_info hw_info;   /* offload-capable netdev info */
>  };
>  
> diff --git a/lib/netdev.c b/lib/netdev.c
> index 5ec4c935f..5a61bbd2e 100644
> --- a/lib/netdev.c
> +++ b/lib/netdev.c
> @@ -2260,11 +2260,23 @@ netdev_get_block_id(struct netdev *netdev)
>  int
>  netdev_get_hw_info(struct netdev *netdev, int type)
>  {
> -    if (type == HW_INFO_TYPE_OOR) {
> -        return netdev->hw_info.oor;
> +    int val = -1;
> +
> +    switch (type) {
> +    case HW_INFO_TYPE_OOR:
> +        val = netdev->hw_info.oor;
> +        break;
> +    case HW_INFO_TYPE_PEND_COUNT:
> +        val = netdev->hw_info.pending_count;
> +        break;
> +    case HW_INFO_TYPE_OFFL_COUNT:
> +        val = netdev->hw_info.offload_count;
> +        break;
> +    default:
> +        break;
>      }
>  
> -    return -1;
> +    return val;
>  }
>  
>  /*
> @@ -2273,9 +2285,47 @@ netdev_get_hw_info(struct netdev *netdev, int type)
>  void
>  netdev_set_hw_info(struct netdev *netdev, int type, int val)
>  {
> -    if (type == HW_INFO_TYPE_OOR) {
> +    switch (type) {
> +    case HW_INFO_TYPE_OOR:
> +        if (val == 0) {
> +            VLOG_DBG("Offload rebalance: netdev: %s is not OOR", netdev->name);
> +        }
>          netdev->hw_info.oor = val;
> +        break;
> +    case HW_INFO_TYPE_PEND_COUNT:
> +        netdev->hw_info.pending_count = val;
> +        break;
> +    case HW_INFO_TYPE_OFFL_COUNT:
> +        netdev->hw_info.offload_count = val;
> +        break;
> +    default:
> +        break;
> +    }
> +}
> +
> +/*
> + * Find if any netdev is in OOR state. Return true if there's at least
> + * one netdev that's in OOR state; otherwise return false.
> + */
> +bool
> +netdev_any_oor(void)
> +    OVS_EXCLUDED(netdev_mutex)
> +{
> +    struct shash_node *node;
> +    bool oor = false;
> +
> +    ovs_mutex_lock(&netdev_mutex);
> +    SHASH_FOR_EACH (node, &netdev_shash) {
> +        struct netdev *dev = node->data;
> +
> +        if (dev->hw_info.oor) {
> +            oor = true;
> +            break;
> +        }
>      }
> +    ovs_mutex_unlock(&netdev_mutex);
> +
> +    return oor;
>  }
>  
>  bool
> @@ -2515,6 +2565,15 @@ netdev_free_custom_stats_counters(struct netdev_custom_stats *custom_stats)
>  }
>  
>  #ifdef __linux__
> +
> +static bool netdev_offload_rebalance_policy = false;
> +
> +bool
> +netdev_is_offload_rebalance_policy_enabled(void)
> +{
> +    return netdev_offload_rebalance_policy;
> +}
> +
>  static void
>  netdev_ports_flow_init(void)
>  {
> @@ -2541,6 +2600,10 @@ netdev_set_flow_api_enabled(const struct smap *ovs_other_config)
>              tc_set_policy(smap_get_def(ovs_other_config, "tc-policy",
>                                         TC_POLICY_DEFAULT));
>  
> +            if (smap_get_bool(ovs_other_config, "offload-rebalance", false)) {
> +                netdev_offload_rebalance_policy = true;
> +            }
> +
>              netdev_ports_flow_init();
>  
>              ovsthread_once_done(&once);
> diff --git a/lib/netdev.h b/lib/netdev.h
> index dea727fcf..373be7cc0 100644
> --- a/lib/netdev.h
> +++ b/lib/netdev.h
> @@ -229,8 +229,10 @@ int netdev_init_flow_api(struct netdev *);
>  uint32_t netdev_get_block_id(struct netdev *);
>  int netdev_get_hw_info(struct netdev *, int);
>  void netdev_set_hw_info(struct netdev *, int, int);
> +bool netdev_any_oor(void);
>  bool netdev_is_flow_api_enabled(void);
>  void netdev_set_flow_api_enabled(const struct smap *ovs_other_config);
> +bool netdev_is_offload_rebalance_policy_enabled(void);
>  
>  struct dpif_port;
>  int netdev_ports_insert(struct netdev *, const struct dpif_class *,
> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c
> index 9a36dca74..b6dc9de82 100644
> --- a/ofproto/ofproto-dpif-upcall.c
> +++ b/ofproto/ofproto-dpif-upcall.c
> @@ -22,6 +22,7 @@
>  #include "connmgr.h"
>  #include "coverage.h"
>  #include "cmap.h"
> +#include "lib/dpif-provider.h"
>  #include "dpif.h"
>  #include "openvswitch/dynamic-string.h"
>  #include "fail-open.h"
> @@ -42,7 +43,6 @@
>  #include "tunnel.h"
>  #include "unixctl.h"
>  #include "openvswitch/vlog.h"
> -#include "lib/dpif-provider.h"
>  #include "lib/netdev-provider.h"
>  
>  #define MAX_QUEUE_LENGTH 512
> @@ -182,6 +182,8 @@ struct udpif {
>      uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
>                                            conns[n_conns-1] was stored. */
>      size_t n_conns;                    /* Number of connections waiting. */
> +
> +    long long int offload_rebalance_time;  /* Time of last offload rebalance */
>  };
>  
>  enum upcall_type {
> @@ -398,6 +400,12 @@ static int upcall_receive(struct upcall *, const struct dpif_backer *,
>                            const ovs_u128 *ufid, const unsigned pmd_id);
>  static void upcall_uninit(struct upcall *);
>  
> +static void udpif_flow_rebalance(struct udpif *udpif);
> +static int udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
> +                              enum dpif_offload_type offload_type);
> +static int udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
> +                                enum dpif_offload_type offload_type);
> +
>  static upcall_callback upcall_cb;
>  static dp_purge_callback dp_purge_cb;
>  
> @@ -569,6 +577,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers_,
>          ovs_barrier_init(&udpif->pause_barrier, udpif->n_revalidators + 1);
>          udpif->reval_exit = false;
>          udpif->pause = false;
> +        udpif->offload_rebalance_time = time_msec();
>          udpif->revalidators = xzalloc(udpif->n_revalidators
>                                        * sizeof *udpif->revalidators);
>          for (size_t i = 0; i < udpif->n_revalidators; i++) {
> @@ -861,6 +870,26 @@ free_dupcall:
>      return n_upcalls;
>  }
>  
> +static void
> +udpif_run_flow_rebalance(struct udpif *udpif)
> +{
> +    long long int now = 0;
> +
> +    /* Don't rebalance if OFFL_REBAL_INTVL_MSEC has not elapsed */
> +    now = time_msec();
> +    if (now < udpif->offload_rebalance_time + OFFL_REBAL_INTVL_MSEC) {
> +        return;
> +    }
> +
> +    if (!netdev_any_oor()) {
> +        return;
> +    }
> +
> +    VLOG_DBG("Offload rebalance: Found OOR netdevs");
> +    udpif->offload_rebalance_time = now;
> +    udpif_flow_rebalance(udpif);
> +}
> +
>  static void *
>  udpif_revalidator(void *arg)
>  {
> @@ -935,6 +964,9 @@ udpif_revalidator(void *arg)
>  
>              dpif_flow_dump_destroy(udpif->dump);
>              seq_change(udpif->dump_seq);
> +            if (netdev_is_offload_rebalance_policy_enabled()) {
> +                udpif_run_flow_rebalance(udpif);
> +            }
>  
>              duration = MAX(time_msec() - start_time, 1);
>              udpif->dump_duration = duration;
> @@ -979,7 +1011,7 @@ udpif_revalidator(void *arg)
>  
>      return NULL;
>  }
> -
> +
>  static enum upcall_type
>  classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata,
>                  struct user_action_cookie *cookie)
> @@ -1581,7 +1613,7 @@ handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
>      for (i = 0; i < n_ops; i++) {
>          opsp[n_opsp++] = &ops[i].dop;
>      }
> -    dpif_operate(udpif->dpif, opsp, n_opsp);
> +    dpif_operate(udpif->dpif, opsp, n_opsp, DPIF_OFFLOAD_AUTO);
>      for (i = 0; i < n_ops; i++) {
>          struct udpif_key *ukey = ops[i].ukey;
>  
> @@ -1673,7 +1705,7 @@ ukey_create__(const struct nlattr *key, size_t key_len,
>      ukey->state = UKEY_CREATED;
>      ukey->state_thread = ovsthread_id_self();
>      ukey->state_where = OVS_SOURCE_LOCATOR;
> -    ukey->created = time_msec();
> +    ukey->created = ukey->flow_time = time_msec();
>      memset(&ukey->stats, 0, sizeof ukey->stats);
>      ukey->stats.used = used;
>      ukey->xcache = NULL;
> @@ -2332,7 +2364,7 @@ push_dp_ops(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
>      for (i = 0; i < n_ops; i++) {
>          opsp[i] = &ops[i].dop;
>      }
> -    dpif_operate(udpif->dpif, opsp, n_ops);
> +    dpif_operate(udpif->dpif, opsp, n_ops, DPIF_OFFLOAD_AUTO);
>  
>      for (i = 0; i < n_ops; i++) {
>          struct ukey_op *op = &ops[i];
> @@ -2458,6 +2490,79 @@ reval_op_init(struct ukey_op *op, enum reval_result result,
>      }
>  }
>  
> +static void
> +ukey_netdev_unref(struct udpif_key *ukey)
> +{
> +    if (ukey->in_netdev) {
> +        netdev_close(ukey->in_netdev);
> +        ukey->in_netdev = NULL;
> +    }
> +    if (ukey->out_netdev) {
> +        netdev_close(ukey->out_netdev);
> +        ukey->out_netdev = NULL;
> +    }
> +}
> +
> +/*
> + * Given a udpif_key, get its input and output ports (netdevs) by parsing
> + * the flow keys and actions. The flow may not contain flow attributes if
> + * it is a terse dump; read its attributes from the ukey and then parse
> + * the flow to get the port info. Save them in udpif_key.
> + */
> +static void
> +ukey_to_flow_netdevs(struct udpif *udpif, struct udpif_key *ukey)
> +{
> +    const struct dpif *dpif = udpif->dpif;
> +    const struct dpif_class *dpif_class = dpif->dpif_class;
> +    const struct nlattr *actions;
> +    const struct nlattr *a;
> +    const struct nlattr *k;
> +    size_t actions_len;
> +    unsigned int left;
> +
> +    /* Remove existing references to netdevs */
> +    ukey_netdev_unref(ukey);
> +
> +    /* Read actions from ukey */
> +    ukey_get_actions(ukey, &actions, &actions_len);
> +
> +    /* Capture the output port and get a reference to its netdev; we are
> +     * only interested that the flow has an output port, so we just save the
> +     * first port if there are multiple output actions associated with it.
> +     */
> +    NL_ATTR_FOR_EACH (a, left, actions, actions_len) {
> +        enum ovs_action_attr type = nl_attr_type(a);
> +        if (type == OVS_ACTION_ATTR_OUTPUT) {
> +            ukey->out_netdev = netdev_ports_get(nl_attr_get_odp_port(a),
> +                                                dpif_class);
> +            break;
> +        }
> +    }
> +
> +    /* Now find the input port and get a reference to its netdev */
> +    NL_ATTR_FOR_EACH (k, left, ukey->key, ukey->key_len) {
> +        enum ovs_key_attr type = nl_attr_type(k);
> +
> +        if (type == OVS_KEY_ATTR_IN_PORT) {
> +            ukey->in_netdev = netdev_ports_get(nl_attr_get_odp_port(k),
> +                                               dpif_class);
> +        } else if (type == OVS_KEY_ATTR_TUNNEL) {
> +            struct flow_tnl tnl;
> +            enum odp_key_fitness res;
> +
> +            if (ukey->in_netdev) {
> +                netdev_close(ukey->in_netdev);
> +                ukey->in_netdev = NULL;
> +            }
> +            res = odp_tun_key_from_attr(k, &tnl);
> +            if (res != ODP_FIT_ERROR) {
> +                ukey->in_netdev = flow_get_tunnel_netdev(&tnl);
> +                break;
> +            }
> +        }
> +    }
> +}
> +
>  static uint64_t
>  udpif_flow_packet_delta(struct udpif_key *ukey, const struct dpif_flow *f)
>  {
> @@ -2471,6 +2576,16 @@ udpif_flow_time_delta(struct udpif *udpif, struct udpif_key *ukey)
>      return (udpif->dpif->current_ms - ukey->flow_time) / 1000;
>  }
>  
> +/*
> + * Save backlog packet count while switching modes
> + * between offloaded and kernel datapaths.
> + */
> +static void
> +udpif_set_ukey_backlog_packets(struct udpif_key *ukey)
> +{
> +    ukey->flow_backlog_packets = ukey->flow_packets;
> +}
> +
>  /* Gather pps-rate for the given dpif_flow and save it in its ukey */
>  static void
>  udpif_update_flow_pps(struct udpif *udpif, struct udpif_key *ukey,
> @@ -2545,6 +2660,7 @@ revalidate(struct revalidator *revalidator)
>          kill_them_all = n_dp_flows > flow_limit * 2;
>          max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
>  
> +        udpif->dpif->current_ms = time_msec();
>          for (f = flows; f < &flows[n_dumped]; f++) {
>              long long int used = f->stats.used;
>              struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER;
> @@ -2602,7 +2718,8 @@ revalidate(struct revalidator *revalidator)
>              }
>              ukey->dump_seq = dump_seq;
>  
> -            if (result != UKEY_DELETE) {
> +            if (netdev_is_offload_rebalance_policy_enabled() &&
> +                result != UKEY_DELETE) {
>                  udpif_update_flow_pps(udpif, ukey, f);
>              }
>  
> @@ -2920,3 +3037,342 @@ upcall_unixctl_purge(struct unixctl_conn *conn, int argc OVS_UNUSED,
>      }
>      unixctl_command_reply(conn, "");
>  }
> +
> +/* Flows are sorted in the following order:
> + * netdev, flow state (offloaded/kernel path), flow_pps_rate.
> + */
> +static int
> +flow_compare_rebalance(const void *elem1, const void *elem2)
> +{
> +    const struct udpif_key *f1 = *(struct udpif_key **)elem1;
> +    const struct udpif_key *f2 = *(struct udpif_key **)elem2;
> +    int64_t diff;
> +
> +    if (f1->in_netdev < f2->in_netdev) {
> +        return -1;
> +    } else if (f1->in_netdev > f2->in_netdev) {
> +        return 1;
> +    }
> +
> +    if (f1->offloaded != f2->offloaded) {
> +        return f2->offloaded - f1->offloaded;
> +    }
> +
> +    diff = (f1->offloaded == true) ?
> +        f1->flow_pps_rate - f2->flow_pps_rate :
> +        f2->flow_pps_rate - f1->flow_pps_rate;
> +
> +    return (diff < 0) ? -1 : 1;
> +}
> +
> +/* Insert flows from pending array during rebalancing */
> +static int
> +rebalance_insert_pending(struct udpif *udpif, struct udpif_key **pending_flows,
> +                         int pending_count, int insert_count,
> +                         uint64_t rate_threshold)
> +{
> +    int i, err = 0;
> +    struct udpif_key *flow;
> +    int count = 0;
> +
> +    for (i = 0; i < pending_count; i++) {
> +        flow = pending_flows[i];
> +
> +        /* Stop offloading pending flows if the insert count is
> +         * reached and the flow rate is less than the threshold
> +         */
> +        if (count >= insert_count && flow->flow_pps_rate < rate_threshold) {
> +                break;
> +        }
> +
> +        /* Offload the flow to netdev */
> +        err = udpif_flow_program(udpif, flow, DPIF_OFFLOAD_ALWAYS);
> +
> +        if (err == ENOSPC) {
> +            /* Stop if we are out of resources */
> +            break;
> +        }
> +
> +        if (err) {
> +            continue;
> +        }
> +
> +        /* Offload succeeded; delete it from the kernel datapath */
> +        udpif_flow_unprogram(udpif, flow, DPIF_OFFLOAD_NEVER);
> +
> +        /* Change the state of the flow, adjust dpif counters */
> +        flow->offloaded = true;
> +
> +        udpif_set_ukey_backlog_packets(flow);
> +        count++;
> +    }
> +
> +    return count;
> +}
> +
> +/* Remove flows from offloaded array during rebalancing */
> +static void
> +rebalance_remove_offloaded(struct udpif *udpif,
> +                           struct udpif_key **offloaded_flows,
> +                           int offload_count)
> +{
> +    int i;
> +    struct udpif_key *flow;
> +
> +    for (i = 0; i < offload_count; i++) {
> +        flow = offloaded_flows[i];
> +
> +        /* Remove offloaded flow from netdev */
> +        udpif_flow_unprogram(udpif, flow, DPIF_OFFLOAD_ALWAYS);

I am concerned there is a race here: what if an upcall occurs for a flow
removed by udpif_flow_unprogram() before udpif_flow_program() is called?
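If reversing the order is acceptable (assuming a flow may briefly exist in
both datapaths), installing the kernel copy before removing the offloaded one
would close that window. A toy model of the invariant (hypothetical names,
not the patch's code):

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of one flow's presence in the two datapaths. */
struct flow_state {
    bool in_hw;      /* Offloaded rule present. */
    bool in_kernel;  /* Kernel-datapath rule present. */
};

/* Install-before-remove: add the kernel rule first, then delete the
 * offloaded one, so the flow is never absent from both datapaths and
 * an upcall cannot observe a window with no rule installed. */
static void
move_to_kernel(struct flow_state *f)
{
    f->in_kernel = true;               /* udpif_flow_program(), NEVER */
    assert(f->in_hw || f->in_kernel);  /* invariant holds throughout */
    f->in_hw = false;                  /* udpif_flow_unprogram(), ALWAYS */
}
```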

> +
> +        /* Install the flow into kernel path */
> +        udpif_flow_program(udpif, flow, DPIF_OFFLOAD_NEVER);
> +        flow->offloaded = false;
> +
> +        udpif_set_ukey_backlog_packets(flow);
> +    }
> +}
> +
> +/*
> + * Rebalance offloaded flows on a netdev that's in OOR state.
> + *
> + * The rebalancing is done in two phases. In the first phase, we check if
> + * the pending flows can be offloaded (if some resources became available
> + * in the meantime) by trying to offload each pending flow. If all pending
> + * flows get successfully offloaded, the OOR state is cleared on the netdev
> + * and there's nothing to rebalance.
> + *
> + * If some of the pending flows could not be offloaded, i.e., we still see
> + * the OOR error, then we move to the second phase of rebalancing. In this
> + * phase, the rebalancer compares pps-rate of an offloaded flow with the
> + * least pps-rate with that of a pending flow with the highest pps-rate from
> + * their respective sorted arrays. If pps-rate of the offloaded flow is less
> + * than the pps-rate of the pending flow, then it deletes the offloaded flow
> + * from the HW/netdev and adds it to kernel datapath and then offloads pending
> + * to HW/netdev. This process is repeated for every pair of offloaded and
> + * pending flows in the ordered list. The process stops when we encounter an
> + * offloaded flow that has a higher pps-rate than the corresponding pending
> + * flow. The entire rebalancing process is repeated in the next iteration.
> + */
> +static bool
> +rebalance_device(struct udpif *udpif, struct udpif_key **offloaded_flows,
> +                 int offload_count, struct udpif_key **pending_flows,
> +                 int pending_count)
> +{
> +
> +    /* Phase 1 */
> +    int num_inserted = rebalance_insert_pending(udpif, pending_flows,
> +                                                pending_count, pending_count,
> +                                                0);
> +    if (num_inserted) {
> +        VLOG_DBG("Offload rebalance: Phase1: inserted %d pending flows",
> +                  num_inserted);
> +    }
> +
> +    /* Adjust pending array */
> +    pending_flows = &pending_flows[num_inserted];
> +    pending_count -= num_inserted;
> +
> +    if (!pending_count) {
> +        /*
> +         * Successfully offloaded all pending flows. The device
> +         * is no longer in OOR state; done rebalancing this device.
> +         */
> +        return false;
> +    }
> +
> +    /*
> +     * Phase 2; determine how many offloaded flows to churn.
> +     */
> +#define      OFFL_REBAL_MAX_CHURN    1024
> +    int churn_count = 0;
> +    while (churn_count < OFFL_REBAL_MAX_CHURN && churn_count < offload_count
> +           && churn_count < pending_count) {
> +        if (pending_flows[churn_count]->flow_pps_rate <=
> +            offloaded_flows[churn_count]->flow_pps_rate)
> +                break;
> +        churn_count++;
> +    }
> +
> +    if (churn_count) {
> +        VLOG_DBG("Offload rebalance: Phase2: removed %d offloaded flows",
> +                  churn_count);

nit: the flows haven't been removed yet

> +    }
> +
> +    /* Bail early if nothing to churn */
> +    if (!churn_count) {
> +        return true;
> +    }
> +
> +    /* Remove offloaded flows */
> +    rebalance_remove_offloaded(udpif, offloaded_flows, churn_count);
> +
> +    /* Adjust offloaded array */
> +    offloaded_flows = &offloaded_flows[churn_count];
> +    offload_count -= churn_count;
> +
> +    /* Replace offloaded flows with pending flows */
> +    num_inserted = rebalance_insert_pending(udpif, pending_flows,
> +                                            pending_count, churn_count,
> +                                            offload_count ?
> +                                            offloaded_flows[0]->flow_pps_rate :
> +                                            0);
> +    if (num_inserted) {
> +        VLOG_DBG("Offload rebalance: Phase2: inserted %d pending flows",
> +                  num_inserted);
> +    }
> +
> +    return true;
> +}
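For what it's worth, the pairwise comparison in Phase 2 can be modeled
standalone, which makes the policy easier to reason about (plain pps-rate
arrays instead of ukeys; the names below are mine, not from the patch):

```c
#include <assert.h>

#define OFFL_REBAL_MAX_CHURN 1024

/* Model of the Phase-2 loop above: offloaded[] is sorted by descending
 * pps rate and pending[] by ascending pps rate (per the cover letter).
 * Count how many offloaded flows are pairwise outperformed by a pending
 * flow; this corresponds to churn_count. */
static int
churn_count_model(const unsigned long long *offloaded, int offload_count,
                  const unsigned long long *pending, int pending_count)
{
    int churn = 0;

    while (churn < OFFL_REBAL_MAX_CHURN && churn < offload_count
           && churn < pending_count) {
        if (pending[churn] <= offloaded[churn]) {
            break;
        }
        churn++;
    }
    return churn;
}
```

So with offloaded rates {100, 50, 10} and pending rates {200, 60, 5}, two
offloaded flows get churned and the third pending flow stays in the kernel
datapath.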
> +
> +static struct udpif_key **
> +udpif_add_oor_flows(struct udpif_key **sort_flows, size_t *total_flow_count,
> +                    size_t *alloc_flow_count, struct udpif_key *ukey)
> +{
> +    if (*total_flow_count >= *alloc_flow_count) {
> +        sort_flows = x2nrealloc(sort_flows, alloc_flow_count, sizeof ukey);
> +    }
> +    sort_flows[(*total_flow_count)++] = ukey;
> +    return sort_flows;
> +}
> +
> +/*
> + * Build sort_flows[] initially with flows that
> + * reference an 'OOR' netdev as their input port.
> + */
> +static struct udpif_key **
> +udpif_build_oor_flows(struct udpif_key **sort_flows, size_t *total_flow_count,
> +                      size_t *alloc_flow_count, struct udpif_key *ukey,
> +                      int *oor_netdev_count)
> +{
> +    struct netdev *netdev;
> +    int count;
> +
> +    /* Both input and output netdevs must be available for the flow */

I'm unclear on why this condition is necessary.

Presumably all flows have an in_netdev. But why exclude those that do
not have an out_netdev, i.e. drop rules?

> +    if (!ukey->in_netdev || !ukey->out_netdev) {
> +        ukey_netdev_unref(ukey);
> +        return sort_flows;
> +    }
> +
> +    /* Get input netdev for the flow */
> +    netdev = ukey->in_netdev;
> +
> +    /* Is the in-netdev for this flow in OOR state ? */
> +    if (!netdev_get_hw_info(netdev, HW_INFO_TYPE_OOR)) {
> +        ukey_netdev_unref(ukey);
> +        return sort_flows;
> +    }
> +
> +    /* Add the flow to sort_flows[] */
> +    sort_flows = udpif_add_oor_flows(sort_flows, total_flow_count,
> +                                      alloc_flow_count, ukey);
> +    if (ukey->offloaded) {
> +        count = netdev_get_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT);
> +        ovs_assert(count >= 0);
> +        if (count++ == 0) {
> +            (*oor_netdev_count)++;
> +        }
> +        netdev_set_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT, count);
> +    } else {
> +        count = netdev_get_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT);
> +        ovs_assert(count >= 0);
> +        netdev_set_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT, ++count);
> +    }
> +
> +    return sort_flows;
> +}
> +
> +/*
> + * Rebalance offloaded flows on HW netdevs that are in OOR state.
> + */
> +static void
> +udpif_flow_rebalance(struct udpif *udpif)
> +{
> +    struct udpif_key **sort_flows = NULL;
> +    size_t alloc_flow_count = 0;
> +    size_t total_flow_count = 0;
> +    int oor_netdev_count = 0;
> +    int offload_index = 0;
> +    int pending_index;
> +
> +    /* Collect flows (offloaded and pending) that reference OOR netdevs */
> +    for (size_t i = 0; i < N_UMAPS; i++) {
> +        struct udpif_key *ukey;
> +        struct umap *umap = &udpif->ukeys[i];
> +
> +        CMAP_FOR_EACH (ukey, cmap_node, &umap->cmap) {
> +            ukey_to_flow_netdevs(udpif, ukey);
> +            sort_flows = udpif_build_oor_flows(sort_flows, &total_flow_count,
> +                                               &alloc_flow_count, ukey,
> +                                               &oor_netdev_count);
> +        }
> +    }

If I read the above correctly, it collects all flows present in the
system into a sorted list and then operates on that list below. I am
concerned about the memory and CPU impact of this if a large number of
flows, let's say 500k, are present.
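To put a rough number on the memory side: sort_flows[] holds one pointer per
collected flow, and a doubling allocator (which is my assumption for how
x2nrealloc grows the array) rounds the capacity up to the next power of two.
A sketch of that arithmetic, model only:

```c
#include <stddef.h>

/* Model of the sort_flows[] footprint: assuming the array grows by
 * doubling, the capacity reached for n flows is the next power of two,
 * and each slot holds one struct udpif_key pointer. */
static size_t
sort_flows_capacity(size_t n_flows)
{
    size_t cap = 0;

    while (cap < n_flows) {
        cap = cap ? 2 * cap : 1;
    }
    return cap;
}

static size_t
sort_flows_bytes(size_t n_flows)
{
    return sort_flows_capacity(n_flows) * sizeof(void *);
}
```

For 500k flows that is a capacity of 524288 slots, i.e. about 4 MiB of
pointers on an LP64 build, before counting the per-ukey netdev references or
the O(n log n) qsort() pass.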

> +
> +    /* Sort flows by OOR netdevs, state (offloaded/pending) and pps-rate  */
> +    qsort(sort_flows, total_flow_count, sizeof(struct udpif_key *),
> +          flow_compare_rebalance);
> +
> +    /*
> +     * We now have flows referencing OOR netdevs, that are sorted. We also
> +     * have a count of offloaded and pending flows on each of the netdevs
> +     * that are in OOR state. Now rebalance each oor-netdev.
> +     */
> +    while (oor_netdev_count) {
> +        struct netdev *netdev;
> +        int offload_count;
> +        int pending_count;
> +        bool oor;
> +
> +        netdev = sort_flows[offload_index]->in_netdev;
> +        ovs_assert(netdev_get_hw_info(netdev, HW_INFO_TYPE_OOR) == true);
> +        VLOG_DBG("Offload rebalance: netdev: %s is OOR", netdev->name);
> +
> +        offload_count = netdev_get_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT);
> +        pending_count = netdev_get_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT);
> +        pending_index = offload_index + offload_count;
> +
> +        oor = rebalance_device(udpif,
> +                               &sort_flows[offload_index], offload_count,
> +                               &sort_flows[pending_index], pending_count);
> +        netdev_set_hw_info(netdev, HW_INFO_TYPE_OOR, oor);
> +
> +        offload_index = pending_index + pending_count;
> +        netdev_set_hw_info(netdev, HW_INFO_TYPE_OFFL_COUNT, 0);
> +        netdev_set_hw_info(netdev, HW_INFO_TYPE_PEND_COUNT, 0);
> +        oor_netdev_count--;
> +    }
> +
> +    for (size_t i = 0; i < total_flow_count; i++) {
> +        struct udpif_key *ukey = sort_flows[i];
> +        ukey_netdev_unref(ukey);
> +    }
> +    free(sort_flows);
> +}
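For reference, the index arithmetic in the while loop above can be checked in
isolation: the sorted array is a concatenation of per-netdev groups, each laid
out as [offloaded flows][pending flows]. A standalone model (names are mine):

```c
/* Model of how the loop above advances through sort_flows[]: given
 * per-netdev offloaded/pending counts, record where each netdev's group
 * starts and return the total number of entries consumed, mirroring
 * offload_index = pending_index + pending_count. */
static int
walk_oor_groups(const int *offl_counts, const int *pend_counts,
                int n_netdevs, int *group_starts)
{
    int offload_index = 0;

    for (int i = 0; i < n_netdevs; i++) {
        int pending_index = offload_index + offl_counts[i];

        group_starts[i] = offload_index;
        offload_index = pending_index + pend_counts[i];
    }
    return offload_index;
}
```

With two OOR netdevs holding (2 offloaded, 1 pending) and (3 offloaded,
0 pending) flows respectively, the groups start at indices 0 and 3 and the
walk consumes 6 entries, which matches how offload_index is advanced above.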
> +
> +static int
> +udpif_flow_program(struct udpif *udpif, struct udpif_key *ukey,
> +                   enum dpif_offload_type offload_type)
> +{
> +    struct dpif_op *opsp;
> +    struct ukey_op uop;
> +
> +    opsp = &uop.dop;
> +    put_op_init(&uop, ukey, DPIF_FP_CREATE);
> +    dpif_operate(udpif->dpif, &opsp, 1, offload_type);
> +
> +    return opsp->error;
> +}
> +
> +static int
> +udpif_flow_unprogram(struct udpif *udpif, struct udpif_key *ukey,
> +                     enum dpif_offload_type offload_type)
> +{
> +    struct dpif_op *opsp;
> +    struct ukey_op uop;
> +
> +    opsp = &uop.dop;
> +    delete_op_init(udpif, &uop, ukey);
> +    dpif_operate(udpif->dpif, &opsp, 1, offload_type);
> +
> +    return opsp->error;
> +}
> diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
> index f05f616fe..2bfe4ff24 100644
> --- a/vswitchd/vswitch.xml
> +++ b/vswitchd/vswitch.xml
> @@ -519,6 +519,27 @@
>          </p>
>      </column>
>  
> +      <column name="other_config" key="offload-rebalance"
> +              type='{"type": "boolean"}'>
> +        <p>
> +          Configures HW offload rebalancing, which allows flows to be
> +          dynamically offloaded and un-offloaded while an offload device
> +          is out of resources (OOR). Flows are selected for offloading
> +          based on their packets-per-second (pps) rate.
> +        </p>
> +        <p>
> +          Set this value to <code>true</code> to enable this option.
> +        </p>
> +        <p>
> +          The default value is <code>false</code>. Changing this value
> +          requires restarting the daemon.
> +        </p>
> +        <p>
> +          This is only relevant if HW offloading is enabled
> +          (<code>hw-offload</code>). When this policy is enabled, it
> +          also requires <code>tc-policy</code> to be set to
> +          <code>skip_sw</code>.
> +        </p>
> +      </column>
>      </group>
>  
>      <group title="Status">
> -- 
> 2.18.0.rc1.1.g6f333ff
> 
> _______________________________________________
> dev mailing list
> [email protected]
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> 