On 28 March 2017 at 09:25, Andres Freund <and...@anarazel.de> wrote:

> If you actually need separate decoding of 2PC, then you want to wait for
> the PREPARE to be replicated.  If that replication has to wait for the
> to-be-replicated prepared transaction to commit prepared, and commit
> prepare will only happen once replication happened...

In other words, the output plugin cannot decode a transaction at
PREPARE TRANSACTION time if that xact holds an AccessExclusiveLock on
a catalog relation we must be able to read in order to decode the
xact.
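
As a rough sketch of that wait-for cycle (a toy model, not code from the
patch; every toy_* name is made up for illustration, and only two lock
modes are modelled):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, cut-down set of lock modes: only the two that matter here. */
typedef enum ToyLockMode
{
	TOY_ACCESS_SHARE,
	TOY_ACCESS_EXCLUSIVE
} ToyLockMode;

/*
 * Within this two-mode model, two locks conflict as soon as either one is
 * AccessExclusiveLock; two AccessShareLocks never conflict.
 */
static bool
toy_locks_conflict(ToyLockMode held, ToyLockMode wanted)
{
	return held == TOY_ACCESS_EXCLUSIVE || wanted == TOY_ACCESS_EXCLUSIVE;
}

/*
 * Assumption: the decoder reads the catalog under an AccessShareLock, so it
 * waits for the prepared xact when that xact holds a conflicting lock.  The
 * wait only becomes a deadlock if COMMIT PREPARED is itself waiting for
 * decoding to finish.
 */
static bool
toy_decode_deadlocks(ToyLockMode held_by_prepared_xact,
					 bool commit_waits_for_decoding)
{
	bool		decoder_waits_for_xact =
		toy_locks_conflict(held_by_prepared_xact, TOY_ACCESS_SHARE);

	return decoder_waits_for_xact && commit_waits_for_decoding;
}

/* Usage: a few representative cases. */
static void
toy_deadlock_example(void)
{
	assert(toy_decode_deadlocks(TOY_ACCESS_EXCLUSIVE, true));
	assert(!toy_decode_deadlocks(TOY_ACCESS_EXCLUSIVE, false));
	assert(!toy_decode_deadlocks(TOY_ACCESS_SHARE, true));
}
```

The second case is ordinary decoding after commit: the decoder may wait on
the lock, but nothing is waiting on the decoder, so the cycle never closes.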

>> Are there any other scenarios where catalog readers are blocked, apart
>> from an explicit lock on a catalog table? ALTERs on catalogs seem to be
>> prohibited.
>
> VACUUM FULL on catalog tables (but that can't happen in xact => 2pc)
> CLUSTER on catalog tables (can happen in xact)
> ALTER on tables modified in the same transaction (even of non catalog
> tables!), because a lot of routines will do a heap_open() to get the
> tupledesc etc.

Right, and the last one is the main issue, since it's by far the
most likely and is hard to work around.

The tests Stas has in place aren't sufficient to cover this, as they
decode only after everything has committed. I'm expanding the
pg_regress coverage to do decoding between prepare and commit (when we
actually care) first, and will add some tests involving strong locks.
I've found one bug where it doesn't decode a 2pc xact at prepare or
commit time, even without restart or strong lock issues. Pretty sure
it's due to assumptions made about the filter callback.

The current code as used by test_decoding won't work correctly. If
txn->has_catalog_changes is set and the xact is still in progress, the
filter skips decoding at PREPARE time. But the xact isn't then decoded
at COMMIT PREPARED time either, if we have already processed past the
PREPARE TRANSACTION. Bug.
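
To make the filter's decision easier to follow, here is a standalone toy
model of the branches in pg_filter_prepare() from the attached patch. The
toy_* types and names are hypothetical stand-ins, and the idea that the
filter is consulted again at COMMIT PREPARED time is an assumption made
for illustration, not something established above:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the transaction state the filter looks at. */
typedef struct ToyXact
{
	bool		has_catalog_changes;
	bool		in_progress;	/* models TransactionIdIsInProgress() */
	bool		did_abort;		/* models TransactionIdDidAbort() */
} ToyXact;

/* Hypothetical stand-in for the two test_decoding options in the patch. */
typedef struct ToyOptions
{
	bool		twophase_decoding;
	bool		decode_with_catalog_changes;
} ToyOptions;

/* Returns true when the xact should NOT be decoded as two-phase. */
static bool
toy_filter_prepare(const ToyOptions *opt, const ToyXact *txn)
{
	if (!opt->twophase_decoding)
		return true;			/* treat everything as one-phase */

	if (txn->has_catalog_changes)
	{
		if (txn->in_progress)
			return !opt->decode_with_catalog_changes;
		if (txn->did_abort)
			return true;		/* already aborted, nothing to decode */
	}

	return false;
}

/* Usage: the same xact gets different answers at two points in time. */
static void
toy_filter_example(void)
{
	ToyOptions	opt = {.twophase_decoding = true,
					   .decode_with_catalog_changes = false};
	ToyXact		at_prepare = {.has_catalog_changes = true,
							  .in_progress = true,
							  .did_abort = false};
	ToyXact		at_commit_prepared = at_prepare;

	at_commit_prepared.in_progress = false;		/* it has committed by now */

	assert(toy_filter_prepare(&opt, &at_prepare) == true);
	assert(toy_filter_prepare(&opt, &at_commit_prepared) == false);
}
```

In this model the answer depends on when the filter is asked, so a caller
that treats it as a stable per-transaction property could skip the xact at
PREPARE and later conclude it had already been decoded there.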

Also, by skipping decoding of 2pc xacts with catalog changes in this
test we also hide the locking issues.

However, even once I add an option to force decoding of 2pc xacts with
catalog changes to test_decoding, I cannot reproduce the expected
locking issues so far. See the tests in the attached updated version, in
contrib/test_decoding/sql/prepared.sql.

Haven't done any TAP tests yet, since the pg_regress tests are so far
sufficient to turn up issues.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From e23909a9929b561e011f41891825bfb5b1ecb1b3 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Tue, 28 Mar 2017 10:51:48 +0800
Subject: [PATCH] Logical decoding of two-phase commit transactions at PREPARE
 time

---
 contrib/test_decoding/expected/prepared.out     | 253 ++++++++++++++++++++----
 contrib/test_decoding/sql/prepared.sql          |  82 +++++++-
 contrib/test_decoding/test_decoding.c           | 205 ++++++++++++++++++-
 src/backend/access/rmgrdesc/xactdesc.c          |  37 +++-
 src/backend/access/transam/twophase.c           |  89 ++++++++-
 src/backend/access/transam/xact.c               |  72 ++++++-
 src/backend/replication/logical/decode.c        | 148 ++++++++++++--
 src/backend/replication/logical/logical.c       | 141 +++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 129 +++++++++++-
 src/backend/replication/logical/snapbuild.c     |  48 ++++-
 src/include/access/twophase.h                   |   3 +
 src/include/access/xact.h                       |  44 ++++-
 src/include/replication/logical.h               |   6 +-
 src/include/replication/output_plugin.h         |  36 ++++
 src/include/replication/reorderbuffer.h         |  50 +++++
 src/include/replication/snapbuild.h             |   4 +
 16 files changed, 1254 insertions(+), 93 deletions(-)

diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d..56c6e72 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,84 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
+:get_no2pc
+ data 
+------
+(0 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+:get_with2pc
+ data 
+------
+(0 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ COMMIT
+(3 rows)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+ data 
+------
+(0 rows)
+
+:get_with2pc
+ data 
+------
+(0 rows)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,49 +91,169 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:4
+ COMMIT
+(3 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+:get_with2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+(3 rows)
+
+:get_no2pc
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
+ COMMIT
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+                                  data                                   
+-------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:5
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3';
+ COMMIT PREPARED 'test_prepared#3';
+(5 rows)
+
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+:get_with2pc
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+:get_no2pc
+                                data                                
+--------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
+ COMMIT
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:9
+ COMMIT
+(6 rows)
+
+-- If we do something that takes a strong lock on a catalog relation we need to
+-- read in order to decode a transaction we deadlock; we can't finish decoding
+-- until the lock is released, but we're waiting for decoding to finish so we
+-- can make a commit/abort decision.
+---
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+ data 
+------
+(0 rows)
+
+-- If we try to decode it now we'll deadlock
+SET statement_timeout = '10s';
+:get_with2pc_nofilter
+-- FIXME we expect a timeout here, but it actually works...
+ERROR: statement timed out
+
+RESET statement_timeout;
+-- we can decode past it by skipping xacts with catalog changes
+-- and let it be decoded after COMMIT PREPARED, though.
+:get_with2pc
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_lock';
+-- Both will work normally after we commit
+:get_no2pc
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ COMMIT
+(4 rows)
+
+:get_with2pc
+ data 
+------
+(0 rows)
+
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-                                  data                                   
--------------------------------------------------------------------------
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:4
- COMMIT
- BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:5
- table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
- COMMIT
- BEGIN
- table public.test_prepared2: INSERT: id[integer]:9
- COMMIT
-(22 rows)
-
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
  
 (1 row)
 
+SELECT pg_drop_replication_slot('regression_slot_2pc');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e726397..a94503c 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -1,22 +1,36 @@
 -- predictability
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
+
+-- Reused queries
+\set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
+\set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
+\set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+:get_with2pc
+:get_no2pc
 COMMIT PREPARED 'test_prepared#1';
+:get_with2pc
+:get_no2pc
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+:get_no2pc
+:get_with2pc
 ROLLBACK PREPARED 'test_prepared#2';
+:get_no2pc
+:get_with2pc
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +41,74 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+:get_no2pc
+:get_with2pc
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists. Our 2pc filter callback will skip decoding of xacts
+-- with catalog changes at PREPARE time, so we don't decode it now.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+:get_with2pc
+:get_no2pc
 
 COMMIT PREPARED 'test_prepared#3';
+:get_no2pc
+:get_with2pc
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+:get_with2pc
+:get_no2pc
+
+-- If we do something that takes a strong lock on a catalog relation we need to
+-- read in order to decode a transaction we deadlock; we can't finish decoding
+-- until the lock is released, but we're waiting for decoding to finish so we
+-- can make a commit/abort decision.
+---
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't see anything with 2pc decoding off
+:get_no2pc
+
+-- If we try to decode it now we'll deadlock
+SET statement_timeout = '10s';
+:get_with2pc_nofilter
+RESET statement_timeout;
+
+-- we can decode past it by skipping xacts with catalog changes
+-- and let it be decoded after COMMIT PREPARED, though.
+:get_with2pc
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- Both will work normally after we commit
+:get_no2pc
+:get_with2pc
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-
 SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_drop_replication_slot('regression_slot_2pc');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 21cfd67..0f0bb1b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -24,6 +24,8 @@
 #include "replication/message.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -46,6 +48,8 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		twophase_decoding;
+	bool		twophase_decode_with_catalog_changes;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -68,6 +72,19 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
 				  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
 				  bool transactional, const char *prefix,
 				  Size sz, const char *message);
+static bool pg_filter_prepare(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
+
 
 void
 _PG_init(void)
@@ -85,9 +102,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->begin_cb = pg_decode_begin_txn;
 	cb->change_cb = pg_decode_change;
 	cb->commit_cb = pg_decode_commit_txn;
+
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
+
+	cb->filter_prepare_cb = pg_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
 }
 
 
@@ -107,6 +130,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->twophase_decoding = false;
+	data->twophase_decode_with_catalog_changes = false;
 
 	ctx->output_plugin_private = data;
 
@@ -176,6 +201,27 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "twophase-decoding") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->twophase_decoding = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decoding))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
+		else if (strcmp(elem->defname, "twophase-decode-with-catalog-changes") == 0)
+		{
+			if (elem->arg == NULL)
+				data->twophase_decode_with_catalog_changes = true;
+			else if (!parse_bool(strVal(elem->arg), &data->twophase_decode_with_catalog_changes))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -232,10 +278,163 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 
 	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfoString(ctx->out, "COMMIT");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "COMMIT %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "COMMIT");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+
+/* Filter out unnecessary two-phase transactions */
+static bool
+pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					char *gid)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* treat all transactions as one-phase */
+	if (!data->twophase_decoding)
+		return true;
+
+	/*
+	 * Two-phase transactions that accessed the catalog require special
+	 * treatment.
+	 *
+	 * Right now we don't have a safe way to decode catalog changes made in
+	 * a prepared transaction that was already aborted by the time of
+	 * decoding.
+	 *
+	 * That kind of problem arises only when we are trying to
+	 * retrospectively decode aborted transactions with catalog changes,
+	 * including if a transaction aborts while we're decoding it. If one
+	 * wants to code distributed commit based on prepare decoding then
+	 * commits/aborts will happen strictly after decoding has
+	 * completed, so it is possible to skip any checks/locks here.
+	 *
+	 * We'll also get stuck trying to acquire locks on catalog relations
+	 * we need for decoding if the prepared xact holds a strong lock on
+	 * one of them and we also need to decode row changes.
+	 */
+	if (txn->has_catalog_changes)
+	{
+		LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+		if (TransactionIdIsInProgress(txn->xid))
+		{
+			/*
+			 * For the sake of simplicity, by default we just
+			 * ignore in-progress prepared transactions with catalog
+			 * changes in this extension. If they abort during
+			 * decoding then the tuples we need to decode them may be
+			 * overwritten while we're still decoding, causing
+			 * wrong catalog lookups.
+			 *
+			 * It is possible to move that LWLockRelease() to
+			 * pg_decode_prepare_txn() and allow decoding of a
+			 * running prepared tx, but such a lock will prevent any
+			 * 2pc transaction commit during decoding time.  That
+			 * can be a long time in case of lots of
+			 * changes/inserts in that tx or if the downstream is
+			 * slow/unresponsive.
+			 *
+			 * (Continuing to decode without the lock is unsafe, XXX)
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return !data->twophase_decode_with_catalog_changes;
+		}
+		else if (TransactionIdDidAbort(txn->xid))
+		{
+			/*
+			 * Here we know that it is already aborted and there is
+			 * not much sense in doing something with this
+			 * transaction.  Consequently ABORT PREPARED will be
+			 * suppressed.
+			 */
+			LWLockRelease(TwoPhaseStateLock);
+			return true;
+		}
+
+		LWLockRelease(TwoPhaseStateLock);
+	}
+
+	return false;
+}
+
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ABORT PREPARED %s",
+		quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
 
 	if (data->include_timestamp)
 		appendStringInfo(ctx->out, " (at %s)",
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 735f8c5..ed75503 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -100,8 +100,13 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
 	}
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
@@ -139,6 +144,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		data += sizeof(xl_xact_xinfo);
 	}
 
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
 	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;
@@ -166,8 +181,26 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
 		xl_xact_twophase *xl_twophase = (xl_xact_twophase *) data;
 
 		parsed->twophase_xid = xl_twophase->xid;
-
 		data += sizeof(xl_xact_twophase);
+
+		if (parsed->xinfo & XACT_XINFO_HAS_GID)
+		{
+			strcpy(parsed->twophase_gid, data);
+			data += strlen(parsed->twophase_gid) + 1;
+		}
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		xl_xact_origin xl_origin;
+
+		/* we're only guaranteed 4 byte alignment, so copy onto stack */
+		memcpy(&xl_origin, data, sizeof(xl_origin));
+
+		parsed->origin_lsn = xl_origin.origin_lsn;
+		parsed->origin_timestamp = xl_origin.origin_timestamp;
+
+		data += sizeof(xl_xact_origin);
 	}
 }
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 83169cc..b58b9a3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -129,7 +129,6 @@ int			max_prepared_xacts = 0;
  * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
  * specified in TwoPhaseFileHeader.
  */
-#define GIDSIZE 200
 
 typedef struct GlobalTransactionData
 {
@@ -187,12 +186,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval);
+								bool initfileinval,
+								const char *gid);
 static void RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels);
+							   RelFileNode *rels,
+							   const char *gid);
 static void ProcessRecords(char *bufptr, TransactionId xid,
 			   const TwoPhaseCallback callbacks[]);
 static void RemoveGXact(GlobalTransaction gxact);
@@ -854,7 +855,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
 /*
  * Header for a 2PC state file
  */
-#define TWOPHASE_MAGIC	0x57F94533		/* format identifier */
+#define TWOPHASE_MAGIC	0x57F94534		/* format identifier */
 
 typedef struct TwoPhaseFileHeader
 {
@@ -870,6 +871,8 @@ typedef struct TwoPhaseFileHeader
 	int32		ninvalmsgs;		/* number of cache invalidation messages */
 	bool		initfileinval;	/* does relcache init file need invalidation? */
 	uint16		gidlen;			/* length of the GID - GID follows the header */
+	XLogRecPtr	origin_lsn;		/* lsn of this record at origin node */
+	TimestampTz origin_timestamp; /* time of prepare at origin node */
 } TwoPhaseFileHeader;
 
 /*
@@ -1021,6 +1024,7 @@ EndPrepare(GlobalTransaction gxact)
 {
 	TwoPhaseFileHeader *hdr;
 	StateFileChunk *record;
+	bool			replorigin;
 
 	/* Add the end sentinel to the list of 2PC records */
 	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1031,6 +1035,21 @@ EndPrepare(GlobalTransaction gxact)
 	Assert(hdr->magic == TWOPHASE_MAGIC);
 	hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
+	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+				  replorigin_session_origin != DoNotReplicateId);
+
+	if (replorigin)
+	{
+		Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+		hdr->origin_lsn = replorigin_session_origin_lsn;
+		hdr->origin_timestamp = replorigin_session_origin_timestamp;
+	}
+	else
+	{
+		hdr->origin_lsn = InvalidXLogRecPtr;
+		hdr->origin_timestamp = 0;
+	}
+
 	/*
 	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
 	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1061,9 +1080,19 @@ EndPrepare(GlobalTransaction gxact)
 	MyPgXact->delayChkpt = true;
 
 	XLogBeginInsert();
+
 	for (record = records.head; record != NULL; record = record->next)
 		XLogRegisterData(record->data, record->len);
+
+	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+	if (replorigin)
+		/* Move LSNs forward for this replication origin */
+		replorigin_session_advance(replorigin_session_origin_lsn,
+								   gxact->prepare_end_lsn);
+
 	XLogFlush(gxact->prepare_end_lsn);
 
 	/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1239,6 +1268,43 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	return buf;
 }
 
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+	TwoPhaseFileHeader *hdr;
+	char *bufptr;
+
+	hdr = (TwoPhaseFileHeader *) xlrec;
+	bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+	parsed->origin_lsn = hdr->origin_lsn;
+	parsed->origin_timestamp = hdr->origin_timestamp;
+	parsed->twophase_xid = hdr->xid;
+	parsed->dbId = hdr->database;
+	parsed->nsubxacts = hdr->nsubxacts;
+	parsed->ncommitrels = hdr->ncommitrels;
+	parsed->nabortrels = hdr->nabortrels;
+	parsed->nmsgs = hdr->ninvalmsgs;
+
+	strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+	bufptr += MAXALIGN(hdr->gidlen);
+
+	parsed->subxacts = (TransactionId *) bufptr;
+	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+	parsed->commitrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+	parsed->abortrels = (RelFileNode *) bufptr;
+	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+	parsed->msgs = (SharedInvalidationMessage *) bufptr;
+	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
 
 /*
  * Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1392,11 +1458,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 										hdr->nsubxacts, children,
 										hdr->ncommitrels, commitrels,
 										hdr->ninvalmsgs, invalmsgs,
-										hdr->initfileinval);
+										hdr->initfileinval, gid);
 	else
 		RecordTransactionAbortPrepared(xid,
 									   hdr->nsubxacts, children,
-									   hdr->nabortrels, abortrels);
+									   hdr->nabortrels, abortrels,
+									   gid);
 
 	ProcArrayRemove(proc, latestXid);
 
@@ -2055,7 +2122,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								RelFileNode *rels,
 								int ninvalmsgs,
 								SharedInvalidationMessage *invalmsgs,
-								bool initfileinval)
+								bool initfileinval,
+								const char *gid)
 {
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
@@ -2082,7 +2150,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								 ninvalmsgs, invalmsgs,
 								 initfileinval, false,
 						 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								 xid);
+								 xid, gid);
 
 
 	if (replorigin)
@@ -2144,7 +2212,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
 							   int nchildren,
 							   TransactionId *children,
 							   int nrels,
-							   RelFileNode *rels)
+							   RelFileNode *rels,
+							   const char *gid)
 {
 	XLogRecPtr	recptr;
 
@@ -2166,7 +2235,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 								nchildren, children,
 								nrels, rels,
 						 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								xid);
+								xid, gid);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index c8751c6..9e407d5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1233,7 +1233,7 @@ RecordTransactionCommit(void)
 							nmsgs, invalMessages,
 							RelcacheInitFileInval, forceSyncCommit,
 							MyXactFlags,
-							InvalidTransactionId /* plain commit */ );
+							InvalidTransactionId, NULL /* plain commit */ );
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1585,7 +1585,8 @@ RecordTransactionAbort(bool isSubXact)
 	XactLogAbortRecord(xact_time,
 					   nchildren, children,
 					   nrels, rels,
-					   MyXactFlags, InvalidTransactionId);
+					   MyXactFlags, InvalidTransactionId,
+					   NULL);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -3471,7 +3472,7 @@ BeginTransactionBlock(void)
  * resource owner, etc while executing inside a Portal.
  */
 bool
-PrepareTransactionBlock(char *gid)
+PrepareTransactionBlock(const char *gid)
 {
 	TransactionState s;
 	bool		result;
@@ -5110,7 +5111,8 @@ XactLogCommitRecord(TimestampTz commit_time,
 					int nrels, RelFileNode *rels,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
-					int xactflags, TransactionId twophase_xid)
+					int xactflags, TransactionId twophase_xid,
+					const char *twophase_gid)
 {
 	xl_xact_commit xlrec;
 	xl_xact_xinfo xl_xinfo;
@@ -5122,6 +5124,7 @@ XactLogCommitRecord(TimestampTz commit_time,
 	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5184,6 +5187,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
 	}
 
 	/* dump transaction origin information */
@@ -5234,8 +5244,13 @@ XactLogCommitRecord(TimestampTz commit_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
 		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
 
@@ -5255,15 +5270,19 @@ XLogRecPtr
 XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid)
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid)
 {
 	xl_xact_abort xlrec;
 	xl_xact_xinfo xl_xinfo;
 	xl_xact_subxacts xl_subxacts;
 	xl_xact_relfilenodes xl_relfilenodes;
 	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_origin xl_origin;
 
 	uint8		info;
+	int			gidlen = 0;
 
 	Assert(CritSectionCount > 0);
 
@@ -5275,7 +5294,6 @@ XactLogAbortRecord(TimestampTz abort_time,
 	else
 		info = XLOG_XACT_ABORT_PREPARED;
 
-
 	/* First figure out and collect all the information needed */
 
 	xlrec.xact_time = abort_time;
@@ -5299,6 +5317,31 @@ XactLogAbortRecord(TimestampTz abort_time,
 	{
 		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
 		xl_twophase.xid = twophase_xid;
+		Assert(twophase_gid != NULL);
+
+		if (XLogLogicalInfoActive())
+		{
+			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			gidlen = strlen(twophase_gid) + 1; /* include '\0' */
+		}
+	}
+
+	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	/* dump transaction origin information only for abort prepared */
+	if ( (replorigin_session_origin != InvalidRepOriginId) &&
+			TransactionIdIsValid(twophase_xid) &&
+			XLogLogicalInfoActive())
+	{
+		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+		xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
 	if (xl_xinfo.xinfo != 0)
@@ -5313,6 +5356,9 @@ XactLogAbortRecord(TimestampTz abort_time,
 	if (xl_xinfo.xinfo != 0)
 		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
 
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
 		XLogRegisterData((char *) (&xl_subxacts),
@@ -5330,8 +5376,22 @@ XactLogAbortRecord(TimestampTz abort_time,
 	}
 
 	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	{
 		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
 
+		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+			XLogRegisterData((char *) twophase_gid, gidlen);
+	}
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+	if (TransactionIdIsValid(twophase_xid))
+		XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
 	return XLogInsert(RM_XACT_ID, info);
 }
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5c13d26..b1e39c55 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -34,6 +34,7 @@
 #include "access/xlogutils.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
+#include "access/twophase.h"
 
 #include "catalog/pg_control.h"
 
@@ -71,7 +72,9 @@ static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			xl_xact_parsed_abort *parsed, TransactionId xid);
+			 xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -277,17 +280,33 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 			}
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+				/* check that output plugin is capable of twophase decoding */
+				if (!ctx->twophase_hadling)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
+
+				/* ok, parse it */
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+										XLogRecGetData(buf->record), &parsed);
+
+				/* does the output plugin want this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+													 parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+												buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -551,8 +570,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
 	 * consistent.
+	 *
+	 * Also, if this transaction was already sent to the prepare callback,
+	 * its invalidations were processed during prepare, so skip them here.
 	 */
-	if (parsed->nmsgs > 0)
+	if (parsed->nmsgs > 0 &&
+			!(TransactionIdIsValid(parsed->twophase_xid) &&
+				ReorderBufferTxnIsPrepared(ctx->reorder, xid)))
 	{
 		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
 									  parsed->nmsgs, parsed->msgs);
@@ -607,9 +631,81 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+								ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		/*
+		 * We are processing COMMIT PREPARED and know that the reorder buffer
+		 * is empty, so we can use a shortcut for committing a bare xact.
+		 */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+					commit_time, origin_id, origin_lsn, parsed->twophase_gid, true);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+
+/*
+ * Decode PREPARE record. Same logic as in COMMIT, but different calls
+ * to SnapshotBuilder as we need to mark this transaction as committed
+ * instead of running to properly decode it. When the prepared transaction
+ * is decoded we mark it in the snapshot as running again.
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			 xl_xact_parsed_prepare *parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz	commit_time = parsed->origin_timestamp;
+	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+									  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	SnapBuildPrepareTxnStart(ctx->snapshot_builder, buf->origptr, xid,
+					   parsed->nsubxacts, parsed->subxacts);
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		FilterByOrigin(ctx, origin_id))
+	{
+		for (i = 0; i < parsed->nsubxacts; i++)
+		{
+			ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
+		}
+		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
+
+		return;
+	}
+
+	/* tell the reorderbuffer about the surviving subtransactions */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn, parsed->twophase_gid);
+
+	SnapBuildPrepareTxnFinish(ctx->snapshot_builder, xid);
 }
 
 /*
@@ -621,6 +717,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz	commit_time = 0;
+	RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If this is ROLLBACK PREPARED, then send it to the callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+			!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+			parsed->dbId == ctx->slot->data.database &&
+			!FilterByOrigin(ctx, origin_id) &&
+			ReorderBufferTxnIsPrepared(ctx->reorder, xid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+						commit_time, origin_id, origin_lsn,
+						parsed->twophase_gid, false);
+		return;
+	}
 
 	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
 					  parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5529ac8..9a66194 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -58,6 +58,14 @@ static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions
 				   bool is_init);
 static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -122,6 +130,7 @@ StartupDecodingContext(List *output_plugin_options,
 	MemoryContext context,
 				old_context;
 	LogicalDecodingContext *ctx;
+	int			twophase_callbacks;
 
 	/* shorter lines... */
 	slot = MyReplicationSlot;
@@ -179,8 +188,25 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
+	/* check that plugin implements all necessary callbacks to perform 2PC */
+	twophase_callbacks = (ctx->callbacks.prepare_cb != NULL) +
+		(ctx->callbacks.commit_prepared_cb != NULL) +
+		(ctx->callbacks.abort_prepared_cb != NULL);
+
+	ctx->twophase_hadling = (twophase_callbacks == 3);
+
+	if (twophase_callbacks != 3 && twophase_callbacks != 0)
+		ereport(WARNING,
+			(errmsg("Output plugin registered only %d twophase callbacks out of 3. "
+					"Twophase transactions will be decoded as ordinary ones.",
+					twophase_callbacks)));
+
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
 	ctx->write = do_write;
@@ -650,6 +676,93 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn;		/* beginning of prepare record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn;		/* beginning of abort record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -684,6 +797,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b437799..f633523 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1308,21 +1308,18 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
  * the top and subtransactions (using a k-way merge) and replay the changes in
  * lsn order.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+					ReorderBuffer *rb, TransactionId xid,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 					TimestampTz commit_time,
 					RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	volatile Snapshot snapshot_now;
 	volatile CommandId command_id = FirstCommandId;
 	bool		using_subtxn;
 	ReorderBufferIterTXNState *volatile iterstate = NULL;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
 	/* unknown transaction, nothing to replay */
 	if (txn == NULL)
 		return;
@@ -1605,8 +1602,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		ReorderBufferIterTXNFinish(rb, iterstate);
 		iterstate = NULL;
 
-		/* call commit callback */
-		rb->commit(rb, txn, commit_lsn);
+		/* call commit or prepare callback */
+		if (txn->prepared)
+			rb->prepare(rb, txn, commit_lsn);
+		else
+			rb->commit(rb, txn, commit_lsn);
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -1633,8 +1633,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 		if (snapshot_now->copied)
 			ReorderBufferFreeSnap(rb, snapshot_now);
 
-		/* remove potential on-disk data, and deallocate */
-		ReorderBufferCleanupTXN(rb, txn);
+		/*
+		 * remove potential on-disk data and deallocate, or postpone that
+		 * until the two-phase transaction is finished
+		 */
+		if (!txn->prepared)
+			ReorderBufferCleanupTXN(rb, txn);
 	}
 	PG_CATCH();
 	{
@@ -1668,6 +1672,111 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as one-phase later on commit.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+	return rb->filter_prepare(rb, txn, gid);
+}
+
+
+/*
+ * Commit non-twophase transaction. See comments to ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all transaction changes should be decoded on PREPARE.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	txn->prepared = true;
+	strcpy(txn->gid, gid);
+
+	ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+										commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to receiver.
+ * Called upon commit/abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+								false);
+
+	/*
+	 * If txn == NULL then presumably the subscriber confirmed the prepare
+	 * but we have restarted since then.
+	 */
+	return txn == NULL ? true : txn->prepared;
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit)
+{
+	ReorderBufferTXN *txn;
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+								true);
+
+	txn->final_lsn = commit_lsn;
+	txn->end_lsn = end_lsn;
+	txn->commit_time = commit_time;
+	txn->origin_id = origin_id;
+	txn->origin_lsn = origin_lsn;
+	strcpy(txn->gid, gid);
+
+	if (is_commit)
+		rb->commit_prepared(rb, txn, commit_lsn);
+	else
+		rb->abort_prepared(rb, txn, commit_lsn);
+
+	ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 2279604..c1ca998 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -885,7 +885,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 	/* copy xids that still are interesting to workspace */
 	for (off = 0; off < builder->committed.xcnt; off++)
 	{
-		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
+		if (TransactionIdPrecedes(builder->committed.xip[off],
 										builder->xmin))
 			;					/* remove */
 		else
@@ -1118,6 +1118,52 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 }
 
+/*
+ * Just a wrapper to clarify DecodePrepare().
+ * Right now we can't extract correct historic catalog data that
+ * was produced by an aborted prepared transaction, so it is the job of
+ * the decoding plugin to avoid that situation; here we just construct the
+ * usual snapshot to be able to decode the prepare.
+ */
+void
+SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
+				   int nsubxacts, TransactionId *subxacts)
+{
+	SnapBuildCommitTxn(builder, lsn, xid, nsubxacts, subxacts);
+}
+
+
+/*
+ * When decoding of the prepare is finished we should exclude our xid
+ * from the list of committed xids to have a correct snapshot between
+ * prepare and commit.
+ *
+ * However, this is not strictly needed. A prepared transaction holds locks
+ * between prepare and commit, so nobody can produce a new version of our
+ * catalog tuples. In case of abort we will have this xid in the array of
+ * committed xids, but that also will not cause a problem since the checks of
+ * HeapTupleHeaderXminInvalid() in HeapTupleSatisfiesHistoricMVCC()
+ * have higher priority than the checks for the xip array. Anyway, let's be
+ * consistent about definitions and delete this xid from the xip array.
+ */
+void
+SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid)
+{
+	TransactionId *search = bsearch(&xid, builder->running.xip,
+				builder->running.xcnt, sizeof(TransactionId), xidComparator);
+
+	if (search == NULL)
+		return;
+
+	/* delete that xid */
+	memmove(search, search + 1,
+			((builder->running.xip + builder->running.xcnt - 1) - search) * sizeof(TransactionId));
+	builder->running.xcnt--;
+
+	/* update min/max */
+	builder->running.xmin = builder->running.xip[0];
+	builder->running.xmax = builder->running.xip[builder->running.xcnt - 1];
+}
 
 /* -----------------------------------
  * Snapshot building functions dealing with xlog records
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index b2b7848..6c0445a 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -15,6 +15,7 @@
 #define TWOPHASE_H
 
 #include "access/xlogdefs.h"
+#include "access/xact.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
 
@@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid);
 
 extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p,
 							int *nxids_p);
+extern void ParsePrepareRecord(uint8 info, char *xlrec,
+							xl_xact_parsed_prepare *parsed);
 extern void StandbyRecoverPreparedTransactions(bool overwriteOK);
 extern void RecoverPreparedTransactions(void);
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 5b37c05..e8bf39b 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -21,6 +21,10 @@
 #include "storage/sinval.h"
 #include "utils/datetime.h"
 
+/*
+ * Maximum size of Global Transaction ID (including '\0').
+ */
+#define GIDSIZE 200
 
 /*
  * Xact isolation levels
@@ -157,6 +161,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XACT_XINFO_HAS_TWOPHASE			(1U << 4)
 #define XACT_XINFO_HAS_ORIGIN			(1U << 5)
 #define XACT_XINFO_HAS_AE_LOCKS			(1U << 6)
+#define XACT_XINFO_HAS_GID				(1U << 7)
 
 /*
  * Also stored in xinfo, these indicating a variety of additional actions that
@@ -303,13 +308,40 @@ typedef struct xl_xact_parsed_commit
 	SharedInvalidationMessage *msgs;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
 
 	XLogRecPtr	origin_lsn;
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_commit;
 
+typedef struct xl_xact_parsed_prepare
+{
+	Oid			dbId;			/* MyDatabaseId */
+
+	int			nsubxacts;
+	TransactionId *subxacts;
+
+	int			ncommitrels;
+	RelFileNode *commitrels;
+
+	int			nabortrels;
+	RelFileNode *abortrels;
+
+	int			nmsgs;
+	SharedInvalidationMessage *msgs;
+
+	TransactionId twophase_xid;
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
+} xl_xact_parsed_prepare;
+
 typedef struct xl_xact_parsed_abort
 {
+	Oid			dbId;
+	Oid			tsId;
+
 	TimestampTz xact_time;
 	uint32		xinfo;
 
@@ -320,6 +352,10 @@ typedef struct xl_xact_parsed_abort
 	RelFileNode *xnodes;
 
 	TransactionId twophase_xid; /* only for 2PC */
+	char 		twophase_gid[GIDSIZE];
+
+	XLogRecPtr	origin_lsn;
+	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
 
@@ -351,7 +387,7 @@ extern void CommitTransactionCommand(void);
 extern void AbortCurrentTransaction(void);
 extern void BeginTransactionBlock(void);
 extern bool EndTransactionBlock(void);
-extern bool PrepareTransactionBlock(char *gid);
+extern bool PrepareTransactionBlock(const char *gid);
 extern void UserAbortTransactionBlock(void);
 extern void ReleaseSavepoint(List *options);
 extern void DefineSavepoint(char *name);
@@ -385,12 +421,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
 					int nmsgs, SharedInvalidationMessage *msgs,
 					bool relcacheInval, bool forceSync,
 					int xactflags,
-					TransactionId twophase_xid);
+					TransactionId twophase_xid, const char *twophase_gid);
 
 extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
 				   int nsubxacts, TransactionId *subxacts,
 				   int nrels, RelFileNode *rels,
-				   int xactflags, TransactionId twophase_xid);
+				   int xactflags, TransactionId twophase_xid,
+				   const char *twophase_gid);
+
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 7d6c88e..7352b07 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -75,6 +75,11 @@ typedef struct LogicalDecodingContext
 	bool		prepared_write;
 	XLogRecPtr	write_location;
 	TransactionId write_xid;
+
+	/*
+	 * Capabilities of decoding plugin used.
+	 */
+	bool		twophase_hadling;
 } LogicalDecodingContext;
 
 
@@ -109,5 +114,4 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
-
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 08e962d..be32774 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -68,6 +68,38 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Called before decoding of PREPARE record to decide whether this
+ * transaction should be decoded with separate calls to prepare
+ * and commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED
+ * and be sent as a usual transaction.
+ */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+													  ReorderBufferTXN *txn,
+													  char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -98,6 +130,10 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 17e47b3..99aa17f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -144,6 +145,16 @@ typedef struct ReorderBufferTXN
 	 */
 	TransactionId xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char		gid[GIDSIZE];
+
+	/*
+	 * By using filter_prepare() callback we can force decoding to treat
+	 * two-phase transaction as an ordinary one. This flag is set if we
+	 * actually called prepare() callback in output plugin.
+	 */
+	bool		prepared;
+
 	/* did the TX have catalog changes */
 	bool		has_catalog_changes;
 
@@ -283,6 +294,29 @@ typedef void (*ReorderBufferCommitCB) (
 												   ReorderBufferTXN *txn,
 												   XLogRecPtr commit_lsn);
 
+typedef bool (*ReorderBufferFilterPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr abort_lsn);
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (
 													ReorderBuffer *rb,
@@ -318,6 +352,10 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -373,6 +411,11 @@ void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 						 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -396,6 +439,13 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool		ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, char *gid);
+bool		ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid);
+void		ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					TimestampTz commit_time,
+					RepOriginId origin_id, XLogRecPtr origin_lsn,
+					char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 
 void		ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index a8ae631..400ffe1 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -72,6 +72,10 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr);
 extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
 				   TransactionId xid, int nsubxacts,
 				   TransactionId *subxacts);
+extern void SnapBuildPrepareTxnStart(SnapBuild *builder, XLogRecPtr lsn,
+				   TransactionId xid, int nsubxacts,
+				   TransactionId *subxacts);
+extern void SnapBuildPrepareTxnFinish(SnapBuild *builder, TransactionId xid);
 extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn,
 				  TransactionId xid, int nsubxacts,
 				  TransactionId *subxacts);
-- 
2.5.5
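
For readers following the thread, here is a minimal standalone sketch of how the filter_prepare / prepare / commit_prepared callbacks declared in the patch are presumably meant to interact. It is not code from the patch. SketchTxn, sketch_on_prepare() and sketch_on_commit_prepared() are hypothetical stand-ins for ReorderBufferTXN and the reorder buffer entry points, and it assumes that a filter returning true means "skip decoding at PREPARE time". The point it illustrates is the invariant discussed above: a transaction skipped at PREPARE must still be decoded exactly once at COMMIT PREPARED, and the "prepared" flag is what tells the two cases apart.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define GIDSIZE 200				/* stand-in for the constant in access/twophase.h */

/* Hypothetical stand-in for ReorderBufferTXN; only the fields used here. */
typedef struct SketchTxn
{
	char		gid[GIDSIZE];
	bool		prepared;		/* set once the prepare callback was called */
	bool		has_catalog_changes;
	int			times_decoded;	/* how often the changes were sent out */
} SketchTxn;

/* Assumed convention: returning true means "skip decoding at PREPARE". */
typedef bool (*SketchFilterPrepareCB) (const SketchTxn *txn, const char *gid);

/* Example filter: defer xacts with catalog changes to COMMIT PREPARED. */
static bool
skip_if_catalog_changes(const SketchTxn *txn, const char *gid)
{
	(void) gid;					/* unused in this example */
	return txn->has_catalog_changes;
}

/* What should happen when a PREPARE record is reached. */
static void
sketch_on_prepare(SketchTxn *txn, const char *gid, SketchFilterPrepareCB filter)
{
	assert(txn != NULL && gid != NULL);

	/* Remember the GID so it can be passed to the output plugin later. */
	snprintf(txn->gid, sizeof(txn->gid), "%s", gid);

	if (filter != NULL && filter(txn, gid))
		return;					/* filtered: decode at COMMIT PREPARED instead */

	txn->times_decoded++;		/* begin, changes, then prepare callback */
	txn->prepared = true;
}

/* What should happen when a COMMIT PREPARED record is reached. */
static void
sketch_on_commit_prepared(SketchTxn *txn)
{
	assert(txn != NULL);

	if (txn->prepared)
		return;					/* changes already sent; only commit_prepared */

	txn->times_decoded++;		/* replay as an ordinary transaction */
}
```

With this shape, a transaction that the filter rejects at PREPARE ends up with times_decoded == 1 after COMMIT PREPARED, and so does one that was decoded at PREPARE. The bug described above corresponds to the second branch of sketch_on_commit_prepared() never running.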
