Thanks for the review! Everything you suggested that I don't elaborate on below, I've just gone ahead and done in attached v6.
On Tue, Mar 17, 2026 at 1:26 PM Andres Freund <[email protected]> wrote:
> > --- a/src/backend/storage/buffer/bufmgr.c
> > +++ b/src/backend/storage/buffer/bufmgr.c
> > @@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
> >  			 * must have started out as a miss in PinBufferForBlock(). The other
> >  			 * backend will track this as a 'read'.
> >  			 */
> > -			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done,
> > +			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done - 1,
> > 											  operation->smgr->smgr_rlocator.locator.spcOid,
> > 											  operation->smgr->smgr_rlocator.locator.dbOid,
> > 											  operation->smgr->smgr_rlocator.locator.relNumber,
> > --
>
> Ah, the issue is that we already incremented nblocks_done, right? Maybe it'd
> be easier to understand if we stashed blocknum + nblocks_done into a local
> var, and use it in both branches of if (!ReadBuffersCanStartIO())?
>
> This probably needs to be backpatched...

0003 in v6 does as you suggest. I'll backport it after a quick +1 here.

> > Subject: [PATCH v5 4/5] Make buffer hit helper
> >
> > @@ -1236,17 +1238,6 @@ PinBufferForBlock(Relation rel,
> > 		   persistence == RELPERSISTENCE_PERMANENT ||
> > 		   persistence == RELPERSISTENCE_UNLOGGED));
> >
> > -	if (persistence == RELPERSISTENCE_TEMP)
> > -	{
> > -		io_context = IOCONTEXT_NORMAL;
> > -		io_object = IOOBJECT_TEMP_RELATION;
> > -	}
> > -	else
> > -	{
> > -		io_context = IOContextForStrategy(strategy);
> > -		io_object = IOOBJECT_RELATION;
> > -	}
> > -
>
> I'm mildly worried that this will lead to a bit worse code generation, the
> compiler might have a harder time figuring out that io_context/io_object
> doesn't change across multiple PinBufferForBlock calls. Although it already
> might be unable to do so (we don't mark IOContextForStrategy as pure [1]).
> I kinda wonder if, for StartReadBuffersImpl(), we should go the opposite
> direction, and explicitly look up IOContextForStrategy(strategy) *before* the
> actual_nblocks loop to make sure the compiler doesn't inject external function
> calls (which will in all likelihood require register spilling etc).

I added a separate patch to refactor the code to do this first (0004).

> > @@ -1254,18 +1245,11 @@ PinBufferForBlock(Relation rel,
> > 										   smgr->smgr_rlocator.backend);
> >
> > 	if (persistence == RELPERSISTENCE_TEMP)
>
> And here it might end up adding a separate persistence == RELPERSISTENCE_TEMP
> branch in CountBufferHit(), I suspect the compiler may not be able to optimize
> it away.

And you think it is optimizing it away in PinBufferForBlock()?

> At the very least I'd invert the call to CountBufferHit() and the
> pgstat_count_buffer_read(), as the latter will probably prevent most
> optimizations (due to the compiler not being able to prove that
> (rel)->pgstat_info->counts.blocks_fetched is a different memory location as
> *foundPtr).

I did this. I did not check the compiled code before or after though.

> > +CountBufferHit(BufferAccessStrategy strategy,
> > +			   Relation rel, char persistence, SMgrRelation smgr,
> > +			   ForkNumber forknum, BlockNumber blocknum)
>
> I don't think "Count*" is a great name for something that does tracepoints and
> vacuum cost balance accounting, the latter actually changes behavior of the
> program due to the sleeps it injects.
>
> The first alternative I have is AccountForBufferHit(), not great, but still
> seems a bit better.

At some point, I had ProcessBufferHit(), but Bilal felt it implied the
function did more than counting. I've changed it now to TrackBufferHit().
> > From 4d737fa14f333abc4ee6ade8cb0340530695e887 Mon Sep 17 00:00:00 2001
> > From: Melanie Plageman <[email protected]>
> > Date: Fri, 23 Jan 2026 14:00:31 -0500
> > Subject: [PATCH v5 5/5] Don't wait for already in-progress IO
>
> I wonder if might be worth splitting this up in a refactoring and a
> "behavioural change" commit. Might be too complicated.
>
> Candidates for a split seem to be:
> - Moving pgaio_io_acquire_nb() to earlier
> - Introduce PrepareNewReadBufferIO/PrepareAdditionalReadBuffer without support
>   for READ_BUFFER_IN_PROGRESS
> - introduce READ_BUFFER_IN_PROGRESS

I've done something like this in v6.

> > + * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to
> > + * avoid an external function call.
> > + */
> > +static PrepareReadBuffer_Status
> > +PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation,
> > +							Buffer buffer)
>
> Hm, seems the test in 0002 should be extended to cover the temp table
> case.

