From d6d4b8ac93c60dcaadb53c6cb9a446ae0882511b Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 5 Apr 2024 06:47:18 -0400
Subject: [PATCH v3 1/5] Allow altering of two_phase option of a SUBSCRIPTION

This patch allows user to alter two_phase option of a subscriber provided no uncommitted
prepared transactions are pending on that subscription.

Author: Cherian Ajin, Hayato Kuroda
---
 src/backend/access/transam/twophase.c         | 43 +++++++++++++++++++
 src/backend/commands/subscriptioncmds.c       | 42 +++++++++++++++---
 .../libpqwalreceiver/libpqwalreceiver.c       |  7 +--
 src/backend/replication/logical/launcher.c    | 21 +++++++--
 src/backend/replication/logical/worker.c      |  3 --
 src/backend/replication/slot.c                | 19 +++++++-
 src/backend/replication/walsender.c           | 20 +++++++--
 src/bin/psql/tab-complete.c                   |  2 +-
 src/include/access/twophase.h                 |  3 ++
 src/include/replication/logicallauncher.h     |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/include/replication/walreceiver.h         |  5 ++-
 src/test/regress/expected/subscription.out    |  5 +--
 src/test/regress/sql/subscription.sql         |  5 +--
 14 files changed, 146 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8090ac9fc1..495f99a357 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2682,3 +2682,46 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 	LWLockRelease(TwoPhaseStateLock);
 	return found;
 }
+
+/*
+ * checkGid
+ */
+static bool
+checkGid(char *gid, Oid subid)
+{
+	int			ret;
+	Oid			subid_written,
+				xid;
+
+	ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid);
+
+	if (ret != 2 || subid != subid_written)
+		return false;
+
+	return true;
+}
+
+/*
+ * LookupGXactBySubid
+ *		Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+	bool		found = false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs. */
+		if (gxact->valid && checkGid(gxact->gid, subid))
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	return found;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5a47fa984d..6643fc08a6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/table.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -849,7 +850,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			else if (opts.slot_name &&
 					 (opts.failover || walrcv_server_version(wrconn) >= 170000))
 			{
-				walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
+				walrcv_alter_slot(wrconn, opts.slot_name, opts.twophase, opts.failover);
 			}
 		}
 		PG_FINALLY();
@@ -868,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	pgstat_create_subscription(subid);
 
 	if (opts.enabled)
-		ApplyLauncherWakeupAtCommit();
+		ApplyLauncherWakeupAtEOXact(true);
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
@@ -1165,7 +1166,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_ORIGIN);
@@ -1173,6 +1175,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
+				/* XXX */
+				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
+				{
+					/* Stop corresponding worker */
+					logicalrep_worker_stop(subid, InvalidOid);
+
+					/* Request to start worker at the end of transaction */
+					ApplyLauncherWakeupAtEOXact(false);
+
+					/* Check whether the number of prepared transactions */
+					if (!opts.twophase &&
+						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+						LookupGXactBySubid(subid))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
+
+					/* Change system catalog acoordingly */
+					values[Anum_pg_subscription_subtwophasestate - 1] =
+						CharGetDatum(opts.twophase ?
+									 LOGICALREP_TWOPHASE_STATE_PENDING :
+									 LOGICALREP_TWOPHASE_STATE_DISABLED);
+					replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
 					/*
@@ -1299,7 +1326,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
 				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+					ApplyLauncherWakeupAtEOXact(true);
 
 				update_tuple = true;
 				break;
@@ -1521,7 +1548,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	 * doing the database operations we won't be able to rollback altered
 	 * slot.
 	 */
-	if (replaces[Anum_pg_subscription_subfailover - 1])
+	if (replaces[Anum_pg_subscription_subtwophasestate - 1] ||
+		replaces[Anum_pg_subscription_subfailover - 1])
 	{
 		bool		must_use_password;
 		char	   *err;
@@ -1541,7 +1569,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+			walrcv_alter_slot(wrconn, sub->slotname, opts.twophase, opts.failover);
 		}
 		PG_FINALLY();
 		{
@@ -1962,7 +1990,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 							  form->oid, 0);
 
 	/* Wake up related background processes to handle this change quickly. */
-	ApplyLauncherWakeupAtCommit();
+	ApplyLauncherWakeupAtEOXact(true);
 	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 3c2b1bb496..baef3bdec0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool failover);
+								bool two_phase, bool failover);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,14 +1121,15 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool failover)
+					bool two_phase, bool failover)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s, FAILOVER %s )",
 					 quote_identifier(slotname),
+					 two_phase ? "true" : "false",
 					 failover ? "true" : "false");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 66070e9131..3e0e5a77e0 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -89,6 +89,7 @@ static dsa_area *last_start_times_dsa = NULL;
 static dshash_table *last_start_times = NULL;
 
 static bool on_commit_launcher_wakeup = false;
