Hi,

I think I forgot to update the thread in my last message to note that I
had committed some of the preliminary changes.


On 2026-03-26 20:12:30 -0400, Andres Freund wrote:
> One test used did_io=(t|f). That was actually only needed once "aio: Don't
> wait for already in-progress IO" is in, as we might join the foreign IO. I
> chose to hide that by making that part of the query "did_io and not
> foreign_io", so we would detect if we were to falsely start IO ourselves.

I ended up not liking did_io, as that seems misleading when we just needed to
wait for a foreign IO.  I instead named it io_reqd.


> Still need to extend the test as part of the "don't wait" commit, to actually
> ensure that we reach the path for joining foreign IO.

That's done now.  I've added verification that we don't wrongly recognize
in-progress IOs without a wref as a foreign IO, and an injection-point-based
test that verifies that we do see the foreign IO.

I've also done a bunch of cleanup in the commits. A few typos in commit
messages and the actual code changes and a few larger changes in the test code
& infrastructure. Mostly as part of allowing the aforementioned testing
(read_buffers() now only waits at the end, to make some of the tests
possible), but also just making the modified code a bit cleaner.

Greetings,

Andres Freund
>From f082db540be3217600e717d0430ae7d527623d8e Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 25 Mar 2026 14:42:39 -0400
Subject: [PATCH v9 1/4] test_aio: Add basic tests for StartReadBuffers()

Upcoming commits will change StartReadBuffers() and its building blocks,
making it worthwhile to directly test StartReadBuffers().

Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/t/001_aio.pl      | 243 ++++++++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql |   7 +
 src/test/modules/test_aio/test_aio.c        | 227 +++++++++++++++++-
 3 files changed, 468 insertions(+), 9 deletions(-)

diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index e18b2a2b8ae..0317c991f9f 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1420,6 +1420,248 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
 
+# Tests for StartReadBuffers()
+sub test_read_buffers
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+	my $table;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql_a->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_ok(data int not null);
+INSERT INTO tmp_ok SELECT generate_series(1, 5000);
+));
+
+	foreach my $persistency (qw(normal temporary))
+	{
+		$table = $persistency eq 'normal' ? 'tbl_ok' : 'tmp_ok';
+
+		# check that consecutive misses are combined into one read
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, combine, block 0-1",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|t\|2$/,
+			qr/^$/);
+
+		# but if we do it again, i.e. it's in the buffer pool, there will be
+		# two operations
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, doesn't combine hits, block 0-1",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+			qr/^$/);
+
+		# Check that a larger read interrupted by a hit works
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, prep, block 3",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 3, 1)|,
+			qr/^0\|3\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, interrupted by hit on 3, block 2-5",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 2, 4)|,
+			qr/^0\|2\|t\|1\n1\|3\|f\|1\n2\|4\|t\|2$/,
+			qr/^$/);
+
+
+		# Verify that a read with an initial buffer hit works
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss, block 0",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 1)|,
+			qr/^0\|0\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 0",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 1)|,
+			qr/^0\|0\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss, block 1",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 1)|,
+			qr/^0\|1\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 1",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 1)|,
+			qr/^0\|1\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 0-1",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit 0-1, miss 2",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 3)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1\n2\|2\|t\|1$/,
+			qr/^$/);
+
+		# Verify that a read with an initial miss and trailing buffer hit(s) works
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 0)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss 0, hit 1-2",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 3)|,
+			qr/^0\|0\|t\|1\n1\|1\|f\|1\n2\|2\|f\|1$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|);
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|);
+		$psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss 1-2, hit 3-4",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 4)|,
+			qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/,
+			qr/^$/);
+
+		# Verify that we aren't doing reads larger than
+		# io_combine_limit. That's just enforced in read_buffers() function,
+		# but kinda still worth testing.
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(qq|SET io_combine_limit=3|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, io_combine_limit has effect",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5)|,
+			qr/^0\|1\|t\|3\n3\|4\|t\|2$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
+
+
+		# Test encountering buffer IO we started in the first block of the
+		# range.
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(
+			qq|SELECT read_rel_block_ll('$table', 1, wait_complete=>false)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, in-progress 1, read 1-3",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+			qr/^0\|1\|f\|1\n1\|2\|t\|2$/,
+			qr/^$/);
+
+		# Test in-progress IO in the middle block of the range
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(
+			qq|SELECT read_rel_block_ll('$table', 2, wait_complete=>false)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, in-progress 2, read 1-3",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+			qr/^0\|1\|t\|1\n1\|2\|f\|1\n2\|3\|t\|1$/,
+			qr/^$/);
+
+		# Test in-progress IO on the last block of the range
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(
+			qq|SELECT read_rel_block_ll('$table', 3, wait_complete=>false)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, in-progress 3, read 1-3",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM
+read_buffers('$table', 1, 3)|,
+			qr/^0\|1\|t\|2\n2\|3\|f\|1$/,
+			qr/^$/);
+	}
+
+	# The remaining tests don't make sense for temp tables, as they are
+	# concerned with multiple sessions interacting with each other.
+	$table = 'tbl_ok';
+	my $persistency = 'normal';
+
+	# Test start buffer IO will split IO if there's IO in progress. We can't
+	# observe this with sync, as that does not start the IO operation in
+	# StartReadBuffers().
+	if ($io_method ne 'sync')
+	{
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+
+		my $buf_id =
+		  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+		);
+
+		query_wait_block(
+			$io_method,
+			$node,
+			$psql_a,
+			"$persistency: read buffers blocks waiting for concurrent IO",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo");
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>false, io_error=>false, release_aio=>false)|
+		);
+		pump_until(
+			$psql_a->{run}, $psql_a->{timeout},
+			\$psql_a->{stdout}, qr/0\|1\|t\|2\n2\|3\|t\|3/);
+		ok(1,
+			"$io_method: $persistency: IO was split due to concurrent failed IO"
+		);
+
+		# Same as before, except the concurrent IO succeeds this time
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$buf_id =
+		  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+		);
+
+		query_wait_block(
+			$io_method,
+			$node,
+			$psql_a,
+			"$persistency: read buffers blocks waiting for concurrent IO",
+			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo");
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>true, io_error=>false, release_aio=>false)|
+		);
+		pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+			qr/0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|t\|2/);
+		ok(1,
+			"$io_method: $persistency: IO was split due to concurrent successful IO"
+		);
+	}
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
 # Run all tests that for the specified node / io_method
 sub test_io_method
 {
@@ -1455,6 +1697,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_buffers($io_method, $node);
 
 	# generic injection tests
   SKIP:
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..86beb563b6a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -49,6 +53,9 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT nblocks int4, OUT buf int4[])
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
 
 
 /*
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..3e486a5806e 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -19,7 +19,9 @@
 #include "postgres.h"
 
 #include "access/relation.h"
+#include "catalog/pg_type.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
@@ -27,9 +29,11 @@
 #include "storage/checksum.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/tuplestore.h"
 
 
 PG_MODULE_MAGIC;
@@ -458,18 +462,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
-PG_FUNCTION_INFO_V1(invalidate_rel_block);
-Datum
-invalidate_rel_block(PG_FUNCTION_ARGS)
+/* helper for invalidate_rel_block() and evict_rel() */
+static void
+invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno)
 {
-	Oid			relid = PG_GETARG_OID(0);
-	BlockNumber blkno = PG_GETARG_UINT32(1);
-	Relation	rel;
 	PrefetchBufferResult pr;
 	Buffer		buf;
 
-	rel = relation_open(relid, AccessExclusiveLock);
-
 	/*
 	 * This is a gross hack, but there's no other API exposed that allows to
 	 * get a buffer ID without actually reading the block in.
@@ -480,7 +479,7 @@ invalidate_rel_block(PG_FUNCTION_ARGS)
 	if (BufferIsValid(buf))
 	{
 		/* if the buffer contents aren't valid, this'll return false */
-		if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
+		if (ReadRecentBuffer(rel->rd_locator, forknum, blkno, buf))
 		{
 			BufferDesc *buf_hdr = BufferIsLocal(buf) ?
 				GetLocalBufferDescriptor(-buf - 1)
@@ -506,11 +505,74 @@ invalidate_rel_block(PG_FUNCTION_ARGS)
 		}
 	}
 
