anon_pipe_write() takes pipe->mutex and then, from the per-iteration anon_pipe_get_page() helper, used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page while still holding it. That allocation can sleep doing direct reclaim and/or runs memcg charging, which extends the critical section and stalls a concurrent reader on the very same mutex.
Bulk pre-allocate DIV_ROUND_UP(total_len, PAGE_SIZE) pages (up to PIPE_PREALLOC_MAX (8)) pages outside the mutex when total_len >= PAGE_SIZE, using alloc_pages_bulk(). (Under memcg, alloc_pages_bulk() with __GFP_ACCOUNT might return less pages than requested, but, this is still a win, given some pages allocation is moved outside of the lock). Pass the array into anon_pipe_get_page(), which now consumes from tmp_page[] first, then from the prealloc array, and only as a last resort falls back to alloc_page() under the mutex (reached only for writes larger than 8 pages, where the prealloc cap is exhausted). Doing this in one bulk call before the lock keeps the fast path's mutex held for a single, write-bounded critical section -- no extra mutex_unlock/_lock cycles -- so it avoids the per-page lock-handoff overhead that a per-page drop-and-retake design would introduce, while still moving the typical multi-page allocation off the critical section. Unused prealloc pages are pushed to the pipe's tmp_page[] cache (or freed) before unlock, so a subsequent write to the same pipe gets a hot cached page rather than paying for an alloc again. Sub-PAGE_SIZE writes are unchanged: the merge path handles them without ever needing a fresh page, so it is not worth speculatively allocating for them. This can improve the pipe throughput up to 48% and reduce the latency in 33%. Signed-off-by: Breno Leitao <[email protected]> --- fs/pipe.c | 40 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 9841648c9cf3e..7a1517d15107a 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -111,7 +111,11 @@ void pipe_double_lock(struct pipe_inode_info *pipe1, pipe_lock(pipe2); } -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe) +#define PIPE_PREALLOC_MAX 8 + +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe, + struct page **prealloc, + unsigned int *prealloc_n) { for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { if (pipe->tmp_page[i]) { @@ -121,6 +125,14 @@ static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe) } } + if (*prealloc_n) { + unsigned int idx = --(*prealloc_n); + struct page *page = prealloc[idx]; + + prealloc[idx] = NULL; + return page; + } + return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); } @@ -438,6 +450,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) ssize_t chars; bool was_empty = false; bool wake_next_writer = false; + struct page *prealloc[PIPE_PREALLOC_MAX] = { NULL }; + unsigned int prealloc_n = 0; /* * Reject writing to watch queue pipes before the point where we lock @@ -455,6 +469,26 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) if (unlikely(total_len == 0)) return 0; + /* + * Bulk pre-allocate up to PIPE_PREALLOC_MAX pages outside pipe->mutex + * for writes that span at least one full page. alloc_page() with + * GFP_HIGHUSER may sleep doing reclaim and runs memcg charging, so + * doing it under the mutex extends the critical section and stalls + * the reader. The merge path handles sub-PAGE_SIZE writes without + * needing a fresh page; for writes larger than PIPE_PREALLOC_MAX + * pages, anon_pipe_get_page() falls back to a single alloc_page() + * under the mutex for the remainder. Unused prealloc pages are + * returned to the pipe's tmp_page[] cache (or freed) before unlock. + */ + if (total_len >= PAGE_SIZE) { + unsigned int want = min_t(unsigned int, + DIV_ROUND_UP(total_len, PAGE_SIZE), + PIPE_PREALLOC_MAX); + + prealloc_n = alloc_pages_bulk(GFP_HIGHUSER | __GFP_ACCOUNT, + want, prealloc); + } + mutex_lock(&pipe->mutex); if (!pipe->readers) { @@ -512,7 +546,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) struct page *page; int copied; - page = anon_pipe_get_page(pipe); + page = anon_pipe_get_page(pipe, prealloc, &prealloc_n); if (unlikely(!page)) { if (!ret) ret = -ENOMEM; @@ -576,6 +610,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from) wake_next_writer = true; } out: + while (prealloc_n) + anon_pipe_put_page(pipe, prealloc[--prealloc_n]); if (pipe_is_full(pipe)) wake_next_writer = false; mutex_unlock(&pipe->mutex); -- 2.53.0-Meta

