On Wed, Apr 2, 2025 at 12:41 PM Amit Kapila wrote:

> 
> On Tue, Apr 1, 2025 at 4:28 PM Zhijie Hou (Fujitsu) 
> <houzj.f...@fujitsu.com>
> wrote:
> >
> > Here is the V3 patch set which addressed all the comments.
> >
> 
> Comment 0n 0001
> <literal>NULL</literal> for logical slots where
> +       <structfield>two_phase</structfield> is false and physical slots.
> +      </para></entry>
> 
> change above to:
> <literal>NULL</literal> for logical slots where 
> <structfield>two_phase</structfield> is false and for physical slots.

Changed.

> 
> Comment on 0002
> +# Create a subscription with two_phase enabled 
> +$subscriber1->safe_psql('postgres',
> + "CREATE SUBSCRIPTION regress_mysub2 CONNECTION
> '$publisher_connstr'
> PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, create_slot = 
> false, enabled = false, two_phase = true);"
> +);
> +
> +# Enable failover for the subscription ($result, $stdout, $stderr) = 
> +$subscriber1->psql('postgres',  "ALTER SUBSCRIPTION regress_mysub2 
> +SET (failover = true)"); ok( $stderr =~
> +/ERROR:  cannot enable failover for a two_phase
> enabled subscription/,
> + "Enabling failover is not allowed for a two_phase enabled 
> + subscription");
> 
> Is there a need for this test to be in .pl file? Can't we add it in .sql file?

Right, I moved the test into subscription.sql

> Apart from the above, I have made minor modifications to the PG17 
> patch in the attached.

Here is V4 patch set which addressed above comments and passed
pgindent test.

Best Regards,
Hou zj

Attachment: v4-0001-HEAD-Fix-slot-synchronization-for-two_phase-enables-sl.patch
Description: v4-0001-HEAD-Fix-slot-synchronization-for-two_phase-enables-sl.patch

From e1ad472436f02ed9b410bc96c1b5d0e7952f7883 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.f...@cn.fujitsu.com>
Date: Mon, 31 Mar 2025 16:28:57 +0800
Subject: [PATCH v4 2/2] Fix slot synchronization for two_phase enables slots.

The issue is that the transactions prepared before two-phase decoding is
enabled can fail to replicate to the subscriber after being committed on a
promoted standby following a failover. This is because the two_phase_at
field of a slot, which tracks the LSN from which two-phase decoding
starts, is not synchronized to standby servers. Without two_phase_at, the
logical decoding might incorrectly identify prepared transaction as
already replicated to the subscriber after promotion of standby server,
causing them to be skipped.

To address the issue on HEAD, the two_phase_at field of the slot is
exposed by the pg_replication_slots view and allows the slot
synchronization to copy this value to the corresponding synced slot on the
standby server.

However, we can't fix the issue in the same way in PG17 where slot
synchronization has been introduced, as it requires a change in view and
builtin function definition, which in turn requires a catversion bump.
Instead, to prevent the risk of losing prepared transactions, we disallow
enabling failover and two-phase decoding together for a replication slot.
Users are advised to disable slot synchronization for two_phase-enabled
slots or re-create the subscriptions with option 'two_phase' as false and
'failover' as true.
---
 contrib/test_decoding/expected/slot.out    |  2 ++
 contrib/test_decoding/sql/slot.sql         |  1 +
 src/backend/commands/subscriptioncmds.c    | 26 +++++++++++++++++++++
 src/backend/replication/slot.c             | 27 ++++++++++++++++++++++
 src/bin/pg_upgrade/t/003_logical_slots.pl  |  6 ++---
 src/test/regress/expected/subscription.out |  6 +++++
 src/test/regress/sql/subscription.sql      |  7 ++++++
 7 files changed, 72 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out 
b/contrib/test_decoding/expected/slot.out
index 7de03c79f6f..8fd762dea85 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -427,6 +427,8 @@ SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_default_slot', '
 
 SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', 
true, false, true);
 ERROR:  cannot enable failover for a temporary replication slot
+SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_twophase_true_slot', 
'test_decoding', false, true, true);
+ERROR:  "failover" and "two_phase" are mutually exclusive options
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
  ?column? 
 ----------
diff --git a/contrib/test_decoding/sql/slot.sql 
b/contrib/test_decoding/sql/slot.sql
index 580e3ae3bef..a89fe712ff6 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -182,6 +182,7 @@ SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_true_slot', 'tes
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 
'test_decoding', false, false, false);
 SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 
'test_decoding', false, false);
 SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', 
true, false, true);
+SELECT 'init' FROM 
pg_create_logical_replication_slot('failover_twophase_true_slot', 
'test_decoding', false, true, true);
 SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
 
 SELECT slot_name, slot_type, failover FROM pg_replication_slots;
