On Fri, May 22, 2026 at 6:44 PM Breno Leitao <[email protected]> wrote:
>
> anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole
> thing") and then, from the per-iteration anon_pipe_get_page() helper,
> used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page
> while still holding it.
>
> That allocation can sleep in direct reclaim and/or run memcg
> charging, which extends the critical section and stalls a concurrent
> reader on the very same mutex.
>
> Pre-allocate the required pages into an array before taking the lock,
> and simply pop them from it while the lock is held.
>
> This can improve the pipe throughput by up to 48% and reduce the
> latency by 33%, most visibly when there is memory pressure and direct
> reclaim.
>
> Signed-off-by: Breno Leitao <[email protected]>
> ---
>  fs/pipe.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 102 insertions(+), 3 deletions(-)
>
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 9841648c9cf3e..cff255217bbfe 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
>         pipe_lock(pipe2);
>  }
>
> -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe)
> +#define PIPE_PREALLOC_MAX 8
> +
> +struct anon_pipe_prealloc {
> +       struct page *pages[PIPE_PREALLOC_MAX];
> +       unsigned int count;
> +};
> +
> +/*
> + * Pre-allocate pages outside pipe->mutex for multi-page writes.
> + * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg
> + * charging; doing it under the mutex stalls a concurrent reader.
> + *
> + * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses
> + * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc:
> + * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single
> + * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy
> + * honoured for every page; the per-call overhead is small compared to the
> + * pipe->mutex hold-time being shrunk. Any shortfall is covered by the
> + * in-lock alloc_page() fallback in anon_pipe_get_page().
> + */
> +static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc,
> +                                       size_t total_len)
> +{
> +       unsigned int want, i;
> +       struct page *page;
> +
> +       prealloc->count = 0;
> +       if (total_len <= PAGE_SIZE)
> +               return;
> +
> +       want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE),
> +                    PIPE_PREALLOC_MAX);
> +
> +       for (i = 0; i < want; i++) {
> +               page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
> +               if (!page)
> +                       break;
> +               prealloc->pages[prealloc->count++] = page;
> +       }
> +}
> +
> +static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc *prealloc)
> +{
> +       if (!prealloc->count)
> +               return NULL;
> +
> +       prealloc->count--;
> +
> +       return prealloc->pages[prealloc->count];
> +}
> +
> +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe,
> +                                      struct anon_pipe_prealloc *prealloc)
>  {
> +       struct page *page;
> +
> +       /* Drain prealloc first to keep tmp_page[] hot for later small writes. */
> +       page = anon_pipe_prealloc_pop(prealloc);
> +       if (page)
> +               return page;
> +
>         for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
>                 if (pipe->tmp_page[i]) {
> -                       struct page *page = pipe->tmp_page[i];
> +                       page = pipe->tmp_page[i];
>                         pipe->tmp_page[i] = NULL;
>                         return page;
>                 }
>         }
>
> +       /* FWIW: This is called with pipe->mutex held */
>         return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
>  }
>
> @@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info *pipe,
>         put_page(page);
>  }
>
> +/*
> + * Stash leftover prealloc pages in tmp_page[] so the next write to this
> + * pipe gets a hot page without entering the allocator.
> + */
> +static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe,
> +                                      struct anon_pipe_prealloc *prealloc)
> +{
> +       int i, idx;
> +
> +       if (!prealloc->count)
> +               return;
> +
> +       for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
> +               if (pipe->tmp_page[i])
> +                       continue;
> +               if (!prealloc->count)
> +                       return;
> +               idx = --prealloc->count;
> +               pipe->tmp_page[i] = prealloc->pages[idx];
> +               prealloc->pages[idx] = NULL;
> +       }
> +}
> +
> +/* Runs after mutex_unlock() to keep put_page() out of the critical section. */
> +static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc)
> +{
> +       while (prealloc->count) {
> +               prealloc->count--;
> +               put_page(prealloc->pages[prealloc->count]);
> +       }
> +}
> +
>  static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
>                                   struct pipe_buffer *buf)
>  {
> @@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  {
>         struct file *filp = iocb->ki_filp;
>         struct pipe_inode_info *pipe = filp->private_data;
> +       struct anon_pipe_prealloc prealloc;
>         unsigned int head;
>         ssize_t ret = 0;
>         size_t total_len = iov_iter_count(from);
> @@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>         if (unlikely(total_len == 0))
>                 return 0;
>
> +       anon_pipe_get_page_prealloc(&prealloc, total_len);
> +
>         mutex_lock(&pipe->mutex);
>
>         if (!pipe->readers) {
> @@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>                         struct page *page;
>                         int copied;
>
> -                       page = anon_pipe_get_page(pipe);
> +                       page = anon_pipe_get_page(pipe, &prealloc);
>                         if (unlikely(!page)) {
>                                 if (!ret)
>                                         ret = -ENOMEM;
> @@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>                  * after waiting we need to re-check whether the pipe
>                  * become empty while we dropped the lock.
>                  */
> +               anon_pipe_refill_tmp_pages(pipe, &prealloc);
>                 mutex_unlock(&pipe->mutex);
> +               anon_pipe_free_pages(&prealloc);
>                 if (was_empty)
>                 wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
>                 kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
> @@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>                 wake_next_writer = true;
>         }
>  out:
> +       anon_pipe_refill_tmp_pages(pipe, &prealloc);
>         if (pipe_is_full(pipe))
>                 wake_next_writer = false;

Maybe the new anon_pipe_refill_tmp_pages() call would read nicer if it
were moved below the pipe_is_full() check, but that's just cosmetics.

>         mutex_unlock(&pipe->mutex);
> +       anon_pipe_free_pages(&prealloc);
>
>         /*
>          * If we do do a wakeup event, we do a 'sync' wakeup, because we
>
> --
> 2.54.0
>

Reviewed-by: Mateusz Guzik <[email protected]>

Reply via email to