Hi, Attached is an updated set of patches. I fixed the bug that Melanie noticed, updated the comments a bit further and added a new commit that adds a decent bit of coverage of StartReadBuffers(), including all the cases in StartSharedBufferIO and most of the cases in StartLocalBufferIO().
I'm planning to commit 0001 soon - it hasn't changed in a while. Then I'd like to get 0002 committed soon after, but I'll hold off on that until tomorrow, given that nobody has looked at it (as it's new). I think 0004-0007 can be committed too, but I am not sure whether you (Melanie) want to do so. I'd like to get the rest committed tomorrow too. Greetings, Andres Freund
>From e2381eb540751eebb39a25797625a677dfa71387 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 25 Mar 2026 10:43:56 -0400 Subject: [PATCH v8 1/9] aio: Refactor tests in preparation for more tests In a future commit more AIO related tests are due to be introduced. However 001_aio.pl already is fairly large. This commit introduces a new TestAio package with helpers for writing AIO related tests. Then it uses the new helpers to simplify the existing 001_aio.pl by iterating over all supported io_methods. This will be particularly helpful because additional methods already have been submitted. Additionally this commit splits out testing of initdb using a non-default method into its own test. While that test is somewhat important, it's fairly slow and doesn't break that often. For development velocity it's helpful for 001_aio.pl to be faster. While particularly the latter could benefit from being its own commit, it seems to introduce more back-and-forth than it's worth. 
Author: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Reviewed-by: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/test/modules/test_aio/meson.build | 1 + src/test/modules/test_aio/t/001_aio.pl | 145 +++++++--------------- src/test/modules/test_aio/t/003_initdb.pl | 71 +++++++++++ src/test/modules/test_aio/t/TestAio.pm | 90 ++++++++++++++ 4 files changed, 206 insertions(+), 101 deletions(-) create mode 100644 src/test/modules/test_aio/t/003_initdb.pl create mode 100644 src/test/modules/test_aio/t/TestAio.pm diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index fefa25bc5ab..18a797f3a3b 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -32,6 +32,7 @@ tests += { 'tests': [ 't/001_aio.pl', 't/002_io_workers.pl', + 't/003_initdb.pl', ], }, } diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 5c634ec3ca9..e18b2a2b8ae 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -7,53 +7,48 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use FindBin; +use lib $FindBin::RealBin; -### -# Test io_method=worker -### -my $node_worker = create_node('worker'); -$node_worker->start(); - -test_generic('worker', $node_worker); -SKIP: -{ - skip 'Injection points not supported by this build', 1 - unless $ENV{enable_injection_points} eq 'yes'; - test_inject_worker('worker', $node_worker); -} +use TestAio; -$node_worker->stop(); +my @methods = TestAio::supported_io_methods(); +my %nodes; ### -# Test io_method=io_uring +# Create and configure one instance for each io_method ### -if (have_io_uring()) +foreach my $method (@methods) { - my $node_uring = create_node('io_uring'); - $node_uring->start(); - test_generic('io_uring', $node_uring); - 
$node_uring->stop(); -} + my $node = PostgreSQL::Test::Cluster->new($method); + $nodes{$method} = $node; + $node->init(); + $node->append_conf('postgresql.conf', "io_method=$method"); + TestAio::configure($node); +} -### -# Test io_method=sync -### - -my $node_sync = create_node('sync'); - -# just to have one test not use the default auto-tuning - -$node_sync->append_conf( +# Just to have one test not use the default auto-tuning +$nodes{'sync'}->append_conf( 'postgresql.conf', qq( -io_max_concurrency=4 + io_max_concurrency=4 )); -$node_sync->start(); -test_generic('sync', $node_sync); -$node_sync->stop(); + +### +# Execute the tests for each io_method +### + +foreach my $method (@methods) +{ + my $node = $nodes{$method}; + + $node->start(); + test_io_method($method, $node); + $node->stop(); +} done_testing(); @@ -62,71 +57,6 @@ done_testing(); # Test Helpers ### -sub create_node -{ - local $Test::Builder::Level = $Test::Builder::Level + 1; - - my $io_method = shift; - - my $node = PostgreSQL::Test::Cluster->new($io_method); - - # Want to test initdb for each IO method, otherwise we could just reuse - # the cluster. - # - # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the - # options specified by ->extra, if somebody puts -c io_method=xyz in - # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we - # detect it. - local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; - if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} - && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) - { - $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; - } - - $node->init(extra => [ '-c', "io_method=$io_method" ]); - - $node->append_conf( - 'postgresql.conf', qq( -shared_preload_libraries=test_aio -log_min_messages = 'DEBUG3' -log_statement=all -log_error_verbosity=default -restart_after_crash=false -temp_buffers=100 -)); - - # Even though we used -c io_method=... 
above, if TEMP_CONFIG sets - # io_method, it'd override the setting persisted at initdb time. While - # using (and later verifying) the setting from initdb provides some - # verification of having used the io_method during initdb, it's probably - # not worth the complication of only appending if the variable is set in - # in TEMP_CONFIG. - $node->append_conf( - 'postgresql.conf', qq( -io_method=$io_method -)); - - ok(1, "$io_method: initdb"); - - return $node; -} - -sub have_io_uring -{ - # To detect if io_uring is supported, we look at the error message for - # assigning an invalid value to an enum GUC, which lists all the valid - # options. We need to use -C to deal with running as administrator on - # windows, the superuser check is omitted if -C is used. - my ($stdout, $stderr) = - run_command [qw(postgres -C invalid -c io_method=invalid)]; - die "can't determine supported io_method values" - unless $stderr =~ m/Available values: ([^\.]+)\./; - my $methods = $1; - note "supported io_method values are: $methods"; - - return ($methods =~ m/io_uring/) ? 
1 : 0; -} sub psql_like { @@ -1490,8 +1420,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);), } -# Run all tests that are supported for all io_methods -sub test_generic +# Run all tests that for the specified node / io_method +sub test_io_method { my $io_method = shift; my $node = shift; @@ -1526,10 +1456,23 @@ CHECKPOINT; test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + # generic injection tests SKIP: { skip 'Injection points not supported by this build', 1 unless $ENV{enable_injection_points} eq 'yes'; test_inject($io_method, $node); } + + # worker specific injection tests + if ($io_method eq 'worker') + { + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + + test_inject_worker($io_method, $node); + } + } } diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl new file mode 100644 index 00000000000..c03ae58d00a --- /dev/null +++ b/src/test/modules/test_aio/t/003_initdb.pl @@ -0,0 +1,71 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group +# +# Test initdb for each IO method. This is done separately from 001_aio.pl, as +# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can +# be iterated on more quickly. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +foreach my $method (TestAio::supported_io_methods()) +{ + test_create_node($method); +} + +done_testing(); + + +sub test_create_node +{ + local $Test::Builder::Level = $Test::Builder::Level + 1; + + my $io_method = shift; + + my $node = PostgreSQL::Test::Cluster->new($io_method); + + # Want to test initdb for each IO method, otherwise we could just reuse + # the cluster. 
+ # + # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the + # options specified by ->extra, if somebody puts -c io_method=xyz in + # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we + # detect it. + local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; + if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} + && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) + { + $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; + } + + $node->init(extra => [ '-c', "io_method=$io_method" ]); + + TestAio::configure($node); + + # Even though we used -c io_method=... above, if TEMP_CONFIG sets + # io_method, it'd override the setting persisted at initdb time. While + # using (and later verifying) the setting from initdb provides some + # verification of having used the io_method during initdb, it's probably + # not worth the complication of only appending if the variable is set in + # in TEMP_CONFIG. + $node->append_conf( + 'postgresql.conf', qq( +io_method=$io_method +)); + + ok(1, "$io_method: initdb"); + + $node->start(); + $node->stop(); + ok(1, "$io_method: start & stop"); + + return $node; +} diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm new file mode 100644 index 00000000000..5bc80a9b130 --- /dev/null +++ b/src/test/modules/test_aio/t/TestAio.pm @@ -0,0 +1,90 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +TestAio - helpers for writing AIO related tests + +=cut + +package TestAio; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + + +=pod + +=head1 METHODS + +=over + +=item TestAio::supported_io_methods() + +Return an array of all the supported values for the io_method GUC + +=cut + +sub supported_io_methods() +{ + my @io_methods = ('worker'); + + push(@io_methods, "io_uring") if have_io_uring(); + + # Return sync last, as it will least commonly fail + push(@io_methods, "sync"); + + return 
@io_methods; +} + + +=item TestAio::configure() + +Prepare a cluster for AIO test + +=cut + +sub configure +{ + my $node = shift; + + $node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +)); + +} + + +=pod + +=item TestAio::have_io_uring() + +Return if io_uring is supported + +=cut + +sub have_io_uring +{ + # To detect if io_uring is supported, we look at the error message for + # assigning an invalid value to an enum GUC, which lists all the valid + # options. We need to use -C to deal with running as administrator on + # windows, the superuser check is omitted if -C is used. + my ($stdout, $stderr) = + run_command [qw(postgres -C invalid -c io_method=invalid)]; + die "can't determine supported io_method values" + unless $stderr =~ m/Available values: ([^\.]+)\./; + my $methods = $1; + note "supported io_method values are: $methods"; + + return ($methods =~ m/io_uring/) ? 1 : 0; +} + +1; -- 2.53.0.1.gb2826b52eb
>From 74cff50281b53098af676c38e65b1c8fce2ef0bd Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 25 Mar 2026 14:42:39 -0400 Subject: [PATCH v8 2/9] test_aio: Add basic tests for StartReadBuffers() Upcoming commits will change StartReadBuffers() and its building blocks, making it worthwhile to have tests explicitly of StartReadBuffers(). Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/test/modules/test_aio/t/001_aio.pl | 239 ++++++++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 7 + src/test/modules/test_aio/test_aio.c | 177 ++++++++++++++- 3 files changed, 415 insertions(+), 8 deletions(-) diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index e18b2a2b8ae..bde92eeb7d4 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -1420,6 +1420,244 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);), } +# Tests for StartReadBuffers() +sub test_read_buffers +{ + my $io_method = shift; + my $node = shift; + my ($ret, $output); + my $table; + my $persistency; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + $psql_a->query_safe( + qq( +CREATE TEMPORARY TABLE tmp_ok(data int not null); +INSERT INTO tmp_ok SELECT generate_series(1, 5000); +)); + + foreach $persistency (qw(normal temporary)) + { + $table = $persistency eq 'normal' ? 'tbl_ok' : 'tmp_ok'; + + # check that one larger read is done as multiple reads + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, combine, block 0-1", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|, + qr/^0\|0\|t\|2$/, + qr/^$/); + + # but if we do it again, i.e. 
it's in s_b, there will be two + # operations + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, doesn't combine hits, block 0-1", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|, + qr/^0\|0\|f\|1\n1\|1\|f\|1$/, + qr/^$/); + + # Check that a larger read interrupted by a hit works + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, prep, block 3", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 3, 1)|, + qr/^0\|3\|t\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, interrupted by hit on 3, block 2-5", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 2, 4)|, + qr/^0\|2\|t\|1\n1\|3\|f\|1\n2\|4\|t\|2$/, + qr/^$/); + + + # Verify that a read with an initial buffer hit works + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, miss, block 0", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 1)|, + qr/^0\|0\|t\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, hit, block 0", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 1)|, + qr/^0\|0\|f\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, miss, block 1", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 1)|, + qr/^0\|1\|t\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, hit, block 1", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 1)|, + qr/^0\|1\|f\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, hit, block 0-1", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 2)|, + qr/^0\|0\|f\|1\n1\|1\|f\|1$/, + qr/^$/); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, hit 0-1, 
miss 2", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 3)|, + qr/^0\|0\|f\|1\n1\|1\|f\|1\n2\|2\|t\|1$/, + qr/^$/); + + # Verify that a read with an initial miss and trailing buffer hit(s) works + $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 0)|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, miss 0, hit 1-2", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 0, 3)|, + qr/^0\|0\|t\|1\n1\|1\|f\|1\n2\|2\|f\|1$/, + qr/^$/); + $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|); + $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|); + $psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, miss 1-2, hit 3-4", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 4)|, + qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/, + qr/^$/); + + # Verify that we aren't doing reads larger than io_combine_limit + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + $psql_a->query_safe(qq|SET io_combine_limit=3|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, io_combine_limit has effect", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5)|, + qr/^0\|1\|t\|3\n3\|4\|t\|2$/, + qr/^$/); + $psql_a->query_safe(qq|RESET io_combine_limit|); + + + $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|); + $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|); + $psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, miss 1-2, hit 3-4", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 4)|, + qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/, + qr/^$/); + + # Test encountering buffer IO we started, have to accept that IO may + # or may not have completed though. 
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + $psql_a->query_safe( + qq|SELECT read_rel_block_ll('$table', 1, wait_complete=>false)|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, in-progress 1, read 1-3", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 3)|, + qr/^0\|1\|(t|f)\|1\n1\|2\|t\|2$/, + qr/^$/); + + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + $psql_a->query_safe( + qq|SELECT read_rel_block_ll('$table', 2, wait_complete=>false)|); + psql_like( + $io_method, + $psql_a, + "$persistency: read buffers, in-progress 2, read 1-3", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 3)|, + qr/^0\|1\|t\|1\n1\|2\|f\|1\n2\|3\|t\|1$/, + qr/^$/); + } + + # The remaining tests don't make sense for temp tables, as they are + # concerned with multiple sessions interacting with each other. + $table = 'tbl_ok'; + $persistency = 'normal'; + + # Test that starting buffer IO will split the IO if there's IO in progress. We can't + # observe this with sync, as that does not start the IO operation in + # StartReadBuffers(). 
+ if ($io_method ne 'sync') + { + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + + my $buf_id = + $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|); + $psql_b->query_safe( + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)| + ); + + query_wait_block( + $io_method, + $node, + $psql_a, + "$persistency: read buffers blocks waiting for concurrent IO", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5);\n|, + "BufferIo"); + $psql_b->query_safe( + qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>false, io_error=>false, release_aio=>false)| + ); + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/0\|1\|t\|2\n2\|3\|t\|3/); + ok(1, + "$io_method: $persistency: IO was split due to concurrent failed IO" + ); + + # Same as before, except the concurrent IO succeeds this time + $psql_a->query_safe(qq|SELECT evict_rel('$table')|); + $buf_id = + $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|); + $psql_b->query_safe( + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)| + ); + + query_wait_block( + $io_method, + $node, + $psql_a, + "$persistency: read buffers blocks waiting for concurrent IO", + qq|SELECT blockoff, blocknum, needs_io, nblocks FROM read_buffers('$table', 1, 5);\n|, + "BufferIo"); + $psql_b->query_safe( + qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>true, io_error=>false, release_aio=>false)| + ); + pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout}, + qr/0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|t\|2/); + ok(1, + "$io_method: $persistency: IO was split due to concurrent successful IO" + ); + } + + $psql_a->quit(); + $psql_b->quit(); +} + + # Run all tests that for the specified node / io_method sub test_io_method { @@ -1455,6 +1693,7 @@ CHECKPOINT; test_checksum($io_method, $node); test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + 
test_read_buffers($io_method, $node); # generic injection tests SKIP: diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e..c0ac47339a9 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll( RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION evict_rel(rel regclass) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -49,6 +53,9 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT needs_io bool, OUT nblocks int4, OUT buf int4[]) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; /* diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index b1aa8af9ec0..43f32039960 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -19,7 +19,9 @@ #include "postgres.h" #include "access/relation.h" +#include "catalog/pg_type.h" #include "fmgr.h" +#include "funcapi.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/buf_internals.h" @@ -27,9 +29,11 @@ #include "storage/checksum.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" +#include "utils/tuplestore.h" PG_MODULE_MAGIC; @@ -458,18 +462,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } -PG_FUNCTION_INFO_V1(invalidate_rel_block); -Datum -invalidate_rel_block(PG_FUNCTION_ARGS) +/* helper for invalidate_rel_block() and evict_rel() */ 
+static void +invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno) { - Oid relid = PG_GETARG_OID(0); - BlockNumber blkno = PG_GETARG_UINT32(1); - Relation rel; PrefetchBufferResult pr; Buffer buf; - rel = relation_open(relid, AccessExclusiveLock); - /* * This is a gross hack, but there's no other API exposed that allows to * get a buffer ID without actually reading the block in. @@ -506,11 +505,74 @@ invalidate_rel_block(PG_FUNCTION_ARGS) } } +} + +PG_FUNCTION_INFO_V1(invalidate_rel_block); +Datum +invalidate_rel_block(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + BlockNumber blkno = PG_GETARG_UINT32(1); + Relation rel; + + rel = relation_open(relid, AccessExclusiveLock); + + invalidate_one_block(rel, MAIN_FORKNUM, blkno); + relation_close(rel, AccessExclusiveLock); PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(evict_rel); +Datum +evict_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation rel; + + rel = relation_open(relid, AccessExclusiveLock); + + /* + * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp + * tables we have to do it the expensive way and evict every possible + * buffer. 
+ */ + if (RelationUsesLocalBuffers(rel)) + { + SMgrRelation smgr = RelationGetSmgr(rel); + + for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++) + { + BlockNumber nblocks; + + if (!smgrexists(smgr, forknum)) + continue; + + nblocks = smgrnblocks(smgr, forknum); + + for (int blkno = 0; blkno < nblocks; blkno++) + { + invalidate_one_block(rel, forknum, blkno); + } + } + } + else + { + int32 buffers_evicted, + buffers_flushed, + buffers_skipped; + + EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed, + &buffers_skipped); + } + + relation_close(rel, AccessExclusiveLock); + + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(buffer_create_toy); Datum buffer_create_toy(PG_FUNCTION_ARGS) @@ -610,6 +672,105 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(read_buffers); +/* + * Infrastructure to test StartReadBuffers() + */ +Datum +read_buffers(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + BlockNumber startblock = PG_GETARG_UINT32(1); + int32 nblocks = PG_GETARG_INT32(2); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + SMgrRelation smgr; + uint16 nblocks_done = 0; + uint16 nios = 0; + ReadBuffersOperation *operations; + Buffer *buffers; + Datum *buffers_datum; + + Assert(nblocks > 0); + + InitMaterializedSRF(fcinfo, 0); + + /* at worst the each block gets its own IO */ + operations = palloc0(sizeof(ReadBuffersOperation) * nblocks); + buffers = palloc0(sizeof(Buffer) * nblocks); + buffers_datum = palloc0(sizeof(Datum) * nblocks); + + rel = relation_open(relid, AccessShareLock); + smgr = RelationGetSmgr(rel); + + while (nblocks_done < nblocks) + { + ReadBuffersOperation *operation = &operations[nios]; + int nblocks_this_io = + Min(nblocks - nblocks_done, io_combine_limit); + Datum values[5] = {0}; + bool nulls[5] = {0}; + bool needed_io; + ArrayType *buffers_arr; + + operation->rel = rel; + operation->smgr = smgr; + operation->persistence = rel->rd_rel->relpersistence; + 
operation->strategy = NULL; + + needed_io = StartReadBuffers(operation, + &buffers[nblocks_done], + startblock + nblocks_done, + &nblocks_this_io, + 0); + if (needed_io) + WaitReadBuffers(operation); + + /* convert buffer array to datum array */ + for (int i = 0; i < nblocks_this_io; i++) + buffers_datum[nblocks_done + i] = Int32GetDatum(buffers[nblocks_done + i]); + buffers_arr = construct_array_builtin(&buffers_datum[nblocks_done], + nblocks_this_io, + INT4OID); + + values[0] = Int32GetDatum(nblocks_done); + nulls[0] = false; + + values[1] = UInt32GetDatum(startblock + nblocks_done); + nulls[1] = false; + + values[2] = BoolGetDatum(needed_io); + nulls[2] = false; + + values[3] = Int32GetDatum((int16) nblocks_this_io); + nulls[3] = false; + + values[4] = PointerGetDatum(buffers_arr); + nulls[4] = false; + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + for (int i = 0; i < nblocks_this_io; i++) + ReleaseBuffer(buffers[nblocks_done + i]); + + nios++; + nblocks_done += nblocks_this_io; + } + + /* + * Free explicitly, to have a chance to detect potential issues with too + * long lived references to the operation. + */ + pfree(operations); + pfree(buffers); + pfree(buffers_datum); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) -- 2.53.0.1.gb2826b52eb
>From 38908352a1ecd3b94c47cb900a59531b02716a00 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 25 Mar 2026 14:46:38 -0400 Subject: [PATCH v8 3/9] test_aio: Add read_stream test infrastructure & tests Author: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Reviewed-by: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/test/modules/test_aio/meson.build | 1 + .../modules/test_aio/t/004_read_stream.pl | 275 +++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 19 +- src/test/modules/test_aio/test_aio.c | 318 +++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 5 files changed, 564 insertions(+), 50 deletions(-) create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 18a797f3a3b..909f81d96c1 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -33,6 +33,7 @@ tests += { 't/001_aio.pl', 't/002_io_workers.pl', 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 00000000000..f682fa0152c --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,275 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +TestAio::configure($node); + +$node->append_conf( + 'postgresql.conf', qq( +max_connections=8 +io_method=worker +)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + 
$node->adjust_conf('postgresql.conf', 'io_method', $method); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe(qq/SET io_combine_limit = 1/); + + # test miss of the same block twice in a row + $psql->query_safe(qq/SELECT evict_rel('largeish');/); + + # block 0 grows the distance enough that the stream will look ahead and try + # to start a pending read for block 2 (and later block 4) twice before + # returning any buffers. + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]);/); + + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe(qq/SELECT evict_rel('largeish');/); + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish', + ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/); + ok(1, "$io_method: stream accessing same block"); + + # Test repeated blocks with a temp table, using invalidate_rel_block() + # to evict individual local buffers. 
+ $psql->query_safe( + qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10); + INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/); + + # Evict the specific blocks we'll request to force misses + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/); + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/); + $psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/); + + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: temp stream missing the same block repeatedly"); + + # Now the blocks are cached, so repeated access should be hits + $psql->query_safe( + qq/SELECT * FROM read_stream_for_blocks('largeish_temp', + ARRAY[0, 2, 2, 4, 4]);/); + ok(1, "$io_method: temp stream hitting the same block repeatedly"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + # Block 5 is undergoing IO in session b, so session a will move on to start + # a new IO for block 7. 
+ $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, + qq/$io_method: read stream encounters succeeding IO by another backend/ + ); + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->query_safe( + qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'), + pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>5, nblocks=>1);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', + qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr}, + qr/ERROR.*could not read blocks 5\.\.5/); + ok(1, "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT 
inj_io_short_read_detach();/); + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend. + ### + $psql_a->query_safe(qq/SELECT evict_rel('largeish');/); + + $psql_b->query_safe( + qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(), + relfilenode=>pg_relation_filenode('largeish'));/); + + $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish', + blockno=>2, nblocks=>3);\n/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', + qq/SELECT wait_event FROM pg_stat_activity + WHERE wait_event = 'completion_wait';/, + 'completion_wait'); + + # Blocks 2 and 4 are undergoing IO initiated by session b + $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM + read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffer read in one IO/); + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + is($node->safe_psql('postgres', 'SHOW io_method'), + $io_method, "$io_method: io_method set correctly"); + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index c0ac47339a9..d42ba939f9a 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -57,6 +57,13 @@ CREATE
FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl RETURNS SETOF record STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + /* * Handle related functions @@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index 43f32039960..c86a0885a5a 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -27,13 +27,18 @@ #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include "storage/read_stream.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplestore.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -41,13 +46,30 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + pid_t completion_wait_pid; + uint32 
completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -89,10 +111,13 @@ test_aio_shmem_startup(void) inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); + #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -388,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -771,6 +796,85 @@ read_buffers(PG_FUNCTION_ARGS) } +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. 
We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. + */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks + 1); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -841,15 +945,98 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; 
+ PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if (!inj_io_error_state->short_read_result_set) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->short_read_pid != 0 && + inj_io_error_state->short_read_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->completion_wait_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->completion_wait_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) 
arg; @@ -858,58 +1045,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. 
- * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. - */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; + uint32 ok_part = new_result - processed; - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); } - - ioh->result = new_result; } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -923,6 +1108,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? 
InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -932,6 +1150,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index dbbec84b222..e097613eb66 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -308,6 +308,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.53.0.1.gb2826b52eb
>From 2bb630e96187dbb6833f7435a4600f9be2f9b851 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Mon, 16 Mar 2026 16:50:56 -0400 Subject: [PATCH v8 4/9] Fix off-by-one error in read IO tracing AsyncReadBuffers()'s no-IO needed path passed TRACE_POSTGRESQL_BUFFER_READ_DONE the wrong block number because it had already incremented operation->nblocks_done. Fix by folding the nblocks_done offset into the blocknum local variable at initialization. Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Discussion: https://postgr.es/m/u73un3xeljr4fiidzwi4ikcr6vm7oqugn4fo5vqpstjio6anl2%40hph6fvdiiria --- src/backend/storage/buffer/bufmgr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 00bc609529a..10afae1990b 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1875,10 +1875,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) { Buffer *buffers = &operation->buffers[0]; int flags = operation->flags; - BlockNumber blocknum = operation->blocknum; ForkNumber forknum = operation->forknum; char persistence = operation->persistence; int16 nblocks_done = operation->nblocks_done; + BlockNumber blocknum = operation->blocknum + nblocks_done; Buffer *io_buffers = &operation->buffers[nblocks_done]; int io_buffers_len = 0; PgAioHandle *ioh; @@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'.
*/ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done, + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum, operation->smgr->smgr_rlocator.locator.spcOid, operation->smgr->smgr_rlocator.locator.dbOid, operation->smgr->smgr_rlocator.locator.relNumber, @@ -2062,7 +2062,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ io_start = pgstat_prepare_io_time(track_io_timing); smgrstartreadv(ioh, operation->smgr, forknum, - blocknum + nblocks_done, + blocknum, io_pages, io_buffers_len); pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, 1, io_buffers_len * BLCKSZ); -- 2.53.0.1.gb2826b52eb
>From a68f562d9cbe3b57853fc2196a29f58b064a82d7 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Tue, 17 Mar 2026 15:49:52 -0400 Subject: [PATCH v8 5/9] Pass io_object and io_context through to PinBufferForBlock() PinBufferForBlock() is always_inline and called in a loop in StartReadBuffersImpl(). Previously it computed io_context and io_object internally, which required calling IOContextForStrategy() -- a non-inline function the compiler cannot prove is side-effect-free. This could potentially cause redundant function calls. Compute io_context and io_object in the callers instead, allowing StartReadBuffersImpl() to do so once before entering the loop. Suggested-by: Andres Freund <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 45 ++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 10afae1990b..ab9c2a4b904 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1223,11 +1223,11 @@ PinBufferForBlock(Relation rel, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, + IOObject io_object, + IOContext io_context, bool *foundPtr) { BufferDesc *bufHdr; - IOContext io_context; - IOObject io_object; Assert(blockNum != P_NEW); @@ -1236,17 +1236,6 @@ PinBufferForBlock(Relation rel, persistence == RELPERSISTENCE_PERMANENT || persistence == RELPERSISTENCE_UNLOGGED)); - if (persistence == RELPERSISTENCE_TEMP) - { - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - } - else - { - io_context = IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, smgr->smgr_rlocator.locator.spcOid, smgr->smgr_rlocator.locator.dbOid, @@ -1339,9 +1328,23 @@ ReadBuffer_common(Relation rel,
SMgrRelation smgr, char smgr_persistence, mode == RBM_ZERO_AND_LOCK)) { bool found; + IOContext io_context; + IOObject io_object; + + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } buffer = PinBufferForBlock(rel, smgr, persistence, - forkNum, blockNum, strategy, &found); + forkNum, blockNum, strategy, + io_object, io_context, &found); ZeroAndLockBuffer(buffer, mode, found); return buffer; } @@ -1379,11 +1382,24 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, int actual_nblocks = *nblocks; int maxcombine = 0; bool did_start_io; + IOContext io_context; + IOObject io_object; Assert(*nblocks == 1 || allow_forwarding); Assert(*nblocks > 0); Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); + if (operation->persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } + for (int i = 0; i < actual_nblocks; ++i) { bool found; @@ -1432,6 +1448,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, operation->forknum, blockNum + i, operation->strategy, + io_object, io_context, &found); } -- 2.53.0.1.gb2826b52eb
>From 6f174b440e08191b78b1927e29fc31a26cfa3be8 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 13:54:02 -0500 Subject: [PATCH v8 6/9] Make buffer hit helper Already two places count buffer hits, requiring quite a few lines of code since we do accounting in so many places. Future commits will add more locations, so refactor into a helper. Author: Melanie Plageman <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 84 ++++++++++++++--------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ab9c2a4b904..fa85570a791 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -648,6 +648,11 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, bool *foundPtr, IOContext io_context); static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); + +static pg_attribute_always_inline void TrackBufferHit(IOObject io_object, + IOContext io_context, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -1243,18 +1248,14 @@ PinBufferForBlock(Relation rel, smgr->smgr_rlocator.backend); if (persistence == RELPERSISTENCE_TEMP) - { bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); - if (*foundPtr) - pgBufferUsage.local_blks_hit++; - } else - { bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, strategy, foundPtr, io_context); - if (*foundPtr) - pgBufferUsage.shared_blks_hit++; - } 
+ + if (*foundPtr) + TrackBufferHit(io_object, io_context, rel, persistence, smgr, forkNum, blockNum); + if (rel) { /* @@ -1263,21 +1264,6 @@ PinBufferForBlock(Relation rel, * zeroed instead), the per-relation stats always count them. */ pgstat_count_buffer_read(rel); - if (*foundPtr) - pgstat_count_buffer_hit(rel); - } - if (*foundPtr) - { - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; - - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - true); } return BufferDescriptorGetBuffer(bufHdr); @@ -1712,6 +1698,37 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait) return ReadBuffersCanStartIOOnce(buffer, nowait); } +/* + * We track various stats related to buffer hits. Because this is done in a + * few separate places, this helper exists for convenience. + */ +static pg_attribute_always_inline void +TrackBufferHit(IOObject io_object, IOContext io_context, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum) +{ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, + blocknum, + smgr->smgr_rlocator.locator.spcOid, + smgr->smgr_rlocator.locator.dbOid, + smgr->smgr_rlocator.locator.relNumber, + smgr->smgr_rlocator.backend, + true); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + + if (rel) + pgstat_count_buffer_hit(rel); +} + /* * Helper for WaitReadBuffers() that processes the results of a readv * operation, raising an error if necessary. @@ -2007,25 +2024,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). 
The other * backend will track this as a 'read'. */ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum, - operation->smgr->smgr_rlocator.locator.spcOid, - operation->smgr->smgr_rlocator.locator.dbOid, - operation->smgr->smgr_rlocator.locator.relNumber, - operation->smgr->smgr_rlocator.backend, - true); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_hit += 1; - else - pgBufferUsage.shared_blks_hit += 1; - - if (operation->rel) - pgstat_count_buffer_hit(operation->rel); - - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; + TrackBufferHit(io_object, io_context, operation->rel, persistence, + operation->smgr, forknum, blocknum); } else { -- 2.53.0.1.gb2826b52eb
>From c24ee51b259125b1ab73860b72ea7abae4bc7c60 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 18 Mar 2026 11:09:58 -0400 Subject: [PATCH v8 7/9] Restructure AsyncReadBuffers() Restructure AsyncReadBuffers() to use early return when the head buffer is already valid, instead of using a did_start_io flag and if/else branches. Also move around a bit of the code to be located closer to where it is used. This is a refactor only. Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 171 ++++++++++++++-------------- 1 file changed, 88 insertions(+), 83 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fa85570a791..e212f6110f2 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1920,7 +1920,18 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) void *io_pages[MAX_IO_COMBINE_LIMIT]; IOContext io_context; IOObject io_object; - bool did_start_io; + instr_time io_start; + + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } /* * When this IO is executed synchronously, either because the caller will @@ -1931,16 +1942,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) ioh_flags |= PGAIO_HF_SYNCHRONOUS; if (persistence == RELPERSISTENCE_TEMP) - { - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; ioh_flags |= PGAIO_HF_REFERENCES_LOCAL; - } - else - { - io_context = IOContextForStrategy(operation->strategy); - io_object = IOOBJECT_RELATION; - } /* * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR @@ -1992,7 +1994,6 @@ 
AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) if (unlikely(!ioh)) { pgaio_submit_staged(); - ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); } @@ -2017,91 +2018,95 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) pgaio_io_release(ioh); pgaio_wref_clear(&operation->io_wref); - did_start_io = false; /* * Report and track this as a 'hit' for this backend, even though it * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'. */ - TrackBufferHit(io_object, io_context, operation->rel, persistence, - operation->smgr, forknum, blocknum); + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + return false; } - else + + Assert(io_buffers[0] == buffers[nblocks_done]); + io_pages[0] = BufferGetBlock(buffers[nblocks_done]); + io_buffers_len = 1; + + /* + * NB: As little code as possible should be added between the + * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s + * below and the smgrstartreadv(), as some of the buffers are now marked + * as IO_IN_PROGRESS and will thus cause other backends to wait. + */ + + /* + * How many neighboring-on-disk blocks can we scatter-read into other + * buffers at the same time? In this case we don't wait if we see an I/O + * already in progress (see comment above). + */ + for (int i = nblocks_done + 1; i < operation->nblocks; i++) { - instr_time io_start; + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i - 1]) == + BufferGetBlockNumber(buffers[i]) - 1); - /* We found a buffer that we need to read in. 
*/ - Assert(io_buffers[0] == buffers[nblocks_done]); - io_pages[0] = BufferGetBlock(buffers[nblocks_done]); - io_buffers_len = 1; + if (!ReadBuffersCanStartIO(buffers[i], true)) + break; - /* - * How many neighboring-on-disk blocks can we scatter-read into other - * buffers at the same time? In this case we don't wait if we see an - * I/O already in progress. We already set BM_IO_IN_PROGRESS for the - * head block, so we should get on with that I/O as soon as possible. - */ - for (int i = nblocks_done + 1; i < operation->nblocks; i++) - { - if (!ReadBuffersCanStartIO(buffers[i], true)) - break; - /* Must be consecutive block numbers. */ - Assert(BufferGetBlockNumber(buffers[i - 1]) == - BufferGetBlockNumber(buffers[i]) - 1); - Assert(io_buffers[io_buffers_len] == buffers[i]); + Assert(io_buffers[io_buffers_len] == buffers[i]); - io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); - } - - /* get a reference to wait for in WaitReadBuffers() */ - pgaio_io_get_wref(ioh, &operation->io_wref); - - /* provide the list of buffers to the completion callbacks */ - pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - - pgaio_io_register_callbacks(ioh, - persistence == RELPERSISTENCE_TEMP ? - PGAIO_HCB_LOCAL_BUFFER_READV : - PGAIO_HCB_SHARED_BUFFER_READV, - flags); - - pgaio_io_set_flag(ioh, ioh_flags); - - /* --- - * Even though we're trying to issue IO asynchronously, track the time - * in smgrstartreadv(): - * - if io_method == IOMETHOD_SYNC, we will always perform the IO - * immediately - * - the io method might not support the IO (e.g. 
worker IO for a temp - * table) - * --- - */ - io_start = pgstat_prepare_io_time(track_io_timing); - smgrstartreadv(ioh, operation->smgr, forknum, - blocknum, - io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, - io_start, 1, io_buffers_len * BLCKSZ); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += io_buffers_len; - else - pgBufferUsage.shared_blks_read += io_buffers_len; - - /* - * Track vacuum cost when issuing IO, not after waiting for it. - * Otherwise we could end up issuing a lot of IO in a short timespan, - * despite a low cost limit. - */ - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - - *nblocks_progress = io_buffers_len; - did_start_io = true; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); } - return did_start_io; + /* get a reference to wait for in WaitReadBuffers() */ + pgaio_io_get_wref(ioh, &operation->io_wref); + + /* provide the list of buffers to the completion callbacks */ + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); + + pgaio_io_register_callbacks(ioh, + persistence == RELPERSISTENCE_TEMP ? + PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV, + flags); + + pgaio_io_set_flag(ioh, ioh_flags); + + /* --- + * Even though we're trying to issue IO asynchronously, track the time + * in smgrstartreadv(): + * - if io_method == IOMETHOD_SYNC, we will always perform the IO + * immediately + * - the io method might not support the IO (e.g. 
worker IO for a temp + * table) + * --- + */ + io_start = pgstat_prepare_io_time(track_io_timing); + smgrstartreadv(ioh, operation->smgr, forknum, + blocknum, + io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 1, io_buffers_len * BLCKSZ); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += io_buffers_len; + else + pgBufferUsage.shared_blks_read += io_buffers_len; + + /* + * Track vacuum cost when issuing IO, not after waiting for it. Otherwise + * we could end up issuing a lot of IO in a short timespan, despite a low + * cost limit. + */ + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + + *nblocks_progress = io_buffers_len; + + return true; } /* -- 2.53.0.1.gb2826b52eb
>From e1abf0c05f4cfa8ac8436131fbd1835f901901cf Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Thu, 19 Mar 2026 16:30:38 -0400 Subject: [PATCH v8 8/9] bufmgr: Improve StartBufferIO interface Until now StartBufferIO() had a few weaknesses: - As it did not submit staged IOs, it was not safe to call StartBufferIO() where there was a potential for unsubmitted IO, which required AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around StartBufferIO(). - With nowait = true, the boolean return value did not allow to distinguish between no IO being necessary and having to wait, which would lead ReadBuffersCanStartIO() to unnecessarily submit staged IO. - Several callers needed to handle both local and shared buffers, requiring the caller to differentiate between StartBufferIO() and StartLocalBufferIO() - In a future commit some callers of StartBufferIO() want the BufferDesc's io_wref to be returned, to asynchronously wait for in-progress IO - Indicating whether to wait with the nowait parameter was somewhat confusing compared to a wait parameter Address these issues as follows: - StartBufferIO() is renamed to StartSharedBufferIO() - A new StartBufferIO() is introduced that supports both shared and local buffers - The boolean return value has been replaced with an enum, indicating whether the IO is already done, already in progress or that the buffer has been readied for IO - A new PgAioWaitRef * argument allows the caller to get the wait reference if desired. All current callers pass NULL, a user of this will be introduced subsequently - Instead of the nowait argument there now is wait This probably would not have been worthwhile on its own, but since all these lines needed to be touched anyway... 
Author: Andres Freund <[email protected]> Author: Melanie Plageman <[email protected]> Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- src/include/storage/buf_internals.h | 24 +- src/backend/storage/buffer/bufmgr.c | 296 ++++++++++++-------- src/backend/storage/buffer/localbuf.c | 43 ++- src/test/modules/test_aio/t/001_aio.pl | 24 +- src/test/modules/test_aio/test_aio--1.0.sql | 2 +- src/test/modules/test_aio/test_aio.c | 17 +- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 258 insertions(+), 149 deletions(-) diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 8d1e16b5d51..ad1b7b2216a 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, extern void TrackNewBufferPin(Buffer buf); -/* solely to make it easier to write tests */ -extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); +/* + * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO. + * + * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer + * may already have I/O in progress or the I/O may have been done by another + * backend. See the documentation of StartSharedBufferIO for more details. 
+ */ +typedef enum StartBufferIOResult +{ + BUFFER_IO_ALREADY_DONE, + BUFFER_IO_IN_PROGRESS, + BUFFER_IO_READY_FOR_IO, +} StartBufferIOResult; + +/* the following are exposed to make it easier to write tests */ +extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait, + PgAioWaitRef *io_wref); +extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, + PgAioWaitRef *io_wref); extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits, bool forget_owner, bool release_aio); @@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr, extern void MarkLocalBufferDirty(Buffer buffer); extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits, bool release_aio); -extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); +extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, + bool wait, PgAioWaitRef *io_wref); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced); extern void DropRelationLocalBuffers(RelFileLocator rlocator, diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index e212f6110f2..fec4e826f62 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) BufferDesc *bufHdr; bool need_to_zero; bool isLocalBuf = BufferIsLocal(buffer); + StartBufferIOResult sbres; Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); @@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) */ need_to_zero = false; } - else if (isLocalBuf) - { - /* Simple case for non-shared buffers. 
*/ - bufHdr = GetLocalBufferDescriptor(-buffer - 1); - need_to_zero = StartLocalBufferIO(bufHdr, true, false); - } else { - /* - * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set - * concurrently. Even though we aren't doing I/O, that ensures that - * we don't zero a page that someone else has pinned. An exclusive - * content lock wouldn't be enough, because readers are allowed to - * drop the content lock after determining that a tuple is visible - * (see buffer access rules in README). - */ - bufHdr = GetBufferDescriptor(buffer - 1); - need_to_zero = StartBufferIO(bufHdr, true, false); + if (isLocalBuf) + { + /* Simple case for non-shared buffers. */ + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + sbres = StartLocalBufferIO(bufHdr, true, true, NULL); + } + else + { + /* + * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set + * concurrently. Even though we aren't doing I/O, that ensures + * that we don't zero a page that someone else has pinned. An + * exclusive content lock wouldn't be enough, because readers are + * allowed to drop the content lock after determining that a tuple + * is visible (see buffer access rules in README). + */ + bufHdr = GetBufferDescriptor(buffer - 1); + sbres = StartSharedBufferIO(bufHdr, true, true, NULL); + } + + Assert(sbres != BUFFER_IO_IN_PROGRESS); + need_to_zero = sbres == BUFFER_IO_READY_FOR_IO; } if (need_to_zero) @@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) -{ - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); -} - -/* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. 
- */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) -{ - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). - */ - if (!nowait && pgaio_have_staged()) - { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; - - /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. - */ - pgaio_submit_staged(); - } - - return ReadBuffersCanStartIOOnce(buffer, nowait); -} - /* * We track various stats related to buffer hits. Because this is done in a * few separate places, this helper exists for convenience. @@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) IOContext io_context; IOObject io_object; instr_time io_start; + StartBufferIOResult status; if (persistence == RELPERSISTENCE_TEMP) { @@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. + * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might + * block, which we don't want after setting IO_IN_PROGRESS. * * If we need to wait for IO before we can get a handle, submit * already-staged IO first, so that other backends don't need to wait. @@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * for the outcome: either done, or something went wrong and we will * retry. 
*/ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + status = StartBufferIO(buffers[nblocks_done], true, true, NULL); + if (status != BUFFER_IO_READY_FOR_IO) { - /* - * Someone else has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; - *nblocks_progress = 1; - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); - - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. - */ - TrackBufferHit(io_object, io_context, - operation->rel, operation->persistence, - operation->smgr, operation->forknum, - blocknum); - return false; + *nblocks_progress = 1; + if (status == BUFFER_IO_ALREADY_DONE) + { + /* + * Someone has already completed this block, we're done. + * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. + */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + Assert(!pgaio_wref_valid(&operation->io_wref)); + + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. 
+ */ + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + return false; + } + + /* The IO is already in-progress */ + Assert(status == BUFFER_IO_IN_PROGRESS); + + return true; } Assert(io_buffers[0] == buffers[nblocks_done]); @@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* * NB: As little code as possible should be added between the - * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s - * below and the smgrstartreadv(), as some of the buffers are now marked - * as IO_IN_PROGRESS and will thus cause other backends to wait. + * StartBufferIO() above, the further StartBufferIO()s below and the + * smgrstartreadv(), as some of the buffers are now marked as + * IO_IN_PROGRESS and will thus cause other backends to wait. */ /* @@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) Assert(BufferGetBlockNumber(buffers[i - 1]) == BufferGetBlockNumber(buffers[i]) - 1); - if (!ReadBuffersCanStartIO(buffers[i], true)) + status = StartBufferIO(buffers[i], true, false, NULL); + if (status != BUFFER_IO_READY_FOR_IO) break; Assert(io_buffers[io_buffers_len] == buffers[i]); @@ -2893,16 +2873,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, * We *must* do smgr[zero]extend before succeeding, else the page * will not be reserved by the kernel, and the next P_NEW call * will decide to return the same page. Clear the BM_VALID bit, - * do StartBufferIO() and proceed. + * do StartSharedBufferIO() and proceed. * * Loop to handle the very small possibility that someone re-sets * BM_VALID between our clearing it and StartBufferIO inspecting * it. 
*/ - do + while (true) { + StartBufferIOResult sbres; + pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID); - } while (!StartBufferIO(existing_hdr, true, false)); + + sbres = StartSharedBufferIO(existing_hdr, true, true, NULL); + + if (sbres != BUFFER_IO_ALREADY_DONE) + break; + } } else { @@ -2928,7 +2915,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true, false); + StartSharedBufferIO(victim_buf_hdr, true, true, NULL); } } @@ -4450,7 +4437,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false, false)) + if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE) return; /* Setup error traceback support for ereport() */ @@ -7029,6 +7016,13 @@ WaitIO(BufferDesc *buf) { ConditionVariable *cv = BufferDescriptorGetIOCV(buf); + /* + * Should never end up here with unsubmitted IO, as no AIO unaware code + * may be used while in batch mode and AIO aware code needs to have + * submitted all staged IO to avoid deadlocks & slowness. + */ + Assert(!pgaio_have_staged()); + ConditionVariablePrepareToSleep(cv); for (;;) { @@ -7081,30 +7075,47 @@ WaitIO(BufferDesc *buf) } /* - * StartBufferIO: begin I/O on this buffer + * StartSharedBufferIO: begin I/O on this buffer * (Assumptions) - * My process is executing no IO on this buffer * The buffer is Pinned * - * In some scenarios multiple backends could attempt the same I/O operation - * concurrently. If someone else has already started I/O on this buffer then - * we will wait for completion of the IO using WaitIO(). + * In several scenarios the buffer may already be undergoing I/O in this or + * another backend. How to best handle that depends on the caller's + * situation. 
It might be appropriate to wait synchronously (e.g., because the + * buffer is about to be invalidated); wait asynchronously, using the buffer's + * IO wait reference (e.g., because the caller is doing readahead and doesn't + * need the buffer to be ready immediately); or to not wait at all (e.g., + * because the caller is trying to combine IO for this buffer with another + * buffer). * - * Input operations are only attempted on buffers that are not BM_VALID, - * and output operations only on buffers that are BM_VALID and BM_DIRTY, - * so we can always tell if the work is already done. + * How and whether to wait is controlled by the wait and io_wref parameters. In + * detail: * - * Returns true if we successfully marked the buffer as I/O busy, - * false if someone else already did the work. + * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait + * reference, the *io_wref is set to the buffer's io_wref and + * BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait + * parameter. * - * If nowait is true, then we don't wait for an I/O to be finished by another - * backend. In that case, false indicates either that the I/O was already - * finished, or is still in progress. This is useful for callers that want to - * find out if they can perform the I/O as part of a larger operation, without - * waiting for the answer or distinguishing the reasons why not. + * - If the caller passes a NULL io_wref (i.e. the caller does not want to + * asynchronously wait for the completion of the IO), wait = false and the + * buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned + * + * - If wait = true and either the buffer does not have a wait reference, + * or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO + * to complete. To avoid the potential of deadlocks and unnecessary delays, + * all staged I/O is submitted before waiting. 
+ * + * + * Input operations are only attempted on buffers that are not BM_VALID, and + * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can + * always tell if the work is already done. If no I/O is necessary, + * BUFFER_IO_ALREADY_DONE is returned. + * + * If we successfully marked the buffer as BM_IO_IN_PROGRESS, + * BUFFER_IO_READY_FOR_IO is returned. */ -bool -StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) +StartBufferIOResult +StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref) { uint64 buf_state; @@ -7116,10 +7127,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) if (!(buf_state & BM_IO_IN_PROGRESS)) break; - UnlockBufHdr(buf); - if (nowait) - return false; - WaitIO(buf); + + /* Join the existing IO */ + if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref)) + { + *io_wref = buf->io_wref; + UnlockBufHdr(buf); + + return BUFFER_IO_IN_PROGRESS; + } + else if (!wait) + { + UnlockBufHdr(buf); + return BUFFER_IO_IN_PROGRESS; + } + else + { + /* + * With wait = true, we always have to wait if the caller has + * passed io_wref = NULL. + * + * Even with io_wref != NULL, we have to wait if the buffer's wait + * ref is not valid but the IO is in progress, someone else + * started IO but hasn't set the wait ref yet. We have no choice + * but to wait until the IO completes. + */ + UnlockBufHdr(buf); + + /* + * If this backend currently has staged IO, submit it before + * waiting for in-progress IO, to avoid potential deadlocks and + * unnecessary delays. + */ + pgaio_submit_staged(); + + WaitIO(buf); + } } /* Once we get here, there is definitely no I/O active on this buffer */ @@ -7128,9 +7171,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { UnlockBufHdr(buf); - return false; + return BUFFER_IO_ALREADY_DONE; } + /* + * No IO in progress and not already done; We will start IO. 
It's possible + * that the IO was in progress but we're not done, because the IO errored + * out. We'll do the IO ourselves. + */ UnlockBufHdrExt(buf, buf_state, BM_IO_IN_PROGRESS, 0, 0); @@ -7138,7 +7186,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) ResourceOwnerRememberBufferIO(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); - return true; + return BUFFER_IO_READY_FOR_IO; +} + +/* + * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used + * when the caller doesn't otherwise need to care about local vs shared. See + * StartSharedBufferIO() for details. + */ +StartBufferIOResult +StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref) +{ + BufferDesc *buf_hdr; + + if (BufferIsLocal(buffer)) + { + buf_hdr = GetLocalBufferDescriptor(-buffer - 1); + + return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref); + } + else + { + buf_hdr = GetBufferDescriptor(buffer - 1); + + return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref); + } } /* diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 404c6bccbdd..64da741e101 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -191,7 +191,7 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) * Try to start an I/O operation. There currently are no reasons for * StartLocalBufferIO to return false, so we raise an error in that case. 
*/ - if (!StartLocalBufferIO(bufHdr, false, false)) + if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO) elog(ERROR, "failed to start write IO on local buffer"); /* Find smgr relation for buffer */ @@ -435,7 +435,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state); /* no need to loop for local buffers */ - StartLocalBufferIO(existing_hdr, true, false); + StartLocalBufferIO(existing_hdr, true, true, NULL); } else { @@ -451,7 +451,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, hresult->id = victim_buf_id; - StartLocalBufferIO(victim_buf_hdr, true, false); + StartLocalBufferIO(victim_buf_hdr, true, true, NULL); } } @@ -517,26 +517,41 @@ MarkLocalBufferDirty(Buffer buffer) } /* - * Like StartBufferIO, but for local buffers + * Like StartSharedBufferIO, but for local buffers */ -bool -StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) +StartBufferIOResult +StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref) { uint64 buf_state; /* * With AIO the buffer could have IO in progress, e.g. when there are two - * scans of the same relation. Either wait for the other IO or return - * false. + * scans of the same relation. Either wait for the other IO (if wait = + * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS; */ if (pgaio_wref_valid(&bufHdr->io_wref)) { - PgAioWaitRef iow = bufHdr->io_wref; + PgAioWaitRef buf_wref = bufHdr->io_wref; - if (nowait) - return false; + if (io_wref != NULL) + { + /* We've already asynchronously started this IO, so join it */ + *io_wref = buf_wref; + return BUFFER_IO_IN_PROGRESS; + } - pgaio_wref_wait(&iow); + /* + * For temp buffers we should never need to wait in + * StartLocalBufferIO() when called with io_wref == NULL while there + * are staged IOs, as it's not allowed to call code that is not aware + * of AIO while in batch mode. 
+ */ + Assert(!pgaio_have_staged()); + + if (!wait) + return BUFFER_IO_IN_PROGRESS; + + pgaio_wref_wait(&buf_wref); } /* Once we get here, there is definitely no I/O active on this buffer */ @@ -545,14 +560,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) buf_state = pg_atomic_read_u64(&bufHdr->state); if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { - return false; + return BUFFER_IO_ALREADY_DONE; } /* BM_IO_IN_PROGRESS isn't currently used for local buffers */ /* local buffers don't track IO using resowners */ - return true; + return BUFFER_IO_READY_FOR_IO; } /* diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index bde92eeb7d4..c0f258890ee 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -383,7 +383,7 @@ sub test_startwait_io $io_method, $psql_a, "first StartBufferIO", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^t$/, qr/^$/); @@ -392,14 +392,14 @@ sub test_startwait_io $io_method, $psql_a, "second StartBufferIO fails, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^f$/, qr/^$/); psql_like( $io_method, $psql_b, "second StartBufferIO fails, other session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^f$/, qr/^$/); @@ -409,7 +409,7 @@ sub test_startwait_io $node, $psql_b, "blocking start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), "BufferIo"); # Terminate the IO, without marking it as success, this should trigger the @@ -438,7 +438,7 @@ sub test_startwait_io $io_method, 
$psql_a, "blocking buffer io w/ success: first start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^t$/, qr/^$/); @@ -448,7 +448,7 @@ sub test_startwait_io $node, $psql_b, "blocking start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), "BufferIo"); # Terminate the IO, marking it as success @@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "first StartLocalBufferIO", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "second StartLocalBufferIO succeeds, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "StartLocalBufferIO after not marking valid succeeds, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "StartLocalBufferIO after marking valid fails", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^f$/, qr/^$/); @@ -1608,7 +1608,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000); my $buf_id = $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|); $psql_b->query_safe( - qq|SELECT buffer_call_start_io($buf_id, 
for_input=>true, nowait=>false)| + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)| ); query_wait_block( @@ -1633,7 +1633,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000); $buf_id = $psql_b->query_safe(qq|SELECT buffer_create_toy('tbl_ok', 3)|); $psql_b->query_safe( - qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)| + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)| ); query_wait_block( diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index d42ba939f9a..c3dc3402ce7 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool) +CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool) RETURNS pg_catalog.bool STRICT AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index c86a0885a5a..66ba6507e1c 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (RelationUsesLocalBuffers(rel)) { for (int i = 0; i < nblocks; i++) - StartLocalBufferIO(buf_hdrs[i], true, false); + StartLocalBufferIO(buf_hdrs[i], true, true, NULL); pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL); } else { for (int i = 0; i < nblocks; i++) - StartBufferIO(buf_hdrs[i], true, false); + StartSharedBufferIO(buf_hdrs[i], true, true, NULL); } pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks); @@ -623,15 +623,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS) { Buffer buf = PG_GETARG_INT32(0); bool for_input = PG_GETARG_BOOL(1); - bool nowait = PG_GETARG_BOOL(2); + bool wait = PG_GETARG_BOOL(2); + StartBufferIOResult result; bool 
can_start; if (BufferIsLocal(buf)) - can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1), - for_input, nowait); + result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1), + for_input, wait, NULL); else - can_start = StartBufferIO(GetBufferDescriptor(buf - 1), - for_input, nowait); + result = StartSharedBufferIO(GetBufferDescriptor(buf - 1), + for_input, wait, NULL); + + can_start = result == BUFFER_IO_READY_FOR_IO; /* * For tests we don't want the resowner release preventing us from diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e097613eb66..67524535026 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2944,6 +2944,7 @@ SplitTextOutputData SplitVar StackElem StakindFlags +StartBufferIOResult StartDataPtrType StartLOPtrType StartLOsPtrType -- 2.53.0.1.gb2826b52eb
>From 11ee2f2b6f3a9c0e757fce992bce4befbbcc07f1 Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 25 Mar 2026 16:42:00 -0400 Subject: [PATCH v8 9/9] aio: Don't wait for already in-progress IO When a backend attempts to start a read IO and finds the first buffer already has I/O in progress, previously it waited for that I/O to complete before initiating reads for any of the subsequent buffers. Although the backend must wait for the I/O to finish when acquiring the buffer, there's no reason for it to wait when setting up the read operation. Waiting at this point prevents the backend from starting I/O on subsequent buffers and can significantly reduce concurrency. This matters in two workloads: when multiple backends scan the same relation concurrently, and when a single backend requests the same block multiple times within the readahead distance. If backends wait each time they encounter an in-progress read, the access pattern effectively degenerates into synchronous I/O. To fix this, when encountering an already in-progress IO for the head buffer, a backend now records the buffer's wait reference and defers waiting until WaitReadBuffers(), when it actually needs to acquire the buffer. In rare cases, a backend may still need to wait synchronously at IO start time: if another backend has set BM_IO_IN_PROGRESS on the buffer but has not yet set the wait reference. Such windows should be brief and uncommon. 
Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/include/storage/bufmgr.h | 4 +- src/backend/storage/buffer/bufmgr.c | 89 ++++++++++++++++++++++++----- 2 files changed, 78 insertions(+), 15 deletions(-) diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 4017896f951..dd41b92f944 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -144,9 +144,11 @@ struct ReadBuffersOperation */ Buffer *buffers; BlockNumber blocknum; - int flags; + uint16 flags; int16 nblocks; int16 nblocks_done; + /* true if waiting on another backend's IO */ + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fec4e826f62..cc6bdc7d2bb 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1800,8 +1800,11 @@ WaitReadBuffers(ReadBuffersOperation *operation) * b) reports some time as waiting, even if we never waited * * we first check if we already know the IO is complete. + * + * Note that operation->io_return is uninitialized for foreign IO, + * so we cannot use the cheaper PGAIO_RS_UNKNOWN pre-check. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1820,11 +1823,45 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. 
- */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc = BufferIsLocal(buffer) ? + GetLocalBufferDescriptor(-buffer - 1) : + GetBufferDescriptor(buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + if (buf_state & BM_VALID) + { + BlockNumber blocknum = operation->blocknum + operation->nblocks_done; + + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Track this as a 'hit' for this backend. The backend + * performing the IO will track it as a 'read'. + */ + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + } + + /* + * If the foreign IO failed and left the buffer invalid, + * nblocks_done is not incremented. The retry loop below will + * call AsyncReadBuffers() which will attempt the IO itself. + */ + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1870,7 +1907,8 @@ WaitReadBuffers(ReadBuffersOperation *operation) * affected by the call. If the first buffer is valid, *nblocks_progress is * set to 1 and operation->nblocks_done is incremented. * - * Returns true if IO was initiated, false if no IO was necessary. + * Returns true if IO was initiated or is already in progress (foreign IO), + * false if the buffer was already valid. */ static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) @@ -1943,8 +1981,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); /* - * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might - * block, which we don't want after setting IO_IN_PROGRESS. 
+ * We must get an IO handle before StartBufferIO(), as pgaio_io_acquire() + * might block, which we don't want after setting IO_IN_PROGRESS. If we + * don't need to do the IO, we'll release the handle. * * If we need to wait for IO before we can get a handle, submit * already-staged IO first, so that other backends don't need to wait. @@ -1966,14 +2005,34 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); } + operation->foreign_io = false; + pgaio_wref_clear(&operation->io_wref); + /* - * Check if we can start IO on the first to-be-read buffer. + * Try to start IO on the first buffer in a new run of blocks. If AIO is + * in progress, be it in this backend or another backend, we just + * associate the wait reference with the operation and wait in + * WaitReadBuffers(). This turns out to be important for performance in + * two workloads: * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * 1) A read stream that has to read the same block multiple times within + * the readahead distance. This can happen e.g. for the table accesses of + * an index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be + * able to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend has started IO on the buffer but not + * yet set its wait reference. In this case, we have no choice but to wait + * for either the wait reference to be valid or the IO to be done. 
*/ - status = StartBufferIO(buffers[nblocks_done], true, true, NULL); + status = StartBufferIO(buffers[nblocks_done], true, true, + &operation->io_wref); if (status != BUFFER_IO_READY_FOR_IO) { pgaio_io_release(ioh); @@ -2006,6 +2065,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* The IO is already in-progress */ Assert(status == BUFFER_IO_IN_PROGRESS); + Assert(pgaio_wref_valid(&operation->io_wref)); + operation->foreign_io = true; return true; } -- 2.53.0.1.gb2826b52eb
