On 4 January 2017 at 12:39, Craig Ringer <cr...@2ndquadrant.com> wrote:
> To keep things together, I've followed up on the logical decoding on
> standby thread that now incorporates all these patches.

Attached are the two patches discussed upthread, in their
proposed-for-commit form, as requested by Simon.

These correspond to patches 0001 and 0004 of the logical decoding on
standby series at
https://www.postgresql.org/message-id/CAMsr+YEzC=-+eV09A=ra150fjtkmtqt5q70piqbwytbor3c...@mail.gmail.com

This subset is tracked as https://commitfest.postgresql.org/12/883/

Once these are committed, I will update the decoding-on-standby series
to omit these prerequisite patches.

-- 
Craig Ringer                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 79241c8052fbd1ecd079e98fd8564e4b2fcf797b Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 14 Nov 2016 12:27:17 +0800
Subject: [PATCH 1/2] PostgresNode methods to wait for node catchup

---
 src/test/perl/PostgresNode.pm              | 172 ++++++++++++++++++++++++++++-
 src/test/recovery/t/001_stream_rep.pl      |  12 +-
 src/test/recovery/t/004_timeline_switch.pl |  16 +--
 3 files changed, 175 insertions(+), 25 deletions(-)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..2f009d4 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1121,7 +1121,6 @@ sub psql
     my $exc_save = $@;
     if ($exc_save)
     {
-
         # IPC::Run::run threw an exception. re-throw unless it's a
         # timeout, which we'll handle by testing is_expired
         die $exc_save
@@ -1173,7 +1172,7 @@ sub psql
       if $ret == 1;
     die "connection error: '$$stderr'\nwhile running '@psql_params'"
       if $ret == 2;
-    die "error running SQL: '$$stderr'\nwhile running '@psql_params'"
+    die "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
      if $ret == 3;
     die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
 }
@@ -1325,6 +1324,175 @@ sub run_log
     TestLib::run_log(@_);
 }
 
+=pod $node->lsn(mode)
+
+Look up xlog positions on the server:
+
+* insert position (master only, error on replica)
+* write position (master only, error on replica)
+* flush position
+* receive position (always undef on master)
+* replay position
+
+mode must be specified.
+
+=cut
+
+sub lsn
+{
+    my ($self, $mode) = @_;
+    my %modes = ('insert'  => 'pg_current_xlog_insert_location()',
+                 'flush'   => 'pg_current_xlog_flush_location()',
+                 'write'   => 'pg_current_xlog_location()',
+                 'receive' => 'pg_last_xlog_receive_location()',
+                 'replay'  => 'pg_last_xlog_replay_location()');
+
+    $mode = '<undef>' if !defined($mode);
+    die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', keys %modes)
+      if !defined($modes{$mode});
+
+    my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
+    chomp($result);
+    if ($result eq '')
+    {
+        return undef;
+    }
+    else
+    {
+        return $result;
+    }
+}
+
+=pod $node->wait_for_catchup(standby_name, mode, target_lsn)
+
+Wait until the replication connection with application_name standby_name
+(usually from node->name; a PostgresNode instance is also accepted) reports a
+position in pg_stat_replication that equals or passes the supplied target_lsn.
+By default the replay_location is waited for, but 'mode' may be specified to
+wait for any of sent|write|flush|replay.
+
+If there is no active replication connection from this peer, waits until
+poll_query_until's timeout.
+
+Requires that the 'postgres' db exists and is accessible.
+
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+
+This is not a test. It die()s on failure.
+
+=cut
+
+sub wait_for_catchup
+{
+    my ($self, $standby_name, $mode, $target_lsn) = @_;
+    $mode = defined($mode) ? $mode : 'replay';
+    my %valid_modes = ('sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1);
+    die "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode});
+    # Allow passing of a PostgresNode instance as shorthand
+    if (blessed($standby_name) && $standby_name->isa("PostgresNode"))
+    {
+        $standby_name = $standby_name->name;
+    }
+    die 'target_lsn must be specified' unless defined($target_lsn);
+    print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . "\n";
+    my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+    $self->poll_query_until('postgres', $query)
+      or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
+    print "done\n";
+}
+
+=pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
+
+Wait for the named replication slot to equal or pass the supplied target_lsn.
+The position used is the restart_lsn unless mode is given, in which case it may
+be 'restart' or 'confirmed_flush'.
+
+Requires that the 'postgres' db exists and is accessible.
+
+This is not a test. It die()s on failure.
+
+If the slot is not active, will time out after poll_query_until's timeout.
+
+target_lsn may be any arbitrary lsn, but is typically $master_node->lsn('insert').
+
+Note that for logical slots, restart_lsn is held down by the oldest in-progress tx.
+
+=cut
+
+sub wait_for_slot_catchup
+{
+    my ($self, $slot_name, $mode, $target_lsn) = @_;
+    $mode = defined($mode) ? $mode : 'restart';
+    if (!($mode eq 'restart' || $mode eq 'confirmed_flush'))
+    {
+        die "valid modes are restart, confirmed_flush";
+    }
+    die 'target_lsn must be specified' unless defined($target_lsn);
+    print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n";
+    my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+    $self->poll_query_until('postgres', $query)
+      or die "timed out waiting for catchup, current position is " . ($self->safe_psql('postgres', $query) || '(unknown)');
+    print "done\n";
+}
+
+=pod $node->query_hash($dbname, $query, @columns)
+
+Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
+within the query with a comma-separated list of @columns.
+
+If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
+match the order and number (but not necessarily the aliases) of the supplied
+@columns.
+
+The query must return zero or one rows.
+
+Return a hash-ref representation of the results of the query, with any empty
+or null results as defined keys with an empty-string value. There is no way
+to differentiate between null and empty-string result fields.
+
+If the query returns zero rows, return a hash with all columns empty. There
+is no way to differentiate between zero rows returned and a row with only
+null columns.
+
+=cut
+
+sub query_hash
+{
+    my ($self, $dbname, $query, @columns) = @_;
+    die 'calls in array context for multi-row results not supported yet' if (wantarray);
+    # Replace __COLUMNS__ if found
+    substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) = join(', ', @columns)
+      if index($query, '__COLUMNS__') >= 0;
+    my $result = $self->safe_psql($dbname, $query);
+    # hash slice, see http://stackoverflow.com/a/16755894/398670 .
+    #
+    # Fills the hash with empty strings produced by x-operator element
+    # duplication if the result is an empty row.
+    #
+    my %val;
+    @val{@columns} = $result ne '' ? split(qr/\|/, $result) : ('',) x scalar(@columns);
+    return \%val;
+}
+
+=pod $node->slot(slot_name)
+
+Return a hash-ref of replication slot data for the named slot, or a hash-ref
+with all values '' if the slot is not found. Does not differentiate between
+null and empty string for fields; no field is ever undef.
+
+The lsn fields (e.g. restart_lsn) are returned verbatim as text rather than
+parsed into numbers, since we target Perl 5.8.8 and cannot rely on "use
+bigint" or Math::BigInt being available.
+
+=cut
+
+sub slot
+{
+    my ($self, $slot_name) = @_;
+    my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
+    return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
+}
+
 =pod
 
 =back
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 981c00b..ba1da8c 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -40,16 +40,8 @@ $node_master->safe_psql('postgres',
     "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl
index 5f3b2fe..7c6587a 100644
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ b/src/test/recovery/t/004_timeline_switch.pl
@@ -32,14 +32,9 @@ $node_standby_2->start;
 # Create some content on master
 $node_master->safe_psql('postgres',
     "CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
 
 # Wait until standby has replayed enough data on standby 1
-my $caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write'));
 
 # Stop and remove master, and promote standby 1, switching it to a new timeline
 $node_master->teardown_node;
@@ -50,7 +45,7 @@ rmtree($node_standby_2->data_dir . '/recovery.conf');
 my $connstr_1 = $node_standby_1->connstr;
 $node_standby_2->append_conf(
     'recovery.conf', qq(
-primary_conninfo='$connstr_1'
+primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
 standby_mode=on
 recovery_target_timeline='latest'
 ));
@@ -60,12 +55,7 @@ $node_standby_2->restart;
 # to ensure that the timeline switch has been done.
 $node_standby_1->safe_psql('postgres',
     "INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$until_lsn = $node_standby_1->safe_psql('postgres',
-    "SELECT pg_current_xlog_location();");
-$caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_2->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write'));
 
 my $result =
   $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-- 
2.5.5
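For reviewers who want to try the new API outside these tests, here is a
minimal usage sketch (not part of the patch; the node and backup names are
invented for illustration, and it assumes the standard src/test/recovery TAP
scaffolding with patch 1 applied):

    use strict;
    use warnings;
    use PostgresNode;
    use TestLib;
    use Test::More tests => 1;

    # Hypothetical two-node setup; 'my_backup' is an arbitrary backup name.
    my $master = get_new_node('master');
    $master->init(allows_streaming => 1);
    $master->start;
    $master->backup('my_backup');

    my $standby = get_new_node('standby');
    $standby->init_from_backup($master, 'my_backup', has_streaming => 1);
    $standby->start;

    $master->safe_psql('postgres', 'CREATE TABLE t AS SELECT 1 AS x');

    # Wait until the standby's replay_location passes the master's current
    # insert position. A PostgresNode instance is accepted as shorthand
    # for its application_name.
    $master->wait_for_catchup($standby, 'replay', $master->lsn('insert'));

    is($standby->safe_psql('postgres', 'SELECT count(*) FROM t'),
       '1', 'standby caught up and replayed the table');

The point, as the 001 and 004 conversions above show, is to replace the
poll_query_until() boilerplate every recovery test had been hand-rolling.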
From 859591607f203bb986c8eaa6acf4b3841324f5a6 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 9 Nov 2016 13:44:04 +0800
Subject: [PATCH 2/2] Expand streaming replication tests to cover hot standby
 feedback and physical replication slots

---
 src/test/recovery/t/001_stream_rep.pl | 105 +++++++++++++++++++++++++++++++++-
 1 file changed, 104 insertions(+), 1 deletion(-)

diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ba1da8c..eef512d 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 22;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -58,3 +58,106 @@ is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
     3, 'read-only queries on standby 1');
 is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
     3, 'read-only queries on standby 2');
+
+diag "switching to physical replication slot";
+# Switch to using a physical replication slot. We can do this without a new
+# backup since physical slots can go backwards if needed. Do so on both
+# standbys. Since we're going to be testing things that affect the slot state,
+# also increase the standby feedback interval to ensure timely updates.
+my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->restart;
+is($node_master->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_1');]), 0, 'physical slot created on master');
+$node_standby_1->append_conf('recovery.conf', "primary_slot_name = $slotname_1\n");
+$node_standby_1->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_1->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_standby_1->restart;
+is($node_standby_1->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_2');]), 0, 'physical slot created on intermediate replica');
+$node_standby_2->append_conf('recovery.conf', "primary_slot_name = $slotname_2\n");
+$node_standby_2->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_2->restart;
+
+sub get_slot_xmins
+{
+    my ($node, $slotname) = @_;
+    my $slotinfo = $node->slot($slotname);
+    return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+# There's no hot standby feedback and there are no logical slots on either peer
+# so xmin and catalog_xmin should be null on both slots.
+my ($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin null with no hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin null with no hs_feedback');
+
+# Replication still works?
+$node_master->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
+
+sub replay_check
+{
+    my $newval = $node_master->safe_psql('postgres', 'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val');
+    $node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
+    $node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
+    $node_standby_1->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+      or die "standby_1 didn't replay master value $newval";
+    $node_standby_2->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+      or die "standby_2 didn't replay standby_1 value $newval";
+}
+
+replay_check();
+
+diag "enabling hot_standby_feedback";
+# Enable hs_feedback. The slot should gain an xmin. We've already set a short
+# wal_receiver_status_interval so we'll see the results promptly.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+isnt($xmin, '', 'non-cascaded slot xmin non-null with hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+isnt($xmin, '', 'cascaded slot xmin non-null with hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback');
+
+diag "doing some work to advance xmin";
+for my $i (10000..11000) {
+    $node_master->safe_psql('postgres', qq[INSERT INTO tab_int VALUES ($i);]);
+}
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my ($xmin2, $catalog_xmin2) = get_slot_xmins($node_master, $slotname_1);
+diag "new xmin $xmin2, old xmin $xmin";
+isnt($xmin2, $xmin, 'non-cascaded slot xmin with hs_feedback has changed');
+is($catalog_xmin2, '', 'non-cascaded slot catalog_xmin still null after work with hs_feedback');
+
+($xmin2, $catalog_xmin2) = get_slot_xmins($node_standby_1, $slotname_2);
+diag "new xmin $xmin2, old xmin $xmin";
+isnt($xmin2, $xmin, 'cascaded slot xmin with hs_feedback has changed');
+is($catalog_xmin2, '', 'cascaded slot catalog_xmin still null after work with hs_feedback');
+
+diag "disabling hot_standby_feedback";
+# Disable hs_feedback. Xmin should be cleared.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_2->reload;
+replay_check();
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with hs_feedback reset');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback reset');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with hs_feedback reset');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback reset');
-- 
2.5.5
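The new tests lean on wait_for_catchup() but don't exercise
wait_for_slot_catchup() from patch 1, so for completeness here's a sketch of
how that helper would be used, reusing $node_master and the $slotname_1 slot
from the test above (illustrative only, not part of the patch):

    # Wait for the physical slot's restart_lsn to reach the master's current
    # insert position. 'restart' is the default mode; 'confirmed_flush' is
    # the mode that makes sense for logical slots.
    $node_master->wait_for_slot_catchup($slotname_1, 'restart',
        $node_master->lsn('insert'));

    # slot() returns a hash-ref of pg_replication_slots fields, with empty
    # strings standing in for NULLs.
    my $slotinfo = $node_master->slot($slotname_1);
    diag "slot restart_lsn is now $slotinfo->{restart_lsn}";

Since a physical slot's restart_lsn only advances when the standby sends a
status update, the wal_receiver_status_interval = 1 set above keeps the wait
short.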