+}
+
+PG_FUNCTION_INFO_V1(invalidate_rel_block);
+Datum
+invalidate_rel_block(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	invalidate_one_block(rel, MAIN_FORKNUM, blkno);
+
 	relation_close(rel, AccessExclusiveLock);
 
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	/*
+	 * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp
+	 * tables we have to do it the expensive way and evict every possible
+	 * buffer.
+	 */
+	if (RelationUsesLocalBuffers(rel))
+	{
+		SMgrRelation smgr = RelationGetSmgr(rel);
+
+		for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++)
+		{
+			BlockNumber nblocks;
+
+			if (!smgrexists(smgr, forknum))
+				continue;
+
+			nblocks = smgrnblocks(smgr, forknum);
+
+			for (int blkno = 0; blkno < nblocks; blkno++)
+			{
+				invalidate_one_block(rel, forknum, blkno);
+			}
+		}
+	}
+	else
+	{
+		int32		buffers_evicted,
+					buffers_flushed,
+					buffers_skipped;
+
+		EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+								&buffers_skipped);
+	}
+
+	relation_close(rel, AccessExclusiveLock);
+
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(buffer_create_toy);
 Datum
 buffer_create_toy(PG_FUNCTION_ARGS)
@@ -610,6 +672,153 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(read_buffers);
+/*
+ * Infrastructure to test StartReadBuffers()
+ */
+Datum
+read_buffers(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber startblock = PG_GETARG_UINT32(1);
+	int32		nblocks = PG_GETARG_INT32(2);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	SMgrRelation smgr;
+	int			nblocks_done = 0;
+	int			nblocks_disp = 0;
+	int			nios = 0;
+	ReadBuffersOperation *operations;
+	Buffer	   *buffers;
+	Datum	   *buffers_datum;
+	bool	   *io_reqds;
+
+	Assert(nblocks > 0);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/* at worst each block gets its own IO */
+	operations = palloc0(sizeof(ReadBuffersOperation) * nblocks);
+	buffers = palloc0(sizeof(Buffer) * nblocks);
+	buffers_datum = palloc0(sizeof(Datum) * nblocks);
+	io_reqds = palloc0(sizeof(bool) * nblocks);
+
+	rel = relation_open(relid, AccessShareLock);
+	smgr = RelationGetSmgr(rel);
+
+	/*
+	 * Do StartReadBuffers() until IO for all the required blocks has been
+	 * started (if required).
+	 */
+	while (nblocks_done < nblocks)
+	{
+		ReadBuffersOperation *operation = &operations[nios];
+		int			nblocks_this_io =
+			Min(nblocks - nblocks_done, io_combine_limit);
+
+		operation->rel = rel;
+		operation->smgr = smgr;
+		operation->persistence = rel->rd_rel->relpersistence;
+		operation->strategy = NULL;
+		operation->forknum = MAIN_FORKNUM;
+
+		io_reqds[nios] = StartReadBuffers(operation,
+										  &buffers[nblocks_done],
+										  startblock + nblocks_done,
+										  &nblocks_this_io,
+										  0);
+		nios++;
+		nblocks_done += nblocks_this_io;
+	}
+
+	/*
+	 * Now wait for all operations that required IO. This is done at the end,
+	 * as otherwise waiting for IO in progress in other backends could
+	 * influence the result for subsequent buffers / blocks.
+	 */
+	for (int nio = 0; nio < nios; nio++)
+	{
+		ReadBuffersOperation *operation = &operations[nio];
+
+		if (io_reqds[nio])
+			WaitReadBuffers(operation);
+	}
+
+	/*
+	 * Convert what has been done into SQL SRF return value.
+	 */
+	for (int nio = 0; nio < nios; nio++)
+	{
+		ReadBuffersOperation *operation = &operations[nio];
+		int			nblocks_this_io = operation->nblocks;
+		Datum		values[5] = {0};
+		bool		nulls[5] = {0};
+		ArrayType  *buffers_arr;
+
+		/* convert buffer array to datum array */
+		for (int i = 0; i < nblocks_this_io; i++)
+		{
+			Buffer		buf = operation->buffers[i];
+
+			Assert(buffers[nblocks_disp + i] == buf);
+			Assert(BufferGetBlockNumber(buf) == startblock + nblocks_disp + i);
+
+			buffers_datum[nblocks_disp + i] = Int32GetDatum(buf);
+		}
+
+		buffers_arr = construct_array_builtin(&buffers_datum[nblocks_disp],
+											  nblocks_this_io,
+											  INT4OID);
+
+		/* blockoff */
+		values[0] = Int32GetDatum(nblocks_disp);
+		nulls[0] = false;
+
+		/* blocknum */
+		values[1] = UInt32GetDatum(startblock + nblocks_disp);
+		nulls[1] = false;
+
+		/* io_reqd */
+		values[2] = BoolGetDatum(io_reqds[nio]);
+		nulls[2] = false;
+
+		/* nblocks */
+		values[3] = Int32GetDatum(nblocks_this_io);
+		nulls[3] = false;
+
+		/* array of buffers */
+		values[4] = PointerGetDatum(buffers_arr);
+		nulls[4] = false;
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		nblocks_disp += nblocks_this_io;
+	}
+
+	/* release pins on all the buffers */
+	for (int nio = 0; nio < nios; nio++)
+	{
+		ReadBuffersOperation *operation = &operations[nio];
+
+		for (int i = 0; i < operation->nblocks; i++)
+			ReleaseBuffer(operation->buffers[i]);
+	}
+
+	/*
+	 * Free explicitly, to have a chance to detect potential issues with too
+	 * long lived references to the operation.
+	 */
+	pfree(operations);
+	pfree(buffers);
+	pfree(buffers_datum);
+	pfree(io_reqds);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
-- 
2.53.0.1.gb2826b52eb

>From 7031072c40494b151d9849160d403f719c60631c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 27 Mar 2026 13:08:26 -0400
Subject: [PATCH v9 2/4] test_aio: Add read_stream test infrastructure & tests

While we have a lot of indirect coverage of read streams, there are corner
cases that are hard to test when only indirectly controlling and observing the
read stream.  This commit adds an SQL callable SRF interface for a read stream
and uses that in a few tests.

To make some of the tests possible, the injection point infrastructure in
test_aio had to be expanded to allow blocking IO completion.

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 275 ++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  19 +-
 src/test/modules/test_aio/test_aio.c          | 336 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 582 insertions(+), 50 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..0d123ac0ed5
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,275 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+TestAio::configure($node);
+
+$node->append_conf(
+	'postgresql.conf', qq(
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(qq/SET io_combine_limit = 1/);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	# block 0 grows the distance enough that the stream will look ahead and try
+	# to start a pending read for block 2 (and later block 4) twice before
+	# returning any buffers.
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/);
+	ok(1, "$io_method: stream accessing same block");
+
+	# Test repeated blocks with a temp table, using invalidate_rel_block()
+	# to evict individual local buffers.
+	$psql->query_safe(
+		qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+		   INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/);
+
+	# Evict the specific blocks we'll request to force misses
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/);
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+	# Now the blocks are cached, so repeated access should be hits
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Block 5 is undergoing IO in session b, so session a will move on to start
+	# a new IO for block 7.
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+		   pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+		   WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr},
+		qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>2, nblocks=>3);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Blocks 2 and 4 are undergoing IO initiated by session b
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer read in one IO/);
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 86beb563b6a..4a5a379b3c5 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 
 /*
  * Handle related functions
@@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT NULL, blockno int4 DEFAULT NULL)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT NULL)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 3e486a5806e..cb614551914 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -27,13 +27,18 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
 #include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -41,13 +46,31 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	BlockNumber completion_wait_blockno;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -88,11 +111,15 @@ test_aio_shmem_startup(void)
 		/* First time through, initialize */
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
+		inj_io_error_state->enabled_completion_wait = false;
+
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
 
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -388,7 +415,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -819,6 +846,85 @@ read_buffers(PG_FUNCTION_ARGS)
 }
 
 
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -889,15 +995,111 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *io_proc;
+	int32		io_pid;
+	int32		inj_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	io_pid = io_proc->pid;
+	inj_pid = inj_io_error_state->short_read_pid;
+
+	if (inj_pid != InvalidPid && inj_pid != io_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *io_proc;
+	int32		io_pid;
+	PgAioTargetData *td;
+	int32		inj_pid;
+	BlockNumber io_blockno;
+	BlockNumber inj_blockno;
+	Oid			inj_relfilenode;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	io_pid = io_proc->pid;
+	inj_pid = inj_io_error_state->completion_wait_pid;
+
+	if (inj_pid != InvalidPid && inj_pid != io_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	inj_relfilenode = inj_io_error_state->completion_wait_relfilenode;
+	if (inj_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_relfilenode)
+		return false;
+
+	inj_blockno = inj_io_error_state->completion_wait_blockno;
+	io_blockno = td->smgr.blockNum;
+	if (inj_blockno != InvalidBlockNumber &&
+		!(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -906,58 +1108,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+	 * lot of problems, e.g. if we were to wrongly mark as valid a buffer
+	 * that wasn't actually read according to the shortened read, its
+	 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
+				uint32		ok_part = new_result - processed;
 
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
 			}
-
-			ioh->result = new_result;
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);
+	inj_io_short_read_hook(name, private_data, arg);
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -971,6 +1171,42 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+	inj_io_error_state->completion_wait_blockno =
+		PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = InvalidPid;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	inj_io_error_state->completion_wait_blockno = InvalidBlockNumber;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -980,6 +1216,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 112653c1680..b2c7c9e6f7c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -309,6 +309,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.53.0.1.gb2826b52eb

>From e4a2b6d32d7fa6e80c2dd8a933b7f174735c0e56 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 27 Mar 2026 13:09:05 -0400
Subject: [PATCH v9 3/4] bufmgr: Improve StartBufferIO interface

Until now StartBufferIO() had a few weaknesses:

- As it did not submit staged IOs, it was not safe to call StartBufferIO()
  where there was a potential for unsubmitted IO, which required
  AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around
  StartBufferIO().

- With nowait = true, the boolean return value did not allow to distinguish
  between no IO being necessary and having to wait, which would lead
  ReadBuffersCanStartIO() to unnecessarily submit staged IO.

- Several callers needed to handle both local and shared buffers, requiring
  the caller to differentiate between StartBufferIO() and StartLocalBufferIO()

- In a future commit some callers of StartBufferIO() want the BufferDesc's
  io_wref to be returned, to asynchronously wait for in-progress IO

- Indicating whether to wait with the nowait parameter was somewhat confusing
  compared to a wait parameter

Address these issues as follows:

- StartBufferIO() is renamed to StartSharedBufferIO()

- A new StartBufferIO() is introduced that supports both shared and local
  buffers

- The boolean return value has been replaced with an enum, indicating whether
  the IO is already done, already in progress or that the buffer has been
  readied for IO

- A new PgAioWaitRef * argument allows the caller to get the wait reference if
  desired.  All current callers pass NULL; a user of this will be introduced
  subsequently

- Instead of the nowait argument there now is wait

  This probably would not have been worthwhile on its own, but since all these
  lines needed to be touched anyway...

Author: Andres Freund <[email protected]>
Author: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/include/storage/buf_internals.h         |  24 +-
 src/backend/storage/buffer/bufmgr.c         | 299 ++++++++++++--------
 src/backend/storage/buffer/localbuf.c       |  46 ++-
 src/test/modules/test_aio/t/001_aio.pl      |  24 +-
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  17 +-
 src/tools/pgindent/typedefs.list            |   1 +
 7 files changed, 261 insertions(+), 152 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 8d1e16b5d51..ad1b7b2216a 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 
 extern void TrackNewBufferPin(Buffer buf);
 
-/* solely to make it easier to write tests */
-extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
+/*
+ * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO.
+ *
+ * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer
+ * may already have I/O in progress or the I/O may have been done by another
+ * backend.  See the documentation of StartSharedBufferIO for more details.
+ */
+typedef enum StartBufferIOResult
+{
+	BUFFER_IO_ALREADY_DONE,
+	BUFFER_IO_IN_PROGRESS,
+	BUFFER_IO_READY_FOR_IO,
+} StartBufferIOResult;
+
+/* the following are exposed to make it easier to write tests */
+extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait,
+										 PgAioWaitRef *io_wref);
+extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait,
+											   PgAioWaitRef *io_wref);
 extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 							  bool forget_owner, bool release_aio);
 
