From fb03795b01583a6d26ded9674f4e2173b16d7cd8 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 23 Feb 2021 23:13:53 -0500
Subject: [PATCH v1] Avoid repeated decoding of prepared transactions.

Prepared transactions were decoded again after a restart on COMMIT PREPARED
when two-phase commits were enabled. This was done to avoid missing a prepared
transaction that is not part of initial snapshot. Now, this missing PREPARE is identified
by defining a new LSN called snapshot_was_exported_at_lsn and stored in the
slot and snapbuild structures. Prepared transactions that were prior this LSN
will be replayed on a COMMIT PREPARED.

This commit also changes the way two-phase commits are enabled in test_decoding plugin.
Two-phase commits can now only be enabled while creating the slot using
pg_create_logical_replication_slot() and cannot be set using pg_logical_slot_get_changes().
For this the API pg_create_logical_replication_slot() is modified to take one more
optional boolean parameter 'twophase', which when set to TRUE enables two-phase commits.
The parameter defaults to FALSE.
---
 contrib/test_decoding/expected/twophase.out        | 72 +++++++++-------------
 .../test_decoding/expected/twophase_snapshot.out   |  6 +-
 contrib/test_decoding/expected/twophase_stream.out | 38 +++---------
 contrib/test_decoding/specs/twophase_snapshot.spec |  4 +-
 contrib/test_decoding/sql/twophase.sql             | 34 +++++-----
 contrib/test_decoding/sql/twophase_stream.sql      | 10 +--
 contrib/test_decoding/test_decoding.c              | 18 ++----
 doc/src/sgml/logicaldecoding.sgml                  | 15 ++---
 src/backend/catalog/system_views.sql               |  1 +
 src/backend/replication/logical/decode.c           | 11 +++-
 src/backend/replication/logical/logical.c          | 13 +++-
 src/backend/replication/logical/logicalfuncs.c     |  8 +++
 src/backend/replication/logical/reorderbuffer.c    | 28 ++-------
 src/backend/replication/logical/snapbuild.c        | 19 +++++-
 src/backend/replication/repl_gram.y                | 14 ++++-
 src/backend/replication/repl_scanner.l             |  1 +
 src/backend/replication/slot.c                     |  3 +-
 src/backend/replication/slotfuncs.c                | 10 ++-
 src/backend/replication/walsender.c                |  6 +-
 src/include/catalog/pg_proc.dat                    |  8 +--
 src/include/nodes/replnodes.h                      |  1 +
 src/include/replication/reorderbuffer.h            |  2 +
 src/include/replication/slot.h                     | 14 ++++-
 src/include/replication/snapbuild.h                |  4 +-
 24 files changed, 178 insertions(+), 162 deletions(-)

diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index f9f6bed..8d61107 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -1,7 +1,7 @@
 -- Test prepared transactions. When two-phase-commit is enabled, transactions are
 -- decoded at PREPARE time rather than at COMMIT PREPARED time.
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
  ?column? 
 ----------
  init
@@ -15,14 +15,14 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 INSERT INTO test_prepared1 VALUES (2);
 -- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
 
 PREPARE TRANSACTION 'test_prepared#1';
 -- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -32,21 +32,17 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (4 rows)
 
 COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                        data                        
-----------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- table public.test_prepared1: INSERT: id[integer]:2
- PREPARE TRANSACTION 'test_prepared#1'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#1'
-(5 rows)
+(1 row)
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -55,7 +51,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (3 rows)
 
 ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                 data                 
 -------------------------------------
  ROLLBACK PREPARED 'test_prepared#2'
@@ -78,7 +74,7 @@ WHERE locktype = 'relation'
 (2 rows)
 
 -- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
@@ -93,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 -- the ALTER will stop us inserting into the other one.
 --
 INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                         data                        
 ----------------------------------------------------
  BEGIN
@@ -102,19 +98,16 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 (3 rows)
 
 COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4 data[text]:'frakbar'
- PREPARE TRANSACTION 'test_prepared#3'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
  COMMIT PREPARED 'test_prepared#3'
-(4 rows)
+(1 row)
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
 INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                 data                                
 --------------------------------------------------------------------
  BEGIN
@@ -146,7 +139,7 @@ WHERE locktype = 'relation'
 -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
 -- call should return within a second.
 SET statement_timeout = '1s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                    data                                    
 ---------------------------------------------------------------------------
  BEGIN
