>First, this patch should be applied after the patch that flushes on >reconfiguration, because we must not reconfigure ports while there are >unsent packets in the intermediate queue. >Otherwise we may destroy the memory pool that contains those packets and >then try to send them afterwards. This may lead to serious problems.
This is a good point. I will handle this appropriately in the next version. > >Second, you should also modify the 'dpdk_do_tx_copy' >function; otherwise there will be reordering issues, and flood traffic will have >accidentally higher priority because it is not buffered. I presume you are referring to netdev_dpdk_eth_tx_burst(), where we burst the packets. Do you think we should queue the packets here and flush them? - Bhanuprakash. >On 18.06.2017 22:56, Bhanuprakash Bodireddy wrote: >> This commit introduces netdev_dpdk_eth_tx_queue() function that >> implements intermediate queue and packet buffering. The packets get >> buffered till the threshold 'INTERIM_QUEUE_BURST_THRESHOLD[32] is >> reached and eventually gets transmitted. >> >> To handle the case(eg: ping) where packets are sent at low rate and >> can potentially get stuck in the queue, flush logic is implemented >> that gets invoked from dp_netdev_flush_txq_ports() as part of PMD >> packet processing loop. >> >> Signed-off-by: Bhanuprakash Bodireddy >> <[email protected]> >> Signed-off-by: Antonio Fischetti <[email protected]> >> Co-authored-by: Antonio Fischetti <[email protected]> >> Signed-off-by: Markus Magnusson <[email protected]> >> Co-authored-by: Markus Magnusson <[email protected]> >> Acked-by: Eelco Chaudron <[email protected]> >> --- >> lib/dpif-netdev.c | 44 >+++++++++++++++++++++++++++++++++++++++++++- >> lib/netdev-dpdk.c | 35 ++++++++++++++++++++++++++++++++++- >> 2 files changed, 77 insertions(+), 2 deletions(-) >> >> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index >> 2b65dc7..d59208e 100644 >> --- a/lib/dpif-netdev.c >> +++ b/lib/dpif-netdev.c >> @@ -332,6 +332,7 @@ enum pmd_cycles_counter_type { }; >> >> #define XPS_TIMEOUT_MS 500LL >> +#define LAST_USED_QID_NONE -1 >> >> /* Contained by struct dp_netdev_port's 'rxqs' member.
*/ struct >> dp_netdev_rxq { @@ -492,7 +493,13 @@ struct rxq_poll { struct tx_port >> { >> struct dp_netdev_port *port; >> int qid; >> - long long last_used; >> + int last_used_qid; /* Last queue id where packets got >> + enqueued. */ >> + long long last_used; /* In case XPS is enabled, it contains the >> + * timestamp of the last time the port was >> + * used by the thread to send data. After >> + * XPS_TIMEOUT_MS elapses the qid will be >> + * marked as -1. */ >> struct hmap_node node; >> }; >> >> @@ -3081,6 +3088,25 @@ cycles_count_end(struct >dp_netdev_pmd_thread >> *pmd, } >> >> static void >> +dp_netdev_flush_txq_ports(struct dp_netdev_pmd_thread *pmd) { >> + struct tx_port *cached_tx_port; >> + int tx_qid; >> + >> + HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) { >> + tx_qid = cached_tx_port->last_used_qid; >> + >> + if (tx_qid != LAST_USED_QID_NONE) { >> + netdev_txq_flush(cached_tx_port->port->netdev, tx_qid, >> + cached_tx_port->port->dynamic_txqs); >> + >> + /* Queue flushed and mark it empty. */ >> + cached_tx_port->last_used_qid = LAST_USED_QID_NONE; >> + } >> + } >> +} >> + >> +static void >> dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, >> struct netdev_rxq *rx, >> odp_port_t port_no) @@ -4356,6 +4382,7 @@ >> dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd, >> >> tx->port = port; >> tx->qid = -1; >> + tx->last_used_qid = LAST_USED_QID_NONE; >> >> hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port- >>port_no)); >> pmd->need_reload = true; >> @@ -4926,6 +4953,14 @@ dpif_netdev_xps_get_tx_qid(const struct >> dp_netdev_pmd_thread *pmd, >> >> dpif_netdev_xps_revalidate_pmd(pmd, now, false); >> >> + /* The tx queue can change in XPS case, make sure packets in previous >> + * queue is flushed properly. 
*/ >> + if (tx->last_used_qid != LAST_USED_QID_NONE && >> + tx->qid != tx->last_used_qid) { >> + netdev_txq_flush(port->netdev, tx->last_used_qid, port- >>dynamic_txqs); >> + tx->last_used_qid = LAST_USED_QID_NONE; >> + } >> + >> VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.", >> pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev)); >> return min_qid; >> @@ -5021,6 +5056,13 @@ dp_execute_cb(void *aux_, struct >dp_packet_batch *packets_, >> tx_qid = pmd->static_tx_qid; >> } >> >> + /* In case these packets gets buffered into an intermediate >> + * queue and XPS is enabled the flush function could find a >> + * different tx qid assigned to its thread. We keep track >> + * of the qid we're now using, that will trigger the flush >> + * function and will select the right queue to flush. */ >> + p->last_used_qid = tx_qid; >> + >> netdev_send(p->port->netdev, tx_qid, packets_, may_steal, >> dynamic_txqs); >> return; >> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index >> 1e83116..50a9a2c 100644 >> --- a/lib/netdev-dpdk.c >> +++ b/lib/netdev-dpdk.c >> @@ -1434,6 +1434,7 @@ static inline int >> netdev_dpdk_eth_tx_burst(struct netdev_dpdk *dev, int qid, >> struct rte_mbuf **pkts, int cnt) { >> + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; >> uint32_t nb_tx = 0; >> >> while (nb_tx != cnt) { >> @@ -1457,6 +1458,7 @@ netdev_dpdk_eth_tx_burst(struct netdev_dpdk >*dev, int qid, >> } >> } >> >> + txq->dpdk_pkt_cnt = 0; >> return cnt - nb_tx; >> } >> >> @@ -1841,6 +1843,37 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, >struct dp_packet_batch *batch) >> } >> } >> >> +/* Enqueue packets in an intermediate queue and call the flush >> + * function when the queue is full. This way we can amortize the >> + * cost of MMIO writes. 
*/ >> +static inline int >> +netdev_dpdk_eth_tx_queue(struct netdev_dpdk *dev, int qid, >> + struct rte_mbuf **pkts, int cnt) { >> + struct dpdk_tx_queue *txq = &dev->tx_q[qid]; >> + >> + int i = 0; >> + int dropped = 0; >> + >> + while (i < cnt) { >> + int freeslots = INTERIM_QUEUE_BURST_THRESHOLD - txq- >>dpdk_pkt_cnt; >> + int tocopy = MIN(freeslots, cnt-i); >> + >> + memcpy(&txq->dpdk_burst_pkts[txq->dpdk_pkt_cnt], &pkts[i], >> + tocopy * sizeof (struct rte_mbuf *)); >> + >> + txq->dpdk_pkt_cnt += tocopy; >> + i += tocopy; >> + >> + /* Queue full, burst the packets. */ >> + if (txq->dpdk_pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) { >> + dropped += netdev_dpdk_eth_tx_burst(dev, qid, txq- >>dpdk_burst_pkts, >> + txq->dpdk_pkt_cnt); >> + } >> + } >> + return dropped; >> +} >> + >> static int >> netdev_dpdk_vhost_send(struct netdev *netdev, int qid, >> struct dp_packet_batch *batch, @@ -1889,7 >> +1922,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid, >> cnt = netdev_dpdk_qos_run(dev, pkts, cnt); >> dropped = batch->count - cnt; >> >> - dropped += netdev_dpdk_eth_tx_burst(dev, qid, pkts, cnt); >> + dropped += netdev_dpdk_eth_tx_queue(dev, qid, pkts, cnt); >> >> if (OVS_UNLIKELY(dropped)) { >> rte_spinlock_lock(&dev->stats_lock); >> _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