diff --git a/src/backend/commands/subscriptioncmds.c 
b/src/backend/commands/subscriptioncmds.c
index 9467f58a23d..09c5f0ef685 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -648,6 +648,19 @@ CreateSubscription(ParseState *pstate, 
CreateSubscriptionStmt *stmt,
                                 errmsg("password_required=false is 
superuser-only"),
                                 errhint("Subscriptions with the 
password_required option set to false may only be created or modified by the 
superuser.")));
 
+       /*
+        * Do not allow users to enable the failover and two_phase options
+        * together.
+        *
+        * See comments atop the similar check in ReplicationSlotCreate() for a
+        * detailed reason.
+        */
+       if (opts.twophase && opts.failover)
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("\"%s\" and \"%s\" are mutually 
exclusive options",
+                                          "failover", "two_phase"));
+
        /*
         * If built with appropriate switch, whine when regression-testing
         * conventions for subscription names are violated.
@@ -1245,6 +1258,19 @@ AlterSubscription(ParseState *pstate, 
AlterSubscriptionStmt *stmt,
                                                                 errmsg("cannot 
set %s for enabled subscription",
                                                                                
"failover")));
 
+                                       /*
+                                        * Do not allow users to enable the 
failover and two_phase
+                                        * options together.
+                                        *
+                                        * See comments atop the similar check 
in
+                                        * ReplicationSlotCreate() for a 
detailed reason.
+                                        */
+                                       if (sub->twophasestate != 
LOGICALREP_TWOPHASE_STATE_DISABLED &&
+                                               opts.failover)
+                                               ereport(ERROR,
+                                                               
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                               errmsg("cannot 
enable failover for a two_phase enabled subscription"));
+
                                        /*
                                         * The changed failover option of the 
slot can't be rolled
                                         * back.
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 780d22afbca..3293651f725 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -343,6 +343,22 @@ ReplicationSlotCreate(const char *name, bool db_specific,
                        ereport(ERROR,
                                        errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                        errmsg("cannot enable failover for a 
temporary replication slot"));
+
+               /*
+                * Do not allow users to enable both failover and two_phase for 
slots.
+                *
+                * This is because the two_phase_at field of a slot, which 
tracks the
+                * LSN, from which two-phase decoding starts, is not 
synchronized to
+                * standby servers. Without two_phase_at, the logical decoding 
might
+                * incorrectly identify prepared transaction as already 
replicated to
+                * the subscriber after promotion of standby server, causing 
them to
+                * be skipped.
+                */
+               if (two_phase)
+                       ereport(ERROR,
+                                       errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                       errmsg("\"%s\" and \"%s\" are mutually 
exclusive options",
+                                                  "failover", "two_phase"));
        }
 
        /*
@@ -848,6 +864,17 @@ ReplicationSlotAlter(const char *name, bool failover)
                                errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                errmsg("cannot enable failover for a temporary 
replication slot"));
 
+       /*
+        * Do not allow users to enable failover for a two_phase enabled slot.
+        *
+        * See comments atop the similar check in ReplicationSlotCreate() for a
+        * detailed reason.
+        */
+       if (failover && MyReplicationSlot->data.two_phase)
+               ereport(ERROR,
+                               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                               errmsg("cannot enable failover for a two-phase 
enabled replication slot"));
+
        if (MyReplicationSlot->data.failover != failover)
        {
                SpinLockAcquire(&MyReplicationSlot->mutex);
diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl 
b/src/bin/pg_upgrade/t/003_logical_slots.pl
index 0a2483d3dfc..e329cc609ce 100644
--- a/src/bin/pg_upgrade/t/003_logical_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_slots.pl
@@ -173,7 +173,7 @@ $sub->start;
 $sub->safe_psql(
        'postgres', qq[
        CREATE TABLE tbl (a int);
-       CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION 
regress_pub WITH (two_phase = 'true', failover = 'true')
+       CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION 
regress_pub WITH (two_phase = 'true')
 ]);
 $sub->wait_for_subscription_sync($oldpub, 'regress_sub');
 
@@ -193,8 +193,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old 
cluster');
 # Check that the slot 'regress_sub' has migrated to the new cluster
 $newpub->start;
 my $result = $newpub->safe_psql('postgres',
-       "SELECT slot_name, two_phase, failover FROM pg_replication_slots");
-is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster');
+       "SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(regress_sub|t), 'check the slot exists on new cluster');
 
 # Update the connection
 my $new_connstr = $newpub->connstr . ' dbname=postgres';
diff --git a/src/test/regress/expected/subscription.out 
b/src/test/regress/expected/subscription.out
index 0f2a25cdc19..fd94389d916 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -389,6 +389,9 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      
| on        | p                | f                | any    | t                 
| f             | f        | off                | dbname=regress_doesnotexist | 
0/0
 (1 row)
 
+--fail - cannot enable failover for a two_phase enabled subscription
+ALTER SUBSCRIPTION regress_testsub SET (failover = true);
+ERROR:  cannot enable failover for a two_phase enabled subscription
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 -- two_phase and streaming are compatible.
@@ -479,6 +482,9 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 RESET SESSION AUTHORIZATION;
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' 
PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+ERROR:  "failover" and "two_phase" are mutually exclusive options
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql 
b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c6..bc6e0f04e64 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -264,6 +264,9 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
 
+--fail - cannot enable failover for a two_phase enabled subscription
+ALTER SUBSCRIPTION regress_testsub SET (failover = true);
+
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
@@ -342,6 +345,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
 RESET SESSION AUTHORIZATION;
+
+-- fail - cannot enable two_phase and failover together
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' 
PUBLICATION testpub WITH (connect = false, two_phase = true, failover = true);
+
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
-- 
2.31.1

Reply via email to