+static bool launcher_wakeup = false;
 
 
 static void ApplyLauncherWakeup(void);
@@ -1085,13 +1086,22 @@ ApplyLauncherForgetWorkerStartTime(Oid subid)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	bool		kicked = false;
+
 	if (isCommit)
 	{
 		if (on_commit_launcher_wakeup)
+		{
 			ApplyLauncherWakeup();
+			kicked = true;
+		}
 	}
 
+	if (!kicked && launcher_wakeup)
+		ApplyLauncherWakeup();
+
 	on_commit_launcher_wakeup = false;
+	launcher_wakeup = false;
 }
 
 /*
@@ -1102,10 +1112,15 @@ AtEOXact_ApplyLauncher(bool isCommit)
  * tuple was added to the pg_subscription catalog.
 */
 void
-ApplyLauncherWakeupAtCommit(void)
+ApplyLauncherWakeupAtEOXact(bool on_commit)
 {
-	if (!on_commit_launcher_wakeup)
-		on_commit_launcher_wakeup = true;
+	if (on_commit)
+	{
+		if (!on_commit_launcher_wakeup)
+			on_commit_launcher_wakeup = true;
+	}
+	else if (!launcher_wakeup)
+		launcher_wakeup = true;
 }
 
 static void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..ca3d260fc3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3911,9 +3911,6 @@ maybe_reread_subscription(void)
 	/* !slotname should never happen when enabled is true. */
 	Assert(newsub->slotname);
 
-	/* two-phase should not be altered */
-	Assert(newsub->twophasestate == MySubscription->twophasestate);
-
 	/*
 	 * Exit if any parameter that affects the remote connection was changed.
 	 * The launcher will start a new worker but note that the parallel apply
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index cebf44bb0f..621f35ab1e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -800,8 +800,10 @@ ReplicationSlotDrop(const char *name, bool nowait)
  * Change the definition of the slot identified by the specified name.
  */
 void
-ReplicationSlotAlter(const char *name, bool failover)
+ReplicationSlotAlter(const char *name, bool two_phase, bool failover)
 {
+	bool		update_slot = false;
+
 	Assert(MyReplicationSlot == NULL);
 
 	ReplicationSlotAcquire(name, false);
@@ -844,12 +846,27 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("cannot enable failover for a temporary replication slot"));
 
+	if (MyReplicationSlot->data.two_phase != two_phase)
+	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
+		MyReplicationSlot->data.two_phase = two_phase;
+		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		update_slot = true;
+	}
+
+
 	if (MyReplicationSlot->data.failover != failover)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
 		MyReplicationSlot->data.failover = failover;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
+		update_slot = true;
+	}
+
+	if (update_slot)
+	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotSave();
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc40c454de..be155067ce 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1411,14 +1411,25 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
  * Process extra options given to ALTER_REPLICATION_SLOT.
  */
 static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd,
+						  bool *two_phase, bool *failover)
 {
+	bool		two_phase_given = false;
 	bool		failover_given = false;
 
 	/* Parse options */
 	foreach_ptr(DefElem, defel, cmd->options)
 	{
-		if (strcmp(defel->defname, "failover") == 0)
+		if (strcmp(defel->defname, "two_phase") == 0)
+		{
+			if (two_phase_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			two_phase_given = true;
+			*two_phase = defGetBoolean(defel);
+		}
+		else if (strcmp(defel->defname, "failover") == 0)
 		{
 			if (failover_given)
 				ereport(ERROR,
@@ -1438,10 +1449,11 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
 static void
 AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
 {
+	bool		two_phase = false;
 	bool		failover = false;
 
-	ParseAlterReplSlotOptions(cmd, &failover);
-	ReplicationSlotAlter(cmd->slotname, failover);
+	ParseAlterReplSlotOptions(cmd, &two_phase, &failover);
+	ReplicationSlotAlter(cmd->slotname, two_phase, failover);
 }
 
 /*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 6fee3160f0..5ff84301cd 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
 					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "streaming", "synchronous_commit", "two_phase");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 56248c0006..d493ed24c5 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
+
+extern bool LookupGXactBySubid(Oid subid);
+
 #endif							/* TWOPHASE_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ff0438b5bb..075842c67e 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
 
-extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeupAtEOXact(bool on_commit);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..2fcb11418f 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, bool two_phase,
+								 bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b..a443f402f5 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -377,6 +377,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
+									  bool two_phase,
 									  bool failover);
 
 /*
@@ -455,8 +456,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 #define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
 	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
-#define walrcv_alter_slot(conn, slotname, failover) \
-	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
+#define walrcv_alter_slot(conn, slotname, two_phase, failover) \
+	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, two_phase, failover)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 1eee6b17b8..9bba656e00 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -379,10 +379,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-ERROR:  unrecognized subscription parameter: "two_phase"
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
                                                                                                                 List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 1b2a23ba7b..9ff151f806 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -257,10 +257,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
-- 
2.43.0

