Hi
I am currently working on a system in which a high-rate data stream is to be 
transmitted to an FPGA. As this only has small buffers available, I am using 
the packet pacing function of the NIC Mellanox ConnectX-6 MCX623106AN to send 
the packets at uniform intervals. This works if I only transfer 5 GB/s, but 
as soon as I step up to 10 GB/s, errors begin to occur after a few seconds: 
the tx_pp_wander value increases significantly (>80000 ns) and there are large 
gaps in the packet stream (>100 µs; the affected packets are not lost, but 
arrive later).

To demonstrate this, I connected my host to another computer with the same type 
of NIC via a DAC cable, enabling Rx hardware timestamping on the second device 
and observing the timing difference between adjacent packets. The code for this 
minimum working example is attached to this message. It includes an assertion 
to ensure that every packet is enqueued well before its Tx time comes, so 
software timing should not influence the issue.

I tested different packet pacing granularity settings (tx_pp) in the range of 
500ns-4µs, which did not change the outcome. Also, enabling Tx timestamping 
only for every 16th packet did not have the desired effect. Distributing the 
workload over multiple threads and Tx queues also had no effect. The NIC is 
connected via PCIe 4.0 x16 and runs firmware version 22.38.1002; the DPDK 
version is 22.11.3-2.

To be able to use packet pacing, the configuration REAL_TIME_CLOCK_ENABLE=1 
must be set for this NIC. Is it possible that the large gaps are caused by the 
NIC and host clock synchronizing mechanism not working correctly under the high 
packet load? In my specific application I do not need a real-time NIC clock - 
the synchronization between the devices is done via feedback from the FPGA. Is 
there any way to eliminate these jumps in the NIC clock?

Thank you and best regards
Max
// (c) 2023 Fraunhofer IIS, Maximilian Engelhardt <[email protected]>

#ifndef MWE_COMMON_H
#define MWE_COMMON_H

#include <atomic>
#include <cerrno>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <rte_eal.h>
#include <rte_ethdev.h>


// Number of mbufs requested per available port when the pool is created.
// Parenthesized so the macro expands as one value inside larger expressions
// (e.g. `x / NUM_MBUFS`).
#define NUM_MBUFS (1024*1024)
// Cache size argument passed to rte_pktmbuf_pool_create().
#define MBUF_CACHE_SIZE 250

// Offset of the timestamp dynamic field inside an mbuf, and the ol_flags bit
// set on packets that carry a Tx timestamp. Both are filled in by init_dpdk().
int hwts_dynfield_offset{0};
uint64_t hwts_dynflag_tx_mask{0};

// Baseline port configuration copied by init_dpdk(). Only the Rx MTU is set
// here; all remaining fields are zero-initialized.
static const struct rte_eth_conf port_conf_default = {
        .rxmode = {
                .mtu = 9600,
        },
};

// Aborts the program with a diagnostic on stderr when `cond` is false.
//   cond - expression tested for truth; evaluated exactly once.
//   text - C string appended to the diagnostic; evaluated only on failure.
// The condition is tested directly instead of being copied into a local
// `int ret`, so wide values are not truncated and a caller-side variable
// named `ret` can appear in the condition.
#define ASSERT(cond, text) do { \
        if (!(cond)) { \
                fprintf(stderr, "%s:%d: assertion { %s } failed (%s)\n", \
                        __FILE__, __LINE__, #cond, text); \
                abort(); \
        } \
} while (0)

// Aborts the program with a diagnostic on stderr unless `cond` evaluates to 0.
//   cond - expression yielding an int status code; evaluated exactly once.
// The status is passed to strerror() by magnitude, so negative errno-style
// codes produce a readable message. The temporary has a macro-specific name
// so a caller-side variable named `ret` can appear in the condition.
#define ASSERTP(cond) do { \
        const int assertp_rc_ = (cond); \
        if (assertp_rc_ != 0) { \
                fprintf(stderr, "%s:%d: assertion { %s == 0 } failed (%s)\n", \
                        __FILE__, __LINE__, #cond, \
                        strerror(assertp_rc_ < 0 ? -assertp_rc_ : assertp_rc_)); \
                abort(); \
        } \
} while (0)

// Shutdown flag: cleared by signalHandler() and polled by the worker and main
// loops. Atomic because it is written from a signal handler while other
// threads read it.
std::atomic<bool> run{true};
// Packet buffer pool; created in init_dpdk().
struct rte_mempool *mbuf_pool;

// Header placed in front of every payload. Packed, so stream_pos directly
// follows the one-byte stream_id without padding.
struct __attribute__((packed)) Header {
    uint8_t  stream_id;   // set by the sender from its Tx queue id
    uint64_t stream_pos;  // payload byte offset of this packet within its stream
};