@@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
 								   uint64 set_flag_bits, bool release_aio);
-extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
+extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput,
+											  bool wait, PgAioWaitRef *io_wref);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e212f6110f2..39bea638c99 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 	BufferDesc *bufHdr;
 	bool		need_to_zero;
 	bool		isLocalBuf = BufferIsLocal(buffer);
+	StartBufferIOResult sbres;
 
 	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
 
@@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 		 */
 		need_to_zero = false;
 	}
-	else if (isLocalBuf)
-	{
-		/* Simple case for non-shared buffers. */
-		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-		need_to_zero = StartLocalBufferIO(bufHdr, true, false);
-	}
 	else
 	{
-		/*
-		 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
-		 * concurrently.  Even though we aren't doing I/O, that ensures that
-		 * we don't zero a page that someone else has pinned.  An exclusive
-		 * content lock wouldn't be enough, because readers are allowed to
-		 * drop the content lock after determining that a tuple is visible
-		 * (see buffer access rules in README).
-		 */
-		bufHdr = GetBufferDescriptor(buffer - 1);
-		need_to_zero = StartBufferIO(bufHdr, true, false);
+		if (isLocalBuf)
+		{
+			/* Simple case for non-shared buffers. */
+			bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+			sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
+		}
+		else
+		{
+			/*
+			 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+			 * concurrently.  Even though we aren't doing I/O, that ensures
+			 * that we don't zero a page that someone else has pinned.  An
+			 * exclusive content lock wouldn't be enough, because readers are
+			 * allowed to drop the content lock after determining that a tuple
+			 * is visible (see buffer access rules in README).
+			 */
+			bufHdr = GetBufferDescriptor(buffer - 1);
+			sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
+		}
+
+		Assert(sbres != BUFFER_IO_IN_PROGRESS);
+		need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
 	}
 
 	if (need_to_zero)
