From e63c14572a34efa4639e038a3c50e930f710d06c Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Apr 2024 06:18:23 +0000
Subject: [PATCH v18 2/3] Alter slot option two_phase only when altering "true"
 to "false"

Since the two_phase option is controlled by both the publisher (as a slot option)
and the subscriber (as a subscription option), the slot option must also be
modified.

Regarding the false->true case, the backend process alters the subtwophase to
LOGICALREP_TWOPHASE_STATE_PENDING once. After the subscription is enabled, a new
logical replication worker requests to change the two_phase option of its slot
from pending to true after the initial data synchronization is done. The code
path is the same as the case in which two_phase is initially set to true, so
there is no need to do something remarkable. However, for the true->false case,
the backend must connect to the publisher and expressly change the parameter
because the apply worker does not alter the option to false. Because this
operation cannot be rolled back, altering the two_phase parameter from "true"
to "false" within a transaction is prohibited.
---
 doc/src/sgml/ref/alter_subscription.sgml      |  2 +-
 src/backend/commands/subscriptioncmds.c       | 80 ++++++++++++++-----
 .../libpqwalreceiver/libpqwalreceiver.c       | 23 ++++--
 src/include/replication/walreceiver.h         |  5 +-
 src/test/subscription/t/021_twophase.pl       | 41 ++++++----
 5 files changed, 105 insertions(+), 46 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0b23df1b77..df44415661 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -70,7 +70,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command>
    with <literal>refresh</literal> option as <literal>true</literal>,
    <command>ALTER SUBSCRIPTION ... SET (failover = true|false)</command> and
-   <command>ALTER SUBSCRIPTION ... SET (two_phase = true|false)</command>
+   <command>ALTER SUBSCRIPTION ... SET (two_phase = false)</command>
    cannot be executed inside a transaction block.
 
    These commands also cannot be executed when the subscription has
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index d7e2b141b3..f09a6bb290 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -111,7 +111,7 @@ static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
-								bool isTopLevel);
+								bool needs_update, bool isTopLevel);
 
 
 /*
@@ -1074,11 +1074,9 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
  * Common checks for altering failover and two_phase option
  */
 static void
-CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
+CheckAlterSubOption(Subscription *sub, const char *option, bool needs_update, bool isTopLevel)
 {
-	StringInfoData cmd;
-
-	if (!sub->slotname)
+	if (needs_update && !sub->slotname)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot set %s for a subscription that does not have a slot name",
@@ -1096,16 +1094,20 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
 				 errmsg("cannot set %s for enabled subscription",
 						option)));
 
-	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
-
 	/*
 	 * The changed option of the slot can't be rolled back: prevent we are in
 	 * the transaction state.
 	 */
-	PreventInTransactionBlock(isTopLevel, cmd.data);
+	if (needs_update)
+	{
+		StringInfoData cmd;
 
-	pfree(cmd.data);
+		initStringInfo(&cmd);
+		appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+		PreventInTransactionBlock(isTopLevel, cmd.data);
+		pfree(cmd.data);
+	}
 }
 
 /*
@@ -1127,6 +1129,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	Form_pg_subscription form;
 	bits32		supported_opts;
 	SubOpts		opts = {0};
+	bool		update_failover = false;
+	bool		update_two_phase = false;
 
 	rel = table_open(SubscriptionRelationId, RowExclusiveLock);
 
@@ -1259,7 +1263,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
-					CheckAlterSubOption(sub, "failover", isTopLevel);
+					/*
+					 * First mark the needs to alter the replication slot.
+					 * Failover option is controlled by both the publisher (as
+					 * a slot option) and the subscriber (as a subscription
+					 * option).
+					 */
+					update_failover = true;
+
+					CheckAlterSubOption(sub, "failover", update_failover,
+										isTopLevel);
 
 					values[Anum_pg_subscription_subfailover - 1] =
 						BoolGetDatum(opts.failover);
@@ -1268,15 +1281,37 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
-					CheckAlterSubOption(sub, "two_phase", isTopLevel);
+					/*
+					 * First check the need to alter the replication slot.
+					 * Two_phase option is controlled by both the publisher
+					 * (as a slot option) and the subscriber (as a
+					 * subscription option). The slot option must be altered
+					 * only when changing "true" to "false".
+					 *
+					 * There is no need to do something remarkable regarding
+					 * the "false" to "true" case; the backend process alters
+					 * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once.
+					 * After the subscription is enabled, a new logical
+					 * replication worker requests to change the two_phase
+					 * option of its slot from pending to true when the
+					 * initial data synchronization is done. The code path is
+					 * the same as the case in which two_phase is initially
+					 * set to true.
+					 */
+					update_two_phase = !opts.twophase;
+
+					CheckAlterSubOption(sub, "two_phase", update_two_phase,
+										isTopLevel);
 
 					/*
-					 * slot_name and two_phase cannot be altered
-					 * simultaneously. The latter part refers to the pre-set
-					 * slot name and tries to modify the slot option, so
-					 * changing both does not make sense.
+					 * If the wo_phase slot option must be altered, this
+					 * cannot be altered with slot_name simultaneously. The
+					 * latter part refers to the pre-set slot name and tries
+					 * to modify the slot option, so changing both does not
+					 * make sense.
 					 */
