Implement new ring buffer produce and consume functions for the tun and
tap drivers. They provide lockless producer-consumer synchronization and
netdev queue management to prevent ptr_ring tail drops and permanent
netdev queue starvation.

- tun_ring_produce(): Produces a packet into the ptr_ring with proper
  memory barriers and proactively stops the netdev queue when the ring
  is about to become full.

- __tun_ring_consume() / __tap_ring_consume(): Internal consume functions
  that check whether the netdev queue was stopped because the ring was
  full, and wake it once space becomes available. Memory barriers ensure
  proper ordering between producer and consumer.

- tun_ring_consume() / tap_ring_consume(): Wrapper functions that acquire
  the consumer lock before calling the internal consume functions.

Key features:
- Proactive queue stopping using __ptr_ring_full_next() to stop the queue
  before it becomes completely full.
- No queue stopping when the ptr_ring is already full: if the consumer
  emptied all entries in the meantime without observing the stopped
  queue, the queue would stay stopped forever, causing permanent
  starvation.
- Conditional queue waking using __ptr_ring_consume_created_space() to
  wake the queue only when space was actually created in the ring (the
  intended semantics of both helpers are sketched after this list).
- Prevents permanent starvation by ensuring the queue is also woken when
  the ring becomes empty, which can happen when racing the producer.
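
The stop/wake decisions rely on two ptr_ring helpers introduced earlier
in this series. For reference, their intended semantics as inferred from
their use in this patch (illustrative declarations only, not the
authoritative interfaces):

  /* True if producing one more entry would make the ring full, i.e.
   * the ring is about to run out of free slots.
   */
  bool __ptr_ring_full_next(struct ptr_ring *r);

  /* True if the preceding consume returned slots to the producer.
   * Consumers hand back space in batches, so not every consume
   * creates producer-visible room.
   */
  bool __ptr_ring_consume_created_space(struct ptr_ring *r);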

NB: The new functions are marked __always_unused while they still have
no callers; the annotation is removed later in the series so as not to
break bisectability.
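
For context, one possible shape of the eventual producer call site
(hypothetical sketch; the actual conversion of tun_net_xmit() away from
the bare ptr_ring_produce() happens later in the series):

  /* In tun_net_xmit(), once the tx queue for tfile is known: */
  queue = netdev_get_tx_queue(dev, tfile->queue_index);
  if (tun_ring_produce(&tfile->tx_ring, queue, skb)) {
          /* Ring full despite the proactive stop; handle per the
           * drop/requeue policy chosen by the later patch.
           */
          goto drop;
  }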

Co-developed-by: Tim Gebauer <[email protected]>
Signed-off-by: Tim Gebauer <[email protected]>
Co-developed-by: Jon Kohler <[email protected]>
Signed-off-by: Jon Kohler <[email protected]>
Signed-off-by: Simon Schippers <[email protected]>
---
 drivers/net/tap.c |  63 +++++++++++++++++++++++++++++
 drivers/net/tun.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 164 insertions(+)

diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 1197f245e873..c370a02789eb 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -753,6 +753,69 @@ static ssize_t tap_put_user(struct tap_queue *q,
        return ret ? ret : total;
 }
 
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tap_ring_consume(struct tap_queue *q)
+{
+       struct ptr_ring *ring = &q->ring;
+       struct netdev_queue *txq;
+       struct net_device *dev;
+       bool stopped;
+       void *ptr;
+
+       ptr = __ptr_ring_peek(ring);
+       if (!ptr)
+               return ptr;
+
+       /* Paired with smp_wmb() in the ring producer path. Ensures we
+        * see any updated netdev queue state caused by a full ring.
+        * Needed for proper synchronization between the ring and the
+        * netdev queue.
+        */
+       smp_rmb();
+       rcu_read_lock();
+       dev = rcu_dereference(q->tap)->dev;
+       txq = netdev_get_tx_queue(dev, q->queue_index);
+       stopped = netif_tx_queue_stopped(txq);
+
+       /* Ensures the read of the stopped-queue state completes before
+        * the discard, so that we do not miss the window in which the
+        * queue must be woken.
+        */
+       smp_rmb();
+       __ptr_ring_discard_one(ring);
+
+       /* If the queue was stopped (meaning the producer couldn't have
+        * inserted new entries just now), and we have actually created
+        * space in the ring, or the ring is now empty (due to a race
+        * with the producer), then it is now safe to wake the queue.
+        */
+       if (unlikely(stopped &&
+                    (__ptr_ring_consume_created_space(ring) ||
+                     __ptr_ring_empty(ring)))) {
+               /* Paired with smp_rmb() in the ring producer path. */
+               smp_wmb();
+               netif_tx_wake_queue(txq);
+       }
+       rcu_read_unlock();
+
+       return ptr;
+}
+
+static __always_unused void *tap_ring_consume(struct tap_queue *q)
+{
+       void *ptr;
+
+       spin_lock(&q->ring.consumer_lock);
+       ptr = __tap_ring_consume(q);
+       spin_unlock(&q->ring.consumer_lock);
+
+       return ptr;
+}
+
 static ssize_t tap_do_read(struct tap_queue *q,
                           struct iov_iter *to,
                           int noblock, struct sk_buff *skb)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 8192740357a0..3b9d8d406ff5 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -999,6 +999,107 @@ static unsigned int run_ebpf_filter(struct tun_struct *tun,
        return len;
 }
 
+/* Produce a packet into the transmit ring. If the ring becomes full, the
+ * netdev queue is stopped until the consumer wakes it again.
+ */
+static __always_unused int tun_ring_produce(struct ptr_ring *ring,
+                                           struct netdev_queue *queue,
+                                           struct sk_buff *skb)
+{
+       int ret;
+
+       spin_lock(&ring->producer_lock);
+
+       /* Pairs with smp_wmb() in __tun_ring_consume()/__tap_ring_consume().
+        * Ensures that space freed by the consumer is visible.
+        */
+       smp_rmb();
+
+       /* Do not stop the netdev queue if the ptr_ring is full already.
+        * The consumer could empty out the ptr_ring in the meantime
+        * without noticing the stopped netdev queue, resulting in a
+        * stopped netdev queue and an empty ptr_ring. In this case the
+        * netdev queue would stay stopped forever.
+        */
+       if (unlikely(!__ptr_ring_full(ring) &&
+                    __ptr_ring_full_next(ring)))
+               netif_tx_stop_queue(queue);
+
+       /* Note: __ptr_ring_produce() has an internal smp_wmb() that
+        * synchronizes with the consumer. It ensures that once the new
+        * entry is visible on the ring, any stopped queue state is
+        * visible to the consumer after dequeueing as well.
+        */
+       ret = __ptr_ring_produce(ring, skb);
+
+       spin_unlock(&ring->producer_lock);
+
+       return ret;
+}
+
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tun_ring_consume(struct tun_file *tfile)
+{
+       struct ptr_ring *ring = &tfile->tx_ring;
+       struct netdev_queue *txq;
+       struct net_device *dev;
+       bool stopped;
+       void *ptr;
+
+       ptr = __ptr_ring_peek(ring);
+       if (!ptr)
+               return ptr;
+
+       /* Paired with smp_wmb() in the ring producer path. Ensures we
+        * see any updated netdev queue state caused by a full ring.
+        * Needed for proper synchronization between the ring and the
+        * netdev queue.
+        */
+       smp_rmb();
+       rcu_read_lock();
+       dev = rcu_dereference(tfile->tun)->dev;
+       txq = netdev_get_tx_queue(dev, tfile->queue_index);
+       stopped = netif_tx_queue_stopped(txq);
+
+       /* Ensures the read of the stopped-queue state completes before
+        * the discard, so that we do not miss the window in which the
+        * queue must be woken.
+        */
+       smp_rmb();
+       __ptr_ring_discard_one(ring);
+
+       /* If the queue was stopped (meaning the producer couldn't have
+        * inserted new entries just now), and we have actually created
+        * space in the ring, or the ring is now empty (due to a race
+        * with the producer), then it is now safe to wake the queue.
+        */
+       if (unlikely(stopped &&
+                    (__ptr_ring_consume_created_space(ring) ||
+                     __ptr_ring_empty(ring)))) {
+               /* Paired with smp_rmb() in tun_ring_produce. */
+               smp_wmb();
+               netif_tx_wake_queue(txq);
+       }
+       rcu_read_unlock();
+
+       return ptr;
+}
+
+static __always_unused void *tun_ring_consume(struct tun_file *tfile)
+{
+       void *ptr;
+
+       spin_lock(&tfile->tx_ring.consumer_lock);
+       ptr = __tun_ring_consume(tfile);
+       spin_unlock(&tfile->tx_ring.consumer_lock);
+
+       return ptr;
+}
+
 /* Net device start xmit */
 static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 {
-- 
2.43.0

