anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole thing") and then, from the per-iteration anon_pipe_get_page() helper, used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page while still holding it.
That allocation can sleep doing direct reclaim and/or runs memcg charging, which extends the critical section and stalls a concurrent reader on the very same mutex. Just pre-alloc the required pages before the lock in an array and just pop them inside the lock. This can improve the pipe throughput up to 48% and reduce the latency in 33%, easily seen when there is memory pressure and direct reclaim. Signed-off-by: Breno Leitao <[email protected]> --- fs/pipe.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 102 insertions(+), 3 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 9841648c9cf3e..cff255217bbfe 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1, pipe_lock(pipe2); } -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe) +#define PIPE_PREALLOC_MAX 8 + +struct anon_pipe_prealloc { + struct page *pages[PIPE_PREALLOC_MAX]; + unsigned int count; +}; + +/* + * Pre-allocate pages outside pipe->mutex for multi-page writes. + * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg + * charging; doing it under the mutex stalls a concurrent reader. + * + * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses + * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc: + * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single + * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy + * honoured for every page; the per-call overhead is small compared to the + * pipe->mutex hold-time being shrunk. Any shortfall is covered by the + * in-lock alloc_page() fallback in anon_pipe_get_page(). + */ +static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc, + size_t total_len) +{ + unsigned int want, i; + struct page *page; + + prealloc->count = 0; + if (total_len <= PAGE_SIZE) + return; + + want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE), + PIPE_PREALLOC_MAX); + + for (i = 0; i < want; i++) { + page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); + if (!page) + break; + prealloc->pages[prealloc->count++] = page; + } +} + +static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc *prealloc) +{ + if (!prealloc->count) + return NULL; + + prealloc->count--; + + return prealloc->pages[prealloc->count]; +} + +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe, + struct anon_pipe_prealloc *prealloc) { + struct page *page; + + /* Drain prealloc first to keep tmp_page[] hot for later small writes. */ + page = anon_pipe_prealloc_pop(prealloc); + if (page) + return page; + for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { if (pipe->tmp_page[i]) { - struct page *page = pipe->tmp_page[i]; + page = pipe->tmp_page[i]; pipe->tmp_page[i] = NULL; return page; } } + /* FWIW: This is called with pipe->mutex held */ return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); } @@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info *pipe, put_page(page); } +/* + * Stash leftover prealloc pages in tmp_page[] so the next write to this + * pipe gets a hot page without entering the allocator. + */ +static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe, + struct anon_pipe_prealloc *prealloc) +{ + int i, idx; + + if (!prealloc->count) + return; + + for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { + if (pipe->tmp_page[i]) + continue; + if (!prealloc->count) + return; + idx = --prealloc->count; + pipe->tmp_page[i] = prealloc->pages[idx]; + prealloc->pages[idx] = NULL; + } +} + +/* Runs after mutex_unlock() to keep put_page() out of the critical section. */ +static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc) +{ + while (prealloc->count) { + prealloc->count--; + put_page(prealloc->pages[prealloc->count]); + } +} + static void anon_pipe_buf_release(struct pipe_inode_info *pipe, struct pipe_buffer *buf) { @@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) { struct file *filp = iocb->ki_filp; struct pipe_inode_info *pipe = filp->private_data; + struct anon_pipe_prealloc prealloc; unsigned int head; ssize_t ret = 0; size_t total_len = iov_iter_count(from); @@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) if (unlikely(total_len == 0)) return 0; + anon_pipe_get_page_prealloc(&prealloc, total_len); + mutex_lock(&pipe->mutex); if (!pipe->readers) { @@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) struct page *page; int copied; - page = anon_pipe_get_page(pipe); + page = anon_pipe_get_page(pipe, &prealloc); if (unlikely(!page)) { if (!ret) ret = -ENOMEM; @@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) * after waiting we need to re-check whether the pipe * become empty while we dropped the lock. */ + anon_pipe_refill_tmp_pages(pipe, &prealloc); mutex_unlock(&pipe->mutex); + anon_pipe_free_pages(&prealloc); if (was_empty) wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); @@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) wake_next_writer = true; } out: + anon_pipe_refill_tmp_pages(pipe, &prealloc); if (pipe_is_full(pipe)) wake_next_writer = false; mutex_unlock(&pipe->mutex); + anon_pipe_free_pages(&prealloc); /* * If we do do a wakeup event, we do a 'sync' wakeup, because we -- 2.54.0

