Hi Ilya,

Thanks for the feedback.
<snip>

> > +static struct dp_packet_afxdp *
> > +dp_packet_cast_afxdp(const struct dp_packet *d)
> > +{
> > +    ovs_assert(d->source == DPBUF_AFXDP);
> > +    return CONTAINER_OF(d, struct dp_packet_afxdp, packet);
> > +}
> > +
> > +static inline void
> > +prepare_fill_queue(struct xsk_socket_info *xsk_info)
> > +{
> > +    struct umem_elem *elems[BATCH_SIZE];
> > +    struct xsk_umem_info *umem;
> > +    unsigned int idx_fq;
> > +    int nb_free;
> > +    int i, ret;
> > +
> > +    umem = xsk_info->umem;
> > +
> > +    nb_free = PROD_NUM_DESCS / 2;
> > +    if (xsk_prod_nb_free(&umem->fq, nb_free) < nb_free) {
> > +        return;
> > +    }
> >
> Why you're using 'PROD_NUM_DESCS / 2' here?

I don't want to be too aggressive to refill the fq.

> IIUC, we're keeping fill queue half-loaded. Isn't it better to
> use BATCH_SIZE instead?

yes, that also works.
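
Concretely, I think the check would become something like this (untested
sketch; all of the names are taken from the hunk above, and the
pop/reserve/submit part stays exactly as it is):

    static inline void
    prepare_fill_queue(struct xsk_socket_info *xsk_info)
    {
        struct xsk_umem_info *umem = xsk_info->umem;

        /* Refill only when at least a full batch of fill queue slots is
         * free, instead of keeping the fill queue half-loaded. */
        if (xsk_prod_nb_free(&umem->fq, BATCH_SIZE) < BATCH_SIZE) {
            return;
        }

        /* ... pop BATCH_SIZE elems from umem->mpool and submit them to
         * umem->fq as in the hunk above ... */
    }
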
> > +
> > +    ret = umem_elem_pop_n(&umem->mpool, BATCH_SIZE, (void **)elems);
> > +    if (OVS_UNLIKELY(ret)) {
> > +        return;
> > +    }
> > +
> > +    if (!xsk_ring_prod__reserve(&umem->fq, BATCH_SIZE, &idx_fq)) {
> > +        umem_elem_push_n(&umem->mpool, BATCH_SIZE, (void **)elems);
> > +        COVERAGE_INC(afxdp_fq_full);
> > +        return;
> > +    }
> > +
> > +    for (i = 0; i < BATCH_SIZE; i++) {
> > +        uint64_t index;
> > +        struct umem_elem *elem;
> > +
> > +        elem = elems[i];
> > +        index = (uint64_t)((char *)elem - (char *)umem->buffer);
> > +        ovs_assert((index & FRAME_SHIFT_MASK) == 0);
> > +        *xsk_ring_prod__fill_addr(&umem->fq, idx_fq) = index;
> > +
> > +        idx_fq++;
> > +    }
> > +    xsk_ring_prod__submit(&umem->fq, BATCH_SIZE);
> > +}
> > +
> > +int
> > +netdev_afxdp_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet_batch *batch,
> > +                      int *qfill)
> > +{
> > +    struct netdev_rxq_linux *rx = netdev_rxq_linux_cast(rxq_);
> > +    struct netdev *netdev = rx->up.netdev;
> > +    struct netdev_linux *dev = netdev_linux_cast(netdev);
> > +    struct xsk_socket_info *xsk_info;
> > +    struct xsk_umem_info *umem;
> > +    uint32_t idx_rx = 0;
> > +    int qid = rxq_->queue_id;
> > +    unsigned int rcvd, i;
> > +
> > +    xsk_info = dev->xsks[qid];
> > +    if (!xsk_info || !xsk_info->xsk) {
> > +        return 0;
>
> Need to return EAGAIN.

OK

> > +    }
> > +
> > +    prepare_fill_queue(xsk_info);
> > +
> > +    umem = xsk_info->umem;
> > +    rx->fd = xsk_socket__fd(xsk_info->xsk);
> > +
> > +    rcvd = xsk_ring_cons__peek(&xsk_info->rx, BATCH_SIZE, &idx_rx);
> > +    if (!rcvd) {
> > +        return 0;
>
> Need to return EAGAIN.

OK

> > +    }
> > +
> > +    /* Setup a dp_packet batch from descriptors in RX queue */
> > +    for (i = 0; i < rcvd; i++) {
> > +        uint64_t addr = xsk_ring_cons__rx_desc(&xsk_info->rx, idx_rx)->addr;
> > +        uint32_t len = xsk_ring_cons__rx_desc(&xsk_info->rx, idx_rx)->len;
> > +        char *pkt = xsk_umem__get_data(umem->buffer, addr);
> > +        uint64_t index;
> > +
> > +        struct dp_packet_afxdp *xpacket;
> > +        struct dp_packet *packet;
> > +
> > +        index = addr >> FRAME_SHIFT;
> > +        xpacket = UMEM2XPKT(umem->xpool.array, index);
> > +        packet = &xpacket->packet;
> > +
> > +        /* Initialize the struct dp_packet */
> > +        dp_packet_use_afxdp(packet, pkt, FRAME_SIZE - FRAME_HEADROOM);
> > +        dp_packet_set_size(packet, len);
> > +
> > +        /* Add packet into batch, increase batch->count */
> > +        dp_packet_batch_add(batch, packet);
> > +
> > +        idx_rx++;
> > +    }
> > +    /* Release the RX queue */
> > +    xsk_ring_cons__release(&xsk_info->rx, rcvd);
> > +
> > +    if (qfill) {
> > +        /* TODO: return the number of remaining packets in the queue. */
> > +        *qfill = 0;
> > +    }
> > +
> > +#ifdef AFXDP_DEBUG
> > +    log_xsk_stat(xsk_info);
> > +#endif
> > +    return 0;
> > +}
> > +
> > +static inline int
> > +kick_tx(struct xsk_socket_info *xsk_info)
> > +{
> > +    int ret;
> > +
> > +    if (!xsk_info->outstanding_tx) {
> > +        return 0;
> > +    }
> > +
> > +    /* This causes system call into kernel's xsk_sendmsg, and
> > +     * xsk_generic_xmit (skb mode) or xsk_async_xmit (driver mode).
> > +     */
> > +    ret = sendto(xsk_socket__fd(xsk_info->xsk), NULL, 0, MSG_DONTWAIT,
> > +                 NULL, 0);
> > +    if (OVS_UNLIKELY(ret < 0)) {
> > +        if (errno == ENXIO || errno == ENOBUFS || errno == EOPNOTSUPP) {
> > +            return errno;
> > +        }
> > +    }
> > +    /* no error, or EBUSY or EAGAIN */
> > +    return 0;
> > +}
> > +
> > +void
> > +free_afxdp_buf(struct dp_packet *p)
> > +{
> > +    struct dp_packet_afxdp *xpacket;
> > +    uintptr_t addr;
> > +
> > +    xpacket = dp_packet_cast_afxdp(p);
> > +    if (xpacket->mpool) {
> > +        void *base = dp_packet_base(p);
> > +
> > +        addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
> > +        umem_elem_push(xpacket->mpool, (void *)addr);
> > +    }
> > +}
> > +
> > +static void
> > +free_afxdp_buf_batch(struct dp_packet_batch *batch)
> > +{
> > +    struct dp_packet_afxdp *xpacket = NULL;
> > +    struct dp_packet *packet;
> > +    void *elems[BATCH_SIZE];
> > +    uintptr_t addr;
> > +
> > +    DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
> > +        xpacket = dp_packet_cast_afxdp(packet);
> > +        if (xpacket->mpool) {
> >
> Above checking seems useless. Also, if any packet will be
> skipped, we'll push trash pointer to mpool.

Thanks, will skip it.

> If you're worrying about the value, you may just assert:
>
>     ovs_assert(xpacket->mpool);

> > +            void *base = dp_packet_base(packet);
> > +
> > +            addr = (uintptr_t)base & (~FRAME_SHIFT_MASK);
> > +            elems[i] = (void *)addr;
> > +        }
> > +    }
> > +    umem_elem_push_n(xpacket->mpool, batch->count, elems);
> > +    dp_packet_batch_init(batch);
> > +}
> > +
> > +static inline bool
> > +check_free_batch(struct dp_packet_batch *batch)
> > +{
> > +    struct umem_pool *first_mpool = NULL;
> > +    struct dp_packet_afxdp *xpacket;
> > +    struct dp_packet *packet;
> > +
> > +    DP_PACKET_BATCH_FOR_EACH (i, packet, batch) {
> > +        if (packet->source != DPBUF_AFXDP) {
> > +            return false;
> > +        }
> > +        xpacket = dp_packet_cast_afxdp(packet);
> > +        if (i == 0) {
> > +            first_mpool = xpacket->mpool;
> > +            continue;
> > +        }
> > +        if (xpacket->mpool != first_mpool) {
> > +            return false;
> > +        }
> > +    }
> > +    /* All packets are DPBUF_AFXDP and from the same mpool */
> > +    return true;
> > +}
> > +
> > +static inline void
> > +afxdp_complete_tx(struct xsk_socket_info *xsk_info)
> > +{
> > +    struct umem_elem *elems_push[BATCH_SIZE];
> > +    struct xsk_umem_info *umem;
> > +    uint32_t idx_cq = 0;
> > +    int tx_to_free = 0;
> > +    int tx_done, j;
> > +
> > +    umem = xsk_info->umem;
> > +    tx_done = xsk_ring_cons__peek(&umem->cq, BATCH_SIZE, &idx_cq);
> > +
> > +    /* Recycle back to umem pool */
> > +    for (j = 0; j < tx_done; j++) {
> > +        struct umem_elem *elem;
> > +        uint64_t *addr;
> > +
> > +        addr = (uint64_t *)xsk_ring_cons__comp_addr(&umem->cq, idx_cq++);
> > +        if (*addr == 0) {
>
> 'addr' is an offset from 'umem->buffer'. Zero seems a valid value.
> Maybe it's better to use UINT64_MAX instead?

Thanks a lot! I shouldn't use zero, will switch to use UINT64_MAX.
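
In other words, roughly this (untested sketch; the variables are the ones
from this loop, with UINT64_MAX simply taking over the role of the old
zero marker):

    addr = (uint64_t *)xsk_ring_cons__comp_addr(&umem->cq, idx_cq++);
    if (*addr == UINT64_MAX) {
        /* The elem has already been pushed back to the umem pool. */
        continue;
    }
    elem = ALIGNED_CAST(struct umem_elem *,
                        (char *)umem->buffer + *addr);
    elems_push[tx_to_free++] = elem;
    *addr = UINT64_MAX;    /* Mark as pushed. */
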
> > +            /* The elem has been pushed already */
> > +            continue;
> > +        }
> > +        elem = ALIGNED_CAST(struct umem_elem *,
> > +                            (char *)umem->buffer + *addr);
> > +        elems_push[tx_to_free] = elem;
> > +        *addr = 0; /* Mark as pushed */
> > +        tx_to_free++;
> > +    }
> > +
> > +    umem_elem_push_n(&umem->mpool, tx_to_free, (void **)elems_push);
> > +
> > +    if (tx_done > 0) {
> > +        xsk_ring_cons__release(&umem->cq, tx_done);
> > +        xsk_info->outstanding_tx -= tx_done;
>
> We, probably, should subtract the 'tx_to_free' instead and do this
> outside of the 'if'.

OK

--William
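
P.S. With the marker and accounting changes above, the tail of
afxdp_complete_tx() would then look roughly like this (untested sketch,
same variables as in the hunk):

    umem_elem_push_n(&umem->mpool, tx_to_free, (void **)elems_push);

    if (tx_done > 0) {
        xsk_ring_cons__release(&umem->cq, tx_done);
    }
    /* Subtract only the elems that were actually recycled, and do it
     * outside the 'if'. */
    xsk_info->outstanding_tx -= tx_to_free;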