Hi,

On 2026-04-03 15:04:51 -0400, Melanie Plageman wrote:
> On Fri, Apr 3, 2026 at 1:30 PM Andres Freund <[email protected]> wrote:
> >
> > > - why not remove the combine_distance requirement from the fast path
> > > entry criteria (could save resume_combine_distance in the fast path
> > > and restore it after a miss)
> >
> > Because entering the fast path prevents IO combining from happening. So it's
> > absolutely crucial that we do *not* enter it while we would combine.
>
> But if it is a buffer hit, obviously we can't do IO combining anyway,
> or am I misunderstanding the fast path's common case?

It's true that we can't do combining in the fast path, but the problem is that
with eic=0/1 (or a recent history that leaves us with low distances or a low
pin limit), we will not start the next IO until there are no more buffers
pinned.

Imagine that we started one 16 block IO and have a readahead_distance of 1.
After consuming 15 buffers, we will still have one buffer pinned, but
read_stream_look_ahead() will not yet start another IO, due to the
readahead_distance condition (or max_pinned_buffers or ...). Without the
stream->combine_distance == 1 check, the subsequent check in
read_stream_next_buffer() would consider this a valid case for entering the
fast path.

> > > You mentioned that we don't want to read too far ahead (including for
> > > a single combined IO) in part because:
> > >
> > > > The resowner and private refcount mechanisms take more CPU cycles if
> > > > you have more buffers pinned
> > >
> > > But I don't see how either distance is responding to this or
> > > self-calibrating with this in mind
> >
> > Using the minimal required distance to avoid needing to wait for IO
> > completion is responding to that, no? Without these patches we read ahead
> > as far as possible, even if all the data is in the page cache, which makes
> > this issue way worse (without these patches it's a major source of
> > regressions in the index prefetching patch).
>
> But we aren't using the minimal distance to avoid needing to wait for
> IO completion. We are also using a higher distance to try and get IO
> combining and to allow for async copying into the kernel buffer cache,
> etc, etc.

My testing suggests that doing IO combining for a reasonable io_combine_limit
is pretty much always a win in a steady-state stream (i.e. not a short one
that's not fully consumed), as the gain from avoiding the larger number of
syscalls is sufficiently large.

Once we start doing async copying from the kernel page cache, we will have to
wait for the completion of that async work, which will lead to
readahead_distance being increased if necessary.

> There's a lot of different considerations; it isn't just two opposing
> forces.

It's not, but I think always performing io_combine_limit sized IOs after a
ramp-up and increasing the distance based on needing to wait is a pretty
decent heuristic.

For best results it does require pgaio_uring_should_use_async() to trigger, as
otherwise we do not get the parallelized memory copy. Which means it may never
trigger if we don't occasionally reach the size-based condition. Luckily it
does not seem like using async is beneficial for small IOs.

> And, I'd imagine that the relationship between the
> number of buffers pinned and CPU cycles required for resowner/refcount
> isn't perfectly linear.

It's definitely not.

> I'm not saying that we don't do IO combining at high distances, I'm
> more saying that it is confusing that combine_distance controls how
> far we look ahead when readahead_distance is low but when
> readahead_distance is high, it controls when we issue the IO and not
> how far we look ahead. I don't think we should change course now, but
> I wanted to call out that this felt a little uncomfortable to me.

I'm not sure I see an alternative. I tried to at least improve the comments
around this.

Attached is a revised set of commits. The largest changes are:

- Reordered the series to put "read_stream: Only increase read-ahead distance
  when waiting for IO" after "read stream: Split decision about look ahead for
  AIO and combining"

  Previously I thought it'd be too awkward from a comment perspective, but
  there's only one comment where it is a bit odd. Think it's much clearer this
  way.

- Largely rewrote "Hacky implementation of making read_stream_reset()/end()
  not wait for IO". Looks a lot saner now.

  Think this needs a few more tests, in particular for the read stream and
  foreign_io paths. Will do that in the next version.

- Tried to address most of Bilal's and Melanie's feedback

- Removed some redundant checks from read_stream_should_issue_now()

- Lots of comment polishing, including revising the top-level read_stream.c
  comment

Greetings,

Andres Freund
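
The fast-path scenario from the reply above (one 16 block IO, a
readahead_distance of 1, 15 buffers consumed) can be sketched as a toy model.
This is not the read_stream.c code: ToyStream, toy_can_enter_fast_path() and
toy_state_after_15_of_16() are hypothetical names, the struct carries only the
fields that the fast-path entry condition in the attached patch inspects, and
a combine_distance of 16 at that point is an assumption made for illustration.
The require_combine_check flag exists only to compare the condition with and
without the combine_distance == 1 test.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, trimmed-down stand-in for ReadStream. */
typedef struct ToyStream
{
    int ios_in_progress;
    int forwarded_buffers;
    int pinned_buffers;
    int readahead_distance;
    int combine_distance;
    int pending_read_nblocks;
    int per_buffer_data_size;
} ToyStream;

/*
 * Mirrors the shape of the fast-path entry condition. When
 * require_combine_check is false, the combine_distance == 1 test is skipped.
 */
static bool
toy_can_enter_fast_path(const ToyStream *s, bool require_combine_check)
{
    if (s->ios_in_progress != 0 ||
        s->forwarded_buffers != 0 ||
        s->pinned_buffers != 1 ||
        s->readahead_distance != 1 ||
        s->pending_read_nblocks != 0 ||
        s->per_buffer_data_size != 0)
        return false;

    if (require_combine_check && s->combine_distance != 1)
        return false;

    return true;
}

/*
 * State after one 16 block IO completed and 15 of its buffers were consumed:
 * one buffer is still pinned and no further IO has been started.
 * combine_distance = 16 is an assumption for this sketch.
 */
static ToyStream
toy_state_after_15_of_16(void)
{
    ToyStream s = {0};

    s.pinned_buffers = 1;
    s.readahead_distance = 1;
    s.combine_distance = 16;
    return s;
}
```

In this model, toy_can_enter_fast_path(&s, false) accepts the state and would
switch to the fast path even though the stream was just doing full-sized
combined reads, while toy_can_enter_fast_path(&s, true) rejects it.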
>From b9101ab942bb8f954450fc5a2b2b72168c370ce1 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Tue, 3 Mar 2026 20:23:55 -0500 Subject: [PATCH v5 1/5] aio: io_uring: Trigger async processing for large IOs io_method=io_uring has a heuristic to trigger asynchronous processing of IOs once the IO depth is a bit larger. That heuristic is important when doing buffered IO from the kernel page cache, to allow parallelizing of the memory copy, as otherwise io_method=io_uring would be a lot slower than io_method=worker in that case. An upcoming commit will make read_stream.c only increase the read-ahead distance if we needed to wait for IO to complete. If to-be-read data is in the kernel page cache, io_uring will synchronously execute IO, unless the IO is flagged as async. Therefore the aforementioned change in read_stream.c heuristic would lead to a substantial performance regression with io_uring when data is in the page cache, as we would never reach a deep enough queue to actually trigger the existing heuristic. Parallelizing the copy from the page cache is mainly important when doing a lot of IO, which commonly is only possible when doing largely sequential IO. The reason we don't just mark all io_uring IOs as asynchronous is that the dispatch to a kernel thread has overhead. This overhead is mostly noticeable with small random IOs with a low queue depth, as in that case the gain from parallelizing the memory copy is small and the latency cost high. The facts from the two prior paragraphs show a way out: Use the size of the IO in addition to the depth of the queue to trigger asynchronous processing. One might think that just using the IO size might be enough, but experimentation has shown that not to be the case - with deep look-ahead distances being able to parallelize the memory copy is important even with smaller IOs. 
Reviewed-by: Melanie Plageman <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com --- src/backend/storage/aio/method_io_uring.c | 89 +++++++++++++++++------ 1 file changed, 67 insertions(+), 22 deletions(-) diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index 39984df31b4..b42d28647b8 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -409,7 +409,6 @@ static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) { struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring; - int in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios); Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); @@ -425,27 +424,6 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios) pgaio_io_prepare_submit(ioh); pgaio_uring_sq_from_io(ioh, sqe); - - /* - * io_uring executes IO in process context if possible. That's - * generally good, as it reduces context switching. When performing a - * lot of buffered IO that means that copying between page cache and - * userspace memory happens in the foreground, as it can't be - * offloaded to DMA hardware as is possible when using direct IO. When - * executing a lot of buffered IO this causes io_uring to be slower - * than worker mode, as worker mode parallelizes the copying. io_uring - * can be told to offload work to worker threads instead. - * - * If an IO is buffered IO and we already have IOs in flight or - * multiple IOs are being submitted, we thus tell io_uring to execute - * the IO in the background. We don't do so for the first few IOs - * being submitted as executing in this process' context has lower - * latency. 
- */ - if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED)) - io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); - - in_flight_before++; } while (true) @@ -701,10 +679,63 @@ pgaio_uring_check_one(PgAioHandle *ioh, uint64 ref_generation) LWLockRelease(&owner_context->completion_lock); } +/* + * io_uring executes IO in process context if possible. That's generally good, + * as it reduces context switching. When performing a lot of buffered IO that + * means that copying between page cache and userspace memory happens in the + * foreground, as it can't be offloaded to DMA hardware as is possible when + * using direct IO. When executing a lot of buffered IO this causes io_uring + * to be slower than worker mode, as worker mode parallelizes the + * copying. io_uring can be told to offload work to worker threads instead. + * + * If the IOs are small, we only benefit from forcing things into the + * background if there is a lot of IO, as otherwise the overhead from context + * switching is higher than the gain. + * + * If IOs are large, there is benefit from asynchronous processing at lower + * queue depths, as IO latency is less of a crucial factor and parallelizing + * memory copies is more important. In addition, it is important to trigger + * asynchronous processing even at low queue depth, as with foreground + * processing we might never actually reach deep enough IO depths to trigger + * asynchronous processing, which in turn would deprive readahead control + * logic of information about whether a deeper look-ahead distance would be + * advantageous. + * + * We have done some basic benchmarking to validate the thresholds used, but + * it's quite plausible that there are better values. + */ +static bool +pgaio_uring_should_use_async(PgAioHandle *ioh, size_t io_size) +{ + /* + * With DIO there's no benefit from forcing asynchronous processing, as + * io_uring will never execute direct IO synchronously. 
+ */ + if (!(ioh->flags & PGAIO_HF_BUFFERED)) + return false; + + /* + * Once the IO queue depth is not that shallow anymore, the overhead of + * dispatching to the background is a less significant factor. + */ + if (dclist_count(&pgaio_my_backend->in_flight_ios) > 4) + return true; + + /* + * If the IO is larger, the gains from parallelizing the memory copy are + * larger and typically the impact of the latency is smaller. + */ + if (io_size >= (BLCKSZ * 4)) + return true; + + return false; +} + static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe) { struct iovec *iov; + size_t io_size = 0; switch ((PgAioOp) ioh->op) { @@ -717,6 +748,8 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe) iov->iov_base, iov->iov_len, ioh->op_data.read.offset); + + io_size = iov->iov_len; } else { @@ -726,7 +759,13 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe) ioh->op_data.read.iov_length, ioh->op_data.read.offset); + for (int i = 0; i < ioh->op_data.read.iov_length; i++, iov++) + io_size += iov->iov_len; } + + if (pgaio_uring_should_use_async(ioh, io_size)) + io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); + break; case PGAIO_OP_WRITEV: @@ -747,6 +786,12 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe) ioh->op_data.write.iov_length, ioh->op_data.write.offset); } + + /* + * For now don't trigger use of IOSQE_ASYNC for writes, it's not + * clear there is a performance benefit in doing so. + */ + break; case PGAIO_OP_INVALID: -- 2.53.0.1.gb2826b52eb
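
The decision made by pgaio_uring_should_use_async() in the patch above can be
tried out in isolation with a small stand-alone model. This is a sketch, not
the PostgreSQL function: toy_should_use_async() and toy_total_io_size() are
hypothetical names, the handle flags and in-flight list are replaced by plain
parameters, and TOY_BLCKSZ assumes the default 8kB block size. The thresholds
(more than 4 IOs in flight, or at least 4 blocks) are the ones from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_BLCKSZ          8192    /* assumption: default block size */
#define TOY_DEPTH_THRESHOLD 4       /* "in_flight_ios > 4" in the patch */
#define TOY_SIZE_THRESHOLD  ((size_t) TOY_BLCKSZ * 4)

/* Sum of the segment lengths of a vectored read, as done for readv. */
static size_t
toy_total_io_size(const size_t *segment_lengths, int nsegments)
{
    size_t total = 0;

    for (int i = 0; i < nsegments; i++)
        total += segment_lengths[i];
    return total;
}

/* Should a read be flagged for asynchronous (background) execution? */
static bool
toy_should_use_async(bool buffered, int in_flight_ios, size_t io_size)
{
    /* direct IO is never forced into the background */
    if (!buffered)
        return false;

    /* deep enough queue: dispatch overhead matters less */
    if (in_flight_ios > TOY_DEPTH_THRESHOLD)
        return true;

    /* large IO: parallelizing the memory copy pays off */
    if (io_size >= TOY_SIZE_THRESHOLD)
        return true;

    return false;
}
```

A single 8kB buffered read with an empty queue stays in the foreground in
this model, whereas a combined 4-block read is flagged as async even when it
is the only IO in flight.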
>From 6d46d92d95ed7ef13dbfa05aa652d93f8e0367b3 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Tue, 31 Mar 2026 12:49:28 -0400 Subject: [PATCH v5 2/5] read_stream: Move logic about IO combining & issuing to helpers The long if statements were hard to read and hard to document. Splitting them into inline helpers makes it much easier to explain each part separately. This is done in preparation for making the logic more complicated... Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu --- src/backend/storage/aio/read_stream.c | 97 ++++++++++++++++++++++----- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 31f9e35dee3..3499776d210 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -440,6 +440,78 @@ read_stream_start_pending_read(ReadStream *stream) return true; } +/* + * Should we continue to perform look ahead? The look ahead may allow us to + * make the pending IO larger via IO combining or to issue more read ahead. + */ +static inline bool +read_stream_should_look_ahead(ReadStream *stream) +{ + /* If the callback has signaled end-of-stream, we're done */ + if (stream->distance == 0) + return false; + + /* never start more IOs than our cap */ + if (stream->ios_in_progress >= stream->max_ios) + return false; + + /* + * Don't start more read-ahead if that'd put us over the distance limit + * for doing read-ahead. As stream->distance is capped by + * max_pinned_buffers, this prevents us from looking ahead so far that it + * would put us over the pin limit. 
+ */ + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance) + return false; + + return true; +} + +/* + * We don't start the pending read just because we've hit the distance limit, + * preferring to give it another chance to grow to full io_combine_limit size + * once more buffers have been consumed. But this is not desirable in all + * situations - see below. + */ +static inline bool +read_stream_should_issue_now(ReadStream *stream) +{ + int16 pending_read_nblocks = stream->pending_read_nblocks; + + /* there is no pending IO that could be issued */ + if (pending_read_nblocks == 0) + return false; + + /* never start more IOs than our cap */ + if (stream->ios_in_progress >= stream->max_ios) + return false; + + /* + * If the callback has signaled end-of-stream, start the pending read + * immediately. There is no further potential for IO combining. + */ + if (stream->distance == 0) + return true; + + /* + * If we've already reached io_combine_limit, there's no chance of growing + * the read further. + */ + if (pending_read_nblocks >= stream->io_combine_limit) + return true; + + /* + * If we currently have no reads in flight or prepared, issue the IO once + * we are not looking ahead further. This ensures there's always at least + * one IO prepared. 
+ */ + if (stream->pinned_buffers == 0 && + !read_stream_should_look_ahead(stream)) + return true; + + return false; +} + static void read_stream_look_ahead(ReadStream *stream) { @@ -452,14 +524,13 @@ read_stream_look_ahead(ReadStream *stream) if (stream->batch_mode) pgaio_enter_batchmode(); - while (stream->ios_in_progress < stream->max_ios && - stream->pinned_buffers + stream->pending_read_nblocks < stream->distance) + while (read_stream_should_look_ahead(stream)) { BlockNumber blocknum; int16 buffer_index; void *per_buffer_data; - if (stream->pending_read_nblocks == stream->io_combine_limit) + if (read_stream_should_issue_now(stream)) { read_stream_start_pending_read(stream); continue; @@ -511,21 +582,13 @@ read_stream_look_ahead(ReadStream *stream) } /* - * We don't start the pending read just because we've hit the distance - * limit, preferring to give it another chance to grow to full - * io_combine_limit size once more buffers have been consumed. However, - * if we've already reached io_combine_limit, or we've reached the - * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. Note that the - * pending read can exceed the distance goal, if the latter was reduced - * after hitting the per-backend buffer limit. + * Check if the pending read should be issued now, or if we should give it + * another chance to grow to the full size. + * + * Note that the pending read can exceed the distance goal, if the latter + * was reduced after hitting the per-backend buffer limit. */ - if (stream->pending_read_nblocks > 0 && - (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks >= stream->distance && - stream->pinned_buffers == 0) || - stream->distance == 0) && - stream->ios_in_progress < stream->max_ios) + if (read_stream_should_issue_now(stream)) read_stream_start_pending_read(stream); /* -- 2.53.0.1.gb2826b52eb
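
To see how the two helpers introduced by this patch interact, here is a
minimal stand-alone model of them. It is a sketch under assumptions: ToyStream
is a hypothetical struct holding only the fields the helpers read, plain int
is used instead of int16, and the function names carry a toy_ prefix to make
clear they are not the read_stream.c functions. The conditions themselves
follow the patch as posted (single distance field, before the split into
readahead_distance and combine_distance).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, trimmed-down stand-in for ReadStream (patch 2/5 state). */
typedef struct ToyStream
{
    int distance;               /* 0 means end-of-stream was signaled */
    int ios_in_progress;
    int max_ios;
    int pinned_buffers;
    int pending_read_nblocks;
    int io_combine_limit;
} ToyStream;

/* Should look-ahead continue? */
static bool
toy_should_look_ahead(const ToyStream *s)
{
    if (s->distance == 0)
        return false;           /* end of stream */
    if (s->ios_in_progress >= s->max_ios)
        return false;           /* IO cap reached */
    if (s->pinned_buffers + s->pending_read_nblocks >= s->distance)
        return false;           /* distance limit reached */
    return true;
}

/* Should the pending read be issued now, or be given a chance to grow? */
static bool
toy_should_issue_now(const ToyStream *s)
{
    if (s->pending_read_nblocks == 0)
        return false;           /* nothing to issue */
    if (s->ios_in_progress >= s->max_ios)
        return false;           /* IO cap reached */
    if (s->distance == 0)
        return true;            /* end of stream, cannot grow further */
    if (s->pending_read_nblocks >= s->io_combine_limit)
        return true;            /* already full-sized */
    if (s->pinned_buffers == 0 && !toy_should_look_ahead(s))
        return true;            /* nothing to hand out otherwise */
    return false;
}
```

With distance = 4, nothing pinned and 2 blocks pending, the model keeps
looking ahead and does not issue yet. Once 4 blocks are pending the distance
limit stops look-ahead, and because nothing is pinned the read is issued.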
>From c3858e25c03f8f949cf1564e52236b5647427cbb Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Fri, 3 Apr 2026 13:01:26 -0400 Subject: [PATCH v5 3/5] read stream: Split decision about look ahead for AIO and combining In a subsequent commit the read-ahead distance will only be increased when waiting for IO. Without further work that would cause a regression: As IO combining and read-ahead are currently controlled by the same mechanism, we would end up not allowing IO combining when never needing to wait for IO (as the distance ends up too small to allow for full sized IOs), which can increase CPU overhead. A typical reason to not have to wait for IO completion at a low look-ahead distance is use of io_uring with the to-be-read data in the page cache. But even with worker the IO submission rate may be low enough for the worker to keep up. One might think that we could just always perform IO combining, but doing so at the start of a scan can cause performance regressions: 1) Performing a large IO commonly has a higher latency than smaller IOs. That is not a problem once reading ahead far enough, but at the start of a stream it can lead to longer waits for IO completion. 2) Sometimes read streams will not be read to completion. Immediately starting with full sized IOs leads to more wasted effort. This is not commonly an issue with existing read stream users, but the upcoming use of read streams to fetch table pages as part of an index scan frequently encounters this. One of the comments in read_stream_should_look_ahead() refers to a motivation that only really exists as of the next commit, but without it the code doesn't make sense on its own. 
Reviewed-by: Nazir Bilal Yavuz <[email protected]> Reviewed-by: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com --- src/backend/storage/aio/read_stream.c | 141 +++++++++++++++++++------- 1 file changed, 107 insertions(+), 34 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 3499776d210..a63e66988e1 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -98,10 +98,23 @@ struct ReadStream int16 max_pinned_buffers; int16 forwarded_buffers; int16 pinned_buffers; - int16 distance; + + /* + * Limit of how far, in blocks, to look-ahead for IO combining and for + * read-ahead. + * + * The limits for read-ahead and combining are handled separately to allow + * for IO combining even in cases where the IO subsystem can keep up at a + * low read-ahead distance, as doing larger IOs is more efficient. + * + * Set to 0 when the end of the stream is reached. + */ + int16 combine_distance; + int16 readahead_distance; uint16 distance_decay_holdoff; int16 initialized_buffers; - int16 resume_distance; + int16 resume_readahead_distance; + int16 resume_combine_distance; int read_buffers_flags; bool sync_mode; /* using io_method=sync */ bool batch_mode; /* READ_STREAM_USE_BATCHING */ @@ -332,8 +345,8 @@ read_stream_start_pending_read(ReadStream *stream) /* Shrink distance: no more look-ahead until buffers are released. */ new_distance = stream->pinned_buffers + buffer_limit; - if (stream->distance > new_distance) - stream->distance = new_distance; + if (stream->readahead_distance > new_distance) + stream->readahead_distance = new_distance; /* Unless we have nothing to give the consumer, stop here. 
*/ if (stream->pinned_buffers > 0) @@ -374,12 +387,23 @@ read_stream_start_pending_read(ReadStream *stream) * perform IO asynchronously when starting out with a small look-ahead * distance. */ - if (stream->distance > 1 && stream->ios_in_progress == 0) + if (stream->ios_in_progress == 0) { - if (stream->distance_decay_holdoff == 0) - stream->distance--; - else + if (stream->distance_decay_holdoff > 0) stream->distance_decay_holdoff--; + else + { + if (stream->readahead_distance > 1) + stream->readahead_distance--; + + /* + * XXX: Should we actually reduce this at any time other than + * a reset? For now we have to, as this is also a condition + * for re-enabling fast_path. + */ + if (stream->combine_distance > 1) + stream->combine_distance--; + } } } else @@ -448,20 +472,42 @@ static inline bool read_stream_should_look_ahead(ReadStream *stream) { /* If the callback has signaled end-of-stream, we're done */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return false; /* never start more IOs than our cap */ if (stream->ios_in_progress >= stream->max_ios) return false; + /* + * Allow looking further ahead if we are in the process of building a + * larger IO, the IO is not yet big enough and we don't yet have IO in + * flight. Note that this is allowed even if we have reached the + * read-ahead limit (but not the buffer pin limit, combine_distance is + * capped by it and we are checking for pinned_buffers == 0). + * + * The reason this is restricted to not yet having an IO in flight is that + * once we are actually reading ahead, we will not issue IOs before they + * have reached the full size (or can't be grown further). But we *have* + * to issue an IO once pinned_buffers == 0, otherwise there won't be a + * buffer to return to the caller. + * + * This is important for cases where either effective_io_concurrency is + * low or we never need to wait for IO and thus are not increasing the + * distance. 
Without this we would end up with lots of small IOs. + */ + if (stream->pending_read_nblocks > 0 && + stream->pinned_buffers == 0 && + stream->pending_read_nblocks < stream->combine_distance) + return true; + /* * Don't start more read-ahead if that'd put us over the distance limit * for doing read-ahead. As stream->distance is capped by * max_pinned_buffers, this prevents us from looking ahead so far that it * would put us over the pin limit. */ - if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance) + if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance) return false; return true; @@ -490,14 +536,14 @@ read_stream_should_issue_now(ReadStream *stream) * If the callback has signaled end-of-stream, start the pending read * immediately. There is no further potential for IO combining. */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return true; /* - * If we've already reached io_combine_limit, there's no chance of growing + * If we've already reached combine_distance, there's no chance of growing * the read further. */ - if (pending_read_nblocks >= stream->io_combine_limit) + if (pending_read_nblocks >= stream->combine_distance) return true; /* @@ -550,7 +596,8 @@ read_stream_look_ahead(ReadStream *stream) if (blocknum == InvalidBlockNumber) { /* End of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; break; } @@ -597,7 +644,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->distance == 0); + Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -787,10 +834,17 @@ read_stream_begin_impl(int flags, * doing full io_combine_limit sized reads. 
*/ if (flags & READ_STREAM_FULL) - stream->distance = Min(max_pinned_buffers, stream->io_combine_limit); + { + stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit); + stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit); + } else - stream->distance = 1; - stream->resume_distance = stream->distance; + { + stream->readahead_distance = 1; + stream->combine_distance = 1; + } + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; /* * Since we always access the same relation, we can initialize parts of @@ -889,7 +943,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios_in_progress == 0); Assert(stream->forwarded_buffers == 0); Assert(stream->pinned_buffers == 1); - Assert(stream->distance == 1); + Assert(stream->readahead_distance == 1); + Assert(stream->combine_distance == 1); Assert(stream->pending_read_nblocks == 0); Assert(stream->per_buffer_data_size == 0); Assert(stream->initialized_buffers > stream->oldest_buffer_index); @@ -963,7 +1018,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) else { /* No more blocks, end of stream. */ - stream->distance = 0; + stream->readahead_distance = 0; stream->oldest_buffer_index = stream->next_buffer_index; stream->pinned_buffers = 0; stream->buffers[oldest_buffer_index] = InvalidBuffer; @@ -979,7 +1034,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->oldest_buffer_index == stream->next_buffer_index); /* End of stream reached? */ - if (stream->distance == 0) + if (stream->readahead_distance == 0) return InvalidBuffer; /* @@ -993,7 +1048,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) /* End of stream reached? 
*/ if (stream->pinned_buffers == 0) { - Assert(stream->distance == 0); + Assert(stream->readahead_distance == 0); return InvalidBuffer; } } @@ -1014,7 +1069,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - int32 distance; /* wider temporary value, clamped below */ + + /* wider temporary values, clamped below */ + int32 readahead_distance; + int32 combine_distance; /* Sanity check that we still agree on the buffers. */ Assert(stream->ios[io_index].op.buffers == @@ -1027,10 +1085,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (++stream->oldest_io_index == stream->max_ios) stream->oldest_io_index = 0; - /* Look-ahead distance ramps up rapidly after we do I/O. */ - distance = stream->distance * 2; - distance = Min(distance, stream->max_pinned_buffers); - stream->distance = distance; + /* + * Read-ahead and IO combining distances ramp up rapidly after we do + * I/O. 
+ */ + readahead_distance = stream->readahead_distance * 2; + readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); + stream->readahead_distance = readahead_distance; + + combine_distance = stream->combine_distance * 2; + combine_distance = Min(combine_distance, stream->io_combine_limit); + combine_distance = Min(combine_distance, stream->max_pinned_buffers); + stream->combine_distance = combine_distance; /* * As we needed IO, prevent distance from being reduced within our @@ -1111,7 +1177,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) if (stream->ios_in_progress == 0 && stream->forwarded_buffers == 0 && stream->pinned_buffers == 1 && - stream->distance == 1 && + stream->readahead_distance == 1 && + stream->combine_distance == 1 && stream->pending_read_nblocks == 0 && stream->per_buffer_data_size == 0) { @@ -1157,8 +1224,10 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy) BlockNumber read_stream_pause(ReadStream *stream) { - stream->resume_distance = stream->distance; - stream->distance = 0; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; + stream->readahead_distance = 0; + stream->combine_distance = 0; return InvalidBlockNumber; } @@ -1170,7 +1239,8 @@ read_stream_pause(ReadStream *stream) void read_stream_resume(ReadStream *stream) { - stream->distance = stream->resume_distance; + stream->readahead_distance = stream->resume_readahead_distance; + stream->combine_distance = stream->resume_combine_distance; } /* @@ -1186,7 +1256,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->distance = 0; + stream->readahead_distance = 0; + stream->combine_distance = 0; /* Forget buffered block number and fast path state. 
*/ stream->buffered_blocknum = InvalidBlockNumber; @@ -1218,8 +1289,10 @@ read_stream_reset(ReadStream *stream) Assert(stream->ios_in_progress == 0); /* Start off assuming data is cached. */ - stream->distance = 1; - stream->resume_distance = stream->distance; + stream->readahead_distance = 1; + stream->combine_distance = 1; + stream->resume_readahead_distance = stream->readahead_distance; + stream->resume_combine_distance = stream->combine_distance; stream->distance_decay_holdoff = 0; } -- 2.53.0.1.gb2826b52eb
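
The effect of splitting the single distance into readahead_distance and
combine_distance can be illustrated with a small stand-alone model. This is a
sketch, not the patch itself: ToyStream, toy_ramp_up() and
toy_should_look_ahead() are hypothetical names, plain int replaces int16, and
the ramp-up is applied on every IO as in this commit (the following commit in
the series makes the read-ahead part conditional on having waited).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, trimmed-down stand-in for ReadStream (patch 3/5 state). */
typedef struct ToyStream
{
    int max_pinned_buffers;
    int io_combine_limit;
    int max_ios;
    int ios_in_progress;
    int pinned_buffers;
    int pending_read_nblocks;
    int readahead_distance;     /* 0 means end-of-stream was signaled */
    int combine_distance;
} ToyStream;

static int
toy_min(int a, int b)
{
    return a < b ? a : b;
}

/* Both distances double after an IO, each clamped to its own cap. */
static void
toy_ramp_up(ToyStream *s)
{
    s->readahead_distance = toy_min(s->readahead_distance * 2,
                                    s->max_pinned_buffers);
    s->combine_distance = toy_min(toy_min(s->combine_distance * 2,
                                          s->io_combine_limit),
                                  s->max_pinned_buffers);
}

/* Look-ahead decision including the exception for growing a pending IO. */
static bool
toy_should_look_ahead(const ToyStream *s)
{
    if (s->readahead_distance == 0)
        return false;
    if (s->ios_in_progress >= s->max_ios)
        return false;

    /* keep building the pending IO while nothing is pinned yet */
    if (s->pending_read_nblocks > 0 &&
        s->pinned_buffers == 0 &&
        s->pending_read_nblocks < s->combine_distance)
        return true;

    if (s->pinned_buffers + s->pending_read_nblocks >= s->readahead_distance)
        return false;
    return true;
}
```

In this model a stream with readahead_distance = 1 and combine_distance = 16
keeps looking ahead while it has 4 blocks pending and nothing pinned, which is
what allows full-sized IOs at a low read-ahead distance. As soon as a buffer
is pinned, or the pending read reaches combine_distance, the ordinary
read-ahead limit applies again.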
>From 4e1df86fbc1c4e3014ef96cc6d1992c3f93e64e0 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Tue, 3 Mar 2026 18:00:53 -0500 Subject: [PATCH v5 4/5] read_stream: Only increase read-ahead distance when waiting for IO This avoids increasing the distance to the maximum in cases where the IO subsystem is already keeping up. This turns out to be important for performance for two reasons: - Pinning a lot of buffers is not cheap. If additional pins allow us to avoid IO waits, it's definitely worth it, but if we can already do all the necessary readahead at a distance of 16, reading ahead 512 buffers can increase the CPU overhead substantially. This is particularly noticeable when the to-be-read blocks are already in the kernel page cache. - If the read stream is read to completion, reading in data earlier than needed is of limited consequences, leaving aside the CPU costs mentioned above. But if the read stream will not be fully consumed, e.g. because it is on the inner side of a nested loop join, the additional IO can be a serious performance issue. This is not that commonly a problem for current read stream users, but the upcoming work, to use a read stream to fetch table pages as part of an index scan, frequently encounters this. Note that this commit would have substantial performance downsides without earlier commits: - Commit 6e36930f9aa, which avoids decreasing the readahead distance when there was recent IO, is crucial, as otherwise we very often would end up not reading ahead aggressively enough anymore with this commit, due to increasing the distance less often. - "read stream: Split decision about look ahead for AIO and combining" is important as we would otherwise not perform IO combining when the IO subsystem can keep up. - "aio: io_uring: Trigger async processing for large IOs" is important to continue to benefit from memory copy parallelism when using fewer IOs. 
Reviewed-by: Melanie Plageman <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Tested-by: Tomas Vondra <[email protected]> Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com --- src/backend/storage/aio/read_stream.c | 89 ++++++++++++++++++++------- 1 file changed, 68 insertions(+), 21 deletions(-) diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index a63e66988e1..2adc04601b6 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -18,11 +18,13 @@ * to StartReadBuffers() so that a new one can begin to form. * * The algorithm for controlling the look-ahead distance is based on recent - * cache hit and miss history. When no I/O is necessary, there is no benefit - * in looking ahead more than one block. This is the default initial - * assumption, but when blocks needing I/O are streamed, the distance is - * increased rapidly to try to benefit from I/O combining and concurrency. It - * is reduced gradually when cached blocks are streamed. + * cache / miss history, as well as whether we need to wait for I/O completion + * after a miss. When no I/O is necessary, there is no benefit in looking + * ahead more than one block. This is the default initial assumption. When + * blocks needing I/O are streamed, the combine distance is increased to + * benefit from I/O combining and the read-ahead distance is increased + * whenever we need to wait for IO to try to benefit from increased I/O + * concurrency. Both are reduced gradually when cached blocks are streamed. 
* * The main data structure is a circular queue of buffers of size * max_pinned_buffers plus some extra space for technical reasons, ready to be @@ -1069,16 +1071,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index) { int16 io_index = stream->oldest_io_index; - - /* wider temporary values, clamped below */ - int32 readahead_distance; - int32 combine_distance; + bool needed_wait; /* Sanity check that we still agree on the buffers. */ Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]); - WaitReadBuffers(&stream->ios[io_index].op); + needed_wait = WaitReadBuffers(&stream->ios[io_index].op); Assert(stream->ios_in_progress > 0); stream->ios_in_progress--; @@ -1086,21 +1085,45 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) stream->oldest_io_index = 0; /* - * Read-ahead and IO combining distances ramp up rapidly after we do - * I/O. + * If the IO was executed synchronously, we will never see + * WaitReadBuffers() block. Treat it as if it did block. This is + * particularly crucial when effective_io_concurrency=0 is used, as + * all IO will be synchronous. Without treating synchronous IO as + * having waited, we'd never allow the distance to get large enough to + * allow for IO combining, resulting in bad performance. */ - readahead_distance = stream->readahead_distance * 2; - readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); - stream->readahead_distance = readahead_distance; + if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY) + needed_wait = true; - combine_distance = stream->combine_distance * 2; - combine_distance = Min(combine_distance, stream->io_combine_limit); - combine_distance = Min(combine_distance, stream->max_pinned_buffers); - stream->combine_distance = combine_distance; + /* + * Have the read-ahead distance ramp up rapidly after we needed to + * wait for IO. 
We only increase the read-ahead-distance when we + * needed to wait, to avoid increasing the distance further than + * necessary, as looking ahead too far can be costly, both due to the + * cost of unnecessarily pinning many buffers and due to doing IOs + * that may never be consumed if the stream is ended/reset before + * completion. + * + * If we did not need to wait, the current distance was evidently + * sufficient. + * + * NB: May not increase the distance if we already reached the end of + * the stream, as stream->readahead_distance == 0 is used to keep + * track of having reached the end. + */ + if (stream->readahead_distance > 0 && needed_wait) + { + /* wider temporary value, due to overflow risk */ + int32 readahead_distance; + + readahead_distance = stream->readahead_distance * 2; + readahead_distance = Min(readahead_distance, stream->max_pinned_buffers); + stream->readahead_distance = readahead_distance; + } /* - * As we needed IO, prevent distance from being reduced within our - * maximum look-ahead window. This avoids having distance collapse too + * As we needed IO, prevent distances from being reduced within our + * maximum look-ahead window. This avoids collapsing distances too * quickly in workloads where most of the required blocks are cached, * but where the remaining IOs are a sufficient enough factor to cause * a substantial slowdown if executed synchronously. @@ -1112,6 +1135,30 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) */ stream->distance_decay_holdoff = stream->max_pinned_buffers; + /* + * Whether we needed to wait or not, allow for more IO combining if we + * needed to do IO. The reason to do so independent of needing to wait + * is that when the data is resident in the kernel page cache, IO + * combining reduces the syscall / dispatch overhead, making it + * worthwhile regardless of needing to wait. 
+ * + * It is also important with io_uring as it will never signal the need + * to wait for reads if all the data is in the page cache. There are + * heuristics to deal with that in method_io_uring.c, but they only + * work when the IO gets large enough. + */ + if (stream->combine_distance > 0 && + stream->combine_distance < stream->io_combine_limit) + { + /* wider temporary value, due to overflow risk */ + int32 combine_distance; + + combine_distance = stream->combine_distance * 2; + combine_distance = Min(combine_distance, stream->io_combine_limit); + combine_distance = Min(combine_distance, stream->max_pinned_buffers); + stream->combine_distance = combine_distance; + } + /* * If we've reached the first block of a sequential region we're * issuing advice for, cancel that until the next jump. The kernel -- 2.53.0.1.gb2826b52eb
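
To make the distance handling in the patch above easier to follow outside of read_stream.c, here is a small standalone sketch. It is not the patch's code: the Stream struct and on_io_consumed() are simplified stand-ins invented for illustration, and the distance_decay_holdoff handling is left out. It only shows the two rules, namely that the read-ahead distance doubles only when we had to wait (or the IO was synchronous), while the combine distance doubles after any IO until it hits io_combine_limit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the relevant ReadStream fields. */
typedef struct Stream
{
	int16_t		readahead_distance; /* 0 means end of stream was reached */
	int16_t		combine_distance;
	int16_t		io_combine_limit;
	int16_t		max_pinned_buffers;
} Stream;

static int32_t
min32(int32_t a, int32_t b)
{
	return a < b ? a : b;
}

/*
 * Called after one IO was consumed. needed_wait says whether we had to
 * block for its completion; a synchronously executed IO is treated as if
 * it had blocked, since we would otherwise never see a wait for it.
 */
static void
on_io_consumed(Stream *s, bool needed_wait, bool was_synchronous)
{
	assert(s->io_combine_limit > 0 && s->max_pinned_buffers > 0);

	if (was_synchronous)
		needed_wait = true;

	/* Only look further ahead if the current distance was not sufficient. */
	if (s->readahead_distance > 0 && needed_wait)
	{
		/* wider temporary value, due to overflow risk */
		int32_t		d = (int32_t) s->readahead_distance * 2;

		d = min32(d, s->max_pinned_buffers);
		s->readahead_distance = (int16_t) d;
	}

	/* Larger IOs are worthwhile whether or not we needed to wait. */
	if (s->combine_distance > 0 &&
		s->combine_distance < s->io_combine_limit)
	{
		int32_t		d = (int32_t) s->combine_distance * 2;

		d = min32(d, s->io_combine_limit);
		d = min32(d, s->max_pinned_buffers);
		s->combine_distance = (int16_t) d;
	}
}
```

With io_combine_limit = 16 and max_pinned_buffers = 512, a run of IOs that never need a wait leaves readahead_distance at 1 while combine_distance climbs to 16, which is the page-cache-resident case the commit message is concerned with.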
From 80e6fb3c4e7deab9ad8dab21adb8475aea6f4177 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 3 Apr 2026 13:02:52 -0400
Subject: [PATCH v5 5/5] Allow read_stream_reset() to not wait for IO completion

Not waiting for IO during read_stream_reset() can be important for
performance in cases where read streams are frequently reset before the end
is reached. Current users do not commonly do that, but the upcoming work to
use a read stream to prefetch table blocks as part of index scans can do so
frequently in some query patterns, e.g. if there is an index scan on the
inner side of a nested loop antijoin.

This takes a bit of care to do right. Just introducing support for
abandoning an AIO handle could lead to the IO's completion not being
processed until the backend exits. That's bad because it would lead to
resources held onto for the IO (e.g. buffer pins) not being released and the
handle showing up in pg_aios.

To avoid that, the existing resowner cleanup is changed to wait for the IO's
completion, which guarantees that by the end of the statement the IO has
completed. We might eventually want to relax that for some operations (e.g.,
for background WAL writes or opportunistic prefetching).

Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu --- src/include/storage/aio.h | 2 + src/include/storage/bufmgr.h | 1 + src/backend/storage/aio/aio.c | 99 +++++++++++++++--- src/backend/storage/aio/read_stream.c | 31 ++++-- src/backend/storage/buffer/bufmgr.c | 34 +++++++ src/test/modules/test_aio/t/001_aio.pl | 107 ++++++++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 2 +- src/test/modules/test_aio/test_aio.c | 38 +++++-- 8 files changed, 282 insertions(+), 32 deletions(-) diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h index ec543b78409..ab7fad130bd 100644 --- a/src/include/storage/aio.h +++ b/src/include/storage/aio.h @@ -328,6 +328,8 @@ extern int pgaio_wref_get_id(PgAioWaitRef *iow); extern void pgaio_wref_wait(PgAioWaitRef *iow); extern bool pgaio_wref_check_done(PgAioWaitRef *iow); +extern void pgaio_wref_abandon(PgAioWaitRef *iow); + /* -------------------------------------------------------------------------------- diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index aa61a39d9e6..005d09905e3 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -252,6 +252,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation, int *nblocks, int flags); extern bool WaitReadBuffers(ReadBuffersOperation *operation); +extern void AbandonReadBuffers(ReadBuffersOperation *operation); extern void ReleaseBuffer(Buffer buffer); extern void UnlockReleaseBuffer(Buffer buffer); diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index 8f7e26607b9..11ab9b9acff 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -152,11 +152,15 @@ const IoMethodOps *pgaio_method_ops; * operation succeeded and details about the first failure, if any. The error * can be raised / logged with pgaio_result_report(). 
* - * The lifetime of the memory pointed to be *ret needs to be at least as long - * as the passed in resowner. If the resowner releases resources before the IO - * completes (typically due to an error), the reference to *ret will be - * cleared. In case of resowner cleanup *ret will not be updated with the - * results of the IO operation. + * The lifetime of the memory pointed to by *ret needs to be at least as long + * as the passed in resowner. + * + * If the resowner releases resources before the IO completes (typically due + * to an error), the reference to *ret will be cleared. In case of resowner + * cleanup *ret will not be updated with the results of the IO operation. + * + * If the caller loses interest in the IO before completion + * pgaio_wref_abandon() can be used. */ PgAioHandle * pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret) @@ -278,6 +282,14 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); ioh->resowner = NULL; + /* + * Need to unregister the reporting of the IO's result, the memory it's + * referencing likely has gone away. Do so before potentially waiting + * below, as that could cause the undesired writes. + */ + if (ioh->report_return) + ioh->report_return = NULL; + switch ((PgAioHandleState) ioh->state) { case PGAIO_HS_IDLE: @@ -300,22 +312,32 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) if (!on_error) elog(WARNING, "AIO handle was not submitted"); pgaio_submit_staged(); - break; + + /* now that the IO is submitted, need to wait */ + pg_fallthrough; case PGAIO_HS_SUBMITTED: case PGAIO_HS_COMPLETED_IO: case PGAIO_HS_COMPLETED_SHARED: case PGAIO_HS_COMPLETED_LOCAL: - /* this is expected to happen */ + + /* + * This is expected to happen, e.g. after an error or after + * pgaio_wref_abandon() was called. + * + * For now always wait for the IO's completion during resowner + * cleanup. 
This provides a bound on how long after an error or + * pgaio_wref_abandon() an IO handle will show up as used and how + * long an uncompleted IO can cause resources to be retained. + * + * It is quite possible that we eventually want to support IO + * operations that last longer, e.g. for WAL writes in the + * background. If so we will either need to use a longer lived + * resowner or add a flag controlling when this cleanup happens. + */ + pgaio_io_wait(ioh, ioh->generation); break; } - /* - * Need to unregister the reporting of the IO's result, the memory it's - * referencing likely has gone away. - */ - if (ioh->report_return) - ioh->report_return = NULL; - RESUME_INTERRUPTS(); } @@ -1050,6 +1072,55 @@ pgaio_wref_check_done(PgAioWaitRef *iow) return false; } +/* + * Declare that a wait reference to an IO, started by this backend, is not of + * interest to this backend anymore. Once called, the PgAioReturn *ret passed + * to pgaio_io_acquire[_nb]() will not be updated anymore and thus can be + * freed. + */ +void +pgaio_wref_abandon(PgAioWaitRef *iow) +{ + uint64 ref_generation; + bool am_owner; + PgAioHandle *ioh; + PgAioHandleState state; + + ioh = pgaio_io_from_wref(iow, &ref_generation); + + am_owner = ioh->owner_procno == MyProcNumber; + + /* + * To ensure that the IO won't be recycled while we check (e.g. during the + * emission of a debug message). + */ + HOLD_INTERRUPTS(); + + /* + * It is safe to perform this check before checking if the IO was recycled + * as the owner of an IO cannot change. + */ + if (!am_owner) + elog(ERROR, "only IOs owned by current backend can be abandoned"); + + if (!pgaio_io_was_recycled(ioh, ref_generation, &state)) + { + pgaio_debug_io(DEBUG3, ioh, + "discarding result %p, resowner: %p", + ioh->report_return, ioh->resowner); + + /* + * All we need to do to abandon the IO is to clear its report_return + * field. Without that we could end up writing to freed/reused memory + * when the IO completes. 
+ */ + if (ioh->report_return) + ioh->report_return = NULL; + } + + RESUME_INTERRUPTS(); +} + /* -------------------------------------------------------------------------------- diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 2adc04601b6..5e70a4f0ced 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -109,7 +109,8 @@ struct ReadStream * for IO combining even in cases where the IO subsystem can keep up at a * low read-ahead distance, as doing larger IOs is more efficient. * - * Set to 0 when the end of the stream is reached. + * Set to 0 when the end of the stream is reached and to -1 when the + * stream is reset. */ int16 combine_distance; int16 readahead_distance; @@ -473,8 +474,8 @@ read_stream_start_pending_read(ReadStream *stream) static inline bool read_stream_should_look_ahead(ReadStream *stream) { - /* If the callback has signaled end-of-stream, we're done */ - if (stream->readahead_distance == 0) + /* If we reached end-of-stream or a reset, we're done */ + if (stream->readahead_distance <= 0) return false; /* never start more IOs than our cap */ @@ -538,7 +539,7 @@ read_stream_should_issue_now(ReadStream *stream) * If the callback has signaled end-of-stream, start the pending read * immediately. There is no further potential for IO combining. */ - if (stream->readahead_distance == 0) + if (stream->readahead_distance <= 0) return true; /* @@ -646,7 +647,7 @@ read_stream_look_ahead(ReadStream *stream) * stream. In the worst case we can always make progress one buffer at a * time. */ - Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0); + Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0); if (stream->batch_mode) pgaio_exit_batchmode(); @@ -1035,8 +1036,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) { Assert(stream->oldest_buffer_index == stream->next_buffer_index); - /* End of stream reached? 
*/ - if (stream->readahead_distance == 0) + /* End of stream / reset reached? */ + if (stream->readahead_distance <= 0) return InvalidBuffer; /* @@ -1077,7 +1078,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) Assert(stream->ios[io_index].op.buffers == &stream->buffers[oldest_buffer_index]); - needed_wait = WaitReadBuffers(&stream->ios[io_index].op); + /* + * If the stream has been reset, don't even wait for the IO, just + * abandon it. + */ + if (stream->readahead_distance < 0) + { + AbandonReadBuffers(&stream->ios[io_index].op); + needed_wait = false; + } + else + needed_wait = WaitReadBuffers(&stream->ios[io_index].op); Assert(stream->ios_in_progress > 0); stream->ios_in_progress--; @@ -1303,8 +1314,8 @@ read_stream_reset(ReadStream *stream) Buffer buffer; /* Stop looking ahead. */ - stream->readahead_distance = 0; - stream->combine_distance = 0; + stream->readahead_distance = -1; + stream->combine_distance = -1; /* Forget buffered block number and fast path state. */ stream->buffered_blocknum = InvalidBlockNumber; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 5c64570020d..3167df4b06e 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1604,6 +1604,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, * buffers must remain valid until WaitReadBuffers() is called, and any * forwarded buffers must also be preserved for a continuing call unless * they are explicitly released. + * + * If true was returned, the memory underlying the ReadBuffersOperation needs + * to stay around until either WaitReadBuffers() or AbandonReadBuffers() is + * called (or an error is thrown). */ bool StartReadBuffers(ReadBuffersOperation *operation, @@ -2165,6 +2169,36 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) return true; } +/* + * Declare that this backend is not interested in the operation anymore. 
This + * needs to be called if StartReadBuffers() returned true and the + * ReadBuffersOperation is to be freed without calling WaitReadBuffers() + * (leaving errors aside). + * + * It is the caller's responsibility to release buffer pins (seems simpler + * that way, as that already is required if no IO had been necessary). + */ +void +AbandonReadBuffers(ReadBuffersOperation *operation) +{ + PgAioWaitRef io_wref = operation->io_wref; + + /* see equivalent WaitReadBuffers() check */ + if (!pgaio_wref_valid(&io_wref) && io_method != IOMETHOD_SYNC) + elog(ERROR, "abandoning read operation that didn't read"); + + if (!pgaio_wref_valid(&io_wref)) + return; + + pgaio_wref_clear(&operation->io_wref); + + /* can't abandon foreign IOs (nor do we need to) */ + if (operation->foreign_io) + operation->foreign_io = false; + else + pgaio_wref_abandon(&io_wref); +} + /* * BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared * buffer. If no buffer exists already, selects a replacement victim and diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 63cadd64c15..1fbca2b5955 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -15,6 +15,10 @@ use TestAio; my @methods = TestAio::supported_io_methods(); my %nodes; +# Putting it inline makes perltidy do ugly things +my $count_my_aios_query = + 'SELECT count(*) FROM pg_aios WHERE pid = pg_backend_pid()'; + ### # Create and configure one instance for each io_method @@ -1616,6 +1620,61 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000); qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|, qr/^0\|1\|t\|2\n2\|3\|f\|1$/, qr/^$/); + + + ### + # Test that abandoning IO works and that it does not cause issues. + ### + + # Test that even after abandoning IO we do wait for the IOs at the end + # of the statement. 
+ $psql_a->query_safe(qq|SET io_combine_limit=2|); + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers & abandon it", + qq|SELECT abandoned, count(*) FROM read_buffers('$table', 0, 6, abandon_after => 2) GROUP BY 1 ORDER BY 1|, + qr/^f\|1\nt\|2$/, + qr/^$/); + # Due to the end-of-statement wait there should be no IOs anymore + is($psql_a->query_safe($count_my_aios_query), 0, + "$io_method: $persistency: abandoned IO completed by end of statement" + ); + + # Test that after abandoning IO buffer access still works. + # + # First test that by just issuing another read_buffers() in the same + # statement (so that the abandoned IOs aren't waited-for during + # end-of-statement resowner handling). + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers & abandon it & read again", + qq| + SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2) + UNION ALL + SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4) + |, + qr/^4\n4$/, + qr/^$/); + + # Now test that a plain SELECT needing buffers affected by abandoned + # IO work + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers & abandon it & read again via SELECT", + qq| + SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2) + UNION ALL + SELECT SUM(data) FROM $table WHERE data < 1000 + |, + qr/^4\n499500$/, + qr/^$/); + $psql_a->query_safe(qq|RESET io_combine_limit|); } # The remaining tests don't make sense for temp tables, as they are @@ -1837,6 +1896,54 @@ read_buffers('$table', 0, 4)|, $psql_a->{stdout} = ''; + ### + # Test that abandoning IO actually avoids waiting for IO. + ### + + # Testing not waiting only works if the IO method doesn't execute IO + # synchronously, which is why we fundamentally can't test with + # io_method=sync. 
Furthermore we need to work around io_method=io_uring + # potentially executing the IO synchronously - we can do so by making the + # IOs big enough (c.f. pgaio_uring_should_use_async()). + # + # To make the test reliable we have to abandon all IOs, as waiting for + # some IOs could lead to also consuming the completion of the IO that will + # trigger a wait in the completion. + if ($io_method ne 'sync') + { + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + $psql_a->query_safe(qq|SET io_combine_limit=5|); + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait( + relfilenode=>pg_relation_filenode('$table'), + blockno=>4);/); + ok(1, + "$io_method: $persistency: configure wait in completion of block 4" + ); + + # Need to end the wait in the completion before the statement is over, + # otherwise we'll wait during resowner cleanup at the end of the + # statement. + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers abandoning blocked IO avoids wait", + qq| + SELECT count(*) > 0 FROM read_buffers('$table', 0, 10, abandon_after => 0) WHERE abandoned + UNION ALL + SELECT count(*) > 0 FROM pg_aios WHERE pid = pg_backend_pid() + UNION ALL + SELECT inj_io_completion_continue() IS NOT NULL + |, + qr/^t\nt\nt$/, + qr/^$/); + is($psql_a->query_safe($count_my_aios_query), 0, + "$io_method: $persistency: abandoned IO is completed at end of statement" + ); + + $psql_a->query_safe(qq|RESET io_combine_limit|); + } + $psql_a->quit(); $psql_b->quit(); $psql_c->quit(); diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index 762ac29512f..ff3b200cff3 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -53,7 +53,7 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT 
blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT nblocks int4, OUT buf int4[]) +CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, abandon_after int4 DEFAULT -1, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT abandoned bool, OUT nblocks int4, OUT buf int4[]) RETURNS SETOF record STRICT AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index d7530681192..c9a8b21b972 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -709,11 +709,13 @@ read_buffers(PG_FUNCTION_ARGS) Oid relid = PG_GETARG_OID(0); BlockNumber startblock = PG_GETARG_UINT32(1); int32 nblocks = PG_GETARG_INT32(2); + int32 abandon_after = PG_GETARG_INT32(3); ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; Relation rel; SMgrRelation smgr; int nblocks_done = 0; int nblocks_disp = 0; + int nblocks_wait = 0; int nios = 0; ReadBuffersOperation *operations; Buffer *buffers; @@ -735,6 +737,9 @@ read_buffers(PG_FUNCTION_ARGS) rel = relation_open(relid, AccessShareLock); smgr = RelationGetSmgr(rel); + if (abandon_after < 0) + abandon_after = nblocks; + /* * Do StartReadBuffers() until IO for all the required blocks has been * started (if required). 
@@ -769,9 +774,17 @@ read_buffers(PG_FUNCTION_ARGS) for (int nio = 0; nio < nios; nio++) { ReadBuffersOperation *operation = &operations[nio]; + int nblocks_this_io = nblocks_per_io[nio]; if (io_reqds[nio]) - WaitReadBuffers(operation); + { + if (nblocks_wait < abandon_after) + WaitReadBuffers(operation); + else + AbandonReadBuffers(operation); + } + + nblocks_wait += nblocks_this_io; } /* @@ -781,8 +794,8 @@ read_buffers(PG_FUNCTION_ARGS) { ReadBuffersOperation *operation = &operations[nio]; int nblocks_this_io = nblocks_per_io[nio]; - Datum values[6] = {0}; - bool nulls[6] = {0}; + Datum values[7] = {0}; + bool nulls[7] = {0}; ArrayType *buffers_arr; /* convert buffer array to datum array */ @@ -815,14 +828,18 @@ read_buffers(PG_FUNCTION_ARGS) values[3] = BoolGetDatum(io_reqds[nio] ? operation->foreign_io : false); nulls[3] = false; - /* nblocks */ - values[4] = Int32GetDatum(nblocks_this_io); + /* abandoned */ + values[4] = BoolGetDatum(nblocks_disp >= abandon_after); nulls[4] = false; - /* array of buffers */ - values[5] = PointerGetDatum(buffers_arr); + /* nblocks */ + values[5] = Int32GetDatum(nblocks_this_io); nulls[5] = false; + /* array of buffers */ + values[6] = PointerGetDatum(buffers_arr); + nulls[6] = false; + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); nblocks_disp += nblocks_this_io; @@ -1075,6 +1092,13 @@ inj_io_completion_wait_matches(PgAioHandle *ioh) !(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks))) return false; + ereport(LOG, + errmsg("wait injection point matches for IO %d, inj blockno %d, io blockno %d, io nblocks %d", + pgaio_io_get_id(ioh), + inj_blockno, io_blockno, td->smgr.nblocks + ), + errhidestmt(true), errhidecontext(true)); + return true; } -- 2.53.0.1.gb2826b52eb
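
The core idea behind pgaio_wref_abandon() in the patch above can be shown in isolation. The following is a toy model, not the aio.c implementation: IoHandle, IoWaitRef, io_start(), io_complete() and io_abandon() are hypothetical names, and ownership checks, interrupt holding and resowner cleanup are all omitted. It only demonstrates that abandoning amounts to clearing the result pointer so a later completion cannot write into memory the caller may already have freed, and that a stale reference to a recycled handle must not touch the new user's state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical result slot, playing the role of PgAioReturn. */
typedef struct IoResult
{
	int			status;
} IoResult;

/* Hypothetical handle: generation is bumped each time it is reused. */
typedef struct IoHandle
{
	uint64_t	generation;
	IoResult   *report_return;	/* where completion stores the result */
} IoHandle;

/* Hypothetical wait reference: handle plus the generation it refers to. */
typedef struct IoWaitRef
{
	IoHandle   *handle;
	uint64_t	generation;
} IoWaitRef;

static IoWaitRef
io_start(IoHandle *h, IoResult *ret)
{
	IoWaitRef	ref;

	h->generation++;
	h->report_return = ret;
	ref.handle = h;
	ref.generation = h->generation;
	return ref;
}

/* Completion only reports the result if somebody is still interested. */
static void
io_complete(IoHandle *h, int status)
{
	if (h->report_return)
		h->report_return->status = status;
	h->report_return = NULL;
}

/*
 * Abandon the IO: returns false without doing anything if the handle was
 * already recycled for a newer IO, true if the result slot was detached.
 */
static bool
io_abandon(IoWaitRef *ref)
{
	IoHandle   *h = ref->handle;

	if (h->generation != ref->generation)
		return false;

	h->report_return = NULL;
	return true;
}
```

In this model, after io_abandon() the caller's IoResult is never written, even though the IO itself still completes. That mirrors why the patch can let the stream drop the operation immediately while still waiting for the IO during resowner cleanup.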