@@ -158,15 +151,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                                   data                                    
----------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:'othercol'
- table public.test_prepared1: INSERT: id[integer]:9 data[text]:'othercol2'
- PREPARE TRANSACTION 'test_prepared_lock'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                 data                 
+--------------------------------------
  COMMIT PREPARED 'test_prepared_lock'
-(5 rows)
+(1 row)
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -178,7 +167,7 @@ INSERT INTO test_prepared_savepoint VALUES (2);
 ROLLBACK TO SAVEPOINT test_savepoint;
 PREPARE TRANSACTION 'test_prepared_savepoint';
 -- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                             data                            
 ------------------------------------------------------------
  BEGIN
@@ -188,28 +177,25 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
-                            data                            
-------------------------------------------------------------
- BEGIN
- table public.test_prepared_savepoint: INSERT: a[integer]:1
- PREPARE TRANSACTION 'test_prepared_savepoint'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                   data                    
+-------------------------------------------
  COMMIT PREPARED 'test_prepared_savepoint'
-(4 rows)
+(1 row)
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (20);
 PREPARE TRANSACTION 'test_prepared_nodecode';
 -- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
 
 COMMIT PREPARED 'test_prepared_nodecode';
 -- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                 data                                 
 ---------------------------------------------------------------------
  BEGIN
@@ -222,7 +208,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 -- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
  data 
 ------
 (0 rows)
diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out
index 14d9387..0e8e1f5 100644
--- a/contrib/test_decoding/expected/twophase_snapshot.out
+++ b/contrib/test_decoding/expected/twophase_snapshot.out
@@ -6,7 +6,7 @@ step s2txid: SELECT pg_current_xact_id() IS NULL;
 ?column?       
 
 f              
-step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); <waiting ...>
 step s3b: BEGIN;
 step s3txid: SELECT pg_current_xact_id() IS NULL;
 ?column?       
@@ -22,14 +22,14 @@ step s1init: <... completed>
 
 init           
 step s1insert: INSERT INTO do_write DEFAULT VALUES;
-step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
 table public.do_write: INSERT: id[integer]:2
 COMMIT         
 step s2cp: COMMIT PREPARED 'test1';
-step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
 data           
 
 BEGIN          
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index 3acc4acd3..b08bb0e 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -1,6 +1,6 @@
 -- Test streaming of two-phase commits
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
  ?column? 
 ----------
  init
@@ -28,7 +28,7 @@ ROLLBACK TO s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1';
 -- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
  streaming message: transactional: 1 prefix: test, sz: 50
@@ -59,33 +59,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-                            data                             
--------------------------------------------------------------
- BEGIN
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa1'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa2'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa3'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa4'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa5'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa6'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa7'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa8'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa9'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa10'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa11'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa12'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa13'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa14'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa15'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa16'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa17'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa18'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa19'
- table public.stream_test: INSERT: data[text]:'aaaaaaaaaa20'
- PREPARE TRANSACTION 'test1'
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+          data           
+-------------------------
  COMMIT PREPARED 'test1'
-(23 rows)
+(1 row)
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
@@ -103,7 +81,7 @@ ROLLBACK to s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1_nodecode';
 -- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                            data                           
 ----------------------------------------------------------
  streaming message: transactional: 1 prefix: test, sz: 50
@@ -111,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
                             data                             
 -------------------------------------------------------------
  BEGIN
diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec
index 3e70040..e8d9567 100644
--- a/contrib/test_decoding/specs/twophase_snapshot.spec
+++ b/contrib/test_decoding/specs/twophase_snapshot.spec
@@ -15,8 +15,8 @@ teardown
 session "s1"
 setup { SET synchronous_commit=on; }
 
-step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
-step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');}
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data  FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
 step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
 
 session "s2"
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
index 894e4f5..17ada0f 100644
--- a/contrib/test_decoding/sql/twophase.sql
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -1,7 +1,7 @@
 -- Test prepared transactions. When two-phase-commit is enabled, transactions are
 -- decoded at PREPARE time rather than at COMMIT PREPARED time.
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 
 CREATE TABLE test_prepared1(id integer primary key);
 CREATE TABLE test_prepared2(id integer primary key);
@@ -12,20 +12,20 @@ BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 INSERT INTO test_prepared1 VALUES (2);
 -- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 PREPARE TRANSACTION 'test_prepared#1';
 -- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that rollback of a prepared xact is decoded.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
 BEGIN;
@@ -38,7 +38,7 @@ FROM pg_locks
 WHERE locktype = 'relation'
   AND relation = 'test_prepared1'::regclass;
 -- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that we decode correctly while an uncommitted prepared xact
 -- with ddl exists.
