On Sun, Nov 9, 2025 at 5:21 PM Thomas Munro <[email protected]> wrote: > > I suppose (or perhaps vaguely recall from an off-list discussion?) > that you must have considered merging the new > is-it-already-in-progress check into ReadBuffersCanStartIO(). I > suppose the nowait argument would become a tri-state argument with a > value that means "don't wait for an in-progress read, just give me the > IO handle so I can 'join' it as a foreign waiter", with a new output > argument to receive the handle, or something along those lines, and I > guess you'd need a tri-state result, and perhaps s/Can/Try/ in the > name. That'd remove the double-check (extra header lock-unlock cycle) > and associated race that can cause that rare synchronous wait (which > must still happen sometimes in the duelling concurrent scan use > case?), at the slight extra cost of having to allocate and free a > handle in the case of repeated blocks (eg the index->heap scan use > case), but at least that's just backend-local list pushups and doesn't > do extra work otherwise. Is there some logical problem with that > approach? Is the code just too clumsy?
Attached v3 basically does what you suggested above. Now, we should only have to wait if the backend encounters a buffer after another backend has set BM_IO_IN_PROGRESS but before that other backend has set the buffer descriptor's wait reference. 0001 and 0002 are Andres' test-related patches. 0003 is a change I think is required to make one of the tests stable (esp on the BSDs). 0004 is a bit of preliminary refactoring and 0005 is Andres' foreign IO concept but with your suggested structure and my suggested styling. I could potentially break out more into smaller refactoring commits, but I don't think it's too bad the way it is. A few things about the patch that I'm not sure about: - I don't know if pgaio_submit_staged() is in all the right places (and not in too many places). I basically do it before we would wait when starting read IO on the buffer. In the permanent buffers case, that's now only when BM_IO_IN_PROGRESS is set but the wait reference isn't valid yet. This can't happen in the temporary buffers case, so I'm not sure we need to call pgaio_submit_staged(). - StartBufferIO() is no longer invoked in the AsyncReadBuffers() path. We could refactor it so that it works for AsyncReadBuffers(), but that would involve returning something that distinguishes between IO_IN_PROGRESS and IO already done. And StartBufferIO()'s comment explicitly says it wants to avoid that. If we keep my structure, with AsyncReadBuffers() using its own helper (PrepareNewReadBufferIO()) instead of StartBufferIO(), then it seems like we need some way to make it clear what StartBufferIO() is for. I'm not sure what would collectively describe its current users, though. It also now has no non-test callers passing nowait as true. However, once we add write combining, it will, so it seems like we should leave it the way it is to avoid churn. However, other developers might be confused in the interim. - In the 004_read_stream tests, I wonder if there is a way to test that we don't wait for foreign IO until WaitReadBuffers(). We have tests for the stream accessing the same block, which in some cases will exercise the foreign IO path. But it doesn't distinguish between the old behavior -- waiting for the IO to complete when starting read IO on it -- and the new behavior -- not waiting until WaitReadBuffers(). That may not be possible to test, though. - Melanie
From 1340e52fe88ebddfabcd8285e4bcc48ca21722ed Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Tue, 9 Sep 2025 10:14:34 -0400 Subject: [PATCH v3 1/5] aio: Refactor tests in preparation for more tests In a future commit more AIO related tests are due to be introduced. However 001_aio.pl already is fairly large. This commit introduces a new TestAio package with helpers for writing AIO related tests. Then it uses the new helpers to simplify the existing 001_aio.pl by iterating over all supported io_methods. This will be particularly helpful because additional methods already have been submitted. Additionally this commit splits out testing of initdb using a non-default method into its own test. While that test is somewhat important, it's fairly slow and doesn't break that often. For development velocity it's helpful for 001_aio.pl to be faster. While particularly the latter could benefit from being its own commit, it seems to introduce more back-and-forth than it's worth. Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/test/modules/test_aio/meson.build | 1 + src/test/modules/test_aio/t/001_aio.pl | 140 +++++++--------------- src/test/modules/test_aio/t/003_initdb.pl | 71 +++++++++++ src/test/modules/test_aio/t/TestAio.pm | 90 ++++++++++++++ 4 files changed, 203 insertions(+), 99 deletions(-) create mode 100644 src/test/modules/test_aio/t/003_initdb.pl create mode 100644 src/test/modules/test_aio/t/TestAio.pm diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index fefa25bc5ab..18a797f3a3b 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -32,6 +32,7 @@ tests += { 'tests': [ 't/001_aio.pl', 't/002_io_workers.pl', + 't/003_initdb.pl', ], }, } diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 5c634ec3ca9..27ee96898e0 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -7,126 +7,55 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +use FindBin; +use lib $FindBin::RealBin; -### -# Test io_method=worker -### -my $node_worker = create_node('worker'); -$node_worker->start(); - -test_generic('worker', $node_worker); -SKIP: -{ - skip 'Injection points not supported by this build', 1 - unless $ENV{enable_injection_points} eq 'yes'; - test_inject_worker('worker', $node_worker); -} +use TestAio; -$node_worker->stop(); +my %nodes; ### -# Test io_method=io_uring +# Create and configure one instance for each io_method ### -if (have_io_uring()) +foreach my $method (TestAio::supported_io_methods()) { - my $node_uring = create_node('io_uring'); - $node_uring->start(); - test_generic('io_uring', $node_uring); - $node_uring->stop(); -} - - -### -# Test io_method=sync -### - -my $node_sync = create_node('sync'); + my $node = PostgreSQL::Test::Cluster->new($method); -# just to have one test not use the default auto-tuning + $nodes{$method} = $node; + $node->init(); + $node->append_conf('postgresql.conf', "io_method=$method"); + TestAio::configure($node); +} -$node_sync->append_conf( +# Just to have one test not use the default auto-tuning +$nodes{'sync'}->append_conf( 'postgresql.conf', qq( -io_max_concurrency=4 + io_max_concurrency=4 )); -$node_sync->start(); -test_generic('sync', $node_sync); -$node_sync->stop(); - -done_testing(); - ### -# Test Helpers +# Execute the tests for each io_method ### -sub create_node +foreach my $method (TestAio::supported_io_methods()) { - local $Test::Builder::Level = $Test::Builder::Level + 1; - - my $io_method = shift; + my $node = $nodes{$method}; - my $node = PostgreSQL::Test::Cluster->new($io_method); - - # Want to test initdb for each IO method, otherwise we could just reuse - # the cluster. - # - # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the - # options specified by ->extra, if somebody puts -c io_method=xyz in - # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we - # detect it. - local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; - if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} - && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) - { - $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; - } - - $node->init(extra => [ '-c', "io_method=$io_method" ]); - - $node->append_conf( - 'postgresql.conf', qq( -shared_preload_libraries=test_aio -log_min_messages = 'DEBUG3' -log_statement=all -log_error_verbosity=default -restart_after_crash=false -temp_buffers=100 -)); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} - # Even though we used -c io_method=... above, if TEMP_CONFIG sets - # io_method, it'd override the setting persisted at initdb time. While - # using (and later verifying) the setting from initdb provides some - # verification of having used the io_method during initdb, it's probably - # not worth the complication of only appending if the variable is set in - # in TEMP_CONFIG. - $node->append_conf( - 'postgresql.conf', qq( -io_method=$io_method -)); +done_testing(); - ok(1, "$io_method: initdb"); - return $node; -} +### +# Test Helpers +### -sub have_io_uring -{ - # To detect if io_uring is supported, we look at the error message for - # assigning an invalid value to an enum GUC, which lists all the valid - # options. We need to use -C to deal with running as administrator on - # windows, the superuser check is omitted if -C is used. - my ($stdout, $stderr) = - run_command [qw(postgres -C invalid -c io_method=invalid)]; - die "can't determine supported io_method values" - unless $stderr =~ m/Available values: ([^\.]+)\./; - my $methods = $1; - note "supported io_method values are: $methods"; - - return ($methods =~ m/io_uring/) ? 1 : 0; -} sub psql_like { @@ -1490,8 +1419,8 @@ SELECT read_rel_block_ll('tbl_cs_fail', 3, nblocks=>1, zero_on_error=>true);), } -# Run all tests that are supported for all io_methods -sub test_generic +# Run all tests that for the specified node / io_method +sub test_io_method { my $io_method = shift; my $node = shift; @@ -1526,10 +1455,23 @@ CHECKPOINT; test_ignore_checksum($io_method, $node); test_checksum_createdb($io_method, $node); + # generic injection tests SKIP: { skip 'Injection points not supported by this build', 1 unless $ENV{enable_injection_points} eq 'yes'; test_inject($io_method, $node); } + + # worker specific injection tests + if ($io_method eq 'worker') + { + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + + test_inject_worker($io_method, $node); + } + } } diff --git a/src/test/modules/test_aio/t/003_initdb.pl b/src/test/modules/test_aio/t/003_initdb.pl new file mode 100644 index 00000000000..c03ae58d00a --- /dev/null +++ b/src/test/modules/test_aio/t/003_initdb.pl @@ -0,0 +1,71 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group +# +# Test initdb for each IO method. This is done separately from 001_aio.pl, as +# it isn't fast. This way the more commonly failing / hacked-on 001_aio.pl can +# be iterated on more quickly. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +foreach my $method (TestAio::supported_io_methods()) +{ + test_create_node($method); +} + +done_testing(); + + +sub test_create_node +{ + local $Test::Builder::Level = $Test::Builder::Level + 1; + + my $io_method = shift; + + my $node = PostgreSQL::Test::Cluster->new($io_method); + + # Want to test initdb for each IO method, otherwise we could just reuse + # the cluster. + # + # Unfortunately Cluster::init() puts PG_TEST_INITDB_EXTRA_OPTS after the + # options specified by ->extra, if somebody puts -c io_method=xyz in + # PG_TEST_INITDB_EXTRA_OPTS it would break this test. Fix that up if we + # detect it. + local $ENV{PG_TEST_INITDB_EXTRA_OPTS} = $ENV{PG_TEST_INITDB_EXTRA_OPTS}; + if (defined $ENV{PG_TEST_INITDB_EXTRA_OPTS} + && $ENV{PG_TEST_INITDB_EXTRA_OPTS} =~ m/io_method=/) + { + $ENV{PG_TEST_INITDB_EXTRA_OPTS} .= " -c io_method=$io_method"; + } + + $node->init(extra => [ '-c', "io_method=$io_method" ]); + + TestAio::configure($node); + + # Even though we used -c io_method=... above, if TEMP_CONFIG sets + # io_method, it'd override the setting persisted at initdb time. While + # using (and later verifying) the setting from initdb provides some + # verification of having used the io_method during initdb, it's probably + # not worth the complication of only appending if the variable is set in + # in TEMP_CONFIG. + $node->append_conf( + 'postgresql.conf', qq( +io_method=$io_method +)); + + ok(1, "$io_method: initdb"); + + $node->start(); + $node->stop(); + ok(1, "$io_method: start & stop"); + + return $node; +} diff --git a/src/test/modules/test_aio/t/TestAio.pm b/src/test/modules/test_aio/t/TestAio.pm new file mode 100644 index 00000000000..5bc80a9b130 --- /dev/null +++ b/src/test/modules/test_aio/t/TestAio.pm @@ -0,0 +1,90 @@ +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +=pod + +=head1 NAME + +TestAio - helpers for writing AIO related tests + +=cut + +package TestAio; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + + +=pod + +=head1 METHODS + +=over + +=item TestAio::supported_io_methods() + +Return an array of all the supported values for the io_method GUC + +=cut + +sub supported_io_methods() +{ + my @io_methods = ('worker'); + + push(@io_methods, "io_uring") if have_io_uring(); + + # Return sync last, as it will least commonly fail + push(@io_methods, "sync"); + + return @io_methods; +} + + +=item TestAio::configure() + +Prepare a cluster for AIO test + +=cut + +sub configure +{ + my $node = shift; + + $node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +)); + +} + + +=pod + +=item TestAio::have_io_uring() + +Return if io_uring is supported + +=cut + +sub have_io_uring +{ + # To detect if io_uring is supported, we look at the error message for + # assigning an invalid value to an enum GUC, which lists all the valid + # options. We need to use -C to deal with running as administrator on + # windows, the superuser check is omitted if -C is used. + my ($stdout, $stderr) = + run_command [qw(postgres -C invalid -c io_method=invalid)]; + die "can't determine supported io_method values" + unless $stderr =~ m/Available values: ([^\.]+)\./; + my $methods = $1; + note "supported io_method values are: $methods"; + + return ($methods =~ m/io_uring/) ? 1 : 0; +} + +1; -- 2.43.0
From a88039dc34144abc6ad742938435538d8dc70f8c Mon Sep 17 00:00:00 2001 From: Andres Freund <[email protected]> Date: Wed, 10 Sep 2025 14:00:02 -0400 Subject: [PATCH v3 2/5] test_aio: Add read_stream test infrastructure & tests Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch: --- src/test/modules/test_aio/meson.build | 1 + .../modules/test_aio/t/004_read_stream.pl | 282 ++++++++++++++ src/test/modules/test_aio/test_aio--1.0.sql | 26 +- src/test/modules/test_aio/test_aio.c | 344 +++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 5 files changed, 602 insertions(+), 52 deletions(-) create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build index 18a797f3a3b..909f81d96c1 100644 --- a/src/test/modules/test_aio/meson.build +++ b/src/test/modules/test_aio/meson.build @@ -33,6 +33,7 @@ tests += { 't/001_aio.pl', 't/002_io_workers.pl', 't/003_initdb.pl', + 't/004_read_stream.pl', ], }, } diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl new file mode 100644 index 00000000000..89cfabbb1d3 --- /dev/null +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -0,0 +1,282 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +use FindBin; +use lib $FindBin::RealBin; + +use TestAio; + + +my $node = PostgreSQL::Test::Cluster->new('test'); +$node->init(); + +$node->append_conf( + 'postgresql.conf', qq( +shared_preload_libraries=test_aio +log_min_messages = 'DEBUG3' +log_statement=all +log_error_verbosity=default +restart_after_crash=false +temp_buffers=100 +max_connections=8 +io_method=worker +)); + +$node->start(); +test_setup($node); +$node->stop(); + + +foreach my $method (TestAio::supported_io_methods()) +{ + $node->adjust_conf('postgresql.conf', 'io_method', 'worker'); + $node->start(); + test_io_method($method, $node); + $node->stop(); +} + +done_testing(); + + +sub test_setup +{ + my $node = shift; + + $node->safe_psql( + 'postgres', qq( +CREATE EXTENSION test_aio; + +CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10); +INSERT INTO largeish(k) SELECT generate_series(1, 10000); +)); + ok(1, "setup"); +} + + +sub test_repeated_blocks +{ + my $io_method = shift; + my $node = shift; + + my $psql = $node->background_psql('postgres', on_error_stop => 0); + + # Preventing larger reads makes testing easier + $psql->query_safe( + qq/ +SET io_combine_limit = 1; +/); + + # test miss of the same block twice in a row + $psql->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]); +/); + ok(1, "$io_method: stream missing the same block repeatedly"); + + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]); +/); + ok(1, "$io_method: stream hitting the same block repeatedly"); + + # test hit of the same block twice in a row + $psql->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + $psql->query_safe( + qq/ +SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]); +/); + ok(1, "$io_method: stream accessing same block"); + + $psql->quit(); +} + + +sub test_inject_foreign +{ + my $io_method = shift; + my $node = shift; + + my $psql_a = $node->background_psql('postgres', on_error_stop => 0); + my $psql_b = $node->background_psql('postgres', on_error_stop => 0); + + my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads succeeding. + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + ok(1, + qq/$io_method: read stream encounters succeeding IO by another backend/ + ); + + + ### + # Test read stream encountering buffers undergoing IO in another backend, + # with the other backend's reads failing. + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,5,7\}/); + + $psql_b->{run}->pump_nb(); + like( + $psql_b->{stderr}, + qr/.*ERROR.*could not read blocks 5..5.*$/, + "$io_method: injected error occurred"); + $psql_b->{stderr} = ''; + $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/); + + + ok(1, + qq/$io_method: read stream encounters failing IO by another backend/); + + + ### + # Test read stream encountering two buffers that are undergoing the same + # IO, started by another backend + ### + $psql_a->query_safe( + qq/ +SELECT evict_rel('largeish'); +/); + + $psql_b->query_safe( + qq/ +SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish')); +/); + + $psql_b->{stdin} .= qq/ +SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3); +/; + $psql_b->{run}->pump_nb(); + + $node->poll_query_until( + 'postgres', qq/ +SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait'; +/, + 'completion_wait'); + + $psql_a->{stdin} .= qq/ +SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]); +/; + $psql_a->{run}->pump_nb(); + + $node->poll_query_until('postgres', + qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a), + 'AioIoCompletion'); + + $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/); + + pump_until( + $psql_a->{run}, $psql_a->{timeout}, + \$psql_a->{stdout}, qr/\{0,2,4\}/); + + ok(1, qq/$io_method: read stream encounters two buffer read in one IO/); + + + $psql_a->quit(); + $psql_b->quit(); +} + + +sub test_io_method +{ + my $io_method = shift; + my $node = shift; + + test_repeated_blocks($io_method, $node); + + SKIP: + { + skip 'Injection points not supported by this build', 1 + unless $ENV{enable_injection_points} eq 'yes'; + test_inject_foreign($io_method, $node); + } +} diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index e495481c41e..da7cc03829a 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll( RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION evict_rel(rel regclass) +RETURNS pg_catalog.void STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int) RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool) +CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false) RETURNS pg_catalog.bool STRICT AS 'MODULE_PATHNAME' LANGUAGE C; @@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +/* + * Read stream related functions + */ +CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4) +RETURNS SETOF record STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + + /* * Handle related functions @@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C; /* * Injection point related functions */ -CREATE FUNCTION inj_io_short_read_attach(result int) -RETURNS pg_catalog.void STRICT +CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_completion_continue() +RETURNS pg_catalog.void +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0) +RETURNS pg_catalog.void AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_io_short_read_detach() diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index b1aa8af9ec0..911a7102a34 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -20,16 +20,23 @@ #include "access/relation.h" #include "fmgr.h" +#include "funcapi.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" #include "storage/checksum.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procnumber.h" +#include "storage/read_stream.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/injection_point.h" #include "utils/rel.h" +#include "utils/wait_event.h" PG_MODULE_MAGIC; @@ -37,13 +44,30 @@ PG_MODULE_MAGIC; typedef struct InjIoErrorState { + ConditionVariable cv; + bool enabled_short_read; bool enabled_reopen; + bool enabled_completion_wait; + Oid completion_wait_relfilenode; + pid_t completion_wait_pid; + uint32 completion_wait_event; + bool short_read_result_set; + Oid short_read_relfilenode; + pid_t short_read_pid; int short_read_result; } InjIoErrorState; +typedef struct BlocksReadStreamData +{ + int nblocks; + int curblock; + uint32 *blocks; +} BlocksReadStreamData; + + static InjIoErrorState *inj_io_error_state; /* Shared memory init callbacks */ @@ -85,10 +109,13 @@ test_aio_shmem_startup(void) inj_io_error_state->enabled_short_read = false; inj_io_error_state->enabled_reopen = false; + ConditionVariableInit(&inj_io_error_state->cv); + inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait"); + #ifdef USE_INJECTION_POINTS InjectionPointAttach("aio-process-completion-before-shared", "test_aio", - "inj_io_short_read", + "inj_io_completion_hook", NULL, 0); InjectionPointLoad("aio-process-completion-before-shared"); @@ -384,7 +411,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (nblocks <= 0 || nblocks > PG_IOV_MAX) elog(ERROR, "nblocks is out of range"); - rel = relation_open(relid, AccessExclusiveLock); + rel = relation_open(relid, AccessShareLock); for (int i = 0; i < nblocks; i++) { @@ -458,6 +485,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +PG_FUNCTION_INFO_V1(evict_rel); +Datum +evict_rel(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + Relation rel; + int32 buffers_evicted, + buffers_flushed, + buffers_skipped; + + rel = relation_open(relid, AccessExclusiveLock); + + EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed, + &buffers_skipped); + + relation_close(rel, AccessExclusiveLock); + + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(invalidate_rel_block); Datum invalidate_rel_block(PG_FUNCTION_ARGS) @@ -610,6 +658,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + +static BlockNumber +read_stream_for_blocks_cb(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + BlocksReadStreamData *stream_data = callback_private_data; + + if (stream_data->curblock >= stream_data->nblocks) + return InvalidBlockNumber; + return stream_data->blocks[stream_data->curblock++]; +} + +PG_FUNCTION_INFO_V1(read_stream_for_blocks); +Datum +read_stream_for_blocks(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + ArrayType *blocksarray = PG_GETARG_ARRAYTYPE_P(1); + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Relation rel; + BlocksReadStreamData stream_data; + ReadStream *stream; + + InitMaterializedSRF(fcinfo, 0); + + /* + * We expect the input to be an N-element int4 array; verify that. We + * don't need to use deconstruct_array() since the array data is just + * going to look like a C array of N int4 values. + */ + if (ARR_NDIM(blocksarray) != 1 || + ARR_HASNULL(blocksarray) || + ARR_ELEMTYPE(blocksarray) != INT4OID) + elog(ERROR, "expected 1 dimensional int4 array"); + + stream_data.curblock = 0; + stream_data.nblocks = ARR_DIMS(blocksarray)[0]; + stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray); + + rel = relation_open(relid, AccessShareLock); + + stream = read_stream_begin_relation(READ_STREAM_FULL, + NULL, + rel, + MAIN_FORKNUM, + read_stream_for_blocks_cb, + &stream_data, + 0); + + for (int i = 0; i < stream_data.nblocks; i++) + { + Buffer buf = read_stream_next_buffer(stream, NULL); + Datum values[3] = {0}; + bool nulls[3] = {0}; + + if (!BufferIsValid(buf)) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i); + + values[0] = Int32GetDatum(i); + values[1] = UInt32GetDatum(stream_data.blocks[i]); + values[2] = UInt32GetDatum(buf); + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + + ReleaseBuffer(buf); + } + + if (read_stream_next_buffer(stream, NULL) != InvalidBuffer) + elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid", + stream_data.nblocks + 1); + + read_stream_end(stream); + + relation_close(rel, NoLock); + + return (Datum) 0; +} + + PG_FUNCTION_INFO_V1(handle_get); Datum handle_get(PG_FUNCTION_ARGS) @@ -680,15 +808,98 @@ batch_end(PG_FUNCTION_ARGS) } #ifdef USE_INJECTION_POINTS -extern PGDLLEXPORT void inj_io_short_read(const char *name, - const void *private_data, - void *arg); +extern PGDLLEXPORT void inj_io_completion_hook(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_io_reopen(const char *name, const void *private_data, void *arg); -void -inj_io_short_read(const char *name, const void *private_data, void *arg) +static bool +inj_io_short_read_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_short_read) + return false; + + if (!inj_io_error_state->short_read_result_set) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->short_read_pid != 0 && + inj_io_error_state->short_read_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->short_read_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode) + return false; + + /* + * Only shorten reads that are actually longer than the target size, + * otherwise we can trigger over-reads. + */ + if (inj_io_error_state->short_read_result >= ioh->result) + return false; + + return true; +} + +static bool +inj_io_completion_wait_matches(PgAioHandle *ioh) +{ + PGPROC *owner_proc; + int32 owner_pid; + PgAioTargetData *td; + + if (!inj_io_error_state->enabled_completion_wait) + return false; + + owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh)); + owner_pid = owner_proc->pid; + + if (inj_io_error_state->completion_wait_pid != owner_pid) + return false; + + td = pgaio_io_get_target_data(ioh); + + if (inj_io_error_state->completion_wait_relfilenode != InvalidOid && + td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode) + return false; + + return true; +} + +static void +inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg) +{ + PgAioHandle *ioh = (PgAioHandle *) arg; + + if (!inj_io_completion_wait_matches(ioh)) + return; + + ConditionVariablePrepareToSleep(&inj_io_error_state->cv); + + while (true) + { + if (!inj_io_completion_wait_matches(ioh)) + break; + + ConditionVariableSleep(&inj_io_error_state->cv, + inj_io_error_state->completion_wait_event); + } + + ConditionVariableCancelSleep(); +} + +static void +inj_io_short_read_hook(const char *name, const void *private_data, void *arg) { PgAioHandle *ioh = (PgAioHandle *) arg; @@ -697,58 +908,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg) inj_io_error_state->enabled_reopen), errhidestmt(true), errhidecontext(true)); - if (inj_io_error_state->enabled_short_read) + if (inj_io_short_read_matches(ioh)) { + struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; + int32 old_result = ioh->result; + int32 new_result = inj_io_error_state->short_read_result; + int32 processed = 0; + + ereport(LOG, + errmsg("short read inject point, changing result from %d to %d", + old_result, new_result), + errhidestmt(true), errhidecontext(true)); + /* - * Only shorten reads that are actually longer than the target size, - * otherwise we can trigger over-reads. + * The underlying IO actually completed OK, and thus the "invalid" + * portion of the IOV actually contains valid data. That can hide a + * lot of problems, e.g. if we were to wrongly mark a buffer, that + * wasn't read according to the shortened-read, IO as valid, the + * contents would look valid and we might miss a bug. + * + * To avoid that, iterate through the IOV and zero out the "failed" + * portion of the IO. */ - if (inj_io_error_state->short_read_result_set - && ioh->op == PGAIO_OP_READV - && inj_io_error_state->short_read_result <= ioh->result) + for (int i = 0; i < ioh->op_data.read.iov_length; i++) { - struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off]; - int32 old_result = ioh->result; - int32 new_result = inj_io_error_state->short_read_result; - int32 processed = 0; - - ereport(LOG, - errmsg("short read inject point, changing result from %d to %d", - old_result, new_result), - errhidestmt(true), errhidecontext(true)); - - /* - * The underlying IO actually completed OK, and thus the "invalid" - * portion of the IOV actually contains valid data. That can hide - * a lot of problems, e.g. if we were to wrongly mark a buffer, - * that wasn't read according to the shortened-read, IO as valid, - * the contents would look valid and we might miss a bug. - * - * To avoid that, iterate through the IOV and zero out the - * "failed" portion of the IO. - */ - for (int i = 0; i < ioh->op_data.read.iov_length; i++) + if (processed + iov[i].iov_len <= new_result) + processed += iov[i].iov_len; + else if (processed <= new_result) { - if (processed + iov[i].iov_len <= new_result) - processed += iov[i].iov_len; - else if (processed <= new_result) - { - uint32 ok_part = new_result - processed; - - memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); - processed += iov[i].iov_len; - } - else - { - memset((char *) iov[i].iov_base, 0, iov[i].iov_len); - } - } + uint32 ok_part = new_result - processed; - ioh->result = new_result; + memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part); + processed += iov[i].iov_len; + } + else + { + memset((char *) iov[i].iov_base, 0, iov[i].iov_len); + } } + + ioh->result = new_result; } } +void +inj_io_completion_hook(const char *name, const void *private_data, void *arg) +{ + inj_io_completion_wait_hook(name, private_data, arg); + inj_io_short_read_hook(name, private_data, arg); +} + void inj_io_reopen(const char *name, const void *private_data, void *arg) { @@ -762,6 +971,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg) } #endif +PG_FUNCTION_INFO_V1(inj_io_completion_wait); +Datum +inj_io_completion_wait(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = true; + inj_io_error_state->completion_wait_pid = + PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0); + inj_io_error_state->completion_wait_relfilenode = + PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_io_completion_continue); +Datum +inj_io_completion_continue(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_io_error_state->enabled_completion_wait = false; + inj_io_error_state->completion_wait_pid = 0; + inj_io_error_state->completion_wait_relfilenode = InvalidOid; + ConditionVariableBroadcast(&inj_io_error_state->cv); +#else + elog(ERROR, "injection points not supported"); +#endif + + PG_RETURN_VOID(); +} + PG_FUNCTION_INFO_V1(inj_io_short_read_attach); Datum inj_io_short_read_attach(PG_FUNCTION_ARGS) @@ -771,6 +1013,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS) inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0); if (inj_io_error_state->short_read_result_set) inj_io_error_state->short_read_result = PG_GETARG_INT32(0); + inj_io_error_state->short_read_pid = + PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1); + inj_io_error_state->short_read_relfilenode = + PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2); #else elog(ERROR, "injection points not supported"); #endif diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 1c8610fd46c..db583985813 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -302,6 +302,7 @@ BlockSampler BlockSamplerData BlockedProcData BlockedProcsData +BlocksReadStreamData BlocktableEntry BloomBuildState BloomFilter -- 2.43.0
From e9e0bc1c73de0edc23a391a59f48ea3ee64cf707 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 13:09:14 -0500 Subject: [PATCH v3 3/5] fix test --- src/test/modules/test_aio/t/004_read_stream.pl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl index 89cfabbb1d3..64567de37c2 100644 --- a/src/test/modules/test_aio/t/004_read_stream.pl +++ b/src/test/modules/test_aio/t/004_read_stream.pl @@ -205,15 +205,13 @@ SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5 $psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout}, qr/\{0,2,5,7\}/); - $psql_b->{run}->pump_nb(); - like( - $psql_b->{stderr}, - qr/.*ERROR.*could not read blocks 5..5.*$/, - "$io_method: injected error occurred"); + pump_until( + $psql_b->{run}, $psql_b->{timeout}, + \$psql_b->{stderr}, qr/ERROR.*could not read blocks 5\.\.5/); + ok(1, "$io_method: injected error occurred"); $psql_b->{stderr} = ''; $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/); - ok(1, qq/$io_method: read stream encounters failing IO by another backend/); -- 2.43.0
From 276450fe9f84c409164cbd0f33971a224644eb79 Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 13:54:02 -0500 Subject: [PATCH v3 4/5] Make buffer hit helper Already two places count buffer hits, requiring quite a few lines of code since we do accounting in so many places. Future commits will add more locations, so refactor into a helper. --- src/backend/storage/buffer/bufmgr.c | 111 ++++++++++++++-------------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index 6f935648ae9..bad8894011a 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -638,6 +638,10 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr, bool *foundPtr, IOContext io_context); static bool AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress); static void CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete); + +static void ProcessBufferHit(BufferAccessStrategy strategy, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -1216,8 +1220,6 @@ PinBufferForBlock(Relation rel, bool *foundPtr) { BufferDesc *bufHdr; - IOContext io_context; - IOObject io_object; Assert(blockNum != P_NEW); @@ -1226,17 +1228,6 @@ PinBufferForBlock(Relation rel, persistence == RELPERSISTENCE_PERMANENT || persistence == RELPERSISTENCE_UNLOGGED)); - if (persistence == RELPERSISTENCE_TEMP) - { - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - } - else - { - io_context = IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - } - TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, smgr->smgr_rlocator.locator.spcOid, smgr->smgr_rlocator.locator.dbOid, @@ -1244,18 +1235,11 @@ PinBufferForBlock(Relation rel, smgr->smgr_rlocator.backend); if (persistence == RELPERSISTENCE_TEMP) - { bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); - if (*foundPtr) - pgBufferUsage.local_blks_hit++; - } else - { bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, - strategy, foundPtr, io_context); - if (*foundPtr) - pgBufferUsage.shared_blks_hit++; - } + strategy, foundPtr, IOContextForStrategy(strategy)); + if (rel) { /* @@ -1264,22 +1248,10 @@ PinBufferForBlock(Relation rel, * zeroed instead), the per-relation stats always count them. */ pgstat_count_buffer_read(rel); - if (*foundPtr) - pgstat_count_buffer_hit(rel); } - if (*foundPtr) - { - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - true); - } + if (*foundPtr) + ProcessBufferHit(strategy, rel, persistence, smgr, forkNum, blockNum); return BufferDescriptorGetBuffer(bufHdr); } @@ -1685,6 +1657,51 @@ ReadBuffersCanStartIO(Buffer buffer, bool nowait) return ReadBuffersCanStartIOOnce(buffer, nowait); } +/* + * We track various stats related to buffer hits. Because this is done in a + * few separate places, this helper exists for convenience. + */ +static void +ProcessBufferHit(BufferAccessStrategy strategy, + Relation rel, char persistence, SMgrRelation smgr, + ForkNumber forknum, BlockNumber blocknum) +{ + IOContext io_context; + IOObject io_object; + + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; + } + + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, + blocknum, + smgr->smgr_rlocator.locator.spcOid, + smgr->smgr_rlocator.locator.dbOid, + smgr->smgr_rlocator.locator.relNumber, + smgr->smgr_rlocator.backend, + true); + + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_hit += 1; + else + pgBufferUsage.shared_blks_hit += 1; + + if (rel) + pgstat_count_buffer_hit(rel); + + pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); + + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageHit; +} + /* * Helper for WaitReadBuffers() that processes the results of a readv * operation, raising an error if necessary. @@ -1980,25 +1997,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * must have started out as a miss in PinBufferForBlock(). The other * backend will track this as a 'read'. */ - TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + operation->nblocks_done, - operation->smgr->smgr_rlocator.locator.spcOid, - operation->smgr->smgr_rlocator.locator.dbOid, - operation->smgr->smgr_rlocator.locator.relNumber, - operation->smgr->smgr_rlocator.backend, - true); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_hit += 1; - else - pgBufferUsage.shared_blks_hit += 1; - - if (operation->rel) - pgstat_count_buffer_hit(operation->rel); - - pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0); - - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageHit; + ProcessBufferHit(operation->strategy, operation->rel, persistence, + operation->smgr, forknum, + blocknum + operation->nblocks_done); } else { -- 2.43.0
From fb9ba6b67df5060bcd788cbd72988734718c6a7d Mon Sep 17 00:00:00 2001 From: Melanie Plageman <[email protected]> Date: Fri, 23 Jan 2026 14:00:31 -0500 Subject: [PATCH v3 5/5] Don't wait for already in-progress IO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a backend attempts to start a read on a buffer and finds that I/O is already in progress, it previously waited for that I/O to complete before initiating reads for any other buffers. Although the backend must still wait for the I/O to finish when later acquiring the buffer, it should not need to wait at read start time. Other buffers may be available for I/O, and in some workloads this waiting significantly reduces concurrency. For example, index scans may repeatedly request the same heap block. If the backend waits each time it encounters an in-progress read, the access pattern effectively degenerates into synchronous I/O. By introducing the concept of foreign I/O operations, a backend can record the buffer’s wait reference and defer waiting until WaitReadBuffers() when it actually acquires the buffer. In rare cases, a backend may still need to wait when starting a read if it encounters a buffer after another backend has set BM_IO_IN_PROGRESS but before the buffer descriptor’s wait reference has been set. Such windows should be brief and uncommon. --- src/backend/storage/buffer/bufmgr.c | 481 ++++++++++++++++++---------- src/include/storage/bufmgr.h | 1 + src/tools/pgindent/typedefs.list | 1 + 3 files changed, 320 insertions(+), 163 deletions(-) diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index bad8894011a..55c77e10a81 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -169,6 +169,21 @@ typedef struct SMgrSortArray SMgrRelation srel; } SMgrSortArray; + +/* + * In AsyncReadBuffers(), when preparing a buffer for reading and setting + * BM_IO_IN_PROGRESS, the buffer may already have I/O in progress or may + * already contain the desired block. AsyncReadBuffers() must distinguish + * between these cases (and the case where it should initiate I/O) so it can + * mark an in-progress buffer as foreign I/O rather than waiting on it. + */ +typedef enum PrepareReadBuffer_Status +{ + READ_BUFFER_ALREADY_DONE, + READ_BUFFER_IN_PROGRESS, + READ_BUFFER_READY_FOR_IO, +} PrepareReadBuffer_Status; + /* GUC variables */ bool zero_damaged_pages = false; int bgwriter_lru_maxpages = 100; @@ -1618,45 +1633,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) -{ - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); -} - -/* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. - */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) -{ - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). - */ - if (!nowait && pgaio_have_staged()) - { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; - - /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. - */ - pgaio_submit_staged(); - } - - return ReadBuffersCanStartIOOnce(buffer, nowait); -} - /* * We track various stats related to buffer hits. Because this is done in a * few separate places, this helper exists for convenience. @@ -1806,7 +1782,7 @@ WaitReadBuffers(ReadBuffersOperation *operation) * * we first check if we already know the IO is complete. */ - if (aio_ret->result.status == PGAIO_RS_UNKNOWN && + if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) && !pgaio_wref_check_done(&operation->io_wref)) { instr_time io_start = pgstat_prepare_io_time(track_io_timing); @@ -1825,11 +1801,33 @@ WaitReadBuffers(ReadBuffersOperation *operation) Assert(pgaio_wref_check_done(&operation->io_wref)); } - /* - * We now are sure the IO completed. Check the results. This - * includes reporting on errors if there were any. - */ - ProcessReadBuffersResult(operation); + if (unlikely(operation->foreign_io)) + { + Buffer buffer = operation->buffers[operation->nblocks_done]; + BufferDesc *desc = BufferIsLocal(buffer) ? + GetLocalBufferDescriptor(-buffer - 1) : + GetBufferDescriptor(buffer - 1); + uint32 buf_state = pg_atomic_read_u64(&desc->state); + + if (buf_state & BM_VALID) + { + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + ProcessBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done); + } + } + else + { + /* + * We now are sure the IO completed. Check the results. This + * includes reporting on errors if there were any. + */ + ProcessReadBuffersResult(operation); + } } /* @@ -1860,6 +1858,159 @@ WaitReadBuffers(ReadBuffersOperation *operation) /* NB: READ_DONE tracepoint was already executed in completion callback */ } +/* + * Local version of PrepareNewReadBufferIO(). Here instead of localbuf.c to + * avoid an external function call. + */ +static PrepareReadBuffer_Status +PrepareNewLocalReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + BufferDesc *desc = GetLocalBufferDescriptor(-buffer - 1); + uint64 buf_state = pg_atomic_read_u64(&desc->state); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + pgaio_submit_staged(); + + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + return READ_BUFFER_IN_PROGRESS; + } + + return READ_BUFFER_READY_FOR_IO; +} + +/* + * Try to start IO on the first buffer in a new run of blocks. If AIO is in + * progress, be it in this backend or another backend, we just associate the + * wait reference with the operation and wait in WaitReadBuffers(). This turns + * out to be important for performance in two workloads: + * + * 1) A read stream that has to read the same block multiple times within the + * readahead distance. This can happen e.g. for the table accesses of an + * index scan. + * + * 2) Concurrent scans by multiple backends on the same relation. + * + * If we were to synchronously wait for the in-progress IO, we'd not be able + * to keep enough I/O in flight. + * + * If we do find there is ongoing I/O for the buffer, we set up a 1-block + * ReadBuffersOperation that WaitReadBuffers then can wait on. + * + * It's possible that another backend has started IO on the buffer but not yet + * set its wait reference. In this case, we have no choice but to wait for + * either the wait reference to be valid or the IO to be done. + */ +static PrepareReadBuffer_Status +PrepareNewReadBufferIO(ReadBuffersOperation *operation, + Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + return PrepareNewLocalReadBufferIO(operation, buffer); + + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + + for (;;) + { + buf_state = LockBufHdr(desc); + + /* Already valid, no work to do */ + if (buf_state & BM_VALID) + { + UnlockBufHdr(desc); + pgaio_wref_clear(&operation->io_wref); + return READ_BUFFER_ALREADY_DONE; + } + + if (buf_state & BM_IO_IN_PROGRESS) + { + /* Join existing read */ + if (pgaio_wref_valid(&desc->io_wref)) + { + operation->io_wref = desc->io_wref; + operation->foreign_io = true; + UnlockBufHdr(desc); + return READ_BUFFER_IN_PROGRESS; + } + + /* + * If the wait ref is not valid but the IO is in progress, someone + * else started IO but hasn't set the wait ref yet. We have no + * choice but to wait until the wait ref is set or the IO + * completes. + */ + UnlockBufHdr(desc); + pgaio_submit_staged(); + WaitIO(desc); + continue; + } + + /* + * No IO in progress and not already valid; We will start IO. It's + * possible that the IO was in progress and never became valid because + * the IO errored out. We'll do the IO ourselves. + */ + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, + BufferDescriptorGetBuffer(desc)); + + return READ_BUFFER_READY_FOR_IO; + } +} + + +/* + * When building a new IO from multiple buffers, we won't include buffers + * that are already valid or already in progress. This function should only be + * used for additional adjacent buffers following the head buffer in a new IO. + * + * Returns true if the buffer was successfully prepared for IO and false if it + * is rejected and the read IO should not include this buffer. +*/ +static bool +PrepareAdditionalReadBuffer(Buffer buffer) +{ + uint64 buf_state; + BufferDesc *desc; + + if (BufferIsLocal(buffer)) + { + desc = GetLocalBufferDescriptor(-buffer - 1); + buf_state = pg_atomic_read_u64(&desc->state); + /* Local buffers don't use BM_IO_IN_PROGRESS */ + if (buf_state & BM_VALID || pgaio_wref_valid(&desc->io_wref)) + return false; + } + else + { + ResourceOwnerEnlarge(CurrentResourceOwner); + desc = GetBufferDescriptor(buffer - 1); + buf_state = LockBufHdr(desc); + if (buf_state & (BM_VALID | BM_IO_IN_PROGRESS)) + { + UnlockBufHdr(desc); + return false; + } + UnlockBufHdrExt(desc, buf_state, BM_IO_IN_PROGRESS, 0, 0); + ResourceOwnerRememberBufferIO(CurrentResourceOwner, buffer); + } + + return true; +} + /* * Initiate IO for the ReadBuffersOperation * @@ -1893,7 +2044,75 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) void *io_pages[MAX_IO_COMBINE_LIMIT]; IOContext io_context; IOObject io_object; - bool did_start_io; + instr_time io_start; + PrepareReadBuffer_Status status; + + /* + * We must get an IO handle before StartNewBufferReadIO(), as + * pgaio_io_acquire() might block, which we don't want after setting + * IO_IN_PROGRESS. If we don't need to do the IO, we'll release the + * handle. + * + * If we need to wait for IO before we can get a handle, submit + * already-staged IO first, so that other backends don't need to wait. + * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to + * wait for already submitted IO, which doesn't require additional locks, + * but it could still cause undesirable waits. + * + * A secondary benefit is that this would allow us to measure the time in + * pgaio_io_acquire() without causing undue timer overhead in the common, + * non-blocking, case. However, currently the pgstats infrastructure + * doesn't really allow that, as it a) asserts that an operation can't + * have time without operations b) doesn't have an API to report + * "accumulated" time. + */ + ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); + if (unlikely(!ioh)) + { + pgaio_submit_staged(); + ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); + } + + operation->foreign_io = false; + + /* Check if we can start IO on the first to-be-read buffer */ + if ((status = PrepareNewReadBufferIO(operation, buffers[nblocks_done])) < + READ_BUFFER_READY_FOR_IO) + { + pgaio_io_release(ioh); + *nblocks_progress = 1; + if (status == READ_BUFFER_ALREADY_DONE) + { + /* + * Someone else has already completed this block, we're done. + * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. + */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); + + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. + */ + ProcessBufferHit(operation->strategy, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + operation->blocknum + operation->nblocks_done); + return false; + } + + /* The IO is already in-progress */ + Assert(status == READ_BUFFER_IN_PROGRESS); + CheckReadBuffersOperation(operation, false); + return true; + } + + /* We can read in at least the head buffer . */ + Assert(status == READ_BUFFER_READY_FOR_IO); /* * When this IO is executed synchronously, either because the caller will @@ -1944,138 +2163,74 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) */ pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); - /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. - * - * If we need to wait for IO before we can get a handle, submit - * already-staged IO first, so that other backends don't need to wait. - * There wouldn't be a deadlock risk, as pgaio_io_acquire() just needs to - * wait for already submitted IO, which doesn't require additional locks, - * but it could still cause undesirable waits. - * - * A secondary benefit is that this would allow us to measure the time in - * pgaio_io_acquire() without causing undue timer overhead in the common, - * non-blocking, case. However, currently the pgstats infrastructure - * doesn't really allow that, as it a) asserts that an operation can't - * have time without operations b) doesn't have an API to report - * "accumulated" time. - */ - ioh = pgaio_io_acquire_nb(CurrentResourceOwner, &operation->io_return); - if (unlikely(!ioh)) - { - pgaio_submit_staged(); - - ioh = pgaio_io_acquire(CurrentResourceOwner, &operation->io_return); - } + Assert(io_buffers[0] == buffers[nblocks_done]); + io_pages[0] = BufferGetBlock(buffers[nblocks_done]); + io_buffers_len = 1; /* - * Check if we can start IO on the first to-be-read buffer. - * - * If an I/O is already in progress in another backend, we want to wait - * for the outcome: either done, or something went wrong and we will - * retry. + * How many neighboring-on-disk blocks can we scatter-read into other + * buffers at the same time? In this case we don't wait if we see an I/O + * already in progress. We already set BM_IO_IN_PROGRESS for the head + * block, so we should get on with that I/O as soon as possible. */ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + for (int i = nblocks_done + 1; i < operation->nblocks; i++) { - /* - * Someone else has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; - *nblocks_progress = 1; - - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); - did_start_io = false; + if (!PrepareAdditionalReadBuffer(buffers[i])) + break; + /* Must be consecutive block numbers. */ + Assert(BufferGetBlockNumber(buffers[i - 1]) == + BufferGetBlockNumber(buffers[i]) - 1); + Assert(io_buffers[io_buffers_len] == buffers[i]); - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. - */ - ProcessBufferHit(operation->strategy, operation->rel, persistence, - operation->smgr, forknum, - blocknum + operation->nblocks_done); + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); } - else - { - instr_time io_start; - - /* We found a buffer that we need to read in. */ - Assert(io_buffers[0] == buffers[nblocks_done]); - io_pages[0] = BufferGetBlock(buffers[nblocks_done]); - io_buffers_len = 1; - - /* - * How many neighboring-on-disk blocks can we scatter-read into other - * buffers at the same time? In this case we don't wait if we see an - * I/O already in progress. We already set BM_IO_IN_PROGRESS for the - * head block, so we should get on with that I/O as soon as possible. - */ - for (int i = nblocks_done + 1; i < operation->nblocks; i++) - { - if (!ReadBuffersCanStartIO(buffers[i], true)) - break; - /* Must be consecutive block numbers. */ - Assert(BufferGetBlockNumber(buffers[i - 1]) == - BufferGetBlockNumber(buffers[i]) - 1); - Assert(io_buffers[io_buffers_len] == buffers[i]); - io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); - } + /* get a reference to wait for in WaitReadBuffers() */ + pgaio_io_get_wref(ioh, &operation->io_wref); - /* get a reference to wait for in WaitReadBuffers() */ - pgaio_io_get_wref(ioh, &operation->io_wref); + /* provide the list of buffers to the completion callbacks */ + pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); - /* provide the list of buffers to the completion callbacks */ - pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len); + pgaio_io_register_callbacks(ioh, + persistence == RELPERSISTENCE_TEMP ? + PGAIO_HCB_LOCAL_BUFFER_READV : + PGAIO_HCB_SHARED_BUFFER_READV, + flags); - pgaio_io_register_callbacks(ioh, - persistence == RELPERSISTENCE_TEMP ? - PGAIO_HCB_LOCAL_BUFFER_READV : - PGAIO_HCB_SHARED_BUFFER_READV, - flags); + pgaio_io_set_flag(ioh, ioh_flags); - pgaio_io_set_flag(ioh, ioh_flags); + /* --- + * Even though we're trying to issue IO asynchronously, track the time + * in smgrstartreadv(): + * - if io_method == IOMETHOD_SYNC, we will always perform the IO + * immediately + * - the io method might not support the IO (e.g. worker IO for a temp + * table) + * --- + */ + io_start = pgstat_prepare_io_time(track_io_timing); + smgrstartreadv(ioh, operation->smgr, forknum, + blocknum + nblocks_done, + io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, + io_start, 1, io_buffers_len * BLCKSZ); - /* --- - * Even though we're trying to issue IO asynchronously, track the time - * in smgrstartreadv(): - * - if io_method == IOMETHOD_SYNC, we will always perform the IO - * immediately - * - the io method might not support the IO (e.g. worker IO for a temp - * table) - * --- - */ - io_start = pgstat_prepare_io_time(track_io_timing); - smgrstartreadv(ioh, operation->smgr, forknum, - blocknum + nblocks_done, - io_pages, io_buffers_len); - pgstat_count_io_op_time(io_object, io_context, IOOP_READ, - io_start, 1, io_buffers_len * BLCKSZ); - - if (persistence == RELPERSISTENCE_TEMP) - pgBufferUsage.local_blks_read += io_buffers_len; - else - pgBufferUsage.shared_blks_read += io_buffers_len; + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += io_buffers_len; + else + pgBufferUsage.shared_blks_read += io_buffers_len; - /* - * Track vacuum cost when issuing IO, not after waiting for it. - * Otherwise we could end up issuing a lot of IO in a short timespan, - * despite a low cost limit. - */ - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + /* + * Track vacuum cost when issuing IO, not after waiting for it. Otherwise + * we could end up issuing a lot of IO in a short timespan, despite a low + * cost limit. + */ + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; - *nblocks_progress = io_buffers_len; - did_start_io = true; - } + *nblocks_progress = io_buffers_len; - return did_start_io; + return true; } /* diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index a40adf6b2a8..1358fc7fa64 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -147,6 +147,7 @@ struct ReadBuffersOperation int flags; int16 nblocks; int16 nblocks_done; + bool foreign_io; PgAioWaitRef io_wref; PgAioReturn io_return; }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index db583985813..6c6bdc8ac4f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2341,6 +2341,7 @@ PredicateLockData PredicateLockTargetType PrefetchBufferResult PrepParallelRestorePtrType +PrepareReadBuffer_Status PrepareStmt PreparedStatement PresortedKeyData -- 2.43.0
