This patch implements intermediate Tx queues on 'dpdk' type ports.

Test results:
 * In the worst-case scenario, with only a few packets per batch, a
   significant bottleneck is observed in the netdev_dpdk_eth_send()
   function due to expensive MMIO writes.

 * It was also observed that the CPI (cycles per instruction) rate for
   the function stood between 3.15 and 4.1, which is significantly
   higher than the acceptable limit of 1.0 for HPC applications and the
   theoretical limit of 0.25 (as the backend pipeline can retire 4
   micro-operations per cycle).

 * With this patch, the CPI for netdev_dpdk_eth_send() is at 0.55 and
   the overall throughput improved significantly.

Signed-off-by: Antonio Fischetti <[email protected]>
Signed-off-by: Bhanuprakash Bodireddy <[email protected]>
Co-authored-by: Bhanuprakash Bodireddy <[email protected]>
Signed-off-by: Markus Magnusson <[email protected]>
Co-authored-by: Markus Magnusson <[email protected]>
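For reviewers, the buffering idea can be summarized by the stand-alone
sketch below. It is illustrative only and is not part of the patch: the
sketch_* names and the stand-in flush() are invented for the example,
and NETDEV_MAX_BURST is assumed to be 32; the real code paths added by
the diff are netdev_dpdk_eth_tx_queue() and netdev_dpdk_txq_drain().

    /* Illustrative sketch only, not part of the patch.  It models the
     * intermediate queue with a stand-in flush() instead of the real
     * netdev_dpdk_eth_tx_burst()/rte_eth_tx_burst(); names prefixed
     * with sketch_ are made up, and SKETCH_MAX_BURST stands in for
     * NETDEV_MAX_BURST (assumed to be 32). */
    #include <stdio.h>
    #include <string.h>

    #define SKETCH_MAX_BURST 32

    struct sketch_txq {
        int count;                     /* Packets currently buffered. */
        void *pkts[SKETCH_MAX_BURST];  /* Stands in for struct rte_mbuf *. */
    };

    /* Stands in for the tx-burst call whose MMIO doorbell write is the
     * expensive part; with buffering it runs once per full buffer
     * instead of once per small batch. */
    static void
    sketch_flush(struct sketch_txq *txq)
    {
        printf("flush of %d packets (one MMIO write)\n", txq->count);
        txq->count = 0;
    }

    /* Mirrors the shape of netdev_dpdk_eth_tx_queue(): copy the batch
     * into the intermediate queue and flush only when it is full. */
    static void
    sketch_enqueue(struct sketch_txq *txq, void **pkts, int cnt)
    {
        int i = 0;

        while (i < cnt) {
            int freeslots = SKETCH_MAX_BURST - txq->count;
            int tocopy = cnt - i < freeslots ? cnt - i : freeslots;

            memcpy(&txq->pkts[txq->count], &pkts[i],
                   tocopy * sizeof pkts[0]);
            txq->count += tocopy;
            i += tocopy;

            if (txq->count >= SKETCH_MAX_BURST) {
                sketch_flush(txq);
            }
        }
    }

    int
    main(void)
    {
        struct sketch_txq txq = { 0 };
        void *batch[4] = { NULL };

        /* 15 small batches of 4 packets: 2 flushes instead of 15. */
        for (int b = 0; b < 15; b++) {
            sketch_enqueue(&txq, batch, 4);
        }
        if (txq.count) {
            sketch_flush(&txq);        /* What the periodic drain does. */
        }
        return 0;
    }

With the sketch's numbers the doorbell is rung once when the buffer
fills at 32 packets and once more by the final drain, instead of once
per 4-packet batch.  The periodic drain added to pmd_thread_main()
bounds this in time, so packets buffered in a partially filled queue
are not held indefinitely.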
---
 lib/dpif-netdev.c     | 53 +++++++++++++++++++++++++++++++--
 lib/netdev-bsd.c      |  1 +
 lib/netdev-dpdk.c     | 82 ++++++++++++++++++++++++++++++++++++++++++++++-----
 lib/netdev-dummy.c    |  1 +
 lib/netdev-linux.c    |  1 +
 lib/netdev-provider.h |  8 +++++
 lib/netdev-vport.c    |  3 +-
 lib/netdev.c          |  9 ++++++
 lib/netdev.h          |  1 +
 9 files changed, 149 insertions(+), 10 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 3901129..58ac429 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -289,6 +289,8 @@ struct dp_netdev_rxq {
     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this queue. */
 };
 
+#define LAST_USED_QID_NONE -1
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -303,6 +305,8 @@ struct dp_netdev_port {
     char *type;                 /* Port type as requested by user. */
     char *rxq_affinity_list;    /* Requested affinity of rx queues. */
     bool need_reconfigure;      /* True if we should reconfigure netdev. */
+    int last_used_qid;          /* Last queue id where packets could be
+                                   enqueued. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -619,6 +623,9 @@ static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
 
+static struct tx_port *pmd_send_port_cache_lookup
+    (const struct dp_netdev_pmd_thread *pmd, odp_port_t port_no);
+
 static void
 emc_cache_init(struct emc_cache *flow_cache)
 {
@@ -3507,15 +3514,19 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread *pmd,
     return i;
 }
 
+enum { DRAIN_TSC = 20000ULL };
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    unsigned int lc = 0;
+    unsigned int lc = 0, lc_drain = 0;
     struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
+    uint64_t prev = 0, now = 0;
+    struct tx_port *tx_port;
 
     poll_list = NULL;
 
@@ -3548,6 +3559,26 @@ reload:
                                        poll_list[i].port_no);
         }
 
+#define MAX_LOOP_TO_DRAIN 128
+        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
+            lc_drain = 0;
+            prev = now;
+            now = pmd->last_cycles;
+            if ((now - prev) > DRAIN_TSC) {
+                HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
+                    if (tx_port->port->last_used_qid != LAST_USED_QID_NONE) {
+                        /* This queue may contain some buffered packets
+                         * waiting to be sent out. */
+                        netdev_txq_drain(tx_port->port->netdev,
+                                         tx_port->port->last_used_qid,
+                                         tx_port->port->dynamic_txqs);
+                        /* Mark it as empty. */
+                        tx_port->port->last_used_qid = LAST_USED_QID_NONE;
+                    }
+                }
+            }
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -3883,6 +3914,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
 
     tx->port = port;
     tx->qid = -1;
+    port->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4538,7 +4570,24 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
         } else {
             tx_qid = pmd->static_tx_qid;
         }
 
+        /* TODO: add OVS_UNLIKELY() to the first condition? */
+        /* Is the current qid the same as the last one we used? */
+        if ((p->port->last_used_qid != LAST_USED_QID_NONE) &&
+            (p->port->last_used_qid != tx_qid)) {
+            /* The assigned queue has changed; drain the packets
+             * buffered on the previous queue. */
+            netdev_txq_drain(p->port->netdev,
+                             p->port->last_used_qid,
+                             p->port->dynamic_txqs);
+            p->port->last_used_qid = LAST_USED_QID_NONE;
+        }
+        /* In case these packets get buffered into an intermediate
+         * queue and XPS is enabled, the drain function could find a
+         * different Tx qid assigned to its thread.  Keep track of the
+         * qid we're using now, so that the drain function will flush
+         * the right queue. */
+        p->port->last_used_qid = tx_qid;
         netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                     dynamic_txqs);
         return;
@@ -4952,7 +5001,7 @@ dpif_dummy_register(enum dummy_level level)
                              "dp port new-number", 3, 3,
                              dpif_dummy_change_port_number, NULL);
 }
-
+
 /* Datapath Classifier. */
 
 /* A set of rules that all have the same fields wildcarded. */
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 94c515d..00d5263 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1547,6 +1547,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_bsd_rxq_recv,                            \
     netdev_bsd_rxq_wait,                            \
     netdev_bsd_rxq_drain,                           \
+    NULL,                       /* txq_drain */     \
 }
 
 const struct netdev_class netdev_bsd_class =
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 94568a1..d560bf6 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -166,7 +166,6 @@ static const struct rte_eth_conf port_conf = {
 enum { DPDK_RING_SIZE = 256 };
 BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
 
-enum { DRAIN_TSC = 200000ULL };
 
 enum dpdk_dev_type {
     DPDK_DEV_ETH = 0,
@@ -289,12 +288,18 @@ struct dpdk_mp {
 /* There should be one 'struct dpdk_tx_queue' created for
  * each cpu core. */
 struct dpdk_tx_queue {
+    int count;                     /* Number of buffered packets waiting to
+                                      be sent. */
     rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
                                     * from concurrent access.  It is used only
                                     * if the queue is shared among different
                                     * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
+    struct rte_mbuf *burst_pkts[NETDEV_MAX_BURST];
+                                   /* Intermediate queue where packets can
+                                    * be buffered to amortize the cost of MMIO
+                                    * writes. */
 };
 
 /* dpdk has no way to remove dpdk ring ethernet devices
@@ -1381,6 +1386,7 @@ static inline int
 netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
                          struct rte_mbuf **pkts, int cnt)
 {
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint32_t nb_tx = 0;
 
     while (nb_tx != cnt) {
@@ -1404,6 +1410,8 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid,
         }
     }
 
+    txq->count = cnt - nb_tx;
+
     return cnt - nb_tx;
 }
 
@@ -1788,6 +1796,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
     }
 }
 
+/* Enqueue packets in an intermediate queue and call the burst
+ * function when the queue is full.  This way we can amortize the
+ * cost of MMIO writes. */
+static inline int
+netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid,
+                         struct rte_mbuf **pkts, int cnt)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    int i = 0;
+    int dropped = 0;
+
+    while (i < cnt) {
+        int freeslots = NETDEV_MAX_BURST - txq->count;
+        int tocopy = MIN(freeslots, cnt - i);
+
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        /* Queue full, burst the packets. */
+        if (txq->count >= NETDEV_MAX_BURST) {
+            dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts,
+                                                txq->count);
+        }
+    }
+    return dropped;
+}
+
 static int
 netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                        struct dp_packet_batch *batch,
@@ -1836,7 +1875,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
     cnt = netdev_dpdk_qos_run(dev, pkts, cnt);
     dropped = batch->count - cnt;
 
-    dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt);
+    dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt);
 
     if (OVS_UNLIKELY(dropped)) {
         rte_spinlock_lock(&dev->stats_lock);
@@ -1850,6 +1889,30 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
     }
 }
 
+/* Drains the tx queue.  This is called periodically to empty the
+ * intermediate queue when only a few packets (< NETDEV_MAX_BURST)
+ * are buffered in it. */
+static int
+netdev_dpdk_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    if (OVS_LIKELY(txq->count)) {
+        if (OVS_UNLIKELY(concurrent_txq)) {
+            qid = qid % dev->up.n_txq; /* TODO: do we need this remapping? */
+            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
+        }
+
+        netdev_dpdk_eth_tx_burst(dev, qid, txq->burst_pkts, txq->count);
+
+        if (OVS_UNLIKELY(concurrent_txq)) {
+            rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
+        }
+    }
+    return 0;
+}
+
 static int
 netdev_dpdk_eth_send(struct netdev *netdev, int qid,
                      struct dp_packet_batch *batch, bool may_steal,
@@ -3243,7 +3306,7 @@ unlock:
                           SET_CONFIG, SET_TX_MULTIQ, SEND,    \
                           GET_CARRIER, GET_STATS,             \
                           GET_FEATURES, GET_STATUS,           \
-                          RECONFIGURE, RXQ_RECV)              \
+                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 {                                                             \
     NAME,                                                     \
     true,                       /* is_pmd */                  \
@@ -3310,6 +3373,7 @@ unlock:
     RXQ_RECV,                                                 \
     NULL,                       /* rx_wait */                 \
     NULL,                       /* rxq_drain */               \
+    TXQ_DRAIN,                  /* txq_drain */               \
 }
 
 static const struct netdev_class dpdk_class =
@@ -3326,7 +3390,8 @@ static const struct netdev_class dpdk_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        netdev_dpdk_txq_drain);
 
 static const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
@@ -3342,7 +3407,8 @@ static const struct netdev_class dpdk_ring_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_vhost_class =
     NETDEV_DPDK_CLASS(
@@ -3358,7 +3424,8 @@ static const struct netdev_class dpdk_vhost_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_vhost_client_class =
     NETDEV_DPDK_CLASS(
         "dpdkvhostuserclient",
@@ -3373,7 +3440,8 @@ static const struct netdev_class dpdk_vhost_client_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_client_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        NULL);
 
 void
 netdev_dpdk_register(void)
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index e6e36cd..6a8ad45 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
     netdev_dummy_rxq_recv,                          \
     netdev_dummy_rxq_wait,                          \
     netdev_dummy_rxq_drain,                         \
+    NULL,                       /* txq_drain */     \
 }
 
 static const struct netdev_class dummy_class =
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index a5a9ec1..2499b3e 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2831,6 +2831,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_rxq_recv,                          \
     netdev_linux_rxq_wait,                          \
     netdev_linux_rxq_drain,                         \
+    NULL,                       /* txq_drain */     \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 8346fc4..97e72c6 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -335,6 +335,11 @@ struct netdev_class {
      * If the function returns a non-zero value, some of the packets might have
      * been sent anyway.
      *
+     * Some netdev providers - such as 'dpdk' - may buffer the batch of
+     * packets in an intermediate queue.  Buffered packets are sent out
+     * when their number exceeds a threshold or by the periodic call to
+     * the drain function.
+     *
      * If 'may_steal' is false, the caller retains ownership of all the
      * packets.  If 'may_steal' is true, the caller transfers ownership of all
      * the packets to the network device, regardless of success.
@@ -769,6 +774,9 @@ struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Drains all packets waiting to be sent on queue 'qid'. */
+    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 4c2ced5..77225b8 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -838,7 +838,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     NULL,                   /* rx_dealloc */                \
     NULL,                   /* rx_recv */                   \
     NULL,                   /* rx_wait */                   \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */                  \
+    NULL,                   /* tx_drain */
 
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER) \
diff --git a/lib/netdev.c b/lib/netdev.c
index 839b1f6..0d48e41 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -670,6 +670,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Flushes packets waiting to be sent on queue 'qid'. */
+int
+netdev_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
+{
+    return (netdev->netdev_class->txq_drain
+            ? netdev->netdev_class->txq_drain(netdev, qid, concurrent_txq)
+            : EOPNOTSUPP);
+}
+
 /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
  * otherwise a positive errno value.
  *
diff --git a/lib/netdev.h b/lib/netdev.h
index bef9cdd..49969a1 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -153,6 +153,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
 int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
                 bool may_steal, bool concurrent_txq);
 void netdev_send_wait(struct netdev *, int qid);
+int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 
 /* native tunnel APIs */
 /* Structure to pass parameters required to build a tunnel header. */
--
2.4.11
