From: Björn Töpel <bjorn.to...@intel.com>

Add packet-array-like functionality that acts directly on the
user/kernel shared ring. We'll use this in the zerocopy Rx scenario.
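
A rough sketch of the intended Rx usage follows. Only the xskq_*
calls are introduced by this patch; q, budget, hw_give_buffer() and
the per-frame id/len/offset bookkeeping are made up for illustration.

	/* Pull user-posted buffers off the shared ring. */
	struct xskq_iter it = xskq_deq_iter(q, budget);

	while (!xskq_iter_end(&it)) {
		u32 id = xskq_deq_iter_get_id(q, &it);

		hw_give_buffer(id);	/* hand the buffer to the NIC */
		xskq_deq_iter_next(q, &it);
	}
	xskq_deq_iter_done(q, &it);	/* makes the slots reusable */

	/* Later, when a frame has landed in a buffer, complete it
	 * back to user space. The descriptor flags are written at
	 * flush time, after a single write barrier.
	 */
	xskq_enq_lazy(q, id, len, offset);	/* per received frame */
	xskq_enq_flush(q);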

TODO Better naming...

Signed-off-by: Björn Töpel <bjorn.to...@intel.com>
---
 net/xdp/xsk_ring.c |   3 +-
 net/xdp/xsk_ring.h | 136 ++++++++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 126 insertions(+), 13 deletions(-)

diff --git a/net/xdp/xsk_ring.c b/net/xdp/xsk_ring.c
index 11b590506ddf..f154ddfabcfc 100644
--- a/net/xdp/xsk_ring.c
+++ b/net/xdp/xsk_ring.c
@@ -41,7 +41,8 @@ struct xsk_queue *xskq_create(u32 nentries)
        q->queue_ops.enqueue = xskq_enqueue_from_array;
        q->queue_ops.enqueue_completed = xskq_enqueue_completed_from_array;
        q->queue_ops.dequeue = xskq_dequeue_to_array;
-       q->used_idx = 0;
+       q->used_idx_head = 0;
+       q->used_idx_tail = 0;
        q->last_avail_idx = 0;
        q->ring_mask = nentries - 1;
        q->num_free = 0;
diff --git a/net/xdp/xsk_ring.h b/net/xdp/xsk_ring.h
index c9d61195ab2d..43c841d55093 100644
--- a/net/xdp/xsk_ring.h
+++ b/net/xdp/xsk_ring.h
@@ -27,7 +27,8 @@ struct xsk_queue {
        struct xsk_user_queue queue_ops;
        struct xdp_desc *ring;
 
-       u32 used_idx;
+       u32 used_idx_head;
+       u32 used_idx_tail;
        u32 last_avail_idx;
        u32 ring_mask;
        u32 num_free;
@@ -51,8 +52,7 @@ static inline unsigned int xsk_get_data_headroom(struct xsk_umem *umem)
  *
  * Returns true if the entry is valid, otherwise false
  **/
-static inline bool xskq_is_valid_entry(struct xsk_queue *q,
-                                      struct xdp_desc *d)
+static inline bool xskq_is_valid_entry(struct xsk_queue *q, struct xdp_desc *d)
 {
        unsigned int buff_len;
 
@@ -115,7 +115,7 @@ static inline int xskq_nb_avail(struct xsk_queue *q, int dcnt)
 static inline int xskq_enqueue(struct xsk_queue *q,
                               const struct xdp_desc *d, int dcnt)
 {
-       unsigned int used_idx = q->used_idx;
+       unsigned int used_idx = q->used_idx_tail;
        int i;
 
        if (q->num_free < dcnt)
@@ -136,11 +136,12 @@ static inline int xskq_enqueue(struct xsk_queue *q,
        smp_wmb();
 
        for (i = dcnt - 1; i >= 0; i--) {
-               unsigned int idx = (q->used_idx + i) & q->ring_mask;
+               unsigned int idx = (q->used_idx_tail + i) & q->ring_mask;
 
                q->ring[idx].flags = d[i].flags & ~XDP_DESC_KERNEL;
        }
-       q->used_idx += dcnt;
+       q->used_idx_head += dcnt;
+       q->used_idx_tail += dcnt;
 
        return 0;
 }
@@ -157,7 +158,7 @@ static inline int xskq_enqueue_from_array(struct xsk_packet_array *a,
                                          u32 dcnt)
 {
        struct xsk_queue *q = (struct xsk_queue *)a->q_ops;
-       unsigned int used_idx = q->used_idx;
+       unsigned int used_idx = q->used_idx_tail;
        struct xdp_desc *d = a->items;
        int i;
 
@@ -180,12 +181,13 @@ static inline int xskq_enqueue_from_array(struct xsk_packet_array *a,
        smp_wmb();
 
        for (i = dcnt - 1; i >= 0; i--) {
-               unsigned int idx = (q->used_idx + i) & q->ring_mask;
+               unsigned int idx = (q->used_idx_tail + i) & q->ring_mask;
                unsigned int didx = (a->start + i) & a->mask;
 
                q->ring[idx].flags = d[didx].flags & ~XDP_DESC_KERNEL;
        }
-       q->used_idx += dcnt;
+       q->used_idx_tail += dcnt;
+       q->used_idx_head += dcnt;
 
        return 0;
 }
@@ -204,7 +206,7 @@ static inline int xskq_enqueue_completed_from_array(struct xsk_packet_array *a,
                                                    u32 dcnt)
 {
        struct xsk_queue *q = (struct xsk_queue *)a->q_ops;
-       unsigned int used_idx = q->used_idx;
+       unsigned int used_idx = q->used_idx_tail;
        struct xdp_desc *d = a->items;
        int i, j;
 
@@ -233,13 +235,14 @@ static inline int xskq_enqueue_completed_from_array(struct xsk_packet_array *a,
        smp_wmb();
 
        for (j = i - 1; j >= 0; j--) {
-               unsigned int idx = (q->used_idx + j) & q->ring_mask;
+               unsigned int idx = (q->used_idx_tail + j) & q->ring_mask;
                unsigned int didx = (a->start + j) & a->mask;
 
                q->ring[idx].flags = d[didx].flags & ~XDP_DESC_KERNEL;
        }
        q->num_free -= i;
-       q->used_idx += i;
+       q->used_idx_tail += i;
+       q->used_idx_head += i;
 
        return i;
 }
@@ -301,6 +304,115 @@ static inline void xskq_set_buff_info(struct xsk_queue *q,
        q->validation = validation;
 }
 
+/* Iterator interface that operates directly on the user/kernel shared ring */
+
+struct xskq_iter {
+       unsigned int head;
+       unsigned int tail;
+};
+
+static inline bool xskq_iter_end(struct xskq_iter *it)
+{
+       return it->tail == it->head;
+}
+
+static inline void xskq_iter_validate(struct xsk_queue *q, struct xskq_iter *it)
+{
+       while (it->head != it->tail) {
+               unsigned int idx = it->head & q->ring_mask;
+               struct xdp_desc *d, *du;
+
+               d = &q->ring[idx];
+               if (xskq_is_valid_entry(q, d))
+                       break;
+
+               /* Slow error path! */
+               du = &q->ring[q->used_idx_tail & q->ring_mask];
+               du->idx = d->idx;
+               du->len = d->len;
+               du->offset = d->offset;
+               du->error = EINVAL;
+
+               q->last_avail_idx++;
+               it->head++;
+
+               smp_wmb();
+
+               du->flags = d->flags & ~XDP_DESC_KERNEL;
+       }
+}
+
+static inline struct xskq_iter xskq_deq_iter(struct xsk_queue *q,
+                                            int cnt)
+{
+       struct xskq_iter it;
+
+       it.head = q->last_avail_idx;
+       it.tail = it.head + (unsigned int)xskq_nb_avail(q, cnt);
+
+       smp_rmb();
+
+       xskq_iter_validate(q, &it);
+
+       return it;
+}
+
+static inline void xskq_deq_iter_next(struct xsk_queue *q, struct xskq_iter *it)
+{
+       it->head++;
+       xskq_iter_validate(q, it);
+}
+
+static inline void xskq_deq_iter_done(struct xsk_queue *q, struct xskq_iter *it)
+{
+       int entries = it->head - q->last_avail_idx;
+
+       q->num_free += entries;
+       q->last_avail_idx = it->head;
+}
+
+static inline u32 xskq_deq_iter_get_id(struct xsk_queue *q,
+                                      struct xskq_iter *it)
+{
+       return q->ring[it->head & q->ring_mask].idx;
+}
+
+static inline void xskq_return_id(struct xsk_queue *q, u32 id)
+{
+       struct xdp_desc d = { .idx = id };
+
+       WARN(xskq_enqueue(q, &d, 1), "%s failed!\n", __func__);
+}
+
+static inline void xskq_enq_lazy(struct xsk_queue *q,
+                                u32 id, u32 len, u16 offset)
+{
+       unsigned int idx;
+
+       if (q->num_free == 0) {
+               WARN(1, "%s xsk_queue deq/enq out of sync!\n", __func__);
+               return;
+       }
+
+       q->num_free--;
+       idx = (q->used_idx_tail++) & q->ring_mask;
+       q->ring[idx].idx = id;
+       q->ring[idx].len = len;
+       q->ring[idx].offset = offset;
+       q->ring[idx].error = 0;
+}
+
+static inline void xskq_enq_flush(struct xsk_queue *q)
+{
+       smp_wmb();
+
+       while (q->used_idx_head != q->used_idx_tail) {
+               unsigned int idx = (q->used_idx_head++) & q->ring_mask;
+
+               q->ring[idx].flags = 0;
+       }
+}
+
 struct xsk_queue *xskq_create(u32 nentries);
 void xskq_destroy(struct xsk_queue *q_ops);
 
-- 
2.14.1
