This allows collecting packets from more than one RX burst and sending them together at a configurable interval.
'other_config:tx-flush-interval' can be used to configure time that a packet can wait in output batch for sending. dpif-netdev turned to microsecond resolution for time measuring to ensure desired resolution of 'tx-flush-interval'. Acked-by: Eelco Chaudron <[email protected]> Signed-off-by: Ilya Maximets <[email protected]> --- lib/dpif-netdev.c | 149 +++++++++++++++++++++++++++++++++++++++------------ vswitchd/vswitch.xml | 16 ++++++ 2 files changed, 131 insertions(+), 34 deletions(-) diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index d7f6171..f5a7793 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev); #define MAX_RECIRC_DEPTH 6 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) +/* Use instant packet send by default. */ +#define DEFAULT_TX_FLUSH_INTERVAL 0 + /* Configuration parameters. */ enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */ enum { MAX_METERS = 65536 }; /* Maximum number of meters. */ @@ -178,12 +181,13 @@ struct emc_cache { /* Simple non-wildcarding single-priority classifier. */ -/* Time in ms between successive optimizations of the dpcls subtable vector */ -#define DPCLS_OPTIMIZATION_INTERVAL 1000 +/* Time in microseconds between successive optimizations of the dpcls + * subtable vector */ +#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL -/* Time in ms of the interval in which rxq processing cycles used in - * rxq to pmd assignments is measured and stored. */ -#define PMD_RXQ_INTERVAL_LEN 10000 +/* Time in microseconds of the interval in which rxq processing cycles used + * in rxq to pmd assignments is measured and stored. */ +#define PMD_RXQ_INTERVAL_LEN 10000000LL /* Number of intervals for which cycles are stored * and used during rxq to pmd assignment. */ @@ -270,6 +274,9 @@ struct dp_netdev { struct hmap ports; struct seq *port_seq; /* Incremented whenever a port changes. */ + /* The time that a packet can wait in output batch for sending. 
*/ + atomic_uint32_t tx_flush_interval; + /* Meters. */ struct ovs_mutex meter_locks[N_METER_LOCKS]; struct dp_meter *meters[MAX_METERS]; /* Meter bands. */ @@ -356,7 +363,7 @@ enum rxq_cycles_counter_type { RXQ_N_CYCLES }; -#define XPS_TIMEOUT_MS 500LL +#define XPS_TIMEOUT 500000LL /* In microseconds. */ /* Contained by struct dp_netdev_port's 'rxqs' member. */ struct dp_netdev_rxq { @@ -526,6 +533,7 @@ struct tx_port { int qid; long long last_used; struct hmap_node node; + long long flush_time; struct dp_packet_batch output_pkts; }; @@ -612,6 +620,9 @@ struct dp_netdev_pmd_thread { * than 'cmap_count(dp->poll_threads)'. */ uint32_t static_tx_qid; + /* Number of filled output batches. */ + int n_output_batches; + struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */ /* List of rx queues to poll. */ struct hmap poll_list OVS_GUARDED; @@ -705,8 +716,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd, static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd, struct rxq_poll *poll) OVS_REQUIRES(pmd->port_mutex); -static void -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd); +static int +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd, + bool force); static void reconfigure_datapath(struct dp_netdev *dp) OVS_REQUIRES(dp->port_mutex); @@ -797,7 +809,7 @@ emc_cache_slow_sweep(struct emc_cache *flow_cache) static inline void pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd) { - pmd->ctx.now = time_msec(); + pmd->ctx.now = time_usec(); } /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. 
*/ @@ -1297,6 +1309,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class, conntrack_init(&dp->conntrack); atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN); + atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL); cmap_init(&dp->poll_threads); @@ -2967,7 +2980,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) dp_packet_batch_init_packet(&pp, execute->packet); dp_netdev_execute_actions(pmd, &pp, false, execute->flow, execute->actions, execute->actions_len); - dp_netdev_pmd_flush_output_packets(pmd); + dp_netdev_pmd_flush_output_packets(pmd, true); if (pmd->core_id == NON_PMD_CORE_ID) { ovs_mutex_unlock(&dp->non_pmd_mutex); @@ -3016,6 +3029,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config) smap_get_ullong(other_config, "emc-insert-inv-prob", DEFAULT_EM_FLOW_INSERT_INV_PROB); uint32_t insert_min, cur_min; + uint32_t tx_flush_interval, cur_tx_flush_interval; + + tx_flush_interval = smap_get_int(other_config, "tx-flush-interval", + DEFAULT_TX_FLUSH_INTERVAL); + atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval); + if (tx_flush_interval != cur_tx_flush_interval) { + atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval); + VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us", + tx_flush_interval); + } if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) { free(dp->pmd_cmask); @@ -3254,12 +3277,14 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx) return processing_cycles; } -static void +static int dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, struct tx_port *p) { int tx_qid; + int output_cnt; bool dynamic_txqs; + uint32_t tx_flush_interval; dynamic_txqs = p->port->dynamic_txqs; if (dynamic_txqs) { @@ -3268,20 +3293,40 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, tx_qid = pmd->static_tx_qid; } + output_cnt = dp_packet_batch_size(&p->output_pkts); + ovs_assert(output_cnt 
> 0); + netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs); dp_packet_batch_init(&p->output_pkts); + + /* Update time of the next flush. */ + atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval); + p->flush_time = pmd->ctx.now + tx_flush_interval; + + ovs_assert(pmd->n_output_batches > 0); + pmd->n_output_batches--; + + return output_cnt; } -static void -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd) +static int +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd, + bool force) { struct tx_port *p; + int output_cnt = 0; + + if (!pmd->n_output_batches) { + return 0; + } HMAP_FOR_EACH (p, node, &pmd->send_port_cache) { - if (!dp_packet_batch_is_empty(&p->output_pkts)) { - dp_netdev_pmd_flush_output_on_port(pmd, p); + if (!dp_packet_batch_is_empty(&p->output_pkts) + && (force || pmd->ctx.now >= p->flush_time)) { + output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p); } } + return output_cnt; } static int @@ -3291,7 +3336,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, { struct dp_packet_batch batch; int error; - int batch_cnt = 0; + int batch_cnt = 0, output_cnt = 0; dp_packet_batch_init(&batch); error = netdev_rxq_recv(rx, &batch); @@ -3301,7 +3346,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, batch_cnt = batch.count; dp_netdev_input(pmd, &batch, port_no); - dp_netdev_pmd_flush_output_packets(pmd); + output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false); } else if (error != EAGAIN && error != EOPNOTSUPP) { static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); @@ -3309,7 +3354,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, netdev_rxq_get_name(rx), ovs_strerror(error)); } - return batch_cnt; + return batch_cnt + output_cnt; } static struct tx_port * @@ -3932,7 +3977,8 @@ dpif_netdev_run(struct dpif *dpif) struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_pmd_thread *non_pmd; uint64_t new_tnl_seq; - int 
process_packets = 0; + int process_packets; + bool need_to_flush = true; ovs_mutex_lock(&dp->port_mutex); non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); @@ -3952,11 +3998,25 @@ dpif_netdev_run(struct dpif *dpif) process_packets ? PMD_CYCLES_PROCESSING : PMD_CYCLES_IDLE); + if (process_packets) { + need_to_flush = false; + } } } } + if (need_to_flush) { + /* We didn't receive anything in the process loop. + * Check if we need to send something. + * There was no time updates on current iteration. */ + pmd_thread_ctx_time_update(non_pmd); + process_packets = dp_netdev_pmd_flush_output_packets(non_pmd, + false); + cycles_count_intermediate(non_pmd, NULL, process_packets + ? PMD_CYCLES_PROCESSING + : PMD_CYCLES_IDLE); + } + cycles_count_end(non_pmd, PMD_CYCLES_IDLE); - pmd_thread_ctx_time_update(non_pmd); dpif_netdev_xps_revalidate_pmd(non_pmd, false); ovs_mutex_unlock(&dp->non_pmd_mutex); @@ -4007,6 +4067,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd) { struct tx_port *tx_port_cached; + /* Flush all the queued packets. */ + dp_netdev_pmd_flush_output_packets(pmd, true); /* Free all used tx queue ids. */ dpif_netdev_xps_revalidate_pmd(pmd, true); @@ -4105,7 +4167,6 @@ pmd_thread_main(void *f_) bool exiting; int poll_cnt; int i; - int process_packets = 0; poll_list = NULL; @@ -4135,6 +4196,9 @@ reload: cycles_count_start(pmd); for (;;) { + int process_packets; + bool need_to_flush = true; + for (i = 0; i < poll_cnt; i++) { process_packets = dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx, @@ -4142,6 +4206,20 @@ reload: cycles_count_intermediate(pmd, poll_list[i].rxq, process_packets ? PMD_CYCLES_PROCESSING : PMD_CYCLES_IDLE); + if (process_packets) { + need_to_flush = false; + } + } + + if (need_to_flush) { + /* We didn't receive anything in the process loop. + * Check if we need to send something. + * There was no time updates on current iteration. 
*/ + pmd_thread_ctx_time_update(pmd); + process_packets = dp_netdev_pmd_flush_output_packets(pmd, false); + cycles_count_intermediate(pmd, NULL, + process_packets ? PMD_CYCLES_PROCESSING + : PMD_CYCLES_IDLE); } if (lc++ > 1024) { @@ -4150,9 +4228,6 @@ reload: lc = 0; coverage_try_clear(); - /* It's possible that the time was not updated on current - * iteration, if there were no received packets. */ - pmd_thread_ctx_time_update(pmd); dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt); if (!ovsrcu_try_quiesce()) { emc_cache_slow_sweep(&pmd->flow_cache); @@ -4238,7 +4313,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_, memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate); /* All packets will hit the meter at the same time. */ - long_delta_t = (now - meter->used); /* msec */ + long_delta_t = (now - meter->used) / 1000; /* msec */ /* Make sure delta_t will not be too large, so that bucket will not * wrap around below. */ @@ -4394,7 +4469,7 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id, meter->flags = config->flags; meter->n_bands = config->n_bands; meter->max_delta_t = 0; - meter->used = time_msec(); + meter->used = time_usec(); /* set up bands */ for (i = 0; i < config->n_bands; ++i) { @@ -4592,6 +4667,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, pmd->core_id = core_id; pmd->numa_id = numa_id; pmd->need_reload = false; + pmd->n_output_batches = 0; ovs_refcount_init(&pmd->ref_cnt); latch_init(&pmd->exit_latch); @@ -4779,6 +4855,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd, tx->port = port; tx->qid = -1; + tx->flush_time = 0LL; dp_packet_batch_init(&tx->output_pkts); hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); @@ -4942,7 +5019,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch, struct dp_netdev_flow *flow = batch->flow; dp_netdev_flow_used(flow, batch->array.count, batch->byte_count, - 
batch->tcp_flags, pmd->ctx.now); + batch->tcp_flags, pmd->ctx.now / 1000); actions = dp_netdev_flow_get_actions(flow); @@ -5317,7 +5394,7 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd, continue; } interval = pmd->ctx.now - tx->last_used; - if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) { + if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) { port = tx->port; ovs_mutex_lock(&port->txq_used_mutex); port->txq_used[tx->qid]--; @@ -5338,7 +5415,7 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd, interval = pmd->ctx.now - tx->last_used; tx->last_used = pmd->ctx.now; - if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) { + if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) { return tx->qid; } @@ -5470,12 +5547,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_, dp_netdev_pmd_flush_output_on_port(pmd, p); } #endif - if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts) - + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) { - /* Some packets was generated while input batch processing. - * Flush here to avoid overflow. */ + if (dp_packet_batch_size(&p->output_pkts) + + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) { + /* Flush here to avoid overflow. 
*/ dp_netdev_pmd_flush_output_on_port(pmd, p); } + + if (dp_packet_batch_is_empty(&p->output_pkts)) { + pmd->n_output_batches++; + } + DP_PACKET_BATCH_FOR_EACH (packet, packets_) { dp_packet_batch_add(&p->output_pkts, packet); } @@ -5717,7 +5798,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_, conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force, commit, zone, setmark, setlabel, aux->flow->tp_src, aux->flow->tp_dst, helper, nat_action_info_ref, - pmd->ctx.now); + pmd->ctx.now / 1000); break; } diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml index 21ffaf5..bc6b1be 100644 --- a/vswitchd/vswitch.xml +++ b/vswitchd/vswitch.xml @@ -359,6 +359,22 @@ </p> </column> + <column name="other_config" key="tx-flush-interval" + type='{"type": "integer", + "minInteger": 0, "maxInteger": 1000000}'> + <p> + Specifies the time in microseconds that a packet can wait in output + batch for sending i.e. amount of time that packet can spend in an + intermediate output queue before sending to netdev. + This option can be used to configure balance between throughput + and latency. Lower values decreases latency while higher values + may be useful to achieve higher performance. + </p> + <p> + Defaults to 0 i.e. instant packet sending (latency optimized). + </p> + </column> + <column name="other_config" key="n-handler-threads" type='{"type": "integer", "minInteger": 1}'> <p> -- 2.7.4 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