@@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
-	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
-
-		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
-		 */
-		pgaio_submit_staged();
-	}
-
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	IOContext	io_context;
 	IOObject	io_object;
 	instr_time	io_start;
+	StartBufferIOResult status;
 
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
+	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
+	 * block, which we don't want after setting IO_IN_PROGRESS.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 * for the outcome: either done, or something went wrong and we will
 	 * retry.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	if (status != BUFFER_IO_READY_FOR_IO)
 	{
-		/*
-		 * Someone else has already completed this block, we're done.
-		 *
-		 * When IO is necessary, ->nblocks_done is updated in
-		 * ProcessReadBuffersResult(), but that is not called if no IO is
-		 * necessary. Thus update here.
-		 */
-		operation->nblocks_done += 1;
-		*nblocks_progress = 1;
-
 		pgaio_io_release(ioh);
-		pgaio_wref_clear(&operation->io_wref);
-
-		/*
-		 * Report and track this as a 'hit' for this backend, even though it
-		 * must have started out as a miss in PinBufferForBlock(). The other
-		 * backend will track this as a 'read'.
-		 */
-		TrackBufferHit(io_object, io_context,
-					   operation->rel, operation->persistence,
-					   operation->smgr, operation->forknum,
-					   blocknum);
-		return false;
+		*nblocks_progress = 1;
+		if (status == BUFFER_IO_ALREADY_DONE)
+		{
+			/*
+			 * Someone has already completed this block, we're done.
+			 *
+			 * When IO is necessary, ->nblocks_done is updated in
+			 * ProcessReadBuffersResult(), but that is not called if no IO is
+			 * necessary. Thus update here.
+			 */
+			operation->nblocks_done += 1;
+			Assert(operation->nblocks_done <= operation->nblocks);
+
+			Assert(!pgaio_wref_valid(&operation->io_wref));
+
+			/*
+			 * Report and track this as a 'hit' for this backend, even though
+			 * it must have started out as a miss in PinBufferForBlock(). The
+			 * other backend will track this as a 'read'.
+			 */
+			TrackBufferHit(io_object, io_context,
+						   operation->rel, operation->persistence,
+						   operation->smgr, operation->forknum,
+						   blocknum);
+			return false;
+		}
+
+		/* The IO is already in-progress */
+		Assert(status == BUFFER_IO_IN_PROGRESS);
+
+		return true;
 	}
 
 	Assert(io_buffers[0] == buffers[nblocks_done]);
@@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 	/*
 	 * NB: As little code as possible should be added between the
-	 * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
-	 * below and the smgrstartreadv(), as some of the buffers are now marked
-	 * as IO_IN_PROGRESS and will thus cause other backends to wait.
+	 * StartBufferIO() above, the further StartBufferIO()s below and the
+	 * smgrstartreadv(), as some of the buffers are now marked as
+	 * IO_IN_PROGRESS and will thus cause other backends to wait.
 	 */
 
 	/*
@@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
 			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		if (!ReadBuffersCanStartIO(buffers[i], true))
+		status = StartBufferIO(buffers[i], true, false, NULL);
+		if (status != BUFFER_IO_READY_FOR_IO)
 			break;
 
 		Assert(io_buffers[io_buffers_len] == buffers[i]);
@@ -2893,16 +2873,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			 * We *must* do smgr[zero]extend before succeeding, else the page
 			 * will not be reserved by the kernel, and the next P_NEW call
 			 * will decide to return the same page.  Clear the BM_VALID bit,
-			 * do StartBufferIO() and proceed.
+			 * do StartSharedBufferIO() and proceed.
 			 *
 			 * Loop to handle the very small possibility that someone re-sets
-			 * BM_VALID between our clearing it and StartBufferIO inspecting
-			 * it.
+			 * BM_VALID between our clearing it and StartSharedBufferIO
+			 * inspecting it.
 			 */