// Signal callback: requests shutdown by clearing the global `run` flag.
// The signal number is not inspected.
void signalHandler(int /*signum*/) {
    run = false;
}

/**
 * Initializes the EAL and brings up port 0 for timestamped Rx and scheduled Tx.
 *
 * Installs the SIGINT/SIGTERM handlers, initializes the EAL, creates the
 * global mbuf pool, configures port 0 with one Rx queue and two Tx queues,
 * registers the mbuf timestamp dynamic field and Tx flag, sets the MTU,
 * enables promiscuous mode and starts the port. Every step that reports a
 * failure terminates the process.
 *
 * @param argc  In/out: argument count, reduced by the number of arguments
 *              consumed by rte_eal_init().
 * @param argv  In/out: argument vector, advanced past the EAL arguments.
 */
void init_dpdk(int *argc, char **argv[]) {
    signal(SIGINT, signalHandler);
    signal(SIGTERM, signalHandler);

    const int eal_args = rte_eal_init(*argc, *argv);
    if (eal_args < 0)
        rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");

    *argc -= eal_args;
    *argv += eal_args;

    auto nb_ports = rte_eth_dev_count_avail();
    ASSERT(nb_ports>0, "No Port available!");
    mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
                                        MBUF_CACHE_SIZE, 0, 10000,
                                        rte_socket_id());
    // Fail here with a clear message instead of later inside queue setup.
    ASSERT(mbuf_pool != nullptr, "Cannot create mbuf pool!");

    const uint16_t port_id = 0;
    const uint16_t rx_rings = 1;
    const uint16_t tx_rings = 2;
    uint16_t nb_rxd = 4096;
    uint16_t nb_txd = 4096;
    struct rte_eth_conf port_conf = port_conf_default;

    struct rte_eth_dev_info dev_info{};
    ASSERTP(rte_eth_dev_info_get(port_id, &dev_info));

    if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
        port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
    port_conf.rxmode.offloads = RTE_ETH_RX_OFFLOAD_TIMESTAMP;
    port_conf.txmode.offloads |= RTE_ETH_TX_OFFLOAD_SEND_ON_TIMESTAMP;

    // Configure the Ethernet device.
    ASSERTP(rte_eth_dev_configure(port_id, rx_rings, tx_rings, &port_conf));
    ASSERTP(rte_eth_dev_adjust_nb_rx_tx_desc(port_id, &nb_rxd, &nb_txd));

    // RX setup
    for (uint16_t q = 0; q < rx_rings; q++) {
        ASSERTP(rte_eth_rx_queue_setup(port_id, q, nb_rxd,
                                       rte_eth_dev_socket_id(port_id), NULL,
                                       mbuf_pool));
    }

    // TX setup
    struct rte_eth_txconf txconf = dev_info.default_txconf;
    txconf.offloads = port_conf.txmode.offloads;

    for (uint16_t q = 0; q < tx_rings; q++) {
        ASSERTP(rte_eth_tx_queue_setup(port_id, q, nb_txd,
                                       rte_eth_dev_socket_id(port_id),
                                       &txconf));
    }

    // The register calls leave the offset untouched on failure, so the return
    // value is what has to be checked. Continuing with a stale offset would
    // make the workers read and write timestamps at the wrong mbuf location.
    ASSERT(rte_mbuf_dyn_rx_timestamp_register(&hwts_dynfield_offset, NULL) == 0,
           "Failed to register Rx timestamp field");
    ASSERT(rte_mbuf_dyn_tx_timestamp_register(&hwts_dynfield_offset,
                                              &hwts_dynflag_tx_mask) == 0,
           "Failed to register Tx timestamp field");

    ASSERTP(rte_eth_dev_set_mtu(port_id, 9100));
    ASSERTP(rte_eth_promiscuous_enable(port_id));
    ASSERTP(rte_eth_dev_start(port_id));

    // Result intentionally ignored, as in the original code.
    rte_flow_flush(port_id, nullptr);
}

/**
 * Prints every non-zero extended statistic of port 0 as "<name>: <value>".
 *
 * Handles at most 512 counters. If either query fails or reports more entries
 * than fit, a note is written to stderr and nothing is printed, because the
 * arrays must not be read in that case.
 */
void print_xstats() {
    constexpr int kMaxXstats = 512;
    rte_eth_xstat_name stats_names[kMaxXstats];
    rte_eth_xstat stats[kMaxXstats];

    const int num_names = rte_eth_xstats_get_names(0, stats_names, kMaxXstats);
    const int num_stats = rte_eth_xstats_get(0, stats, kMaxXstats);

    const bool names_ok = num_names >= 0 && num_names <= kMaxXstats;
    const bool stats_ok = num_stats >= 0 && num_stats <= kMaxXstats;
    if (!names_ok || !stats_ok) {
        fprintf(stderr, "print_xstats: xstats query failed (names=%d, stats=%d)\n",
                num_names, num_stats);
        return;
    }

    // Only index entries that both arrays actually contain.
    const int count = num_stats < num_names ? num_stats : num_names;
    for (int i = 0; i < count; ++i) {
        if (stats[i].value == 0)
            continue;
        printf("%s: %lu\n", stats_names[i].name, stats[i].value);
    }
}