@@ -47,14 +47,14 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
 -- the ALTER will stop us inserting into the other one.
 --
 INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (6);
 INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
 -- logical decoding.
@@ -71,11 +71,11 @@ WHERE locktype = 'relation'
 -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. The
 -- call should return within a second.
 SET statement_timeout = '1s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 RESET statement_timeout;
 COMMIT PREPARED 'test_prepared_lock';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test savepoints and sub-xacts. Creating savepoints will create
 -- sub-xacts implicitly.
@@ -87,26 +87,26 @@ INSERT INTO test_prepared_savepoint VALUES (2);
 ROLLBACK TO SAVEPOINT test_savepoint;
 PREPARE TRANSACTION 'test_prepared_savepoint';
 -- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared_savepoint';
 -- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
 BEGIN;
 INSERT INTO test_prepared1 VALUES (20);
 PREPARE TRANSACTION 'test_prepared_nodecode';
 -- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared_nodecode';
 -- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Test 8:
 -- cleanup and make sure results are also empty
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 -- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
index e9dd44f..646076d 100644
--- a/contrib/test_decoding/sql/twophase_stream.sql
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -1,7 +1,7 @@
 -- Test streaming of two-phase commits
 
 SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
 
 CREATE TABLE stream_test(data text);
 
@@ -18,11 +18,11 @@ ROLLBACK TO s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1';
 -- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 COMMIT PREPARED 'test1';
 --should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
 -- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
@@ -35,11 +35,11 @@ ROLLBACK to s1;
 INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
 PREPARE TRANSACTION 'test1_nodecode';
 -- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 COMMIT PREPARED 'test1_nodecode';
 -- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 929255e..28c876d 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -164,7 +164,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	ListCell   *option;
 	TestDecodingData *data;
 	bool		enable_streaming = false;
-	bool		enable_twophase = false;
 
 	data = palloc0(sizeof(TestDecodingData));
 	data->context = AllocSetContextCreate(ctx->context,
@@ -265,16 +264,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
-		else if (strcmp(elem->defname, "two-phase-commit") == 0)
-		{
-			if (elem->arg == NULL)
-				continue;
-			else if (!parse_bool(strVal(elem->arg), &enable_twophase))
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
-								strVal(elem->arg), elem->defname)));
-		}
 		else
 		{
 			ereport(ERROR,
@@ -286,7 +275,12 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	}
 
 	ctx->streaming &= enable_streaming;
-	ctx->twophase &= enable_twophase;
+
+	/*
+	 * Disable two-phase here, it will be set in the core if it was
+	 * enabled whole creating the slot.
+	 */
+	ctx->twophase = false;
 }
 
 /* cleanup this plugin's resources */
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index cf705ed..562a7cb 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -55,7 +55,7 @@
 
 <programlisting>
 postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
-postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
     slot_name    |    lsn
 -----------------+-----------
  regression_slot | 0/16B1970
@@ -179,7 +179,7 @@ postgres=# BEGIN;
 postgres=*# INSERT INTO data(data) VALUES('5');
 postgres=*# PREPARE TRANSACTION 'test_prepared1';
 
-postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                          data                           
 -----------+-----+---------------------------------------------------------
  0/1689DC0 | 529 | BEGIN 529
@@ -188,7 +188,7 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
 (3 rows)
 
 postgres=# COMMIT PREPARED 'test_prepared1';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                    data                    
 -----------+-----+--------------------------------------------
  0/1689DC0 | 529 | BEGIN 529
@@ -201,7 +201,7 @@ postgres=#-- you can also rollback a prepared transaction
 postgres=# BEGIN;
 postgres=*# INSERT INTO data(data) VALUES('6');
 postgres=*# PREPARE TRANSACTION 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                          data                           
 -----------+-----+---------------------------------------------------------
  0/168A180 | 530 | BEGIN 530
@@ -210,7 +210,7 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
 (3 rows)
 
 postgres=# ROLLBACK PREPARED 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
     lsn    | xid |                     data                     
 -----------+-----+----------------------------------------------
  0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
@@ -822,10 +822,7 @@ typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx
       <parameter>gid</parameter> field, which is part of the
       <parameter>txn</parameter> parameter can be used in this callback to
       check if the plugin has already received this prepare in which case it
