The patch creates a new netdev type "afxdp" and borrows part of the AF_XDP API implementation from xdpsock_user.c in the Linux kernel samples. The AF_XDP eBPF programs/maps are loaded when the dpif-netdev is created; when a user adds a netdev with type="afxdp", OVS attaches the eBPF program/map to that netdev and initializes the AF_XDP socket.
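For readers unfamiliar with AF_XDP, here is a condensed, hypothetical sketch of the socket bring-up sequence that xsk_configure() in this patch performs: create a PF_XDP socket, register a page-aligned UMEM, size the fill/completion and rx/tx rings, then bind to one queue of the device in copy mode. It assumes only the if_xdp.h definitions added by this patch; the xsk_bringup() name and the constants are illustrative, not part of the patch, and the ring mmap() step (XDP_MMAP_OFFSETS) is elided.

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include "if_xdp.h"

#ifndef AF_XDP
#define AF_XDP 44        /* same fallback as netdev-linux.c below */
#endif
#ifndef SOL_XDP
#define SOL_XDP 283
#endif

#define NUM_FRAMES 128   /* illustrative; matches the patch defaults */
#define FRAME_SIZE 2048
#define NUM_DESCS  32

static int
xsk_bringup(int ifindex, int queue_id)
{
    struct sockaddr_xdp sxdp = { 0 };
    struct xdp_umem_reg mr = { 0 };
    int sfd, ndescs = NUM_DESCS;
    void *bufs;

    sfd = socket(AF_XDP, SOCK_RAW, 0);
    if (sfd < 0) {
        return -errno;
    }

    /* Register a page-aligned buffer area (the UMEM) with the socket. */
    if (posix_memalign(&bufs, getpagesize(), NUM_FRAMES * FRAME_SIZE)) {
        close(sfd);
        return -ENOMEM;
    }
    mr.addr = (__u64)(uintptr_t)bufs;
    mr.len = NUM_FRAMES * FRAME_SIZE;
    mr.chunk_size = FRAME_SIZE;

    /* Size all four rings; the real code then mmap()s them via
     * XDP_MMAP_OFFSETS before use. */
    if (setsockopt(sfd, SOL_XDP, XDP_UMEM_REG, &mr, sizeof mr)
        || setsockopt(sfd, SOL_XDP, XDP_UMEM_FILL_RING, &ndescs, sizeof(int))
        || setsockopt(sfd, SOL_XDP, XDP_UMEM_COMPLETION_RING, &ndescs,
                      sizeof(int))
        || setsockopt(sfd, SOL_XDP, XDP_RX_RING, &ndescs, sizeof(int))
        || setsockopt(sfd, SOL_XDP, XDP_TX_RING, &ndescs, sizeof(int))) {
        free(bufs);
        close(sfd);
        return -errno;
    }

    /* Bind to one queue of the device, forcing copy mode as the patch
     * does (XDP_COPY bind flag plus an SKB-mode XDP program). */
    sxdp.sxdp_family = AF_XDP;
    sxdp.sxdp_ifindex = ifindex;
    sxdp.sxdp_queue_id = queue_id;
    sxdp.sxdp_flags = XDP_COPY;
    if (bind(sfd, (struct sockaddr *)&sxdp, sizeof sxdp)) {
        free(bufs);
        close(sfd);
        return -errno;
    }
    return sfd;
}

With the patch applied, such a port would then presumably be added through the usual workflow, e.g. "ovs-vsctl add-port br0 eth1 -- set interface eth1 type=afxdp".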
Signed-off-by: William Tu <[email protected]> --- lib/automake.mk | 3 +- lib/dpif-netdev.c | 74 ++++- lib/if_xdp.h | 79 ++++++ lib/netdev-dummy.c | 1 + lib/netdev-linux.c | 741 +++++++++++++++++++++++++++++++++++++++++++++++++- lib/netdev-provider.h | 2 + lib/netdev-vport.c | 4 + lib/netdev.c | 11 + lib/netdev.h | 1 + 9 files changed, 907 insertions(+), 9 deletions(-) create mode 100644 lib/if_xdp.h diff --git a/lib/automake.mk b/lib/automake.mk index 61fef23152d3..e0528f74989f 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -302,7 +302,8 @@ lib_libopenvswitch_la_SOURCES = \ lib/lldp/lldpd.c \ lib/lldp/lldpd.h \ lib/lldp/lldpd-structs.c \ - lib/lldp/lldpd-structs.h + lib/lldp/lldpd-structs.h \ + lib/if_xdp.h if WIN32 lib_libopenvswitch_la_SOURCES += \ diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index baff020fe3d0..9f0300ac4e91 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -76,6 +76,11 @@ #include "unixctl.h" #include "util.h" +#include "bpf.h" +#include "netdev.h" +#include "openvswitch/thread.h" +#include <bpf/bpf.h> + VLOG_DEFINE_THIS_MODULE(dpif_netdev); #define FLOW_DUMP_MAX_BATCH 50 @@ -507,6 +512,12 @@ struct tx_port { struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST]; }; +static struct dp_bpf { + struct bpf_state bpf; + struct netdev *outport; /* Used for downcall. */ +} bpf_datapath; + + /* A set of properties for the current processing loop that is not directly * associated with the pmd thread itself, but with the packets being * processed or the short-term system configuration (for example, time). @@ -1121,6 +1132,8 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[], static int dpif_netdev_init(void) { + int error; + static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER; static enum pmd_info_type show_aux = PMD_INFO_SHOW_STATS, clear_aux = PMD_INFO_CLEAR_STATS, poll_aux = PMD_INFO_SHOW_RXQ; @@ -1137,6 +1150,17 @@ dpif_netdev_init(void) unixctl_command_register("dpif-netdev/pmd-rxq-rebalance", "[dp]", 0, 1, dpif_netdev_pmd_rebalance, NULL); + + // load the bpf program + if (ovsthread_once_start(&once)) { + // we don't need downcall device here + error = bpf_get(&bpf_datapath.bpf, true); + if (error) { + VLOG_ERR("%s: Load BPF datapath failed", __func__); + } + } + ovsthread_once_done(&once); + return 0; } @@ -1504,7 +1528,26 @@ dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd) ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex); ovs_mutex_unlock(&pmd->cond_mutex); } - +/* +static bool output_to_local_stack(struct netdev *netdev) +{ + return !strcmp(netdev_get_type(netdev), "tap"); +} +*/ +static bool netdev_support_xdp(const char *devname) +{ + /* + struct netdev_linux *netdev_linux = netdev_linux_cast(netdev_linux); + if (netdev_linux->ifindex == 0) + return false; +*/ + if (!strstr(devname, "afxdp")) { + return false; + } else { + return true; + } +} +static int afxdp_idx; static int port_create(const char *devname, const char *type, odp_port_t port_no, struct dp_netdev_port **portp) @@ -1519,7 +1562,7 @@ port_create(const char *devname, const char *type, /* Open and validate network device. */ error = netdev_open(devname, type, &netdev); - VLOG_INFO("%s %s error %d", __func__, devname, error); + VLOG_INFO("%s %s type = %s error %d", __func__, devname, type, error); if (error) { return error; } @@ -1538,6 +1581,23 @@ port_create(const char *devname, const char *type, goto out; } + if (!strcmp(type, "afxdp")) { + // or a separate set_af_xdp? 
+ // FIXME: + VLOG_INFO("using afxdp port idx %d", afxdp_idx); + error = netdev_set_xdp(netdev, &bpf_datapath.bpf.afxdp[afxdp_idx]); + if (error) { + VLOG_WARN("%s XDP set failed", __func__); + goto out; + } + error = netdev_set_xskmap(netdev, bpf_datapath.bpf.xsks_map[afxdp_idx].fd); + if (error) { + VLOG_ERR("%s XSK map set error\n", __func__); + goto out; + } + afxdp_idx++; + } + port = xzalloc(sizeof *port); port->port_no = port_no; port->netdev = netdev; @@ -5008,8 +5068,11 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_netdev_flow *flow; if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) { + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); + dp_packet_delete(packet); n_dropped++; + VLOG_ERR_RL(&rl, "%s dropped packet size %d\n", __func__, dp_packet_size(packet)); continue; } @@ -5254,6 +5317,13 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd, n_batches = 0; emc_processing(pmd, packets, keys, batches, &n_batches, md_is_valid, port_no); +/* + if (dp_packet_batch_is_empty(packets)) { + VLOG_WARN("%s: batch is empty ", __func__); + } else { + VLOG_WARN("%s: batch is %lu ", __func__, packets->count); + } +*/ if (!dp_packet_batch_is_empty(packets)) { /* Get ingress port from first packet's metadata. */ in_port = packets->packets[0]->md.in_port.odp_port; diff --git a/lib/if_xdp.h b/lib/if_xdp.h new file mode 100644 index 000000000000..2a8c5780166f --- /dev/null +++ b/lib/if_xdp.h @@ -0,0 +1,79 @@ +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ +/* + * if_xdp: XDP socket user-space interface + * Copyright(c) 2018 Intel Corporation. + * + * Author(s): Björn Töpel <[email protected]> + * Magnus Karlsson <[email protected]> + */ + +#ifndef _LINUX_IF_XDP_H +#define _LINUX_IF_XDP_H + +#include <linux/types.h> +#include <stdbool.h> + +/* Options for the sxdp_flags field */ +#define XDP_SHARED_UMEM (1 << 0) +#define XDP_COPY (1 << 1) /* Force copy-mode */ +#define XDP_ZEROCOPY (1 << 2) /* Force zero-copy mode */ + +struct sockaddr_xdp { + __u16 sxdp_family; + __u16 sxdp_flags; + __u32 sxdp_ifindex; + __u32 sxdp_queue_id; + __u32 sxdp_shared_umem_fd; +}; + +struct xdp_ring_offset { + __u64 producer; + __u64 consumer; + __u64 desc; +}; + +struct xdp_mmap_offsets { + struct xdp_ring_offset rx; + struct xdp_ring_offset tx; + struct xdp_ring_offset fr; /* Fill */ + struct xdp_ring_offset cr; /* Completion */ +}; + +/* XDP socket options */ +#define XDP_MMAP_OFFSETS 1 +#define XDP_RX_RING 2 +#define XDP_TX_RING 3 +#define XDP_UMEM_REG 4 +#define XDP_UMEM_FILL_RING 5 +#define XDP_UMEM_COMPLETION_RING 6 +#define XDP_STATISTICS 7 + +struct xdp_umem_reg { + __u64 addr; /* Start of packet data area */ + __u64 len; /* Length of packet data area */ + __u32 chunk_size; + __u32 headroom; +}; + +struct xdp_statistics { + __u64 rx_dropped; /* Dropped for reasons other than invalid desc */ + __u64 rx_invalid_descs; /* Dropped due to invalid descriptor */ + __u64 tx_invalid_descs; /* Dropped due to invalid descriptor */ +}; + +/* Pgoff for mmaping the rings */ +#define XDP_PGOFF_RX_RING 0 +#define XDP_PGOFF_TX_RING 0x80000000 +#define XDP_UMEM_PGOFF_FILL_RING 0x100000000ULL +#define XDP_UMEM_PGOFF_COMPLETION_RING 0x180000000ULL + +/* Rx/Tx descriptor */ +struct xdp_desc { + __u64 addr; + __u32 len; + __u32 options; +}; + +/* UMEM descriptor is __u64 */ + +#endif /* _LINUX_IF_XDP_H */ diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c index 44c9458a9a22..c7a065ed7ba8 100644 --- a/lib/netdev-dummy.c +++ b/lib/netdev-dummy.c @@ -1429,6 +1429,7 @@ 
netdev_dummy_update_flags(struct netdev *netdev_, NULL, /* set_policing */ \ NULL, /* set_filter */ \ NULL, /* set_xdp */ \ + NULL, /* set_xskmap */ \ NULL, /* get_qos_types */ \ NULL, /* get_qos_capabilities */ \ NULL, /* get_qos */ \ diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c index 121dd3bc738e..6546ff88aee6 100644 --- a/lib/netdev-linux.c +++ b/lib/netdev-linux.c @@ -88,6 +88,519 @@ COVERAGE_DEFINE(netdev_set_hwaddr); COVERAGE_DEFINE(netdev_get_ethtool); COVERAGE_DEFINE(netdev_set_ethtool); +#ifdef AFXDP_NETDEV +// ========================================================= +#ifndef SOL_XDP +#define SOL_XDP 283 +#endif + +#ifndef AF_XDP +#define AF_XDP 44 +#endif + +#ifndef PF_XDP +#define PF_XDP AF_XDP +#endif + +#define NUM_FRAMES 128 +#define FRAME_HEADROOM 0 +#define FRAME_SIZE 2048 +#define NUM_DESCS 32 + +#define FQ_NUM_DESCS 32 +#define CQ_NUM_DESCS 32 + +#define DEBUG_HEXDUMP 0 + +typedef __u32 u32; +typedef uint64_t u64; + +#include "lib/xdpsock.h" +static u32 opt_xdp_flags; // now alwyas set to SKB_MODE at bpf_set_link_xdp_fd +static u32 opt_xdp_bind_flags; + +struct xdp_uqueue { + u32 cached_prod; + u32 cached_cons; + u32 mask; + u32 size; + u32 *producer; + u32 *consumer; + struct xdp_desc *ring; + void *map; +}; + +struct xdpsock { + struct xdp_uqueue rx; + struct xdp_uqueue tx; + int sfd; + struct xdp_umem *umem; + u32 outstanding_tx; + unsigned long rx_npkts; + unsigned long tx_npkts; + unsigned long prev_rx_npkts; + unsigned long prev_tx_npkts; +}; + +#define MAX_SOCKS 4 + +#define barrier() __asm__ __volatile__("": : :"memory") +#define u_smp_rmb() barrier() +#define u_smp_wmb() barrier() +#define likely(x) __builtin_expect(!!(x), 1) +#define unlikely(x) __builtin_expect(!!(x), 0) + +static const char pkt_data[] = + "\x3c\xfd\xfe\x9e\x7f\x71\xec\xb1\xd7\x98\x3a\xc0\x08\x00\x45\x00" + "\x00\x2e\x00\x00\x00\x00\x40\x11\x88\x97\x05\x08\x07\x08\xc8\x14" + "\x1e\x04\x10\x92\x10\x92\x00\x1a\x6d\xa3\x34\x33\x1f\x69\x40\x6b" + "\x54\x59\xb6\x14\x2d\x11\x44\xbf\xaf\xd9\xbe\xaa"; + +static inline u32 xq_nb_avail(struct xdp_uqueue *q, u32 ndescs) +{ + u32 entries = q->cached_prod - q->cached_cons; + + if (entries == 0) { + q->cached_prod = *q->producer; + entries = q->cached_prod - q->cached_cons; + } + + return (entries > ndescs) ? 
ndescs : entries; +} + +static inline u32 umem_nb_free(struct xdp_umem_uqueue *q, u32 nb) +{ + u32 free_entries = q->cached_cons - q->cached_prod; + VLOG_INFO("0: %s cons %d prod %d\n", __func__, q->cached_cons, q->cached_prod); + + if (free_entries >= nb) + return free_entries; + + /* Refresh the local tail pointer */ + q->cached_cons = (*q->consumer + q->size) & q->mask; + + VLOG_INFO("%s cons %d prod %d\n", __func__, q->cached_cons, q->cached_prod); + VLOG_INFO("consumer %d, size %d\n", *q->consumer, q->size); + + return q->cached_cons - q->cached_prod; +} + +static inline int umem_fill_to_kernel_ex(struct xdp_umem_uqueue *fq, + struct xdp_desc *d, + size_t nb) +{ + u32 i; + + VLOG_INFO("%s nb = %d", __func__, nb); + if (umem_nb_free(fq, nb) < nb) { + VLOG_ERR("%s error\n", __func__); + return -ENOSPC; + } + + for (i = 0; i < nb; i++) { + u32 idx = fq->cached_prod++ & fq->mask; + + fq->ring[idx] = d[i].addr; + } + + u_smp_wmb(); + + *fq->producer = fq->cached_prod; + + VLOG_INFO("%s producer at %d\n", __func__, *fq->producer); + return 0; +} + +static inline int umem_fill_to_kernel(struct xdp_umem_uqueue *fq, uint64_t *d, + size_t nb) +{ + u32 i; + + if (umem_nb_free(fq, nb) < nb) { + VLOG_ERR("%s error\n", __func__); + return -ENOSPC; + } + + for (i = 0; i < nb; i++) { + u32 idx = fq->cached_prod++ & fq->mask; + + fq->ring[idx] = d[i]; + } + + u_smp_wmb(); + + *fq->producer = fq->cached_prod; + + VLOG_INFO("%s producer at %d\n", __func__, *fq->producer); + return 0; +} + +static inline u32 umem_nb_avail(struct xdp_umem_uqueue *q, u32 nb) +{ + u32 entries = q->cached_prod - q->cached_cons; + + if (entries == 0) { + q->cached_prod = *q->producer; + entries = q->cached_prod - q->cached_cons; + } + + return (entries > nb) ? nb : entries; +} + +static inline size_t umem_complete_from_kernel(struct xdp_umem_uqueue *cq, + uint64_t *d, size_t nb) +{ + u32 idx, i, entries = umem_nb_avail(cq, nb); + + u_smp_rmb(); + + for (i = 0; i < entries; i++) { + idx = cq->cached_cons++ & cq->mask; + d[i] = cq->ring[idx]; + } + + if (entries > 0) { + u_smp_wmb(); + + *cq->consumer = cq->cached_cons; + } + + return entries; +} + +static struct xdp_umem *xdp_umem_configure(int sfd) +{ + int fq_size = FQ_NUM_DESCS, cq_size = CQ_NUM_DESCS; + struct xdp_mmap_offsets off; + struct xdp_umem_reg mr; + struct xdp_umem *umem; + socklen_t optlen; + void *bufs; + + umem = calloc(1, sizeof(*umem)); + ovs_assert(umem); + + VLOG_DBG("enter: %s \n", __func__); + ovs_assert(posix_memalign(&bufs, getpagesize(), /* PAGE_SIZE aligned */ + NUM_FRAMES * FRAME_SIZE) == 0); + + VLOG_INFO("%s shared umem from %p to %p", __func__, + bufs, (char*)bufs + NUM_FRAMES * FRAME_SIZE); + + mr.addr = (__u64)bufs; + mr.len = NUM_FRAMES * FRAME_SIZE; + mr.chunk_size = FRAME_SIZE; + mr.headroom = FRAME_HEADROOM; + + ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_REG, &mr, sizeof(mr)) == 0); + ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_FILL_RING, &fq_size, + sizeof(int)) == 0); + ovs_assert(setsockopt(sfd, SOL_XDP, XDP_UMEM_COMPLETION_RING, &cq_size, + sizeof(int)) == 0); + + optlen = sizeof(off); + ovs_assert(getsockopt(sfd, SOL_XDP, XDP_MMAP_OFFSETS, &off, + &optlen) == 0); + + umem->fq.map = mmap(0, off.fr.desc + + FQ_NUM_DESCS * sizeof(u64), + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, sfd, + XDP_UMEM_PGOFF_FILL_RING); + ovs_assert(umem->fq.map != MAP_FAILED); + + umem->fq.mask = FQ_NUM_DESCS - 1; + umem->fq.size = FQ_NUM_DESCS; + umem->fq.producer = (void *)((char *)umem->fq.map + off.fr.producer); + umem->fq.consumer = (void 
*)((char *)umem->fq.map + off.fr.consumer); + umem->fq.ring = (void *)((char *)umem->fq.map + off.fr.desc); + umem->fq.cached_cons = FQ_NUM_DESCS; + + umem->cq.map = mmap(0, off.cr.desc + + CQ_NUM_DESCS * sizeof(u64), + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, sfd, + XDP_UMEM_PGOFF_COMPLETION_RING); + ovs_assert(umem->cq.map != MAP_FAILED); + + umem->cq.mask = CQ_NUM_DESCS - 1; + umem->cq.size = CQ_NUM_DESCS; + umem->cq.producer = umem->cq.map + off.cr.producer; + umem->cq.consumer = umem->cq.map + off.cr.consumer; + umem->cq.ring = umem->cq.map + off.cr.desc; + + umem->frames = bufs; + umem->fd = sfd; + +#if 0 + if (opt_bench == BENCH_TXONLY) { + int i; + + for (i = 0; i < NUM_FRAMES; i++) + (void)gen_eth_frame(&umem->frames[i][0]); + } +#endif + return umem; +} + +static struct xdpsock *xsk_configure(struct xdp_umem *umem, + int ifindex, int queue) +{ + struct sockaddr_xdp sxdp = {}; + struct xdp_mmap_offsets off; + int sfd, ndescs = NUM_DESCS; + struct xdpsock *xsk; + bool shared = false; + socklen_t optlen; + u64 i; + + opt_xdp_flags |= XDP_FLAGS_SKB_MODE; + opt_xdp_bind_flags |= XDP_COPY; + + sfd = socket(PF_XDP, SOCK_RAW, 0); + ovs_assert(sfd >= 0); + + xsk = calloc(1, sizeof(*xsk)); + ovs_assert(xsk); + + xsk->sfd = sfd; + xsk->outstanding_tx = 0; + + VLOG_DBG("enter: %s xsk fd %d", __func__, sfd); + if (!umem) { + shared = false; + xsk->umem = xdp_umem_configure(sfd); + } else { + xsk->umem = umem; + ovs_assert(0); + } + + ovs_assert(setsockopt(sfd, SOL_XDP, XDP_RX_RING, + &ndescs, sizeof(int)) == 0); + ovs_assert(setsockopt(sfd, SOL_XDP, XDP_TX_RING, + &ndescs, sizeof(int)) == 0); + optlen = sizeof(off); + ovs_assert(getsockopt(sfd, SOL_XDP, XDP_MMAP_OFFSETS, &off, + &optlen) == 0); + + /* Rx */ + xsk->rx.map = mmap(NULL, + off.rx.desc + + NUM_DESCS * sizeof(struct xdp_desc), + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, sfd, + XDP_PGOFF_RX_RING); + ovs_assert(xsk->rx.map != MAP_FAILED); + + if (!shared) { + for (i = 0; i < NUM_DESCS * FRAME_SIZE; i += FRAME_SIZE) + ovs_assert(umem_fill_to_kernel(&xsk->umem->fq, &i, 1) + == 0); + } + + // FIXME: we also configure tx here + /* Tx */ + xsk->tx.map = mmap(NULL, + off.tx.desc + + NUM_DESCS * sizeof(struct xdp_desc), + PROT_READ | PROT_WRITE, + MAP_SHARED | MAP_POPULATE, sfd, + XDP_PGOFF_TX_RING); + ovs_assert(xsk->tx.map != MAP_FAILED); + + xsk->rx.mask = NUM_DESCS - 1; + xsk->rx.size = NUM_DESCS; + xsk->rx.producer = xsk->rx.map + off.rx.producer; + xsk->rx.consumer = xsk->rx.map + off.rx.consumer; + xsk->rx.ring = xsk->rx.map + off.rx.desc; + + xsk->tx.mask = NUM_DESCS - 1; + xsk->tx.size = NUM_DESCS; + xsk->tx.producer = xsk->tx.map + off.tx.producer; + xsk->tx.consumer = xsk->tx.map + off.tx.consumer; + xsk->tx.ring = xsk->tx.map + off.tx.desc; + xsk->tx.cached_cons = NUM_DESCS; + + /* XSK socket */ + sxdp.sxdp_family = PF_XDP; + sxdp.sxdp_ifindex = ifindex; + sxdp.sxdp_queue_id = queue; + + if (shared) { + sxdp.sxdp_flags = XDP_SHARED_UMEM; + sxdp.sxdp_shared_umem_fd = umem->fd; + } else { + sxdp.sxdp_flags = opt_xdp_bind_flags; + } + + ovs_assert(bind(sfd, (struct sockaddr *)&sxdp, sizeof(sxdp)) == 0); + + return xsk; +} + +static inline int xq_deq(struct xdp_uqueue *uq, + struct xdp_desc *descs, + int ndescs) +{ + struct xdp_desc *r = uq->ring; + unsigned int idx; + int i, entries; + + entries = xq_nb_avail(uq, ndescs); + + u_smp_rmb(); + + for (i = 0; i < entries; i++) { + idx = uq->cached_cons++ & uq->mask; + descs[i] = r[idx]; + } + + if (entries > 0) { + u_smp_wmb(); + + *uq->consumer = uq->cached_cons; 
+ VLOG_INFO("%s entries %d consumer %d\n", __func__, entries, *uq->consumer); + } + return entries; +} + +static inline void *xq_get_data(struct xdpsock *xsk, u64 addr) +{ + return &xsk->umem->frames[addr]; +} + +static void vlog_hex_dump(const void *buf, size_t count) +{ + struct ds ds = DS_EMPTY_INITIALIZER; + ds_put_hex_dump(&ds, buf, count, 0, false); + VLOG_INFO("\n%s", ds_cstr(&ds)); + ds_destroy(&ds); +} + +static void kick_tx(int fd) +{ + int ret; + + VLOG_DBG("%s: send to fd %d", __func__, fd); + ret = sendto(fd, NULL, 0, MSG_DONTWAIT, NULL, 0); + if (ret >= 0 || errno == ENOBUFS || errno == EAGAIN) + return; + ovs_assert(0); +} + +static inline void complete_tx_l2fwd(struct xdpsock *xsk) +{ + u64 descs[BATCH_SIZE]; + unsigned int rcvd; + size_t ndescs; + + if (!xsk->outstanding_tx) + return; + + kick_tx(xsk->sfd); + ndescs = (xsk->outstanding_tx > BATCH_SIZE) ? BATCH_SIZE : + xsk->outstanding_tx; + + /* re-add completed Tx buffers */ + rcvd = umem_complete_from_kernel(&xsk->umem->cq, descs, ndescs); + + if (rcvd > 0) { + umem_fill_to_kernel(&xsk->umem->fq, descs, rcvd); + xsk->outstanding_tx -= rcvd; + xsk->tx_npkts += rcvd; + } +} + +static inline void complete_tx_only(struct xdpsock *xsk) +{ + u64 descs[BATCH_SIZE]; + unsigned int rcvd; + + if (!xsk->outstanding_tx) { + VLOG_DBG("no outstanding_tx"); + return; + } + + kick_tx(xsk->sfd); + + rcvd = umem_complete_from_kernel(&xsk->umem->cq, descs, BATCH_SIZE); + if (rcvd > 0) { + xsk->outstanding_tx -= rcvd; + xsk->tx_npkts += rcvd; + } +} + +static inline u32 xq_nb_free(struct xdp_uqueue *q, u32 ndescs) +{ + u32 free_entries = q->cached_cons - q->cached_prod; + + if (free_entries >= ndescs) + return free_entries; + + /* Refresh the local tail pointer */ + q->cached_cons = *q->consumer + q->size; + return q->cached_cons - q->cached_prod; +} + +static inline int xq_enq(struct xdp_uqueue *uq, + const struct xdp_desc *descs, + unsigned int ndescs) +{ + struct xdp_desc *r = uq->ring; + unsigned int i; + + if (xq_nb_free(uq, ndescs) < ndescs) + return -ENOSPC; + + for (i = 0; i < ndescs; i++) { + u32 idx = uq->cached_prod++ & uq->mask; + + r[idx].addr = descs[i].addr; + r[idx].len = descs[i].len; + } + + u_smp_wmb(); + + *uq->producer = uq->cached_prod; + return 0; +} + +static inline int xq_enq_tx_only(struct xdp_uqueue *uq, + unsigned int id, unsigned int ndescs) +{ + struct xdp_desc *r = uq->ring; + unsigned int i; + + if (xq_nb_free(uq, ndescs) < ndescs) + return -ENOSPC; + + for (i = 0; i < ndescs; i++) { + u32 idx = uq->cached_prod++ & uq->mask; + + r[idx].addr = (id + i) << FRAME_SHIFT; + r[idx].len = sizeof(pkt_data) - 1; + } + + u_smp_wmb(); + + *uq->producer = uq->cached_prod; + return 0; +} + +static inline void print_xsk_stat(struct xdpsock *xsk) { + struct xdp_statistics stat; + socklen_t optlen; + + optlen = sizeof(stat); + ovs_assert(getsockopt(xsk->sfd, SOL_XDP, XDP_STATISTICS, + &stat, &optlen) == 0); + + VLOG_INFO("rx dropped %llu, rx_invalid %llu, tx_invalid %llu", + stat.rx_dropped, stat.rx_invalid_descs, stat.tx_invalid_descs); + +} +// ========================================================= +#endif /* These were introduced in Linux 2.6.14, so they might be missing if we have * old headers. 
*/ @@ -522,6 +1035,8 @@ struct netdev_linux { int tap_fd; bool present; /* If the device is present in the namespace */ uint64_t tx_dropped; /* tap device can drop if the iface is down */ + struct xdpsock *xsk[16]; /* af_xdp socket: each queue has one xdp sock */ + int xskmap_fd; /* map netdev's queue id to xsk fd */ }; struct netdev_rxq_linux { @@ -571,6 +1086,12 @@ is_netdev_linux_class(const struct netdev_class *netdev_class) } static bool +is_afxdp_netdev(const struct netdev *netdev) +{ + return netdev_get_class(netdev) == &netdev_afxdp_class; +} + +static bool is_tap_netdev(const struct netdev *netdev) { return netdev_get_class(netdev) == &netdev_tap_class; @@ -921,6 +1442,13 @@ netdev_linux_destruct(struct netdev *netdev_) atomic_count_dec(&miimon_cnt); } + if (is_afxdp_netdev(netdev_)) { + int ifindex; + + get_ifindex(netdev_, &ifindex); + bpf_set_link_xdp_fd(ifindex, -1, XDP_FLAGS_SKB_MODE); + } + ovs_mutex_destroy(&netdev->mutex); } @@ -950,6 +1478,44 @@ netdev_linux_rxq_construct(struct netdev_rxq *rxq_) rx->is_tap = is_tap_netdev(netdev_); if (rx->is_tap) { rx->fd = netdev->tap_fd; + } else if (is_afxdp_netdev(netdev_)) { + // setup AF_XDP socket here, see xsk_configure + struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY}; + int ifindex, num_socks = 0; + struct xdpsock *xsk; + int queue_id = 0; // FIXME + int key = 0; + int xsk_fd; + + if (setrlimit(RLIMIT_MEMLOCK, &r)) { + VLOG_ERR("ERROR: setrlimit(RLIMIT_MEMLOCK) \"%s\"\n", + ovs_strerror(errno)); + ovs_assert(0); + } + + VLOG_INFO("%s: %s: queue=%d configuring xdp sock", + __func__, netdev_->name, queue_id); + + /* Get ethernet device index. */ + error = get_ifindex(&netdev->up, &ifindex); + if (error) { + goto error; + } + + xsk = xsk_configure(NULL, ifindex, queue_id); + + netdev->xsk[num_socks++] = xsk; + rx->fd = xsk->sfd; //for upper layer to poll + xsk_fd = xsk->sfd; + + if (xsk_fd) { + error = bpf_map_update_elem(netdev->xskmap_fd, &key, &xsk_fd, 0); + if (error) { + VLOG_ERR("failed to set xsks_map: %s", ovs_strerror(error)); + return error; + } + } + } else { struct sockaddr_ll sll; int ifindex, val; @@ -1149,6 +1715,58 @@ netdev_linux_rxq_recv_tap(int fd, struct dp_packet *buffer) return 0; } +/* Receive packet from AF_XDP socket */ +static int +netdev_linux_rxq_xsk(struct xdpsock *xsk, + struct dp_packet_batch *batch) +{ + struct xdp_desc descs[NETDEV_MAX_BURST]; + unsigned int rcvd, i = 0; + int ret = 0; + + rcvd = xq_deq(&xsk->rx, descs, NETDEV_MAX_BURST); + if (rcvd == 0) { + return 0; + } + + VLOG_INFO("%s receive %d packets xsk fd %d", + __func__, rcvd, xsk->sfd); + + for (i = 0; i < rcvd; i++) { + struct dp_packet *packet; + void *base, *new_packet; + + packet = xmalloc(sizeof *packet); + + VLOG_INFO("%s packet len %d", __func__, descs[i].len); + base = xq_get_data(xsk, descs[i].addr); + + //vlog_hex_dump(base, 14); + new_packet = malloc(2048); + memcpy(new_packet, base, descs[i].len); + + //dp_packet_use(packet, base, descs[i].len); + dp_packet_use(packet, new_packet, descs[i].len); + + packet->source = DPBUF_MALLOC; + //dp_packet_set_data(packet, base); // no offset now? + dp_packet_set_data(packet, new_packet); // no offset now? + dp_packet_set_size(packet, descs[i].len); + + /* add packet into batch, batch->count inc */ + dp_packet_batch_add(batch, packet); + } + + xsk->rx_npkts += rcvd; + umem_fill_to_kernel_ex(&xsk->umem->fq, descs, rcvd); + + //batch->count = rcvd; // batch_add inc the counter + //don't put it back to FILL queue yet. 
+ + print_xsk_stat(xsk); + return ret; +} + static int netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch) { @@ -1157,6 +1775,8 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch) struct dp_packet *buffer; ssize_t retval; int mtu; + struct netdev_linux *netdev_ = netdev_linux_cast(netdev); + if (netdev_linux_get_mtu__(netdev_linux_cast(netdev), &mtu)) { mtu = ETH_PAYLOAD_MAX; @@ -1166,15 +1786,20 @@ netdev_linux_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch) buffer = dp_packet_new_with_headroom(VLAN_ETH_HEADER_LEN + mtu, DP_NETDEV_HEADROOM); retval = (rx->is_tap - ? netdev_linux_rxq_recv_tap(rx->fd, buffer) - : netdev_linux_rxq_recv_sock(rx->fd, buffer)); - + ? netdev_linux_rxq_recv_tap(rx->fd, buffer) : + (is_afxdp_netdev(netdev) ? netdev_linux_rxq_xsk(netdev_->xsk[0], batch) : + netdev_linux_rxq_recv_sock(rx->fd, buffer))); if (retval) { if (retval != EAGAIN && retval != EMSGSIZE) { VLOG_WARN_RL(&rl, "error receiving Ethernet packet on %s: %s", netdev_rxq_get_name(rxq_), ovs_strerror(errno)); } dp_packet_delete(buffer); + } else if (is_afxdp_netdev(netdev)) { + dp_packet_batch_init_packet_fields(batch); + + if (batch->count != 0) + VLOG_INFO("%s AFXDP recv %lu packets", __func__, batch->count); } else { dp_packet_batch_init_packet(batch, buffer); } @@ -1208,6 +1833,66 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_) } static int +netdev_linux_afxdp_batch_send(struct xdpsock *xsk, /* send to xdp socket! */ + struct dp_packet_batch *batch) +{ + struct dp_packet *packet; + struct xdp_uqueue *uq; + struct xdp_desc *r; + int ndescs = batch->count; + u32 id = NUM_FRAMES / 2; + + VLOG_INFO("%s send %lu packet to fd %d", __func__, batch->count, xsk->sfd); + VLOG_INFO("%s outstanding tx %d", __func__, xsk->outstanding_tx); + + /* cleanup and refill */ + uq = &xsk->tx; + r = uq->ring; + + // see tx_only and xq_enq_tx_only + if (xq_nb_free(uq, ndescs) < ndescs) { + VLOG_ERR("no free desc"); + return -ENOSPC; + } + + DP_PACKET_BATCH_FOR_EACH (packet, batch) { + void *umem_buf; + + u32 idx = uq->cached_prod++ & uq->mask; + // FIXME: find available id + umem_buf = xsk->umem->frames + (id << FRAME_SHIFT); + + memcpy(umem_buf, dp_packet_data(packet), dp_packet_size(packet)); + //vlog_hex_dump(dp_packet_data(packet), 14); + r[idx].addr = (id << FRAME_SHIFT); + r[idx].len = dp_packet_size(packet); + id++; +#if 0 /* avoid copy */ + } else { + u32 idx = uq->cached_prod++ & uq->mask; + + VLOG_WARN("packet from umem %p", dp_packet_base(packet)); + vlog_hex_dump(dp_packet_base(packet), 14); + + r[idx].addr = (u64)(u64 *)dp_packet_base(packet); + r[idx].len = dp_packet_size(packet); + } +#endif + } + u_smp_wmb(); + + *uq->producer = uq->cached_prod; + + xsk->outstanding_tx += batch->count; + + complete_tx_only(xsk); + print_xsk_stat(xsk); + + return 0; +} + + +static int netdev_linux_sock_batch_send(int sock, int ifindex, struct dp_packet_batch *batch) { @@ -1312,21 +1997,32 @@ netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED, int error = 0; int sock = 0; - if (!is_tap_netdev(netdev_)) { + if (!is_tap_netdev(netdev_) && + !is_afxdp_netdev(netdev_)) { sock = af_packet_sock(); if (sock < 0) { error = -sock; + VLOG_WARN("%s af sock < 0", __func__); goto free_batch; } int ifindex = netdev_get_ifindex(netdev_); if (ifindex < 0) { + VLOG_WARN("%s ifindex < 0", __func__); error = -ifindex; goto free_batch; } error = netdev_linux_sock_batch_send(sock, ifindex, batch); + } else if (is_afxdp_netdev(netdev_)) { + struct xdpsock *xsk; + 
struct netdev_linux *netdev = netdev_linux_cast(netdev_); + + xsk = netdev->xsk[0]; // FIXME: always use queue 0 + VLOG_INFO_RL(&rl, "XXX %s sent to AFXDP dev xsk %d", __func__, xsk->sfd); + error = netdev_linux_afxdp_batch_send(xsk, batch); } else { + VLOG_INFO_RL(&rl, "%s sent to tap dev", __func__); error = netdev_linux_tap_batch_send(netdev_, batch); } if (error) { @@ -2426,12 +3122,22 @@ netdev_linux_set_xdp__(struct netdev *netdev_, const struct bpf_prog *prog, { struct netdev_linux *netdev = netdev_linux_cast(netdev_); const char *netdev_name = netdev_get_name(netdev_); - int ifindex = netdev->ifindex; + int ifindex; int error; - VLOG_DBG("Setting %s XDP filter %d on %s (ifindex %d)", prog->name, + error = get_ifindex(netdev_, &ifindex); + if (error) { + return ENODEV; + } + + + VLOG_INFO("Setting %s XDP filter %d on %s (ifindex %d)", prog->name, prog->fd, netdev_name, ifindex); + if (ifindex == 0) { + VLOG_WARN("skip device %s", netdev_name); + return 0; + } if (netdev->cache_valid & valid_bit) { error = *filter_error; if (error || (prog && prog->fd == *netdev_filter)) { @@ -2456,6 +3162,19 @@ out: } static int +netdev_linux_set_xskmap(struct netdev *netdev_, int xskmap_fd) +{ + struct netdev_linux *netdev = netdev_linux_cast(netdev_); + + ovs_assert(xskmap_fd != 0); + + VLOG_INFO("%s xsks_map fd %d", __func__, xskmap_fd); + netdev->xskmap_fd = xskmap_fd; + + return 0; +} + +static int netdev_linux_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog) { struct netdev_linux *netdev = netdev_linux_cast(netdev_); @@ -3167,6 +3886,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off, netdev_linux_set_policing, \ netdev_linux_set_filter, \ netdev_linux_set_xdp, \ + netdev_linux_set_xskmap, \ netdev_linux_get_qos_types, \ netdev_linux_get_qos_capabilities, \ netdev_linux_get_qos, \ @@ -3201,6 +3921,15 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off, FLOW_OFFLOAD_API \ } +const struct netdev_class netdev_afxdp_class = + NETDEV_LINUX_CLASS( + "afxdp", + netdev_linux_construct, + netdev_linux_get_stats, + netdev_linux_get_features, + netdev_linux_get_status, + LINUX_FLOW_OFFLOAD_API); + const struct netdev_class netdev_linux_class = NETDEV_LINUX_CLASS( "system", diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h index 3e53a5b76272..df92275d5aff 100644 --- a/lib/netdev-provider.h +++ b/lib/netdev-provider.h @@ -515,6 +515,7 @@ struct netdev_class { * * This function may be set to null if filters are not supported. */ int (*set_xdp)(struct netdev *netdev, const struct bpf_prog *); + int (*set_xskmap)(struct netdev *netdev, int xsks_map_fd); /* Adds to 'types' all of the forms of QoS supported by 'netdev', or leaves * it empty if 'netdev' does not support QoS. 
Any names added to 'types' @@ -884,6 +885,7 @@ extern const struct netdev_class netdev_bsd_class; extern const struct netdev_class netdev_windows_class; #else extern const struct netdev_class netdev_linux_class; +extern const struct netdev_class netdev_afxdp_class; #endif extern const struct netdev_class netdev_internal_class; extern const struct netdev_class netdev_tap_class; diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c index 4341c89894a3..a61ff4b6808c 100644 --- a/lib/netdev-vport.c +++ b/lib/netdev-vport.c @@ -1000,6 +1000,9 @@ netdev_vport_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog) ifindex = netdev_vport_get_ifindex(netdev_); error = bpf_set_link_xdp_fd(ifindex, prog->fd, XDP_FLAGS_SKB_MODE); + // FIXME / TODO + // update xsks_map_fd + ovs_mutex_unlock(&netdev->mutex); VLOG_INFO("%s %d", __func__, error); @@ -1057,6 +1060,7 @@ netdev_vport_set_xdp(struct netdev *netdev_, const struct bpf_prog *prog) NULL, /* set_policing */ \ netdev_vport_set_filter, /* set_filter */ \ netdev_vport_set_xdp, /* set_xdp */ \ + NULL, /* set_xskmap */ \ NULL, /* get_qos_types */ \ NULL, /* get_qos_capabilities */ \ NULL, /* get_qos */ \ diff --git a/lib/netdev.c b/lib/netdev.c index c44a1a683b92..826555dd92f6 100644 --- a/lib/netdev.c +++ b/lib/netdev.c @@ -142,6 +142,7 @@ netdev_initialize(void) #ifdef __linux__ netdev_register_provider(&netdev_linux_class); + netdev_register_provider(&netdev_afxdp_class); netdev_register_provider(&netdev_internal_class); netdev_register_provider(&netdev_tap_class); netdev_vport_tunnel_register(); @@ -1474,6 +1475,16 @@ netdev_set_xdp(struct netdev *netdev, struct bpf_prog *prog) : EOPNOTSUPP); } +/* set xsk map */ +int +netdev_set_xskmap(struct netdev *netdev, int xskmap) +{ + return (netdev->netdev_class->set_xskmap + ? netdev->netdev_class->set_xskmap(netdev, xskmap) + : EOPNOTSUPP); +} + + /* Adds to 'types' all of the forms of QoS supported by 'netdev', or leaves it * empty if 'netdev' does not support QoS. Any names added to 'types' should * be documented as valid for the "type" column in the "QoS" table in diff --git a/lib/netdev.h b/lib/netdev.h index 3388504d85c9..3a8d7118378e 100644 --- a/lib/netdev.h +++ b/lib/netdev.h @@ -320,6 +320,7 @@ int netdev_set_policing(struct netdev *, uint32_t kbits_rate, uint32_t kbits_burst); int netdev_set_filter(struct netdev *netdev, struct bpf_prog *prog); int netdev_set_xdp(struct netdev *netdev, struct bpf_prog *prog); +int netdev_set_xskmap(struct netdev *netdev, int xsks_map_fd); int netdev_get_qos_types(const struct netdev *, struct sset *types); int netdev_get_qos_capabilities(const struct netdev *, -- 2.7.4