I did this. However, I was a bit lazy in how many cases I added because I
used invalidate_rel_block(), which is pretty verbose (since evict_rel()
doesn't work yet for local buffers). I don't think we'll be able to easily
test READ_BUFFER_ALREADY_DONE (though perhaps we aren't testing it for
shared buffers either?).

> > +{
> > +	BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1);
> > +	uint64		buf_state = pg_atomic_read_u64(&desc->state);
> > +
> > +	/* Already valid, no work to do */
> > +	if (buf_state & BM_VALID)
> > +	{
> > +		pgaio_wref_clear(&operation->io_wref);
> > +		return READ_BUFFER_ALREADY_DONE;
> > +	}
>
> Is this reachable for local buffers?

Yes, I think this is reachable by local buffers that started the IO already
and then completed it when acquiring a new IO handle at the top of
AsyncReadBuffers().
> > +	if (pgaio_wref_valid(&desc->io_wref))
> > +	{
> > +		operation->io_wref = desc->io_wref;
> > +		operation->foreign_io = true;
> > +		return READ_BUFFER_IN_PROGRESS;
> > +	}
> > +
> > +	/*
> > +	 * While it is possible for a buffer to have been prepared for IO but not
> > +	 * yet had its wait reference set, there's no way for us to know that for
> > +	 * temporary buffers. Thus, we'll prepare for own IO on this buffer.
> > +	 */
> > +	return READ_BUFFER_READY_FOR_IO;
>
> Is that actually possible? And would it be ok to just do start IO in that
> case?

You're right, that's not possible for local buffers. For local buffers, we
"prepare for IO" by calling PrepareNewLocalReadBufferIO() and then set the
wait ref in a codepath initiated by calling smgrstartreadv() as part of
"staging" the IO. No one can observe that buffer in between the call to
PrepareNewLocalReadBufferIO() and setting the wait reference. So, I've
deleted the comment.

> > +static PrepareReadBuffer_Status
> > +PrepareNewReadBufferIO(ReadBuffersOperation *operation,
> > +					   Buffer buffer)
> > +{
>
> I'm not sure I love "New" here, compared to "Additional". Perhaps "Begin" &
> "Continue"? Or "First" & "Additional"? Or ...

I changed the names to PrepareHeadBufferReadIO() and
PrepareAdditionalBufferReadIO(). "Head" instead of "First" because First
felt like it implied the first buffer ever and head seems to make it clear
it is the first buffer of this new IO.

- Melanie
From 53f010e7072fb5bda9a342c32fb6035da41c9c5c Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Tue, 9 Sep 2025 10:14:34 -0400
Subject: [PATCH v6 1/8] aio: Refactor tests in preparation for more tests

In a future commit more AIO related tests are due to be introduced.
However 001_aio.pl already is fairly large.

This commit introduces a new TestAio package with helpers for writing AIO
related tests. Then it uses the new helpers to simplify the existing
001_aio.pl by iterating over all supported io_methods. This will be
particularly helpful because additional methods already have been
submitted.

Additionally this commit splits out testing of initdb using a non-default
method into its own test. While that test is somewhat important, it's
fairly slow and doesn't break that often. For development velocity it's
helpful for 001_aio.pl to be faster.

While particularly the latter could benefit from being its own commit, it
seems to introduce more back-and-forth than it's worth.
Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/
---
 src/test/modules/test_aio/meson.build     |   1 +
 src/test/modules/test_aio/t/001_aio.pl    | 141 +++++++---------------
 src/test/modules/test_aio/t/003_initdb.pl |  71 +++++++++++
 src/test/modules/test_aio/t/TestAio.pm    |  90 ++++++++++++++
 4 files changed, 204 insertions(+), 99 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/003_initdb.pl
 create mode 100644 src/test/modules/test_aio/t/TestAio.pm

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index fefa25bc5ab..18a797f3a3b 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -32,6 +32,7 @@ tests += {
     'tests': [
       't/001_aio.pl',
       't/002_io_workers.pl',
+      't/003_initdb.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 5c634ec3ca9..e18b2a2b8ae 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -7,126 +7,56 @@
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use FindBin;
+use lib $FindBin::RealBin;
 
-###
-# Test io_method=worker
-###
-my $node_worker = create_node('worker');
-$node_worker->start();
-
-test_generic('worker', $node_worker);
-SKIP:
-{
-	skip 'Injection points not supported by this build', 1
-	  unless $ENV{enable_injection_points} eq 'yes';
-	test_inject_worker('worker', $node_worker);
-}
+use TestAio;
 
-$node_worker->stop();
+my @methods = TestAio::supported_io_methods();
+my %nodes;
 
 ###
-# Test io_method=io_uring
+# Create and configure one instance for each io_method
 ###
-if (have_io_uring())
+foreach my $method (@methods)
 {
-	my $node_uring = create_node('io_uring');
-	$node_uring->start();
-	test_generic('io_uring', $node_uring);
-	$node_uring->stop();
-}
-
-
-###
-# Test io_method=sync
-###
-
-my $node_sync = create_node('sync');
+	my $node = PostgreSQL::Test::Cluster->new($method);
 
-# just to have one test not use the default auto-tuning
+	$nodes{$method} = $node;
+	$node->init();
+	$node->append_conf('postgresql.conf', "io_method=$method");
+	TestAio::configure($node);
+}
 
-$node_sync->append_conf(
+# Just to have one test not use the default auto-tuning
+$nodes{'sync'}->append_conf(
 	'postgresql.conf', qq(
-io_max_concurrency=4
+	io_max_concurrency=4
 ));
-$node_sync->start();
-test_generic('sync', $node_sync);
-$node_sync->stop();
-
-done_testing();
-
 ###
-# Test Helpers
+# Execute the tests for each io_method
 ###
 
-sub create_node
+foreach my $method (@methods)
 {
-	local $Test::Builder::Level = $Test::Builder::Level + 1;
-
-	my $io_method = shift;
+	my $node = $nodes{$method};
 
-	my $node = PostgreSQL::Test::Cluster->new($io_method);
-
-	# Want to test initdb for each IO method, otherwise we could just reuse
-	# the cluster.
-	#
-	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
-	# options specified by ->extra, if somebody puts -c io_method=xyz in
-	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
-	# detect it.
-	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
-	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
-		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
-	{
-		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
-	}
-
-	$node->init(extra => [ '-c', "io_method=$io_method" ]);
-
-	$node->append_conf(
-		'postgresql.conf', qq(
-shared_preload_libraries=test_aio
-log_min_messages = 'DEBUG3'
-log_statement=all
-log_error_verbosity=default
-restart_after_crash=false
-temp_buffers=100
-));
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
 
-	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
-	# io_method, it'd override the setting persisted at initdb time. While
-	# using (and later verifying) the setting from initdb provides some
-	# verification of having used the io_method during initdb, it's probably
-	# not worth the complication of only appending if the variable is set in
-	# in TEMP_CONFIG.
-	$node->append_conf(
-		'postgresql.conf', qq(
-io_method=$io_method
-));
+done_testing();
 
-	ok(1, "$io_method: initdb");
-	return $node;
-}
+###
+# Test Helpers
+###
 
-sub have_io_uring
-{
-	# To detect if io_uring is supported, we look at the error message for
-	# assigning an invalid value to an enum GUC, which lists all the valid
-	# options. We need to use -C to deal with running as administrator on
-	# windows, the superuser check is omitted if -C is used.
-	my ($stdout, $stderr) =
-	  run_command [qw(postgres -C invalid -c io_method=invalid)];
-	die "can't determine supported io_method values"
-	  unless $stderr =~ m/Available values: ([^\.]+)\./;
-	my $methods = $1;
-	note "supported io_method values are: $methods";
-
-	return ($methods =~ m/io_uring/) ? 1 : 0;
-}
 
 sub psql_like
 {
@@ -1490,8 +1420,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);),
 }
 
-# Run all tests that are supported for all io_methods
-sub test_generic
+# Run all tests for the specified node / io_method
+sub test_io_method
 {
 	my $io_method = shift;
 	my $node = shift;
@@ -1526,10 +1456,23 @@ CHECKPOINT;
 	test_ignore_checksum($io_method, $node);
 	test_checksum_createdb($io_method, $node);
 
+	# generic injection tests
   SKIP:
 	{
 		skip 'Injection points not supported by this build', 1
 		  unless $ENV{enable_injection_points} eq 'yes';
 		test_inject($io_method, $node);
 	}
+
+	# worker specific injection tests
+	if ($io_method eq 'worker')
+	{
+	  SKIP:
+		{
+			skip 'Injection points not supported by this build', 1
+			  unless $ENV{enable_injection_points} eq 'yes';
+
+			test_inject_worker($io_method, $node);
+		}
+	}
 }
diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl
new file mode 100644
index 00000000000..c03ae58d00a
--- /dev/null
+++ b/src/test/modules/test_aio/t/003_initdb.pl
@@ -0,0 +1,71 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+#
+# Test initdb for each IO method. This is done separately from 001_aio.pl, as
+# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can
+# be iterated on more quickly.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	test_create_node($method);
+}
+
+done_testing();
+
+
+sub test_create_node
+{
+	local $Test::Builder::Level = $Test::Builder::Level + 1;
+
+	my $io_method = shift;
+
+	my $node = PostgreSQL::Test::Cluster->new($io_method);
+
+	# Want to test initdb for each IO method, otherwise we could just reuse
+	# the cluster.
+	#
+	# Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the
+	# options specified by ->extra, if somebody puts -c io_method=xyz in
+	# PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we
+	# detect it.
+	local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS};
+	if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS}
+		&& $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/)
+	{
+		$ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method";
+	}
+
+	$node->init(extra => [ '-c', "io_method=$io_method" ]);
+
+	TestAio::configure($node);
+
+	# Even though we used -c io_method=... above, if TEMP_CONFIG sets
+	# io_method, it'd override the setting persisted at initdb time. While
+	# using (and later verifying) the setting from initdb provides some
+	# verification of having used the io_method during initdb, it's probably
+	# not worth the complication of only appending if the variable is set
+	# in TEMP_CONFIG.
+	$node->append_conf(
+		'postgresql.conf', qq(
+io_method=$io_method
+));
+
+	ok(1, "$io_method: initdb");
+
+	$node->start();
+	$node->stop();
+	ok(1, "$io_method: start & stop");
+
+	return $node;
+}
diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm
new file mode 100644
index 00000000000..5bc80a9b130
--- /dev/null
+++ b/src/test/modules/test_aio/t/TestAio.pm
@@ -0,0 +1,90 @@
+# Copyright (c) 2024-2025, PostgreSQL Global Development Group
+
+=pod
+
+=head1 NAME
+
+TestAio - helpers for writing AIO related tests
+
+=cut
+
+package TestAio;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+
+=pod
+
+=head1 METHODS
+
+=over
+
+=item TestAio::supported_io_methods()
+
+Return an array of all the supported values for the io_method GUC
+
+=cut
+
+sub supported_io_methods()
+{
+	my @io_methods = ('worker');
+
+	push(@io_methods, "io_uring") if have_io_uring();
+
+	# Return sync last, as it will least commonly fail
+	push(@io_methods, "sync");
+
+	return @io_methods;
+}
+
+
+=item TestAio::configure()
+
+Prepare a cluster for AIO test
+
+=cut
+
+sub configure
+{
+	my $node = shift;
+
+	$node->append_conf(
+		'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+));
+
+}
+
+
+=pod
+
+=item TestAio::have_io_uring()
+
+Return if io_uring is supported
+
+=cut
+
+sub have_io_uring
+{
+	# To detect if io_uring is supported, we look at the error message for
+	# assigning an invalid value to an enum GUC, which lists all the valid
+	# options. We need to use -C to deal with running as administrator on
+	# windows, the superuser check is omitted if -C is used.
+	my ($stdout, $stderr) =
+	  run_command [qw(postgres -C invalid -c io_method=invalid)];
+	die "can't determine supported io_method values"
+	  unless $stderr =~ m/Available values: ([^\.]+)\./;
+	my $methods = $1;
+	note "supported io_method values are: $methods";
+
+	return ($methods =~ m/io_uring/) ? 1 : 0;
+}
+
+1;
-- 
2.43.0
From d510b46530bd219c9767e42e02f093d3460babef Mon Sep 17 00:00:00 2001
From: Andres Freund <[email protected]>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v6 2/8] test_aio: Add read_stream test infrastructure & tests

