From 8d55de7130d8b0b9587d1b1dc4adc4d1d6010c8c Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Mon, 15 Feb 2021 16:50:47 +0530
Subject: [PATCH] Remove the unnecessary PrepareWrite in pgoutput.

This issue exists from the inception of this code (PG-10) but got exposed
by the recent commit ce0fdbfe97 where we are using origins in tablesync
workers. The problem was that we were sometimes sending the prepare_write
('w') message but then the actual message was not being sent and on the
subscriber side, we always expect a message after prepare_write message
which led to this bug.

I refrained from backpatching this because there is no way in the core
code to hit this prior to commit ce0fdbfe97 and we haven't received any
complaints so far.

Reported-by: Erik Rijkers
Author: Amit Kapila and Vignesh C
Tested-by: Erik Rijkers
Discussion: https://postgr.es/m/1295168140.139428.1613133237154@webmailclassic.xs4all.nl
---
 src/backend/replication/pgoutput/pgoutput.c | 19 ++++----
 src/test/subscription/t/100_bugs.pl         | 69 +++++++++++++++++++++++++++++
 2 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 79765f9..1b993fb 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -342,10 +342,6 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	{
 		char	   *origin;
 
-		/* Message boundary */
-		OutputPluginWrite(ctx, false);
-		OutputPluginPrepareWrite(ctx, true);
-
 		/*----------
 		 * XXX: which behaviour do we want here?
 		 *
@@ -357,7 +353,13 @@ pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 		 *----------
 		 */
 		if (replorigin_by_oid(txn->origin_id, true, &origin))
+		{
+			/* Message boundary */
+			OutputPluginWrite(ctx, false);
+			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
+		}
+
 	}
 
 	OutputPluginWrite(ctx, true);
@@ -780,12 +782,13 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx,
 	{
 		char	   *origin;
 
-		/* Message boundary */
-		OutputPluginWrite(ctx, false);
-		OutputPluginPrepareWrite(ctx, true);
-
 		if (replorigin_by_oid(txn->origin_id, true, &origin))
+		{
+			/* Message boundary */
+			OutputPluginWrite(ctx, false);
+			OutputPluginPrepareWrite(ctx, true);
 			logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+		}
 	}
 
 	OutputPluginWrite(ctx, true);
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index d1e407a..b8f46f0 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -153,3 +153,72 @@ is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t"),
 	$rows * 2, "2x$rows rows in t");
 is($node_twoways->safe_psql('d2', "SELECT count(f) FROM t2"),
 	$rows * 2, "2x$rows rows in t2");
+
+# Verify table data is synced with cascaded replication setup. This is mainly
+# to test whether the data written by tablesync worker gets replicated.
+my $node_pub = get_new_node('testpublisher1');
+$node_pub->init(allows_streaming => 'logical');
+$node_pub->start;
+
+my $node_pub_sub = get_new_node('testpublisher_subscriber');
+$node_pub_sub->init(allows_streaming => 'logical');
+$node_pub_sub->start;
+
+my $node_sub = get_new_node('testsubscriber1');
+$node_sub->init(allows_streaming => 'logical');
+$node_sub->start;
+
+# Create the tables in all nodes.
+$node_pub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+$node_pub_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+$node_sub->safe_psql('postgres', "CREATE TABLE tab1 (a int)");
+
+# Create a cascaded replication setup like:
+# N1 - Create publication testpub1.
+# N2 - Create publication testpub2 and also include subscriber which subscribes
+#      to testpub1.
+# N3 - Create subscription testsub2 subscribes to testpub2.
+#
+# Note that subscription on N3 needs to be created before subscription on N2 to
+# test whether the data written by tablesync worker of N2 gets replicated.
+$node_pub->safe_psql('postgres',
+	"CREATE PUBLICATION testpub1 FOR TABLE tab1");
+
+$node_pub_sub->safe_psql('postgres',
+	"CREATE PUBLICATION testpub2 FOR TABLE tab1");
+
+my $publisher1_connstr = $node_pub->connstr . ' dbname=postgres';
+my $publisher2_connstr = $node_pub_sub->connstr . ' dbname=postgres';
+
+$node_sub->safe_psql('postgres',
+	"CREATE SUBSCRIPTION testsub2 CONNECTION '$publisher2_connstr' PUBLICATION testpub2"
+);
+
+$node_pub_sub->safe_psql('postgres',
+	"CREATE SUBSCRIPTION testsub1 CONNECTION '$publisher1_connstr' PUBLICATION testpub1"
+);
+
+$node_pub->safe_psql('postgres',
+	"INSERT INTO tab1 values(generate_series(1,10))");
+
+# Verify that the data is cascaded from testpub1 to testsub1 and further from
+# testpub2 (which had testsub1) to testsub2.
+$node_pub->wait_for_catchup('testsub1');
+$node_pub_sub->wait_for_catchup('testsub2');
+
+# Drop subscriptions as we don't need them anymore
+$node_pub_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub1");
+$node_sub->safe_psql('postgres', "DROP SUBSCRIPTION testsub2");
+
+# Drop publications as we don't need them anymore
+$node_pub->safe_psql('postgres', "DROP PUBLICATION testpub1");
+$node_pub_sub->safe_psql('postgres', "DROP PUBLICATION testpub2");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_pub->safe_psql('postgres', "DROP TABLE tab1");
+$node_pub_sub->safe_psql('postgres', "DROP TABLE tab1");
+$node_sub->safe_psql('postgres', "DROP TABLE tab1");
+
+$node_pub->stop('fast');
+$node_pub_sub->stop('fast');
+$node_sub->stop('fast');
-- 
1.8.3.1

