On Fri, 2026-05-22 at 09:44 -0700, Breno Leitao wrote:
> anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole
> thing") and then, from the per-iteration anon_pipe_get_page() helper,
> used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page
> while still holding it.
> 
> That allocation can sleep doing direct reclaim and/or runs memcg
> charging, which extends the critical section and stalls a concurrent
> reader on the very same mutex.
> 
> Just pre-alloc the required pages before the lock in an array and just pop
> them inside the lock.
> 
> This can improve the pipe throughput up to 48% and reduce the
> latency in 33%, easily seen when there is memory pressure and direct
> reclaim.
> 
> Signed-off-by: Breno Leitao <[email protected]>
> ---
>  fs/pipe.c | 105 
> ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 102 insertions(+), 3 deletions(-)
> 
> diff --git a/fs/pipe.c b/fs/pipe.c
> index 9841648c9cf3e..cff255217bbfe 100644
> --- a/fs/pipe.c
> +++ b/fs/pipe.c
> @@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
>       pipe_lock(pipe2);
>  }
>  
> -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe)
> +#define PIPE_PREALLOC_MAX 8
> +
> +struct anon_pipe_prealloc {
> +     struct page *pages[PIPE_PREALLOC_MAX];
> +     unsigned int count;
> +};
> +
> +/*
> + * Pre-allocate pages outside pipe->mutex for multi-page writes.
> + * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg
> + * charging; doing it under the mutex stalls a concurrent reader.
> + *
> + * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses
> + * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc:
> + * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single
> + * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy
> + * honoured for every page; the per-call overhead is small compared to the
> + * pipe->mutex hold-time being shrunk. Any shortfall is covered by the
> + * in-lock alloc_page() fallback in anon_pipe_get_page().
> + */
> +static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc,
> +                                     size_t total_len)
> +{
> +     unsigned int want, i;
> +     struct page *page;
> +
> +     prealloc->count = 0;
> +     if (total_len <= PAGE_SIZE)
> +             return;
> +
> +     want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE),
> +                  PIPE_PREALLOC_MAX);
> +
> +     for (i = 0; i < want; i++) {
> +             page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
> +             if (!page)
> +                     break;
> +             prealloc->pages[prealloc->count++] = page;
> +     }

I believe alloc_pages_bulk() is supposed to be more efficient when
allocating several pages at once like this.

> +}
> +
> +static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc 
> *prealloc)
> +{
> +     if (!prealloc->count)
> +             return NULL;
> +
> +     prealloc->count--;
> +
> +     return prealloc->pages[prealloc->count];
> +}
> +
> +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe,
> +                                    struct anon_pipe_prealloc *prealloc)
>  {
> +     struct page *page;
> +
> +     /* Drain prealloc first to keep tmp_page[] hot for later small writes. 
> */
> +     page = anon_pipe_prealloc_pop(prealloc);
> +     if (page)
> +             return page;
> +
>       for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
>               if (pipe->tmp_page[i]) {
> -                     struct page *page = pipe->tmp_page[i];
> +                     page = pipe->tmp_page[i];
>                       pipe->tmp_page[i] = NULL;
>                       return page;
>               }
>       }
>  
> +     /* FWIW: This is called with pipe->mutex held */
>       return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
>  }
>  
> @@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info 
> *pipe,
>       put_page(page);
>  }
>  
> +/*
> + * Stash leftover prealloc pages in tmp_page[] so the next write to this
> + * pipe gets a hot page without entering the allocator.
> + */
> +static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe,
> +                                    struct anon_pipe_prealloc *prealloc)
> +{
> +     int i, idx;
> +
> +     if (!prealloc->count)
> +             return;
> +
> +     for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
> +             if (pipe->tmp_page[i])
> +                     continue;
> +             if (!prealloc->count)
> +                     return;
> +             idx = --prealloc->count;
> +             pipe->tmp_page[i] = prealloc->pages[idx];
> +             prealloc->pages[idx] = NULL;
> +     }
> +}
> +
> +/* Runs after mutex_unlock() to keep put_page() out of the critical section. 
> */
> +static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc)
> +{
> +     while (prealloc->count) {
> +             prealloc->count--;
> +             put_page(prealloc->pages[prealloc->count]);
> +     }
> +}
> +
>  static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
>                                 struct pipe_buffer *buf)
>  {
> @@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>  {
>       struct file *filp = iocb->ki_filp;
>       struct pipe_inode_info *pipe = filp->private_data;
> +     struct anon_pipe_prealloc prealloc;
>       unsigned int head;
>       ssize_t ret = 0;
>       size_t total_len = iov_iter_count(from);
> @@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>       if (unlikely(total_len == 0))
>               return 0;
>  
> +     anon_pipe_get_page_prealloc(&prealloc, total_len);
> +
>       mutex_lock(&pipe->mutex);
>  
>       if (!pipe->readers) {
> @@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>                       struct page *page;
>                       int copied;
>  
> -                     page = anon_pipe_get_page(pipe);
> +                     page = anon_pipe_get_page(pipe, &prealloc);
>                       if (unlikely(!page)) {
>                               if (!ret)
>                                       ret = -ENOMEM;
> @@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
>                * after waiting we need to re-check whether the pipe
>                * become empty while we dropped the lock.
>                */
> +             anon_pipe_refill_tmp_pages(pipe, &prealloc);
>               mutex_unlock(&pipe->mutex);
> +             anon_pipe_free_pages(&prealloc);
>               if (was_empty)
>                       wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN 
> | EPOLLRDNORM);
>               kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
> @@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter 
> *from)
>               wake_next_writer = true;
>       }
>  out:
> +     anon_pipe_refill_tmp_pages(pipe, &prealloc);
>       if (pipe_is_full(pipe))
>               wake_next_writer = false;
>       mutex_unlock(&pipe->mutex);
> +     anon_pipe_free_pages(&prealloc);
>  
>       /*
>        * If we do do a wakeup event, we do a 'sync' wakeup, because we

-- 
Jeff Layton <[email protected]>

Reply via email to