-			do
+			while (true)
 			{
+				StartBufferIOResult sbres;
+
 				pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
-			} while (!StartBufferIO(existing_hdr, true, false));
+
+				sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
+
+				if (sbres != BUFFER_IO_ALREADY_DONE)
+					break;
+			}
 		}
 		else
 		{
@@ -2928,7 +2915,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true, false);
+			StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -4450,7 +4437,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false, false))
+	if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -7029,6 +7016,13 @@ WaitIO(BufferDesc *buf)
 {
 	ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
 
+	/*
+	 * Should never end up here with unsubmitted IO, as no AIO unaware code
+	 * may be used while in batch mode and AIO aware code needs to have
+	 * submitted all staged IO to avoid deadlocks & slowness.
+	 */
+	Assert(!pgaio_have_staged());
+
 	ConditionVariablePrepareToSleep(cv);
 	for (;;)
 	{
@@ -7081,30 +7075,46 @@ WaitIO(BufferDesc *buf)
 }
 
 /*
- * StartBufferIO: begin I/O on this buffer
+ * StartSharedBufferIO: begin I/O on this buffer
  *	(Assumptions)
- *	My process is executing no IO on this buffer
  *	The buffer is Pinned
  *
- * In some scenarios multiple backends could attempt the same I/O operation
- * concurrently.  If someone else has already started I/O on this buffer then
- * we will wait for completion of the IO using WaitIO().
+ * In several scenarios the buffer may already be undergoing I/O in this or
+ * another backend. How to best handle that depends on the caller's
+ * situation. It might be appropriate to wait synchronously (e.g., because the
+ * buffer is about to be invalidated); wait asynchronously, using the buffer's
+ * IO wait reference (e.g., because the caller is doing readahead and doesn't
+ * need the buffer to be ready immediately); or to not wait at all (e.g.,
+ * because the caller is trying to combine IO for this buffer with another
+ * buffer).
  *
- * Input operations are only attempted on buffers that are not BM_VALID,
- * and output operations only on buffers that are BM_VALID and BM_DIRTY,
- * so we can always tell if the work is already done.
+ * How and whether to wait is controlled by the wait and io_wref
+ * parameters. In detail:
  *
- * Returns true if we successfully marked the buffer as I/O busy,
- * false if someone else already did the work.
+ * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
+ *   reference, the *io_wref is set to the buffer's io_wref and
+ *   BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait
+ *   parameter.
  *
- * If nowait is true, then we don't wait for an I/O to be finished by another
- * backend.  In that case, false indicates either that the I/O was already
- * finished, or is still in progress.  This is useful for callers that want to
- * find out if they can perform the I/O as part of a larger operation, without
- * waiting for the answer or distinguishing the reasons why not.
+ * - If the caller passes a NULL io_wref (i.e. the caller does not want to
+ *   asynchronously wait for the completion of the IO) and wait = false, and
+ *   the buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
+ *
+ * - If wait = true and either the buffer does not have a wait reference,
+ *   or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
+ *   to complete.  To avoid the potential of deadlocks and unnecessary delays,
+ *   all staged I/O is submitted before waiting.
+ *
+ * Input operations are only attempted on buffers that are not BM_VALID, and
+ * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
+ * always tell if the work is already done.  If no I/O is necessary,
+ * BUFFER_IO_ALREADY_DONE is returned.
+ *
+ * If we successfully marked the buffer as BM_IO_IN_PROGRESS,
+ * BUFFER_IO_READY_FOR_IO is returned.
  */
-bool
-StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
+StartBufferIOResult
+StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
@@ -7116,10 +7126,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-		UnlockBufHdr(buf);
-		if (nowait)
-			return false;
-		WaitIO(buf);
+
+		/* Join the existing IO */
+		if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
+		{
+			*io_wref = buf->io_wref;
+			UnlockBufHdr(buf);
+
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else if (!wait)
+		{
+			UnlockBufHdr(buf);
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else
+		{
+			/*
+			 * With wait = true, we always have to wait if the caller has
+			 * passed io_wref = NULL.
+			 *
+			 * Even with io_wref != NULL, we have to wait if the buffer's wait
+			 * ref is not valid but the IO is in progress, someone else
+			 * started IO but hasn't set the wait ref yet. We have no choice
+			 * but to wait until the IO completes.
+			 */
+			UnlockBufHdr(buf);
+
+			/*
+			 * If this backend currently has staged IO, submit it before
+			 * waiting for in-progress IO, to avoid potential deadlocks and
+			 * unnecessary delays.
+			 */
+			pgaio_submit_staged();
+
+			WaitIO(buf);
+		}
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -7128,9 +7170,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
 		UnlockBufHdr(buf);
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
+	/*
+	 * No IO in progress and not already done; we will start IO. It's possible
+	 * that IO previously was in progress but did not complete, because the IO
+	 * errored out. We'll do the IO ourselves.
+	 */
 	UnlockBufHdrExt(buf, buf_state,
 					BM_IO_IN_PROGRESS, 0,
 					0);
@@ -7138,7 +7185,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
 								  BufferDescriptorGetBuffer(buf));
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
+}
+
+/*
+ * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
+ * when the caller doesn't otherwise need to care about local vs shared. See
+ * StartSharedBufferIO() for details.
+ */
+StartBufferIOResult
+StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
+{
+	BufferDesc *buf_hdr;
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
+	else
+	{
+		buf_hdr = GetBufferDescriptor(buffer - 1);
+
+		return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
 }
 
 /*
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 404c6bccbdd..a31cf94a252 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -189,9 +189,10 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 
 	/*
 	 * Try to start an I/O operation.  There currently are no reasons for
-	 * StartLocalBufferIO to return false, so we raise an error in that case.
+	 * StartLocalBufferIO to return anything other than
+	 * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
 	 */
-	if (!StartLocalBufferIO(bufHdr, false, false))
+	if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
 		elog(ERROR, "failed to start write IO on local buffer");
 
 	/* Find smgr relation for buffer */
@@ -435,7 +436,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
 
 			/* no need to loop for local buffers */
-			StartLocalBufferIO(existing_hdr, true, false);
+			StartLocalBufferIO(existing_hdr, true, true, NULL);
 		}
 		else
 		{
@@ -451,7 +452,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
 			hresult->id = victim_buf_id;
 
-			StartLocalBufferIO(victim_buf_hdr, true, false);
+			StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -517,26 +518,41 @@ MarkLocalBufferDirty(Buffer buffer)
 }
 
 /*
- * Like StartBufferIO, but for local buffers
+ * Like StartSharedBufferIO, but for local buffers
  */
-bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
+StartBufferIOResult
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
 	/*
 	 * With AIO the buffer could have IO in progress, e.g. when there are two
-	 * scans of the same relation. Either wait for the other IO or return
-	 * false.
+	 * scans of the same relation.  Either wait for the other IO (if wait =
+	 * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
 	 */
 	if (pgaio_wref_valid(&bufHdr->io_wref))
 	{
-		PgAioWaitRef iow = bufHdr->io_wref;
+		PgAioWaitRef buf_wref = bufHdr->io_wref;
 
-		if (nowait)
-			return false;
+		if (io_wref != NULL)
+		{
+			/* We've already asynchronously started this IO, so join it */
+			*io_wref = buf_wref;
+			return BUFFER_IO_IN_PROGRESS;
+		}
 
-		pgaio_wref_wait(&iow);
+		/*
+		 * For temp buffers we should never need to wait in
+		 * StartLocalBufferIO() when called with io_wref == NULL while there
+		 * are staged IOs, as it's not allowed to call code that is not aware
+		 * of AIO while in batch mode.
+		 */
+		Assert(!pgaio_have_staged());
+
+		if (!wait)
+			return BUFFER_IO_IN_PROGRESS;
+
+		pgaio_wref_wait(&buf_wref);
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -545,14 +561,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 	buf_state = pg_atomic_read_u64(&bufHdr->state);
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
 	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
 
 	/* local buffers don't track IO using resowners */
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
 }
 
 /*
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 0317c991f9f..1f9e83690f4 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -383,7 +383,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"first StartBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -392,14 +392,14 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"second StartBufferIO fails, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 	psql_like(
 		$io_method,
 		$psql_b,
 		"second StartBufferIO fails, other session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 
@@ -409,7 +409,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, without marking it as success, this should trigger the
@@ -438,7 +438,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"blocking buffer io w/ success: first start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -448,7 +448,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, marking it as success
@@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"first StartLocalBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"second StartLocalBufferIO succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after not marking valid succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after marking valid fails",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^f$/,
 		qr/^$/);
 
@@ -1612,7 +1612,7 @@ read_buffers('$table', 1, 3)|,
 		my $buf_id =
 		  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
 		$psql_b->query_safe(
-			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
 		);
 
 		query_wait_block(
@@ -1637,7 +1637,7 @@ read_buffers('$table', 1, 3)|,
 		$buf_id =
 		  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
 		$psql_b->query_safe(
-			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
 		);
 
 		query_wait_block(
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 4a5a379b3c5..53c83b74e53 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index cb614551914..3e13f38c902 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -436,13 +436,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (RelationUsesLocalBuffers(rel))
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartLocalBufferIO(buf_hdrs[i], true, false);
+			StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
 		pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
 	}
 	else
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartBufferIO(buf_hdrs[i], true, false);
+			StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
 	}
 
 	pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
@@ -625,15 +625,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS)
 {
 	Buffer		buf = PG_GETARG_INT32(0);
 	bool		for_input = PG_GETARG_BOOL(1);
-	bool		nowait = PG_GETARG_BOOL(2);
+	bool		wait = PG_GETARG_BOOL(2);
+	StartBufferIOResult result;
 	bool		can_start;
 
 	if (BufferIsLocal(buf))
-		can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
-									   for_input, nowait);
+		result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
+									for_input, wait, NULL);
 	else
-		can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
-								  for_input, nowait);
+		result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
+									 for_input, wait, NULL);
+
+	can_start = result == BUFFER_IO_READY_FOR_IO;
 
 	/*
 	 * For tests we don't want the resowner release preventing us from
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b2c7c9e6f7c..e3c1007abdf 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2946,6 +2946,7 @@ SplitTextOutputData
 SplitVar
 StackElem
 StakindFlags
+StartBufferIOResult
 StartDataPtrType
 StartLOPtrType
 StartLOsPtrType
-- 
2.53.0.1.gb2826b52eb

>From dae9b2d20390d24a12281850049e84b50ec8b6b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Fri, 27 Mar 2026 12:11:53 -0400
Subject: [PATCH v9 4/4] aio: Don't wait for already in-progress IO

When a backend attempts to start a read IO and finds the first buffer
already has I/O in progress, previously it waited for that I/O to
complete before initiating reads for any of the subsequent buffers.

Although the backend must wait for the I/O to finish when acquiring the
buffer, there's no reason for it to wait when setting up the read
operation. Waiting at this point prevents the backend from starting I/O
on subsequent buffers and can significantly reduce concurrency.

This matters in two workloads: when multiple backends scan the same
relation concurrently, and when a single backend requests the same block
multiple times within the readahead distance.

If backends wait each time they encounter an in-progress read,
the access pattern effectively degenerates into synchronous I/O.

To fix this, when encountering an already in-progress IO for the head
buffer, a backend now records the buffer's wait reference and defers
waiting until WaitReadBuffers(), when it actually needs to acquire the
buffer.

In rare cases, a backend may still need to wait synchronously at IO
start time: if another backend has set BM_IO_IN_PROGRESS on the buffer
but has not yet set the wait reference. Such windows should be brief and
uncommon.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
 src/include/storage/bufmgr.h                |   4 +-
 src/backend/storage/buffer/bufmgr.c         |  89 ++++++--
 src/test/modules/test_aio/t/001_aio.pl      | 216 ++++++++++++++++++--
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  16 +-
 5 files changed, 288 insertions(+), 39 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f951..dd41b92f944 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -144,9 +144,11 @@ struct ReadBuffersOperation
 	 */
 	Buffer	   *buffers;
 	BlockNumber blocknum;
-	int			flags;
+	uint16		flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	/* true if waiting on another backend's IO */
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 39bea638c99..b14d2ce1b52 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1800,8 +1800,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 * b) reports some time as waiting, even if we never waited
 			 *
 			 * we first check if we already know the IO is complete.
