From 7a87391f4a0e62525a92ca6a8f3aa0de62f169e6 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 7 Apr 2025 15:53:37 +0800
Subject: [PATCH v5_aprch_2] PG17 Approach 2 Fix slot synchronization for
 two_phase enabled slots.

The issue is that transactions prepared before two-phase decoding is
enabled can fail to replicate to the subscriber after being committed on a
promoted standby following a failover. This is because the two_phase_at
field of a slot, which tracks the LSN from which two-phase decoding
starts, is not synchronized to standby servers. Without two_phase_at,
logical decoding might incorrectly identify prepared transactions as
already replicated to the subscriber after promotion of the standby server,
causing them to be skipped.

Instead of disallowing the use of two-phase and failover together, a more
flexible strategy is to restrict failover for slots with two-phase enabled
only when there is a possibility that prepared transactions exist before
two_phase_at that are not yet replicated. During slot creation with two-phase
and failover, we check for any prepared transactions that were skipped while
determining the decoding start point (DecodingContextFindStartpoint). For
subsequent attempts to alter failover to true, we ensure that restart_lsn is
not less than two_phase_at, indicating that all prepared transactions have been
committed and replicated, thus the bug would not happen.
---
 src/backend/commands/subscriptioncmds.c       | 28 ++++++++++++++++
 src/backend/replication/logical/decode.c      |  1 +
 src/backend/replication/logical/logical.c     | 21 ++++++++++++
 .../replication/logical/reorderbuffer.c       | 21 ++++++++++++
 src/backend/replication/slot.c                | 17 ++++++++++
 src/include/replication/reorderbuffer.h       |  1 +
 .../t/040_standby_failover_slots_sync.pl      | 32 +++++++++++++++++++
 src/test/regress/expected/subscription.out    |  3 ++
 src/test/regress/sql/subscription.sql         |  4 +++
 9 files changed, 128 insertions(+)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9467f58a23d..59e5827b06a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -648,6 +648,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 				 errmsg("password_required=false is superuser-only"),
 				 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
+	/*
+	 * Do not allow users to enable the failover and two_phase options together
+	 * when create_slot is false, i.e. this command does not create the slot.
+	 *
+	 * See comments atop the similar check in DecodingContextFindStartpoint()
+	 * for a detailed reason.
+	 */
+	if (!opts.create_slot && opts.twophase && opts.failover)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("\"%s\" and \"%s\" are mutually exclusive options when \"%s\" is specified",
+					   "failover", "two_phase", "create_slot"));
+
 	/*
 	 * If built with appropriate switch, whine when regression-testing
 	 * conventions for subscription names are violated.
@@ -1245,6 +1258,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								 errmsg("cannot set %s for enabled subscription",
 										"failover")));
 
+					/*
+					 * Do not allow users to enable failover if the two_phase
+					 * state is still pending, indicating that the replication
+					 * slot's two_phase option has yet to be finalized. This
+					 * restriction is necessary because the check in
+					 * ReplicationSlotAlter() forbids enabling two_phase and
+					 * failover together and requires access to the actual
+					 * two_phase value.
+					 */
+					if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+						opts.failover)
+						ereport(ERROR,
+								errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+								errmsg("cannot enable failover for a subscription with a pending two_phase state"));
+
 					/*
 					 * The changed failover option of the slot can't be rolled
 					 * back.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8ec5adfd909..60dbec37309 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -327,6 +327,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				{
 					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
 											buf->origptr);
+					ReorderBufferSkipPrepare(reorder, parsed.twophase_xid);
 					break;
 				}
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index e941bb491d8..5a71d441d52 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -679,6 +679,27 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 		CHECK_FOR_INTERRUPTS();
 	}
 
+	/*
+	 * Do not allow users to enable failover for a two_phase enabled slot if
+	 * there are transactions prepared before reaching the consistent point
+	 * that were skipped during decoding. These prepared transactions can fail
+	 * to replicate to the subscriber after being committed on a promoted
+	 * standby following a failover.
+	 *
+	 * This is because the two_phase_at field of a slot, which tracks the LSN
+	 * from which two-phase decoding starts, is not synchronized to standby
+	 * servers. Without two_phase_at, the logical decoding might incorrectly
+	 * identify prepared transactions as already replicated to the subscriber
+	 * after promotion of the standby server, causing them to be skipped.
+	 */
+	if (slot->data.two_phase && slot->data.failover &&
+		ReorderBufferSkippedPrepare(ctx->reorder))
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot enable failover for a two-phase enabled replication slot"),
+				errdetail("Prepared transactions exist before the logical decoding starting point %X/%X.",
+						  LSN_FORMAT_ARGS(ctx->reader->EndRecPtr)));
+
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
 	if (slot->data.two_phase)
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 03eb005c39d..80c2b99a71b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2800,6 +2800,27 @@ ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
 	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
 }
 
+/*
+ * Return true if any transaction on rb->toplevel_by_lsn is reported by
+ * rbtxn_skip_prepared(), false otherwise.  The list is only read here.
+ */
+bool
+ReorderBufferSkippedPrepare(ReorderBuffer *rb)
+{
+	dlist_iter	iter;
+
+	dlist_foreach(iter, &rb->toplevel_by_lsn)
+	{
+		ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN, node,
+												iter.cur);
+
+		if (rbtxn_skip_prepared(txn))
+			return true;
+	}
+
+	return false;
+}
+
 /*
  * Prepare a two-phase transaction.
  *
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a1d4768623f..df0602cf640 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -848,6 +848,23 @@ ReplicationSlotAlter(const char *name, bool failover)
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("cannot enable failover for a temporary replication slot"));
 
+	/*
+	 * Do not allow users to enable failover for a two_phase enabled slot
+	 * where slot's restart_lsn is less than two_phase_at. In such cases,
+	 * there is a risk that transactions prepared before two_phase_at exist
+	 * and would be skipped during decoding.
+	 *
+	 * See comments atop the similar check in DecodingContextFindStartpoint()
+	 * for a detailed reason.
+	 */
+	if (failover && MyReplicationSlot->data.two_phase &&
+		MyReplicationSlot->data.restart_lsn < MyReplicationSlot->data.two_phase_at)
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot enable failover for a two-phase enabled replication slot"),
+				errhint("WALs prior to %X/%X, where two-phase decoding begins, are still reserved by this slot.",
+						LSN_FORMAT_ARGS(MyReplicationSlot->data.two_phase_at)));
+
 	if (MyReplicationSlot->data.failover != failover)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c56f219fd8..43d6e8608ab 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -722,6 +722,7 @@ extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xi
 											 TimestampTz prepare_time,
 											 RepOriginId origin_id, XLogRecPtr origin_lsn);
 extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+extern bool ReorderBufferSkippedPrepare(ReorderBuffer *rb);
 extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *rb);
 extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 823857bb329..9f0ce33af6a 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -98,6 +98,38 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
 ok( $stderr =~ /ERROR:  cannot set failover for enabled subscription/,
 	"altering failover is not allowed for enabled subscription");
 
+##################################################
+# Test that the failover option cannot be enabled for a two_phase replication
+# slot.
+##################################################
+
+# Create a slot with two_phase enabled; die here if the setup step fails
+$publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('regress_mysub2', 'pgoutput', false, true, false);");
+
+# Create a subscription with two_phase enabled
+$subscriber1->safe_psql('postgres',
+	"CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (create_slot = false, copy_data = false, enabled = false, two_phase = true);"
+);
+
+# Enable failover for the subscription
+($result, $stdout, $stderr) = $subscriber1->psql('postgres',
+	"ALTER SUBSCRIPTION regress_mysub2 SET (failover = true)");
+ok( $stderr =~ /ERROR:  cannot enable failover for a subscription with a pending two_phase state/,
+	"Enabling failover is not allowed for a two_phase pending subscription");
+
+# Attempt to enable failover for a two-phase enabled slot
+($result, $stdout, $stderr) = $publisher->psql(
+	'postgres',
+	qq[ALTER_REPLICATION_SLOT regress_mysub2 (failover);],
+	replication => 'database');
+ok($stderr =~ /ERROR:  cannot enable failover for a two-phase enabled replication slot/,
+	"Enabling failover is not allowed for a two_phase enabled slot");
+
+# Drop the subscription
+$subscriber1->safe_psql(
+	'postgres', "DROP SUBSCRIPTION regress_mysub2;");
+
 ##################################################
 # Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
 ##################################################
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 0f2a25cdc19..0815094967f 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -479,6 +479,9 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+ERROR:  "failover" and "two_phase" are mutually exclusive options when "create_slot" is specified
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c6..47fc1e5329b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -342,6 +342,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
+
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
-- 
2.34.1

