On 4 January 2017 at 12:39, Craig Ringer <cr...@2ndquadrant.com> wrote:

> To keep things together, I've followed up on the logical decoding on
> standby thread that now incorporates all these patches.

Attached are the two patches discussed upthread, in their
proposed-for-commit form, as requested by Simon.

These correspond to patches 0001 and 0004 of the logical decoding on
standby series at
https://www.postgresql.org/message-id/CAMsr+YEzC=-+eV09A=ra150fjtkmtqt5q70piqbwytbor3c...@mail.gmail.com
.

This subset is tracked as https://commitfest.postgresql.org/12/883/ .

Once these are committed, I will update the decoding-on-standby series to
drop these prerequisite patches.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 79241c8052fbd1ecd079e98fd8564e4b2fcf797b Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 14 Nov 2016 12:27:17 +0800
Subject: [PATCH 1/2] PostgresNode methods to wait for node catchup

---
 src/test/perl/PostgresNode.pm              | 172 ++++++++++++++++++++++++++++-
 src/test/recovery/t/001_stream_rep.pl      |  12 +-
 src/test/recovery/t/004_timeline_switch.pl |  16 +--
 3 files changed, 175 insertions(+), 25 deletions(-)

diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index c1b16ca..2f009d4 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1121,7 +1121,6 @@ sub psql
 		my $exc_save = $@;
 		if ($exc_save)
 		{
-
 			# IPC::Run::run threw an exception. re-throw unless it's a
 			# timeout, which we'll handle by testing is_expired
 			die $exc_save
@@ -1173,7 +1172,7 @@ sub psql
 		  if $ret == 1;
 		die "connection error: '$$stderr'\nwhile running '@psql_params'"
 		  if $ret == 2;
-		die "error running SQL: '$$stderr'\nwhile running '@psql_params'"
+		die "error running SQL: '$$stderr'\nwhile running '@psql_params' with sql '$sql'"
 		  if $ret == 3;
 		die "psql returns $ret: '$$stderr'\nwhile running '@psql_params'";
 	}
@@ -1325,6 +1324,175 @@ sub run_log
 	TestLib::run_log(@_);
 }
 
+=pod $node->lsn(mode)
+
+Look up xlog positions on the server:
+
+* insert position (master only, error on replica)
+* write position (master only, error on replica)
+* flush position (master only, error on replica)
+* receive position (always undef on master)
+* replay position (always undef on master)
+
+mode must be specified.
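+
+A usage sketch ($node is any PostgresNode; 'insert' works only on a master):
+
+  my $insert_lsn = $node->lsn('insert');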
+
+=cut
+
+sub lsn
+{
+	my ($self, $mode) = @_;
+	my %modes = ('insert' => 'pg_current_xlog_insert_location()',
+				 'flush' => 'pg_current_xlog_flush_location()',
+				 'write' => 'pg_current_xlog_location()',
+				 'receive' => 'pg_last_xlog_receive_location()',
+				 'replay' => 'pg_last_xlog_replay_location()');
+
+	$mode = '<undef>' if !defined($mode);
+	die "unknown mode for 'lsn': '$mode', valid modes are " . join(', ', keys %modes)
+		if !defined($modes{$mode});
+
+	my $result = $self->safe_psql('postgres', "SELECT $modes{$mode}");
+	chomp($result);
+	return $result eq '' ? undef : $result;
+}
+
+=pod $node->wait_for_catchup(standby_name, mode, target_lsn)
+
+Wait for the replication connection whose application_name matches
+standby_name (usually the standby's node name; a PostgresNode instance may
+also be passed) until its position in pg_stat_replication equals or passes
+target_lsn. By default the replay_location is waited for, but 'mode' may be
+specified to wait for any of sent|write|flush|replay.
+
+If there is no active replication connection from this peer, waits until
+poll_query_until times out.
+
+Requires that the 'postgres' db exists and is accessible.
+
+target_lsn may be any LSN, but is typically $master_node->lsn('insert').
+
+This is not a test. It die()s on failure.
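+
+A typical call, as a sketch ($master and $standby are illustrative
+PostgresNode handles):
+
+  $master->wait_for_catchup($standby, 'replay', $master->lsn('insert'));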
+
+=cut
+
+sub wait_for_catchup
+{
+	my ($self, $standby_name, $mode, $target_lsn) = @_;
+	$mode = defined($mode) ? $mode : 'replay';
+	my %valid_modes = ( 'sent' => 1, 'write' => 1, 'flush' => 1, 'replay' => 1 );
+	die "unknown mode $mode for 'wait_for_catchup', valid modes are " . join(', ', keys(%valid_modes)) unless exists($valid_modes{$mode});
+	# Allow passing of a PostgresNode instance as shorthand
+	if ( blessed( $standby_name ) && $standby_name->isa("PostgresNode") )
+	{
+		$standby_name = $standby_name->name;
+	}
+	die 'target_lsn must be specified' unless defined($target_lsn);
+	print "Waiting for replication conn " . $standby_name . "'s " . $mode . "_location to pass " . $target_lsn . " on " . $self->name . "\n";
+	my $query = qq[SELECT '$target_lsn' <= ${mode}_location FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+	$self->poll_query_until('postgres', $query)
+		or die "timed out waiting for catchup";
+	print "done\n";
+}
+
+=pod $node->wait_for_slot_catchup(slot_name, mode, target_lsn)
+
+Wait for the named replication slot to equal or pass the supplied target_lsn.
+The slot's restart_lsn is compared by default; pass 'confirmed_flush' as mode
+to compare the confirmed_flush_lsn instead.
+
+Requires that the 'postgres' db exists and is accessible.
+
+This is not a test. It die()s on failure.
+
+If the slot is not active, will time out after poll_query_until's timeout.
+
+target_lsn may be any LSN, but is typically $master_node->lsn('insert').
+
+Note that for logical slots, restart_lsn is held down by the oldest
+in-progress transaction.
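+
+A sketch of typical use (the slot name 'standby_1' is illustrative):
+
+  $master->wait_for_slot_catchup('standby_1', 'restart', $master->lsn('insert'));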
+
+=cut
+
+sub wait_for_slot_catchup
+{
+	my ($self, $slot_name, $mode, $target_lsn) = @_;
+	$mode = defined($mode) ? $mode : 'restart';
+	die "unknown mode '$mode' for 'wait_for_slot_catchup', valid modes are restart, confirmed_flush"
+		unless ($mode eq 'restart' || $mode eq 'confirmed_flush');
+	die 'target lsn must be specified' unless defined($target_lsn);
+	print "Waiting for replication slot " . $slot_name . "'s " . $mode . "_lsn to pass " . $target_lsn . " on " . $self->name . "\n";
+	my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name';];
+	$self->poll_query_until('postgres', $query)
+		or die "timed out waiting for catchup";
+	print "done\n";
+}
+
+=pod $node->query_hash($dbname, $query, @columns)
+
+Execute $query on $dbname, replacing any appearance of the string __COLUMNS__
+within the query with a comma-separated list of @columns.
+
+If __COLUMNS__ does not appear in the query, its result columns must EXACTLY
+match the order and number (though not necessarily the aliases) of the
+supplied @columns.
+
+The query must return at most one row.
+
+Return a hash-ref representation of the results of the query, with any empty
+or null results as defined keys with an empty-string value. There is no way
+to differentiate between null and empty-string result fields.
+
+If the query returns zero rows, return a hash with all columns empty. There
+is no way to differentiate between zero rows returned and a row with only
+null columns.
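+
+A sketch (the query and column names are illustrative):
+
+  my $row = $node->query_hash('postgres',
+      q[SELECT __COLUMNS__ FROM pg_stat_activity WHERE pid = pg_backend_pid()],
+      qw(pid state));
+  print "backend state is $row->{'state'}\n";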
+
+=cut
+
+sub query_hash
+{
+	my ($self, $dbname, $query, @columns) = @_;
+	die 'calls in array context for multi-row results not supported yet' if (wantarray);
+	# Replace __COLUMNS__ if found
+	substr($query, index($query, '__COLUMNS__'), length('__COLUMNS__')) = join(', ', @columns)
+		if index($query, '__COLUMNS__') >= 0;
+	my $result = $self->safe_psql($dbname, $query);
+	# hash slice, see http://stackoverflow.com/a/16755894/398670 .
+	#
+	# Fills the hash with empty strings produced by x-operator element
+	# duplication if the result is an empty row. The -1 limit to split
+	# keeps trailing empty fields, so null columns at the end of the row
+	# still become empty strings rather than being dropped.
+	my %val;
+	@val{@columns} =
+		$result ne ''
+		? split(qr/\|/, $result, -1)
+		: ('',) x scalar(@columns);
+	return \%val;
+}
+
+=pod $node->slot(slot_name)
+
+Return hash-ref of replication slot data for the named slot, or a hash-ref with
+all values '' if not found. Does not differentiate between null and empty
+string for fields; no field is ever undef.
+
+LSN fields such as restart_lsn are returned verbatim as text; no numeric
+conversion is attempted.
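+
+A sketch:
+
+  my $slot = $node->slot('standby_1');
+  print "restart_lsn is $slot->{'restart_lsn'}\n";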
+
+=cut
+
+sub slot
+{
+	my ($self, $slot_name) = @_;
+	my @columns = ('plugin', 'slot_type', 'datoid', 'database', 'active', 'active_pid', 'xmin', 'catalog_xmin', 'restart_lsn');
+	return $self->query_hash('postgres', "SELECT __COLUMNS__ FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'", @columns);
+}
+
 =pod
 
 =back
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index 981c00b..ba1da8c 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -40,16 +40,8 @@ $node_master->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
 
 # Wait for standbys to catch up
-my $applname_1 = $node_standby_1->name;
-my $applname_2 = $node_standby_2->name;
-my $caughtup_query =
-"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_1';";
-$node_master->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 1 to catch up";
-$caughtup_query =
-"SELECT pg_last_xlog_replay_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$applname_2';";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby 2 to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
 
 my $result =
   $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
diff --git a/src/test/recovery/t/004_timeline_switch.pl b/src/test/recovery/t/004_timeline_switch.pl
index 5f3b2fe..7c6587a 100644
--- a/src/test/recovery/t/004_timeline_switch.pl
+++ b/src/test/recovery/t/004_timeline_switch.pl
@@ -32,14 +32,9 @@ $node_standby_2->start;
 # Create some content on master
 $node_master->safe_psql('postgres',
 	"CREATE TABLE tab_int AS SELECT generate_series(1,1000) AS a");
-my $until_lsn =
-  $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
 
 # Wait until standby has replayed enough data on standby 1
-my $caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_1->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('write'));
 
 # Stop and remove master, and promote standby 1, switching it to a new timeline
 $node_master->teardown_node;
@@ -50,7 +45,7 @@ rmtree($node_standby_2->data_dir . '/recovery.conf');
 my $connstr_1 = $node_standby_1->connstr;
 $node_standby_2->append_conf(
 	'recovery.conf', qq(
-primary_conninfo='$connstr_1'
+primary_conninfo='$connstr_1 application_name=@{[$node_standby_2->name]}'
 standby_mode=on
 recovery_target_timeline='latest'
 ));
@@ -60,12 +55,7 @@ $node_standby_2->restart;
 # to ensure that the timeline switch has been done.
 $node_standby_1->safe_psql('postgres',
 	"INSERT INTO tab_int VALUES (generate_series(1001,2000))");
-$until_lsn = $node_standby_1->safe_psql('postgres',
-	"SELECT pg_current_xlog_location();");
-$caughtup_query =
-  "SELECT '$until_lsn'::pg_lsn <= pg_last_xlog_replay_location()";
-$node_standby_2->poll_query_until('postgres', $caughtup_query)
-  or die "Timed out while waiting for standby to catch up";
+$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('write'));
 
 my $result =
   $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
-- 
2.5.5

From 859591607f203bb986c8eaa6acf4b3841324f5a6 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Wed, 9 Nov 2016 13:44:04 +0800
Subject: [PATCH 2/2] Expand streaming replication tests to cover hot standby
 feedback and physical replication slots

---
 src/test/recovery/t/001_stream_rep.pl | 105 +++++++++++++++++++++++++++++++++-
 1 file changed, 104 insertions(+), 1 deletion(-)

diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ba1da8c..eef512d 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 4;
+use Test::More tests => 22;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -58,3 +58,106 @@ is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 1');
 is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
 	3, 'read-only queries on standby 2');
+
+diag "switching to physical replication slot";
+# Switch to using a physical replication slot. We can do this without a new
+# backup since physical slots can go backwards if needed. Do so on both
+# standbys. Since we're going to be testing things that affect the slot state,
+# also set a short wal_receiver_status_interval so slot state updates arrive
+# promptly.
+my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');
+$node_master->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_master->restart;
+is($node_master->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_1');]), 0, 'physical slot created on master');
+$node_standby_1->append_conf('recovery.conf', "primary_slot_name = $slotname_1\n");
+$node_standby_1->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_1->append_conf('postgresql.conf', "max_replication_slots = 4\n");
+$node_standby_1->restart;
+is($node_standby_1->psql('postgres', qq[SELECT pg_create_physical_replication_slot('$slotname_2');]), 0, 'physical slot created on intermediate replica');
+$node_standby_2->append_conf('recovery.conf', "primary_slot_name = $slotname_2\n");
+$node_standby_2->append_conf('postgresql.conf', "wal_receiver_status_interval = 1\n");
+$node_standby_2->restart;
+
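+# Return a slot's (xmin, catalog_xmin) from pg_replication_slots on the
+# given node, as empty strings if null.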
+sub get_slot_xmins
+{
+	my ($node, $slotname) = @_;
+	my $slotinfo = $node->slot($slotname);
+	return ($slotinfo->{'xmin'}, $slotinfo->{'catalog_xmin'});
+}
+
+# There's no hot standby feedback and there are no logical slots on either peer
+# so xmin and catalog_xmin should be null on both slots.
+my ($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin null with no hs_feedback');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with no hs_feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin null with no hs_feedback');
+
+# Replication still works?
+$node_master->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
+
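+# Insert a new row on the master, wait for it to replicate through the
+# cascade, and check that both standbys have replayed it.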
+sub replay_check
+{
+	my $newval = $node_master->safe_psql('postgres', 'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val');
+	$node_master->wait_for_catchup($node_standby_1, 'replay', $node_master->lsn('insert'));
+	$node_standby_1->wait_for_catchup($node_standby_2, 'replay', $node_standby_1->lsn('replay'));
+	$node_standby_1->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_1 didn't replay master value $newval";
+	$node_standby_2->safe_psql('postgres', qq[SELECT 1 FROM replayed WHERE val = $newval])
+		or die "standby_2 didn't replay standby_1 value $newval";
+}
+
+replay_check();
+
+diag "enabling hot_standby_feedback";
+# Enable hs_feedback. The slot should gain an xmin. We set the status interval
+# so we'll see the results promptly.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$node_standby_2->reload;
+replay_check();
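+# Give the feedback a moment to reach the upstream slots; both standbys
+# report status every 1s (wal_receiver_status_interval), so 2s is ample.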
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+isnt($xmin, '', 'non-cascaded slot xmin non-null with hs feedback');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback');
+my $master_xmin_pre = $xmin;
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+isnt($xmin, '', 'cascaded slot xmin non-null with hs feedback');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback');
+my $cascaded_xmin_pre = $xmin;
+
+diag "doing some work to advance xmin";
+for my $i (10000..11000) {
+	$node_master->safe_psql('postgres', qq[INSERT INTO tab_int VALUES ($i);]);
+}
+$node_master->safe_psql('postgres', 'VACUUM;');
+$node_master->safe_psql('postgres', 'CHECKPOINT;');
+
+my ($xmin2, $catalog_xmin2) = get_slot_xmins($node_master, $slotname_1);
+diag "new master xmin $xmin2, old xmin $master_xmin_pre";
+isnt($xmin2, $master_xmin_pre, 'non-cascaded slot xmin with hs feedback has changed');
+is($catalog_xmin2, '', 'non-cascaded slot catalog_xmin still null with hs_feedback unchanged');
+
+($xmin2, $catalog_xmin2) = get_slot_xmins($node_standby_1, $slotname_2);
+diag "new cascaded xmin $xmin2, old xmin $cascaded_xmin_pre";
+isnt($xmin2, $cascaded_xmin_pre, 'cascaded slot xmin with hs feedback has changed');
+is($catalog_xmin2, '', 'cascaded slot catalog_xmin still null with hs_feedback unchanged');
+
+diag "disabling hot_standby_feedback";
+# Disable hs_feedback. Xmin should be cleared.
+$node_standby_1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_1->reload;
+$node_standby_2->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$node_standby_2->reload;
+replay_check();
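+# Again allow time for the cleared feedback to reach the upstream slots.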
+sleep(2);
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_master, $slotname_1);
+is($xmin, '', 'non-cascaded slot xmin null with hs feedback reset');
+is($catalog_xmin, '', 'non-cascaded slot catalog_xmin still null with hs_feedback reset');
+
+($xmin, $catalog_xmin) = get_slot_xmins($node_standby_1, $slotname_2);
+is($xmin, '', 'cascaded slot xmin null with hs feedback reset');
+is($catalog_xmin, '', 'cascaded slot catalog_xmin still null with hs_feedback reset');
-- 
2.5.5