+			 *
+			 * Note that operation->io_return is uninitialized for foreign IO,
+			 * so we cannot use the cheaper PGAIO_RS_UNKNOWN pre-check.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1820,11 +1823,45 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc = BufferIsLocal(buffer) ?
+					GetLocalBufferDescriptor(-buffer - 1) :
+					GetBufferDescriptor(buffer - 1);
+				uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+				if (buf_state & BM_VALID)
+				{
+					BlockNumber blocknum = operation->blocknum + operation->nblocks_done;
+
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+
+					/*
+					 * Track this as a 'hit' for this backend. The backend
+					 * performing the IO will track it as a 'read'.
+					 */
+					TrackBufferHit(io_object, io_context,
+								   operation->rel, operation->persistence,
+								   operation->smgr, operation->forknum,
+								   blocknum);
+				}
+
+				/*
+				 * If the foreign IO failed and left the buffer invalid,
+				 * nblocks_done is not incremented. The retry loop below will
+				 * call AsyncReadBuffers() which will attempt the IO itself.
+				 */
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1870,7 +1907,8 @@ WaitReadBuffers(ReadBuffersOperation *operation)
  * affected by the call. If the first buffer is valid, *nblocks_progress is
  * set to 1 and operation->nblocks_done is incremented.
  *
- * Returns true if IO was initiated, false if no IO was necessary.
+ * Returns true if IO was initiated or is already in progress (foreign IO),
+ * false if the buffer was already valid.
  */
 static bool
 AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
