From e4ec647fc5114440b1061d1376caca73c03f7936 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 12:39:12 +0000
Subject: [PATCH 3/4] Abort prepared transactions while altering two_phase to
 false

---
 src/backend/access/transam/twophase.c         | 19 +++++-----
 src/backend/commands/subscriptioncmds.c       | 27 +++++++++++---
 src/include/access/twophase.h                 |  3 +-
 src/test/subscription/t/099_twophase_added.pl | 35 +++++++++++++++++++
 4 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2148daba3c..e18e80cf4e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2678,13 +2678,13 @@ checkGid(char *gid, Oid subid)
 }
 
 /*
- * LookupGXactBySubid
- *		Check if the prepared transaction done by apply worker exists.
+ * GetGidListBySubid
+ *      Get a list of GIDs which is PREPARE'd by the given subscription.
  */
-bool
-LookupGXactBySubid(Oid subid)
+List *
+GetGidListBySubid(Oid subid)
 {
-	bool		found = false;
+	List *list = NIL;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2693,11 +2693,10 @@ LookupGXactBySubid(Oid subid)
 
 		/* Ignore not-yet-valid GIDs. */
 		if (gxact->valid && checkGid(gxact->gid, subid))
-		{
-			found = true;
-			break;
-		}
+			list = lappend(list, pstrdup(gxact->gid));
+
 	}
 	LWLockRelease(TwoPhaseStateLock);
-	return found;
+
+	return list;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 955d5e4899..b1c00e36db 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1142,6 +1142,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* XXX */
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
+					List *prepared_xacts = NIL;
+
 					/*
 					 * two_phase can be only changed for disabled
 					 * subscriptions
@@ -1158,13 +1160,28 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 */
 					logicalrep_workers_stop(subid);
 
-					/* Check whether the number of prepared transactions */
+					/*
+					 * If two phase was enabled, there is a possibility the
+					 * transactions has already been PREPARE'd.
+					 */
 					if (!opts.twophase &&
 						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
-						LookupGXactBySubid(subid))
-						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
+						(prepared_xacts = GetGidListBySubid(subid)) != NIL)
+					{
+						ListCell	*cell;
+
+						/* Must not be in the transaction */
+						PreventInTransactionBlock(isTopLevel,
+												  "ALTER SUBSCRIPTION ... SET (two_phase = ...)");
+
+						/* Abort all listed transactions */
+						foreach(cell, prepared_xacts)
+						{
+							FinishPreparedTransaction((char *) lfirst(cell),
+													  false);
+							prepared_xacts = list_delete_cell(prepared_xacts, cell);
+						}
+					}
 
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index dac3f27bc3..8eebfa7267 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "nodes/pg_list.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -63,6 +64,6 @@ extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
 
-extern bool LookupGXactBySubid(Oid subid);
+extern List *GetGidListBySubid(Oid subid);
 
 #endif							/* TWOPHASE_H */
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index c13a37675a..a8135b671c 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -69,4 +69,39 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, q(5),
    "prepared transactions done before altering can be replicated");
 
+######
+# Check the case that prepared transactions exist on subscriber node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(6, 10));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "transaction has been prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = off);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "prepared transaction done by worker is aborted");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(10) FROM tab_full;");
+is($result, q(10),
+   "prepared transactions on publisher can be replicated");
+
 done_testing();
-- 
2.43.0

