On Fri, May 22, 2026 at 6:44 PM Breno Leitao <[email protected]> wrote: > > anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole > thing") and then, from the per-iteration anon_pipe_get_page() helper, > used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page > while still holding it. > > That allocation can sleep doing direct reclaim and/or runs memcg > charging, which extends the critical section and stalls a concurrent > reader on the very same mutex. > > Just pre-alloc the required pages before the lock in an array and just pop > them inside the lock. > > This can improve the pipe throughput up to 48% and reduce the > latency in 33%, easily seen when there is memory pressure and direct > reclaim. > > Signed-off-by: Breno Leitao <[email protected]> > --- > fs/pipe.c | 105 > ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- > 1 file changed, 102 insertions(+), 3 deletions(-) > > diff --git a/fs/pipe.c b/fs/pipe.c > index 9841648c9cf3e..cff255217bbfe 100644 > --- a/fs/pipe.c > +++ b/fs/pipe.c > @@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1, > pipe_lock(pipe2); > } > > -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe) > +#define PIPE_PREALLOC_MAX 8 > + > +struct anon_pipe_prealloc { > + struct page *pages[PIPE_PREALLOC_MAX]; > + unsigned int count; > +}; > + > +/* > + * Pre-allocate pages outside pipe->mutex for multi-page writes. > + * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg > + * charging; doing it under the mutex stalls a concurrent reader. > + * > + * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses > + * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc: > + * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single > + * page. 
A per-page loop keeps memcg accounting and the task NUMA mempolicy > + * honoured for every page; the per-call overhead is small compared to the > + * pipe->mutex hold-time being shrunk. Any shortfall is covered by the > + * in-lock alloc_page() fallback in anon_pipe_get_page(). > + */ > +static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc, > + size_t total_len) > +{ > + unsigned int want, i; > + struct page *page; > + > + prealloc->count = 0; > + if (total_len <= PAGE_SIZE) > + return; > + > + want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE), > + PIPE_PREALLOC_MAX); > + > + for (i = 0; i < want; i++) { > + page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); > + if (!page) > + break; > + prealloc->pages[prealloc->count++] = page; > + } > +} > + > +static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc > *prealloc) > +{ > + if (!prealloc->count) > + return NULL; > + > + prealloc->count--; > + > + return prealloc->pages[prealloc->count]; > +} > + > +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe, > + struct anon_pipe_prealloc *prealloc) > { > + struct page *page; > + > + /* Drain prealloc first to keep tmp_page[] hot for later small > writes. */ > + page = anon_pipe_prealloc_pop(prealloc); > + if (page) > + return page; > + > for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { > if (pipe->tmp_page[i]) { > - struct page *page = pipe->tmp_page[i]; > + page = pipe->tmp_page[i]; > pipe->tmp_page[i] = NULL; > return page; > } > } > > + /* FWIW: This is called with pipe->mutex held */ > return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); > } > > @@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info > *pipe, > put_page(page); > } > > +/* > + * Stash leftover prealloc pages in tmp_page[] so the next write to this > + * pipe gets a hot page without entering the allocator. 
> + */ > +static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe, > + struct anon_pipe_prealloc *prealloc) > +{ > + int i, idx; > + > + if (!prealloc->count) > + return; > + > + for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { > + if (pipe->tmp_page[i]) > + continue; > + if (!prealloc->count) > + return; > + idx = --prealloc->count; > + pipe->tmp_page[i] = prealloc->pages[idx]; > + prealloc->pages[idx] = NULL; > + } > +} > + > +/* Runs after mutex_unlock() to keep put_page() out of the critical section. > */ > +static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc) > +{ > + while (prealloc->count) { > + prealloc->count--; > + put_page(prealloc->pages[prealloc->count]); > + } > +} > + > static void anon_pipe_buf_release(struct pipe_inode_info *pipe, > struct pipe_buffer *buf) > { > @@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) > { > struct file *filp = iocb->ki_filp; > struct pipe_inode_info *pipe = filp->private_data; > + struct anon_pipe_prealloc prealloc; > unsigned int head; > ssize_t ret = 0; > size_t total_len = iov_iter_count(from); > @@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) > if (unlikely(total_len == 0)) > return 0; > > + anon_pipe_get_page_prealloc(&prealloc, total_len); > + > mutex_lock(&pipe->mutex); > > if (!pipe->readers) { > @@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) > struct page *page; > int copied; > > - page = anon_pipe_get_page(pipe); > + page = anon_pipe_get_page(pipe, &prealloc); > if (unlikely(!page)) { > if (!ret) > ret = -ENOMEM; > @@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) > * after waiting we need to re-check whether the pipe > * become empty while we dropped the lock. 
> */ > + anon_pipe_refill_tmp_pages(pipe, &prealloc); > mutex_unlock(&pipe->mutex); > + anon_pipe_free_pages(&prealloc); > if (was_empty) > wake_up_interruptible_sync_poll(&pipe->rd_wait, > EPOLLIN | EPOLLRDNORM); > kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); > @@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter > *from) > wake_next_writer = true; > } > out: > + anon_pipe_refill_tmp_pages(pipe, &prealloc); > if (pipe_is_full(pipe)) > wake_next_writer = false;
Maybe the new anon_pipe_refill_tmp_pages() call would read nicer if moved below the pipe_is_full() check, but that's purely cosmetic. > mutex_unlock(&pipe->mutex); > + anon_pipe_free_pages(&prealloc); > > /* > * If we do do a wakeup event, we do a 'sync' wakeup, because we > > -- > 2.54.0 > Reviewed-by: Mateusz Guzik <[email protected]>