-      can skip the remaining changes of the transaction. This can only happen
-      if the user restarts the decoding after receiving the prepare for a
-      transaction but before receiving the commit prepared say because of some
-      error.
+      can either error out or skip the remaining changes of the transaction.
       <programlisting>
        typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx,
                                                     ReorderBufferTXN *txn);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd..f6c5fc5 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1318,6 +1318,7 @@ AS 'pg_create_physical_replication_slot';
 CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
     IN slot_name name, IN plugin name,
     IN temporary boolean DEFAULT false,
+    IN twophase boolean DEFAULT false,
     OUT slot_name name, OUT lsn pg_lsn)
 RETURNS RECORD
 LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index afa1df0..1be4715 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -663,6 +663,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	TimestampTz commit_time = parsed->xact_time;
 	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+	XLogRecPtr  snapshot_was_exported_at_lsn = InvalidXLogRecPtr;
 	int			i;
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -715,7 +716,14 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (two_phase)
 	{
+		/*
+		 * Get the LSN at which the snapshot for this slot was exported.
+		 * ReorderBufferFinishPrepared will decide based on this if the
+		 * transaction should be replayed on COMMIT PREPARED.
+		 */
+		snapshot_was_exported_at_lsn = SnapBuildExportLSNAt(ctx->snapshot_builder);
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									snapshot_was_exported_at_lsn,
 									commit_time, origin_id, origin_lsn,
 									parsed->twophase_gid, true);
 	}
@@ -774,7 +782,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	/* We can't start streaming unless a consistent state is reached. */
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
 	{
-		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		return;
 	}
 
@@ -792,7 +799,6 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 */
 	if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
 	{
-		ReorderBufferSkipPrepare(ctx->reorder, xid);
 		ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
 		return;
 	}
@@ -854,6 +860,7 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	{
 		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
 									abort_time, origin_id, origin_lsn,
+									InvalidXLogRecPtr,
 									parsed->twophase_gid, false);
 	}
 	else
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index baeb45f..8555f5e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -207,7 +207,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder = ReorderBufferAllocate();
 	ctx->snapshot_builder =
 		AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
-								need_full_snapshot);
+								need_full_snapshot, slot->data.snapshot_was_exported_at_lsn);
 
 	ctx->reorder->private_data = ctx;
 
@@ -590,6 +590,17 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
 
 	SpinLockAcquire(&slot->mutex);
 	slot->data.confirmed_flush = ctx->reader->EndRecPtr;
+
+	/*
+	 * The snapshot_was_exported_at_lsn point is required in two-phase
+	 * commits to handle prepared transactions that were not part of this
+	 * snapshot at export time. PREPAREs prior to this point need special
+	 * handling if two-phase commits are enabled.
+	 * The snapshot_was_exported_at_lsn is only updated once when
+	 * the slot is created and is not modified on restarts unlike the
+	 * confirmed_flush point.
+	 */
+	slot->data.snapshot_was_exported_at_lsn = ctx->reader->EndRecPtr;
 	SpinLockRelease(&slot->mutex);
 }
 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f7e0558..4a919d1 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -239,6 +239,14 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 									LogicalOutputPrepareWrite,
 									LogicalOutputWrite, NULL);
 
+		/* If twophase is set on the slot at create time, then
+		 * make sure the field in the context is also updated
+		 */
+		if (MyReplicationSlot->data.twophase)
+		{
+			ctx->twophase = true;
+		}
+
 		/*
 		 * After the sanity checks in CreateDecodingContext, make sure the
 		 * restart_lsn is valid.  Avoid "cannot get changes" wording in this
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c3b9632..9a95a15 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2623,21 +2623,6 @@ ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 	return true;
 }
 
-/* Remember that we have skipped prepare */
-void
-ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
-{
-	ReorderBufferTXN *txn;
-
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
-
-	/* unknown transaction, nothing to do */
-	if (txn == NULL)
-		return;
-
-	txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
-}
-
 /*
  * Prepare a two-phase transaction.
  *
@@ -2672,6 +2657,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 void
 ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 							XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+							XLogRecPtr snapshot_was_exported_at_lsn,
 							TimestampTz commit_time, RepOriginId origin_id,
 							XLogRecPtr origin_lsn, char *gid, bool is_commit)
 {
@@ -2696,14 +2682,12 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 	txn->gid = pstrdup(gid);
 
 	/*
-	 * It is possible that this transaction is not decoded at prepare time
-	 * either because by that time we didn't have a consistent snapshot or it
-	 * was decoded earlier but we have restarted. We can't distinguish between
-	 * those two cases so we send the prepare in both the cases and let
-	 * downstream decide whether to process or skip it. We don't need to
-	 * decode the xact for aborts if it is not done already.
+	 * It is possible that this transaction was not decoded at prepare time
+	 * because by that time we didn't have a consistent snapshot.
+	 * In which case we need to replay the prepared transaction here because
+	 * downstream would not have seen this transaction yet.
 	 */
