From 4dac078a5e6069b29e70302c8d2a31ba899f14cc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Mon, 24 Feb 2025 17:24:41 +0800
Subject: [PATCH v3 2/2] Copy the two_phase option in
 pg_copy_logical_replication_slot

Previously, the two_phase option was not copied when calling
pg_copy_logical_replication_slot(). However, since it is feasible and more
intuitive to copy this option rather than documenting its exclusion, this patch
implements the copying of the two_phase option.
---
 contrib/test_decoding/expected/slot.out | 26 ++++++++++++-------------
 contrib/test_decoding/sql/slot.sql      |  6 +++---
 src/backend/replication/slotfuncs.c     | 11 +++++++++++
 3 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out
index 7de03c79f6..c590901181 100644
--- a/contrib/test_decoding/expected/slot.out
+++ b/contrib/test_decoding/expected/slot.out
@@ -176,7 +176,7 @@ SELECT pg_drop_replication_slot('regression_slot3');
 -- Test copy functions for logical replication slots
 --
 -- Create and copy logical slots
-SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false, true);
  ?column? 
 ----------
  init
@@ -202,18 +202,18 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_
 
 -- Check all copied slots status
 SELECT
-    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase
 FROM
     (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
     LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
 WHERE
     o.slot_name != c.slot_name
 ORDER BY o.slot_name, c.slot_name;
- slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
-------------+---------------+-----------+---------------------------------+---------------+-----------
- orig_slot1 | test_decoding | f         | copied_slot1_change_plugin      | pgoutput      | f
- orig_slot1 | test_decoding | f         | copied_slot1_change_plugin_temp | pgoutput      | t
- orig_slot1 | test_decoding | f         | copied_slot1_no_change          | test_decoding | f
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary | two_phase 
+------------+---------------+-----------+---------------------------------+---------------+-----------+-----------
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin      | pgoutput      | f         | t
+ orig_slot1 | test_decoding | f         | copied_slot1_change_plugin_temp | pgoutput      | t         | t
+ orig_slot1 | test_decoding | f         | copied_slot1_no_change          | test_decoding | f         | t
 (3 rows)
 
 -- Now we have maximum 4 replication slots. Check slots are properly
@@ -267,18 +267,18 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_
 
 -- Check all copied slots status
 SELECT
-    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase
 FROM
     (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
     LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
 WHERE
     o.slot_name != c.slot_name
 ORDER BY o.slot_name, c.slot_name;
- slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary 
-------------+---------------+-----------+---------------------------------+---------------+-----------
- orig_slot2 | test_decoding | t         | copied_slot2_change_plugin      | pgoutput      | t
- orig_slot2 | test_decoding | t         | copied_slot2_change_plugin_temp | pgoutput      | f
- orig_slot2 | test_decoding | t         | copied_slot2_no_change          | test_decoding | t
+ slot_name  |    plugin     | temporary |            slot_name            |    plugin     | temporary | two_phase 
+------------+---------------+-----------+---------------------------------+---------------+-----------+-----------
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin      | pgoutput      | t         | f
+ orig_slot2 | test_decoding | t         | copied_slot2_change_plugin_temp | pgoutput      | f         | f
+ orig_slot2 | test_decoding | t         | copied_slot2_no_change          | test_decoding | t         | f
 (3 rows)
 
 -- Cannot copy a logical slot to a physical slot
diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql
index 580e3ae3be..70356e7a56 100644
--- a/contrib/test_decoding/sql/slot.sql
+++ b/contrib/test_decoding/sql/slot.sql
@@ -88,14 +88,14 @@ SELECT pg_drop_replication_slot('regression_slot3');
 --
 
 -- Create and copy logical slots
-SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false);
+SELECT 'init' FROM pg_create_logical_replication_slot('orig_slot1', 'test_decoding', false, true);
 SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_no_change');
 SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin', false, 'pgoutput');
 SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot1', 'copied_slot1_change_plugin_temp', true, 'pgoutput');
 
 -- Check all copied slots status
 SELECT
-    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase
 FROM
     (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
     LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
@@ -120,7 +120,7 @@ SELECT 'copy' FROM pg_copy_logical_replication_slot('orig_slot2', 'copied_slot2_
 
 -- Check all copied slots status
 SELECT
-    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary
+    o.slot_name, o.plugin, o.temporary, c.slot_name, c.plugin, c.temporary, c.two_phase
 FROM
     (SELECT * FROM pg_replication_slots WHERE slot_name LIKE 'orig%') as o
     LEFT JOIN pg_replication_slots as c ON o.restart_lsn = c.restart_lsn  AND o.confirmed_flush_lsn = c.confirmed_flush_lsn
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8222e5a109..acca37d333 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -707,6 +707,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		 * up, logical replication cannot continue using the synchronized slot
 		 * on the promoted standby because the slot retains the restart_lsn and
 		 * confirmed_flush_lsn that are much later than expected.
+		 *
+		 * The two_phase option is not copied initially because the source
+		 * option value could change concurrently anyway. Instead, it will be
+		 * copied after the slot creation along with other properties like
+		 * confirmed_flush.
 		 */
 		create_logical_replication_slot(NameStr(*dst_name),
 										plugin,
@@ -733,6 +738,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		TransactionId copy_catalog_xmin;
 		XLogRecPtr	copy_restart_lsn;
 		XLogRecPtr	copy_confirmed_flush;
+		XLogRecPtr	copy_two_phase_at;
+		bool		copy_two_phase;
 		bool		copy_islogical;
 		char	   *copy_name;
 
@@ -748,6 +755,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
 		copy_restart_lsn = second_slot_contents.data.restart_lsn;
 		copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
+		copy_two_phase_at = second_slot_contents.data.two_phase_at;
+		copy_two_phase = second_slot_contents.data.two_phase;
 
 		/* for existence check */
 		copy_name = NameStr(second_slot_contents.data.name);
@@ -788,6 +797,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
 		MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
 		MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
+		MyReplicationSlot->data.two_phase_at = copy_two_phase_at;
+		MyReplicationSlot->data.two_phase = copy_two_phase;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
 		ReplicationSlotMarkDirty();
-- 
2.30.0.windows.2

