Hi,

Attached is an updated set of patches.  I fixed the bug that Melanie noticed,
updated the comments a bit further, and added a new commit that adds decent
coverage of StartReadBuffers(), including all the cases in
StartSharedBufferIO() and most of the cases in StartLocalBufferIO().
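
To give a sense of the added coverage, the new tests mostly boil down to
assertions of the following shape (a trimmed sketch of what the new commit
does; psql_like() is the existing helper from 001_aio.pl, evict_rel() and
read_buffers() are functions from the test_aio extension, and $psql_a is a
background_psql handle):

    # A cold read of blocks 0-1 is expected to be combined into one IO...
    $psql_a->query_safe(qq|SELECT evict_rel('tbl_ok')|);
    psql_like(
        $io_method, $psql_a,
        "read buffers, combine, block 0-1",
        qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('tbl_ok', 0, 2)|,
        qr/^0\|0\|t\|2$/, qr/^$/);

    # ...whereas repeating it, now that both blocks are cached, yields two
    # separate buffer hits.
    psql_like(
        $io_method, $psql_a,
        "read buffers, doesn't combine hits, block 0-1",
        qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('tbl_ok', 0, 2)|,
        qr/^0\|0\|f\|1\n1\|1\|f\|1$/, qr/^$/);

The qr// patterns encode, per StartReadBuffers() call, the block offset
within the request, the block number, whether IO was needed, and how many
blocks the call covered.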

I'm planning to commit 0001 soon - it hasn't changed in a while. Then I'd like
to get 0002 committed soon after, but I'll hold off on that until tomorrow,
given that nobody has looked at it (as it's new).  I think 0004-0007 can be
committed too, but I am not sure if you (Melanie) want to do so.

I'd like to get the rest committed tomorrow too.

Greetings,

Andres Freund
>From e2381eb540751eebb39a25797625a677dfa71387 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 25 Mar 2026 10:43:56 -0400
Subject: [PATCH v8 1/9] aio: Refactor tests in preparation for more tests

A future commit will introduce more AIO-related tests. However, 001_aio.pl
already is fairly large.

This commit introduces a new TestAio package with helpers for writing
AIO-related tests. It then uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional io_methods have already been submitted.

Additionally, this commit splits out testing of initdb with a non-default
io_method into its own test. While that test is somewhat important, it's
fairly slow and doesn't break that often. For development velocity it's
helpful for 001_aio.pl to be faster.

While the latter change in particular could benefit from being its own commit,
splitting it out seems to introduce more back-and-forth than it's worth.

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/meson.build     |   1 +
 src/test/modules/test_aio/t/001_aio.pl    | 145 +++++++---------------
 src/test/modules/test_aio/t/003_initdb.pl |  71 +++++++++++
 src/test/modules/test_aio/t/TestAio.pm    |  90 ++++++++++++++
 4 files changed, 206 insertions(+), 101 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
 create mode 100644 src/test/modules/test_aio/t/TestAio.pm

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index fefa25bc5ab..18a797f3a3b 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
     'tests': [
       't/001_aio.pl',
       't/002_io_workers.pl',
+      't/003_initdb.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 5c634ec3ca9..e18b2a2b8ae 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,53 +7,48 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+use FindBin;
+use lib $FindBin::RealBin;
 
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
-	skip 'Injection points not supported by this build', 1
-	  unless $ENV{enable_injection_points} eq 'yes';
-	test_inject_worker('worker', $node_worker);
-}
+use TestAio;
 
-$node_worker->stop();
+my @methods = TestAio::supported_io_methods();
+my %nodes;
 
 
 ###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
 ###
 
-if (have_io_uring())
+foreach my $method (@methods)
 {
-	my $node_uring = create_node('io_uring');
-	$node_uring->start();
-	test_generic('io_uring', $node_uring);
-	$node_uring->stop();
-}
+	my $node = PostgreSQL::Test::Cluster->new($method);
 
+	$nodes{$method} = $node;
+	$node->init();
+	$node->append_conf('postgresql.conf', "io_method=$method");
+	TestAio::configure($node);
+}
 
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
-
-# just to have one test not use the default auto-tuning
-
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
 	'postgresql.conf', qq(
-io_max_concurrency=4
+ io_max_concurrency=4
 ));
 
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
+
+###
+# Execute the tests for each io_method
+###
+
+foreach my $method (@methods)
+{
+	my $node = $nodes{$method};
+
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
 
 done_testing();
 
@@ -62,71 +57,6 @@ done_testing();
 # Test Helpers
 ###
 
-sub create_node
-{
-	local $Test::Builder::Level = $Test::Builder::Level + 1;
-
-	my $io_method = shift;
-
-	my $node = PostgreSQL::Test::Cluster->new($io_method);
-
-	# Want to test initdb for each IO method, otherwise we could just reuse
-	# the cluster.
-	#
-	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
-	# options specified by ->extra, if somebody puts -c io_method=xyz in
-	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
-	# detect it.
-	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
-	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
-		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
-	{
-		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
-	}
-
-	$node->init(extra => [ '-c', "io_method=$io_method" ]);
-
-	$node->append_conf(
-		'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
-
-	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
-	# io_method, it'd override the setting persisted at initdb time. While
-	# using (and later verifying) the setting from initdb provides some
-	# verification of having used the io_method during initdb, it's probably
-	# not worth the complication of only appending if the variable is set in
-	# in TEMP_CONFIG.
-	$node->append_conf(
-		'postgresql.conf', qq(
-io_method=$io_method
-));
-
-	ok(1, "$io_method: initdb");
-
-	return $node;
-}
-
-sub have_io_uring
-{
-	# To detect if io_uring is supported, we look at the error message for
-	# assigning an invalid value to an enum GUC, which lists all the valid
-	# options. We need to use -C to deal with running as administrator on
-	# windows, the superuser check is omitted if -C is used.
-	my ($stdout, $stderr) =
-	  run_command [qw(postgres -C invalid -c io_method=invalid)];
-	die "can't determine supported io_method values"
-	  unless $stderr =~ m/Available values: ([^\.]+)\./;
-	my $methods = $1;
-	note "supported io_method values are: $methods";
-
-	return ($methods =~ m/io_uring/) ? 1 : 0;
-}
 
 sub psql_like
 {
@@ -1490,8 +1420,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
 
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests for the specified node / io_method
+sub test_io_method
 {
 	my $io_method = shift;
 	my $node = shift;
@@ -1526,10 +1456,23 @@ CHECKPOINT;
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
 
+	# generic injection tests
   SKIP:
 	{
 		skip 'Injection points not supported by this build', 1
 		  unless $ENV{enable_injection_points} eq 'yes';
 		test_inject($io_method, $node);
 	}
+
+	# worker-specific injection tests
+	if ($io_method eq 'worker')
+	{
+	  SKIP:
+		{
+			skip 'Injection points not supported by this build', 1
+			  unless $ENV{enable_injection_points} eq 'yes';
+
+			test_inject_worker($io_method, $node);
+		}
+	}
 }
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	my $io_method = shift;
+
+	my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+	# Want to test initdb for each IO method, otherwise we could just reuse
+	# the cluster.
+	#
+	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+	# options specified by ->extra, if somebody puts -c io_method=xyz in
+	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+	# detect it.
+	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+	{
+		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+	}
+
+	$node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+	TestAio::configure($node);
+
+	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
+	# io_method, it'd override the setting persisted at initdb time. While
+	# using (and later verifying) the setting from initdb provides some
+	# verification of having used the io_method during initdb, it's probably
+	# not worth the complication of only appending if the variable is set
+	# in TEMP_CONFIG.
+	$node->append_conf(
+		'postgresql.conf', qq(
+io_method=$io_method
+));
+
+	ok(1, "$io_method: initdb");
+
+	$node->start();
+	$node->stop();
+	ok(1, "$io_method: start & stop");
+
+	return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC.
+
+=cut
+
+sub supported_io_methods()
+{
+	my @io_methods = ('worker');
+
+	push(@io_methods, "io_uring") if have_io_uring();
+
+	# Return sync last, as it is the least likely to fail
+	push(@io_methods, "sync");
+
+	return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO tests.
+
+=cut
+
+sub configure
+{
+	my $node = shift;
+
+	$node->append_conf(
+		'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return whether io_uring is supported.
+
+=cut
+
+sub have_io_uring
+{
+	# To detect if io_uring is supported, we look at the error message for
+	# assigning an invalid value to an enum GUC, which lists all the valid
+	# options. We need to use -C to deal with running as administrator on
+	# Windows; the superuser check is omitted if -C is used.
+	my ($stdout, $stderr) =
+	  run_command [qw(postgres -C invalid -c io_method=invalid)];
+	die "can't determine supported io_method values"
+	  unless $stderr =~ m/Available values: ([^\.]+)\./;
+	my $methods = $1;
+	note "supported io_method values are: $methods";
+
+	return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
-- 
2.53.0.1.gb2826b52eb

>From 74cff50281b53098af676c38e65b1c8fce2ef0bd Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 25 Mar 2026 14:42:39 -0400
Subject: [PATCH v8 2/9] test_aio: Add basic tests for StartReadBuffers()

Upcoming commits will change StartReadBuffers() and its building blocks,
making it worthwhile to have tests that explicitly exercise StartReadBuffers().

Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/t/001_aio.pl      | 239 ++++++++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql |   7 +
 src/test/modules/test_aio/test_aio.c        | 177 ++++++++++++++-
 3 files changed, 415 insertions(+), 8 deletions(-)

diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index e18b2a2b8ae..bde92eeb7d4 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -1420,6 +1420,244 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
 
+# Tests for StartReadBuffers()
+sub test_read_buffers
+{
+	my $io_method = shift;
+	my $node = shift;
+	my ($ret, $output);
+	my $table;
+	my $persistency;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	$psql_a->query_safe(
+		qq(
+CREATE TEMPORARY TABLE tmp_ok(data int not null);
+INSERT INTO tmp_ok SELECT generate_series(1, 5000);
+));
+
+	foreach $persistency (qw(normal temporary))
+	{
+		$table = $persistency eq 'normal' ? 'tbl_ok' : 'tmp_ok';
+
+		# check that one larger read is done as multiple reads
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, combine, block 0-1",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|t\|2$/,
+			qr/^$/);
+
+		# But if we do it again, now that the blocks are in shared buffers,
+		# there will be two separate operations
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, doesn't combine hits, block 0-1",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+			qr/^$/);
+
+		# Check that a larger read interrupted by a hit works
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, prep, block 3",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 3, 1)|,
+			qr/^0\|3\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, interrupted by hit on 3, block 2-5",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 2, 4)|,
+			qr/^0\|2\|t\|1\n1\|3\|f\|1\n2\|4\|t\|2$/,
+			qr/^$/);
+
+
+		# Verify that a read with an initial buffer hit works
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss, block 0",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 1)|,
+			qr/^0\|0\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 0",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 1)|,
+			qr/^0\|0\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss, block 1",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 1)|,
+			qr/^0\|1\|t\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 1",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 1)|,
+			qr/^0\|1\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit, block 0-1",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+			qr/^$/);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, hit 0-1, miss 2",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 3)|,
+			qr/^0\|0\|f\|1\n1\|1\|f\|1\n2\|2\|t\|1$/,
+			qr/^$/);
+
+		# Verify that a read with an initial miss and trailing buffer hit(s) works
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 0)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss 0, hit 1-2",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 3)|,
+			qr/^0\|0\|t\|1\n1\|1\|f\|1\n2\|2\|f\|1$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|);
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|);
+		$psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss 1-2, hit 3-4",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 4)|,
+			qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/,
+			qr/^$/);
+
+		# Verify that we aren't doing reads larger than io_combine_limit
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(qq|SET io_combine_limit=3|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, io_combine_limit has effect",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5)|,
+			qr/^0\|1\|t\|3\n3\|4\|t\|2$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
+
+
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|);
+		$psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|);
+		$psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, miss 1-2, hit 3-4",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 4)|,
+			qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/,
+			qr/^$/);
+
+		# Test encountering buffer IO we ourselves started; we have to accept
+		# that the IO may or may not have completed by then.
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(
+			qq|SELECT read_rel_block_ll('$table', 1, wait_complete=>false)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, in-progress 1, read 1-3",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 3)|,
+			qr/^0\|1\|(t|f)\|1\n1\|2\|t\|2$/,
+			qr/^$/);
+
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(
+			qq|SELECT read_rel_block_ll('$table', 2, wait_complete=>false)|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers, in-progress 2, read 1-3",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 3)|,
+			qr/^0\|1\|t\|1\n1\|2\|f\|1\n2\|3\|t\|1$/,
+			qr/^$/);
+	}
+
+	# The remaining tests don't make sense for temp tables, as they are
+	# concerned with multiple sessions interacting with each other.
+	$table = 'tbl_ok';
+	$persistency = 'normal';
+
+	# Test that starting buffer IO splits the read if there already is IO in
+	# progress. We can't observe this with sync, as that io_method does not
+	# start the IO operation in StartReadBuffers().
+	if ($io_method ne 'sync')
+	{
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+
+		my $buf_id =
+		  $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|);
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+		);
+
+		query_wait_block(
+			$io_method,
+			$node,
+			$psql_a,
+			"$persistency: read buffers blocks waiting for concurrent IO",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo");
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>false, io_error=>false, release_aio=>false)|
+		);
+		pump_until(
+			$psql_a->{run}, $psql_a->{timeout},
+			\$psql_a->{stdout}, qr/0\|1\|t\|2\n2\|3\|t\|3/);
+		ok(1,
+			"$io_method: $persistency: IO was split due to concurrent failed IO"
+		);
+
+		# Same as before, except the concurrent IO succeeds this time
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$buf_id =
+		  $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|);
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+		);
+
+		query_wait_block(
+			$io_method,
+			$node,
+			$psql_a,
+			"$persistency: read buffers blocks waiting for concurrent IO",
+			qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5);\n|,
+			"BufferIo");
+		$psql_b->query_safe(
+			qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>true, io_error=>false, release_aio=>false)|
+		);
+		pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+			qr/0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|t\|2/);
+		ok(1,
+			"$io_method: $persistency: IO was split due to concurrent successful IO"
+		);
+	}
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
 # Run all tests for the specified node / io_method
 sub test_io_method
 {
@@ -1455,6 +1693,7 @@ CHECKPOINT;
 	test_checksum($io_method, $node);
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
+	test_read_buffers($io_method, $node);
 
 	# generic injection tests
   SKIP:
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..c0ac47339a9 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -49,6 +53,9 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT needs_io bool, OUT nblocks int4, OUT buf int4[])
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
 
 
 /*
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..43f32039960 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -19,7 +19,9 @@
 #include "postgres.h"
 
 #include "access/relation.h"
+#include "catalog/pg_type.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
@@ -27,9 +29,11 @@
 #include "storage/checksum.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/tuplestore.h"
 
 
 PG_MODULE_MAGIC;
@@ -458,18 +462,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
-PG_FUNCTION_INFO_V1(invalidate_rel_block);
-Datum
-invalidate_rel_block(PG_FUNCTION_ARGS)
+/* helper for invalidate_rel_block() and evict_rel() */
+static void
+invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno)
 {
-	Oid			relid = PG_GETARG_OID(0);
-	BlockNumber blkno = PG_GETARG_UINT32(1);
-	Relation	rel;
 	PrefetchBufferResult pr;
 	Buffer		buf;
 
-	rel = relation_open(relid, AccessExclusiveLock);
-
 	/*
 	 * This is a gross hack, but there's no other API exposed that allows to
 	 * get a buffer ID without actually reading the block in.
@@ -506,11 +505,74 @@ invalidate_rel_block(PG_FUNCTION_ARGS)
 		}
 	}
 
+}
+
+PG_FUNCTION_INFO_V1(invalidate_rel_block);
+Datum
+invalidate_rel_block(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber blkno = PG_GETARG_UINT32(1);
+	Relation	rel;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	invalidate_one_block(rel, MAIN_FORKNUM, blkno);
+
 	relation_close(rel, AccessExclusiveLock);
 
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	/*
+	 * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp
+	 * tables we have to do it the expensive way and evict every possible
+	 * buffer.
+	 */
+	if (RelationUsesLocalBuffers(rel))
+	{
+		SMgrRelation smgr = RelationGetSmgr(rel);
+
+		for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++)
+		{
+			BlockNumber nblocks;
+
+			if (!smgrexists(smgr, forknum))
+				continue;
+
+			nblocks = smgrnblocks(smgr, forknum);
+
+			for (int blkno = 0; blkno < nblocks; blkno++)
+			{
+				invalidate_one_block(rel, forknum, blkno);
+			}
+		}
+	}
+	else
+	{
+		int32		buffers_evicted,
+					buffers_flushed,
+					buffers_skipped;
+
+		EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+								&buffers_skipped);
+	}
+
+	relation_close(rel, AccessExclusiveLock);
+
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(buffer_create_toy);
 Datum
 buffer_create_toy(PG_FUNCTION_ARGS)
@@ -610,6 +672,105 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(read_buffers);
+/*
+ * Infrastructure to test StartReadBuffers()
+ */
+Datum
+read_buffers(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	BlockNumber startblock = PG_GETARG_UINT32(1);
+	int32		nblocks = PG_GETARG_INT32(2);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	SMgrRelation smgr;
+	uint16		nblocks_done = 0;
+	uint16		nios = 0;
+	ReadBuffersOperation *operations;
+	Buffer	   *buffers;
+	Datum	   *buffers_datum;
+
+	Assert(nblocks > 0);
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/* at worst, each block gets its own IO */
+	operations = palloc0(sizeof(ReadBuffersOperation) * nblocks);
+	buffers = palloc0(sizeof(Buffer) * nblocks);
+	buffers_datum = palloc0(sizeof(Datum) * nblocks);
+
+	rel = relation_open(relid, AccessShareLock);
+	smgr = RelationGetSmgr(rel);
+
+	while (nblocks_done < nblocks)
+	{
+		ReadBuffersOperation *operation = &operations[nios];
+		int			nblocks_this_io =
+			Min(nblocks - nblocks_done, io_combine_limit);
+		Datum		values[5] = {0};
+		bool		nulls[5] = {0};
+		bool		needed_io;
+		ArrayType  *buffers_arr;
+
+		operation->rel = rel;
+		operation->smgr = smgr;
+		operation->persistence = rel->rd_rel->relpersistence;
+		operation->strategy = NULL;
+
+		needed_io = StartReadBuffers(operation,
+									 &buffers[nblocks_done],
+									 startblock + nblocks_done,
+									 &nblocks_this_io,
+									 0);
+		if (needed_io)
+			WaitReadBuffers(operation);
+
+		/* convert buffer array to datum array */
+		for (int i = 0; i < nblocks_this_io; i++)
+			buffers_datum[nblocks_done + i] = Int32GetDatum(buffers[nblocks_done + i]);
+		buffers_arr = construct_array_builtin(&buffers_datum[nblocks_done],
+											  nblocks_this_io,
+											  INT4OID);
+
+		values[0] = Int32GetDatum(nblocks_done);
+		nulls[0] = false;
+
+		values[1] = UInt32GetDatum(startblock + nblocks_done);
+		nulls[1] = false;
+
+		values[2] = BoolGetDatum(needed_io);
+		nulls[2] = false;
+
+		values[3] = Int32GetDatum((int16) nblocks_this_io);
+		nulls[3] = false;
+
+		values[4] = PointerGetDatum(buffers_arr);
+		nulls[4] = false;
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		for (int i = 0; i < nblocks_this_io; i++)
+			ReleaseBuffer(buffers[nblocks_done + i]);
+
+		nios++;
+		nblocks_done += nblocks_this_io;
+	}
+
+	/*
+	 * Free explicitly, to have a chance to detect potential issues with too
+	 * long lived references to the operation.
+	 */
+	pfree(operations);
+	pfree(buffers);
+	pfree(buffers_datum);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
-- 
2.53.0.1.gb2826b52eb

>From 38908352a1ecd3b94c47cb900a59531b02716a00 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 25 Mar 2026 14:46:38 -0400
Subject: [PATCH v8 3/9] test_aio: Add read_stream test infrastructure & tests

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 275 +++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  19 +-
 src/test/modules/test_aio/test_aio.c          | 318 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 564 insertions(+), 50 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..f682fa0152c
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,275 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+TestAio::configure($node);
+
+$node->append_conf(
+	'postgresql.conf', qq(
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(qq/SET io_combine_limit = 1/);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	# block 0 grows the distance enough that the stream will look ahead and try
+	# to start a pending read for block 2 (and later block 4) twice before
+	# returning any buffers.
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/);
+	ok(1, "$io_method: stream accessing same block");
+
+	# Test repeated blocks with a temp table, using invalidate_rel_block()
+	# to evict individual local buffers.
+	$psql->query_safe(
+		qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+		   INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/);
+
+	# Evict the specific blocks we'll request to force misses
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/);
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+	# Now the blocks are cached, so repeated access should be hits
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Block 5 is undergoing IO in session b, so session a will move on to start
+	# a new IO for block 7.
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+		   pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+		   WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr},
+		qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>2, nblocks=>3);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Blocks 2 and 4 are undergoing IO initiated by session b
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffers read in one IO/);
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index c0ac47339a9..d42ba939f9a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 
 /*
  * Handle related functions
@@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 43f32039960..c86a0885a5a 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -27,13 +27,18 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
 #include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -41,13 +46,30 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -89,10 +111,13 @@ test_aio_shmem_startup(void)
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
 
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -388,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -771,6 +796,85 @@ read_buffers(PG_FUNCTION_ARGS)
 }
 
 
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks + 1);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -841,15 +945,98 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->completion_wait_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -858,58 +1045,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+		 * lot of problems: e.g., if we wrongly marked as valid a buffer
+		 * that, according to the shortened read, was not actually read, its
+		 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
+				uint32		ok_part = new_result - processed;
 
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
 			}
-
-			ioh->result = new_result;
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);
+	inj_io_short_read_hook(name, private_data, arg);
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -923,6 +1108,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = 0;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -932,6 +1150,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index dbbec84b222..e097613eb66 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -308,6 +308,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.53.0.1.gb2826b52eb

>From 2bb630e96187dbb6833f7435a4600f9be2f9b851 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Mon, 16 Mar 2026 16:50:56 -0400
Subject: [PATCH v8 4/9] Fix off-by-one error in read IO tracing

AsyncReadBuffers()'s no-IO-needed path passed the wrong block number to
TRACE_POSTGRESQL_BUFFER_READ_DONE, because operation->nblocks_done had
already been incremented at that point. Fix by folding the nblocks_done
offset into the blocknum local variable at initialization.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/u73un3xeljr4fiidzwi4ikcr6vm7oqugn4fo5vqpstjio6anl2%40hph6fvdiiria
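The bug pattern is easy to reproduce in isolation. Below is a minimal,
hypothetical C sketch (none of these names are PostgreSQL code) showing why
re-reading nblocks_done after it has been incremented reports a block one past
the one actually processed, and why folding the offset into the local at
initialization fixes it:

```c
#include <assert.h>

/* Hypothetical, simplified stand-in for ReadBuffersOperation. */
typedef struct
{
	unsigned	blocknum;		/* first block of the whole operation */
	int			nblocks_done;	/* blocks already processed */
} Op;

/*
 * Buggy pattern: cache only the base block number, then later re-read the
 * struct field and add it back in.  Once nblocks_done has been incremented
 * in between, the sum points one block past the one actually processed.
 */
unsigned
traced_block_buggy(Op *op)
{
	unsigned	blocknum = op->blocknum;	/* base only */

	op->nblocks_done++;					/* progress recorded first... */
	return blocknum + op->nblocks_done; /* ...offset re-read: off by one */
}

/* Fixed pattern: fold the offset into the local at initialization. */
unsigned
traced_block_fixed(Op *op)
{
	unsigned	blocknum = op->blocknum + op->nblocks_done;

	op->nblocks_done++;
	return blocknum;
}
```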
---
 src/backend/storage/buffer/bufmgr.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 00bc609529a..10afae1990b 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1875,10 +1875,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 {
 	Buffer	   *buffers = &operation->buffers[0];
 	int			flags = operation->flags;
-	BlockNumber blocknum = operation->blocknum;
 	ForkNumber	forknum = operation->forknum;
 	char		persistence = operation->persistence;
 	int16		nblocks_done = operation->nblocks_done;
+	BlockNumber blocknum = operation->blocknum + nblocks_done;
 	Buffer	   *io_buffers = &operation->buffers[nblocks_done];
 	int			io_buffers_len = 0;
 	PgAioHandle *ioh;
@@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
+		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum,
 										  operation->smgr->smgr_rlocator.locator.spcOid,
 										  operation->smgr->smgr_rlocator.locator.dbOid,
 										  operation->smgr->smgr_rlocator.locator.relNumber,
@@ -2062,7 +2062,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 */
 		io_start = pgstat_prepare_io_time(track_io_timing);
 		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum + nblocks_done,
+					   blocknum,
 					   io_pages, io_buffers_len);
 		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
 								io_start, 1, io_buffers_len * BLCKSZ);
-- 
2.53.0.1.gb2826b52eb

>From a68f562d9cbe3b57853fc2196a29f58b064a82d7 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Tue, 17 Mar 2026 15:49:52 -0400
Subject: [PATCH v8 5/9] Pass io_object and io_context through to
 PinBufferForBlock()

PinBufferForBlock() is always_inline and called in a loop in
StartReadBuffersImpl(). Previously it computed io_context and io_object
internally, which required calling IOContextForStrategy() -- a non-inline
function the compiler cannot prove is side-effect-free. This could
potentially cause redundant function calls on every loop iteration.

Compute io_context and io_object in the callers instead, allowing
StartReadBuffersImpl() to do so once before entering the loop.

Suggested-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
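A minimal, hypothetical C sketch of the hoisting (the names below are
illustrative, not PostgreSQL code): because the stand-in function is opaque to
the caller, the compiler must call it on every iteration in the first variant,
while the second computes the result once before the loop:

```c
int			strategy_calls = 0;	/* counts calls to the stand-in below */

/*
 * Stand-in for IOContextForStrategy(): a non-inline function whose
 * result the compiler cannot assume is loop-invariant.
 */
int
context_for_strategy(const void *strategy)
{
	strategy_calls++;
	return strategy != 0;
}

/* Before: the context is recomputed for every block pinned. */
void
pin_blocks_before(const void *strategy, int nblocks)
{
	for (int i = 0; i < nblocks; i++)
	{
		int			ctx = context_for_strategy(strategy);

		(void) ctx;				/* ... pin block i using ctx ... */
	}
}

/* After: compute once before the loop and reuse the result. */
void
pin_blocks_after(const void *strategy, int nblocks)
{
	int			ctx = context_for_strategy(strategy);

	for (int i = 0; i < nblocks; i++)
		(void) ctx;				/* ... pin block i using ctx ... */
}
```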
---
 src/backend/storage/buffer/bufmgr.c | 45 ++++++++++++++++++++---------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 10afae1990b..ab9c2a4b904 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1223,11 +1223,11 @@ PinBufferForBlock(Relation rel,
 				  ForkNumber forkNum,
 				  BlockNumber blockNum,
 				  BufferAccessStrategy strategy,
+				  IOObject io_object,
+				  IOContext io_context,
 				  bool *foundPtr)
 {
 	BufferDesc *bufHdr;
-	IOContext	io_context;
-	IOObject	io_object;
 
 	Assert(blockNum != P_NEW);
 
@@ -1236,17 +1236,6 @@ PinBufferForBlock(Relation rel,
 			persistence == RELPERSISTENCE_PERMANENT ||
 			persistence == RELPERSISTENCE_UNLOGGED));
 
-	if (persistence == RELPERSISTENCE_TEMP)
-	{
-		io_context = IOCONTEXT_NORMAL;
-		io_object = IOOBJECT_TEMP_RELATION;
-	}
-	else
-	{
-		io_context = IOContextForStrategy(strategy);
-		io_object = IOOBJECT_RELATION;
-	}
-
 	TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
 									   smgr->smgr_rlocator.locator.spcOid,
 									   smgr->smgr_rlocator.locator.dbOid,
@@ -1339,9 +1328,23 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 				 mode == RBM_ZERO_AND_LOCK))
 	{
 		bool		found;
+		IOContext	io_context;
+		IOObject	io_object;
+
+		if (persistence == RELPERSISTENCE_TEMP)
+		{
+			io_context = IOCONTEXT_NORMAL;
+			io_object = IOOBJECT_TEMP_RELATION;
+		}
+		else
+		{
+			io_context = IOContextForStrategy(strategy);
+			io_object = IOOBJECT_RELATION;
+		}
 
 		buffer = PinBufferForBlock(rel, smgr, persistence,
-								   forkNum, blockNum, strategy, &found);
+								   forkNum, blockNum, strategy,
+								   io_object, io_context, &found);
 		ZeroAndLockBuffer(buffer, mode, found);
 		return buffer;
 	}
@@ -1379,11 +1382,24 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	int			actual_nblocks = *nblocks;
 	int			maxcombine = 0;
 	bool		did_start_io;
+	IOContext	io_context;
+	IOObject	io_object;
 
 	Assert(*nblocks == 1 || allow_forwarding);
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
 
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
 	for (int i = 0; i < actual_nblocks; ++i)
 	{
 		bool		found;
@@ -1432,6 +1448,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 										   operation->forknum,
 										   blockNum + i,
 										   operation->strategy,
+										   io_object, io_context,
 										   &found);
 		}
 
-- 
2.53.0.1.gb2826b52eb

>From 6f174b440e08191b78b1927e29fc31a26cfa3be8 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Fri, 23 Jan 2026 13:54:02 -0500
Subject: [PATCH v8 6/9] Make buffer hit helper

Two places already count buffer hits, and each needs quite a few lines
of code because hits are accounted for in several counters. Future
commits will add more call sites, so refactor the accounting into a
helper.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
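The shape of the consolidation, as a minimal hypothetical C sketch (the
counters below are simplified stand-ins, not the real pgstat machinery): one
helper bumps every hit-related counter so call sites stop repeating the same
block of accounting code:

```c
#include <stdbool.h>

/* Simplified stand-ins for the counters a buffer hit must bump. */
int			local_blks_hit;
int			shared_blks_hit;
int			io_op_hit_count;
int			vacuum_cost_balance;
bool		vacuum_cost_active = true;

/*
 * Bump every hit-related counter in one place, choosing the local or
 * shared counter based on the relation's persistence.
 */
void
track_buffer_hit(bool is_temp_relation)
{
	if (is_temp_relation)
		local_blks_hit++;
	else
		shared_blks_hit++;

	io_op_hit_count++;

	if (vacuum_cost_active)
		vacuum_cost_balance++;
}
```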
---
 src/backend/storage/buffer/bufmgr.c | 84 ++++++++++++++---------------
 1 file changed, 42 insertions(+), 42 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ab9c2a4b904..fa85570a791 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -648,6 +648,11 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  bool *foundPtr, IOContext io_context);
 static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress);
 static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete);
+
+static pg_attribute_always_inline void TrackBufferHit(IOObject io_object,
+													  IOContext io_context,
+													  Relation rel, char persistence, SMgrRelation smgr,
+													  ForkNumber forknum, BlockNumber blocknum);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
@@ -1243,18 +1248,14 @@ PinBufferForBlock(Relation rel,
 									   smgr->smgr_rlocator.backend);
 
 	if (persistence == RELPERSISTENCE_TEMP)
-	{
 		bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
-		if (*foundPtr)
-			pgBufferUsage.local_blks_hit++;
-	}
 	else
-	{
 		bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
 							 strategy, foundPtr, io_context);
-		if (*foundPtr)
-			pgBufferUsage.shared_blks_hit++;
-	}
+
+	if (*foundPtr)
+		TrackBufferHit(io_object, io_context, rel, persistence, smgr, forkNum, blockNum);
+
 	if (rel)
 	{
 		/*
@@ -1263,21 +1264,6 @@ PinBufferForBlock(Relation rel,
 		 * zeroed instead), the per-relation stats always count them.
 		 */
 		pgstat_count_buffer_read(rel);
-		if (*foundPtr)
-			pgstat_count_buffer_hit(rel);
-	}
-	if (*foundPtr)
-	{
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
-
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
-										  smgr->smgr_rlocator.locator.spcOid,
-										  smgr->smgr_rlocator.locator.dbOid,
-										  smgr->smgr_rlocator.locator.relNumber,
-										  smgr->smgr_rlocator.backend,
-										  true);
 	}
 
 	return BufferDescriptorGetBuffer(bufHdr);
@@ -1712,6 +1698,37 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 	return ReadBuffersCanStartIOOnce(buffer, nowait);
 }
 
+/*
+ * We track various stats related to buffer hits. Because this is done in a
+ * few separate places, this helper exists for convenience.
+ */
+static pg_attribute_always_inline void
+TrackBufferHit(IOObject io_object, IOContext io_context,
+			   Relation rel, char persistence, SMgrRelation smgr,
+			   ForkNumber forknum, BlockNumber blocknum)
+{
+	TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum,
+									  blocknum,
+									  smgr->smgr_rlocator.locator.spcOid,
+									  smgr->smgr_rlocator.locator.dbOid,
+									  smgr->smgr_rlocator.locator.relNumber,
+									  smgr->smgr_rlocator.backend,
+									  true);
+
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_hit += 1;
+	else
+		pgBufferUsage.shared_blks_hit += 1;
+
+	pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageHit;
+
+	if (rel)
+		pgstat_count_buffer_hit(rel);
+}
+
 /*
  * Helper for WaitReadBuffers() that processes the results of a readv
  * operation, raising an error if necessary.
@@ -2007,25 +2024,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum,
-										  operation->smgr->smgr_rlocator.locator.spcOid,
-										  operation->smgr->smgr_rlocator.locator.dbOid,
-										  operation->smgr->smgr_rlocator.locator.relNumber,
-										  operation->smgr->smgr_rlocator.backend,
-										  true);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_hit += 1;
-		else
-			pgBufferUsage.shared_blks_hit += 1;
-
-		if (operation->rel)
-			pgstat_count_buffer_hit(operation->rel);
-
-		pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
-
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageHit;
+		TrackBufferHit(io_object, io_context, operation->rel, persistence,
+					   operation->smgr, forknum, blocknum);
 	}
 	else
 	{
-- 
2.53.0.1.gb2826b52eb

>From c24ee51b259125b1ab73860b72ea7abae4bc7c60 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <[email protected]>
Date: Wed, 18 Mar 2026 11:09:58 -0400
Subject: [PATCH v8 7/9] Restructure AsyncReadBuffers()

Restructure AsyncReadBuffers() to use early return when the head buffer
is already valid, instead of using a did_start_io flag and if/else
branches. Also move a few bits of code closer to where they are used.
This is purely a refactoring; no behavior changes.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
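The control-flow change in miniature, as a hypothetical C sketch (illustrative
names only): the flag variant keeps the long IO-issuing path nested under an
else branch, while the early-return variant lets it continue at the top level.
Both return the same value for the same input:

```c
#include <stdbool.h>

/*
 * Flag style (before): the already-valid case sets did_start_io = false
 * and the IO-issuing path nests inside an else branch.
 */
bool
start_io_flag_style(bool head_already_valid)
{
	bool		did_start_io;

	if (head_already_valid)
	{
		/* release handle, count a hit, ... */
		did_start_io = false;
	}
	else
	{
		/* ... many lines of IO setup, nested one level deep ... */
		did_start_io = true;
	}
	return did_start_io;
}

/*
 * Early-return style (after): the already-valid case returns
 * immediately; the IO-issuing path runs without extra indentation.
 */
bool
start_io_early_return(bool head_already_valid)
{
	if (head_already_valid)
	{
		/* release handle, count a hit, ... */
		return false;
	}

	/* ... IO setup at the function's top level ... */
	return true;
}
```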
---
 src/backend/storage/buffer/bufmgr.c | 171 ++++++++++++++--------------
 1 file changed, 88 insertions(+), 83 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fa85570a791..e212f6110f2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1920,7 +1920,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	void	   *io_pages[MAX_IO_COMBINE_LIMIT];
 	IOContext	io_context;
 	IOObject	io_object;
-	bool		did_start_io;
+	instr_time	io_start;
+
+	if (persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
 
 	/*
 	 * When this IO is executed synchronously, either because the caller will
@@ -1931,16 +1942,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
 
 	if (persistence == RELPERSISTENCE_TEMP)
-	{
-		io_context = IOCONTEXT_NORMAL;
-		io_object = IOOBJECT_TEMP_RELATION;
 		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
-	}
-	else
-	{
-		io_context = IOContextForStrategy(operation->strategy);
-		io_object = IOOBJECT_RELATION;
-	}
 
 	/*
 	 * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
@@ -1992,7 +1994,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	if (unlikely(!ioh))
 	{
 		pgaio_submit_staged();
-
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
@@ -2017,91 +2018,95 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 		pgaio_io_release(ioh);
 		pgaio_wref_clear(&operation->io_wref);
-		did_start_io = false;
 
 		/*
 		 * Report and track this as a 'hit' for this backend, even though it
 		 * must have started out as a miss in PinBufferForBlock(). The other
 		 * backend will track this as a 'read'.
 		 */
-		TrackBufferHit(io_object, io_context, operation->rel, persistence,
-					   operation->smgr, forknum, blocknum);
+		TrackBufferHit(io_object, io_context,
+					   operation->rel, operation->persistence,
+					   operation->smgr, operation->forknum,
+					   blocknum);
+		return false;
 	}
-	else
+
+	Assert(io_buffers[0] == buffers[nblocks_done]);
+	io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
+	io_buffers_len = 1;
+
+	/*
+	 * NB: As little code as possible should be added between the
+	 * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
+	 * below and the smgrstartreadv(), as some of the buffers are now marked
+	 * as IO_IN_PROGRESS and will thus cause other backends to wait.
+	 */
+
+	/*
+	 * How many neighboring-on-disk blocks can we scatter-read into other
+	 * buffers at the same time?  In this case we don't wait if we see an I/O
+	 * already in progress (see comment above).
+	 */
+	for (int i = nblocks_done + 1; i < operation->nblocks; i++)
 	{
-		instr_time	io_start;
+		/* Must be consecutive block numbers. */
+		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
+			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		/* We found a buffer that we need to read in. */
-		Assert(io_buffers[0] == buffers[nblocks_done]);
-		io_pages[0] = BufferGetBlock(buffers[nblocks_done]);
-		io_buffers_len = 1;
+		if (!ReadBuffersCanStartIO(buffers[i], true))
+			break;
 
-		/*
-		 * How many neighboring-on-disk blocks can we scatter-read into other
-		 * buffers at the same time?  In this case we don't wait if we see an
-		 * I/O already in progress.  We already set BM_IO_IN_PROGRESS for the
-		 * head block, so we should get on with that I/O as soon as possible.
-		 */
-		for (int i = nblocks_done + 1; i < operation->nblocks; i++)
-		{
-			if (!ReadBuffersCanStartIO(buffers[i], true))
-				break;
-			/* Must be consecutive block numbers. */
-			Assert(BufferGetBlockNumber(buffers[i - 1]) ==
-				   BufferGetBlockNumber(buffers[i]) - 1);
-			Assert(io_buffers[io_buffers_len] == buffers[i]);
+		Assert(io_buffers[io_buffers_len] == buffers[i]);
 
-			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
-		}
-
-		/* get a reference to wait for in WaitReadBuffers() */
-		pgaio_io_get_wref(ioh, &operation->io_wref);
-
-		/* provide the list of buffers to the completion callbacks */
-		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
-
-		pgaio_io_register_callbacks(ioh,
-									persistence == RELPERSISTENCE_TEMP ?
-									PGAIO_HCB_LOCAL_BUFFER_READV :
-									PGAIO_HCB_SHARED_BUFFER_READV,
-									flags);
-
-		pgaio_io_set_flag(ioh, ioh_flags);
-
-		/* ---
-		 * Even though we're trying to issue IO asynchronously, track the time
-		 * in smgrstartreadv():
-		 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
-		 *   immediately
-		 * - the io method might not support the IO (e.g. worker IO for a temp
-		 *   table)
-		 * ---
-		 */
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrstartreadv(ioh, operation->smgr, forknum,
-					   blocknum,
-					   io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
-								io_start, 1, io_buffers_len * BLCKSZ);
-
-		if (persistence == RELPERSISTENCE_TEMP)
-			pgBufferUsage.local_blks_read += io_buffers_len;
-		else
-			pgBufferUsage.shared_blks_read += io_buffers_len;
-
-		/*
-		 * Track vacuum cost when issuing IO, not after waiting for it.
-		 * Otherwise we could end up issuing a lot of IO in a short timespan,
-		 * despite a low cost limit.
-		 */
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
-
-		*nblocks_progress = io_buffers_len;
-		did_start_io = true;
+		io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 	}
 
-	return did_start_io;
+	/* get a reference to wait for in WaitReadBuffers() */
+	pgaio_io_get_wref(ioh, &operation->io_wref);
+
+	/* provide the list of buffers to the completion callbacks */
+	pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+	pgaio_io_register_callbacks(ioh,
+								persistence == RELPERSISTENCE_TEMP ?
+								PGAIO_HCB_LOCAL_BUFFER_READV :
+								PGAIO_HCB_SHARED_BUFFER_READV,
+								flags);
+
+	pgaio_io_set_flag(ioh, ioh_flags);
+
+	/* ---
+	 * Even though we're trying to issue IO asynchronously, track the time
+	 * in smgrstartreadv():
+	 * - if io_method == IOMETHOD_SYNC, we will always perform the IO
+	 *   immediately
+	 * - the io method might not support the IO (e.g. worker IO for a temp
+	 *   table)
+	 * ---
+	 */
+	io_start = pgstat_prepare_io_time(track_io_timing);
+	smgrstartreadv(ioh, operation->smgr, forknum,
+				   blocknum,
+				   io_pages, io_buffers_len);
+	pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+							io_start, 1, io_buffers_len * BLCKSZ);
+
+	if (persistence == RELPERSISTENCE_TEMP)
+		pgBufferUsage.local_blks_read += io_buffers_len;
+	else
+		pgBufferUsage.shared_blks_read += io_buffers_len;
+
+	/*
+	 * Track vacuum cost when issuing IO, not after waiting for it. Otherwise
+	 * we could end up issuing a lot of IO in a short timespan, despite a low
+	 * cost limit.
+	 */
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+
+	*nblocks_progress = io_buffers_len;
+
+	return true;
 }
 
 /*
-- 
2.53.0.1.gb2826b52eb

>From e1abf0c05f4cfa8ac8436131fbd1835f901901cf Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Thu, 19 Mar 2026 16:30:38 -0400
Subject: [PATCH v8 8/9] bufmgr: Improve StartBufferIO interface

Until now StartBufferIO() had a few weaknesses:

- As it did not submit staged IOs, it was not safe to call StartBufferIO()
  where there was a potential for unsubmitted IO, which required
  AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around
  StartBufferIO().

- With nowait = true, the boolean return value did not allow distinguishing
  between no IO being necessary and having to wait, which would lead
  ReadBuffersCanStartIO() to unnecessarily submit staged IO.

- Several callers needed to handle both local and shared buffers, requiring
  the caller to differentiate between StartBufferIO() and StartLocalBufferIO().

- In a future commit some callers of StartBufferIO() want the BufferDesc's
  io_wref to be returned, to asynchronously wait for in-progress IO.

- Indicating whether to wait with the nowait parameter was somewhat confusing
  compared to a wait parameter.

Address these issues as follows:

- StartBufferIO() is renamed to StartSharedBufferIO()

- A new StartBufferIO() is introduced that supports both shared and local
  buffers

- The boolean return value has been replaced with an enum, indicating whether
  the IO is already done, is already in progress, or the buffer has been
  readied for IO.

- A new PgAioWaitRef * argument allows the caller to get the wait reference if
  desired.  All current callers pass NULL; a user of this will be introduced
  subsequently.

- Instead of the nowait argument there now is a wait parameter.

  This probably would not have been worthwhile on its own, but since all these
  lines needed to be touched anyway...

Author: Andres Freund <[email protected]>
Author: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/include/storage/buf_internals.h         |  24 +-
 src/backend/storage/buffer/bufmgr.c         | 296 ++++++++++++--------
 src/backend/storage/buffer/localbuf.c       |  43 ++-
 src/test/modules/test_aio/t/001_aio.pl      |  24 +-
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  17 +-
 src/tools/pgindent/typedefs.list            |   1 +
 7 files changed, 258 insertions(+), 149 deletions(-)

diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 8d1e16b5d51..ad1b7b2216a 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 
 extern void TrackNewBufferPin(Buffer buf);
 
-/* solely to make it easier to write tests */
-extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
+/*
+ * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO.
+ *
+ * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer
+ * may already have I/O in progress or the I/O may have been done by another
+ * backend.  See the documentation of StartSharedBufferIO for more details.
+ */
+typedef enum StartBufferIOResult
+{
+	BUFFER_IO_ALREADY_DONE,
+	BUFFER_IO_IN_PROGRESS,
+	BUFFER_IO_READY_FOR_IO,
+} StartBufferIOResult;
+
+/* the following are exposed to make it easier to write tests */
+extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait,
+										 PgAioWaitRef *io_wref);
+extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait,
+											   PgAioWaitRef *io_wref);
 extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
 							  bool forget_owner, bool release_aio);
 
@@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
 								   uint64 set_flag_bits, bool release_aio);
-extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
+extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput,
+											  bool wait, PgAioWaitRef *io_wref);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e212f6110f2..fec4e826f62 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 	BufferDesc *bufHdr;
 	bool		need_to_zero;
 	bool		isLocalBuf = BufferIsLocal(buffer);
+	StartBufferIOResult sbres;
 
 	Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
 
@@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
 		 */
 		need_to_zero = false;
 	}
-	else if (isLocalBuf)
-	{
-		/* Simple case for non-shared buffers. */
-		bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-		need_to_zero = StartLocalBufferIO(bufHdr, true, false);
-	}
 	else
 	{
-		/*
-		 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
-		 * concurrently.  Even though we aren't doing I/O, that ensures that
-		 * we don't zero a page that someone else has pinned.  An exclusive
-		 * content lock wouldn't be enough, because readers are allowed to
-		 * drop the content lock after determining that a tuple is visible
-		 * (see buffer access rules in README).
-		 */
-		bufHdr = GetBufferDescriptor(buffer - 1);
-		need_to_zero = StartBufferIO(bufHdr, true, false);
+		if (isLocalBuf)
+		{
+			/* Simple case for non-shared buffers. */
+			bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+			sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
+		}
+		else
+		{
+			/*
+			 * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+			 * concurrently.  Even though we aren't doing I/O, that ensures
+			 * that we don't zero a page that someone else has pinned.  An
+			 * exclusive content lock wouldn't be enough, because readers are
+			 * allowed to drop the content lock after determining that a tuple
+			 * is visible (see buffer access rules in README).
+			 */
+			bufHdr = GetBufferDescriptor(buffer - 1);
+			sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
+		}
+
+		Assert(sbres != BUFFER_IO_IN_PROGRESS);
+		need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
 	}
 
 	if (need_to_zero)
@@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-	if (BufferIsLocal(buffer))
-		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-								  true, nowait);
-	else
-		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-	/*
-	 * If this backend currently has staged IO, we need to submit the pending
-	 * IO before waiting for the right to issue IO, to avoid the potential for
-	 * deadlocks (and, more commonly, unnecessary delays for other backends).
-	 */
-	if (!nowait && pgaio_have_staged())
-	{
-		if (ReadBuffersCanStartIOOnce(buffer, true))
-			return true;
-
-		/*
-		 * Unfortunately StartBufferIO() returning false doesn't allow to
-		 * distinguish between the buffer already being valid and IO already
-		 * being in progress. Since IO already being in progress is quite
-		 * rare, this approach seems fine.
-		 */
-		pgaio_submit_staged();
-	}
-
-	return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	IOContext	io_context;
 	IOObject	io_object;
 	instr_time	io_start;
+	StartBufferIOResult status;
 
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
@@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-	 * might block, which we don't want after setting IO_IN_PROGRESS.
+	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
+	 * block, which we don't want after setting IO_IN_PROGRESS.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	 * for the outcome: either done, or something went wrong and we will
 	 * retry.
 	 */
-	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	if (status != BUFFER_IO_READY_FOR_IO)
 	{
-		/*
-		 * Someone else has already completed this block, we're done.
-		 *
-		 * When IO is necessary, ->nblocks_done is updated in
-		 * ProcessReadBuffersResult(), but that is not called if no IO is
-		 * necessary. Thus update here.
-		 */
-		operation->nblocks_done += 1;
-		*nblocks_progress = 1;
-
 		pgaio_io_release(ioh);
-		pgaio_wref_clear(&operation->io_wref);
-
-		/*
-		 * Report and track this as a 'hit' for this backend, even though it
-		 * must have started out as a miss in PinBufferForBlock(). The other
-		 * backend will track this as a 'read'.
-		 */
-		TrackBufferHit(io_object, io_context,
-					   operation->rel, operation->persistence,
-					   operation->smgr, operation->forknum,
-					   blocknum);
-		return false;
+		*nblocks_progress = 1;
+		if (status == BUFFER_IO_ALREADY_DONE)
+		{
+			/*
+			 * Someone else has already completed this block; we're done.
+			 *
+			 * When IO is necessary, ->nblocks_done is updated in
+			 * ProcessReadBuffersResult(), but that is not called if no IO is
+			 * necessary. Thus update here.
+			 */
+			operation->nblocks_done += 1;
+			Assert(operation->nblocks_done <= operation->nblocks);
+
+			Assert(!pgaio_wref_valid(&operation->io_wref));
+
+			/*
+			 * Report and track this as a 'hit' for this backend, even though
+			 * it must have started out as a miss in PinBufferForBlock(). The
+			 * other backend will track this as a 'read'.
+			 */
+			TrackBufferHit(io_object, io_context,
+						   operation->rel, operation->persistence,
+						   operation->smgr, operation->forknum,
+						   blocknum);
+			return false;
+		}
+
+		/* The IO is already in-progress */
+		Assert(status == BUFFER_IO_IN_PROGRESS);
+
+		return true;
 	}
 
 	Assert(io_buffers[0] == buffers[nblocks_done]);
@@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 	/*
 	 * NB: As little code as possible should be added between the
-	 * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
-	 * below and the smgrstartreadv(), as some of the buffers are now marked
-	 * as IO_IN_PROGRESS and will thus cause other backends to wait.
+	 * StartBufferIO() above, the further StartBufferIO()s below and the
+	 * smgrstartreadv(), as some of the buffers are now marked as
+	 * IO_IN_PROGRESS and will thus cause other backends to wait.
 	 */
 
 	/*
@@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		Assert(BufferGetBlockNumber(buffers[i - 1]) ==
 			   BufferGetBlockNumber(buffers[i]) - 1);
 
-		if (!ReadBuffersCanStartIO(buffers[i], true))
+		status = StartBufferIO(buffers[i], true, false, NULL);
+		if (status != BUFFER_IO_READY_FOR_IO)
 			break;
 
 		Assert(io_buffers[io_buffers_len] == buffers[i]);
@@ -2893,16 +2873,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			 * We *must* do smgr[zero]extend before succeeding, else the page
 			 * will not be reserved by the kernel, and the next P_NEW call
 			 * will decide to return the same page.  Clear the BM_VALID bit,
-			 * do StartBufferIO() and proceed.
+			 * do StartSharedBufferIO() and proceed.
 			 *
 			 * Loop to handle the very small possibility that someone re-sets
 			 * BM_VALID between our clearing it and StartBufferIO inspecting
 			 * it.
 			 */
-			do
+			while (true)
 			{
+				StartBufferIOResult sbres;
+
 				pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
-			} while (!StartBufferIO(existing_hdr, true, false));
+
+				sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
+
+				if (sbres != BUFFER_IO_ALREADY_DONE)
+					break;
+			}
 		}
 		else
 		{
@@ -2928,7 +2915,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
 			LWLockRelease(partition_lock);
 
 			/* XXX: could combine the locked operations in it with the above */
-			StartBufferIO(victim_buf_hdr, true, false);
+			StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -4450,7 +4437,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	 * someone else flushed the buffer before we could, so we need not do
 	 * anything.
 	 */
-	if (!StartBufferIO(buf, false, false))
+	if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
 		return;
 
 	/* Setup error traceback support for ereport() */
@@ -7029,6 +7016,13 @@ WaitIO(BufferDesc *buf)
 {
 	ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
 
+	/*
+	 * We should never end up here with unsubmitted IO, as no AIO-unaware
+	 * code may be used while in batch mode, and AIO-aware code needs to have
+	 * submitted all staged IO to avoid deadlocks and slowness.
+	 */
+	Assert(!pgaio_have_staged());
+
 	ConditionVariablePrepareToSleep(cv);
 	for (;;)
 	{
@@ -7081,30 +7075,47 @@ WaitIO(BufferDesc *buf)
 }
 
 /*
- * StartBufferIO: begin I/O on this buffer
+ * StartSharedBufferIO: begin I/O on this buffer
  *	(Assumptions)
- *	My process is executing no IO on this buffer
  *	The buffer is Pinned
  *
- * In some scenarios multiple backends could attempt the same I/O operation
- * concurrently.  If someone else has already started I/O on this buffer then
- * we will wait for completion of the IO using WaitIO().
+ * In several scenarios the buffer may already be undergoing I/O in this or
+ * another backend.  How best to handle that depends on the caller's
+ * situation. It might be appropriate to wait synchronously (e.g., because the
+ * buffer is about to be invalidated); wait asynchronously, using the buffer's
+ * IO wait reference (e.g., because the caller is doing readahead and doesn't
+ * need the buffer to be ready immediately); or to not wait at all (e.g.,
+ * because the caller is trying to combine IO for this buffer with another
+ * buffer).
  *
- * Input operations are only attempted on buffers that are not BM_VALID,
- * and output operations only on buffers that are BM_VALID and BM_DIRTY,
- * so we can always tell if the work is already done.
+ * How and whether to wait is controlled by the wait and io_wref parameters. In
+ * detail:
  *
- * Returns true if we successfully marked the buffer as I/O busy,
- * false if someone else already did the work.
+ * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
+ *   reference, the *io_wref is set to the buffer's io_wref and
+ *   BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait
+ *   parameter.
  *
- * If nowait is true, then we don't wait for an I/O to be finished by another
- * backend.  In that case, false indicates either that the I/O was already
- * finished, or is still in progress.  This is useful for callers that want to
- * find out if they can perform the I/O as part of a larger operation, without
- * waiting for the answer or distinguishing the reasons why not.
+ * - If the caller passes a NULL io_wref (i.e. the caller does not want to
+ *   asynchronously wait for the IO's completion), wait = false, and the
+ *   buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
+ *
+ * - If wait = true and either the buffer does not have a wait reference,
+ *   or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
+ *   to complete.  To avoid the potential of deadlocks and unnecessary delays,
+ *   all staged I/O is submitted before waiting.
+ *
+ *
+ * Input operations are only attempted on buffers that are not BM_VALID, and
+ * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
+ * always tell if the work is already done.  If no I/O is necessary,
+ * BUFFER_IO_ALREADY_DONE is returned.
+ *
+ * If we successfully marked the buffer as BM_IO_IN_PROGRESS,
+ * BUFFER_IO_READY_FOR_IO is returned.
  */
-bool
-StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
+StartBufferIOResult
+StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
@@ -7116,10 +7127,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 
 		if (!(buf_state & BM_IO_IN_PROGRESS))
 			break;
-		UnlockBufHdr(buf);
-		if (nowait)
-			return false;
-		WaitIO(buf);
+
+		/* Join the existing IO */
+		if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
+		{
+			*io_wref = buf->io_wref;
+			UnlockBufHdr(buf);
+
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else if (!wait)
+		{
+			UnlockBufHdr(buf);
+			return BUFFER_IO_IN_PROGRESS;
+		}
+		else
+		{
+			/*
+			 * With wait = true, we always have to wait if the caller has
+			 * passed io_wref = NULL.
+			 *
+			 * Even with io_wref != NULL, we have to wait if the buffer's wait
+			 * ref is not valid but the IO is in progress: someone else
+			 * started the IO but hasn't set the wait ref yet.  We have no
+			 * choice but to wait until the IO completes.
+			 * but to wait until the IO completes.
+			 */
+			UnlockBufHdr(buf);
+
+			/*
+			 * If this backend currently has staged IO, submit it before
+			 * waiting for in-progress IO, to avoid potential deadlocks and
+			 * unnecessary delays.
+			 */
+			pgaio_submit_staged();
+
+			WaitIO(buf);
+		}
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -7128,9 +7171,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
 		UnlockBufHdr(buf);
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
+	/*
+	 * No IO is in progress and the work is not already done, so we will
+	 * start the IO ourselves.  It's possible that IO was previously in
+	 * progress but didn't complete, because it errored out.
+	 */
 	UnlockBufHdrExt(buf, buf_state,
 					BM_IO_IN_PROGRESS, 0,
 					0);
@@ -7138,7 +7186,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
 								  BufferDescriptorGetBuffer(buf));
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
+}
+
+/*
+ * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
+ * when the caller doesn't otherwise need to care about local vs shared. See
+ * StartSharedBufferIO() for details.
+ */
+StartBufferIOResult
+StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
+{
+	BufferDesc *buf_hdr;
+
+	if (BufferIsLocal(buffer))
+	{
+		buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+
+		return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
+	else
+	{
+		buf_hdr = GetBufferDescriptor(buffer - 1);
+
+		return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
+	}
 }
 
 /*
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index 404c6bccbdd..64da741e101 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -191,7 +191,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 	 * Try to start an I/O operation.  There currently are no reasons for
 	 * StartLocalBufferIO to return false, so we raise an error in that case.
 	 */
-	if (!StartLocalBufferIO(bufHdr, false, false))
+	if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
 		elog(ERROR, "failed to start write IO on local buffer");
 
 	/* Find smgr relation for buffer */
@@ -435,7 +435,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 			pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
 
 			/* no need to loop for local buffers */
-			StartLocalBufferIO(existing_hdr, true, false);
+			StartLocalBufferIO(existing_hdr, true, true, NULL);
 		}
 		else
 		{
@@ -451,7 +451,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
 			hresult->id = victim_buf_id;
 
-			StartLocalBufferIO(victim_buf_hdr, true, false);
+			StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
 		}
 	}
 
@@ -517,26 +517,41 @@ MarkLocalBufferDirty(Buffer buffer)
 }
 
 /*
- * Like StartBufferIO, but for local buffers
+ * Like StartSharedBufferIO, but for local buffers
  */
-bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
+StartBufferIOResult
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
 	uint64		buf_state;
 
 	/*
 	 * With AIO the buffer could have IO in progress, e.g. when there are two
-	 * scans of the same relation. Either wait for the other IO or return
-	 * false.
+	 * scans of the same relation.  Either wait for the other IO (if wait =
+	 * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
 	 */
 	if (pgaio_wref_valid(&bufHdr->io_wref))
 	{
-		PgAioWaitRef iow = bufHdr->io_wref;
+		PgAioWaitRef buf_wref = bufHdr->io_wref;
 
-		if (nowait)
-			return false;
+		if (io_wref != NULL)
+		{
+			/* We've already asynchronously started this IO, so join it */
+			*io_wref = buf_wref;
+			return BUFFER_IO_IN_PROGRESS;
+		}
 
-		pgaio_wref_wait(&iow);
+		/*
+		 * For temp buffers we should never need to wait in
+		 * StartLocalBufferIO() when called with io_wref == NULL while there
+		 * are staged IOs, as it's not allowed to call code that is not aware
+		 * of AIO while in batch mode.
+		 */
+		Assert(!pgaio_have_staged());
+
+		if (!wait)
+			return BUFFER_IO_IN_PROGRESS;
+
+		pgaio_wref_wait(&buf_wref);
 	}
 
 	/* Once we get here, there is definitely no I/O active on this buffer */
@@ -545,14 +560,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
 	buf_state = pg_atomic_read_u64(&bufHdr->state);
 	if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
 	{
-		return false;
+		return BUFFER_IO_ALREADY_DONE;
 	}
 
 	/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
 
 	/* local buffers don't track IO using resowners */
 
-	return true;
+	return BUFFER_IO_READY_FOR_IO;
 }
 
 /*
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index bde92eeb7d4..c0f258890ee 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -383,7 +383,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"first StartBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -392,14 +392,14 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"second StartBufferIO fails, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 	psql_like(
 		$io_method,
 		$psql_b,
 		"second StartBufferIO fails, other session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^f$/,
 		qr/^$/);
 
@@ -409,7 +409,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, without marking it as success, this should trigger the
@@ -438,7 +438,7 @@ sub test_startwait_io
 		$io_method,
 		$psql_a,
 		"blocking buffer io w/ success: first start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -448,7 +448,7 @@ sub test_startwait_io
 		$node,
 		$psql_b,
 		"blocking start buffer io",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		"BufferIo");
 
 	# Terminate the IO, marking it as success
@@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"first StartLocalBufferIO",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"second StartLocalBufferIO succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after not marking valid succeeds, same session",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
 		qr/^t$/,
 		qr/^$/);
 
@@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
 		$io_method,
 		$psql_a,
 		"StartLocalBufferIO after marking valid fails",
-		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+		qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
 		qr/^f$/,
 		qr/^$/);
 
@@ -1608,7 +1608,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 		my $buf_id =
 		  $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|);
 		$psql_b->query_safe(
-			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
 		);
 
 		query_wait_block(
@@ -1633,7 +1633,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 		$buf_id =
 		  $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|);
 		$psql_b->query_safe(
-			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+			qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
 		);
 
 		query_wait_block(
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index d42ba939f9a..c3dc3402ce7 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index c86a0885a5a..66ba6507e1c 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (RelationUsesLocalBuffers(rel))
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartLocalBufferIO(buf_hdrs[i], true, false);
+			StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
 		pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
 	}
 	else
 	{
 		for (int i = 0; i < nblocks; i++)
-			StartBufferIO(buf_hdrs[i], true, false);
+			StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
 	}
 
 	pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
@@ -623,15 +623,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS)
 {
 	Buffer		buf = PG_GETARG_INT32(0);
 	bool		for_input = PG_GETARG_BOOL(1);
-	bool		nowait = PG_GETARG_BOOL(2);
+	bool		wait = PG_GETARG_BOOL(2);
+	StartBufferIOResult result;
 	bool		can_start;
 
 	if (BufferIsLocal(buf))
-		can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
-									   for_input, nowait);
+		result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
+									for_input, wait, NULL);
 	else
-		can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
-								  for_input, nowait);
+		result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
+									 for_input, wait, NULL);
+
+	can_start = result == BUFFER_IO_READY_FOR_IO;
 
 	/*
 	 * For tests we don't want the resowner release preventing us from
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e097613eb66..67524535026 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2944,6 +2944,7 @@ SplitTextOutputData
 SplitVar
 StackElem
 StakindFlags
+StartBufferIOResult
 StartDataPtrType
 StartLOPtrType
 StartLOsPtrType
-- 
2.53.0.1.gb2826b52eb

>From 11ee2f2b6f3a9c0e757fce992bce4befbbcc07f1 Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 25 Mar 2026 16:42:00 -0400
Subject: [PATCH v8 9/9] aio: Don't wait for already in-progress IO

Previously, when a backend attempted to start a read IO and found that
the first buffer already had I/O in progress, it waited for that I/O to
complete before initiating reads for any of the subsequent buffers.

Although the backend must wait for the I/O to finish when acquiring the
buffer, there's no reason for it to wait when setting up the read
operation. Waiting at this point prevents the backend from starting I/O
on subsequent buffers and can significantly reduce concurrency.

This matters in two workloads: when multiple backends scan the same
relation concurrently, and when a single backend requests the same block
multiple times within the readahead distance.

If backends wait each time they encounter an in-progress read,
the access pattern effectively degenerates into synchronous I/O.

To fix this, when encountering an already in-progress IO for the head
buffer, a backend now records the buffer's wait reference and defers
waiting until WaitReadBuffers(), when it actually needs to acquire the
buffer.

In rare cases, a backend may still need to wait synchronously at IO
start time: if another backend has set BM_IO_IN_PROGRESS on the buffer
but has not yet set the wait reference. Such windows should be brief and
uncommon.

Author: Melanie Plageman <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
 src/include/storage/bufmgr.h        |  4 +-
 src/backend/storage/buffer/bufmgr.c | 89 ++++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 15 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 4017896f951..dd41b92f944 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -144,9 +144,11 @@ struct ReadBuffersOperation
 	 */
 	Buffer	   *buffers;
 	BlockNumber blocknum;
-	int			flags;
+	uint16		flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	/* true if waiting on another backend's IO */
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fec4e826f62..cc6bdc7d2bb 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1800,8 +1800,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 * b) reports some time as waiting, even if we never waited
 			 *
 			 * we first check if we already know the IO is complete.
+			 *
+			 * Note that operation->io_return is uninitialized for foreign IO,
+			 * so we cannot use the cheaper PGAIO_RS_UNKNOWN pre-check.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1820,11 +1823,45 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc = BufferIsLocal(buffer) ?
+					GetLocalBufferDescriptor(-buffer - 1) :
+					GetBufferDescriptor(buffer - 1);
+				uint64		buf_state = pg_atomic_read_u64(&desc->state);
+
+				if (buf_state & BM_VALID)
+				{
+					BlockNumber blocknum = operation->blocknum + operation->nblocks_done;
+
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+
+					/*
+					 * Track this as a 'hit' for this backend. The backend
+					 * performing the IO will track it as a 'read'.
+					 */
+					TrackBufferHit(io_object, io_context,
+								   operation->rel, operation->persistence,
+								   operation->smgr, operation->forknum,
+								   blocknum);
+				}
+
+				/*
+				 * If the foreign IO failed and left the buffer invalid,
+				 * nblocks_done is not incremented. The retry loop below will
+				 * call AsyncReadBuffers() which will attempt the IO itself.
+				 */
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1870,7 +1907,8 @@ WaitReadBuffers(ReadBuffersOperation *operation)
  * affected by the call. If the first buffer is valid, *nblocks_progress is
  * set to 1 and operation->nblocks_done is incremented.
  *
- * Returns true if IO was initiated, false if no IO was necessary.
+ * Returns true if IO was initiated or is already in progress (foreign IO),
+ * false if the buffer was already valid.
  */
 static bool
 AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
@@ -1943,8 +1981,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
 	/*
-	 * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
-	 * block, which we don't want after setting IO_IN_PROGRESS.
+	 * We must get an IO handle before StartBufferIO(), as pgaio_io_acquire()
+	 * might block, which we don't want after setting IO_IN_PROGRESS. If we
+	 * don't need to do the IO, we'll release the handle.
 	 *
 	 * If we need to wait for IO before we can get a handle, submit
 	 * already-staged IO first, so that other backends don't need to wait.
@@ -1966,14 +2005,34 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return);
 	}
 
+	operation->foreign_io = false;
+	pgaio_wref_clear(&operation->io_wref);
+
 	/*
-	 * Check if we can start IO on the first to-be-read buffer.
+	 * Try to start IO on the first buffer in a new run of blocks. If AIO is
+	 * in progress, be it in this backend or another backend, we just
+	 * associate the wait reference with the operation and wait in
+	 * WaitReadBuffers(). This turns out to be important for performance in
+	 * two workloads:
 	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * 1) A read stream that has to read the same block multiple times within
+	 * the readahead distance. This can happen e.g. for the table accesses of
+	 * an index scan.
+	 *
+	 * 2) Concurrent scans by multiple backends on the same relation.
+	 *
+	 * If we were to synchronously wait for the in-progress IO, we'd not be
+	 * able to keep enough I/O in flight.
+	 *
+	 * If we do find ongoing I/O for the buffer, we set up a 1-block
+	 * ReadBuffersOperation that WaitReadBuffers() can then wait on.
+	 *
+	 * It's possible that another backend has started IO on the buffer but not
+	 * yet set its wait reference. In this case, we have no choice but to wait
+	 * for either the wait reference to be valid or the IO to be done.
 	 */
-	status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+	status = StartBufferIO(buffers[nblocks_done], true, true,
+						   &operation->io_wref);
 	if (status != BUFFER_IO_READY_FOR_IO)
 	{
 		pgaio_io_release(ioh);
@@ -2006,6 +2065,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
 		/* The IO is already in-progress */
 		Assert(status == BUFFER_IO_IN_PROGRESS);
+		Assert(pgaio_wref_valid(&operation->io_wref));
+		operation->foreign_io = true;
 
 		return true;
 	}
-- 
2.53.0.1.gb2826b52eb