-	if (!rbtxn_prepared(txn) && is_commit)
+	if ((txn->final_lsn < snapshot_was_exported_at_lsn) && is_commit)
 	{
 		txn->txn_flags |= RBTXN_PREPARE;
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e117887..7622b1d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,6 +165,12 @@ struct SnapBuild
 	XLogRecPtr	start_decoding_at;
 
 	/*
+	 * In two-phase commits, if the PREPARE is prior to this LSN, then the
+	 * whole transaction needs to be replayed at COMMIT PREPARED.
+	 */
+	XLogRecPtr  snapshot_was_exported_at_lsn;
+
+	/*
 	 * Don't start decoding WAL until the "xl_running_xacts" information
 	 * indicates there are no running xids with an xid smaller than this.
 	 */
@@ -269,7 +275,8 @@ SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
 						TransactionId xmin_horizon,
 						XLogRecPtr start_lsn,
-						bool need_full_snapshot)
+						bool need_full_snapshot,
+						XLogRecPtr snapshot_was_exported_at_lsn)
 {
 	MemoryContext context;
 	MemoryContext oldcontext;
@@ -297,6 +304,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
+	builder->snapshot_was_exported_at_lsn = snapshot_was_exported_at_lsn;
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -357,6 +365,15 @@ SnapBuildCurrentState(SnapBuild *builder)
 }
 
 /*
+ * Return the LSN at which the snapshot was exported
+ */
+XLogRecPtr
+SnapBuildExportLSNAt(SnapBuild *builder)
+{
+	return builder->snapshot_was_exported_at_lsn;
+}
+
+/*
  * Should the contents of transaction ending at 'ptr' be decoded?
  */
 bool
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index eb283a8..aeec791 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -84,6 +84,7 @@ static SQLCmd *make_sqlcmd(void);
 %token K_SLOT
 %token K_RESERVE_WAL
 %token K_TEMPORARY
+%token K_TWOPHASE
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
@@ -102,6 +103,7 @@ static SQLCmd *make_sqlcmd(void);
 %type <node>	plugin_opt_arg
 %type <str>		opt_slot var_name
 %type <boolval>	opt_temporary
+%type <boolval>	opt_twophase
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
 
@@ -242,15 +244,16 @@ create_replication_slot:
 					$$ = (Node *) cmd;
 				}
 			/* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */
-			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list
+			| K_CREATE_REPLICATION_SLOT IDENT opt_temporary opt_twophase K_LOGICAL IDENT create_slot_opt_list
 				{
 					CreateReplicationSlotCmd *cmd;
 					cmd = makeNode(CreateReplicationSlotCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $2;
 					cmd->temporary = $3;
-					cmd->plugin = $5;
-					cmd->options = $6;
+					cmd->twophase = $4;
+					cmd->plugin = $6;
+					cmd->options = $7;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -365,6 +368,11 @@ opt_temporary:
 			| /* EMPTY */					{ $$ = false; }
 			;
 
+opt_twophase:
+			K_TWOPHASE						{ $$ = true; }
+			| /* EMPTY */					{ $$ = false; }
+			;
+
 opt_slot:
 			K_SLOT IDENT
 				{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index dcc3c3f..3032c28 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -103,6 +103,7 @@ RESERVE_WAL			{ return K_RESERVE_WAL; }
 LOGICAL				{ return K_LOGICAL; }
 SLOT				{ return K_SLOT; }
 TEMPORARY			{ return K_TEMPORARY; }
+TWOPHASE			{ return K_TWOPHASE; }
 EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fb4af2e..38c385b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -219,7 +219,7 @@ ReplicationSlotValidateName(const char *name, int elevel)
  */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
-					  ReplicationSlotPersistency persistency)
+					  ReplicationSlotPersistency persistency, bool twophase)
 {
 	ReplicationSlot *slot = NULL;
 	int			i;
@@ -277,6 +277,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	namestrcpy(&slot->data.name, name);
 	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
 	slot->data.persistency = persistency;
+	slot->data.twophase    = twophase;
 
 	/* and then data only present in shared memory */
 	slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d24bb5b..a441fa4 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
-						  temporary ? RS_TEMPORARY : RS_PERSISTENT);
+						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
 
 	if (immediately_reserve)
 	{
@@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
  */
 static void
 create_logical_replication_slot(char *name, char *plugin,
-								bool temporary, XLogRecPtr restart_lsn,
+								bool temporary, bool twophase,
+								XLogRecPtr restart_lsn,
 								bool find_startpoint)
 {
 	LogicalDecodingContext *ctx = NULL;
@@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 * error as well.
 	 */
 	ReplicationSlotCreate(name, true,
-						  temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, twophase);
 
 	/*
 	 * Create logical decoding context to find start point or, if we don't
@@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	Name		name = PG_GETARG_NAME(0);
 	Name		plugin = PG_GETARG_NAME(1);
 	bool		temporary = PG_GETARG_BOOL(2);
+	bool		twophase = PG_GETARG_BOOL(3);
 	Datum		result;
 	TupleDesc	tupdesc;
 	HeapTuple	tuple;
@@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
 	create_logical_replication_slot(NameStr(*name),
 									NameStr(*plugin),
 									temporary,
+									twophase,
 									InvalidXLogRecPtr,
 									true);
 
@@ -796,6 +799,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 		create_logical_replication_slot(NameStr(*dst_name),
 										plugin,
 										temporary,
+										false,
 										src_restart_lsn,
 										false);
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8124454..9146e62 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -937,7 +937,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	if (cmd->kind == REPLICATION_KIND_PHYSICAL)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
-							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
+							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
+							  false);
 	}
 	else
 	{
@@ -951,7 +952,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 * they get dropped on error as well.
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
-							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
+							  cmd->twophase);
 	}
 
 	if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1604412..8459488 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10502,10 +10502,10 @@
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
-  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
-  proallargtypes => '{name,name,bool,name,pg_lsn}',
-  proargmodes => '{i,i,i,o,o}',
-  proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
+  proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool',
+  proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
+  proargmodes => '{i,i,i,i,o,o}',
+  proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
   prosrc => 'pg_create_logical_replication_slot' },
 { oid => '4222',
   descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a25..1a933e2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
 	ReplicationKind kind;
 	char	   *plugin;
 	bool		temporary;
+	bool		twophase;
 	List	   *options;
 } CreateReplicationSlotCmd;
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bab31bf..e1842c0 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -643,6 +643,7 @@ void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
 										XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+										XLogRecPtr snapshot_consistency_lsn,
 										TimestampTz commit_time,
 										RepOriginId origin_id, XLogRecPtr origin_lsn,
 										char *gid, bool is_commit);
@@ -676,6 +677,7 @@ bool		ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
 											 TimestampTz prepare_time,
 											 RepOriginId origin_id, XLogRecPtr origin_lsn);
 void		ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
+void		ReorderBufferMarkPrepare(ReorderBuffer *rb, TransactionId xid);
 void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 38a9a0b..9452604 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -91,6 +91,18 @@ typedef struct ReplicationSlotPersistentData
 	 */
 	XLogRecPtr	confirmed_flush;
 
+	/*
+	 * LSN at which this slot found consistent point and snapshot exported.
+	 * This is required for two-phase transactions to decide if the whole
+	 * transaction should be replayed at COMMIT PREPARED.
+	 */
+	XLogRecPtr  snapshot_was_exported_at_lsn;
+
+	/*
+	 * Is the slot two-phase enabled?
+	 */
+	bool        twophase;
+
 	/* plugin name */
 	NameData	plugin;
 } ReplicationSlotPersistentData;
@@ -192,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void);
 
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
-								  ReplicationSlotPersistency p);
+								  ReplicationSlotPersistency p, bool twophase);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d9f187a..0693115 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -61,7 +61,8 @@ extern void CheckPointSnapBuild(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-										  bool need_full_snapshot);
+										  bool need_full_snapshot,
+										  XLogRecPtr snapshot_was_exported_at_lsn);
 extern void FreeSnapshotBuilder(SnapBuild *cache);
 
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
@@ -75,6 +76,7 @@ extern Snapshot SnapBuildGetOrBuildSnapshot(SnapBuild *builder,
 											TransactionId xid);
 
 extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
+extern XLogRecPtr SnapBuildExportLSNAt(SnapBuild *builder);
 
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 							   TransactionId xid, int nsubxacts,
-- 
1.8.3.1