#endif
// (c) 2023 Fraunhofer IIS, Maximilian Engelhardt <[email protected]>

#include <iostream>

#include "common.h"
#include "rte_pause.h"
#include "rte_ethdev.h"
#include "rte_byteorder.h"
#include "rte_byteorder_64.h"
#include "rte_malloc.h"
#include "rte_hexdump.h"


// Maximum number of packets requested per rte_eth_rx_burst() call.
constexpr size_t BURST_SIZE{128};

// Per-stream receive bookkeeping maintained by the Rx worker.
struct ChannelStats {
    uint64_t stream_pos{0};               // stream position expected in the next packet
    uint64_t last_packet_ts{UINT64_MAX};  // Rx timestamp of the previous packet; all-ones = none yet
    uint64_t max_ts_diff{0};              // largest timestamp difference between adjacent packets
};

// One statistics slot per stream id (0 and 1), and the total number of
// payload bytes received across both streams.
ChannelStats channel_stats_arr[2]{};
uint64_t total_bytes_rx{0};


int worker(void* arg) {
    rte_mbuf* packets[BURST_SIZE];

    while (run) {

        int num_packets = rte_eth_rx_burst(0, 0, packets, BURST_SIZE);


        for (int i = 0; i<num_packets; i++) {
            auto packet = packets[i];

            rte_pktmbuf_adj(packet,sizeof(struct rte_ether_hdr));
            rte_pktmbuf_adj(packet,sizeof(struct rte_vlan_hdr));

            auto header = (Header*)rte_pktmbuf_read(packet, 0 , sizeof(Header), 
nullptr);
            rte_pktmbuf_adj(packet, sizeof(Header));

            auto len = rte_pktmbuf_pkt_len(packet);
            auto header_stream_pos = header->stream_pos;
            auto rx_ts = *RTE_MBUF_DYNFIELD(packet, hwts_dynfield_offset, 
uint64_t *);

            auto& channel_stats = channel_stats_arr[header->stream_id];

            ASSERT(channel_stats.stream_pos == header_stream_pos, "Stream 
error!");

            if (channel_stats.last_packet_ts != -1) {
                auto diff = rx_ts - channel_stats.last_packet_ts;
                if (diff > channel_stats.max_ts_diff)
                    channel_stats.max_ts_diff = diff;
            }
            channel_stats.last_packet_ts = rx_ts;

            total_bytes_rx += len;
            channel_stats.stream_pos = header_stream_pos + len;


            rte_pktmbuf_free(packet);

        }


    }
}

int main(int argc, char *argv[]) {
    init_dpdk(&argc, &argv);
    rte_eal_remote_launch(&worker, nullptr, rte_get_next_lcore(-1, true, 
false));

    while (run) {
        rte_delay_ms(1000);
        printf("=== Total Rx: %lu bytes\r\n", total_bytes_rx);
        for (auto& channel_stats: channel_stats_arr) {
            printf("Stream pos %lu, Max timestamp difference: %lu\n", 
channel_stats.stream_pos, channel_stats.max_ts_diff);
        }
    }
    return 0;
}

// (c) 2023 Fraunhofer IIS, Maximilian Engelhardt <[email protected]>

#include <iostream>

#include "common.h"
#include "rte_pause.h"
#include "rte_ethdev.h"
#include "rte_byteorder.h"
#include "rte_byteorder_64.h"
#include "rte_malloc.h"


// Tx test parameters.
constexpr size_t BURST_SIZE = 16;        // packets per Tx burst
constexpr size_t TARGET_RATE =           // aggregate payload rate in bytes per second
        10ul * (1000ul * 1000ul * 1000ul);
constexpr size_t PAYLOAD_LEN = 8000;     // payload bytes appended to every packet
constexpr size_t NIC_CLOCK_RATE =        // NIC clock ticks per second used for pacing maths
        1000ul * (1000ul * 1000ul);
constexpr size_t NUM_TX_THREADS = 1;     // number of Tx worker lcores / Tx queues used


// Running total of payload bytes handed to rte_eth_tx_burst(); printed by main().
uint64_t total_bytes_tx{0};
// Per-thread parameters handed to worker() through rte_eal_remote_launch().
struct worker_arg {
    uint16_t queue_id;  // Tx queue used by the thread; also written into Header::stream_id
    uint64_t start_time;  // NIC clock value the thread waits for before building its first burst
    double tx_shift_ratio;  // fraction of one packet interval added to the thread's first send timestamp
};


int worker(void* arg_) {
    struct sched_param sp;
    memset( &sp, 0, sizeof(sp) );
    sp.sched_priority = 99;
    ASSERTP(sched_setscheduler( 0, SCHED_FIFO, &sp ));

    auto arg = (worker_arg*)arg_;
    uint64_t nic_delay_packet = NIC_CLOCK_RATE * PAYLOAD_LEN / 
(TARGET_RATE/NUM_TX_THREADS);
    uint64_t nic_delay_burst = nic_delay_packet * BURST_SIZE;

    uint64_t nic_clock_time = 0;

    uint64_t next_burst_nic = arg->start_time;
    uint64_t next_packet_pp_timestamp = next_burst_nic + 1000*1000;
    next_packet_pp_timestamp += 
static_cast<uint64_t>((double)nic_delay_packet*arg->tx_shift_ratio);
    uint64_t stream_pos = 0;
    rte_mbuf* packets[BURST_SIZE];

    uint16_t queue_id = arg->queue_id;

    while (run) {

        rte_eth_read_clock(0, &nic_clock_time);
        while (next_burst_nic > nic_clock_time && run)  {
            rte_pause();
            rte_eth_read_clock(0, &nic_clock_time);
        }

        auto r = rte_pktmbuf_alloc_bulk(mbuf_pool, packets, BURST_SIZE);
        ASSERT(r == 0, "MBuf pool drained!");

        auto first_ts_in_burst = next_packet_pp_timestamp;
        for (auto packet : packets) {
            auto data = rte_pktmbuf_append(packet, PAYLOAD_LEN);

            auto header = (Header*)rte_pktmbuf_prepend(packet, sizeof(Header));
            header->stream_pos = stream_pos;
            header->stream_id = queue_id;

            auto vlan_header = (struct rte_vlan_hdr 
*)rte_pktmbuf_prepend(packet,sizeof(struct rte_vlan_hdr));
            vlan_header->vlan_tci = rte_cpu_to_be_16(11);
            vlan_header->eth_proto = rte_cpu_to_be_16(0x0811);

            auto ether_header = (struct rte_ether_hdr 
*)rte_pktmbuf_prepend(packet,sizeof(struct rte_ether_hdr));
            ether_header->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_VLAN);
            ether_header->src_addr = {0,0,0,0,0,0};
            ether_header->dst_addr = {0xff,0xff,0xff,0xff,0xff,0xff};

            auto timestamp = RTE_MBUF_DYNFIELD(packet, hwts_dynfield_offset, 
int64_t *);
            *timestamp = (int64_t)next_packet_pp_timestamp;
            packet->ol_flags |= hwts_dynflag_tx_mask;

            stream_pos += PAYLOAD_LEN;
            next_packet_pp_timestamp += nic_delay_packet;

        }


        rte_eth_read_clock(0, &nic_clock_time);
        ASSERT(first_ts_in_burst > nic_clock_time + 50*1000, "TX queue slack 
target missed!");

        int packets_sent = rte_eth_tx_burst(0, queue_id, packets, BURST_SIZE);
        ASSERT(packets_sent == BURST_SIZE, "TX ring overflow!");
        total_bytes_tx += BURST_SIZE*PAYLOAD_LEN;
        next_burst_nic += nic_delay_burst;

    }
}

int main(int argc, char *argv[]) {

    init_dpdk(&argc, &argv);

    uint64_t start_time = 0;
    rte_eth_read_clock(0, &start_time);
    start_time += 1000ul*1000ul*1000ul;

    struct rte_eth_dev *dev;

    worker_arg args[NUM_TX_THREADS];
    uint lcore_id = -1;
    for (int i = 0; i < NUM_TX_THREADS; ++i) {
        args[i] = {.queue_id = static_cast<uint16_t>(i), 
.start_time=start_time, .tx_shift_ratio=(1.0/NUM_TX_THREADS)*i};
        lcore_id = rte_get_next_lcore(lcore_id, true, false);
        rte_eal_remote_launch(&worker, &args[i], lcore_id);
    }

    uint64_t xstats_wander_id;
    uint64_t pp_wander=0, pp_wander_max=0;
    rte_eth_xstats_get_id_by_name(0, "tx_pp_wander", &xstats_wander_id);

    int i=0;
    while (run) {
        rte_delay_ms(1000);
        printf("=== Total bytes sent %lu - running for %d seconds, %lu max 
tx_pp_wander\r\n", total_bytes_tx, i++, pp_wander_max);
        print_xstats();

        rte_eth_xstats_get_by_id(0, &xstats_wander_id, &pp_wander, 1);
        if (pp_wander_max < pp_wander) pp_wander_max = pp_wander;

    }
    return 0;
}

Reply via email to