@@ -1943,8 +1981,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
-	 * block, which we don't want after setting IO_IN_PROGRESS.
+	 * We must get an IO handle before StartBufferIO(), as pgaio_io_acquire()
+	 * might block, which we don't want after setting IO_IN_PROGRESS. If we
+	 * don't need to do the IO, we'll release the handle.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -1966,14 +2005,34 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
+	operation->foreign_io = false;
+	pgaio_wref_clear(&operation->io_wref);
+
 	/*
-	 * Check if we can start IO on the first to-be-read buffer.
+	 * Try to start IO on the first buffer in a new run of blocks. If AIO is
+	 * in progress, be it in this backend or another backend, we just
+	 * associate the wait reference with the operation and wait in
+	 * WaitReadBuffers(). This turns out to be important for performance in
+	 * two workloads:
 	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * 1) A read stream that has to read the same block multiple times within
+	 * the readahead distance. This can happen e.g. for the table accesses of
+	 * an index scan.
+	 *
+	 * 2) Concurrent scans by multiple backends on the same relation.
+	 *
+	 * If we were to synchronously wait for the in-progress IO, we'd not be
+	 * able to keep enough I/O in flight.
+	 *
+	 * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+	 * ReadBuffersOperation that WaitReadBuffers then can wait on.
+	 *
+	 * It's possible that another backend has started IO on the buffer but not
+	 * yet set its wait reference. In this case, we have no choice but to wait
+	 * for either the wait reference to be valid or the IO to be done.
 	 */
-	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	status = StartBufferIO(buffers[nblocks_done], true, true,
+						   &operation->io_wref);
 	if (status != BUFFER_IO_READY_FOR_IO)
 	{
 		pgaio_io_release(ioh);
@@ -2006,6 +2065,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 		/* The IO is already in-progress */
 		Assert(status == BUFFER_IO_IN_PROGRESS);
+		Assert(pgaio_wref_valid(&operation->io_wref));
+		operation->foreign_io = true;
 
 		return true;
 	}
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 1f9e83690f4..0ac6554af61 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -79,6 +79,9 @@ sub psql_like
 	return $output;
 }
 
+# Issue query, wait for the specified wait event to be reached. If
+# wait_current_session is true, we will wait for the event in the current
+# session, otherwise we'll wait for any session.
 sub query_wait_block
 {
 	local $Test::Builder::Level = $Test::Builder::Level + 1;
@@ -88,16 +91,29 @@ sub query_wait_block
 	my $name = shift;
 	my $sql = shift;
 	my $waitfor = shift;
+	my $wait_current_session = shift;
 
 	my $pid = $psql->query_safe('SELECT pg_backend_pid()');
 
 	$psql->{stdin} .= qq($sql;\n);
 	$psql->{run}->pump_nb();
+	note "issued sql: $sql;\n";
 	ok(1, "$io_method: $name: issued sql");
 
-	$node->poll_query_until('postgres',
-		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid),
-		$waitfor);
+	my $waitquery;
+	if ($wait_current_session)
+	{
+		$waitquery =
+		  qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid);
+	}
+	else
+	{
+		$waitquery =
+		  qq(SELECT wait_event FROM pg_stat_activity WHERE wait_event = '$waitfor');
+	}
+
+	note "polling for completion with $waitquery";
+	$node->poll_query_until('postgres', $waitquery, $waitfor);
 	ok(1, "$io_method: $name: observed $waitfor wait event");
 }
 
@@ -410,7 +426,8 @@ sub test_startwait_io
 		$psql_b,
 		"blocking start buffer io",
 		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
-		"BufferIo");
+		"BufferIo",
+		1);
 
 	# Terminate the IO, without marking it as success, this should trigger the
 	# waiting session to be able to start the io
@@ -449,7 +466,8 @@ sub test_startwait_io
 		$psql_b,
 		"blocking start buffer io",
 		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
-		"BufferIo");
+		"BufferIo",
+		1);
 
 	# Terminate the IO, marking it as success
 	psql_like(
@@ -1560,6 +1578,10 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 
 		# Test encountering buffer IO we started in the first block of the
 		# range.
+		#
+		# Depending on how quickly the IO we start completes, the IO might be
+		# completed or we "join" the foreign IO. To hide that variability, the
+		# query below treats a foreign IO as not having needed to do IO.
 		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
 		$psql_a->query_safe(
 			qq|SELECT read_rel_block_ll('$table', 1, wait_complete=>false)|);
@@ -1567,7 +1589,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 			$io_method,
 			$psql_a,
 			"$persistency: read buffers, in-progress 1, read 1-3",
-			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+			qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|,
 			qr/^0\|1\|f\|1\n1\|2\|t\|2$/,
 			qr/^$/);
 
@@ -1579,7 +1601,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 			$io_method,
 			$psql_a,
 			"$persistency: read buffers, in-progress 2, read 1-3",
-			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+			qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|,
 			qr/^0\|1\|t\|1\n1\|2\|f\|1\n2\|3\|t\|1$/,
 			qr/^$/);
 
@@ -1591,8 +1613,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 			$io_method,
 			$psql_a,
 			"$persistency: read buffers, in-progress 3, read 1-3",
-			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM
-read_buffers('$table', 1, 3)|,
+			qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|,
 			qr/^0\|1\|t\|2\n2\|3\|f\|1$/,
 			qr/^$/);
 	}
@@ -1620,14 +1641,15 @@ read_buffers('$table', 1, 3)|,
 			$node,
 			$psql_a,
 			"$persistency: read buffers blocks waiting for concurrent IO",
-			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
-			"BufferIo");
+			qq|SELECT blockoff, blocknum, io_reqd, foreign_io, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo",
+			1);
 		$psql_b->query_safe(
 			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>false, io_error=>false, release_aio=>false)|
 		);
-		pump_until(
-			$psql_a->{run}, $psql_a->{timeout},
-			\$psql_a->{stdout}, qr/0\|1\|t\|2\n2\|3\|t\|3/);
+		# Because no IO wref was assigned, block 2 should not report foreign IO
+		pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+			qr/0\|1\|t\|f\|2\n2\|3\|t\|f\|3/);
 		ok(1,
 			"$io_method: $persistency: IO was split due to concurrent failed IO"
 		);
@@ -1645,13 +1667,15 @@ read_buffers('$table', 1, 3)|,
 			$node,
 			$psql_a,
 			"$persistency: read buffers blocks waiting for concurrent IO",
-			qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
-			"BufferIo");
+			qq|SELECT blockoff, blocknum, io_reqd, foreign_io, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo",
+			1);
 		$psql_b->query_safe(
 			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>true, io_error=>false, release_aio=>false)|
 		);
+		# Because no IO wref was assigned, block 2 should not report foreign IO
 		pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
-			qr/0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|t\|2/);
+			qr/0\|1\|t\|f\|2\n2\|3\|f\|f\|1\n3\|4\|t\|f\|2/);
 		ok(1,
 			"$io_method: $persistency: IO was split due to concurrent successful IO"
 		);
@@ -1662,6 +1686,163 @@ read_buffers('$table', 1, 3)|,
 }
 
 
