Hi,

On 2026-04-03 15:04:51 -0400, Melanie Plageman wrote:
> On Fri, Apr 3, 2026 at 1:30 PM Andres Freund <[email protected]> wrote:
> >
> > > - why not remove the combine_distance requirement from the fast path
> > > entry criteria (could save resume_combine_distance in the fast path
> > > and restore it after a miss)
> >
> > Because entering the fast path prevents IO combining from happening. So it's
> > absolutely crucial that we do *not* enter it while we would combine.
>
> But if it is a buffer hit, obviously we can't do IO combining anyway,
> or am I misunderstanding the fast path's common case?

It's true that we can't do combining in the fast path, but the problem is that
with eic=0/1 (or a recent history that leaves us with low distances or a low
pin limit), we will not start the next IO until there are no more buffers
pinned.

Imagine that we started one 16 block IO and have a readahead_distance of
1. After consuming 15 buffers, we will have one more buffer pinned, but
read_stream_look_ahead() will not yet start another IO, due to the
readahead_distance condition (or max_pinned_buffers or ...).  Without the
stream->combine_distance == 1 check, the subsequent check in
read_stream_next_buffer() would consider this a valid case for entering the
fast path.
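
To make the scenario concrete, here is a minimal sketch. It is not the actual
read_stream.c code: SketchStream and the sketch_* functions are hypothetical
stand-ins, per_buffer_data_size is left out, and combine_distance == 16 is an
assumption matching the 16 block IO above.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for the relevant ReadStream fields. */
typedef struct SketchStream
{
	int			ios_in_progress;
	int			forwarded_buffers;
	int			pinned_buffers;
	int			readahead_distance;
	int			combine_distance;
	int			pending_read_nblocks;
} SketchStream;

/*
 * Fast-path entry test. check_combine toggles the combine_distance == 1
 * condition, so both variants can be compared.
 */
static bool
sketch_can_enter_fast_path(const SketchStream *s, bool check_combine)
{
	assert(s->pinned_buffers >= 0);

	return s->ios_in_progress == 0 &&
		s->forwarded_buffers == 0 &&
		s->pinned_buffers == 1 &&
		s->readahead_distance == 1 &&
		(!check_combine || s->combine_distance == 1) &&
		s->pending_read_nblocks == 0;
}

/*
 * State after consuming 15 buffers of one completed 16 block IO, with a
 * readahead_distance of 1: one buffer is still pinned and no new IO has
 * been started yet.
 */
static SketchStream
sketch_after_15_of_16(void)
{
	SketchStream s = {0};

	s.pinned_buffers = 1;
	s.readahead_distance = 1;
	s.combine_distance = 16;	/* assumed: ramped up to the IO size */
	return s;
}
```

With that state, sketch_can_enter_fast_path(&s, false) is true, i.e. we would
enter the fast path and stop combining, while sketch_can_enter_fast_path(&s,
true) is false.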


> > > You mentioned that we don't want to read too far ahead (including for
> > > a single combined IO) in part because:
> > >
> > > > The resowner and private refcount mechanisms take more CPU cycles if
> > > > you have more buffers pinned
> > >
> > > But I don't see how either distance is responding to this or
> > > self-calibrating with this in mind
> >
> > Using the minimal required distance to avoid needing to wait for IO
> > completion is responding to that, no?  Without these patches we read ahead
> > as far as possible, even if all the data is in the page cache, which makes
> > this issue way worse (without these patches it's a major source of
> > regressions in the index prefetching patch).
>
> But we aren't using the minimal distance to avoid needing to wait for
> IO completion. We are also using a higher distance to try and get IO
> combining and to allow for async copying into the kernel buffer cache,
> etc, etc.

My testing suggests that doing IO combining for a reasonable io_combine_limit
is pretty much always a win in a steady-state stream (i.e. not a short one
that's not fully consumed), as the gain from avoiding the larger number of
syscalls is sufficiently large.

Once we start doing async copying from the kernel page cache, we will have to
wait for the completion of that async work, which will lead to
readahead_distance being increased if necessary.


> There's a lot of different considerations; it isn't just two opposing
> forces.

It's not, but I think always performing io_combine_limit sized IOs after a
ramp-up and increasing the distance based on needing to wait is a pretty
decent heuristic.
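
As a rough illustration of the ramp-up part, the following sketch doubles the
combine distance with the same caps that patch 3/5 applies
(io_combine_limit and max_pinned_buffers). The function names are
hypothetical, and this leaves out the readahead_distance side entirely.

```c
#include <assert.h>

/* One ramp-up step: double, then clamp to both limits. */
static int
sketch_next_combine_distance(int combine_distance,
							 int io_combine_limit,
							 int max_pinned_buffers)
{
	int			next = combine_distance * 2;

	assert(combine_distance >= 1);

	if (next > io_combine_limit)
		next = io_combine_limit;
	if (next > max_pinned_buffers)
		next = max_pinned_buffers;
	return next;
}

/*
 * Number of ramp-up steps, starting at a distance of 1, until the combine
 * distance reaches its cap.
 */
static int
sketch_rampup_steps(int io_combine_limit, int max_pinned_buffers)
{
	int			cap = io_combine_limit < max_pinned_buffers ?
		io_combine_limit : max_pinned_buffers;
	int			distance = 1;
	int			steps = 0;

	while (distance < cap)
	{
		distance = sketch_next_combine_distance(distance,
												io_combine_limit,
												max_pinned_buffers);
		steps++;
	}
	return steps;
}
```

With an io_combine_limit of 16 and plenty of pins available, the distance goes
1, 2, 4, 8, 16, so full sized IOs are reached after four steps.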

For best results it does require pgaio_uring_should_use_async() to trigger, as
otherwise we do not get the parallelized memory copy. Which means it may
never trigger if we don't occasionally reach the size based condition. Luckily
it does not seem like using async is beneficial for small IOs.
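
For reference, a standalone sketch of the two triggers, mirroring the
conditions in patch 1/5. SKETCH_BLCKSZ (assumed to be the default 8kB block
size) and sketch_should_use_async() are illustrative names, not the real
definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_BLCKSZ 8192		/* assumed default block size */

/*
 * Decide whether a read should be flagged for asynchronous processing:
 * never for direct IO, otherwise once the queue is deep enough or the IO
 * is large enough.
 */
static bool
sketch_should_use_async(bool buffered, int in_flight_ios, size_t io_size)
{
	assert(in_flight_ios >= 0);

	if (!buffered)
		return false;

	/* depth based condition */
	if (in_flight_ios > 4)
		return true;

	/* size based condition */
	if (io_size >= (size_t) SKETCH_BLCKSZ * 4)
		return true;

	return false;
}
```

A single 8kB buffered read with nothing in flight stays synchronous, while a
combined 16 block read is flagged as async even at a queue depth of zero.
That is the case that matters when the read-ahead distance stays low.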


> And, I'd imagine that the relationship between the
> number of buffers pinned and CPU cycles required for resowner/refcount
> isn't perfectly linear.

It's definitely not.


> I'm not saying that we don't do IO combining at high distances, I'm
> more saying that it is confusing that combine_distance controls how
> far we look ahead when readahead_distance is low but when
> readahead_distance is high, it controls when we issue the IO and not
> how far we look ahead. I don't think we should change course now, but
> I wanted to call out that this felt a little uncomfortable to me.

I'm not sure I see an alternative.  I tried to at least improve the comments
around this.



Attached are a revised set of commits. The largest changes are:

- Reordered the series to put
  "read_stream: Only increase read-ahead distance when waiting for IO"
  after
  "read stream: Split decision about look ahead for AIO and combining"

  Previously I thought it'd be too awkward from a comment perspective, but
  there's only one comment where it is a bit odd.

  Think it's much clearer this way.


- Largely rewrote "Hacky implementation of making read_stream_reset()/end()
  not wait for IO". Looks a lot saner now.

  Think this needs a few more tests, in particular for the read stream and
  foreign_io paths. Will do that in the next version.

- Tried to address most of Bilal's and Melanie's feedback

- Removed some redundant checks from read_stream_should_issue_now()

- Lots of comment polishing, including revising the top-level read_stream.c
  comment

Greetings,

Andres Freund
From b9101ab942bb8f954450fc5a2b2b72168c370ce1 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 3 Mar 2026 20:23:55 -0500
Subject: [PATCH v5 1/5] aio: io_uring: Trigger async processing for large IOs

io_method=io_uring has a heuristic to trigger asynchronous processing of IOs
once the IO depth is a bit larger. That heuristic is important when doing
buffered IO from the kernel page cache, to allow parallelizing of the memory
copy, as otherwise io_method=io_uring would be a lot slower than
io_method=worker in that case.

An upcoming commit will make read_stream.c only increase the read-ahead
distance if we needed to wait for IO to complete. If to-be-read data is in the
kernel page cache, io_uring will synchronously execute IO, unless the IO is
flagged as async.  Therefore the aforementioned change in read_stream.c
heuristic would lead to a substantial performance regression with io_uring
when data is in the page cache, as we would never reach a deep enough queue to
actually trigger the existing heuristic.

Parallelizing the copy from the page cache is mainly important when doing a
lot of IO, which commonly is only possible when doing largely sequential IO.

The reason we don't just mark all io_uring IOs as asynchronous is that the
dispatch to a kernel thread has overhead. This overhead is mostly noticeable
with small random IOs with a low queue depth, as in that case the gain from
parallelizing the memory copy is small and the latency cost high.

The facts from the two prior paragraphs show a way out: Use the size of the IO
in addition to the depth of the queue to trigger asynchronous processing.

One might think that just using the IO size might be enough, but
experimentation has shown that not to be the case - with deep look-ahead
distances being able to parallelize the memory copy is important even with
smaller IOs.

Reviewed-by: Melanie Plageman <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com
---
 src/backend/storage/aio/method_io_uring.c | 89 +++++++++++++++++------
 1 file changed, 67 insertions(+), 22 deletions(-)

diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
index 39984df31b4..b42d28647b8 100644
--- a/src/backend/storage/aio/method_io_uring.c
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -409,7 +409,6 @@ static int
 pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 {
 	struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
-	int			in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);
 
 	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
@@ -425,27 +424,6 @@ pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 
 		pgaio_io_prepare_submit(ioh);
 		pgaio_uring_sq_from_io(ioh, sqe);
-
-		/*
-		 * io_uring executes IO in process context if possible. That's
-		 * generally good, as it reduces context switching. When performing a
-		 * lot of buffered IO that means that copying between page cache and
-		 * userspace memory happens in the foreground, as it can't be
-		 * offloaded to DMA hardware as is possible when using direct IO. When
-		 * executing a lot of buffered IO this causes io_uring to be slower
-		 * than worker mode, as worker mode parallelizes the copying. io_uring
-		 * can be told to offload work to worker threads instead.
-		 *
-		 * If an IO is buffered IO and we already have IOs in flight or
-		 * multiple IOs are being submitted, we thus tell io_uring to execute
-		 * the IO in the background. We don't do so for the first few IOs
-		 * being submitted as executing in this process' context has lower
-		 * latency.
-		 */
-		if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
-			io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
-
-		in_flight_before++;
 	}
 
 	while (true)
@@ -701,10 +679,63 @@ pgaio_uring_check_one(PgAioHandle *ioh, uint64 ref_generation)
 	LWLockRelease(&owner_context->completion_lock);
 }
 
+/*
+ * io_uring executes IO in process context if possible. That's generally good,
+ * as it reduces context switching. When performing a lot of buffered IO that
+ * means that copying between page cache and userspace memory happens in the
+ * foreground, as it can't be offloaded to DMA hardware as is possible when
+ * using direct IO. When executing a lot of buffered IO this causes io_uring
+ * to be slower than worker mode, as worker mode parallelizes the
+ * copying. io_uring can be told to offload work to worker threads instead.
+ *
+ * If the IOs are small, we only benefit from forcing things into the
+ * background if there is a lot of IO, as otherwise the overhead from context
+ * switching is higher than the gain.
+ *
+ * If IOs are large, there is benefit from asynchronous processing at lower
+ * queue depths, as IO latency is less of a crucial factor and parallelizing
+ * memory copies is more important.  In addition, it is important to trigger
+ * asynchronous processing even at low queue depth, as with foreground
+ * processing we might never actually reach deep enough IO depths to trigger
+ * asynchronous processing, which in turn would deprive readahead control
+ * logic of information about whether a deeper look-ahead distance would be
+ * advantageous.
+ *
+ * We have done some basic benchmarking to validate the thresholds used, but
+ * it's quite plausible that there are better values.
+ */
+static bool
+pgaio_uring_should_use_async(PgAioHandle *ioh, size_t io_size)
+{
+	/*
+	 * With DIO there's no benefit from forcing asynchronous processing, as
+	 * io_uring will never execute direct IO synchronously.
+	 */
+	if (!(ioh->flags & PGAIO_HF_BUFFERED))
+		return false;
+
+	/*
+	 * Once the IO queue depth is not that shallow anymore, the overhead of
+	 * dispatching to the background is a less significant factor.
+	 */
+	if (dclist_count(&pgaio_my_backend->in_flight_ios) > 4)
+		return true;
+
+	/*
+	 * If the IO is larger, the gains from parallelizing the memory copy are
+	 * larger and typically the impact of the latency is smaller.
+	 */
+	if (io_size >= (BLCKSZ * 4))
+		return true;
+
+	return false;
+}
+
 static void
 pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
 {
 	struct iovec *iov;
+	size_t		io_size = 0;
 
 	switch ((PgAioOp) ioh->op)
 	{
@@ -717,6 +748,8 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
 								   iov->iov_base,
 								   iov->iov_len,
 								   ioh->op_data.read.offset);
+
+				io_size = iov->iov_len;
 			}
 			else
 			{
@@ -726,7 +759,13 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
 									ioh->op_data.read.iov_length,
 									ioh->op_data.read.offset);
 
+				for (int i = 0; i < ioh->op_data.read.iov_length; i++, iov++)
+					io_size += iov->iov_len;
 			}
+
+			if (pgaio_uring_should_use_async(ioh, io_size))
+				io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
+
 			break;
 
 		case PGAIO_OP_WRITEV:
@@ -747,6 +786,12 @@ pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
 									 ioh->op_data.write.iov_length,
 									 ioh->op_data.write.offset);
 			}
+
+			/*
+			 * For now don't trigger use of IOSQE_ASYNC for writes, it's not
+			 * clear there is a performance benefit in doing so.
+			 */
+
 			break;
 
 		case PGAIO_OP_INVALID:
-- 
2.53.0.1.gb2826b52eb

From 6d46d92d95ed7ef13dbfa05aa652d93f8e0367b3 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 31 Mar 2026 12:49:28 -0400
Subject: [PATCH v5 2/5] read_stream: Move logic about IO combining & issuing
 to helpers

The long if statements were hard to read and hard to document. Splitting them
into inline helpers makes it much easier to explain each part separately.

This is done in preparation for making the logic more complicated...

Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
---
 src/backend/storage/aio/read_stream.c | 97 ++++++++++++++++++++++-----
 1 file changed, 80 insertions(+), 17 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 31f9e35dee3..3499776d210 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -440,6 +440,78 @@ read_stream_start_pending_read(ReadStream *stream)
 	return true;
 }
 
+/*
+ * Should we continue to perform look ahead?  The look ahead may allow us to
+ * make the pending IO larger via IO combining or to issue more read ahead.
+ */
+static inline bool
+read_stream_should_look_ahead(ReadStream *stream)
+{
+	/* If the callback has signaled end-of-stream, we're done */
+	if (stream->distance == 0)
+		return false;
+
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/*
+	 * Don't start more read-ahead if that'd put us over the distance limit
+	 * for doing read-ahead. As stream->distance is capped by
+	 * max_pinned_buffers, this prevents us from looking ahead so far that it
+	 * would put us over the pin limit.
+	 */
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+		return false;
+
+	return true;
+}
+
+/*
+ * We don't start the pending read just because we've hit the distance limit,
+ * preferring to give it another chance to grow to full io_combine_limit size
+ * once more buffers have been consumed.  But this is not desirable in all
+ * situations - see below.
+ */
+static inline bool
+read_stream_should_issue_now(ReadStream *stream)
+{
+	int16		pending_read_nblocks = stream->pending_read_nblocks;
+
+	/* there is no pending IO that could be issued */
+	if (pending_read_nblocks == 0)
+		return false;
+
+	/* never start more IOs than our cap */
+	if (stream->ios_in_progress >= stream->max_ios)
+		return false;
+
+	/*
+	 * If the callback has signaled end-of-stream, start the pending read
+	 * immediately. There is no further potential for IO combining.
+	 */
+	if (stream->distance == 0)
+		return true;
+
+	/*
+	 * If we've already reached io_combine_limit, there's no chance of growing
+	 * the read further.
+	 */
+	if (pending_read_nblocks >= stream->io_combine_limit)
+		return true;
+
+	/*
+	 * If we currently have no reads in flight or prepared, issue the IO once
+	 * we are not looking ahead further. This ensures there's always at least
+	 * one IO prepared.
+	 */
+	if (stream->pinned_buffers == 0 &&
+		!read_stream_should_look_ahead(stream))
+		return true;
+
+	return false;
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream)
 {
@@ -452,14 +524,13 @@ read_stream_look_ahead(ReadStream *stream)
 	if (stream->batch_mode)
 		pgaio_enter_batchmode();
 
-	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+	while (read_stream_should_look_ahead(stream))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
 		void	   *per_buffer_data;
 
-		if (stream->pending_read_nblocks == stream->io_combine_limit)
+		if (read_stream_should_issue_now(stream))
 		{
 			read_stream_start_pending_read(stream);
 			continue;
@@ -511,21 +582,13 @@ read_stream_look_ahead(ReadStream *stream)
 	}
 
 	/*
-	 * We don't start the pending read just because we've hit the distance
-	 * limit, preferring to give it another chance to grow to full
-	 * io_combine_limit size once more buffers have been consumed.  However,
-	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.  Note that the
-	 * pending read can exceed the distance goal, if the latter was reduced
-	 * after hitting the per-backend buffer limit.
+	 * Check if the pending read should be issued now, or if we should give it
+	 * another chance to grow to the full size.
+	 *
+	 * Note that the pending read can exceed the distance goal, if the latter
+	 * was reduced after hitting the per-backend buffer limit.
 	 */
-	if (stream->pending_read_nblocks > 0 &&
-		(stream->pending_read_nblocks == stream->io_combine_limit ||
-		 (stream->pending_read_nblocks >= stream->distance &&
-		  stream->pinned_buffers == 0) ||
-		 stream->distance == 0) &&
-		stream->ios_in_progress < stream->max_ios)
+	if (read_stream_should_issue_now(stream))
 		read_stream_start_pending_read(stream);
 
 	/*
-- 
2.53.0.1.gb2826b52eb

From c3858e25c03f8f949cf1564e52236b5647427cbb Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 3 Apr 2026 13:01:26 -0400
Subject: [PATCH v5 3/5] read stream: Split decision about look ahead for AIO
 and combining

In a subsequent commit the read-ahead distance will only be increased when
waiting for IO. Without further work that would cause a regression: As IO
combining and read-ahead are currently controlled by the same mechanism, we
would end up not allowing IO combining when never needing to wait for IO (as
the distance ends up too small to allow for full sized IOs), which can
increase CPU overhead. A typical reason to not have to wait for IO completion
at a low look-ahead distance is use of io_uring with the to-be-read data in
the page cache. But even with worker the IO submission rate may be low enough
for the worker to keep up.

One might think that we could just always perform IO combining, but doing so
at the start of a scan can cause performance regressions:

1) Performing a large IO commonly has a higher latency than smaller IOs. That
   is not a problem once reading ahead far enough, but at the start of a stream
   it can lead to longer waits for IO completion.

2) Sometimes read streams will not be read to completion. Immediately starting
   with full sized IOs leads to more wasted effort. This is not commonly an
   issue with existing read stream users, but the upcoming use of read streams
   to fetch table pages as part of an index scan frequently encounters this.

One of the comments in read_stream_should_look_ahead() refers to a motivation
that only really exists as of the next commit, but without it the code doesn't
make sense on its own.

Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 141 +++++++++++++++++++-------
 1 file changed, 107 insertions(+), 34 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 3499776d210..a63e66988e1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,23 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+
+	/*
+	 * Limit of how far, in blocks, to look-ahead for IO combining and for
+	 * read-ahead.
+	 *
+	 * The limits for read-ahead and combining are handled separately to allow
+	 * for IO combining even in cases where the IO subsystem can keep up at a
+	 * low read-ahead distance, as doing larger IOs is more efficient.
+	 *
+	 * Set to 0 when the end of the stream is reached.
+	 */
+	int16		combine_distance;
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +345,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +387,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -448,20 +472,42 @@ static inline bool
 read_stream_should_look_ahead(ReadStream *stream)
 {
 	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return false;
 
 	/* never start more IOs than our cap */
 	if (stream->ios_in_progress >= stream->max_ios)
 		return false;
 
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we have reached the
+	 * read-ahead limit (but not the buffer pin limit, combine_distance is
+	 * capped by it and we are checking for pinned_buffers == 0).
+	 *
+	 * The reason this is restricted to not yet having an IO in flight is that
+	 * once we are actually reading ahead, we will not issue IOs before they
+	 * have reached the full size (or can't be grown further). But we *have*
+	 * to issue an IO once pinned_buffers == 0, otherwise there won't be a
+	 * buffer to return to the caller.
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
 	/*
 	 * Don't start more read-ahead if that'd put us over the distance limit
 	 * for doing read-ahead. As stream->distance is capped by
 	 * max_pinned_buffers, this prevents us from looking ahead so far that it
 	 * would put us over the pin limit.
 	 */
-	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
 		return false;
 
 	return true;
@@ -490,14 +536,14 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return true;
 
 	/*
-	 * If we've already reached io_combine_limit, there's no chance of growing
+	 * If we've already reached combine_distance, there's no chance of growing
 	 * the read further.
 	 */
-	if (pending_read_nblocks >= stream->io_combine_limit)
+	if (pending_read_nblocks >= stream->combine_distance)
 		return true;
 
 	/*
@@ -550,7 +596,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -597,7 +644,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -787,10 +834,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -889,7 +943,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
+		Assert(stream->combine_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -963,7 +1018,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -979,7 +1034,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance == 0)
+		if (stream->readahead_distance == 0)
 			return InvalidBuffer;
 
 		/*
@@ -993,7 +1048,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance == 0);
+			Assert(stream->readahead_distance == 0);
 			return InvalidBuffer;
 		}
 	}
@@ -1014,7 +1069,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
+
+		/* wider temporary values, clamped below */
+		int32		readahead_distance;
+		int32		combine_distance;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
@@ -1027,10 +1085,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		/* Look-ahead distance ramps up rapidly after we do I/O. */
-		distance = stream->distance * 2;
-		distance = Min(distance, stream->max_pinned_buffers);
-		stream->distance = distance;
+		/*
+		 * Read-ahead and IO combining distances ramp up rapidly after we do
+		 * I/O.
+		 */
+		readahead_distance = stream->readahead_distance * 2;
+		readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+		stream->readahead_distance = readahead_distance;
+
+		combine_distance = stream->combine_distance * 2;
+		combine_distance = Min(combine_distance, stream->io_combine_limit);
+		combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+		stream->combine_distance = combine_distance;
 
 		/*
 		 * As we needed IO, prevent distance from being reduced within our
@@ -1111,7 +1177,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1157,8 +1224,10 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1170,7 +1239,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1186,7 +1256,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = 0;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1218,8 +1289,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb

From 4e1df86fbc1c4e3014ef96cc6d1992c3f93e64e0 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 3 Mar 2026 18:00:53 -0500
Subject: [PATCH v5 4/5] read_stream: Only increase read-ahead distance when
 waiting for IO

This avoids increasing the distance to the maximum in cases where the IO
subsystem is already keeping up. This turns out to be important for
performance for two reasons:

- Pinning a lot of buffers is not cheap. If additional pins allow us to avoid
  IO waits, it's definitely worth it, but if we can already do all the
  necessary readahead at a distance of 16, reading ahead 512 buffers can
  increase the CPU overhead substantially.  This is particularly noticeable
  when the to-be-read blocks are already in the kernel page cache.

- If the read stream is read to completion, reading in data earlier than
  needed is of limited consequence, leaving aside the CPU costs mentioned
  above. But if the read stream will not be fully consumed, e.g. because it is
  on the inner side of a nested loop join, the additional IO can be a serious
  performance issue. This is not that commonly a problem for current read
  stream users, but the upcoming work, to use a read stream to fetch table
  pages as part of an index scan, frequently encounters this.

Note that this commit would have substantial performance downsides without
earlier commits:

- Commit 6e36930f9aa, which avoids decreasing the readahead distance when
  there was recent IO, is crucial, as otherwise we very often would end up not
  reading ahead aggressively enough anymore with this commit, due to
  increasing the distance less often.

- "read stream: Split decision about look ahead for AIO and combining" is
  important as we would otherwise not perform IO combining when the IO
  subsystem can keep up.

- "aio: io_uring: Trigger async processing for large IOs" is important to
  continue to benefit from memory copy parallelism when using fewer IOs.

Reviewed-by: Melanie Plageman <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Tested-by: Tomas Vondra <[email protected]>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
Discussion: https://postgr.es/m/ca+hukgl2phfydoqrhefqasonaxhsg48t1phs3vm8badrzqk...@mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 89 ++++++++++++++++++++-------
 1 file changed, 68 insertions(+), 21 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index a63e66988e1..2adc04601b6 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -18,11 +18,13 @@
  * to StartReadBuffers() so that a new one can begin to form.
  *
  * The algorithm for controlling the look-ahead distance is based on recent
- * cache hit and miss history.  When no I/O is necessary, there is no benefit
- * in looking ahead more than one block.  This is the default initial
- * assumption, but when blocks needing I/O are streamed, the distance is
- * increased rapidly to try to benefit from I/O combining and concurrency.  It
- * is reduced gradually when cached blocks are streamed.
+ * cache hit / miss history, as well as whether we need to wait for I/O
+ * completion after a miss.  When no I/O is necessary, there is no benefit in
+ * looking ahead more than one block.  This is the default initial assumption.
+ * When blocks needing I/O are streamed, the combine distance is increased to
+ * benefit from I/O combining and the read-ahead distance is increased
+ * whenever we need to wait for I/O to try to benefit from increased I/O
+ * concurrency.  Both are reduced gradually when cached blocks are streamed.
  *
  * The main data structure is a circular queue of buffers of size
  * max_pinned_buffers plus some extra space for technical reasons, ready to be
@@ -1069,16 +1071,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-
-		/* wider temporary values, clamped below */
-		int32		readahead_distance;
-		int32		combine_distance;
+		bool		needed_wait;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		WaitReadBuffers(&stream->ios[io_index].op);
+		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -1086,21 +1085,45 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->oldest_io_index = 0;
 
 		/*
-		 * Read-ahead and IO combining distances ramp up rapidly after we do
-		 * I/O.
+		 * If the IO was executed synchronously, we will never see
+		 * WaitReadBuffers() block. Treat it as if it did block. This is
+		 * particularly crucial when effective_io_concurrency=0 is used, as
+		 * all IO will be synchronous.  Without treating synchronous IO as
+		 * having waited, we'd never allow the distance to get large enough to
+		 * allow for IO combining, resulting in bad performance.
 		 */
-		readahead_distance = stream->readahead_distance * 2;
-		readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
-		stream->readahead_distance = readahead_distance;
+		if (stream->ios[io_index].op.flags & READ_BUFFERS_SYNCHRONOUSLY)
+			needed_wait = true;
 
-		combine_distance = stream->combine_distance * 2;
-		combine_distance = Min(combine_distance, stream->io_combine_limit);
-		combine_distance = Min(combine_distance, stream->max_pinned_buffers);
-		stream->combine_distance = combine_distance;
+		/*
+		 * Have the read-ahead distance ramp up rapidly after we needed to
+		 * wait for IO. We only increase the read-ahead distance when we
+		 * needed to wait, to avoid increasing the distance further than
+		 * necessary, as looking ahead too far can be costly, both due to the
+		 * cost of unnecessarily pinning many buffers and due to doing IOs
+		 * that may never be consumed if the stream is ended/reset before
+		 * completion.
+		 *
+		 * If we did not need to wait, the current distance was evidently
+		 * sufficient.
+		 *
+		 * NB: Must not increase the distance if we already reached the end of
+		 * the stream, as stream->readahead_distance == 0 is used to keep
+		 * track of having reached the end.
+		 */
+		if (stream->readahead_distance > 0 && needed_wait)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		readahead_distance;
+
+			readahead_distance = stream->readahead_distance * 2;
+			readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+			stream->readahead_distance = readahead_distance;
+		}
 
 		/*
-		 * As we needed IO, prevent distance from being reduced within our
-		 * maximum look-ahead window. This avoids having distance collapse too
+		 * As we needed IO, prevent distances from being reduced within our
+		 * maximum look-ahead window. This avoids collapsing distances too
 		 * quickly in workloads where most of the required blocks are cached,
 		 * but where the remaining IOs are a sufficient enough factor to cause
 		 * a substantial slowdown if executed synchronously.
@@ -1112,6 +1135,30 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		 */
 		stream->distance_decay_holdoff = stream->max_pinned_buffers;
 
+		/*
+		 * Whether we needed to wait or not, allow for more IO combining if we
+		 * needed to do IO. The reason to do so independent of needing to wait
+		 * is that when the data is resident in the kernel page cache, IO
+		 * combining reduces the syscall / dispatch overhead, making it
+		 * worthwhile regardless of needing to wait.
+		 *
+		 * It is also important with io_uring as it will never signal the need
+		 * to wait for reads if all the data is in the page cache. There are
+		 * heuristics to deal with that in method_io_uring.c, but they only
+		 * work when the IO gets large enough.
+		 */
+		if (stream->combine_distance > 0 &&
+			stream->combine_distance < stream->io_combine_limit)
+		{
+			/* wider temporary value, due to overflow risk */
+			int32		combine_distance;
+
+			combine_distance = stream->combine_distance * 2;
+			combine_distance = Min(combine_distance, stream->io_combine_limit);
+			combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+			stream->combine_distance = combine_distance;
+		}
+
 		/*
 		 * If we've reached the first block of a sequential region we're
 		 * issuing advice for, cancel that until the next jump.  The kernel
-- 
2.53.0.1.gb2826b52eb
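
For readers who want to play with the policy in 4/5 outside the tree, here is a minimal standalone sketch of the two distance updates. It is not part of the patch: ToyStream and toy_after_io() are hypothetical names, and the struct only mirrors the four fields the hunk above touches. The synchronous flag stands in for the READ_BUFFERS_SYNCHRONOUSLY check.

```c
#include <stdbool.h>
#include <stdint.h>

#define Min(a, b) ((a) < (b) ? (a) : (b))

/* Hypothetical, stripped-down stand-in for the relevant ReadStream fields. */
typedef struct ToyStream
{
	int16_t		readahead_distance; /* 0 means end of stream */
	int16_t		combine_distance;
	int16_t		max_pinned_buffers;
	int16_t		io_combine_limit;
} ToyStream;

/* Adjust both distances after consuming a buffer that required IO. */
static void
toy_after_io(ToyStream *stream, bool needed_wait, bool synchronous)
{
	/* Synchronous IO never blocks in the wait, so treat it as a wait. */
	if (synchronous)
		needed_wait = true;

	/* The read-ahead distance only doubles if we actually had to wait. */
	if (stream->readahead_distance > 0 && needed_wait)
	{
		/* wider temporary value, due to overflow risk */
		int32_t		distance = stream->readahead_distance * 2;

		distance = Min(distance, stream->max_pinned_buffers);
		stream->readahead_distance = (int16_t) distance;
	}

	/* The combine distance doubles after any IO, waited for or not. */
	if (stream->combine_distance > 0 &&
		stream->combine_distance < stream->io_combine_limit)
	{
		int32_t		distance = stream->combine_distance * 2;

		distance = Min(distance, stream->io_combine_limit);
		distance = Min(distance, stream->max_pinned_buffers);
		stream->combine_distance = (int16_t) distance;
	}
}
```

Starting from distances of 1 with max_pinned_buffers = 512 and io_combine_limit = 16, a run of IOs that never needed a wait leaves readahead_distance at 1 while combine_distance climbs to 16, which is the page-cache-resident case the commit message is about.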

From 80e6fb3c4e7deab9ad8dab21adb8475aea6f4177 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 3 Apr 2026 13:02:52 -0400
Subject: [PATCH v5 5/5] Allow read_stream_reset() to not wait for IO
 completion

Not waiting for IO during read_stream_reset() can be important for performance
in cases where read streams are frequently reset before the end is
reached. Current users do not commonly do that, but the upcoming work to use a
read stream to prefetch table blocks as part of index scans can do so
frequently in some query patterns, e.g. if there is an index scan on the inner
side of a nested loop antijoin.

This takes a bit of care to do right. Just introducing support for abandoning
an AIO handle could lead to the IO's completion not being processed until the
backend exits. That's bad because it would lead to resources held onto for the
IO (e.g. buffer pins) not being released and the handle showing up in pg_aios.

To avoid that, the existing resowner cleanup is changed to wait for the IO's
completion, which guarantees that by the end of the statement the IO has
completed. We might eventually want to relax that for some operations (e.g.,
for background WAL writes or opportunistic prefetching).

Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
---
 src/include/storage/aio.h                   |   2 +
 src/include/storage/bufmgr.h                |   1 +
 src/backend/storage/aio/aio.c               |  99 +++++++++++++++---
 src/backend/storage/aio/read_stream.c       |  31 ++++--
 src/backend/storage/buffer/bufmgr.c         |  34 +++++++
 src/test/modules/test_aio/t/001_aio.pl      | 107 ++++++++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  38 +++++--
 8 files changed, 282 insertions(+), 32 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index ec543b78409..ab7fad130bd 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -328,6 +328,8 @@ extern int	pgaio_wref_get_id(PgAioWaitRef *iow);
 extern void pgaio_wref_wait(PgAioWaitRef *iow);
 extern bool pgaio_wref_check_done(PgAioWaitRef *iow);
 
+extern void pgaio_wref_abandon(PgAioWaitRef *iow);
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index aa61a39d9e6..005d09905e3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -252,6 +252,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 int *nblocks,
 							 int flags);
 extern bool WaitReadBuffers(ReadBuffersOperation *operation);
+extern void AbandonReadBuffers(ReadBuffersOperation *operation);
 
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 8f7e26607b9..11ab9b9acff 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -152,11 +152,15 @@ const IoMethodOps *pgaio_method_ops;
  * operation succeeded and details about the first failure, if any. The error
  * can be raised / logged with pgaio_result_report().
  *
- * The lifetime of the memory pointed to be *ret needs to be at least as long
- * as the passed in resowner. If the resowner releases resources before the IO
- * completes (typically due to an error), the reference to *ret will be
- * cleared. In case of resowner cleanup *ret will not be updated with the
- * results of the IO operation.
+ * The lifetime of the memory pointed to by *ret needs to be at least as long
+ * as the passed in resowner.
+ *
+ * If the resowner releases resources before the IO completes (typically due
+ * to an error), the reference to *ret will be cleared. In case of resowner
+ * cleanup *ret will not be updated with the results of the IO operation.
+ *
+ * If the caller loses interest in the IO before completion,
+ * pgaio_wref_abandon() can be used.
  */
 PgAioHandle *
 pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
@@ -278,6 +282,14 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
 	ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
 	ioh->resowner = NULL;
 
+	/*
+	 * Need to unregister the reporting of the IO's result, the memory it's
+	 * referencing likely has gone away. Do so before potentially waiting
+	 * below, as that could cause the undesired writes.
+	 */
+	if (ioh->report_return)
+		ioh->report_return = NULL;
+
 	switch ((PgAioHandleState) ioh->state)
 	{
 		case PGAIO_HS_IDLE:
@@ -300,22 +312,32 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
 			if (!on_error)
 				elog(WARNING, "AIO handle was not submitted");
 			pgaio_submit_staged();
-			break;
+
+			/* now that the IO is submitted, need to wait */
+			pg_fallthrough;
 		case PGAIO_HS_SUBMITTED:
 		case PGAIO_HS_COMPLETED_IO:
 		case PGAIO_HS_COMPLETED_SHARED:
 		case PGAIO_HS_COMPLETED_LOCAL:
-			/* this is expected to happen */
+
+			/*
+			 * This is expected to happen, e.g. after an error or after
+			 * pgaio_wref_abandon() was called.
+			 *
+			 * For now always wait for the IO's completion during resowner
+			 * cleanup. This provides a bound on how long after an error or
+			 * pgaio_wref_abandon() an IO handle will show up as used and how
+			 * long an uncompleted IO can cause resources to be retained.
+			 *
+			 * It is quite possible that we eventually want to support IO
+			 * operations that last longer, e.g. for WAL writes in the
+			 * background. If so we will either need to use a longer lived
+			 * resowner or add a flag controlling when this cleanup happens.
+			 */
+			pgaio_io_wait(ioh, ioh->generation);
 			break;
 	}
 
-	/*
-	 * Need to unregister the reporting of the IO's result, the memory it's
-	 * referencing likely has gone away.
-	 */
-	if (ioh->report_return)
-		ioh->report_return = NULL;
-
 	RESUME_INTERRUPTS();
 }
 
@@ -1050,6 +1072,55 @@ pgaio_wref_check_done(PgAioWaitRef *iow)
 	return false;
 }
 
+/*
+ * Declare that a wait reference to an IO, started by this backend, is not of
+ * interest to this backend anymore. Once called, the PgAioReturn *ret passed
+ * to pgaio_io_acquire[_nb]() will not be updated anymore and thus can be
+ * freed.
+ */
+void
+pgaio_wref_abandon(PgAioWaitRef *iow)
+{
+	uint64		ref_generation;
+	bool		am_owner;
+	PgAioHandle *ioh;
+	PgAioHandleState state;
+
+	ioh = pgaio_io_from_wref(iow, &ref_generation);
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+	/*
+	 * To ensure that the IO won't be recycled while we check (e.g. during the
+	 * emission of a debug message).
+	 */
+	HOLD_INTERRUPTS();
+
+	/*
+	 * It is safe to perform this check before checking if the IO was recycled
+	 * as the owner of an IO cannot change.
+	 */
+	if (!am_owner)
+		elog(ERROR, "only IOs owned by current backend can be abandoned");
+
+	if (!pgaio_io_was_recycled(ioh, ref_generation, &state))
+	{
+		pgaio_debug_io(DEBUG3, ioh,
+					   "discarding result %p, resowner: %p",
+					   ioh->report_return, ioh->resowner);
+
+		/*
+		 * All we need to do to abandon the IO is to clear its report_return
+		 * field. Without that we could end up writing to freed/reused memory
+		 * when the IO completes.
+		 */
+		if (ioh->report_return)
+			ioh->report_return = NULL;
+	}
+
+	RESUME_INTERRUPTS();
+}
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 2adc04601b6..5e70a4f0ced 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -109,7 +109,8 @@ struct ReadStream
 	 * for IO combining even in cases where the IO subsystem can keep up at a
 	 * low read-ahead distance, as doing larger IOs is more efficient.
 	 *
-	 * Set to 0 when the end of the stream is reached.
+	 * Set to 0 when the end of the stream is reached and to -1 when the
+	 * stream is reset.
 	 */
 	int16		combine_distance;
 	int16		readahead_distance;
@@ -473,8 +474,8 @@ read_stream_start_pending_read(ReadStream *stream)
 static inline bool
 read_stream_should_look_ahead(ReadStream *stream)
 {
-	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->readahead_distance == 0)
+	/* If we reached end-of-stream or a reset, we're done */
+	if (stream->readahead_distance <= 0)
 		return false;
 
 	/* never start more IOs than our cap */
@@ -538,7 +539,7 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->readahead_distance == 0)
+	if (stream->readahead_distance <= 0)
 		return true;
 
 	/*
@@ -646,7 +647,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -1035,8 +1036,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	{
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
-		/* End of stream reached?  */
-		if (stream->readahead_distance == 0)
+		/* End of stream / reset reached?  */
+		if (stream->readahead_distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -1077,7 +1078,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		/*
+		 * If the stream has been reset, don't even wait for the IO, just
+		 * abandon it.
+		 */
+		if (stream->readahead_distance < 0)
+		{
+			AbandonReadBuffers(&stream->ios[io_index].op);
+			needed_wait = false;
+		}
+		else
+			needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -1303,8 +1314,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->readahead_distance = 0;
-	stream->combine_distance = 0;
+	stream->readahead_distance = -1;
+	stream->combine_distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5c64570020d..3167df4b06e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1604,6 +1604,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * buffers must remain valid until WaitReadBuffers() is called, and any
  * forwarded buffers must also be preserved for a continuing call unless
  * they are explicitly released.
+ *
+ * If true was returned, the memory underlying the ReadBuffersOperation needs
+ * to stay around until either WaitReadBuffers() or AbandonReadBuffers() is
+ * called (or an error is thrown).
  */
 bool
 StartReadBuffers(ReadBuffersOperation *operation,
@@ -2165,6 +2169,36 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	return true;
 }
 
+/*
+ * Declare that this backend is not interested in the operation anymore. This
+ * needs to be called if StartReadBuffers() returned true and the
+ * ReadBuffersOperation is to be freed without calling WaitReadBuffers()
+ * (leaving errors aside).
+ *
+ * It is the caller's responsibility to release buffer pins (seems simpler
+ * that way, as that already is required if no IO had been necessary).
+ */
+void
+AbandonReadBuffers(ReadBuffersOperation *operation)
+{
+	PgAioWaitRef io_wref = operation->io_wref;
+
+	/* see equivalent WaitReadBuffers() check */
+	if (!pgaio_wref_valid(&io_wref) && io_method != IOMETHOD_SYNC)
+		elog(ERROR, "abandoning read operation that didn't read");
+
+	if (!pgaio_wref_valid(&io_wref))
+		return;
+
+	pgaio_wref_clear(&operation->io_wref);
+
+	/* can't abandon foreign IOs (nor do we need to) */
+	if (operation->foreign_io)
+		operation->foreign_io = false;
+	else
+		pgaio_wref_abandon(&io_wref);
+}
+
 /*
  * BufferAlloc -- subroutine for PinBufferForBlock.  Handles lookup of a shared
  *		buffer.  If no buffer exists already, selects a replacement victim and
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 63cadd64c15..1fbca2b5955 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -15,6 +15,10 @@ use TestAio;
 my @methods = TestAio::supported_io_methods();
 my %nodes;
 
+# Putting it inline makes perltidy do ugly things
+my $count_my_aios_query =
+  'SELECT count(*) FROM pg_aios WHERE pid = pg_backend_pid()';
+
 
 ###
 # Create and configure one instance for each io_method
@@ -1616,6 +1620,61 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 			qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|,
 			qr/^0\|1\|t\|2\n2\|3\|f\|1$/,
 			qr/^$/);
+
+
+		###
+		# Test that abandoning IO works and that it does not cause issues.
+		###
+
+		# Test that even after abandoning IO we do wait for the IOs at the end
+		# of the statement.
+		$psql_a->query_safe(qq|SET io_combine_limit=2|);
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it",
+			qq|SELECT abandoned, count(*) FROM read_buffers('$table', 0, 6, abandon_after => 2) GROUP BY 1 ORDER BY 1|,
+			qr/^f\|1\nt\|2$/,
+			qr/^$/);
+		# Due to the end-of-statement wait there should be no IOs anymore
+		is($psql_a->query_safe($count_my_aios_query), 0,
+			"$io_method: $persistency: abandoned IO completed by end of statement"
+		);
+
+		# Test that after abandoning IO buffer access still works.
+		#
+		# First test that by just issuing another read_buffers() in the same
+		# statement (so that the abandoned IOs aren't waited-for during
+		# end-of-statement resowner handling).
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it & read again",
+			qq|
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2)
+			   UNION ALL
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4)
+			   |,
+			qr/^4\n4$/,
+			qr/^$/);
+
+		# Now test that a plain SELECT needing buffers affected by abandoned
+		# IO works
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it & read again via SELECT",
+			qq|
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2)
+			   UNION ALL
+			   SELECT SUM(data) FROM $table WHERE data < 1000
+			   |,
+			qr/^4\n499500$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
 	}
 
 	# The remaining tests don't make sense for temp tables, as they are
@@ -1837,6 +1896,54 @@ read_buffers('$table', 0, 4)|,
 	$psql_a->{stdout} = '';
 
 
+	###
+	# Test that abandoning IO actually avoids waiting for IO.
+	###
+
+	# Testing not waiting only works if the IO method doesn't execute IO
+	# synchronously, which is why we fundamentally can't test with
+	# io_method=sync. Furthermore we need to work around io_method=io_uring
+	# potentially executing the IO synchronously - we can do so by making the
+	# IOs big enough (cf. pgaio_uring_should_use_async()).
+	#
+	# To make the test reliable we have to abandon all IOs, as waiting for
+	# some IOs could lead to also consuming the completion of the IO that will
+	# trigger a wait in the completion.
+	if ($io_method ne 'sync')
+	{
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(qq|SET io_combine_limit=5|);
+		$psql_b->query_safe(
+			qq/SELECT inj_io_completion_wait(
+		   relfilenode=>pg_relation_filenode('$table'),
+		   blockno=>4);/);
+		ok(1,
+			"$io_method: $persistency: configure wait in completion of block 4"
+		);
+
+		# Need to end the wait in the completion before the statement is over,
+		# otherwise we'll wait during resowner cleanup at the end of the
+		# statement.
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers abandoning blocked IO avoids wait",
+			qq|
+			   SELECT count(*) > 0 FROM read_buffers('$table', 0, 10, abandon_after => 0) WHERE abandoned
+			   UNION ALL
+			   SELECT count(*) > 0 FROM pg_aios WHERE pid = pg_backend_pid()
+			   UNION ALL
+			   SELECT inj_io_completion_continue() IS NOT NULL
+			|,
+			qr/^t\nt\nt$/,
+			qr/^$/);
+		is($psql_a->query_safe($count_my_aios_query), 0,
+			"$io_method: $persistency: abandoned IO is completed at end of statement"
+		);
+
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
+	}
+
 	$psql_a->quit();
 	$psql_b->quit();
 	$psql_c->quit();
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 762ac29512f..ff3b200cff3 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -53,7 +53,7 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT nblocks int4, OUT buf int4[])
+CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, abandon_after int4 DEFAULT -1, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT abandoned bool, OUT nblocks int4, OUT buf int4[])
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index d7530681192..c9a8b21b972 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -709,11 +709,13 @@ read_buffers(PG_FUNCTION_ARGS)
 	Oid			relid = PG_GETARG_OID(0);
 	BlockNumber startblock = PG_GETARG_UINT32(1);
 	int32		nblocks = PG_GETARG_INT32(2);
+	int32		abandon_after = PG_GETARG_INT32(3);
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	Relation	rel;
 	SMgrRelation smgr;
 	int			nblocks_done = 0;
 	int			nblocks_disp = 0;
+	int			nblocks_wait = 0;
 	int			nios = 0;
 	ReadBuffersOperation *operations;
 	Buffer	   *buffers;
@@ -735,6 +737,9 @@ read_buffers(PG_FUNCTION_ARGS)
 	rel = relation_open(relid, AccessShareLock);
 	smgr = RelationGetSmgr(rel);
 
+	if (abandon_after < 0)
+		abandon_after = nblocks;
+
 	/*
 	 * Do StartReadBuffers() until IO for all the required blocks has been
 	 * started (if required).
@@ -769,9 +774,17 @@ read_buffers(PG_FUNCTION_ARGS)
 	for (int nio = 0; nio < nios; nio++)
 	{
 		ReadBuffersOperation *operation = &operations[nio];
+		int			nblocks_this_io = nblocks_per_io[nio];
 
 		if (io_reqds[nio])
-			WaitReadBuffers(operation);
+		{
+			if (nblocks_wait < abandon_after)
+				WaitReadBuffers(operation);
+			else
+				AbandonReadBuffers(operation);
+		}
+
+		nblocks_wait += nblocks_this_io;
 	}
 
 	/*
@@ -781,8 +794,8 @@ read_buffers(PG_FUNCTION_ARGS)
 	{
 		ReadBuffersOperation *operation = &operations[nio];
 		int			nblocks_this_io = nblocks_per_io[nio];
-		Datum		values[6] = {0};
-		bool		nulls[6] = {0};
+		Datum		values[7] = {0};
+		bool		nulls[7] = {0};
 		ArrayType  *buffers_arr;
 
 		/* convert buffer array to datum array */
@@ -815,14 +828,18 @@ read_buffers(PG_FUNCTION_ARGS)
 		values[3] = BoolGetDatum(io_reqds[nio] ? operation->foreign_io : false);
 		nulls[3] = false;
 
-		/* nblocks */
-		values[4] = Int32GetDatum(nblocks_this_io);
+		/* abandoned */
+		values[4] = BoolGetDatum(nblocks_disp >= abandon_after);
 		nulls[4] = false;
 
-		/* array of buffers */
-		values[5] = PointerGetDatum(buffers_arr);
+		/* nblocks */
+		values[5] = Int32GetDatum(nblocks_this_io);
 		nulls[5] = false;
 
+		/* array of buffers */
+		values[6] = PointerGetDatum(buffers_arr);
+		nulls[6] = false;
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 
 		nblocks_disp += nblocks_this_io;
@@ -1075,6 +1092,13 @@ inj_io_completion_wait_matches(PgAioHandle *ioh)
 		!(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
 		return false;
 
+	ereport(LOG,
+			errmsg("wait injection point matches for IO %d, inj blockno %d, io blockno %d, io nblocks %d",
+				   pgaio_io_get_id(ioh),
+				   inj_blockno, io_blockno, td->smgr.nblocks
+				   ),
+			errhidestmt(true), errhidecontext(true));
+
 	return true;
 }
 
-- 
2.53.0.1.gb2826b52eb
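
To illustrate the core idea of 5/5 in isolation, here is a rough standalone model. It assumes that abandoning only means "stop reporting the result into caller-owned memory", as the comment in pgaio_wref_abandon() describes. ToyReturn, ToyHandle and the toy_* functions are hypothetical and deliberately ignore generations, resowners and interrupts.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for PgAioReturn: where the IO result is reported. */
typedef struct ToyReturn
{
	int			result;
} ToyReturn;

/* Hypothetical stand-in for PgAioHandle, reduced to what matters here. */
typedef struct ToyHandle
{
	ToyReturn  *report_return;	/* caller-owned memory, or NULL */
	bool		completed;
} ToyHandle;

/* Lose interest in the IO: stop reporting into caller-owned memory. */
static void
toy_abandon(ToyHandle *ioh)
{
	ioh->report_return = NULL;
}

/* Completion still runs for an abandoned IO, it just reports nowhere. */
static void
toy_complete(ToyHandle *ioh, int result)
{
	ioh->completed = true;
	if (ioh->report_return != NULL)
		ioh->report_return->result = result;
}

/*
 * Mirrors the sign convention the patch puts on readahead_distance:
 * > 0 while streaming, 0 at end of stream, < 0 after a reset.  Only a
 * reset abandons in-flight IO instead of waiting for it.
 */
static bool
toy_should_abandon(int readahead_distance)
{
	return readahead_distance < 0;
}
```

The point of the model is that once toy_abandon() has been called, the caller may free its ToyReturn: a later toy_complete() still marks the handle completed but no longer writes through the stale pointer.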