-					if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
+					if (update_two_phase &&
+						IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 						ereport(ERROR,
 								(errcode(ERRCODE_SYNTAX_ERROR),
 								 errmsg("slot_name and two_phase cannot be altered at the same time")));
@@ -1298,7 +1333,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 * two_phase cannot be disabled if there are any
 					 * uncommitted prepared transactions present.
 					 */
-					if (!opts.twophase &&
+					if (update_two_phase &&
 						sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
 						LookupGXactBySubid(subid))
 						ereport(ERROR,
@@ -1557,14 +1592,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	}
 
 	/*
-	 * Try to acquire the connection necessary for altering slot.
+	 * Try to acquire the connection necessary for altering slot, if needed.
 	 *
 	 * This has to be at the end because otherwise if there is an error while
 	 * doing the database operations we won't be able to rollback altered
 	 * slot.
 	 */
-	if (replaces[Anum_pg_subscription_subfailover - 1] ||
-		replaces[Anum_pg_subscription_subtwophasestate - 1])
+	if (update_failover || update_two_phase)
 	{
 		bool		must_use_password;
 		char	   *err;
@@ -1585,7 +1619,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase);
+			walrcv_alter_slot(wrconn, sub->slotname,
+							  update_failover ? &opts.failover : NULL,
+							  update_two_phase ? &opts.twophase : NULL);
 		}
 		PG_FINALLY();
 		{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 1cb601a148..97f957cd87 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool failover, bool two_phase);
+								const bool *failover, const bool *two_phase);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool failover, bool two_phase)
+					const bool *failover, const bool *two_phase)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )",
-					 quote_identifier(slotname),
-					 failover ? "true" : "false",
-					 two_phase ? "true" : "false");
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+					 quote_identifier(slotname));
+
+	if (failover)
+		appendStringInfo(&cmd, "FAILOVER %s",
+						 *failover ? "true" : "false");
+
+	if (failover && two_phase)
+		appendStringInfo(&cmd, ", ");
+
+	if (two_phase)
+		appendStringInfo(&cmd, "TWO_PHASE %s",
+						 *two_phase ? "true" : "false");
+
+	appendStringInfoString(&cmd, " );");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 31fa1257ec..7ffa5a58b3 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
-									  bool failover,
-									  bool two_phase);
+									  const bool *failover,
+									  const bool *two_phase);
+
 
 /*
  * walrcv_get_backend_pid_fn
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 4e8f627f7b..f56dff4b12 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -375,6 +375,12 @@ $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
 # then verify that the altered subscription reflects the two_phase option.
 ###############################
 
+# Confirm two-phase slot option is enabled before altering
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(t), 'two-phase is enabled');
+
 # Alter subscription two_phase to false
 $node_subscriber->safe_psql('postgres',
     "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
@@ -393,7 +399,13 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
 );
-is($result, qq(d), 'two-phase should be disabled');
+is($result, qq(d), 'two-phase subscription option should be disabled');
+
+# Make sure that the two-phase slot option is also disabled
+$result = $node_publisher->safe_psql('postgres',
+	"SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';"
+);
+is($result, qq(f), 'two-phase slot option should be disabled');
 
 # Now do a prepare on the publisher and make sure that it is not replicated.
 $node_publisher->safe_psql(
@@ -411,6 +423,19 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_prepared_xacts;");
 is($result, qq(0), 'should be no prepared transactions on subscriber');
 
+# Toggle the two_phase to "true" *before* the COMMIT PREPARED. Since we are the
+# special path for the case where both two_phase and failover are altered, it
+# is also set to "true".
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+    "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+	'postgres', "
+    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true, failover = true);
+    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
 # Now commit the insert and verify that it is replicated
 $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
 
@@ -422,20 +447,6 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
 is($result, qq(3), 'replicated data in subscriber table');
 
-# Alter subscription two_phase to true
-$node_subscriber->safe_psql('postgres',
-    "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
-$node_subscriber->poll_query_until('postgres',
-    "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
-);
-$node_subscriber->safe_psql(
-	'postgres', "
-    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true);
-    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
-
-# Wait for subscription startup
-$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
-
 # Make sure that the two-phase is enabled on the subscriber
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
-- 
2.43.0

