On Thu, Jul 05, 2018 at 05:13:27PM +0900, Michael Paquier wrote:
> This concerns as well v10, so that's not actually an open item...
> Well, it was an open item last year.  The last set of patches is from
> Simon here:
> https://www.postgresql.org/message-id/CANP8%2BjLwgsexwdPkBtkN5kdHN5TwV-d%3Di311Tq_FdOmzJ8QyRQ%40mail.gmail.com

After testing the thing, logical_repl_caught_up_v4alt2.patch is actually
incorrect, while logical_repl_caught_up_v4.patch makes the correct
call.

> Simon, do you feel confident with your patch?  If yes, could you finish
> wrapping it?  I am getting myself familiar with the problem as this has
> been around for some time now so I am reviewing the thing as well and
> then I can board the ship..

Okay, I have spent some time today looking at this patch, and the error
is very easy to reproduce with two changes in the TAP tests:
1) Stop and restart the publisher once in one of the tests of
src/test/subscription.
2) Enforce wait_for_catchup() to check for state = 'streaming'.
With both in place, the tests wait until the timeout is reached and then
die (a sketch of the sequence is below).
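
For reference, a minimal sketch of 1) with the standard PostgresNode
helpers; the exact placement in the test script is only illustrative:

# Restart the publisher once, then wait for the subscriber to catch up
# again.  With 2) in place, wait_for_catchup() only succeeds once the
# walsender has switched back to the 'streaming' state.
$node_publisher->stop('fast');
$node_publisher->start;
$node_publisher->wait_for_catchup($appname);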

I would be inclined to add those tests to the final patch.  The
disadvantage is that 1) makes one of the test scripts a bit longer, but
it reproduces the failure most of the time.  Having 2) is actually nice
for physical replication as well, since the tests in src/test/recovery/
use wait_for_catchup() in various ways.
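
Independently of the TAP changes, the symptom behind 2) can also be
observed by hand on the publisher through pg_stat_replication; a rough
illustration using the test's safe_psql helper (a plain psql session
shows the same thing):

my $states = $node_publisher->safe_psql('postgres',
	"SELECT application_name, state FROM pg_stat_replication;");
# Without the fix, a logical walsender that has caught up again after a
# publisher restart keeps reporting state = 'catchup' here instead of
# 'streaming'.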

Some other notes about the patch:
- I reworded the log message in WalSndLoop for nodes catching up, as
mentioned upthread, so that it refers to the "upstream server" instead
of the "primary".
- Added a note about using only GetFlushRecPtr in XLogSendLogical, as
logical decoding cannot be used on standby nodes.  If logical decoding
one day gains support on standbys, this would need an update as well.

Does this look fine to all the folks involved in this thread?  It is
Friday afternoon here so my brain is getting fried, but I can finish
wrapping up this patch at the beginning of next week if there are no
objections.  At a quick glance this would indeed need a back-patch down
to 9.4, but I have not spent time testing those configurations yet.
--
Michael
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e47ddca6bc..3a0106bc93 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2169,7 +2169,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
 			if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
 			{
 				ereport(DEBUG1,
-						(errmsg("standby \"%s\" has now caught up with primary",
+						(errmsg("\"%s\" has now caught up with upstream server",
 								application_name)));
 				WalSndSetState(WALSNDSTATE_STREAMING);
 			}
@@ -2758,10 +2758,10 @@ XLogSendLogical(void)
 	char	   *errm;
 
 	/*
-	 * Don't know whether we've caught up yet. We'll set it to true in
-	 * WalSndWaitForWal, if we're actually waiting. We also set to true if
-	 * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait -
-	 * i.e. when we're shutting down.
+	 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
+	 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
+	 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
+	 * didn't wait - i.e. when we're shutting down.
 	 */
 	WalSndCaughtUp = false;
 
@@ -2774,6 +2774,9 @@ XLogSendLogical(void)
 
 	if (record != NULL)
 	{
+		/* XXX: Note that logical decoding cannot be used while in recovery */
+		XLogRecPtr	flushPtr = GetFlushRecPtr();
+
 		/*
 		 * Note the lack of any call to LagTrackerWrite() which is handled by
 		 * WalSndUpdateProgress which is called by output plugin through
@@ -2782,6 +2785,13 @@ XLogSendLogical(void)
 		LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
 
 		sentPtr = logical_decoding_ctx->reader->EndRecPtr;
+
+		/*
+		 * If we have sent a record that is at or beyond the flushed point, we
+		 * have caught up.
+		 */
+		if (sentPtr >= flushPtr)
+			WalSndCaughtUp = true;
 	}
 	else
 	{
diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm
index a08af65695..79fb457075 100644
--- a/src/test/perl/PostgresNode.pm
+++ b/src/test/perl/PostgresNode.pm
@@ -1535,7 +1535,8 @@ also works for logical subscriptions)
 until its replication location in pg_stat_replication equals or passes the
 upstream's WAL insert point at the time this function is called. By default
 the replay_lsn is waited for, but 'mode' may be specified to wait for any of
-sent|write|flush|replay.
+sent|write|flush|replay. The replication connection must also have reached
+the 'streaming' state.
 
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
@@ -1580,7 +1581,7 @@ sub wait_for_catchup
 	  . $lsn_expr . " on "
 	  . $self->name . "\n";
 	my $query =
-	  qq[SELECT $lsn_expr <= ${mode}_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
+	  qq[SELECT $lsn_expr <= ${mode}_lsn AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = '$standby_name';];
 	$self->poll_query_until('postgres', $query)
 	  or croak "timed out waiting for catchup";
 	print "done\n";
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 503556fd6c..d94458e00e 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -188,6 +188,11 @@ $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+# Restart the publisher, then check that the subscriber's connection
+# goes back to the streaming state once it has caught up.
+$node_publisher->stop('fast');
+$node_publisher->start;
+
 $node_publisher->wait_for_catchup($appname);
 
 $result = $node_subscriber->safe_psql('postgres',
