anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole
thing") and then, from the per-iteration anon_pipe_get_page() helper,
used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page
while still holding it.

That allocation can sleep doing direct reclaim and/or runs memcg
charging, which extends the critical section and stalls a concurrent
reader on the very same mutex.

Just pre-alloc the required pages before the lock in an array and just pop
them inside the lock.

This can improve the pipe throughput up to 48% and reduce the
latency in 33%, easily seen when there is memory pressure and direct
reclaim.

Signed-off-by: Breno Leitao <[email protected]>
---
 fs/pipe.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 102 insertions(+), 3 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 9841648c9cf3e..cff255217bbfe 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
        pipe_lock(pipe2);
 }
 
-static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe)
+#define PIPE_PREALLOC_MAX 8
+
+struct anon_pipe_prealloc {
+       struct page *pages[PIPE_PREALLOC_MAX];
+       unsigned int count;
+};
+
+/*
+ * Pre-allocate pages outside pipe->mutex for multi-page writes.
+ * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg
+ * charging; doing it under the mutex stalls a concurrent reader.
+ *
+ * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses
+ * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc:
+ * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single
+ * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy
+ * honoured for every page; the per-call overhead is small compared to the
+ * pipe->mutex hold-time being shrunk. Any shortfall is covered by the
+ * in-lock alloc_page() fallback in anon_pipe_get_page().
+ */
+static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc,
+                                       size_t total_len)
+{
+       unsigned int want, i;
+       struct page *page;
+
+       prealloc->count = 0;
+       if (total_len <= PAGE_SIZE)
+               return;
+
+       want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE),
+                    PIPE_PREALLOC_MAX);
+
+       for (i = 0; i < want; i++) {
+               page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
+               if (!page)
+                       break;
+               prealloc->pages[prealloc->count++] = page;
+       }
+}
+
+static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc *prealloc)
+{
+       if (!prealloc->count)
+               return NULL;
+
+       prealloc->count--;
+
+       return prealloc->pages[prealloc->count];
+}
+
+static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe,
+                                      struct anon_pipe_prealloc *prealloc)
 {
+       struct page *page;
+
+       /* Drain prealloc first to keep tmp_page[] hot for later small writes. 
*/
+       page = anon_pipe_prealloc_pop(prealloc);
+       if (page)
+               return page;
+
        for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
                if (pipe->tmp_page[i]) {
-                       struct page *page = pipe->tmp_page[i];
+                       page = pipe->tmp_page[i];
                        pipe->tmp_page[i] = NULL;
                        return page;
                }
        }
 
+       /* FWIW: This is called with pipe->mutex held */
        return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
 }
 
@@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info 
*pipe,
        put_page(page);
 }
 
+/*
+ * Stash leftover prealloc pages in tmp_page[] so the next write to this
+ * pipe gets a hot page without entering the allocator.
+ */
+static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe,
+                                      struct anon_pipe_prealloc *prealloc)
+{
+       int i, idx;
+
+       if (!prealloc->count)
+               return;
+
+       for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
+               if (pipe->tmp_page[i])
+                       continue;
+               if (!prealloc->count)
+                       return;
+               idx = --prealloc->count;
+               pipe->tmp_page[i] = prealloc->pages[idx];
+               prealloc->pages[idx] = NULL;
+       }
+}
+
+/* Runs after mutex_unlock() to keep put_page() out of the critical section. */
+static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc)
+{
+       while (prealloc->count) {
+               prealloc->count--;
+               put_page(prealloc->pages[prealloc->count]);
+       }
+}
+
 static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
                                  struct pipe_buffer *buf)
 {
@@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 {
        struct file *filp = iocb->ki_filp;
        struct pipe_inode_info *pipe = filp->private_data;
+       struct anon_pipe_prealloc prealloc;
        unsigned int head;
        ssize_t ret = 0;
        size_t total_len = iov_iter_count(from);
@@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
        if (unlikely(total_len == 0))
                return 0;
 
+       anon_pipe_get_page_prealloc(&prealloc, total_len);
+
        mutex_lock(&pipe->mutex);
 
        if (!pipe->readers) {
@@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
                        struct page *page;
                        int copied;
 
-                       page = anon_pipe_get_page(pipe);
+                       page = anon_pipe_get_page(pipe, &prealloc);
                        if (unlikely(!page)) {
                                if (!ret)
                                        ret = -ENOMEM;
@@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
                 * after waiting we need to re-check whether the pipe
                 * become empty while we dropped the lock.
                 */
+               anon_pipe_refill_tmp_pages(pipe, &prealloc);
                mutex_unlock(&pipe->mutex);
+               anon_pipe_free_pages(&prealloc);
                if (was_empty)
                        wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN 
| EPOLLRDNORM);
                kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
@@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
                wake_next_writer = true;
        }
 out:
+       anon_pipe_refill_tmp_pages(pipe, &prealloc);
        if (pipe_is_full(pipe))
                wake_next_writer = false;
        mutex_unlock(&pipe->mutex);
+       anon_pipe_free_pages(&prealloc);
 
        /*
         * If we do do a wakeup event, we do a 'sync' wakeup, because we

-- 
2.54.0


Reply via email to