Author: Andres Freund <[email protected]>
Reviewed-by: Nazir Bilal Yavuz <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv
---
 src/test/modules/test_aio/meson.build       |   1 +
 .../modules/test_aio/t/004_read_stream.pl   | 286 +++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql |  24 +-
 src/test/modules/test_aio/test_aio.c        | 346 +++++++++++++++---
 src/tools/pgindent/typedefs.list            |   1 +
 5 files changed, 607 insertions(+), 51 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..17a68e35c1d
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,286 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+	'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(qq/ SET io_combine_limit = 1; /);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(qq/ SELECT evict_rel('largeish'); /);
+
+	# block 0 grows the distance enough that the stream will look ahead and try
+	# to start a pending read for block 2 (and later block 4) twice before
+	# returning any buffers.
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+				ARRAY[0, 2, 2, 4, 4]); /);
+
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+				ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(qq/ SELECT evict_rel('largeish'); /);
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish',
+				ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); /);
+	ok(1, "$io_method: stream accessing same block");
+
+	# Test repeated blocks with a temp table, using invalidate_rel_block()
+	# to evict individual local buffers.
+	$psql->query_safe(
+		qq/ CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+			INSERT INTO largeish_temp(k) SELECT generate_series(1, 200); /);
+
+	# Evict the specific blocks we'll request to force misses
+	$psql->query_safe(qq/ SELECT invalidate_rel_block('largeish_temp', 0); /);
+	$psql->query_safe(qq/ SELECT invalidate_rel_block('largeish_temp', 2); /);
+	$psql->query_safe(qq/ SELECT invalidate_rel_block('largeish_temp', 4); /);
+
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish_temp',
+				ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+	# Now the blocks are cached, so repeated access should be hits
+	$psql->query_safe(
+		qq/ SELECT * FROM read_stream_for_blocks('largeish_temp',
+				ARRAY[0, 2, 2, 4, 4]); /);
+	ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+				relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+			blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/ SELECT wait_event FROM pg_stat_activity
+				WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	# Block 5 is undergoing IO in session b, so session a will move on to start
+	# a new IO for block 7.
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+			read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/);
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+				relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+				pid=>pg_backend_pid(),
+				relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+			blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq/ SELECT wait_event FROM pg_stat_activity
+				WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+			read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	pump_until(
+		$psql_b->{run}, $psql_b->{timeout},
+		\$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend.
+	###
+	$psql_a->query_safe(qq/ SELECT evict_rel('largeish'); /);
+
+	$psql_b->query_safe(
+		qq/ SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+				relfilenode=>pg_relation_filenode('largeish')); /);
+
+	$psql_b->{stdin} .= qq/ SELECT read_rel_block_ll('largeish',
+			blockno=>2, nblocks=>3);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq/ SELECT wait_event FROM pg_stat_activity
+				WHERE wait_event = 'completion_wait'; /,
+		'completion_wait');
+
+	# Blocks 2 and 4 are undergoing IO initiated by session b
+	$psql_a->{stdin} .= qq/ SELECT array_agg(blocknum) FROM
+			read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres',
+		qq/ SELECT inj_io_completion_continue() /);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer reads in one IO/);
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..1cc4734a746 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -50,6 +54,14 @@
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
 /*
  * Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index b1aa8af9ec0..061f0c9f92a 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -19,17 +19,26 @@
 #include "postgres.h"
 
 #include "access/relation.h"
+#include "catalog/pg_type.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -37,13 +46,30 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -85,10 +111,13 @@ test_aio_shmem_startup(void)
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
 
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -384,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -458,6 +487,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+	int32		buffers_evicted,
+				buffers_flushed,
+				buffers_skipped;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+
+	EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+							&buffers_skipped);
+
+	relation_close(rel, AccessExclusiveLock);
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(invalidate_rel_block);
 Datum
 invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -610,6 +660,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks + 1);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -680,15 +810,98 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	int32		owner_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	owner_pid = owner_proc->pid;
+
+	if (inj_io_error_state->completion_wait_pid != owner_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -697,58 +910,56 @@ inj_io_short_read(const char *name, const void *private_data, void
*arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. 
- */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; - - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } - } + uint32 ok_part = new_result - processed; - ioh->result = new_result; + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); + } } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -762,6 +973,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? 
InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -771,6 +1015,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 174e2798443..340662cf72c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -305,6 +305,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.43.0
From 0606856c97cec2da29a70fa5fedfb0ec4bbed842 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Mon, 16 Mar 2026 16:50:56 -0400 Subject: [PATCH v6 3/8] Fix off-by-one error in read IO tracing AsyncReadBuffers()'s no-IO-needed path passed TRACE_POSTGRESQL_BUFFER_READ_DONE the wrong block number because it had already incremented operation->nblocks_done. Fix by folding the nblocks_done offset into the blocknum local variable at initialization. Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Discussion: https://postgr.es/m/u73un3xeljr4fiidzwi4ikcr6vm7oqugn4fo5vqpstjio6anl2%40hph6fvdiiria --- src/backend/storage/buffer/bufmgr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 00bc609529a..10afae1990b 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1875,10 +1875,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) { Buffer *buffers = &operation->buffers[0]; int flags = operation->flags; - BlockNumber blocknum = operation->blocknum; ForkNumber forknum = operation->forknum; char persistence = operation->persistence; int16 nblocks_done = operation->nblocks_done; + BlockNumber blocknum = operation->blocknum + nblocks_done; Buffer *io_buffers = &operation->buffers[nblocks_done]; int io_buffers_len = 0; PgAioHandle *ioh; @@ -1990,7 +1990,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'.
*/ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done, + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum, operation->smgr->smgr_rlocator.locator.spcOid, operation->smgr->smgr_rlocator.locator.dbOid, operation->smgr->smgr_rlocator.locator.relNumber, @@ -2062,7 +2062,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ io_start = pgstat_prepare_io_time(track_io_timing); smgrstartreadv(ioh, operation->smgr, forknum, - blocknum + nblocks_done, + blocknum, io_pages, io_buffers_len); pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, 1, io_buffers_len * BLCKSZ); -- 2.43.0
From fbfd1d9df11d81903e5810c5165ca9e234a6aa26 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Tue, 17 Mar 2026 15:49:52 -0400 Subject: [PATCH v6 4/8] Pass io_object and io_context through to PinBufferForBlock() PinBufferForBlock() is always_inline and called in a loop in StartReadBuffersImpl(). Previously it computed io_context and io_object internally, which required calling IOContextForStrategy() -- a non-inline function the compiler cannot prove is side-effect-free. This could potentially cause redundant function calls. Compute io_context and io_object in the callers instead, allowing StartReadBuffersImpl() to do so once before entering the loop. Suggested-by: Andres Freund <[email protected]> --- src/backend/storage/buffer/bufmgr.c | 45 ++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 10afae1990b..ab9c2a4b904 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1223,11 +1223,11 @@ PinBufferForBlock(Relation rel, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, + IOObject io_object, + IOContext io_context, bool *foundPtr) { BufferDesc *bufHdr; - IOContext io_context; - IOObject io_object; Assert(blockNum != P_NEW); @@ -1236,17 +1236,6 @@ PinBufferForBlock(Relation rel, persistence == RELPERSISTENCE_PERMANENT || persistence == RELPERSISTENCE_UNLOGGED)); - if (persistence == RELPERSISTENCE_TEMP) - { - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - } - else - { - io_context = IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, smgr->smgr_rlocator.locator.spcOid, smgr->smgr_rlocator.locator.dbOid, @@ -1339,9 +1328,23 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence, mode == RBM_ZERO_AND_LOCK)) { bool found; + IOContext
io_context; + IOObject io_object; + + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } buffer = PinBufferForBlock(rel, smgr, persistence, - forkNum, blockNum, strategy, &found); + forkNum, blockNum, strategy, + io_object, io_context, &found); ZeroAndLockBuffer(buffer, mode, found); return buffer; } @@ -1379,11 +1382,24 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, int actual_nblocks = *nblocks; int maxcombine = 0; bool did_start_io; + IOContext io_context; + IOObject io_object; Assert(*nblocks == 1 || allow_forwarding); Assert(*nblocks > 0); Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); + if (operation->persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; + } + for (int i = 0; i < actual_nblocks; ++i) { bool found; @@ -1432,6 +1448,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation, operation->forknum, blockNum + i, operation->strategy, + io_object, io_context, &found); } -- 2.43.0
From 9e57a5deff40b0e3272809a048fc6950646f8146 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 13:54:02 -0500 Subject: [PATCH v6 5/8] Make buffer hit helper Two places already count buffer hits, each requiring quite a few lines of code since we do accounting in several places. Future commits will add more such locations, so refactor the hit accounting into a helper. Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 84 ++++++++++++++--------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index ab9c2a4b904..fa85570a791 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -648,6 +648,11 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, bool *foundPtr, IOContext io_context); static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); + +static pg_attribute_always_inline void TrackBufferHit(IOObject io_object, + IOContext io_context, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -1243,18 +1248,14 @@ PinBufferForBlock(Relation rel, smgr->smgr_rlocator.backend); if (persistence == RELPERSISTENCE_TEMP) - { bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); - if (*foundPtr) - pgBufferUsage.local_blks_hit++; - } else - { bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, strategy, foundPtr, io_context); - if (*foundPtr) - pgBufferUsage.shared_blks_hit++; - } + + if (*foundPtr) +
TrackBufferHit(io_object, io_context, rel, persistence, smgr, forkNum, blockNum); + if (rel) { /* @@ -1263,21 +1264,6 @@ PinBufferForBlock(Relation rel, * zeroed instead), the per-relation stats always count them. */ pgstat_count_buffer_read(rel); - if (*foundPtr) - pgstat_count_buffer_hit(rel); - } - if (*foundPtr) - { - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; - - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - true); } return BufferDescriptorGetBuffer(bufHdr); @@ -1712,6 +1698,37 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait) return ReadBuffersCanStartIOOnce(buffer, nowait); } +/* + * We track various stats related to buffer hits. Because this is done in a + * few separate places, this helper exists for convenience. + */ +static pg_attribute_always_inline void +TrackBufferHit(IOObject io_object, IOContext io_context, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum) +{ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, + blocknum, + smgr->smgr_rlocator.locator.spcOid, + smgr->smgr_rlocator.locator.dbOid, + smgr->smgr_rlocator.locator.relNumber, + smgr->smgr_rlocator.backend, + true); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; + + if (rel) + pgstat_count_buffer_hit(rel); +} + /* * Helper for WaitReadBuffers() that processes the results of a readv * operation, raising an error if necessary. @@ -2007,25 +2024,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). 
The other * backend will track this as a 'read'. */ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum, - operation->smgr->smgr_rlocator.locator.spcOid, - operation->smgr->smgr_rlocator.locator.dbOid, - operation->smgr->smgr_rlocator.locator.relNumber, - operation->smgr->smgr_rlocator.backend, - true); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_hit += 1; - else - pgBufferUsage.shared_blks_hit += 1; - - if (operation->rel) - pgstat_count_buffer_hit(operation->rel); - - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; + TrackBufferHit(io_object, io_context, operation->rel, persistence, + operation->smgr, forknum, blocknum); } else { -- 2.43.0
From b73b896febc35253ca2607cb0fe143355b91256f Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 18 Mar 2026 11:09:58 -0400 Subject: [PATCH v6 6/8] Restructure AsyncReadBuffers() Restructure AsyncReadBuffers() to use early return when the head buffer is already valid, instead of using a did_start_io flag and if/else branches. Also move some code closer to where it is used. This is a refactor only. --- src/backend/storage/buffer/bufmgr.c | 208 ++++++++++++++-------------- 1 file changed, 103 insertions(+), 105 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index fa85570a791..a9995b75917 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1920,21 +1920,12 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) void *io_pages[MAX_IO_COMBINE_LIMIT]; IOContext io_context; IOObject io_object; - bool did_start_io; - - /* - * When this IO is executed synchronously, either because the caller will - * immediately block waiting for the IO or because IOMETHOD_SYNC is used, - * the AIO subsystem needs to know. - */ - if (flags & READ_BUFFERS_SYNCHRONOUSLY) - ioh_flags |= PGAIO_HF_SYNCHRONOUS; + instr_time io_start; if (persistence == RELPERSISTENCE_TEMP) { io_context = IOCONTEXT_NORMAL; io_object = IOOBJECT_TEMP_RELATION; - ioh_flags |= PGAIO_HF_REFERENCES_LOCAL; } else { @@ -1942,35 +1933,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) io_object = IOOBJECT_RELATION; } - /* - * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR - * flag. The reason for that is that, hopefully, zero_damaged_pages isn't - * set globally, but on a per-session basis. The completion callback, - * which may be run in other processes, e.g. in IO workers, may have a - * different value of the zero_damaged_pages GUC.
- * - * XXX: We probably should eventually use a different flag for - * zero_damaged_pages, so we can report different log levels / error codes - * for zero_damaged_pages and ZERO_ON_ERROR. - */ - if (zero_damaged_pages) - flags |= READ_BUFFERS_ZERO_ON_ERROR; - - /* - * For the same reason as with zero_damaged_pages we need to use this - * backend's ignore_checksum_failure value. - */ - if (ignore_checksum_failure) - flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES; - - - /* - * To be allowed to report stats in the local completion callback we need - * to prepare to report stats now. This ensures we can safely report the - * checksum failure even in a critical section. - */ - pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); - /* * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() * might block, which we don't want after setting IO_IN_PROGRESS. @@ -1992,7 +1954,6 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) if (unlikely(!ioh)) { pgaio_submit_staged(); - ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); } @@ -2017,91 +1978,128 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) pgaio_io_release(ioh); pgaio_wref_clear(&operation->io_wref); - did_start_io = false; /* * Report and track this as a 'hit' for this backend, even though it * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'. */ - TrackBufferHit(io_object, io_context, operation->rel, persistence, - operation->smgr, forknum, blocknum); + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + return false; } - else + + /* + * When this IO is executed synchronously, either because the caller will + * immediately block waiting for the IO or because IOMETHOD_SYNC is used, + * the AIO subsystem needs to know. 
+ */ + if (flags & READ_BUFFERS_SYNCHRONOUSLY) + ioh_flags |= PGAIO_HF_SYNCHRONOUS; + + if (persistence == RELPERSISTENCE_TEMP) + ioh_flags |= PGAIO_HF_REFERENCES_LOCAL; + + /* + * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR + * flag. The reason for that is that, hopefully, zero_damaged_pages isn't + * set globally, but on a per-session basis. The completion callback, + * which may be run in other processes, e.g. in IO workers, may have a + * different value of the zero_damaged_pages GUC. + * + * XXX: We probably should eventually use a different flag for + * zero_damaged_pages, so we can report different log levels / error codes + * for zero_damaged_pages and ZERO_ON_ERROR. + */ + if (zero_damaged_pages) + flags |= READ_BUFFERS_ZERO_ON_ERROR; + + /* + * For the same reason as with zero_damaged_pages we need to use this + * backend's ignore_checksum_failure value. + */ + if (ignore_checksum_failure) + flags |= READ_BUFFERS_IGNORE_CHECKSUM_FAILURES; + + /* + * To be allowed to report stats in the local completion callback we need + * to prepare to report stats now. This ensures we can safely report the + * checksum failure even in a critical section. + */ + pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); + + Assert(io_buffers[0] == buffers[nblocks_done]); + io_pages[0] = BufferGetBlock(buffers[nblocks_done]); + io_buffers_len = 1; + + /* + * How many neighboring-on-disk blocks can we scatter-read into other + * buffers at the same time? In this case we don't wait if we see an I/O + * already in progress. We already set BM_IO_IN_PROGRESS for the head + * block, so we should get on with that I/O as soon as possible. + */ + for (int i = nblocks_done + 1; i < operation->nblocks; i++) { - instr_time io_start; + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i - 1]) == + BufferGetBlockNumber(buffers[i]) - 1); - /* We found a buffer that we need to read in. 
*/ - Assert(io_buffers[0] == buffers[nblocks_done]); - io_pages[0] = BufferGetBlock(buffers[nblocks_done]); - io_buffers_len = 1; + if (!ReadBuffersCanStartIO(buffers[i], true)) + break; - /* - * How many neighboring-on-disk blocks can we scatter-read into other - * buffers at the same time? In this case we don't wait if we see an - * I/O already in progress. We already set BM_IO_IN_PROGRESS for the - * head block, so we should get on with that I/O as soon as possible. - */ - for (int i = nblocks_done + 1; i < operation->nblocks; i++) - { - if (!ReadBuffersCanStartIO(buffers[i], true)) - break; - /* Must be consecutive block numbers. */ - Assert(BufferGetBlockNumber(buffers[i - 1]) == - BufferGetBlockNumber(buffers[i]) - 1); - Assert(io_buffers[io_buffers_len] == buffers[i]); + Assert(io_buffers[io_buffers_len] == buffers[i]); - io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); - } + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } - /* get a reference to wait for in WaitReadBuffers() */ - pgaio_io_get_wref(ioh, &operation->io_wref); + /* get a reference to wait for in WaitReadBuffers() */ + pgaio_io_get_wref(ioh, &operation->io_wref); - /* provide the list of buffers to the completion callbacks */ - pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); + /* provide the list of buffers to the completion callbacks */ + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - pgaio_io_register_callbacks(ioh, - persistence == RELPERSISTENCE_TEMP ? - PGAIO_HCB_LOCAL_BUFFER_READV : - PGAIO_HCB_SHARED_BUFFER_READV, - flags); + pgaio_io_register_callbacks(ioh, + persistence == RELPERSISTENCE_TEMP ? 
+ PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV, + flags); - pgaio_io_set_flag(ioh, ioh_flags); + pgaio_io_set_flag(ioh, ioh_flags); - /* --- - * Even though we're trying to issue IO asynchronously, track the time - * in smgrstartreadv(): - * - if io_method == IOMETHOD_SYNC, we will always perform the IO - * immediately - * - the io method might not support the IO (e.g. worker IO for a temp - * table) - * --- - */ - io_start = pgstat_prepare_io_time(track_io_timing); - smgrstartreadv(ioh, operation->smgr, forknum, - blocknum, - io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, - io_start, 1, io_buffers_len * BLCKSZ); + /* --- + * Even though we're trying to issue IO asynchronously, track the time + * in smgrstartreadv(): + * - if io_method == IOMETHOD_SYNC, we will always perform the IO + * immediately + * - the io method might not support the IO (e.g. worker IO for a temp + * table) + * --- + */ + io_start = pgstat_prepare_io_time(track_io_timing); + smgrstartreadv(ioh, operation->smgr, forknum, + blocknum, + io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 1, io_buffers_len * BLCKSZ); - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += io_buffers_len; - else - pgBufferUsage.shared_blks_read += io_buffers_len; + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += io_buffers_len; + else + pgBufferUsage.shared_blks_read += io_buffers_len; - /* - * Track vacuum cost when issuing IO, not after waiting for it. - * Otherwise we could end up issuing a lot of IO in a short timespan, - * despite a low cost limit. - */ - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + /* + * Track vacuum cost when issuing IO, not after waiting for it. Otherwise + * we could end up issuing a lot of IO in a short timespan, despite a low + * cost limit. 
+ */ + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - *nblocks_progress = io_buffers_len; - did_start_io = true; - } + *nblocks_progress = io_buffers_len; - return did_start_io; + return true; } /* -- 2.43.0
From 200af0d589054f8d015a1ed4ae347c684149bde8 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 18 Mar 2026 11:13:25 -0400 Subject: [PATCH v6 7/8] Introduce PrepareHeadBufferReadIO() and PrepareAdditionalBufferReadIO() Replace ReadBuffersCanStartIO() and ReadBuffersCanStartIOOnce() with new explicit helper functions that inline the logic from StartBufferIO() and StartLocalBufferIO(). Besides the inlined logic being easier to reason about, StartBufferIO() doesn't distinguish between 'already valid' and 'IO in progress' (and explicitly states it does not want to); that distinction is required to defer waiting for in-progress IO. A future commit will implement deferred waiting for in-progress IO. --- src/backend/storage/buffer/bufmgr.c | 171 +++++++++++++++++++++++----- 1 file changed, 141 insertions(+), 30 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index a9995b75917..2179ade07cc 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1659,43 +1659,150 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) +/* + * Local version of PrepareHeadBufferReadIO(). Here instead of localbuf.c to + * avoid an external function call. + */ +static bool +PrepareHeadLocalBufferReadIO(Buffer buffer) { - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); + BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + /* + * The buffer could already be valid if a prior IO by this backend was + * completed and reclaimed incidentally (e.g. while acquiring a new AIO + * handle).
Only the owning backend can set BM_VALID on a local buffer. + */ + if (buf_state & BM_VALID) + return false; + + /* + * Submit any staged IO before checking for in-progress IO. Without this, + * the wref check below could find IO that this backend staged but hasn't + * submitted yet. Waiting on that would PANIC because the owner can't wait + * on its own staged IO. + */ + pgaio_submit_staged(); + + /* Wait for in-progress IO */ + if (pgaio_wref_valid(&desc->io_wref)) + { + PgAioWaitRef iow = desc->io_wref; + + pgaio_wref_wait(&iow); + + buf_state = pg_atomic_read_u64(&desc->state); + } + + /* + * If BM_VALID is set, we waited on IO and it completed successfully. + * Otherwise, we'll initiate IO on the buffer. + */ + return !(buf_state & BM_VALID); } /* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. + * Try to start IO on the first buffer in a new run of blocks. If AIO is in + * progress, be it in this backend or another backend, we wait for it to + * finish and then check the result. + * + * Returns true if the buffer is ready for IO, false if the buffer is already + * valid. */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) +static bool +PrepareHeadBufferReadIO(Buffer buffer) { - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). 
- */ - if (!nowait && pgaio_have_staged()) + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + return PrepareHeadLocalBufferReadIO(buffer); + + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + + for (;;) { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; + buf_state = LockBufHdr(desc); + + Assert(buf_state & BM_TAG_VALID); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + UnlockBufHdr(desc); + return false; + } + + if (buf_state & BM_IO_IN_PROGRESS) + { + UnlockBufHdr(desc); + + /* + * If this backend currently has staged IO, submit it before + * waiting for in-progress IO, to avoid potential deadlocks and + * unnecessary delays. + */ + pgaio_submit_staged(); + WaitIO(desc); + continue; + } /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. + * No IO in progress and not already valid; we will start IO. It's + * possible that the IO was in progress and never became valid because + * the IO errored out. We'll do the IO ourselves. + */ - pgaio_submit_staged(); + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, + BufferDescriptorGetBuffer(desc)); + + return true; + } +} + +/* + * When building a new IO from multiple buffers, we won't include buffers + * that are already valid or already in progress. This function should only be + * used for additional adjacent buffers following the head buffer in a new IO. + * + * This function must never wait for IO to avoid deadlocks. The head buffer + * already has BM_IO_IN_PROGRESS set, so we'll just issue that IO and come + * back in lieu of waiting here. 
+ * + * Returns true if the buffer was successfully prepared for IO and false if it + * is rejected and the read IO should not include this buffer. + */ +static bool +PrepareAdditionalBufferReadIO(Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u64(&desc->state); + /* Local buffers don't use BM_IO_IN_PROGRESS */ + if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref)) + return false; + } + else + { + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS)) + { + UnlockBufHdr(desc); + return false; + } + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer); } - return ReadBuffersCanStartIOOnce(buffer, nowait); + return true; } /* @@ -1934,8 +2041,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) } /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. + * We must get an IO handle before PrepareHeadBufferReadIO(), as + * pgaio_io_acquire() might block, which we don't want after setting + * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the + * handle. * * If we need to wait for IO before we can get a handle, submit * already-staged IO first, so that other backends don't need to wait. @@ -1957,6 +2066,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); } + pgaio_wref_clear(&operation->io_wref); + /* * Check if we can start IO on the first to-be-read buffer. * @@ -1964,10 +2075,10 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * for the outcome: either done, or something went wrong and we will * retry. 
*/ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + if (!PrepareHeadBufferReadIO(buffers[nblocks_done])) { /* - * Someone else has already completed this block, we're done. + * Someone has already completed this block, we're done. * * When IO is necessary, ->nblocks_done is updated in * ProcessReadBuffersResult(), but that is not called if no IO is @@ -2046,7 +2157,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) Assert(BufferGetBlockNumber(buffers[i - 1]) == BufferGetBlockNumber(buffers[i]) - 1); - if (!ReadBuffersCanStartIO(buffers[i], true)) + if (!PrepareAdditionalBufferReadIO(buffers[i])) break; Assert(io_buffers[io_buffers_len] == buffers[i]); -- 2.43.0
From 63cb731176a62320d296f968b12a5d4d36e703d0 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Wed, 18 Mar 2026 11:17:57 -0400 Subject: [PATCH v6 8/8] AIO: Don't wait for already in-progress IO When a backend attempts to start a read IO and finds the first buffer already has I/O in progress, previously it waited for that I/O to complete before initiating reads for any of the subsequent buffers. Although the backend must wait for the I/O to finish when acquiring the buffer, there's no reason for it to wait when setting up the read operation. Waiting at this point prevents the backend from starting I/O on subsequent buffers and can significantly reduce concurrency. This matters in two workloads: when multiple backends scan the same relation concurrently, and when a single backend requests the same block multiple times within the readahead distance. If backends wait each time they encounter an in-progress read, the access pattern effectively degenerates into synchronous I/O. To fix this, when encountering an already in-progress IO for the head buffer, a backend now records the buffer's wait reference and defers waiting until WaitReadBuffers(), when it actually needs to acquire the buffer. In rare cases, a backend may still need to wait synchronously at IO start time: if another backend has set BM_IO_IN_PROGRESS on the buffer but has not yet set the wait reference. Such windows should be brief and uncommon. 
Author: Melanie Plageman <[email protected]> Reviewed-by: Andres Freund <[email protected]> Reviewed-by: Nazir Bilal Yavuz <[email protected]> Discussion: https://postgr.es/m/flat/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw%403p3zu522yykv --- src/backend/storage/buffer/bufmgr.c | 201 +++++++++++++++++++--------- src/include/storage/bufmgr.h | 4 +- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 145 insertions(+), 61 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 2179ade07cc..31d1563a69f 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -185,6 +185,20 @@ typedef struct SMgrSortArray SMgrRelation srel; } SMgrSortArray; +/* + * In AsyncReadBuffers(), when preparing a buffer for reading and setting + * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may + * already contain the desired block. AsyncReadBuffers() must distinguish + * between these cases (and the case where it should initiate I/O) so it can + * mark an in-progress buffer as foreign I/O rather than waiting on it. + */ +typedef enum PrepareReadBufferStatus +{ + READ_BUFFER_ALREADY_DONE, + READ_BUFFER_IN_PROGRESS, + READ_BUFFER_READY_FOR_IO, +} PrepareReadBufferStatus; + /* GUC variables */ bool zero_damaged_pages = false; int bgwriter_lru_maxpages = 100; @@ -1663,8 +1677,9 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) * Local version of PrepareHeadBufferReadIO(). Here instead of localbuf.c to * avoid an external function call. */ -static bool -PrepareHeadLocalBufferReadIO(Buffer buffer) +static PrepareReadBufferStatus +PrepareHeadLocalBufferReadIO(ReadBuffersOperation *operation, + Buffer buffer) { BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1); uint64 buf_state = pg_atomic_read_u64(&desc->state); @@ -1675,49 +1690,60 @@ PrepareHeadLocalBufferReadIO(Buffer buffer) * handle). 
Only the owning backend can set BM_VALID on a local buffer. */ if (buf_state & BM_VALID) - return false; + return READ_BUFFER_ALREADY_DONE; /* * Submit any staged IO before checking for in-progress IO. Without this, * the wref check below could find IO that this backend staged but hasn't - * submitted yet. Waiting on that would PANIC because the owner can't wait - * on its own staged IO. + * submitted yet. If we returned READ_BUFFER_IN_PROGRESS and + * WaitReadBuffers() then tried to wait on it, we'd PANIC because the + * owner can't wait on its own staged IO. */ pgaio_submit_staged(); - /* Wait for in-progress IO */ + /* We've already asynchronously started this IO, so join it */ if (pgaio_wref_valid(&desc->io_wref)) { - PgAioWaitRef iow = desc->io_wref; - - pgaio_wref_wait(&iow); - - buf_state = pg_atomic_read_u64(&desc->state); + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + return READ_BUFFER_IN_PROGRESS; } - /* - * If BM_VALID is set, we waited on IO and it completed successfully. - * Otherwise, we'll initiate IO on the buffer. - */ - return !(buf_state & BM_VALID); + /* Prepare to start IO on this buffer */ + return READ_BUFFER_READY_FOR_IO; } /* * Try to start IO on the first buffer in a new run of blocks. If AIO is in - * progress, be it in this backend or another backend, we wait for it to - * finish and then check the result. + * progress, be it in this backend or another backend, we just associate the + * wait reference with the operation and wait in WaitReadBuffers(). This turns + * out to be important for performance in two workloads: + * + * 1) A read stream that has to read the same block multiple times within the + * readahead distance. This can happen e.g. for the table accesses of an + * index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be able + * to keep enough I/O in flight. 
+ * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers() can then wait on. * - * Returns true if the buffer is ready for IO, false if the buffer is already - * valid. + * It's possible that another backend has started IO on the buffer but not yet + * set its wait reference. In this case, we have no choice but to wait for + * either the wait reference to be valid or the IO to be done. */ -static bool -PrepareHeadBufferReadIO(Buffer buffer) +static PrepareReadBufferStatus +PrepareHeadBufferReadIO(ReadBuffersOperation *operation, + Buffer buffer) { uint64 buf_state; BufferDesc *desc; if (BufferIsLocal(buffer)) - return PrepareHeadLocalBufferReadIO(buffer); + return PrepareHeadLocalBufferReadIO(operation, buffer); ResourceOwnerEnlarge(CurrentResourceOwner); desc = GetBufferDescriptor(buffer - 1); @@ -1732,11 +1758,25 @@ PrepareHeadBufferReadIO(Buffer buffer) if (buf_state & BM_VALID) { UnlockBufHdr(desc); - return false; + return READ_BUFFER_ALREADY_DONE; } if (buf_state & BM_IO_IN_PROGRESS) { + /* Join existing read */ + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + UnlockBufHdr(desc); + return READ_BUFFER_IN_PROGRESS; + } + + /* + * If the wait ref is not valid but the IO is in progress, someone + * else started IO but hasn't set the wait ref yet. We have no + * choice but to wait until the IO completes. + */ UnlockBufHdr(desc); /* @@ -1758,7 +1798,7 @@ PrepareHeadBufferReadIO(Buffer buffer) ResourceOwnerRememberBufferIO(CurrentResourceOwner, BufferDescriptorGetBuffer(desc)); - return true; + return READ_BUFFER_READY_FOR_IO; } } @@ -1939,8 +1979,11 @@ WaitReadBuffers(ReadBuffersOperation *operation) * b) reports some time as waiting, even if we never waited * * we first check if we already know the IO is complete. 
+ * + * Note that operation->io_return is uninitialized for foreign IO, + * so we cannot use the cheaper PGAIO_RS_UNKNOWN pre-check. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1959,11 +2002,45 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc = BufferIsLocal(buffer) ? + GetLocalBufferDescriptor(-buffer - 1) : + GetBufferDescriptor(buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + if (buf_state & BM_VALID) + { + BlockNumber blocknum = operation->blocknum + operation->nblocks_done; + + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Track this as a 'hit' for this backend. The backend + * performing the IO will track it as a 'read'. + */ + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + } + + /* + * If the foreign IO failed and left the buffer invalid, + * nblocks_done is not incremented. The retry loop below will + * call AsyncReadBuffers() which will attempt the IO itself. + */ + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -2009,7 +2086,8 @@ WaitReadBuffers(ReadBuffersOperation *operation) * affected by the call. If the first buffer is valid, *nblocks_progress is * set to 1 and operation->nblocks_done is incremented. 
* - * Returns true if IO was initiated, false if no IO was necessary. + * Returns true if IO was initiated or is already in progress (foreign IO), + * false if the buffer was already valid. */ static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) @@ -2028,6 +2106,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) IOContext io_context; IOObject io_object; instr_time io_start; + PrepareReadBufferStatus status; if (persistence == RELPERSISTENCE_TEMP) { @@ -2066,40 +2145,42 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); } + operation->foreign_io = false; pgaio_wref_clear(&operation->io_wref); - /* - * Check if we can start IO on the first to-be-read buffer. - * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. - */ - if (!PrepareHeadBufferReadIO(buffers[nblocks_done])) + /* Check if we can start IO on the first to-be-read buffer */ + status = PrepareHeadBufferReadIO(operation, buffers[nblocks_done]); + if (status != READ_BUFFER_READY_FOR_IO) { - /* - * Someone has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; + pgaio_io_release(ioh); *nblocks_progress = 1; + if (status == READ_BUFFER_ALREADY_DONE) + { + /* + * Someone has already completed this block, we're done. + * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. 
+ */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. + */ + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + return false; + } - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. - */ - TrackBufferHit(io_object, io_context, - operation->rel, operation->persistence, - operation->smgr, operation->forknum, - blocknum); - return false; + /* The IO is already in-progress */ + Assert(status == READ_BUFFER_IN_PROGRESS); + return true; } /* diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 4017896f951..dd41b92f944 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -144,9 +144,11 @@ struct ReadBuffersOperation */ Buffer *buffers; BlockNumber blocknum; - int flags; + uint16 flags; int16 nblocks; int16 nblocks_done; + /* true if waiting on another backend's IO */ + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 340662cf72c..ffaea427952 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2365,6 +2365,7 @@ PredicateLockData PredicateLockTargetType PrefetchBufferResult PrepParallelRestorePtrType +PrepareReadBufferStatus PrepareStmt PreparedStatement PresortedKeyData -- 2.43.0
