> This allows to collect packets from more than one RX burst
> and send them together with a configurable maximum latency.
> 
> 'other_config:output-max-latency' can be used to configure
> time that a packet can wait in output batch for sending.
> 
> Signed-off-by: Ilya Maximets <[email protected]>
> ---
> Notes:
> 
>     * This is an RFC and should not be used for performance testing.
>     * Millisecond granularity is used for now. Can be easily switched
>       to use microseconds instead.

From earlier in-house trials we know we need to target flush times of 50 us or
less, so we clearly need better time resolution. Sub-ms timing in PMD should
be based on TSC cycles, which are already kept in the pmd struct. Could you
provide a corresponding patch for performance testing?

> 
>  lib/dpif-netdev.c    | 121
> ++++++++++++++++++++++++++++++++++++++++++---------
>  vswitchd/vswitch.xml |  15 +++++++
>  2 files changed, 115 insertions(+), 21 deletions(-)
> 
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index dcf55f3..0d78ae4 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
>  #define MAX_RECIRC_DEPTH 5
>  DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
> 
> +/* Use instant packet send by default. */
> +#define DEFAULT_OUTPUT_MAX_LATENCY 0
> +
>  /* Configuration parameters. */
>  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow
> table. */
>  enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
> @@ -262,6 +265,9 @@ struct dp_netdev {
>      struct hmap ports;
>      struct seq *port_seq;       /* Incremented whenever a port changes. */
> 
> +    /* The time that a packet can wait in output batch for sending. */
> +    atomic_uint32_t output_max_latency;
> +
>      /* Meters. */
>      struct ovs_mutex meter_locks[N_METER_LOCKS];
>      struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
> @@ -502,6 +508,7 @@ struct tx_port {
>      int qid;
>      long long last_used;
>      struct hmap_node node;
> +    long long output_time;

Rename to flush_time?

>      struct dp_packet_batch output_pkts;
>  };
> 
> @@ -574,6 +581,9 @@ struct dp_netdev_pmd_thread {
>       * than 'cmap_count(dp->poll_threads)'. */
>      uint32_t static_tx_qid;
> 
> +    /* Number of filled output batches. */
> +    int n_output_batches;
> +
>      struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'.
> */
>      /* List of rx queues to poll. */
>      struct hmap poll_list OVS_GUARDED;
> @@ -669,9 +679,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
>  static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread
> *pmd,
>                                         struct rxq_poll *poll)
>      OVS_REQUIRES(pmd->port_mutex);
> -static void
> +static int
>  dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
> *pmd,
> -                                   long long now);
> +                                   long long now, bool force);
>  static void reconfigure_datapath(struct dp_netdev *dp)
>      OVS_REQUIRES(dp->port_mutex);
>  static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
> @@ -1193,6 +1203,7 @@ create_dp_netdev(const char *name, const
> struct dpif_class *class,
>      conntrack_init(&dp->conntrack);
> 
>      atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
> +    atomic_init(&dp->output_max_latency,
> DEFAULT_OUTPUT_MAX_LATENCY);
> 
>      cmap_init(&dp->poll_threads);
> 
> @@ -2858,7 +2869,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
>      dp_packet_batch_init_packet(&pp, execute->packet);
>      dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
>                                execute->actions, execute->actions_len, now);
> -    dp_netdev_pmd_flush_output_packets(pmd, now);
> +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
> 
>      if (pmd->core_id == NON_PMD_CORE_ID) {
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
> @@ -2907,6 +2918,16 @@ dpif_netdev_set_config(struct dpif *dpif, const
> struct smap *other_config)
>          smap_get_ullong(other_config, "emc-insert-inv-prob",
>                          DEFAULT_EM_FLOW_INSERT_INV_PROB);
>      uint32_t insert_min, cur_min;
> +    uint32_t output_max_latency, cur_max_latency;
> +
> +    output_max_latency = smap_get_int(other_config, "output-max-
> latency",
> +                                      DEFAULT_OUTPUT_MAX_LATENCY);
> +    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
> +    if (output_max_latency != cur_max_latency) {
> +        atomic_store_relaxed(&dp->output_max_latency,
> output_max_latency);
> +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
> +                  output_max_latency);
> +    }
> 
>      if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
>          free(dp->pmd_cmask);
> @@ -3107,11 +3128,12 @@ cycles_count_intermediate(struct
> dp_netdev_pmd_thread *pmd,
>      non_atomic_ullong_add(&pmd->cycles.n[type], interval);
>  }
> 
> -static void
> +static int
>  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
> *pmd,
>                                     struct tx_port *p, long long now)
>  {
>      int tx_qid;
> +    int output_cnt;
>      bool dynamic_txqs;
> 
>      dynamic_txqs = p->port->dynamic_txqs;
> @@ -3121,21 +3143,39 @@ dp_netdev_pmd_flush_output_on_port(struct
> dp_netdev_pmd_thread *pmd,
>          tx_qid = pmd->static_tx_qid;
>      }
> 
> +    output_cnt = dp_packet_batch_size(&p->output_pkts);
>      netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
>      dp_packet_batch_init(&p->output_pkts);
> +
> +    if (output_cnt) {
> +        ovs_assert(pmd->n_output_batches > 0);
> +        pmd->n_output_batches--;
> +    }
> +    return output_cnt;
>  }
> 
> -static void
> +static int
>  dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
> *pmd,
> -                                   long long now)
> +                                   long long now, bool force)
>  {
>      struct tx_port *p;
> +    int output_cnt = 0;
> +
> +    if (!pmd->n_output_batches) {
> +        return 0;
> +    }
> +
> +    if (!now) {
> +        now = time_msec();
> +    }
> 
>      HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
> -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
> -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
> +        if (!dp_packet_batch_is_empty(&p->output_pkts)
> +            && (force || p->output_time <= now)) {
> +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p,
> now);
>          }
>      }
> +    return output_cnt;
>  }
> 
>  static int
> @@ -3145,7 +3185,7 @@ dp_netdev_process_rxq_port(struct
> dp_netdev_pmd_thread *pmd,
>  {
>      struct dp_packet_batch batch;
>      int error;
> -    int batch_cnt = 0;
> +    int batch_cnt = 0, output_cnt = 0;
> 
>      dp_packet_batch_init(&batch);
>      error = netdev_rxq_recv(rx, &batch);
> @@ -3156,7 +3196,7 @@ dp_netdev_process_rxq_port(struct
> dp_netdev_pmd_thread *pmd,
> 
>          batch_cnt = batch.count;
>          dp_netdev_input(pmd, &batch, port_no, now);
> -        dp_netdev_pmd_flush_output_packets(pmd, now);
> +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now,
> false);

How large is the overhead of checking all ports for flushing after every Rx 
batch?
With time-based Tx batching this is no longer strictly necessary and we could 
do it in the main PMD loop only.

>      } else if (error != EAGAIN && error != EOPNOTSUPP) {
>          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> 
> @@ -3164,7 +3204,7 @@ dp_netdev_process_rxq_port(struct
> dp_netdev_pmd_thread *pmd,
>                      netdev_rxq_get_name(rx), ovs_strerror(error));
>      }
> 
> -    return batch_cnt;
> +    return batch_cnt + output_cnt;
>  }
> 
>  static struct tx_port *
> @@ -3691,7 +3731,8 @@ dpif_netdev_run(struct dpif *dpif)
>      struct dp_netdev *dp = get_dp_netdev(dpif);
>      struct dp_netdev_pmd_thread *non_pmd;
>      uint64_t new_tnl_seq;
> -    int process_packets = 0;
> +    int process_packets;
> +    bool need_to_flush = true;
> 
>      ovs_mutex_lock(&dp->port_mutex);
>      non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> @@ -3707,12 +3748,25 @@ dpif_netdev_run(struct dpif *dpif)
>                          dp_netdev_process_rxq_port(non_pmd,
>                                                     port->rxqs[i].rx,
>                                                     port->port_no);
> -                    cycles_count_intermediate(non_pmd, process_packets ?
> -                                                       PMD_CYCLES_PROCESSING
> -                                                     : PMD_CYCLES_IDLE);
> +                    cycles_count_intermediate(non_pmd, process_packets
> +                                                       ? 
> PMD_CYCLES_PROCESSING
> +                                                       : PMD_CYCLES_IDLE);
> +                    if (process_packets) {
> +                        need_to_flush = false;
> +                    }
>                  }
>              }
>          }
> +        if (need_to_flush) {
> +            /* We didn't receive anything in the process loop.
> +             * Check if we need to send something. */
> +            process_packets =
> dp_netdev_pmd_flush_output_packets(non_pmd,
> +                                                                 0, false);
> +            cycles_count_intermediate(non_pmd, process_packets
> +                                               ? PMD_CYCLES_PROCESSING
> +                                               : PMD_CYCLES_IDLE);
> +        }
> +
>          cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
>          dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
>          ovs_mutex_unlock(&dp->non_pmd_mutex);
> @@ -3764,6 +3818,8 @@ pmd_free_cached_ports(struct
> dp_netdev_pmd_thread *pmd)
>  {
>      struct tx_port *tx_port_cached;
> 
> +    /* Flush all the queued packets. */
> +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
>      /* Free all used tx queue ids. */
>      dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
> 
> @@ -3860,7 +3916,6 @@ pmd_thread_main(void *f_)
>      bool exiting;
>      int poll_cnt;
>      int i;
> -    int process_packets = 0;
> 
>      poll_list = NULL;
> 
> @@ -3890,6 +3945,9 @@ reload:
> 
>      cycles_count_start(pmd);
>      for (;;) {
> +        int process_packets;
> +        bool need_to_flush = true;
> +
>          for (i = 0; i < poll_cnt; i++) {
>              process_packets =
>                  dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
> @@ -3897,6 +3955,19 @@ reload:
>              cycles_count_intermediate(pmd,
>                                        process_packets ? PMD_CYCLES_PROCESSING
>                                                        : PMD_CYCLES_IDLE);
> +            if (process_packets) {
> +                need_to_flush = false;
> +            }
> +        }
> +
> +        if (need_to_flush) {
> +            /* We didn't receive anything in the process loop.
> +             * Check if we need to send something. */
> +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
> +                                                                 0, false);
> +            cycles_count_intermediate(pmd,
> +                                      process_packets ? PMD_CYCLES_PROCESSING
> +                                                      : PMD_CYCLES_IDLE);
>          }
> 
>          if (lc++ > 1024) {
> @@ -4336,6 +4407,7 @@ dp_netdev_configure_pmd(struct
> dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
>      pmd->core_id = core_id;
>      pmd->numa_id = numa_id;
>      pmd->need_reload = false;
> +    pmd->n_output_batches = 0;
> 
>      ovs_refcount_init(&pmd->ref_cnt);
>      latch_init(&pmd->exit_latch);
> @@ -4521,6 +4593,7 @@ dp_netdev_add_port_tx_to_pmd(struct
> dp_netdev_pmd_thread *pmd,
> 
>      tx->port = port;
>      tx->qid = -1;
> +    tx->output_time = 0LL;
>      dp_packet_batch_init(&tx->output_pkts);
> 
>      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
> >port_no));
> @@ -5197,14 +5270,20 @@ dp_execute_cb(void *aux_, struct
> dp_packet_batch *packets_,
>                  dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>              }
>  #endif
> -
> -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
> -                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST))
> {
> -                /* Some packets was generated while input batch processing.
> -                 * Flush here to avoid overflow. */
> +            if (dp_packet_batch_size(&p->output_pkts)
> +                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
> +                /* Flush here to avoid overflow. */
>                  dp_netdev_pmd_flush_output_on_port(pmd, p, now);
>              }
> 
> +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
> +                uint32_t cur_max_latency;
> +
> +                atomic_read_relaxed(&dp->output_max_latency,
> &cur_max_latency);
> +                p->output_time = now + cur_max_latency;
> +                pmd->n_output_batches++;

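Is there a guarantee that the packets_ batch is non-empty in dp_execute_cb?
If it can be empty, incrementing n_output_batches here is wrong: output_pkts
stays empty, and the flush path only decrements the counter for non-empty
batches, so the count would never return to zero. Add an assertion or a
condition !dp_packet_batch_is_empty(packets_) to the if clause.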

> +            }
> +
>              DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
>                  dp_packet_batch_add(&p->output_pkts, packet);
>              }
> diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
> index 074535b..23930f0 100644
> --- a/vswitchd/vswitch.xml
> +++ b/vswitchd/vswitch.xml
> @@ -344,6 +344,21 @@
>          </p>
>        </column>
> 
> +      <column name="other_config" key="output-max-latency"
> +              type='{"type": "integer", "minInteger": 0, "maxInteger": 
> 1000}'>
> +        <p>
> +          Specifies the time in milliseconds that a packet can wait in output
> +          batch for sending i.e. amount of time that packet can spend in an
> +          intermediate output queue before sending to netdev.
> +          This option can be used to configure balance between throughput
> +          and latency. Lower values decreases latency while higher values
> +          may be useful to achieve higher performance.
> +        </p>
> +        <p>
> +          Defaults to 0 i.e. instant packet sending (latency optimized).
> +        </p>
> +      </column>
> +
>        <column name="other_config" key="n-handler-threads"
>                type='{"type": "integer", "minInteger": 1}'>
>          <p>
> --
> 2.7.4
> 
> _______________________________________________
> dev mailing list
> [email protected]
> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to