>This allows collecting packets from more than one RX burst and sending them
>together at configurable intervals.
>
>'other_config:tx-flush-interval' can be used to configure the maximum time
>that a packet can wait in the output batch before being sent.
>
>dpif-netdev now uses microsecond resolution for time measurements to ensure
>the desired resolution of 'tx-flush-interval'.
>
>Signed-off-by: Ilya Maximets <[email protected]>
>---
> lib/dpif-netdev.c | 141
>++++++++++++++++++++++++++++++++++++++++-----------
> vswitchd/vswitch.xml | 16 ++++++
> 2 files changed, 127 insertions(+), 30 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 166b73a..3ddb711
>100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
> #define MAX_RECIRC_DEPTH 5
> DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
>
>+/* Use instant packet send by default. */ #define
>+DEFAULT_TX_FLUSH_INTERVAL 0
>+
> /* Configuration parameters. */
> enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table.
>*/
> enum { MAX_METERS = 65536 }; /* Maximum number of meters. */
>@@ -178,12 +181,13 @@ struct emc_cache {
>
> /* Simple non-wildcarding single-priority classifier. */
>
>-/* Time in ms between successive optimizations of the dpcls subtable vector
>*/ -#define DPCLS_OPTIMIZATION_INTERVAL 1000
>+/* Time in microseconds between successive optimizations of the dpcls
>+ * subtable vector */
>+#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
>
>-/* Time in ms of the interval in which rxq processing cycles used in
>- * rxq to pmd assignments is measured and stored. */ -#define
>PMD_RXQ_INTERVAL_LEN 10000
>+/* Time in microseconds of the interval in which rxq processing cycles
>+used
>+ * in rxq to pmd assignments is measured and stored. */ #define
>+PMD_RXQ_INTERVAL_LEN 10000000LL
>
> /* Number of intervals for which cycles are stored
> * and used during rxq to pmd assignment. */ @@ -270,6 +274,9 @@ struct
>dp_netdev {
> struct hmap ports;
> struct seq *port_seq; /* Incremented whenever a port changes. */
>
>+ /* The time that a packet can wait in output batch for sending. */
>+ atomic_uint32_t tx_flush_interval;
>+
> /* Meters. */
> struct ovs_mutex meter_locks[N_METER_LOCKS];
> struct dp_meter *meters[MAX_METERS]; /* Meter bands. */ @@ -356,7
>+363,7 @@ enum rxq_cycles_counter_type {
> RXQ_N_CYCLES
> };
>
>-#define XPS_TIMEOUT_MS 500LL
>+#define XPS_TIMEOUT 500000LL /* In microseconds. */
>
> /* Contained by struct dp_netdev_port's 'rxqs' member. */ struct
>dp_netdev_rxq { @@ -526,6 +533,7 @@ struct tx_port {
> int qid;
> long long last_used;
> struct hmap_node node;
>+ long long flush_time;
> struct dp_packet_batch output_pkts; };
>
>@@ -614,6 +622,9 @@ struct dp_netdev_pmd_thread {
> * than 'cmap_count(dp->poll_threads)'. */
> uint32_t static_tx_qid;
>
>+ /* Number of filled output batches. */
>+ int n_output_batches;
>+
> struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'.
> */
> /* List of rx queues to poll. */
> struct hmap poll_list OVS_GUARDED;
>@@ -707,8 +718,9 @@ static void dp_netdev_add_rxq_to_pmd(struct
>dp_netdev_pmd_thread *pmd, static void
>dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
> struct rxq_poll *poll)
> OVS_REQUIRES(pmd->port_mutex);
>-static void
>-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd);
>+static int
>+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd,
>+ bool force);
>
> static void reconfigure_datapath(struct dp_netdev *dp)
> OVS_REQUIRES(dp->port_mutex);
>@@ -783,7 +795,7 @@ emc_cache_slow_sweep(struct emc_cache
>*flow_cache) static inline void pmd_thread_ctx_time_update(struct
>dp_netdev_pmd_thread *pmd) {
>- pmd->ctx.now = time_msec();
>+ pmd->ctx.now = time_usec();
> }
>
> /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */ @@ -
>1283,6 +1295,7 @@ create_dp_netdev(const char *name, const struct
>dpif_class *class,
> conntrack_init(&dp->conntrack);
>
> atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
>+ atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
>
> cmap_init(&dp->poll_threads);
>
>@@ -2950,7 +2963,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
>dpif_execute *execute)
> dp_packet_batch_init_packet(&pp, execute->packet);
> dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
> execute->actions, execute->actions_len);
>- dp_netdev_pmd_flush_output_packets(pmd);
>+ dp_netdev_pmd_flush_output_packets(pmd, true);
>
> if (pmd->core_id == NON_PMD_CORE_ID) {
> ovs_mutex_unlock(&dp->non_pmd_mutex);
>@@ -2999,6 +3012,16 @@ dpif_netdev_set_config(struct dpif *dpif, const
>struct smap *other_config)
> smap_get_ullong(other_config, "emc-insert-inv-prob",
> DEFAULT_EM_FLOW_INSERT_INV_PROB);
> uint32_t insert_min, cur_min;
>+ uint32_t tx_flush_interval, cur_tx_flush_interval;
>+
>+ tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
>+ DEFAULT_TX_FLUSH_INTERVAL);
>+ atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
>+ if (tx_flush_interval != cur_tx_flush_interval) {
>+ atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
>+ VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
>+ tx_flush_interval);
>+ }
>
> if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
> free(dp->pmd_cmask);
>@@ -3237,11 +3260,12 @@ dp_netdev_rxq_get_intrvl_cycles(struct
>dp_netdev_rxq *rx, unsigned idx)
> return processing_cycles;
> }
>
>-static void
>+static int
> dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>*pmd,
> struct tx_port *p) {
> int tx_qid;
>+ int output_cnt;
> bool dynamic_txqs;
>
> dynamic_txqs = p->port->dynamic_txqs; @@ -3251,20 +3275,41 @@
>dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread
>*pmd,
> tx_qid = pmd->static_tx_qid;
> }
>
>+ output_cnt = dp_packet_batch_size(&p->output_pkts);
> netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
> dp_packet_batch_init(&p->output_pkts);
>+
>+ if (output_cnt) {
[BHANU] Reading the output batch size and adding this extra check above seem
redundant, as we only get here when !dp_packet_batch_is_empty() and the caller
is dp_netdev_pmd_flush_output_packets().
Even in the other case where this API is called directly (from dp_execute_cb),
the check here seems redundant, as it is already performed at the caller.
- Bhanuprakash.
>+ uint32_t tx_flush_interval;
>+
>+ /* Update time of the next flush. */
>+ atomic_read_relaxed(&pmd->dp->tx_flush_interval,
>&tx_flush_interval);
>+ p->flush_time = pmd->ctx.now + tx_flush_interval;
>+
>+ ovs_assert(pmd->n_output_batches > 0);
>+ pmd->n_output_batches--;
>+ }
>+ return output_cnt;
> }
>
>-static void
>-dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd)
>+static int
>+dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread
>*pmd,
>+ bool force)
> {
> struct tx_port *p;
>+ int output_cnt = 0;
>+
>+ if (!pmd->n_output_batches) {
>+ return 0;
>+ }
>
> HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
>- if (!dp_packet_batch_is_empty(&p->output_pkts)) {
>- dp_netdev_pmd_flush_output_on_port(pmd, p);
>+ if (!dp_packet_batch_is_empty(&p->output_pkts)
>+ && (force || pmd->ctx.now >= p->flush_time)) {
>+ output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
> }
> }
>+ return output_cnt;
> }
>
> static int
>@@ -3274,7 +3319,7 @@ dp_netdev_process_rxq_port(struct
>dp_netdev_pmd_thread *pmd, {
> struct dp_packet_batch batch;
> int error;
>- int batch_cnt = 0;
>+ int batch_cnt = 0, output_cnt = 0;
>
> dp_packet_batch_init(&batch);
> error = netdev_rxq_recv(rx, &batch); @@ -3284,7 +3329,7 @@
>dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
>
> batch_cnt = batch.count;
> dp_netdev_input(pmd, &batch, port_no);
>- dp_netdev_pmd_flush_output_packets(pmd);
>+ output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
> } else if (error != EAGAIN && error != EOPNOTSUPP) {
> static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
>
>@@ -3292,7 +3337,7 @@ dp_netdev_process_rxq_port(struct
>dp_netdev_pmd_thread *pmd,
> netdev_rxq_get_name(rx), ovs_strerror(error));
> }
>
>- return batch_cnt;
>+ return batch_cnt + output_cnt;
> }
>
> static struct tx_port *
>@@ -3904,7 +3949,8 @@ dpif_netdev_run(struct dpif *dpif)
> struct dp_netdev *dp = get_dp_netdev(dpif);
> struct dp_netdev_pmd_thread *non_pmd;
> uint64_t new_tnl_seq;
>- int process_packets = 0;
>+ int process_packets;
>+ bool need_to_flush = true;
>
> ovs_mutex_lock(&dp->port_mutex);
> non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); @@ -3924,9
>+3970,22 @@ dpif_netdev_run(struct dpif *dpif)
> process_packets
> ? PMD_CYCLES_PROCESSING
> : PMD_CYCLES_IDLE);
>+ if (process_packets) {
>+ need_to_flush = false;
>+ }
> }
> }
> }
>+ if (need_to_flush) {
>+ /* We didn't receive anything in the process loop.
>+ * Check if we need to send something. */
>+ process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
>+ false);
>+ cycles_count_intermediate(non_pmd, NULL, process_packets
>+ ? PMD_CYCLES_PROCESSING
>+ : PMD_CYCLES_IDLE);
>+ }
>+
> cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
> pmd_thread_ctx_time_update(non_pmd);
> dpif_netdev_xps_revalidate_pmd(non_pmd, false); @@ -3979,6 +4038,8
>@@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd) {
> struct tx_port *tx_port_cached;
>
>+ /* Flush all the queued packets. */
>+ dp_netdev_pmd_flush_output_packets(pmd, true);
> /* Free all used tx queue ids. */
> dpif_netdev_xps_revalidate_pmd(pmd, true);
>
>@@ -4077,7 +4138,6 @@ pmd_thread_main(void *f_)
> bool exiting;
> int poll_cnt;
> int i;
>- int process_packets = 0;
>
> poll_list = NULL;
>
>@@ -4107,6 +4167,9 @@ reload:
>
> cycles_count_start(pmd);
> for (;;) {
>+ int process_packets;
>+ bool need_to_flush = true;
>+
> for (i = 0; i < poll_cnt; i++) {
> process_packets =
> dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx, @@
> -4114,6
>+4177,18 @@ reload:
> cycles_count_intermediate(pmd, poll_list[i].rxq,
> process_packets ? PMD_CYCLES_PROCESSING
> : PMD_CYCLES_IDLE);
>+ if (process_packets) {
>+ need_to_flush = false;
>+ }
>+ }
>+
>+ if (need_to_flush) {
>+ /* We didn't receive anything in the process loop.
>+ * Check if we need to send something. */
>+ process_packets = dp_netdev_pmd_flush_output_packets(pmd,
>false);
>+ cycles_count_intermediate(pmd, NULL,
>+ process_packets ? PMD_CYCLES_PROCESSING
>+ :
>+ PMD_CYCLES_IDLE);
> }
>
> if (lc++ > 1024) {
>@@ -4207,7 +4282,7 @@ dp_netdev_run_meter(struct dp_netdev *dp,
>struct dp_packet_batch *packets_,
> memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
>
> /* All packets will hit the meter at the same time. */
>- long_delta_t = (now - meter->used); /* msec */
>+ long_delta_t = (now - meter->used) / 1000; /* msec */
>
> /* Make sure delta_t will not be too large, so that bucket will not
> * wrap around below. */
>@@ -4363,7 +4438,7 @@ dpif_netdev_meter_set(struct dpif *dpif,
>ofproto_meter_id *meter_id,
> meter->flags = config->flags;
> meter->n_bands = config->n_bands;
> meter->max_delta_t = 0;
>- meter->used = time_msec();
>+ meter->used = time_usec();
>
> /* set up bands */
> for (i = 0; i < config->n_bands; ++i) { @@ -4561,6 +4636,7 @@
>dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
>dp_netdev *dp,
> pmd->core_id = core_id;
> pmd->numa_id = numa_id;
> pmd->need_reload = false;
>+ pmd->n_output_batches = 0;
>
> ovs_refcount_init(&pmd->ref_cnt);
> latch_init(&pmd->exit_latch);
>@@ -4748,6 +4824,7 @@ dp_netdev_add_port_tx_to_pmd(struct
>dp_netdev_pmd_thread *pmd,
>
> tx->port = port;
> tx->qid = -1;
>+ tx->flush_time = 0LL;
> dp_packet_batch_init(&tx->output_pkts);
>
> hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port-
>>port_no)); @@ -4911,7 +4988,7 @@ packet_batch_per_flow_execute(struct
>packet_batch_per_flow *batch,
> struct dp_netdev_flow *flow = batch->flow;
>
> dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
>- batch->tcp_flags, pmd->ctx.now);
>+ batch->tcp_flags, pmd->ctx.now / 1000);
>
> actions = dp_netdev_flow_get_actions(flow);
>
>@@ -5286,7 +5363,7 @@ dpif_netdev_xps_revalidate_pmd(const struct
>dp_netdev_pmd_thread *pmd,
> continue;
> }
> interval = pmd->ctx.now - tx->last_used;
>- if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
>+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
> port = tx->port;
> ovs_mutex_lock(&port->txq_used_mutex);
> port->txq_used[tx->qid]--;
>@@ -5307,7 +5384,7 @@ dpif_netdev_xps_get_tx_qid(const struct
>dp_netdev_pmd_thread *pmd,
> interval = pmd->ctx.now - tx->last_used;
> tx->last_used = pmd->ctx.now;
>
>- if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
>+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
> return tx->qid;
> }
>
>@@ -5439,12 +5516,16 @@ dp_execute_cb(void *aux_, struct
>dp_packet_batch *packets_,
> dp_netdev_pmd_flush_output_on_port(pmd, p);
> }
> #endif
>- if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
>- + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
>- /* Some packets was generated while input batch processing.
>- * Flush here to avoid overflow. */
>+ if (dp_packet_batch_size(&p->output_pkts)
>+ + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
>+ /* Flush here to avoid overflow. */
> dp_netdev_pmd_flush_output_on_port(pmd, p);
> }
>+
>+ if (dp_packet_batch_is_empty(&p->output_pkts)) {
>+ pmd->n_output_batches++;
>+ }
>+
> DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
> dp_packet_batch_add(&p->output_pkts, packet);
> }
>@@ -5685,7 +5766,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>*packets_,
>
> conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type,
>force,
> commit, zone, setmark, setlabel, helper,
>- nat_action_info_ref, pmd->ctx.now);
>+ nat_action_info_ref, pmd->ctx.now / 1000);
> break;
> }
>
>diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml index
>074535b..b13b0fa 100644
>--- a/vswitchd/vswitch.xml
>+++ b/vswitchd/vswitch.xml
>@@ -344,6 +344,22 @@
> </p>
> </column>
>
>+ <column name="other_config" key="tx-flush-interval"
>+ type='{"type": "integer",
>+ "minInteger": 0, "maxInteger": 1000000}'>
>+ <p>
>+ Specifies the time in microseconds that a packet can wait in output
>+ batch for sending i.e. amount of time that packet can spend in an
>+ intermediate output queue before sending to netdev.
>+ This option can be used to configure balance between throughput
>+ and latency. Lower values decreases latency while higher values
>+ may be useful to achieve higher performance.
>+ </p>
>+ <p>
>+ Defaults to 0 i.e. instant packet sending (latency optimized).
>+ </p>
>+ </column>
>+
> <column name="other_config" key="n-handler-threads"
> type='{"type": "integer", "minInteger": 1}'>
> <p>
>--
>2.7.4
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev