On Thu, Mar 27, 2025 at 2:29 PM Amit Kapila wrote:

> 
> On Tue, Mar 25, 2025 at 12:1 PM Amit Kapila <amit.kapil...@gmail.com>
> wrote:
> >
> > On Tue, Mar 25, 2025 at 11:05 AM Zhijie Hou (Fujitsu)
> > <houzj.f...@fujitsu.com> wrote:
> > >
> > > Hi,
> > >
> > > When testing the slot synchronization with logical replication slots that
> > > enabled two_phase decoding, I found that transactions prepared before
> > > two-phase decoding is enabled may fail to replicate to the subscriber after
> > > being committed on a promoted standby following a failover.
> > >
> > > To reproduce this issue, please follow these steps (also detailed in the
> > > attached TAP test, v1-0001):
> > >
> > > 1. sub: create a subscription with (two_phase = false)
> > > 2. primary (pub): prepare a txn A.
> > > 3. sub: alter subscription set (two_phase = true) and wait for the logical
> > >    slot to be synced to standby.
> > > 4. primary (pub): stop primary, promote the standby and let the subscriber
> > >    use the promoted standby as publisher.
> > > 5. promoted standby (pub): COMMIT PREPARED A;
> > > 6. sub: the apply worker will report the following ERROR because it didn't
> > >    receive the PREPARE.
> > >    ERROR:  prepared transaction with identifier "pg_gid_16387_752" does not exist
> > >
> > > I think the root cause of this issue is that the two_phase_at field of the
> > > slot, which indicates the LSN from which two-phase decoding is enabled (used
> > > to prevent duplicate data transmission for prepared transactions), is not
> > > synchronized to the standby server.
> > >
> > > In step 3, transaction A is not immediately replicated because it occurred
> > > before enabling two-phase decoding. Thus, the prepared transaction should
> > > only be replicated after decoding the final COMMIT PREPARED, as referenced
> > > in ReorderBufferFinishPrepared(). However, due to the invalid two_phase_at
> > > on the standby, the prepared transaction fails to send at that time.
> > >
> > > This problem arises after the support for altering the two-phase option
> > > (1462aad).
> > >
> 
> I suspect that this can happen in PG17 as well, but I need to think
> more about it to make a reproducible test case.

After further analysis, I was able to reproduce the same issue [1] in
PG 17.

However, the proposed fix requires catalog changes, and the issue is not a
security risk significant enough to justify changing the catalog in back
branches, so we cannot back-patch the same solution. Following off-list
discussions with Amit and Kuroda-san, we are considering disallowing failover
and two-phase decoding from being enabled together for a replication slot, as
suggested in attachment 0002.
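
As a rough illustration of that restriction (not taken from the patch itself),
the option check could be modeled in standalone C as below. The function names
and the convention of returning an error string are hypothetical; the actual
patch raises ereport(ERROR, ...) inside the backend.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical model of the proposed restriction. Returns NULL when the
 * option combination is acceptable, otherwise the error text.
 */
const char *
check_failover_options(bool temporary, bool two_phase, bool failover)
{
    if (!failover)
        return NULL;            /* nothing to validate when failover is off */
    if (temporary)
        return "cannot enable failover for a temporary replication slot";
    if (two_phase)
        return "cannot enable failover for a replication slot that enables two-phase decoding";
    return NULL;
}

/* Usage: failover is refused only together with two_phase or temporary. */
void
check_failover_options_examples(void)
{
    assert(check_failover_options(false, false, true) == NULL);
    assert(check_failover_options(false, true, false) == NULL);
    assert(check_failover_options(false, true, true) != NULL);
    assert(check_failover_options(true, false, true) != NULL);
}
```

Either option on its own stays allowed in this model; only the combination is
rejected.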

Another idea we considered is to prevent a slot that enables two-phase decoding
from being synced to the standby. In other words, the view would display the
failover field as false whenever transactions prepared before the two_phase_at
position might exist (e.g., if restart_lsn is less than two_phase_at). However,
this new behavior would need additional explanation for users, which seems
tricky.
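
To make that condition concrete, here is a minimal sketch in standalone C,
assuming a simplified slot struct. SlotInfo, may_have_old_prepares() and
reported_failover() are hypothetical names used only for illustration; only
restart_lsn, two_phase_at and the failover field come from the idea above, and
the restart_lsn comparison is just the example condition mentioned there.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* WAL position */

/* Hypothetical subset of the slot fields relevant to this idea. */
typedef struct SlotInfo
{
    bool        two_phase;
    bool        failover;
    XLogRecPtr  restart_lsn;
    XLogRecPtr  two_phase_at;
} SlotInfo;

/* True if transactions prepared before two_phase_at might still exist. */
bool
may_have_old_prepares(const SlotInfo *slot)
{
    return slot->two_phase && slot->restart_lsn < slot->two_phase_at;
}

/* Value the view would show for failover under this idea. */
bool
reported_failover(const SlotInfo *slot)
{
    return slot->failover && !may_have_old_prepares(slot);
}

void
reported_failover_examples(void)
{
    /* Made-up LSNs: restart_lsn is still behind two_phase_at. */
    SlotInfo    risky = {true, true, 0x1000, 0x3000};
    /* Made-up LSNs: restart_lsn has moved past two_phase_at. */
    SlotInfo    safe = {true, true, 0x4000, 0x3000};

    assert(!reported_failover(&risky));
    assert(reported_failover(&safe));
}
```

The awkward part is visible here: the reported value would flip from false to
true once restart_lsn advances, which is the behavior users would need
explained.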

> 
> In the meantime, I have a few minor comments on the proposed patches:
> 1.
> ##################################################
>  # Promote the standby1 to primary. Confirm that:
>  # a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
>  # b) logical replication for regress_mysub1 is resumed successfully after failover
>  # c) changes can be consumed from the synced slot 'snap_test_slot'
>  ##################################################
> -$standby1->start;
>  $primary->wait_for_replay_catchup($standby1);
> 
>  # Capture the time before the standby is promoted
> @@ -885,6 +940,15 @@ $standby1->wait_for_catchup('regress_mysub1');
>  is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
>   "20", 'data replicated from the new primary');
> 
> +# Commit the prepared transaction
> +$standby1->safe_psql('postgres',
> + "COMMIT PREPARED 'test_twophase_slotsync';");
> +$standby1->wait_for_catchup('regress_mysub1');
> +
> +# Confirm that the prepared transaction is replicated to the subscriber
> +is($subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
> + "21", 'prepared data replicated from the new primary');
> 
> The commentary above this test should include information about
> verifying the replication of previously prepared transactions after
> promotion. Also, it would be better if confirm the commit prepared
> before confirming the new Insert is replicated after promotion.
> 
> 2.
> @@ -249,6 +250,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
>   SpinLockAcquire(&slot->mutex);
>   slot->data.restart_lsn = remote_slot->restart_lsn;
>   slot->data.confirmed_flush = remote_slot->confirmed_lsn;
> + slot->data.two_phase_at = remote_slot->two_phase_at;
> 
> Why do we need to update the two_phase_at here when the patch does it
> later in this function when local and remote values don't match?

Thanks for the comments. They have been addressed in V2.

[1]
- pub: create a slot 'sub' with two_phase=false, then prepare a transaction.
- pub: after some activity, advance the confirmed_flush_lsn of 'sub' so that it
  is greater than the LSN of the prepared transaction.
- sub: create a subscription with (slot_name='sub', create_slot=false,
  failover=true, two_phase=true, copy_data=false); two_phase_at will be set to
  the same value as confirmed_flush_lsn, which is greater than the LSN of the
  prepared transaction.
- stop the primary and promote the standby.
- commit the prepared transaction on the promoted standby; the following error
  will be reported on the subscriber:

LOG:  logical replication apply worker for subscription "sub2" has started
ERROR:  prepared transaction with identifier "pg_gid_16398_764" does not exist.
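
The failure can be modeled with a simplified version of the check referenced in
ReorderBufferFinishPrepared(). This is only a sketch: the function names are
hypothetical, the LSN values are made up, and it assumes the synced slot on the
standby ends up with an unset (invalid) two_phase_at.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* WAL position */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Simplified model: when COMMIT PREPARED is decoded, the PREPARE itself must
 * be sent first if it happened before two-phase decoding was enabled, because
 * it was not sent at prepare time.
 */
bool
must_send_prepare_at_commit(XLogRecPtr prepare_lsn, XLogRecPtr two_phase_at)
{
    return prepare_lsn < two_phase_at;
}

void
two_phase_at_examples(void)
{
    XLogRecPtr  prepare_lsn = 0x2000;   /* made-up LSN of the PREPARE */

    /* Primary: two_phase_at was set from confirmed_flush_lsn, past the PREPARE. */
    assert(must_send_prepare_at_commit(prepare_lsn, 0x3000));

    /* Standby: two_phase_at was not synced, so the PREPARE is wrongly skipped. */
    assert(!must_send_prepare_at_commit(prepare_lsn, InvalidXLogRecPtr));
}
```

In the second case the subscriber receives COMMIT PREPARED without ever having
seen the PREPARE, which matches the error above.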


Best Regards,
Hou zj

Attachment: v2-0001-Fix-slot-synchronization-with-two_phase-decoding-.patch

From 71c23ab9a87817eab41b3d31516ceddfff04ed66 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Mon, 31 Mar 2025 16:28:57 +0800
Subject: [PATCH v2] Disallow enabling failover for a replication slot that
 enables two-phase decoding

This commit fixes a bug for slot synchronization with logical replication
slots that enabled two_phase decoding. As it stands, transactions prepared
before two-phase decoding is enabled may fail to replicate to the subscriber
after being committed on a promoted standby following a failover.

The issue arises because the two_phase_at field of a slot, which tracks the LSN
from which two-phase decoding starts, is not synchronized to standby servers.
Without this field, the logical decoding might incorrectly identify prepared
transactions as already replicated to the subscriber, causing them to be
skipped.

To address the issue on HEAD, this commit makes the two_phase_at field of the
slot visible in the pg_replication_slots view and enables the slot
synchronization to copy this value to the corresponding synced slot on the
standby server.

The bug has been present since the introduction of slot synchronization in
PostgreSQL 17. However, due to the need for catalog changes, backpatching this
fix is not feasible. Instead, to prevent the risk of losing prepared
transactions in prior versions, we now disallow enabling failover and two-phase
decoding together for a replication slot.
---
 contrib/test_decoding/expected/slot.out    |  2 ++
 contrib/test_decoding/sql/slot.sql         |  1 +
 src/backend/commands/subscriptioncmds.c    | 11 +++++++++
 src/backend/replication/slot.c             | 27 ++++++++++++++++++++++
 src/test/regress/expected/subscription.out |  3 +++
 src/test/regress/sql/subscription.sql      |  4 ++++
 6 files changed, 48 insertions(+)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 7de03c79f6f..347b9c11467 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -427,6 +427,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
 
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
 ERROR:  cannot enable failover for a temporary replication slot
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_twophase_true_slot', 'test_decoding', false, true, true);
+ERROR:  cannot enable failover for a replication slot that enables two-phase decoding
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3bef..a89fe712ff6 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -182,6 +182,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'tes
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_twophase_true_slot', 'test_decoding', false, true, true);
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
 
 SELECT slot_name, slot_type, failover FROM pg_replication_slots;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9467f58a23d..8308ccaad5a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -648,6 +648,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                                 errmsg("password_required=false is superuser-only"),
                                 errhint("Subscriptions with the password_required option set to false may only be created or modified by the superuser.")));
 
+       /*
+        * Do not allow users to enable failover and two_phase option together.
+        *
+        * See comments atop the similar check in ReplicationSlotCreate() for
+        * detailed reasons.
+        */
+       if (opts.twophase && opts.failover)
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("cannot enable failover option when two_phase option is enabled"));
+
        /*
         * If built with appropriate switch, whine when regression-testing
         * conventions for subscription names are violated.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 780d22afbca..5b349b3c3af 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -343,6 +343,21 @@ ReplicationSlotCreate(const char *name, bool db_specific,
                        ereport(ERROR,
                                        errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                        errmsg("cannot enable failover for a temporary replication slot"));
+
+               /*
+                * Do not allow users to enable failover for slots that enable
+                * two-phase decoding.
+                *
+                * This is because the two_phase_at field of a slot, which tracks the
+                * LSN from which two-phase decoding starts, is not synchronized to
+                * standby servers. Without this field, the logical decoding might
+                * incorrectly identify prepared transactions as already replicated to
+                * the subscriber, causing them to be skipped.
+                */
+               if (two_phase)
+                       ereport(ERROR,
+                                       errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                       errmsg("cannot enable failover for a replication slot that enables two-phase decoding"));
        }
 
        /*
@@ -848,6 +863,18 @@ ReplicationSlotAlter(const char *name, bool failover)
                                errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                errmsg("cannot enable failover for a temporary replication slot"));
 
+       /*
+        * Do not allow users to enable failover for slots that enable two-phase
+        * decoding.
+        *
+        * See comments atop the similar check in ReplicationSlotCreate() for
+        * detailed reasons.
+        */
+       if (failover && MyReplicationSlot->data.two_phase)
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("cannot enable failover for a replication slot that enables two-phase decoding"));
+
        if (MyReplicationSlot->data.failover != failover)
        {
                SpinLockAcquire(&MyReplicationSlot->mutex);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 0f2a25cdc19..ee3603b67ef 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -479,6 +479,9 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+ERROR:  cannot enable failover option when two_phase option is enabled
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c6..47fc1e5329b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -342,6 +342,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
+
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
-- 
2.31.1