+# Tests for StartReadBuffers() that depend on injection point support
+sub test_read_buffers_inject
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_c = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $expected;
+
+	# We can't easily test waiting for foreign IOs on temporary tables, as the
+	# waiting in the completion hook will just stall the backend. For worker
+	# that is because temporary table IO is executed synchronously, for
+	# io_uring the completion will be executed in the same process, but due to
+	# temporary tables not being shared, we can't do the wait in another
+	# backend.
+	my $table = 'tbl_ok';
+	my $persistency = 'normal';
+
+
+	###
+	# Test if a read buffers encounters AIO in progress by another backend, it
+	# recognizes that other IO as a foreign IO.
+	###
+	$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+
+	# B: Trigger wait in the next AIO read for block 2.
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('$table'),
+		   blockno=>2);/);
+	ok(1,
+		"$io_method: $persistency: configure wait in completion of block 2");
+
+	# B: Read block 2 and wait for the completion hook to be reached (which could
+	# be in B itself or in an IO worker)
+	query_wait_block(
+		$io_method,
+		$node,
+		$psql_b,
+		"$persistency: wait in completion of block 2",
+		qq|SELECT read_rel_block_ll('$table', blockno=>2, nblocks=>1)|,
+		'completion_wait',
+		0);
+
+	# A: Start read, wait until we're waiting for IO completion
+	query_wait_block(
+		$io_method,
+		$node,
+		$psql_a,
+		"$persistency: read 0-3, blocked on in-progress 2",
+		qq|SELECT blockoff, blocknum, io_reqd, foreign_io, nblocks FROM
+read_buffers('$table', 0, 4)|,
+		"AioIoCompletion", 1);
+
+	# C: Release B from completion hook
+	$psql_c->query_safe(qq|SELECT inj_io_completion_continue()|);
+	ok(1, "$io_method: $persistency: continued completion of block 2");
+
+	# A: Check that we recognized the foreign IO wait, if possible
+	#
+	# Due to sync mode not actually issuing IO below StartReadBuffers(), we
+	# can't observe encountering foreign IO. It still seems worth exercising these
+	# paths however.
+	if ($io_method ne 'sync')
+	{
+		# One IO covering blocks 0-1, a foreign IO covering block 2, and a
+		# third IO for the remainder.
+		$expected = qr/0\|0\|t\|f\|2\n2\|2\|t\|t\|1\n3\|3\|t\|f\|1/;
+	}
+	else
+	{
+		# One IO covering everything, as that's what StartReadBuffers() will
+		# return for something with misses in sync mode.
+		$expected = qr/0\|0\|t\|f\|4/;
+	}
+	pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+		$expected);
+	ok(1,
+		"$io_method: $persistency: read 0-3, blocked on in-progress 2, see expected result"
+	);
+	$psql_a->{stdout} = '';
+
+
+	###
+	# Test if a read buffers encounters AIO in progress by another backend, it
+	# recognizes that other IO as a foreign IO. This time we encounter the
+	# foreign IO multiple times.
+	###
+	$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+
+	# B: Trigger wait in the next AIO read for block 3.
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('$table'),
+		   blockno=>3);/);
+	ok(1,
+		"$io_method: $persistency: configure wait in completion of block 3");
+
+	# B: Read blocks 2 and 3 and wait for the completion hook to be reached
+	# (which could be in B itself or in an IO worker)
+	query_wait_block(
+		$io_method,
+		$node,
+		$psql_b,
+		"$persistency: wait in completion of block 2+3",
+		qq|SELECT read_rel_block_ll('$table', blockno=>2, nblocks=>2)|,
+		'completion_wait',
+		0);
+
+	# A: Start read, wait until we're waiting for IO completion
+	#
+	# Note that we need to defer waiting for IO until the end of
+	# read_buffers(), to be able to see that the IO on 3 is still in progress.
+	query_wait_block(
+		$io_method,
+		$node,
+		$psql_a,
+		"$persistency: read 0-3, blocked on in-progress 2+3",
+		qq|SELECT blockoff, blocknum, io_reqd, foreign_io, nblocks FROM
+read_buffers('$table', 0, 4)|,
+		"AioIoCompletion", 1);
+
+	# C: Release B from completion hook
+	$psql_c->query_safe(qq|SELECT inj_io_completion_continue()|);
+	ok(1, "$io_method: $persistency: continued completion of block 2+3");
+
+	# A: Check that we recognized the foreign IO wait, if possible
+	#
+	# See comment further up about sync mode.
+	if ($io_method ne 'sync')
+	{
+		# One IO covering blocks 0-1, a foreign IO covering block 2, and a
+		# foreign IO covering block 3 (same wref as for block 2).
+		$expected = qr/0\|0\|t\|f\|2\n2\|2\|t\|t\|1\n3\|3\|t\|t\|1/;
+	}
+	else
+	{
+		# One IO covering everything, as that's what StartReadBuffers() will
+		# return for something with misses in sync mode.
+		$expected = qr/0\|0\|t\|f\|4/;
+	}
+	pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+		$expected);
+	ok(1,
+		"$io_method: $persistency: read 0-3, blocked on in-progress 2+3, see expected result"
+	);
+	$psql_a->{stdout} = '';
+
+
+	$psql_a->quit();
+	$psql_b->quit();
+	$psql_c->quit();
+}
+
 # Run all tests that for the specified node / io_method
 sub test_io_method
 {
@@ -1705,6 +1886,7 @@ CHECKPOINT;
 		skip 'Injection points not supported by this build', 1
 		  unless $ENV{enable_injection_points} eq 'yes';
 		test_inject($io_method, $node);
+		test_read_buffers_inject($io_method, $node);
 	}
 
 	# worker specific injection tests
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 53c83b74e53..762ac29512f 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -53,7 +53,7 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT nblocks int4, OUT buf int4[])
+CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT nblocks int4, OUT buf int4[])
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 3e13f38c902..521b090a4e5 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -781,8 +781,8 @@ read_buffers(PG_FUNCTION_ARGS)
 	{
 		ReadBuffersOperation *operation = &operations[nio];
 		int			nblocks_this_io = operation->nblocks;
-		Datum		values[5] = {0};
-		bool		nulls[5] = {0};
+		Datum		values[6] = {0};
+		bool		nulls[6] = {0};
 		ArrayType  *buffers_arr;
 
 		/* convert buffer array to datum array */
@@ -812,14 +812,18 @@ read_buffers(PG_FUNCTION_ARGS)
 		values[2] = BoolGetDatum(io_reqds[nio]);
 		nulls[2] = false;
 
-		/* nblocks */
-		values[3] = Int32GetDatum(nblocks_this_io);
+		/* foreign IO */
+		values[3] = BoolGetDatum(operation->foreign_io);
 		nulls[3] = false;
 
-		/* array of buffers */
-		values[4] = PointerGetDatum(buffers_arr);
+		/* nblocks */
+		values[4] = Int32GetDatum(nblocks_this_io);
 		nulls[4] = false;
 
+		/* array of buffers */
+		values[5] = PointerGetDatum(buffers_arr);
+		nulls[5] = false;
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 
 		nblocks_disp += nblocks_this_io;
-- 
2.53.0.1.gb2826b52eb

Reply via email to